In [138]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib

In [139]:
# Reuse the active SparkSession if one exists, otherwise create a new one
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [140]:
# Download the gzipped NASA access log for July 1995 into the current directory,
# saved as NASA_access_log_Jul95.gz (-O sets the output file name)
!wget -O NASA_access_log_Jul95.gz http://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz

URL transformed to HTTPS due to an HSTS policy
--2023-07-17 06:50:06--  https://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
Resolving ita.ee.lbl.gov (ita.ee.lbl.gov)... 131.243.2.164, 2620:83:8000:102::a4
Connecting to ita.ee.lbl.gov (ita.ee.lbl.gov)|131.243.2.164|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20676672 (20M) [application/x-gzip]
Saving to: ‘NASA_access_log_Jul95.gz’


2023-07-17 06:50:12 (3.47 MB/s) - ‘NASA_access_log_Jul95.gz’ saved [20676672/20676672]



In [141]:
# Spark reads the gzipped text file directly from the current directory
df_log = (
    spark
    .read
    .text('NASA_access_log_Jul95.gz')
)
# Print the schema of the freshly loaded DataFrame
df_log.printSchema()

root
 |-- value: string (nullable = true)



### The file loads as a single string column (`value`); each row is one raw log line holding several fields

In [142]:
# Let pandas show up to 200 characters per cell so the long log lines stay readable
pd.options.display.max_colwidth = 200
# Bring only the first five rows to the driver and render them as a pandas DataFrame
df_log.limit(5).toPandas()

Unnamed: 0,value
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085"
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0"
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179"


### Try to parse it by splitting each line on spaces with Spark's `split` function


In [143]:
from pyspark.sql.functions import split

# Naive first attempt: break every raw line on single spaces and keep the
# resulting array in a new "split" column next to the original "value"
df_split = df_log.withColumn(
    'split',
    split(df_log['value'], ' '),
)
df_split.limit(5).toPandas()

Unnamed: 0,value,split
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","[199.72.81.55, -, -, [01/Jul/1995:00:00:01, -0400], ""GET, /history/apollo/, HTTP/1.0"", 200, 6245]"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","[unicomp6.unicomp.net, -, -, [01/Jul/1995:00:00:06, -0400], ""GET, /shuttle/countdown/, HTTP/1.0"", 200, 3985]"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085","[199.120.110.21, -, -, [01/Jul/1995:00:00:09, -0400], ""GET, /shuttle/missions/sts-73/mission-sts-73.html, HTTP/1.0"", 200, 4085]"
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0","[burger.letters.com, -, -, [01/Jul/1995:00:00:11, -0400], ""GET, /shuttle/countdown/liftoff.html, HTTP/1.0"", 304, 0]"
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179","[199.120.110.21, -, -, [01/Jul/1995:00:00:11, -0400], ""GET, /shuttle/missions/sts-73/sts-73-patch-small.gif, HTTP/1.0"", 200, 4179]"


### Not a very good attempt: the timestamp and the quoted request are broken across several array elements, so regular expressions are needed

### Declare a UDF that will be applied to every line/row of the file. It returns a map (a Python dict) of column name → parsed value

In [144]:
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

# Decorating a function with @udf turns it into a Spark UDF that can be used in DataFrame expressions.
# We specify the return type in the udf argument. MapType(StringType(), StringType()) is a dictionary
# with string keys and string values
@udf(MapType(StringType(),StringType()))
def parse_log(line):
    '''
    Parse one access-log line into a dict of named string fields.

    Parameters
    ----------
    line : str or None
        One raw log line, for example:
        199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245

    Returns
    -------
    dict or None
        - None when ``line`` is None or does not match the expected layout, so the
          row ends up with a null map instead of a value of the wrong type.
        - Otherwise a dict with the keys host, client_identd, user_id, date_time,
          method, endpoint, protocol, response_code and content_size.
          Every value is a string, matching the declared string -> string map type;
          a content size of '-' is reported as '0'.
    '''
    import re

    # Raw string: the regex escapes must reach the re module untouched
    # instead of being interpreted as Python string escapes.
    PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'

    # The input column is nullable, so guard against a missing line
    if line is None:
        return None

    match = re.search(PATTERN, line)

    # Unparseable line: return None rather than a value that is not a map
    if match is None:
        return None

    # Group 9 is the response size; the log writes "-" when no size was recorded
    size = match.group(9)
    if size == '-':
        size = '0'

    # Keys are the final column names; values stay strings and are cast later
    return {
        "host"          : match.group(1),
        "client_identd" : match.group(2),
        "user_id"       : match.group(3),
        "date_time"     : match.group(4),
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : match.group(8),
        "content_size"  : size
    }

In [145]:
# Run the parse_log UDF over every raw line; its result is stored in a new "parsed" column
df_parsed = df_log.withColumn(
    'parsed',
    parse_log(df_log['value']),
)

In [146]:
# The new "parsed" column is a map with string keys and string values
df_parsed.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



### With a map/dict of key-value pairs inside a column, we can use `selectExpr` to pull out individual keys: `parsed['key'] as column_name`

In [147]:
# Pull two keys out of the "parsed" map as ordinary columns and print the first rows
(
    df_parsed
    .selectExpr(
        "parsed['host'] as host",
        "parsed['method'] as method",
    )
    .show(5)
)

+--------------------+------+
|                host|method|
+--------------------+------+
|        199.72.81.55|   GET|
|unicomp6.unicomp.net|   GET|
|      199.120.110.21|   GET|
|  burger.letters.com|   GET|
|      199.120.110.21|   GET|
+--------------------+------+
only showing top 5 rows



### Instead of typing every column expression by hand, build the list with a list comprehension and unpack it into `selectExpr` with `*`

In [148]:
# Field names stored in the "parsed" map, in the order the output columns should appear
columns = [
    "host",
    "client_identd",
    "user_id",
    "date_time",
    "method",
    "endpoint",
    "protocol",
    "response_code",
    "content_size",
]
# One "parsed['<field>'] as <field>" SQL expression per field
exprs = [f"parsed['{field}'] as {field}" for field in columns]
exprs

["parsed['host'] as host",
 "parsed['client_identd'] as client_identd",
 "parsed['user_id'] as user_id",
 "parsed['date_time'] as date_time",
 "parsed['method'] as method",
 "parsed['endpoint'] as endpoint",
 "parsed['protocol'] as protocol",
 "parsed['response_code'] as response_code",
 "parsed['content_size'] as content_size"]

In [149]:
# Unpack exprs so that each expression string is passed to selectExpr as its own argument;
# the result has one column per entry in exprs
df_clean = df_parsed.selectExpr(*exprs)
# Preview the first five rows as a pandas DataFrame
df_clean.limit(5).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985
2,199.120.110.21,-,-,01/Jul/1995:00:00:09 -0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085
3,burger.letters.com,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0
4,199.120.110.21,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179


### Convert columns to proper data types

In [150]:
# to_timestamp is imported here as well: the cell uses it, and without the import
# a fresh kernel (Restart & Run All) fails with a NameError.
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_timestamp

# NOTE: this cell rebinds df_clean and rewrites its date_time column, so it is meant
# to run once after the cell that creates df_clean; re-running it on its own would
# feed already-converted strings to the first parsing step.

# The log timestamps carry a -0400 offset, so the conversions are built under a
# matching session time zone.
spark.conf.set("spark.sql.session.timeZone", "-04:00")
try:
    # Build all three steps in one chain so df_clean is only rebound when every
    # step was accepted (no half-converted frame is left behind on an error).
    df_clean = (
        df_clean
        # 1. Parse the raw log timestamp string into Unix epoch seconds
        .withColumn('date_time', unix_timestamp('date_time', "dd/MMM/yyyy':'HH:mm:ss Z"))
        # 2. Render the epoch seconds back as a string that ends with the numeric offset ("Z")
        .withColumn('date_time', from_unixtime('date_time', "yyy-MM-dd HH:mm:ssZ"))
        # 3. Parse that string into a real timestamp column; date_time itself stays a string
        .withColumn('date_time_ltz', to_timestamp('date_time', "yyy-MM-dd HH:mm:ssZ"))
    )
finally:
    # Always drop the temporary time-zone override, even if a step above raised
    spark.conf.unset("spark.sql.session.timeZone")

df_clean.limit(5).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size,date_time_ltz
0,199.72.81.55,-,-,1995-07-01 00:00:01-0400,GET,/history/apollo/,HTTP/1.0,200,6245,1995-07-01 04:00:01
1,unicomp6.unicomp.net,-,-,1995-07-01 00:00:06-0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985,1995-07-01 04:00:06
2,199.120.110.21,-,-,1995-07-01 00:00:09-0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085,1995-07-01 04:00:09
3,burger.letters.com,-,-,1995-07-01 00:00:11-0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0,1995-07-01 04:00:11
4,199.120.110.21,-,-,1995-07-01 00:00:11-0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179,1995-07-01 04:00:11


### Convert `content_size` to an integer (`response_code` is left as a string, as the schema below shows)

In [152]:
# content_size is still a string at this point; replace it with an integer column
df_clean = df_clean.withColumn(
    'content_size',
    df_clean.content_size.cast('int'),
)

df_clean.limit(5).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size,date_time_ltz
0,199.72.81.55,-,-,1995-07-01 00:00:01-0400,GET,/history/apollo/,HTTP/1.0,200,6245,1995-07-01 04:00:01
1,unicomp6.unicomp.net,-,-,1995-07-01 00:00:06-0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985,1995-07-01 04:00:06
2,199.120.110.21,-,-,1995-07-01 00:00:09-0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085,1995-07-01 04:00:09
3,burger.letters.com,-,-,1995-07-01 00:00:11-0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0,1995-07-01 04:00:11
4,199.120.110.21,-,-,1995-07-01 00:00:11-0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179,1995-07-01 04:00:11


In [153]:
# Print the column names and types of df_clean
df_clean.printSchema()

root
 |-- host: string (nullable = true)
 |-- client_identd: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- response_code: string (nullable = true)
 |-- content_size: integer (nullable = true)
 |-- date_time_ltz: timestamp (nullable = true)

