In [1]:
import re
import datetime

from pyspark.sql import Row

month_map = dict((abbr, number) for number, abbr in enumerate(
    ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
     'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'], 1))

def parse_apache_time(s):
    """ Build a naive datetime from an Apache timestamp string.

    Each field is read from a fixed character offset of the layout
    'dd/Mon/yyyy:HH:MM:SS +zzzz'; the trailing UTC offset is discarded.
    Args:
        s (str): timestamp such as '01/Jul/1995:00:00:01 -0400'
    Returns:
        datetime: naive datetime object (timezone offset is ignored)
    """
    day, month, year = s[0:2], s[3:6], s[7:11]
    hour, minute, second = s[12:14], s[15:17], s[18:20]
    return datetime.datetime(int(year), month_map[month], int(day),
                             int(hour), int(minute), int(second))


def parseApacheLogLine(logline):
    """ Parse a line in the Apache Common Log format
    Args:
        logline (str): a line of text in the Apache Common Log format
    Returns:
        tuple: either (Row, 1) where the Row holds the parsed fields of the
               access-log line, or (logline, 0) when the line does not match
               APACHE_ACCESS_LOG_PATTERN, its timestamp cannot be parsed, or
               its content-size field is neither '-' nor an integer
    """
    # APACHE_ACCESS_LOG_PATTERN is defined in a later notebook cell; it is only
    # looked up at call time, so that cell must run before this is called.
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        return (logline, 0)
    size_field = match.group(9)
    try:
        date_time = parse_apache_time(match.group(4))
        # A '-' size field is recorded as zero bytes.
        size = long(0) if size_field == '-' else long(size_field)
    except (ValueError, KeyError):
        # A malformed timestamp or size would otherwise raise inside the Spark
        # task and abort the whole job; report the line as a failed parse.
        return (logline, 0)
    return (Row(
        host          = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = date_time,
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = size
    ), 1)

In [2]:
# A regular expression pattern to extract fields from the log line.
# Capture groups, as consumed by parseApacheLogLine: 1 host, 2 client identd,
# 3 user id, 4 timestamp, 5 method, 6 endpoint, 7 protocol (optional),
# 8 response code, 9 content size.
# Raw string so the regex escapes are not parsed as (invalid) string escapes.
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (.+?)\s?(\S+)?" (\d{3}) (\S+)'

In [3]:
def parseLogs():
    """ Read and parse log file """
    parsed_logs = (sc
                   .textFile("/FileStore/tables/access_log_Jul95")
                   .map(parseApacheLogLine)
                   .cache())

    access_logs = (parsed_logs
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())

    failed_logs = (parsed_logs
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print 'Number of invalid logline: %d' % failed_logs.count()
        for line in failed_logs.take(20):
            print 'Invalid logline: %s' % line

    print 'Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count())
    return parsed_logs, access_logs, failed_logs


parsed_logs, access_logs, failed_logs = parseLogs()

In [4]:
# Calculate statistics based on the content size.
content_sizes = access_logs.map(lambda log: log.content_size).cache()
avg = content_sizes.reduce(lambda a, b : a + b) / content_sizes.count()
min = content_sizes.min()
max = content_sizes.max()
print 'Content Size, Avg: ',avg,' Min: ',min,' Max: ',max

In [5]:
# Response Code to Count
responseCodeToCount = (access_logs
                       .map(lambda log: (log.response_code, 1))
                       .reduceByKey(lambda a, b : a + b)
                       .cache())
responseCodeToCountList = responseCodeToCount.take(100)
print 'Found %d response codes' % len(responseCodeToCountList)
print 'Response Code Counts: %s' % responseCodeToCountList

In [6]:
# Any hosts that has accessed the server more than 10 times.
hostCountPairTuple = access_logs.map(lambda log: (log.host, 1))

hostSum = hostCountPairTuple.reduceByKey(lambda a, b : a + b)

hostMoreThan10 = hostSum.filter(lambda s: s[1] > 10)

hostsPick20 = (hostMoreThan10
               .map(lambda s: s[0])
               .take(20))

print 'Any 20 hosts that have accessed more then 10 times: %s' % hostsPick20

In [7]:
# Top Endpoints
endpointCounts = (access_logs
                  .map(lambda log: (log.endpoint, 1))
                  .reduceByKey(lambda a, b : a + b))

topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])

print 'Top Ten Endpoints: %s' % topEndpoints

In [8]:
# What are the top ten endpoints which did not have return code 200?
not200 = access_logs.filter(lambda log: log.response_code!=200)

endpointCountPairTuple = not200.map(lambda log: (log.endpoint,1))

endpointSum = endpointCountPairTuple.reduceByKey(lambda a,b: a+b)

topTenErrURLs = endpointSum.takeOrdered(10, lambda s: -1 * s[1])
print 'Top Ten failed URLs: %s' % topTenErrURLs

In [9]:
#How many unique hosts are there in the entire log?
hosts = access_logs.map(lambda log: log.host)

uniqueHosts = hosts.distinct()
uniqueHostCount = uniqueHosts.count()
print 'Unique hosts: %d' % uniqueHostCount

In [10]:
#the number of unique hosts in the entire log on a day-by-day basis.
dayToHostPairTuple = access_logs.map(lambda log:(log.date_time.day, log.host))

dayGroupedHosts = dayToHostPairTuple.groupByKey()
dayHostCount = dayGroupedHosts.map(lambda (day, hosts): (day, len(set(hosts)) ) )

dailyHosts = dayHostCount.sortByKey().cache()
dailyHostsList = dailyHosts.take(30)
print 'Unique hosts per day: %s' % dailyHostsList

In [11]:
# the average number of requests on a day-by-day basis
dayAndHostTuple = access_logs.map(lambda log:(log.date_time.day,1))

groupedByDay =  dayAndHostTuple.reduceByKey(lambda a, b: a + b)

sortedByDay = groupedByDay.sortByKey()

avgDailyReqPerHost = (sortedByDay
                      .join(dailyHosts)
                      .map(lambda (x,y): (x , y[0] / y[1]))
                      .sortByKey()
                      .cache()
                     )
                      
avgDailyReqPerHostList = avgDailyReqPerHost.take(30)
print 'Average number of daily requests per Hosts is %s' % avgDailyReqPerHostList

In [12]:
#Counting 404 Response Codes
badRecords = (access_logs
              .filter(lambda log: log.response_code==404)
              .cache())
print 'Found %d 404 URLs' % badRecords.count()

In [13]:
#Listing 404 Response Code Records
badEndpoints = badRecords.map(lambda log: log.endpoint)

badUniqueEndpoints = badEndpoints.distinct()

badUniqueEndpointsPick40 = badUniqueEndpoints.take(40)
print '404 URLS: %s' % badUniqueEndpointsPick40


In [14]:
#Listing the Top Twenty 404 Response Code Endpoints
badEndpointsCountPairTuple = badRecords.map(lambda log: (log.endpoint,1))

badEndpointsSum = badEndpointsCountPairTuple.reduceByKey(lambda a,b:a+b)

badEndpointsTop20 = badEndpointsSum.takeOrdered(20,lambda s: -s[1])
print 'Top Twenty 404 URLs: %s' % badEndpointsTop20

In [15]:
#Listing the Top Twenty-five 404 Response Code Hosts
errHostsCountPairTuple = badRecords.map(lambda log: (log.host,1))

errHostsSum = errHostsCountPairTuple.reduceByKey(lambda a,b:a+b)

errHostsTop25 = errHostsSum.takeOrdered(25,lambda s: -s[1])
print 'Top 25 hosts that generated errors: %s' % errHostsTop25

In [16]:
#Listing 404 Response Codes per Day
errDateCountPairTuple = badRecords.map(lambda log:(log.date_time.day,1))

errDateSum = errDateCountPairTuple.reduceByKey(lambda a,b: a+b)

errDateSorted = (errDateSum
                 .sortByKey()
                 .cache()
                )

errByDate = errDateSorted.take(30)
print '404 Errors by day: %s' % errByDate


In [17]:
#Top Five Days for 404 Response Codes
topErrDate = errDateSorted.takeOrdered(5,lambda s:-s[1])
print 'Top Five dates for 404 requests: %s' % topErrDate

In [18]:
#Hourly 404 Response Codes
hourCountPairTuple = badRecords.map(lambda log:(log.date_time.hour,1))

hourRecordsSum = hourCountPairTuple.reduceByKey(lambda a,b:a+b)

hourRecordsSorted = (hourRecordsSum
                     .sortByKey()
                     .cache()
                    )

errHourList = hourRecordsSorted.take(24)
print 'Top hours for 404 requests: %s' % errHourList