# Data Collection

In [2]:
! wget https://docs.google.com/presentation/d/1-wZ-CZCstmCW-U8BmYl834xPd--f857V26WbKCilVeQ/edit#

--2022-01-24 11:41:44--  https://docs.google.com/presentation/d/1-wZ-CZCstmCW-U8BmYl834xPd--f857V26WbKCilVeQ/edit
Resolving docs.google.com (docs.google.com)... 142.251.6.102, 142.251.6.138, 142.251.6.101, ...
Connecting to docs.google.com (docs.google.com)|142.251.6.102|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘edit’

edit                    [ <=>                ] 631.93K  --.-KB/s    in 0.1s    

2022-01-24 11:41:44 (4.29 MB/s) - ‘edit’ saved [647101]



In [3]:
! head -3 nasa.dat

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085


In [4]:
! wc -l nasa.dat

1891714 nasa.dat


# Put them into HDFS

In [6]:
! hdfs dfs -mkdir -p /rawzone/

! hdfs dfs -put nasa.dat /rawzone/

! hdfs dfs -ls /rawzone/nasa.dat

put: `/rawzone/nasa.dat': File exists
-rw-r--r--   2 root hadoop  205242368 2022-01-24 11:26 /rawzone/nasa.dat


In [7]:
raw_rdd = sc.textFile('/rawzone/nasa.dat')

raw_rdd.take(3)

22/01/24 11:46:06 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #0,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:88)
	at org.apache.hadoop.util.concurrent.ExecutorHelper.logThrowableFromAfterExecute(ExecutorHelper.java:48)
	at org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor.afterExecute(HadoopThreadPoolExecutor.java:90)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1157)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
22/01/24 11:46:06 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #1,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at c

['199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245',
 'unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985',
 '199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085']

In [8]:
import time
import datetime
import re
from pyspark.sql import Row

APACHE_ACCESS_LOG_PATTERN = '(\S*) - - \[(\d{2})\/(\S*)\/(\d{4}):(\d{2}):(\d{2}):(\d{2}) (\S*)\]'


In [9]:
def bejoindate(year,month,date):
    s = '-'
    seq = (year,month,date)
    return s.join(seq)

def bejointime(hour,minute,second):
    s = ':'
    seq = (hour,minute,second)
    return s.join(seq)

def bejoindatetime(date_name,time_name):
    s = ' '
    seq = (date_name,time_name)
    return s.join(seq)

def totimestamp(dt):
    return time.mktime(datetime.datetime.\
    strptime(dt, "%Y-%b-%d %H:%M:%S").timetuple())

In [10]:
def parse_apache_log_line(logline):
    pattern = re.compile(APACHE_ACCESS_LOG_PATTERN)
    result = pattern.match(logline)
    if result is None:
        return Row(
        datetime_stamp = None,
        ip_addr = None,
        day_of_month = None,
        month = None,
        year = None,
        hour = None,
        minute = None,
        second = None,
        timezone = None
        )
    else:
        return Row(
        datetime_stamp = totimestamp(bejoindatetime(bejoindate(result.group(4).zfill(2),result.group(3),result.group(2).zfill(2)),bejointime(result.group(5),result.group(6),result.group(7)))),
        ip_addr = result.group(1),
        day_of_month = result.group(2),
        month = result.group(3),
        year = result.group(4),
        hour = result.group(5),
        minute = result.group(6),
        second = result.group(7),
        timezone = result.group(8)
        )

In [11]:
parsed_rdd = raw_rdd.map(parse_apache_log_line)

parsed_rdd.take(1)


[Row(datetime_stamp=804556801.0, ip_addr='199.72.81.55', day_of_month='01', month='Jul', year='1995', hour='00', minute='00', second='01', timezone='-0400')]

In [12]:
raw_df = parsed_rdd.toDF()

In [13]:
raw_df.printSchema()

root
 |-- datetime_stamp: double (nullable = true)
 |-- ip_addr: string (nullable = true)
 |-- day_of_month: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- second: string (nullable = true)
 |-- timezone: string (nullable = true)



# สำหรับ transform ข้อมูลให้เขียนลง BigQuery ได้ต่อไป

In [14]:
from pyspark.sql.types import IntegerType, DecimalType, TimestampType

parsed_df = raw_df.withColumn('hour',raw_df['hour'].cast(IntegerType()))\
.withColumn('minute',raw_df['minute'].cast(IntegerType()))\
.withColumn('second',raw_df['second'].cast(IntegerType()))\
.withColumn('datetime_stamp',raw_df['datetime_stamp'].cast(DecimalType()))\
.dropna(how='any')

In [15]:
parsed_df.printSchema()

root
 |-- datetime_stamp: decimal(10,0) (nullable = true)
 |-- ip_addr: string (nullable = true)
 |-- day_of_month: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- second: integer (nullable = true)
 |-- timezone: string (nullable = true)



In [16]:
parsed_df.show(10)

[Stage 3:>                                                          (0 + 1) / 1]

+--------------+--------------------+------------+-----+----+----+------+------+--------+
|datetime_stamp|             ip_addr|day_of_month|month|year|hour|minute|second|timezone|
+--------------+--------------------+------------+-----+----+----+------+------+--------+
|     804556801|        199.72.81.55|          01|  Jul|1995|   0|     0|     1|   -0400|
|     804556806|unicomp6.unicomp.net|          01|  Jul|1995|   0|     0|     6|   -0400|
|     804556809|      199.120.110.21|          01|  Jul|1995|   0|     0|     9|   -0400|
|     804556811|  burger.letters.com|          01|  Jul|1995|   0|     0|    11|   -0400|
|     804556811|      199.120.110.21|          01|  Jul|1995|   0|     0|    11|   -0400|
|     804556812|  burger.letters.com|          01|  Jul|1995|   0|     0|    12|   -0400|
|     804556812|  burger.letters.com|          01|  Jul|1995|   0|     0|    12|   -0400|
|     804556812|     205.212.115.106|          01|  Jul|1995|   0|     0|    12|   -0400|
|     8045

                                                                                

In [17]:
parsed_df.count()

                                                                                

1891714

# Transform ข้อมูลต่อไป เพื่อให้เขียนลง Hive ได้

In [19]:
from pyspark.sql import functions as sparkf

final_df = parsed_df.withColumn('datetime_stamp', sparkf.from_unixtime(sparkf.col('datetime_stamp')))\
.withColumn('datetime_stamp', sparkf.col('datetime_stamp').cast(TimestampType()))

final_df.printSchema()


root
 |-- datetime_stamp: timestamp (nullable = true)
 |-- ip_addr: string (nullable = true)
 |-- day_of_month: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- second: integer (nullable = true)
 |-- timezone: string (nullable = true)



In [20]:
final_df.write.mode('overwrite').saveAsTable('nasa_webaccesslog')

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
22/01/24 11:57:10 WARN org.apache.hadoop.hive.ql.session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


In [21]:
! hive -e "show tables"

Hive Session ID = 05303302-dd29-4f2e-b4df-9dc5a33ca6a6

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
Hive Session ID = 10a11ce1-c120-4796-8b31-55d1d2ff377a
OK
nasa_webaccesslog
Time taken: 1.364 seconds, Fetched: 1 row(s)


In [22]:
! hive -e "describe nasa_webaccesslog"

Hive Session ID = da7035e4-9f09-4874-a8bb-47d99dabb321

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
Hive Session ID = 4c14ea15-97ab-4d26-8d72-7c3226d1c494
OK
datetime_stamp      	timestamp           	                    
ip_addr             	string              	                    
day_of_month        	string              	                    
month               	string              	                    
year                	string              	                    
hour                	int                 	                    
minute              	int                 	                    
second              	int                 	                    
timezone            	string              	                    
Time taken: 1.425 seconds, Fetched: 9 row(s)


In [23]:
! hive -e "select * from nasa_webaccesslog where datetime_stamp between '1995-07-01' and '1995-08-01' limit 10"

Hive Session ID = 00e8b2ed-a21e-4c52-be04-086ef1103c11

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
Hive Session ID = 74985eac-ec04-4766-98e7-81c098765795
Query ID = root_20220124115947_1daedc79-8141-4771-990b-d10fed8d3b22
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1643023309979_0006)

[2K----------------------------------------------------------------------------------------------
[2K[36;1m        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED  
[22;0m[2K----------------------------------------------------------------------------------------------
[2KMap 1            container        INITED      1          0        0        1       0       0  
[2K----------------------------------------------------------------------------------------------
[2K[31;1mVERTICES: 00/01  [>>--------------------------] 0%    ELAPSED TIME: 0.