# Parsing the Access Log of the NASA Kennedy Space Center WWW Server in Florida Using PySpark

### Data Source

"home/gldata/ NASA_access_log_Aug95.gz"

### Data Dictionary

The dataset above is the access log of the NASA Kennedy Space Center WWW server in Florida. The log is an ASCII file with one line per request, with the following columns:

**host** - the host making the request: a hostname when possible, otherwise the Internet address if the name could not be looked up.

**timestamp** - in the raw log, in the format "DD/MON/YYYY:HH:MM:SS -0400" (for example, 01/Aug/1995:00:00:01 -0400), where DD is the day of the month, MON is the abbreviated name of the month, YYYY is the year, and HH:MM:SS is the time of day using a 24-hour clock. The timezone is -0400.

**request url** - the URL that was requested.

**HTTP reply code**

**bytes returned by the server**

Note that from 01/Aug/1995:14:52:01 until 03/Aug/1995:04:36:13 there are no accesses recorded, as the Web server was shut down, due to Hurricane Erin.

In [35]:
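from __future__ import print_function  # must be the first statement in the cell
import re
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
# Types used later for the DataFrame schema and the string cast
from pyspark.sql.types import StructType, StructField, StringType, IntegerType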

In [2]:
sparkConf = SparkConf().setAppName("NasaLogAssignment").setMaster("local[1]")

In [3]:
sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)

In [4]:
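# Groups: host, identity, user, [timestamp with offset], "method url rest", status code, bytes
# Raw string so that the backslashes reach the regex engine unchanged.
PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)(.*)" (\d{3}) (\S+)'

def parseLogLine(log):
    """Parse one log line into a one-element list of Row, or [] if the line does not match."""
    m = re.match(PATTERN, log)
    if m:
        return [Row(host=m.group(1), timeStamp=m.group(4), url=m.group(6), httpCode=int(m.group(8)))]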
    else:
        return []
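
As a quick sanity check outside Spark, the sketch below applies the same pattern to a single line. The sample line is an assumption reconstructed from the first parsed record shown in this notebook; its byte count (1839) and the `HTTP/1.0` suffix are illustrative values, and `parse_fields` is a hypothetical helper that is not part of the notebook. It also keeps the bytes column, which `parseLogLine` drops.

```python
import re

# Same pattern as PATTERN above, written as a raw string.
LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)(.*)" (\d{3}) (\S+)'

# Illustrative line; the byte count and protocol suffix are assumed values.
SAMPLE_LINE = ('in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] '
               '"GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839')


def parse_fields(line):
    """Return a dict of the captured fields, or None if the line does not match."""
    m = re.match(LOG_PATTERN, line)
    if m is None:
        return None
    raw_bytes = m.group(9)
    return {
        'host': m.group(1),
        'timestamp': m.group(4),
        'method': m.group(5),
        'url': m.group(6),
        'http_code': int(m.group(8)),
        # Common Log Format writes "-" when no bytes were returned.
        'bytes': 0 if raw_bytes == '-' else int(raw_bytes),
    }


fields = parse_fields(SAMPLE_LINE)
print(fields)
print(parse_fields('not a log line'))  # a non-matching line yields None
```

Returning `None` (or an empty list, as `parseLogLine` does for `flatMap`) lets malformed lines be skipped instead of failing the whole job.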


In [5]:
logContents = sc.textFile("/gldata/NASA_access_log_Aug95.gz").flatMap(parseLogLine)

In [6]:
logContents.first()

Row(host=u'in24.inetnebr.com', httpCode=200, timeStamp=u'01/Aug/1995:00:00:01 -0400', url=u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt')

In [7]:
# The order of fields in a Row built from keyword arguments differs across Spark versions
# (sorted by name before 3.0, insertion order afterwards), so the fields are picked
# by name here to match the schema order explicitly.
contentSchema = StructType([
    StructField("host", StringType(), True),
    StructField("httpCode", IntegerType(), True),
    StructField("timestamp1", StringType(), True),
    StructField("url", StringType(), True)
])
dfraw = sqlContext.createDataFrame(
    logContents.map(lambda row: (row.host, row.httpCode, row.timeStamp, row.url)),
    contentSchema)

In [8]:
dfraw.columns

['host', 'httpCode', 'timestamp1', 'url']

In [9]:
from pyspark.sql.functions import (col, concat, countDistinct, lit, split,
                                   to_date, to_timestamp, udf)
import pandas as pd

### Split the timestamp and discard the timezone offset

In [10]:
split_col = split(dfraw['timestamp1'], ' ')
dfraw = dfraw.withColumn('datetime', split_col.getItem(0))
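
The split above keeps only the part before the space and drops the `-0400` offset. If the offset is needed, the full string can be parsed instead. The following is a minimal plain-Python sketch, assuming Python 3 and an English locale for the month abbreviation; `parse_log_timestamp` and `LOG_TIME_FORMAT` are hypothetical names that do not appear in the notebook.

```python
from datetime import datetime, timedelta

# strptime format for values such as 01/Aug/1995:00:00:01 -0400
LOG_TIME_FORMAT = '%d/%b/%Y:%H:%M:%S %z'


def parse_log_timestamp(text):
    """Parse a log timestamp string into a timezone-aware datetime."""
    return datetime.strptime(text, LOG_TIME_FORMAT)


ts = parse_log_timestamp('01/Aug/1995:00:00:01 -0400')
print(ts.isoformat())                         # 1995-08-01T00:00:01-04:00
print(ts.utcoffset() == timedelta(hours=-4))  # True
print(ts.hour, ts.minute, ts.second)          # components as integers
```

Working with a parsed value gives the hour, minute, and second as integers rather than zero-padded strings.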

### Get the date, hour, minute, and second

In [11]:
split_col1 = split(dfraw['datetime'],':' )
dfraw = dfraw.withColumn('date',split_col1.getItem(0))

In [12]:
dfraw = dfraw.withColumn('hour',split_col1.getItem(1))
dfraw = dfraw.withColumn('min',split_col1.getItem(2))
dfraw = dfraw.withColumn('second',split_col1.getItem(3))
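# Note: 'datetime' already includes the time of day, so this column repeats it
# (e.g. '01/Aug/1995:00:00:01 00:00:01'); concatenating col('date') instead would avoid that.
dfraw = dfraw.withColumn('actualTimestamp',concat(col('datetime'),lit(' '),col('hour'),lit(':'),col('min'),lit(':'),col('second')))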

In [13]:
dfraw.select('datetime','date','actualTimestamp','hour','min','second').show()

+--------------------+-----------+--------------------+----+---+------+
|            datetime|       date|     actualTimestamp|hour|min|second|
+--------------------+-----------+--------------------+----+---+------+
|01/Aug/1995:00:00:01|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    01|
|01/Aug/1995:00:00:07|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    07|
|01/Aug/1995:00:00:08|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    08|
|01/Aug/1995:00:00:08|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    08|
|01/Aug/1995:00:00:08|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    08|
|01/Aug/1995:00:00:09|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    09|
|01/Aug/1995:00:00:10|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    10|
|01/Aug/1995:00:00:10|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    10|
|01/Aug/1995:00:00:10|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    10|
|01/Aug/1995:00:00:11|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    11|
|01/Aug/1995:00:00:12|01/Aug/1995|01/Aug/1995:00:00...|  00| 00|    12|
+--------------------+-----------+--------------------+----+---+------+

### Convert the date to a date type, then cast it to a string

In [14]:
dfraw = dfraw.withColumn('actualdate', to_date('date', 'dd/MMM/yyyy'))

In [15]:
dfraw = dfraw.withColumn('actualdateAsString',dfraw.actualdate.cast(StringType()))

In [16]:
dfraw.select('datetime','date','actualdate','actualdateAsString','actualTimestamp','hour','min','second').show()

+--------------------+-----------+----------+------------------+--------------------+----+---+------+
|            datetime|       date|actualdate|actualdateAsString|     actualTimestamp|hour|min|second|
+--------------------+-----------+----------+------------------+--------------------+----+---+------+
|01/Aug/1995:00:00:01|01/Aug/1995|1995-08-01|        1995-08-01|01/Aug/1995:00:00...|  00| 00|    01|
|01/Aug/1995:00:00:07|01/Aug/1995|1995-08-01|        1995-08-01|01/Aug/1995:00:00...|  00| 00|    07|
|01/Aug/1995:00:00:08|01/Aug/1995|1995-08-01|        1995-08-01|01/Aug/1995:00:00...|  00| 00|    08|
|01/Aug/1995:00:00:08|01/Aug/1995|1995-08-01|        1995-08-01|01/Aug/1995:00:00...|  00| 00|    08|
|01/Aug/1995:00:00:08|01/Aug/1995|1995-08-01|        1995-08-01|01/Aug/1995:00:00...|  00| 00|    08|
|01/Aug/1995:00:00:09|01/Aug/1995|1995-08-01|        1995-08-01|01/Aug/1995:00:00...|  00| 00|    09|
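|01/Aug/1995:00:00:10|01/Aug/1995|1995-08-01|        1995-08-01|01/Aug/1995:00:00...|  00| 00|    10|
+--------------------+-----------+----------+------------------+--------------------+----+---+------+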

### Calculate the day of the week from a date string

In [17]:
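def get_weekday(date):
    """Return the weekday name (e.g. 'Tuesday') for a 'YYYY-MM-DD' string, or None for a null date."""
    import datetime
    import calendar
    if date is None:  # to_date yields null when a date cannot be parsed
        return None
    year, month, day = (int(x) for x in date.split('-'))
    weekday = datetime.date(year, month, day)
    return calendar.day_name[weekday.weekday()]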

In [18]:
getWeekDay = udf(get_weekday)  # udf returns a string column by default

In [19]:
get_weekday('1995-08-01')

'Tuesday'

In [20]:
dfraw.columns

['host',
 'httpCode',
 'timestamp1',
 'url',
 'datetime',
 'date',
 'hour',
 'min',
 'second',
 'actualTimestamp',
 'actualdate',
 'actualdateAsString']

In [21]:
dfraw = dfraw.withColumn('dayOfWeek',getWeekDay('actualdateAsString'))

In [22]:
dfraw.columns

['host',
 'httpCode',
 'timestamp1',
 'url',
 'datetime',
 'date',
 'hour',
 'min',
 'second',
 'actualTimestamp',
 'actualdate',
 'actualdateAsString',
 'dayOfWeek']

In [23]:
dfraw.select('datetime','date','actualdate','actualdateAsString','dayOfWeek','actualTimestamp','hour','min','second').show()

+--------------------+-----------+----------+------------------+---------+--------------------+----+---+------+
|            datetime|       date|actualdate|actualdateAsString|dayOfWeek|     actualTimestamp|hour|min|second|
+--------------------+-----------+----------+------------------+---------+--------------------+----+---+------+
|01/Aug/1995:00:00:01|01/Aug/1995|1995-08-01|        1995-08-01|  Tuesday|01/Aug/1995:00:00...|  00| 00|    01|
|01/Aug/1995:00:00:07|01/Aug/1995|1995-08-01|        1995-08-01|  Tuesday|01/Aug/1995:00:00...|  00| 00|    07|
|01/Aug/1995:00:00:08|01/Aug/1995|1995-08-01|        1995-08-01|  Tuesday|01/Aug/1995:00:00...|  00| 00|    08|
|01/Aug/1995:00:00:08|01/Aug/1995|1995-08-01|        1995-08-01|  Tuesday|01/Aug/1995:00:00...|  00| 00|    08|
|01/Aug/1995:00:00:08|01/Aug/1995|1995-08-01|        1995-08-01|  Tuesday|01/Aug/1995:00:00...|  00| 00|    08|
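|01/Aug/1995:00:00:09|01/Aug/1995|1995-08-01|        1995-08-01|  Tuesday|01/Aug/1995:00:00...|  00| 00|    09|
+--------------------+-----------+----------+------------------+---------+--------------------+----+---+------+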

### Q1: Write Spark code (using RDD) to find the top 10 requested URLs along with the number of times each was requested (this information will help the company find the most popular pages and how frequently they are accessed)

In [25]:
dfraw.select("url").groupBy("url").count().orderBy('count', ascending=False).show(10)

+--------------------+-----+
|                 url|count|
+--------------------+-----+
|/images/NASA-logo...|97410|
|/images/KSC-logos...|75337|
|/images/MOSAIC-lo...|67448|
|/images/USA-logos...|67068|
|/images/WORLD-log...|66444|
|/images/ksclogo-m...|62778|
|           /ksc.html|43687|
|/history/apollo/i...|37826|
|/images/launch-lo...|35138|
|                   /|30347|
+--------------------+-----+
only showing top 10 rows
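
The cell above answers the question with the DataFrame API, while the question asks for RDDs. A minimal RDD-based sketch follows; it assumes an RDD of `Row` objects with a `url` field, such as `logContents` from the parsing step, and `top_requested_urls` is a hypothetical helper name. It relies only on the RDD methods `map`, `reduceByKey`, and `takeOrdered`.

```python
from operator import add


def top_requested_urls(rows, n=10):
    """Return the n most requested URLs as (url, count) pairs, highest count first.

    `rows` is assumed to be an RDD of Row objects that have a `url` field.
    """
    return (rows
            .map(lambda row: (row.url, 1))            # one (url, 1) pair per request
            .reduceByKey(add)                         # sum the ones for each URL
            .takeOrdered(n, key=lambda kv: -kv[1]))   # n pairs with the largest counts
```

In the notebook this would be called as `top_requested_urls(logContents)`. `takeOrdered` avoids a full sort of all URLs when only the first few are needed.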



### Q2: Write Spark code to find the top 5 hosts / IP addresses making requests, along with their counts (this information will help the company find locations where the website is popular or spot potential DDoS attacks)

In [26]:
dfraw.select("host").groupBy("host").count().orderBy('count', ascending=False).show(5)

+--------------------+-----+
|                host|count|
+--------------------+-----+
|  edams.ksc.nasa.gov| 6530|
|piweba4y.prodigy.com| 4846|
|        163.206.89.4| 4791|
|piweba5y.prodigy.com| 4607|
|piweba3y.prodigy.com| 4416|
+--------------------+-----+
only showing top 5 rows



### Q3: Write Spark code to find the top 5 time frames with the highest traffic (which day of the week or hour of the day receives peak traffic; this information will help the company manage resources for handling peak traffic load)

In [32]:
dfraw.select('dayOfWeek').groupBy('dayOfWeek').count().orderBy('count',ascending=False).show(5)

+---------+------+
|dayOfWeek| count|
+---------+------+
| Thursday|304297|
|  Tuesday|278749|
|Wednesday|255796|
|   Friday|234370|
|   Monday|228271|
+---------+------+
only showing top 5 rows
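
The question also mentions the hour of the day, which the cell above does not cover. The sketch below counts requests per hour. It assumes an RDD of `Row` objects with a `timeStamp` field in the form `01/Aug/1995:00:00:01 -0400`, such as `logContents`; `busiest_hours` is a hypothetical helper name. `countByValue` returns the counts to the driver, which is reasonable here because there are at most 24 distinct hours.

```python
def busiest_hours(rows, n=5):
    """Return the n busiest hours of the day as (hour, count) pairs, highest count first.

    `rows` is assumed to be an RDD of Row objects that have a `timeStamp` field.
    """
    # The hour is the second ':'-separated piece of 'DD/Mon/YYYY:HH:MM:SS -0400'.
    hour_counts = rows.map(lambda row: row.timeStamp.split(':')[1]).countByValue()
    return sorted(hour_counts.items(), key=lambda kv: kv[1], reverse=True)[:n]
```

In the notebook this would be called as `busiest_hours(logContents)`.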



### Q4: Write Spark code to find the 5 time frames with the least traffic (which day of the week or hour of the day receives the least traffic; this information will help the company schedule production deployments in that time frame so that fewer users are affected if something goes wrong during deployment)

In [29]:
dfraw.select('dayOfWeek', 'hour').groupBy('dayOfWeek','hour').count().orderBy('count').show(5)

+---------+----+-----+
|dayOfWeek|hour|count|
+---------+----+-----+
|   Sunday|  06| 2437|
| Saturday|  05| 2579|
|   Sunday|  05| 2734|
| Saturday|  06| 2748|
|   Sunday|  04| 2807|
+---------+----+-----+
only showing top 5 rows



### Q5: Write Spark code to find the unique HTTP codes returned by the server along with their counts (this information helps the DevOps team find out how many requests are failing so that appropriate action can be taken to fix the issue)

In [33]:
dfraw.select('httpCode').groupBy('httpCode').count().orderBy('count',ascending=False).show()

+--------+-------+
|httpCode|  count|
+--------+-------+
|     200|1398988|
|     304| 134146|
|     302|  26444|
|     404|  10056|
|     403|    171|
|     501|     27|
|     500|      3|
+--------+-------+
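
To turn these counts into the failure figure the question asks about, the short calculation below uses the numbers from the table above. Treating every 4xx and 5xx code as a failed request is an assumption made here for illustration; the notebook itself does not define what counts as a failure.

```python
# Counts copied from the table above.
status_counts = {200: 1398988, 304: 134146, 302: 26444,
                 404: 10056, 403: 171, 501: 27, 500: 3}

total = sum(status_counts.values())
# Assumption: client errors (4xx) and server errors (5xx) count as failures.
failed = sum(n for code, n in status_counts.items() if code >= 400)
failure_rate = failed / float(total)

print(total)                          # 1569835
print(failed)                         # 10257
print('{:.2%}'.format(failure_rate))  # 0.65%
```

Under that assumption, well under 1% of the parsed requests failed, and nearly all of those are 404 responses.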

