#### Imports needed for the notebook

In [1]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.SparkFiles

#### Download data file and convert to DataFrame for later manipulation

In [2]:
val dataUrl = "ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz"
sc.addFile(dataUrl)
val ingestDF = spark.read.text(SparkFiles.get("NASA_access_log_Jul95.gz"))

dataUrl = ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
ingestDF = [value: string]

In [3]:
%%dataframe
ingestDF

value
"199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ""GET /history/apollo/ HTTP/1.0"" 200 6245"
"unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985"
"199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] ""GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0"" 200 4085"
"burger.letters.com - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/countdown/liftoff.html HTTP/1.0"" 304 0"
"199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] ""GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0"" 200 4179"
"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /images/NASA-logosmall.gif HTTP/1.0"" 304 0"
"burger.letters.com - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/video/livevideo.gif HTTP/1.0"" 200 0"
"205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] ""GET /shuttle/countdown/countdown.html HTTP/1.0"" 200 3985"
"d104.aa.net - - [01/Jul/1995:00:00:13 -0400] ""GET /shuttle/countdown/ HTTP/1.0"" 200 3985"
"129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] ""GET / HTTP/1.0"" 200 7074"


#### Extract fields from each line of the log file using regular expressions  
The regexes are adapted from part (2b) of the article "Web Server Log Analysis with Spark":  
https://adataanalyst.com/spark/web-server-log-analysis-spark/

In [4]:
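// Each pattern captures one field; regexp_extract returns "" when a pattern does not match.
val parseDF = ingestDF.select(
        // host: leading characters up to the first whitespace or comma ("(", "|", ")" are literals inside [...])
        regexp_extract($"value", """^([^(\s|,)]+)""", 1).as("host"),
        // timestamp: dd/Mon/yyyy:HH:mm:ss after the opening bracket, without the UTC offset
        regexp_extract($"value", """^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})""", 1).as("timestamp"),
        // path: token between the request method and "HTTP"
        regexp_extract($"value", """^.*\w+\s+([^\s]+)\s+HTTP.*""", 1).as("path"),
        // status: first token after the last double quote that is followed by whitespace; null if not numeric
        regexp_extract($"value", """^.*"\s+([^\s]+)""", 1).cast("integer").as("status"),
        // content_size: trailing digits; null when the line does not end in digits
        regexp_extract($"value", """^.*\s+(\d+)$""", 1).cast("integer").as("content_size"))
parseDF.cache()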

parseDF = [host: string, timestamp: string ... 3 more fields]

In [5]:
%%dataframe
parseDF

host,timestamp,path,status,content_size
199.72.81.55,01/Jul/1995:00:00:01,/history/apollo/,200,6245
unicomp6.unicomp.net,01/Jul/1995:00:00:06,/shuttle/countdown/,200,3985
199.120.110.21,01/Jul/1995:00:00:09,/shuttle/missions/sts-73/mission-sts-73.html,200,4085
burger.letters.com,01/Jul/1995:00:00:11,/shuttle/countdown/liftoff.html,304,0
199.120.110.21,01/Jul/1995:00:00:11,/shuttle/missions/sts-73/sts-73-patch-small.gif,200,4179
burger.letters.com,01/Jul/1995:00:00:12,/images/NASA-logosmall.gif,304,0
burger.letters.com,01/Jul/1995:00:00:12,/shuttle/countdown/video/livevideo.gif,200,0
205.212.115.106,01/Jul/1995:00:00:12,/shuttle/countdown/countdown.html,200,3985
d104.aa.net,01/Jul/1995:00:00:13,/shuttle/countdown/,200,3985
129.94.144.152,01/Jul/1995:00:00:13,/,200,7074
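
The five patterns can also be checked outside Spark against a single line from the preview. The sketch below is plain Scala and not part of the original notebook: `extract` is a hypothetical stand-in for `regexp_extract`, assumed to behave the same way for these patterns because Spark evaluates them as Java regular expressions and returns an empty string when nothing matches.

```scala
// Hypothetical stand-in for Spark's regexp_extract: returns the requested
// capture group of the first match, or "" when the pattern does not match.
def extract(line: String, pattern: String, group: Int): String =
  pattern.r.findFirstMatchIn(line).flatMap(m => Option(m.group(group))).getOrElse("")

// First line of the preview, with the CSV quote-doubling undone.
val sample =
  """199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245"""

// Patterns copied verbatim from the parsing cell.
val host        = extract(sample, """^([^(\s|,)]+)""", 1)
val timestamp   = extract(sample, """^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})""", 1)
val path        = extract(sample, """^.*\w+\s+([^\s]+)\s+HTTP.*""", 1)
val status      = extract(sample, """^.*"\s+([^\s]+)""", 1)
val contentSize = extract(sample, """^.*\s+(\d+)$""", 1)

println(Seq(host, timestamp, path, status, contentSize).mkString(","))
```

The printed line should agree with the first row of the parsed preview, apart from `status` and `content_size` being strings here rather than integers.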


#### Utility function to check parsing quality by counting nulls and empty strings

In [6]:
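// Prints, for every column, how many rows are null and how many are empty strings.
// Each count is a separate Spark action, so this runs two jobs per column.
def countNE(df: DataFrame): Unit = {
    println("         Column           Null          Empty")
    for (c <- df.columns) {
        val nullCount = df.filter(col(c).isNull).count
        val emptyCount = df.filter(col(c) === "").count
        println(f"$c%15s$nullCount%15s$emptyCount%15s")
    }
}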

countNE: (df: org.apache.spark.sql.DataFrame)Unit


In [7]:
countNE(parseDF)

         Column           Null          Empty
           host              0              0
      timestamp              0              1
           path              0           3094
         status              1              0
   content_size          19727              0


#### There are a lot of nulls in the `content_size` column.  Let's take a closer look to see if it's a parsing error.

In [8]:
%%dataframe --limit 50
parseDF.filter($"content_size".isNull)

host,timestamp,path,status,content_size
dd15-062.compuserve.com,01/Jul/1995:00:01:12,/news/sci.space.shuttle/archive/sci-space-shuttle-22-apr-1995-40.txt,404,
dynip42.efn.org,01/Jul/1995:00:02:14,/software,302,
ix-or10-06.ix.netcom.com,01/Jul/1995:00:02:40,/software/winvn,302,
ix-or10-06.ix.netcom.com,01/Jul/1995:00:03:24,/software,302,
link097.txdirect.net,01/Jul/1995:00:05:06,/shuttle,302,
ix-war-mi1-20.ix.netcom.com,01/Jul/1995:00:05:13,/shuttle/missions/sts-78/news,302,
ix-war-mi1-20.ix.netcom.com,01/Jul/1995:00:05:58,/shuttle/missions/sts-72/news,302,
netport-27.iu.net,01/Jul/1995:00:10:19,/pub/winvn/readme.txt,404,
netport-27.iu.net,01/Jul/1995:00:10:28,/pub/winvn/readme.txt,404,
dynip38.efn.org,01/Jul/1995:00:10:50,/software,302,
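
One plausible mechanism for these nulls, assumed here rather than confirmed against the raw file, is that the log records `-` instead of a byte count for such responses. The size pattern then matches nothing, `regexp_extract` yields an empty string, and casting an empty string to integer gives null. The plain-Scala sketch below walks through that chain on a hypothetical line; `sizeField` and `toIntOrNone` are illustrative names, and `None` stands in for Spark's null.

```scala
import scala.util.Try

// Hypothetical raw line for one of the rows above; the trailing "-" is an assumption.
val redirectLine =
  """dynip42.efn.org - - [01/Jul/1995:00:02:14 -0400] "GET /software HTTP/1.0" 302 -"""

// Same size pattern as the parsing cell; "" when it does not match,
// which is what regexp_extract returns in that case.
def sizeField(line: String): String =
  """^.*\s+(\d+)$""".r.findFirstMatchIn(line).map(_.group(1)).getOrElse("")

// Stand-in for cast("integer"): a non-numeric string becomes None instead of null.
def toIntOrNone(s: String): Option[Int] = Try(s.trim.toInt).toOption

val rawSize    = sizeField(redirectLine)   // empty: the line does not end in digits
val parsedSize = toIntOrNone(rawSize)      // None, the analogue of a null content_size

println(s"raw='$rawSize' parsed=$parsedSize")
```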


#### Nulls in `content_size` appear to come from requests that were redirected or failed (only 302 and 404 statuses in the sample), not from parsing errors.
We don't need this column for the final result, so we'll drop it going forward.
Let's now look at the single null in the `status` column.

In [9]:
parseDF.filter($"status".isNull).show

+--------+---------+----+------+------------+
|    host|timestamp|path|status|content_size|
+--------+---------+----+------+------------+
|alyssa.p|         |    |  null|        null|
+--------+---------+----+------+------------+



#### This looks like bad data in the log file.
The `status` column can also be dropped since we don't need it for the final result.  
However, this would still leave the row of bad data.  
To filter bad data, we'll remove any rows where `timestamp` is empty.  
We will leave the rows where `path` is empty as that could still be valid data.

In [10]:
val filterDF = parseDF.drop("status","content_size")
                    .filter($"timestamp" =!= "")

filterDF = [host: string, timestamp: string ... 1 more field]

#### Let's do a sanity check to make sure we aren't throwing away too much of the original data

In [11]:
countNE(filterDF)
println(ingestDF.count)
println(filterDF.count)

         Column           Null          Empty
           host              0              0
      timestamp              0              0
           path              0           3093
1891715
1891714


#### Extract `date` column from `timestamp`

In [12]:
val curateDF = filterDF.withColumn("date",$"timestamp".substr(1,11))
                      .drop("timestamp")

curateDF = [host: string, path: string ... 1 more field]

In [13]:
%%dataframe
curateDF

host,path,date
199.72.81.55,/history/apollo/,01/Jul/1995
unicomp6.unicomp.net,/shuttle/countdown/,01/Jul/1995
199.120.110.21,/shuttle/missions/sts-73/mission-sts-73.html,01/Jul/1995
burger.letters.com,/shuttle/countdown/liftoff.html,01/Jul/1995
199.120.110.21,/shuttle/missions/sts-73/sts-73-patch-small.gif,01/Jul/1995
burger.letters.com,/images/NASA-logosmall.gif,01/Jul/1995
burger.letters.com,/shuttle/countdown/video/livevideo.gif,01/Jul/1995
205.212.115.106,/shuttle/countdown/countdown.html,01/Jul/1995
d104.aa.net,/shuttle/countdown/,01/Jul/1995
129.94.144.152,/,01/Jul/1995
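
The `date` column is still a string in `dd/MMM/yyyy` form, so ordering by it is lexical. That works within a single month, as in this July log, but would interleave months if more logs were added. A minimal sketch of parsing the value into a real date with `java.time`, in plain Scala and not part of the original notebook (`parseLogDate`, `logDateFormat`, and the second timestamp are illustrative):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale

// English locale so that month abbreviations such as "Jul" parse regardless of the JVM default.
val logDateFormat = DateTimeFormatter.ofPattern("dd/MMM/yyyy", Locale.ENGLISH)

// Takes the first 11 characters of the timestamp (the same slice as substr(1, 11),
// which is 1-based in Spark) and parses them into a LocalDate.
def parseLogDate(timestamp: String): LocalDate =
  LocalDate.parse(timestamp.substring(0, 11), logDateFormat)

val first = parseLogDate("01/Jul/1995:00:00:01")
val later = parseLogDate("04/Jul/1995:00:00:00")

println(first)                  // ISO form of the parsed date
println(first.isBefore(later))  // chronological comparison instead of string comparison
```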


#### Finally, rank the top N hosts and paths for each day
We'll make this generic so it's easy to add any future columns to the ranking.

In [14]:
val colNames = Array("host","path")
val topN = 3

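// For each column: count rows per (date, value), rank the values within each date
// by descending count, and keep the topN. row_number orders tied counts arbitrarily.
val finalDF = (for (colName <- colNames) yield {
                  curateDF.groupBy("date",colName).count
                    .withColumn(s"count_${colName}", $"count".cast("integer")).drop("count")
                    .withColumn("rank", row_number().over(Window.partitionBy("date").orderBy(col(s"count_${colName}").desc)))
                    .filter($"rank" <= topN)
                // Join the per-column rankings side by side on (date, rank)
                }).reduce(_.join(_, Seq("date","rank")))
                  .orderBy("date","rank")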

colNames = Array(host, path)
topN = 3
finalDF = [date: string, rank: int ... 4 more fields]

In [15]:
%%dataframe --limit 200
finalDF

date,rank,host,count_host,path,count_path
01/Jul/1995,1,piweba3y.prodigy.com,623,/images/NASA-logosmall.gif,3960
01/Jul/1995,2,piweba4y.prodigy.com,547,/images/KSC-logosmall.gif,3521
01/Jul/1995,3,alyssa.prodigy.com,536,/shuttle/countdown/count.gif,2684
02/Jul/1995,1,piweba3y.prodigy.com,960,/images/NASA-logosmall.gif,3406
02/Jul/1995,2,alyssa.prodigy.com,578,/images/KSC-logosmall.gif,3093
02/Jul/1995,3,piweba1y.prodigy.com,432,/shuttle/countdown/count.gif,2302
03/Jul/1995,1,piweba3y.prodigy.com,1067,/images/NASA-logosmall.gif,5563
03/Jul/1995,2,134.83.184.18,413,/images/KSC-logosmall.gif,4648
03/Jul/1995,3,alyssa.prodigy.com,368,/shuttle/countdown/count.gif,3660
04/Jul/1995,1,piweba3y.prodigy.com,1199,/images/NASA-logosmall.gif,3853
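
The same group, count, rank, and filter logic can be sketched on an in-memory collection, which may help when reasoning about what the window query returns. This is plain Scala under stated assumptions: `Hit`, `topNPerDay`, and the sample rows are illustrative and not part of the notebook, and ties are broken alphabetically here, whereas `row_number` over an ordering by count alone leaves the order of tied rows unspecified.

```scala
// Illustrative record type; the notebook keeps these fields as DataFrame columns.
final case class Hit(date: String, host: String, path: String)

// For each date, count occurrences of key(hit), sort by descending count
// (alphabetical tie-break), and keep the first n entries.
def topNPerDay(hits: Seq[Hit], n: Int)(key: Hit => String): Map[String, Seq[(String, Int)]] =
  hits.groupBy(_.date).map { case (date, dayHits) =>
    val counts = dayHits.groupBy(key).map { case (k, group) => (k, group.size) }.toSeq
    date -> counts.sortBy { case (k, count) => (-count, k) }.take(n)
  }

// Made-up sample rows, for illustration only.
val hits = Seq(
  Hit("01/Jul/1995", "a.example", "/"),
  Hit("01/Jul/1995", "a.example", "/images/logo.gif"),
  Hit("01/Jul/1995", "b.example", "/images/logo.gif"),
  Hit("02/Jul/1995", "b.example", "/"),
  Hit("02/Jul/1995", "b.example", "/")
)

val topHosts = topNPerDay(hits, 1)(_.host)
val topPaths = topNPerDay(hits, 1)(_.path)

println(topHosts("01/Jul/1995"))
println(topPaths("01/Jul/1995"))
```

Passing the key as a function plays the role of `colNames` in the notebook: ranking another field only needs another call with a different accessor.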
