## Exercise 1: Schema on Read

In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, expr, desc, to_timestamp
from pyspark.sql.types import MapType, StringType, DateType
import pandas as pd
import matplotlib
import re

In [2]:
# Obtain the active SparkSession, creating one if none exists yet.
spark = SparkSession.builder.getOrCreate()

### Load the dataset

In [3]:
# Read the gzipped access log as plain text: the resulting DataFrame has a
# single string column named `value`, one row per log line.
df_log = spark.read.text("data/gz/nasa_access_log_jul_95.gz")

### Explore dataset

In [4]:
# Show the column names and types of the raw log DataFrame.
df_log.printSchema()

root
 |-- value: string (nullable = true)



In [19]:
# Count the rows (one per log line) in the raw DataFrame.
df_log.count()

1891715

In [5]:
# Preview the first 5 raw log lines without truncating long values.
df_log.show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
+-----------------------

### Transform dataset

In [6]:
@udf(MapType(StringType(), StringType()))
def parse_udf(line):
    """Parse one access-log line into a map of named string fields.

    The expected layout is
    ``host identd user [timestamp zone] "method endpoint protocol" status size``.

    Args:
        line: One raw log line (the ``value`` column), or None.

    Returns:
        A dict with the keys ``host``, ``client_identd``, ``user_id``,
        ``date_time``, ``method``, ``endpoint``, ``protocol``,
        ``response_code`` and ``content_size``. Every value is a string, as
        required by the declared ``MapType(StringType, StringType)``.
        Returns None (a null map) when ``line`` is None or does not match
        the expected layout.
    """
    # The `value` column is nullable; re.search would raise on None.
    if line is None:
        return None
    # Raw string so the regex escapes (\S, \[, \d, ...) are not interpreted
    # as (invalid) Python string escapes.
    regex_pattern = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
    match = re.search(regex_pattern, line)
    if match is None:
        # A tuple is not a valid value for a map-typed column; signal an
        # unparseable line with an explicit null instead.
        return None
    size_field = match.group(9)
    return {
        "host"          : match.group(1),
        "client_identd" : match.group(2),
        "user_id"       : match.group(3),
        "date_time"     : match.group(4),
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        # Normalise through int() as before, but hand back a string so the
        # value matches the declared map value type.
        "response_code" : str(int(match.group(8))),
        # A '-' size field means no size was logged; report it as "0".
        "content_size"  : "0" if size_field == '-' else size_field,
    }

In [7]:
# Add a `parsed` map column by applying the parsing UDF to each raw line.
df_parsed = df_log.withColumn('parsed', parse_udf(df_log['value']))

In [8]:
# Pull the first 5 parsed maps into pandas for a quick look.
df_parsed.select('parsed').limit(5).toPandas()

Unnamed: 0,parsed
0,"{'response_code': '200', 'protocol': 'HTTP/1.0..."
1,"{'response_code': '200', 'protocol': 'HTTP/1.0..."
2,"{'response_code': '200', 'protocol': 'HTTP/1.0..."
3,"{'response_code': '304', 'protocol': 'HTTP/1.0..."
4,"{'response_code': '200', 'protocol': 'HTTP/1.0..."


### Generate columns

In [9]:
# Extract a single map entry as its own column using a SQL expression.
# NOTE: limit(5) already caps the result at 5 rows, so show(5) prints them all.
df_parsed.selectExpr("parsed['host'] as host").limit(5).show(5)

+--------------------+
|                host|
+--------------------+
|        199.72.81.55|
|unicomp6.unicomp.net|
|      199.120.110.21|
|  burger.letters.com|
|      199.120.110.21|
+--------------------+



In [25]:
# Project two map entries without aliases; the output columns keep the
# default names `parsed[host]` and `parsed[date_time]`.
df_parsed.selectExpr(["parsed['host']", "parsed['date_time']"]).show(5, truncate=False)

+--------------------+--------------------------+
|parsed[host]        |parsed[date_time]         |
+--------------------+--------------------------+
|199.72.81.55        |01/Jul/1995:00:00:01 -0400|
|unicomp6.unicomp.net|01/Jul/1995:00:00:06 -0400|
|199.120.110.21      |01/Jul/1995:00:00:09 -0400|
|burger.letters.com  |01/Jul/1995:00:00:11 -0400|
|199.120.110.21      |01/Jul/1995:00:00:11 -0400|
+--------------------+--------------------------+
only showing top 5 rows



In [11]:
# Keys to pull out of the `parsed` map, in the desired column order.
fields = [
    "host",
    "client_identd",
    "user_id",
    "date_time",
    "method",
    "endpoint",
    "protocol",
    "response_code",
    "content_size",
]
# One SQL projection per key: look the key up in the map and alias the
# result to a column of the same name.
exprs = list(map("parsed['{0}'] AS {0}".format, fields))
exprs

["parsed['host'] AS host",
 "parsed['client_identd'] AS client_identd",
 "parsed['user_id'] AS user_id",
 "parsed['date_time'] AS date_time",
 "parsed['method'] AS method",
 "parsed['endpoint'] AS endpoint",
 "parsed['protocol'] AS protocol",
 "parsed['response_code'] AS response_code",
 "parsed['content_size'] AS content_size"]

In [12]:
# Expand the `parsed` map into one top-level column per field.
df_clean = df_parsed.selectExpr(*exprs)

In [13]:
# Preview the first 5 rows of the flattened DataFrame in pandas.
df_clean.limit(5).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985
2,199.120.110.21,-,-,01/Jul/1995:00:00:09 -0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085
3,burger.letters.com,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0
4,199.120.110.21,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179


### Data types

In [79]:
# Transforming data types: every column extracted from the parsed map is a
# string, so convert the ones that carry numeric / temporal values.
df_final = (df_clean
                # Byte count of the response as an integer.
                .withColumn('content_size', expr("CAST(content_size AS int)"))
                # Values look like '01/Jul/1995:00:00:01 -0400'. The offset is
                # matched with 'Z' rather than 'XXXX': four X letters are
                # rejected by java.text.SimpleDateFormat (the legacy parser),
                # whereas 'Z' parses a '-0400' style offset under both the
                # legacy parser and the Spark 3 datetime patterns.
                .withColumn('date_time', to_timestamp(col=df_clean['date_time'], format='dd/MMM/yyyy:HH:mm:ss Z'))
)

In [80]:
# Confirm the column types after the conversions above.
df_final.printSchema()

root
 |-- host: string (nullable = true)
 |-- client_identd: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- response_code: string (nullable = true)
 |-- content_size: integer (nullable = true)



### Statistics

**Popular hosts**

In [22]:
# Top 10 hosts by number of log entries, most frequent first.
df_final.groupBy("host").count().orderBy(desc("count")).limit(10).toPandas()

Unnamed: 0,host,count
0,piweba3y.prodigy.com,17572
1,piweba4y.prodigy.com,11591
2,piweba1y.prodigy.com,9868
3,alyssa.prodigy.com,7852
4,siltb10.orl.mmc.com,7573
5,piweba2y.prodigy.com,5922
6,edams.ksc.nasa.gov,5434
7,163.206.89.4,4906
8,news.ti.com,4863
9,disarray.demon.co.uk,4353


**Popular content**

In [23]:
# Top 10 endpoints by number of log entries, most frequent first.
df_final.groupBy("endpoint").count().orderBy(desc("count")).limit(10).toPandas()

Unnamed: 0,endpoint,count
0,/images/NASA-logosmall.gif,111330
1,/images/KSC-logosmall.gif,89638
2,/images/MOSAIC-logosmall.gif,60467
3,/images/USA-logosmall.gif,60013
4,/images/WORLD-logosmall.gif,59488
5,/images/ksclogo-medium.gif,58801
6,/images/launch-logo.gif,40871
7,/shuttle/countdown/,40278
8,/ksc.html,40226
9,/images/ksclogosmall.gif,33585


**Large files**

In [24]:
# Register the typed DataFrame as a temporary view named `cleanlog` so it
# can be queried with SQL (replaces any existing view of that name).
df_final.createOrReplaceTempView("cleanlog")

# List endpoints ordered by response size, largest first, and bring the
# first 10 rows into pandas. Rows are individual requests, so the same
# endpoint can appear more than once.
spark.sql("""
    SELECT endpoint, content_size
    FROM cleanlog 
    ORDER BY content_size desc
""").limit(10).toPandas()

Unnamed: 0,endpoint,content_size
0,/shuttle/countdown/video/livevideo.jpeg,6823936
1,/statistics/1995/bkup/Mar95_full.html,3155499
2,/statistics/1995/bkup/Mar95_full.html,3155499
3,/statistics/1995/bkup/Mar95_full.html,3155499
4,/statistics/1995/bkup/Mar95_full.html,3155499
5,/statistics/1995/bkup/Mar95_full.html,3155499
6,/statistics/1995/bkup/Mar95_full.html,3155499
7,/statistics/1995/bkup/Mar95_full.html,3155499
8,/statistics/1995/Jun/Jun95_reverse_domains.html,2973350
9,/statistics/1995/Jun/Jun95_reverse_domains.html,2973350
