#### Instruction: Before running this notebook, create the BigQuery dataset from Cloud Shell with "bq mk nasa_dataset" (the table itself is created by the load step below).

## Data Collection

#### Download the raw data from its source and create an RDD from it.

In [1]:
! wget https://s3.amazonaws.com/imcbucket/data/nasa.dat

--2019-06-05 13:27:49--  https://s3.amazonaws.com/imcbucket/data/nasa.dat
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.169.69
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.169.69|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 205242368 (196M) [application/octet-stream]
Saving to: ‘nasa.dat’


2019-06-05 13:27:56 (25.9 MB/s) - ‘nasa.dat’ saved [205242368/205242368]



In [2]:
# Confirm the file was downloaded and check its size.
! ls -l nasa*

-rw-r--r-- 1 root root 205242368 Sep  6  2013 nasa.dat


In [3]:
! gsutil cp nasa.dat gs://aekanuntest/data

Copying file://nasa.dat [Content-Type=application/octet-stream]...
==> NOTE: You are uploading one or more large file(s), which would run          
significantly faster if you enable parallel composite uploads. This
feature can be enabled by editing the
"parallel_composite_upload_threshold" value in your .boto
configuration file. However, note that if you do this large files will
be uploaded as `composite objects
<https://cloud.google.com/storage/docs/composite-objects>`_,which
means that any user who downloads such objects will need to have a
compiled crcmod installed (see "gsutil help crcmod"). This is because
without a compiled crcmod, computing checksums on composite objects is
so slow that gsutil disables downloads of composite objects.

- [1 files][195.7 MiB/195.7 MiB]                                                
Operation completed over 1 objects/195.7 MiB.                                    


In [4]:
# Peek at the first lines to see the layout the parsing regex has to match.
! head -3 nasa.dat

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085


In [5]:
# Read the staged log from Cloud Storage; each RDD element is one text line.
raw_log_path = 'gs://aekanuntest/data/nasa.dat'
raw_rdd = sc.textFile(raw_log_path)

## Data Parsing

#### Use pattern matching to extract fields from the raw log lines.

In [6]:
import time
import datetime
import re
from pyspark.sql import Row

# Captures, in order: (1) client host/IP, (2) day of month, (3) month
# abbreviation, (4) year, (5) hour, (6) minute, (7) second and (8) timezone
# offset, taken from the bracketed timestamp at the start of each log line.
# A raw string is used so the regex escapes (\S, \d, \[ ...) are not parsed as
# invalid Python string escapes, which newer Python versions warn about.
APACHE_ACCESS_LOG_PATTERN = r'(\S*) - - \[(\d{2})\/(\S*)\/(\d{4}):(\d{2}):(\d{2}):(\d{2}) (\S*)\]'


In [7]:
def bejoindate(year, month, date):
    """Join the date parts with dashes, e.g. ('1995', 'Jul', '01') -> '1995-Jul-01'.

    All three arguments must be strings; str.join raises TypeError otherwise.
    """
    return '-'.join([year, month, date])

def bejointime(hour, minute, second):
    """Join the time parts with colons, e.g. ('13', '05', '09') -> '13:05:09'.

    All three arguments must be strings; str.join raises TypeError otherwise.
    """
    return ':'.join([hour, minute, second])

def bejoindatetime(date_name, time_name):
    """Join a date string and a time string with a single space.

    Example: ('1995-Jul-01', '00:00:01') -> '1995-Jul-01 00:00:01'.
    """
    return ' '.join([date_name, time_name])

def totimestamp(dt):
    """Convert a 'YYYY-Mon-DD HH:MM:SS' string (e.g. '1995-Jul-01 00:00:01')
    to float seconds since the epoch.

    The naive datetime is interpreted in the local timezone of the running
    process (time.mktime), and '%b' month parsing depends on the current
    locale. Raises ValueError if dt does not match the format.
    """
    parsed = datetime.datetime.strptime(dt, "%Y-%b-%d %H:%M:%S")
    return time.mktime(parsed.timetuple())

In [8]:
def parse_apache_log_line(logline):
    """Parse one access-log line into a pyspark Row.

    Args:
        logline: one raw line of the log file, e.g.
            '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET ..." 200 6245'.

    Returns:
        A Row with the fields datetime_stamp (float epoch seconds from
        totimestamp), ip_addr, day_of_month, month, year, hour, minute, second
        and timezone. All fields except datetime_stamp are the strings captured
        by APACHE_ACCESS_LOG_PATTERN. If the line does not match the pattern,
        or its date parts cannot be parsed, every field is None. A later
        dropna can then discard the record instead of the Spark task failing.

    NOTE(review): the timezone offset is not applied to datetime_stamp;
    totimestamp interprets the wall-clock time in the executor's local zone.
    """
    field_names = ('datetime_stamp', 'ip_addr', 'day_of_month', 'month',
                   'year', 'hour', 'minute', 'second', 'timezone')

    # re.match reuses re's internal cache of compiled patterns.
    result = re.match(APACHE_ACCESS_LOG_PATTERN, logline)
    if result is None:
        return Row(**dict.fromkeys(field_names))

    ip_addr, day_of_month, month, year, hour, minute, second, timezone = result.groups()
    try:
        # Re-join the date and time parts that the regex split apart, in the
        # 'YYYY-Mon-DD HH:MM:SS' layout expected by totimestamp. The regex
        # already fixes the widths (\d{2}, \d{4}), so no zero-padding is needed.
        datetime_stamp = totimestamp(bejoindatetime(
            bejoindate(year, month, day_of_month),
            bejointime(hour, minute, second)))
    except ValueError:
        # E.g. a month token strptime does not recognise: treat the line as
        # unparsable rather than aborting the whole Spark task.
        return Row(**dict.fromkeys(field_names))

    return Row(
        datetime_stamp=datetime_stamp,
        ip_addr=ip_addr,
        day_of_month=day_of_month,
        month=month,
        year=year,
        hour=hour,
        minute=minute,
        second=second,
        timezone=timezone,
    )

In [9]:
# Apply the parser to every line. This is a Spark transformation: nothing runs
# until an action such as take() or count() is called.
parsed_rdd = raw_rdd.map(parse_apache_log_line)

In [10]:
# Inspect the first parsed record.
parsed_rdd.take(1)

[Row(datetime_stamp=804556801.0, day_of_month='01', hour='00', ip_addr='199.72.81.55', minute='00', month='Jul', second='01', timezone='-0400', year='1995')]

#### Create a schema for the parsed data, cleanse it, and store the data with its schema in a DataFrame.

In [11]:
# Build a DataFrame whose schema is inferred from the parsed Rows. At this point
# the date/time parts are still strings, as produced by the parser.
raw_df = parsed_rdd.toDF()

In [12]:
from pyspark.sql.types import IntegerType, DecimalType, TimestampType

In [13]:
# Cast the clock fields to integers and the epoch-seconds float to a Spark
# timestamp. Then drop every row with a null in any column; this removes the
# all-None Rows the parser emits for lines it could not handle.
int_cast_columns = ('hour', 'minute', 'second')
parsed_df = raw_df
for column_name in int_cast_columns:
    parsed_df = parsed_df.withColumn(column_name, raw_df[column_name].cast(IntegerType()))
parsed_df = (
    parsed_df
    .withColumn('datetime_stamp', raw_df['datetime_stamp'].cast(TimestampType()))
    .dropna(how='any')
)

In [14]:
# List the distinct timezone offsets present in the data.
parsed_df.select(parsed_df['timezone']).distinct().orderBy(parsed_df['timezone']).show()

+--------+
|timezone|
+--------+
|   -0400|
+--------+



In [15]:
# Confirm the column types after the casts.
parsed_df.printSchema()

root
 |-- datetime_stamp: timestamp (nullable = true)
 |-- day_of_month: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- ip_addr: string (nullable = true)
 |-- minute: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- second: integer (nullable = true)
 |-- timezone: string (nullable = true)
 |-- year: string (nullable = true)



In [16]:
# Preview the first 10 rows.
parsed_df.show(10)

+-------------------+------------+----+--------------------+------+-----+------+--------+----+
|     datetime_stamp|day_of_month|hour|             ip_addr|minute|month|second|timezone|year|
+-------------------+------------+----+--------------------+------+-----+------+--------+----+
|1995-07-01 00:00:01|          01|   0|        199.72.81.55|     0|  Jul|     1|   -0400|1995|
|1995-07-01 00:00:06|          01|   0|unicomp6.unicomp.net|     0|  Jul|     6|   -0400|1995|
|1995-07-01 00:00:09|          01|   0|      199.120.110.21|     0|  Jul|     9|   -0400|1995|
|1995-07-01 00:00:11|          01|   0|  burger.letters.com|     0|  Jul|    11|   -0400|1995|
|1995-07-01 00:00:11|          01|   0|      199.120.110.21|     0|  Jul|    11|   -0400|1995|
|1995-07-01 00:00:12|          01|   0|  burger.letters.com|     0|  Jul|    12|   -0400|1995|
|1995-07-01 00:00:12|          01|   0|  burger.letters.com|     0|  Jul|    12|   -0400|1995|
|1995-07-01 00:00:12|          01|   0|     205.21

In [17]:
# Number of rows remaining after parsing and dropna.
parsed_df.count()

1891714

## Data Loading to BigQuery

#### Store the parsed data from the DataFrame in BigQuery.

In [18]:
# Look up the cluster's Cloud Storage staging bucket and the GCP project id from
# the Hadoop configuration. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
# NOTE(review): input_directory is not used by any later cell in this notebook;
# it looks like a leftover from a BigQuery-read template — confirm and remove.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)


In [19]:
# Show the resolved staging bucket.
bucket

'dataproc-d148ab3e-1a71-4df8-85c6-19b9bc036446-us'

In [20]:
# Show the resolved GCP project id.
project

'iot-class-feb2017'

In [21]:
# Show the (unused) input staging directory.
input_directory

'gs://dataproc-d148ab3e-1a71-4df8-85c6-19b9bc036446-us/hadoop/tmp/bigquery/pyspark_input'

In [22]:


# BigQuery destination. The dataset must already exist (bq mk nasa_dataset);
# the table is (re)created by the load step.
output_dataset, output_table = 'nasa_dataset', 'nasa_output'

In [23]:
# Stage the parsed data as CSV part files in Cloud Storage. The BigQuery load
# step reads them back through the part-* wildcard.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
output_files = output_directory + '/part-*'

In [24]:
# Show the wildcard that bq load will read from.
output_files

'gs://dataproc-d148ab3e-1a71-4df8-85c6-19b9bc036446-us/hadoop/tmp/bigquery/pyspark_output/part-*'

In [25]:
# Write the cleaned rows as CSV part files (no header) into the staging
# directory. mode('overwrite') replaces leftovers from an earlier, interrupted
# run. The default save mode ('errorifexists') would make a re-run fail here.
parsed_df.write.mode('overwrite').format('csv').save(output_directory)

In [26]:
! gsutil ls gs://dataproc-d148ab3e-1a71-4df8-85c6-19b9bc036446-us/hadoop/tmp/bigquery/pyspark_output/part-*

gs://dataproc-d148ab3e-1a71-4df8-85c6-19b9bc036446-us/hadoop/tmp/bigquery/pyspark_output/part-00000-dd7400b3-bae8-47db-9de4-1731b4b225dd-c000.csv
gs://dataproc-d148ab3e-1a71-4df8-85c6-19b9bc036446-us/hadoop/tmp/bigquery/pyspark_output/part-00001-dd7400b3-bae8-47db-9de4-1731b4b225dd-c000.csv


In [27]:
import subprocess

# Explicit BigQuery schema, listed in the column order of the CSV files
# written above.
# NOTE(review): timezone values such as '-0400' are declared INTEGER, which
# drops their zero-padded offset format — confirm STRING isn't intended.
bq_schema = ('datetime_stamp:timestamp,day_of_month:INTEGER,hour:INTEGER,'
             'ip_addr:STRING,minute:INTEGER,month:STRING,second:INTEGER,'
             'timezone:INTEGER,year:INTEGER')

# Shell out to the bq CLI to perform the BigQuery import. Passing an argv list,
# instead of formatting one string and calling .split(), keeps every argument
# intact even if a name or path ever contains whitespace. check_call raises
# CalledProcessError if bq exits non-zero.
# NOTE(review): --autodetect is combined with an explicit schema; confirm it is
# still wanted.
subprocess.check_call([
    'bq', 'load',
    '--source_format=CSV',
    '--replace',
    '--autodetect',
    '{}.{}'.format(output_dataset, output_table),
    output_files,
    bq_schema,
])

0

In [28]:
# Delete the staged CSV files now that BigQuery has loaded them; otherwise they
# remain in the staging bucket indefinitely.
hadoop_conf = sc._jsc.hadoopConfiguration()
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
staging_fs = output_path.getFileSystem(hadoop_conf)
staging_fs.delete(output_path, True)  # True = recursive delete

True

In [29]:
#! ls -l nasa.dat

In [30]:
#subprocess.check_call(['bq', 'load', '--source_format=CSV', '--replace', '--autodetect','nasa_dataset.nasa_output', 'gs://dataproc-d148ab3e-1a71-4df8-85c6-19b9bc036446-us/hadoop/tmp/bigquery/pyspark_output/part-*', 'datetime_stamp:timestamp,day_of_month:INTEGER,hour:INTEGER,ip_addr:STRING,minute:INTEGER,month:STRING,second:INTEGER,timezone:INTEGER,year:INTEGER'])