# Exercise 1: Schema on Read

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
dfLog = spark.read.text("data/NASA_access_log_Jul95.gz")

# Load the dataset

In [4]:
#Data Source: http://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
dfLog = spark.read.text("data/NASA_access_log_Jul95.gz")

# Quick inspection of  the data set

In [5]:
# see the schema
dfLog.printSchema()

root
 |-- value: string (nullable = true)



In [6]:
# number of lines
dfLog.count()

1891715

In [7]:
#what's in there? 
dfLog.show(5)

+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
+--------------------+
only showing top 5 rows



In [8]:
#a better show?
dfLog.show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
+-----------------------

In [9]:
#pandas to the rescue
pd.set_option('max_colwidth', 200)
dfLog.limit(5).toPandas()

Unnamed: 0,value
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085"
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0"
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179"


# Let' try simple parsing with split

In [10]:
from pyspark.sql.functions import split
# Todo
dfArray = dfLog.withColumn("tokenized", split("value", " "))
dfArray.limit(10).toPandas()

Unnamed: 0,value,tokenized
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","[199.72.81.55, -, -, [01/Jul/1995:00:00:01, -0400], ""GET, /history/apollo/, HTTP/1.0"", 200, 6245]"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","[unicomp6.unicomp.net, -, -, [01/Jul/1995:00:00:06, -0400], ""GET, /shuttle/countdown/, HTTP/1.0"", 200, 3985]"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085","[199.120.110.21, -, -, [01/Jul/1995:00:00:09, -0400], ""GET, /shuttle/missions/sts-73/mission-sts-73.html, HTTP/1.0"", 200, 4085]"
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0","[burger.letters.com, -, -, [01/Jul/1995:00:00:11, -0400], ""GET, /shuttle/countdown/liftoff.html, HTTP/1.0"", 304, 0]"
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179","[199.120.110.21, -, -, [01/Jul/1995:00:00:11, -0400], ""GET, /shuttle/missions/sts-73/sts-73-patch-small.gif, HTTP/1.0"", 200, 4179]"
5,"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /images/NASA-logosmall.gif HTTP/1.0"" 304 0","[burger.letters.com, -, -, [01/Jul/1995:00:00:12, -0400], ""GET, /images/NASA-logosmall.gif, HTTP/1.0"", 304, 0]"
6,"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/video/livevideo.gif HTTP/1.0"" 200 0","[burger.letters.com, -, -, [01/Jul/1995:00:00:12, -0400], ""GET, /shuttle/countdown/video/livevideo.gif, HTTP/1.0"", 200, 0]"
7,"205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/countdown.html HTTP/1.0"" 200 3985","[205.212.115.106, -, -, [01/Jul/1995:00:00:12, -0400], ""GET, /shuttle/countdown/countdown.html, HTTP/1.0"", 200, 3985]"
8,"d104.aa.net - - [01/Jul/1995:00:00:13 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","[d104.aa.net, -, -, [01/Jul/1995:00:00:13, -0400], ""GET, /shuttle/countdown/, HTTP/1.0"", 200, 3985]"
9,"129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] ""GET / HTTP/1.0"" 200 7074","[129.94.144.152, -, -, [01/Jul/1995:00:00:13, -0400], ""GET, /, HTTP/1.0"", 200, 7074]"


# Second attempt, let's build a custom parsing UDF 

In [11]:
from pyspark.sql.functions import udf

@udf
def parseUDF(line):
    # Todo
    import re
    PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
    match = re.search(PATTERN, line)
    if match is None:
        return (line, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = 0
    else:
        size = match.group(9)
        
    return {
        "host"          : match.group(1),
        "client_identd" : match.group(2),
        "user_id"       : match.group(3),
        "date_time"     : match.group(4),
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : int(match.group(8)),
        "content_size"  : size
    }

In [12]:
#Let's start from the beginning
# Todo
dfParsed = dfLog.withColumn("parsed", parseUDF("value"))
dfParsed.limit(10).toPandas()

Unnamed: 0,value,parsed
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","{response_code=200, protocol=HTTP/1.0, endpoint=/history/apollo/, content_size=6245, method=GET, date_time=01/Jul/1995:00:00:01 -0400, user_id=-, host=199.72.81.55, client_identd=-}"
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","{response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/countdown/, content_size=3985, method=GET, date_time=01/Jul/1995:00:00:06 -0400, user_id=-, host=unicomp6.unicomp.net, client_identd=-}"
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085","{response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/missions/sts-73/mission-sts-73.html, content_size=4085, method=GET, date_time=01/Jul/1995:00:00:09 -0400, user_id=-, host=199.120.110.21, c..."
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0","{response_code=304, protocol=HTTP/1.0, endpoint=/shuttle/countdown/liftoff.html, content_size=0, method=GET, date_time=01/Jul/1995:00:00:11 -0400, user_id=-, host=burger.letters.com, client_identd=-}"
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179","{response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/missions/sts-73/sts-73-patch-small.gif, content_size=4179, method=GET, date_time=01/Jul/1995:00:00:11 -0400, user_id=-, host=199.120.110.21..."
5,"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /images/NASA-logosmall.gif HTTP/1.0"" 304 0","{response_code=304, protocol=HTTP/1.0, endpoint=/images/NASA-logosmall.gif, content_size=0, method=GET, date_time=01/Jul/1995:00:00:12 -0400, user_id=-, host=burger.letters.com, client_identd=-}"
6,"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/video/livevideo.gif HTTP/1.0"" 200 0","{response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/countdown/video/livevideo.gif, content_size=0, method=GET, date_time=01/Jul/1995:00:00:12 -0400, user_id=-, host=burger.letters.com, client..."
7,"205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/countdown.html HTTP/1.0"" 200 3985","{response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/countdown/countdown.html, content_size=3985, method=GET, date_time=01/Jul/1995:00:00:12 -0400, user_id=-, host=205.212.115.106, client_iden..."
8,"d104.aa.net - - [01/Jul/1995:00:00:13 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","{response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/countdown/, content_size=3985, method=GET, date_time=01/Jul/1995:00:00:13 -0400, user_id=-, host=d104.aa.net, client_identd=-}"
9,"129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] ""GET / HTTP/1.0"" 200 7074","{response_code=200, protocol=HTTP/1.0, endpoint=/, content_size=7074, method=GET, date_time=01/Jul/1995:00:00:13 -0400, user_id=-, host=129.94.144.152, client_identd=-}"


In [13]:
dfParsed.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: string (nullable = true)



# Third attempt, let's fix our UDF

In [14]:
#from pyspark.sql.functions import udf # already imported
from pyspark.sql.types import MapType, StringType

@udf(MapType(StringType(),StringType()))
def parseUDFbetter(line):
    import re
    PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
    match = re.search(PATTERN, line)
    if match is None:
        return (line, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = 0
    else:
        size = match.group(9)
        
    return {
        "host"          : match.group(1),
        "client_identd" : match.group(2),
        "user_id"       : match.group(3),
        "date_time"     : match.group(4),
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : int(match.group(8)),
        "content_size"  : size
    }

In [15]:
#Let's start from the beginning
# Todo
dfParsed = dfLog.withColumn("parsed", parseUDFbetter("value"))
dfParsed.limit(10).toPandas()

Unnamed: 0,value,parsed
0,"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/history/apollo/', 'content_size': '6245', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:01 -0400', 'user_id': '-', 'host': '199.72..."
1,"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:06 -0400', 'user_id': '-', 'host': 'uni..."
2,"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/mission-sts-73.html', 'content_size': '4085', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:09 -0400', 'us..."
3,"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0","{'response_code': '304', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/liftoff.html', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:11 -0400', 'user_id': '-', 'ho..."
4,"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/sts-73-patch-small.gif', 'content_size': '4179', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:11 -0400', ..."
5,"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /images/NASA-logosmall.gif HTTP/1.0"" 304 0","{'response_code': '304', 'protocol': 'HTTP/1.0', 'endpoint': '/images/NASA-logosmall.gif', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '-', 'host': ..."
6,"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/video/livevideo.gif HTTP/1.0"" 200 0","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/video/livevideo.gif', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '..."
7,"205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/countdown.html HTTP/1.0"" 200 3985","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/countdown.html', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '-'..."
8,"d104.aa.net - - [01/Jul/1995:00:00:13 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:13 -0400', 'user_id': '-', 'host': 'd10..."
9,"129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] ""GET / HTTP/1.0"" 200 7074","{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/', 'content_size': '7074', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:13 -0400', 'user_id': '-', 'host': '129.94.144.152', 'cli..."


In [16]:
#Bingo!! we'got a column of type map with the fields parsed
dfParsed.printSchema()

root
 |-- value: string (nullable = true)
 |-- parsed: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [17]:
dfParsed.select("parsed").limit(10).toPandas()

Unnamed: 0,parsed
0,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/history/apollo/', 'content_size': '6245', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:01 -0400', 'user_id': '-', 'host': '199.72..."
1,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:06 -0400', 'user_id': '-', 'host': 'uni..."
2,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/mission-sts-73.html', 'content_size': '4085', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:09 -0400', 'us..."
3,"{'response_code': '304', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/liftoff.html', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:11 -0400', 'user_id': '-', 'ho..."
4,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/sts-73-patch-small.gif', 'content_size': '4179', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:11 -0400', ..."
5,"{'response_code': '304', 'protocol': 'HTTP/1.0', 'endpoint': '/images/NASA-logosmall.gif', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '-', 'host': ..."
6,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/video/livevideo.gif', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '..."
7,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/countdown.html', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '-'..."
8,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:13 -0400', 'user_id': '-', 'host': 'd10..."
9,"{'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/', 'content_size': '7074', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:13 -0400', 'user_id': '-', 'host': '129.94.144.152', 'cli..."


# Let's build separate columns

In [18]:
dfParsed.selectExpr("parsed['host'] as host").limit(5).show(5)

+--------------------+
|                host|
+--------------------+
|        199.72.81.55|
|unicomp6.unicomp.net|
|      199.120.110.21|
|  burger.letters.com|
|      199.120.110.21|
+--------------------+



In [19]:
dfParsed.selectExpr(["parsed['host']", "parsed['date_time']"]).show(5)

+--------------------+--------------------+
|        parsed[host]|   parsed[date_time]|
+--------------------+--------------------+
|        199.72.81.55|01/Jul/1995:00:00...|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|
|      199.120.110.21|01/Jul/1995:00:00...|
|  burger.letters.com|01/Jul/1995:00:00...|
|      199.120.110.21|01/Jul/1995:00:00...|
+--------------------+--------------------+
only showing top 5 rows



In [20]:
fields = ["host", "client_identd","user_id", "date_time", "method", "endpoint", "protocol", "response_code", "content_size"]
exprs = ["parsed['{}'] as {}".format(field, field) for field in fields]
exprs

["parsed['host'] as host",
 "parsed['client_identd'] as client_identd",
 "parsed['user_id'] as user_id",
 "parsed['date_time'] as date_time",
 "parsed['method'] as method",
 "parsed['endpoint'] as endpoint",
 "parsed['protocol'] as protocol",
 "parsed['response_code'] as response_code",
 "parsed['content_size'] as content_size"]

In [21]:
dfClean = dfParsed.selectExpr(*exprs)
dfClean.limit(5).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985
2,199.120.110.21,-,-,01/Jul/1995:00:00:09 -0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085
3,burger.letters.com,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0
4,199.120.110.21,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179


## Popular hosts

In [22]:
dfClean.groupBy("host").count().limit(10).toPandas()

Unnamed: 0,host,count
0,ppp3_136.bekkoame.or.jp,39
1,ppp31.texoma.com,21
2,ix-wc7-20.ix.netcom.com,40
3,nb1-du5.polarnet.fnsb.ak.us,25
4,ttyb5.shasta.com,19
5,dialup00004.cinet.itl.net,10
6,dd14-025.compuserve.com,18
7,nigrlpr.actrix.gen.nz,3
8,uckm001.pn.itnet.it,34
9,queulen.puc.cl,87


In [23]:
from pyspark.sql.functions import desc
# Todo
dfClean.groupBy("host").count().orderBy(desc("count")).limit(10).toPandas()

Unnamed: 0,host,count
0,piweba3y.prodigy.com,17572
1,piweba4y.prodigy.com,11591
2,piweba1y.prodigy.com,9868
3,alyssa.prodigy.com,7852
4,siltb10.orl.mmc.com,7573
5,piweba2y.prodigy.com,5922
6,edams.ksc.nasa.gov,5434
7,163.206.89.4,4906
8,news.ti.com,4863
9,disarray.demon.co.uk,4353


## Popular content

In [24]:
from pyspark.sql.functions import desc
# Todo
dfClean.groupBy("endpoint").count().orderBy(desc("count")).limit(10).toPandas()

Unnamed: 0,endpoint,count
0,/images/NASA-logosmall.gif,111330
1,/images/KSC-logosmall.gif,89638
2,/images/MOSAIC-logosmall.gif,60467
3,/images/USA-logosmall.gif,60013
4,/images/WORLD-logosmall.gif,59488
5,/images/ksclogo-medium.gif,58801
6,/images/launch-logo.gif,40871
7,/shuttle/countdown/,40278
8,/ksc.html,40226
9,/images/ksclogosmall.gif,33585


## Large Files

In [25]:
dfClean.createOrReplaceTempView("cleanlog")
spark.sql("""
select endpoint, content_size
from cleanlog 
order by content_size desc
""").limit(10).toPandas()

Unnamed: 0,endpoint,content_size
0,/images/cdrom-1-95/img0007.jpg,99981
1,/shuttle/missions/sts-71/movies/sts-71-launch.mpg,999424
2,/shuttle/missions/sts-71/movies/sts-71-launch.mpg,999424
3,/history/apollo/apollo-13/images/index.gif,99942
4,/history/apollo/apollo-13/images/index.gif,99942
5,/history/apollo/apollo-13/images/index.gif,99942
6,/history/apollo/apollo-13/images/index.gif,99942
7,/history/apollo/apollo-13/images/index.gif,99942
8,/history/apollo/apollo-13/images/index.gif,99942
9,/history/apollo/apollo-13/images/index.gif,99942


In [26]:
from pyspark.sql.functions import expr
dfCleanTyped = dfClean.withColumn("content_size_bytes", expr("cast(content_size as int)"))
dfCleanTyped.limit(10).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size,content_size_bytes
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245,6245
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985,3985
2,199.120.110.21,-,-,01/Jul/1995:00:00:09 -0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085,4085
3,burger.letters.com,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0,0
4,199.120.110.21,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179,4179
5,burger.letters.com,-,-,01/Jul/1995:00:00:12 -0400,GET,/images/NASA-logosmall.gif,HTTP/1.0,304,0,0
6,burger.letters.com,-,-,01/Jul/1995:00:00:12 -0400,GET,/shuttle/countdown/video/livevideo.gif,HTTP/1.0,200,0,0
7,205.212.115.106,-,-,01/Jul/1995:00:00:12 -0400,GET,/shuttle/countdown/countdown.html,HTTP/1.0,200,3985,3985
8,d104.aa.net,-,-,01/Jul/1995:00:00:13 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985,3985
9,129.94.144.152,-,-,01/Jul/1995:00:00:13 -0400,GET,/,HTTP/1.0,200,7074,7074


In [27]:
dfCleanTyped.createOrReplaceTempView("cleantypedlog")
spark.sql("""
select endpoint, content_size
from cleantypedlog 
order by content_size_bytes desc
""").limit(10).toPandas()

Unnamed: 0,endpoint,content_size
0,/shuttle/countdown/video/livevideo.jpeg,6823936
1,/statistics/1995/bkup/Mar95_full.html,3155499
2,/statistics/1995/bkup/Mar95_full.html,3155499
3,/statistics/1995/bkup/Mar95_full.html,3155499
4,/statistics/1995/bkup/Mar95_full.html,3155499
5,/statistics/1995/bkup/Mar95_full.html,3155499
6,/statistics/1995/bkup/Mar95_full.html,3155499
7,/statistics/1995/bkup/Mar95_full.html,3155499
8,/statistics/1995/Jun/Jun95_reverse_domains.html,2973350
9,/statistics/1995/Jun/Jun95_reverse_domains.html,2973350


In [28]:
!ls

aws
data
Exercise 1 - Schema On Read.ipynb
Exercise 1 - Schema On Read - Solution.ipynb
Exercise 2 - Advanced Analytics NLP-Copy1.ipynb
Exercise 2 - Advanced Analytics NLP.ipynb
Exercise 2 - Advanced Analytics NLP - Solution.ipynb
Exercise 3 - Data Lake on S3.ipynb
Exercise 3 - Data Lake on S3 - Solution.ipynb
spark-warehouse


In [29]:
# Left for you, clean the date column :)
# 1- Create a udf that parses that weird format,
# 2- Create a new column with a data time string that spark would understand
# 3- Add a new date-time column properly typed
# 4- Print your schema

In [30]:
from pyspark.sql.types import TimestampType

@udf()
def parseDatetime(date_string):
    from datetime import datetime
    
    
    date_string = date_string.replace(':', 'T', 1).replace(' ', '')
    
    return str(datetime.strptime(date_string, '%d/%b/%YT%H:%M:%S%z'))

In [31]:
dfTimestampParsed = dfCleanTyped.withColumn("parsed_date_time", parseDatetime("date_time"))
dfTimestampParsed.limit(10).toPandas()

Unnamed: 0,host,client_identd,user_id,date_time,method,endpoint,protocol,response_code,content_size,content_size_bytes,parsed_date_time
0,199.72.81.55,-,-,01/Jul/1995:00:00:01 -0400,GET,/history/apollo/,HTTP/1.0,200,6245,6245,1995-07-01 00:00:01-04:00
1,unicomp6.unicomp.net,-,-,01/Jul/1995:00:00:06 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985,3985,1995-07-01 00:00:06-04:00
2,199.120.110.21,-,-,01/Jul/1995:00:00:09 -0400,GET,/shuttle/missions/sts-73/mission-sts-73.html,HTTP/1.0,200,4085,4085,1995-07-01 00:00:09-04:00
3,burger.letters.com,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/countdown/liftoff.html,HTTP/1.0,304,0,0,1995-07-01 00:00:11-04:00
4,199.120.110.21,-,-,01/Jul/1995:00:00:11 -0400,GET,/shuttle/missions/sts-73/sts-73-patch-small.gif,HTTP/1.0,200,4179,4179,1995-07-01 00:00:11-04:00
5,burger.letters.com,-,-,01/Jul/1995:00:00:12 -0400,GET,/images/NASA-logosmall.gif,HTTP/1.0,304,0,0,1995-07-01 00:00:12-04:00
6,burger.letters.com,-,-,01/Jul/1995:00:00:12 -0400,GET,/shuttle/countdown/video/livevideo.gif,HTTP/1.0,200,0,0,1995-07-01 00:00:12-04:00
7,205.212.115.106,-,-,01/Jul/1995:00:00:12 -0400,GET,/shuttle/countdown/countdown.html,HTTP/1.0,200,3985,3985,1995-07-01 00:00:12-04:00
8,d104.aa.net,-,-,01/Jul/1995:00:00:13 -0400,GET,/shuttle/countdown/,HTTP/1.0,200,3985,3985,1995-07-01 00:00:13-04:00
9,129.94.144.152,-,-,01/Jul/1995:00:00:13 -0400,GET,/,HTTP/1.0,200,7074,7074,1995-07-01 00:00:13-04:00


In [32]:
dfTimestampParsed.printSchema()

root
 |-- host: string (nullable = true)
 |-- client_identd: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- response_code: string (nullable = true)
 |-- content_size: string (nullable = true)
 |-- content_size_bytes: integer (nullable = true)
 |-- parsed_date_time: string (nullable = true)



In [33]:
#dfCleanTyped("date_time").limit(5).show(5).toPandas()
dfCleanTyped.groupBy("date_time").count().orderBy(desc("count")).limit(10).toPandas()

Unnamed: 0,date_time,count
0,,864
1,13/Jul/1995:13:32:58 -0400,20
2,13/Jul/1995:13:32:59 -0400,18
3,13/Jul/1995:09:42:19 -0400,18
4,13/Jul/1995:09:42:20 -0400,17
5,13/Jul/1995:08:35:02 -0400,16
6,11/Jul/1995:14:47:44 -0400,16
7,13/Jul/1995:09:49:30 -0400,16
8,13/Jul/1995:08:56:34 -0400,15
9,12/Jul/1995:13:39:35 -0400,15


In [34]:
s= "13/Jul/1995:13:32:58 -0400"
s = s.replace(':', 'T', 1).replace(' ', '')
s
'2017/Feb/14T09:51:46.000-0600'

'2017/Feb/14T09:51:46.000-0600'

In [35]:
from datetime import datetime

date_string = "2017-02-14T09:51:46.000-0600"
# I'm using date_string[:-9] to skip ".000-0600"
format_date = datetime.strptime(date_string, '%Y-%m-%dT%H:%M:%S.%f%z')
print(format_date)

2017-02-14 09:51:46-06:00


In [36]:
date_string = "1995/Jul/13T13:32:58-0400"
# I'm using date_string[:-9] to skip ".000-0600"
format_date = datetime.strptime(date_string, '%Y/%b/%dT%H:%M:%S%z')
print(format_date)

1995-07-13 13:32:58-04:00


In [37]:
date_string = "13/Jul/1995T13:32:58-0400"
# I'm using date_string[:-9] to skip ".000-0600"
format_date = datetime.strptime(date_string, '%d/%b/%YT%H:%M:%S%z')
print(format_date)

1995-07-13 13:32:58-04:00


In [38]:
l = ["13/Jul/1995:13:32:58 -0400", "13/Jul/1995:13:32:59 -0400", "13/Jul/1995:09:42:19 -0400", 
     "13/Jul/1995:09:42:20 -0400", "13/Jul/1995:08:35:02 -0400", "11/Jul/1995:14:47:44 -0400", 
     "13/Jul/1995:09:49:30 -0400", "13/Jul/1995:08:56:34 -0400", "12/Jul/1995:13:39:35 -0400"]

def parseDatetime(date_string):
    from datetime import datetime
    
    
    date_string = date_string.replace(':', 'T', 1).replace(' ', '')
    
    return datetime.strptime(date_string, '%d/%b/%YT%H:%M:%S%z')
    
    
for date_string in l:
     print(str(parseDatetime(date_string)))

1995-07-13 13:32:58-04:00
1995-07-13 13:32:59-04:00
1995-07-13 09:42:19-04:00
1995-07-13 09:42:20-04:00
1995-07-13 08:35:02-04:00
1995-07-11 14:47:44-04:00
1995-07-13 09:49:30-04:00
1995-07-13 08:56:34-04:00
1995-07-12 13:39:35-04:00
