In [2]:
from pyspark import SparkContext
from pyspark import SparkConf
sc = SparkContext("local",'ApacheApp')
sc

<pyspark.context.SparkContext at 0x7fde5d19e290>

In [3]:
sc.version

u'2.0.2'

In [4]:
import re
import datetime

from pyspark.sql import Row

month_map = {'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
    'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12}

def parse_apache_time(s):
    """ Convert Apache time format into a Python datetime object
    Args:
        s (str): date and time in Apache time format
    Returns:
        datetime: datetime object (ignore timezone for now)
    """
    return datetime.datetime(int(s[7:11]),
                             month_map[s[3:6]],
                             int(s[0:2]),
                             int(s[12:14]),
                             int(s[15:17]),
                             int(s[18:20]))


def parseApacheLogLine(logline):
    """ Parse a line in the Apache Common Log format
    Args:
        logline (str): a line of text in the Apache Common Log format
    Returns:
        tuple: either a dictionary containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0
    """
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        return (logline, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = long(0)
    else:
        size = long(match.group(9))
    return (Row(
        host          = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = parse_apache_time(match.group(4)),
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = size
    ), 1)

In [5]:
# A regular expression pattern to extract fields from the log line
APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'

In [6]:
import sys
import os

logFile = '/data/spark/project/NASA_access_log_Aug95.gz'

def parseLogs():
    """ Read and parse log file """
    parsed_logs = (sc
                   .textFile(logFile)
                   .map(parseApacheLogLine)
                   .cache())

    access_logs = (parsed_logs
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())

    failed_logs = (parsed_logs
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print 'Number of invalid logline: %d' % failed_logs.count()
        for line in failed_logs.take(20):
            print 'Invalid logline: %s' % line

    print 'Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count())
    return parsed_logs, access_logs, failed_logs


parsed_logs, access_logs, failed_logs = parseLogs()

Number of invalid logline: 895
Invalid logline: 198.213.130.253 - - [03/Aug/1995:11:29:02 -0400] "GET /shuttle/missions/sts-34/mission-sts-34.html"><IMG images/ssbuv1.gif SRC="images/small34p.gif/ HTTP/1.0" 404 -
Invalid logline: ztm-13.dial.xs4all.nl - - [04/Aug/1995:09:34:52 -0400] "GET / /   HTTP/1.0" 200 7034
Invalid logline: pc32.cis.uoguelph.ca - - [04/Aug/1995:10:57:21 -0400] "GET / /   HTTP/1.0" 200 7034
Invalid logline: sgate08.st-and.ac.uk - - [04/Aug/1995:17:52:59 -0400] "GET /htbin/wais.pl?Wake Shield HTTP/1.0" 200 6858
Invalid logline: userp2.snowhill.com - - [05/Aug/1995:14:57:06 -0400] "GET / " HTTP/1.0" 200 7034
Invalid logline: ppp-nyc-2-64.ios.com - - [05/Aug/1995:20:45:33 -0400] "GET /shuttle/missions/sts-69/images/images.html 40,207 89,234 HTTP/1.0" 200 2443
Invalid logline: ppp-nyc-2-64.ios.com - - [05/Aug/1995:20:47:52 -0400] "GET /shuttle/countdown/tour.html 40,243 89,262 HTTP/1.0" 200 4347
Invalid logline: client-71-162.online.apple.com - - [05/Aug/1995:22:53:19

In [7]:
# Based on the above data, please answer following questions:
# Q1: Write spark code( using RDD) to find out top 10 requested URLs along with count of number of times they have been requested (This information will help company to find out most popular pages and how frequently they are accessed)
# Sample output:
# URL                                                              Count
# shuttle/missions/sts-71/mission-sts-71.html     549
# shuttle/resources/orbiters/enterprise.html        145

In [8]:
# Top Endpoints
endpointCounts = (access_logs
                  .map(lambda log: (log.endpoint, 1))
                  .reduceByKey(lambda a, b : a + b))

topEndpoints = endpointCounts.takeOrdered(15, lambda s: -1 * s[1])
print 'Top Ten URLs/Endpoints requested: %s' % topEndpoints

Top Ten URLs/Endpoints requested: [(u'/images/NASA-logosmall.gif', 97384), (u'/images/KSC-logosmall.gif', 75332), (u'/images/MOSAIC-logosmall.gif', 67441), (u'/images/USA-logosmall.gif', 67061), (u'/images/WORLD-logosmall.gif', 66437), (u'/images/ksclogo-medium.gif', 62771), (u'/ksc.html', 43683), (u'/history/apollo/images/apollo-logo1.gif', 37824), (u'/images/launch-logo.gif', 35135), (u'/', 30327), (u'/images/ksclogosmall.gif', 27808), (u'/shuttle/missions/sts-69/mission-sts-69.html', 24606), (u'/shuttle/countdown/', 24458), (u'/shuttle/missions/sts-69/count69.gif', 24381), (u'/shuttle/missions/sts-69/sts-69-patch-small.gif', 23404)]


In [9]:
# Q2: Write spark code to find out top 5 hosts / IP making the request along with count (This information will help company to find out locations where website is popular or to figure out potential DDoS attacks)
# Sample output: 
# URL                    Count
# 192.168.78.24     219

In [10]:
hostCountPairTuple = access_logs.map(lambda log: (log.host, 1))
hostSum = hostCountPairTuple.reduceByKey(lambda a, b : a + b)
hostSum.takeOrdered(20, lambda s: -1 * s[1])

[(u'edams.ksc.nasa.gov', 6530),
 (u'piweba4y.prodigy.com', 4846),
 (u'163.206.89.4', 4791),
 (u'piweba5y.prodigy.com', 4607),
 (u'piweba3y.prodigy.com', 4416),
 (u'www-d1.proxy.aol.com', 3889),
 (u'www-b2.proxy.aol.com', 3534),
 (u'www-b3.proxy.aol.com', 3463),
 (u'www-c5.proxy.aol.com', 3423),
 (u'www-b5.proxy.aol.com', 3411),
 (u'www-c2.proxy.aol.com', 3407),
 (u'www-d2.proxy.aol.com', 3404),
 (u'www-a2.proxy.aol.com', 3337),
 (u'news.ti.com', 3298),
 (u'www-d3.proxy.aol.com', 3296),
 (u'www-b4.proxy.aol.com', 3293),
 (u'www-c3.proxy.aol.com', 3272),
 (u'www-d4.proxy.aol.com', 3234),
 (u'www-c1.proxy.aol.com', 3177),
 (u'www-c4.proxy.aol.com', 3134)]

In [11]:
# Above list shows the host/ip address from where highest number of URL requests originated.

In [12]:
# Q3: Write spark code to find out top 5 time frame for high traffic (which day of the week or hour of the day receives peak traffic, this information will help company to manage resources for handling peak traffic load)
# Sample Output:
# |        host     |req_cnt|
# |  edams.gov |   6530|

# Q4: Write spark code to find out 5 time frames of least traffic (which day of the week or hour of the day receives least traffic, this information will help company to do production deployment in that time frame so that less number of users will be affected if some thing goes wrong during deployment)
# Sample Output:
# |     timeFrame    |req_cnt|
# |31/Nov/1995:11|   5000|

In [13]:
import calendar
access_logs.map(lambda log: (calendar.day_name[log.date_time.weekday()], 1)).reduceByKey(lambda a, b : a + b).takeOrdered(7, lambda s: -1 * s[1])

[('Thursday', 303979),
 ('Tuesday', 278666),
 ('Wednesday', 255452),
 ('Friday', 234356),
 ('Monday', 228242),
 ('Sunday', 134663),
 ('Saturday', 133645)]

In [14]:
# Above Code run indicates, Thursday is the week of the day in general with high traffic.
# Above Code run indicates, Saturday is the week of the day in general with least traffic. Sunday can also be counted with least traffic. 

In [15]:
access_logs.map(lambda log: (log.date_time.hour, 1)).reduceByKey(lambda a, b : a + b).takeOrdered(24, lambda s: -1 * s[1])

[(15, 109419),
 (12, 105132),
 (13, 104502),
 (14, 101320),
 (16, 99484),
 (11, 95312),
 (10, 88222),
 (17, 80800),
 (9, 78654),
 (18, 66773),
 (8, 65411),
 (22, 60622),
 (20, 59895),
 (19, 59285),
 (21, 57951),
 (23, 54539),
 (0, 47805),
 (7, 47349),
 (1, 38527),
 (2, 32506),
 (6, 31257),
 (3, 29962),
 (5, 27555),
 (4, 26721)]

In [16]:
# As per Python date time notation, hour '15' represents the hour beginning at 3pm.
# So, we can interpret the time duration 4am-6am is the time frame with least traffic.
# Also, note that the hour beginning 3pm is the time frame with high traffic.

In [17]:
# There is catch though. Note that the time frame analysis based on weekday and hour above done seperately and now lets try to combine both weekday and hour factors to assess the traffic.

In [18]:
TrafficRDD = access_logs.map(lambda log: ((calendar.day_name[log.date_time.weekday()], log.date_time.hour), 1)).reduceByKey(lambda a, b : a + b)

In [19]:
TrafficRDD.takeOrdered(10, lambda s: -1 * s[1])

[(('Thursday', 15), 23379),
 (('Thursday', 12), 23032),
 (('Tuesday', 13), 21115),
 (('Tuesday', 12), 20908),
 (('Thursday', 13), 20422),
 (('Thursday', 14), 20411),
 (('Thursday', 11), 19693),
 (('Tuesday', 15), 19269),
 (('Thursday', 16), 19245),
 (('Tuesday', 11), 19212)]

In [20]:
# From the above analysis, top 5 time widows with highest traffic can be observed at the following hours:
# Thursday beginning 3pm
# Thursday beginning 12pm
# Tuesday beginning 1pm
# Tuesday beginning 12pm
# Thursday beginning 1pm
# Thursday beginning 2pm

In [21]:
TrafficRDD.takeOrdered(10, lambda s: s[1])

[(('Sunday', 6), 2437),
 (('Saturday', 5), 2579),
 (('Sunday', 5), 2734),
 (('Saturday', 6), 2748),
 (('Sunday', 4), 2807),
 (('Sunday', 3), 2828),
 (('Saturday', 4), 2838),
 (('Sunday', 8), 2957),
 (('Saturday', 7), 3122),
 (('Sunday', 2), 3282)]

In [22]:
# From the above analysis, top 5 time widows with least traffic can be observed at the following hours:
# Sunday beginning 6am
# Saturday beginning 5am
# Sunday beginning 5am
# Saturday beginning 6am
# Sunday beginning 4am

In [23]:
# Q5: Write spark code to find out unique HTTP codes returned by the server along with count (this information is helpful for devops team to find out how many requests are failing so that appropriate action can be taken to fix the issue)
# Sample output:
# HTTP code            Count
# 200 - 15400 
# 404    324

In [24]:
responseCodeToCount = (access_logs
                       .map(lambda log: (log.response_code, 1))
                       .reduceByKey(lambda a, b : a + b)
                       .cache())
responseCodeToCountList = responseCodeToCount.takeOrdered(100)
print 'Found %d response codes' % len(responseCodeToCountList)
print 'Response Code Counts: %s' % responseCodeToCountList

Found 7 response codes
Response Code Counts: [(200, 1398207), (302, 26437), (304, 134138), (403, 171), (404, 10020), (500, 3), (501, 27)]
