## Data Collection

#### Download raw data from a data source

In [1]:
# -nc (no-clobber) skips the download when nasa.dat is already present, so
# re-running this cell does not leave numbered duplicates (nasa.dat.1, ...).
! wget -nc https://s3.amazonaws.com/imcbucket/data/nasa.dat

--2021-10-17 10:27:05--  https://s3.amazonaws.com/imcbucket/data/nasa.dat
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.84.141
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.84.141|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 205242368 (196M) [application/octet-stream]
Saving to: ‘nasa.dat’


2021-10-17 10:27:08 (77.5 MB/s) - ‘nasa.dat’ saved [205242368/205242368]



In [2]:
# Confirm the file landed in the working directory and check its size.
! ls -l nasa*

-rw-r--r-- 1 root root 205242368 Sep  6  2013 nasa.dat


In [3]:
# Peek at the first three records to see the log line format.
! head -3 nasa.dat

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085


In [4]:
# Count the lines in the file; this is the baseline to compare row counts against later.
! wc -l nasa.dat

1891714 nasa.dat


#### Upload the raw data to HDFS

In [5]:
# Create the HDFS landing directory; -p makes this succeed even if it already exists.
! hdfs dfs -mkdir -p /rawzone/

In [6]:
# -f overwrites an existing /rawzone/nasa.dat; without it the command fails
# with "File exists" whenever this cell is run a second time.
! hdfs dfs -put -f nasa.dat /rawzone/

In [7]:
# List the HDFS directory to verify the upload.
! hdfs dfs -ls /rawzone/

Found 1 items
-rw-r--r--   2 root hadoop  205242368 2021-10-17 10:27 /rawzone/nasa.dat


## Data Preprocessing

#### Load the data from HDFS into an RDD

In [8]:
# Read the log file as an RDD with one string element per line.
# NOTE(review): `sc` is not defined in this notebook — presumably the SparkContext
# pre-created by the kernel; confirm before running elsewhere.
raw_rdd = sc.textFile('/rawzone/nasa.dat')

In [9]:
# Fetch the first three lines to confirm the file is readable from Spark.
raw_rdd.take(3)

['199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245',
 'unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985',
 '199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085']

#### Define a regular expression that extracts the host and timestamp fields from each raw log line

In [10]:
import time
import datetime
import re
from pyspark.sql import Row

# Regex for the leading part of an access-log line, e.g.
#   199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ...
# Capture groups: 1=host, 2=day of month, 3=month abbreviation, 4=year,
# 5=hour, 6=minute, 7=second, 8=timezone offset.
# Written as a raw string so that \S, \d and \[ reach the regex engine without
# being treated as (invalid) Python string escapes.
APACHE_ACCESS_LOG_PATTERN = r'(\S*) - - \[(\d{2})\/(\S*)\/(\d{4}):(\d{2}):(\d{2}):(\d{2}) (\S*)\]'


In [11]:
def bejoindate(year,month,date):
    """Return the three date parts joined with '-' (e.g. '1995-Jul-01').

    All three arguments must be strings; they are concatenated in the order given.
    """
    return '-'.join([year, month, date])

def bejointime(hour,minute,second):
    """Return the three time parts joined with ':' (e.g. '00:00:01').

    All three arguments must be strings; they are concatenated in the order given.
    """
    return ':'.join([hour, minute, second])

def bejoindatetime(date_name,time_name):
    """Return the date string and the time string joined by a single space."""
    return ' '.join([date_name, time_name])

def totimestamp(dt):
    """Convert a 'YYYY-Mon-DD HH:MM:SS' string to epoch seconds (float).

    The string is parsed with the format "%Y-%b-%d %H:%M:%S" and handed to
    time.mktime, which interprets the fields as local time of the process.
    No UTC offset is part of the input, so none is applied here.

    Raises ValueError if `dt` does not match the expected format.
    """
    parsed = datetime.datetime.strptime(dt, "%Y-%b-%d %H:%M:%S")
    return time.mktime(parsed.timetuple())

In [12]:
def _unparsed_log_row():
    """Build the all-None Row used for log lines that cannot be parsed."""
    return Row(
        datetime_stamp=None,
        ip_addr=None,
        day_of_month=None,
        month=None,
        year=None,
        hour=None,
        minute=None,
        second=None,
        timezone=None,
    )


def parse_apache_log_line(logline):
    """Parse one access-log line into a Row of host and timestamp fields.

    Parameters
    ----------
    logline : str
        A raw log line; only its leading part is examined, using
        APACHE_ACCESS_LOG_PATTERN.

    Returns
    -------
    Row
        Fields: datetime_stamp (float epoch seconds from totimestamp), ip_addr,
        day_of_month, month, year, hour, minute, second, timezone. Apart from
        datetime_stamp, every field is the string captured by the regex.
        The captured timezone offset is stored as-is and is not applied to
        datetime_stamp.
        If the line does not match the pattern, or the captured date/time
        fields do not form a valid date, every field is None. This keeps one
        malformed record from raising inside the Spark job.
    """
    result = re.match(APACHE_ACCESS_LOG_PATTERN, logline)
    if result is None:
        return _unparsed_log_row()

    (ip_addr, day_of_month, month, year,
     hour, minute, second, timezone) = result.groups()

    try:
        datetime_stamp = totimestamp(
            bejoindatetime(
                bejoindate(year, month, day_of_month),
                bejointime(hour, minute, second),
            )
        )
    except (ValueError, OverflowError):
        # The regex accepts any non-space text as the month and any two digits
        # for day/hour/minute/second, so a match can still be an invalid date.
        return _unparsed_log_row()

    return Row(
        datetime_stamp=datetime_stamp,
        ip_addr=ip_addr,
        day_of_month=day_of_month,
        month=month,
        year=year,
        hour=hour,
        minute=minute,
        second=second,
        timezone=timezone,
    )

In [13]:
# Apply the parser to every log line; map() is lazy, so nothing is computed
# until an action (take, count, ...) is called on the result.
parsed_rdd = raw_rdd.map(parse_apache_log_line)

In [14]:
# Parse just the first record as a sanity check of the Row layout.
parsed_rdd.take(1)

[Row(datetime_stamp=804556801.0, day_of_month='01', hour='00', ip_addr='199.72.81.55', minute='00', month='Jul', second='01', timezone='-0400', year='1995')]

#### Convert the parsed data to a DataFrame, cast the columns to proper types, and drop incomplete rows

In [15]:
# Convert the RDD of Rows to a DataFrame; no schema is passed, so the column
# types are inferred from the Row values.
raw_df = parsed_rdd.toDF()

In [16]:
#raw_df.show()

In [17]:
from pyspark.sql.types import IntegerType, DecimalType, TimestampType

In [18]:
# Cast the time parts to integers and the epoch seconds to a decimal, then
# discard every row that has a null in any column (the parser emits all-None
# rows for lines it could not match).
parsed_df = (
    raw_df
    .withColumn('hour', raw_df['hour'].cast(IntegerType()))
    .withColumn('minute', raw_df['minute'].cast(IntegerType()))
    .withColumn('second', raw_df['second'].cast(IntegerType()))
    .withColumn('datetime_stamp', raw_df['datetime_stamp'].cast(DecimalType()))
    .dropna(how='any')
)

In [19]:
# List the distinct timezone offsets present in the data, in sorted order.
parsed_df.select('timezone').distinct().orderBy('timezone').show()

+--------+
|timezone|
+--------+
|   -0400|
+--------+



In [20]:
# Verify the column types after the casts.
parsed_df.printSchema()

root
 |-- datetime_stamp: decimal(10,0) (nullable = true)
 |-- day_of_month: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- ip_addr: string (nullable = true)
 |-- minute: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- second: integer (nullable = true)
 |-- timezone: string (nullable = true)
 |-- year: string (nullable = true)



In [21]:
# Preview the first ten cleaned rows.
parsed_df.show(10)

+--------------+------------+----+--------------------+------+-----+------+--------+----+
|datetime_stamp|day_of_month|hour|             ip_addr|minute|month|second|timezone|year|
+--------------+------------+----+--------------------+------+-----+------+--------+----+
|     804556801|          01|   0|        199.72.81.55|     0|  Jul|     1|   -0400|1995|
|     804556806|          01|   0|unicomp6.unicomp.net|     0|  Jul|     6|   -0400|1995|
|     804556809|          01|   0|      199.120.110.21|     0|  Jul|     9|   -0400|1995|
|     804556811|          01|   0|  burger.letters.com|     0|  Jul|    11|   -0400|1995|
|     804556811|          01|   0|      199.120.110.21|     0|  Jul|    11|   -0400|1995|
|     804556812|          01|   0|  burger.letters.com|     0|  Jul|    12|   -0400|1995|
|     804556812|          01|   0|  burger.letters.com|     0|  Jul|    12|   -0400|1995|
|     804556812|          01|   0|     205.212.115.106|     0|  Jul|    12|   -0400|1995|
|     8045

In [22]:
# Row count after dropna; compare with the `wc -l` line count of the raw file
# to see how many lines were dropped as unparseable.
parsed_df.count()

1891714

In [23]:
from pyspark.sql import functions as sparkf

In [24]:
# Preview only: add a temporary column that renders the epoch seconds as a
# datetime string. The result is displayed and not assigned to a variable.
parsed_df.withColumn('datetimeFromEpoch', sparkf.from_unixtime(sparkf.col('datetime_stamp'))).show()

+--------------+------------+----+--------------------+------+-----+------+--------+----+-------------------+
|datetime_stamp|day_of_month|hour|             ip_addr|minute|month|second|timezone|year|  datetimeFromEpoch|
+--------------+------------+----+--------------------+------+-----+------+--------+----+-------------------+
|     804556801|          01|   0|        199.72.81.55|     0|  Jul|     1|   -0400|1995|1995-07-01 00:00:01|
|     804556806|          01|   0|unicomp6.unicomp.net|     0|  Jul|     6|   -0400|1995|1995-07-01 00:00:06|
|     804556809|          01|   0|      199.120.110.21|     0|  Jul|     9|   -0400|1995|1995-07-01 00:00:09|
|     804556811|          01|   0|  burger.letters.com|     0|  Jul|    11|   -0400|1995|1995-07-01 00:00:11|
|     804556811|          01|   0|      199.120.110.21|     0|  Jul|    11|   -0400|1995|1995-07-01 00:00:11|
|     804556812|          01|   0|  burger.letters.com|     0|  Jul|    12|   -0400|1995|1995-07-01 00:00:12|
|     8045

In [25]:
# Replace the numeric epoch column with a real timestamp column: render the
# epoch seconds as a datetime string, then cast that string to TimestampType.
final_df = parsed_df.withColumn(
    'datetime_stamp',
    sparkf.from_unixtime(sparkf.col('datetime_stamp')).cast(TimestampType()),
)

In [26]:
# Preview the final frame with datetime_stamp as a timestamp.
final_df.show()

+-------------------+------------+----+--------------------+------+-----+------+--------+----+
|     datetime_stamp|day_of_month|hour|             ip_addr|minute|month|second|timezone|year|
+-------------------+------------+----+--------------------+------+-----+------+--------+----+
|1995-07-01 00:00:01|          01|   0|        199.72.81.55|     0|  Jul|     1|   -0400|1995|
|1995-07-01 00:00:06|          01|   0|unicomp6.unicomp.net|     0|  Jul|     6|   -0400|1995|
|1995-07-01 00:00:09|          01|   0|      199.120.110.21|     0|  Jul|     9|   -0400|1995|
|1995-07-01 00:00:11|          01|   0|  burger.letters.com|     0|  Jul|    11|   -0400|1995|
|1995-07-01 00:00:11|          01|   0|      199.120.110.21|     0|  Jul|    11|   -0400|1995|
|1995-07-01 00:00:12|          01|   0|  burger.letters.com|     0|  Jul|    12|   -0400|1995|
|1995-07-01 00:00:12|          01|   0|  burger.letters.com|     0|  Jul|    12|   -0400|1995|
|1995-07-01 00:00:12|          01|   0|     205.21

#### Write the DataFrame to Hive as a table

In [27]:
# Display the DataFrame's column names and types before persisting it.
final_df

DataFrame[datetime_stamp: timestamp, day_of_month: string, hour: int, ip_addr: string, minute: int, month: string, second: int, timezone: string, year: string]

In [28]:
# Persist the frame as the table 'nasa_webaccesslog'; mode 'overwrite' replaces
# any existing table of that name, so this cell can be re-run safely.
final_df.write.mode('overwrite').saveAsTable('nasa_webaccesslog')