##### Abdul Ghaffar -- Week 2 Apache Pyspark
### Assignment 1: 
#### Write a PySpark job that 
##### 1. Takes Apache log file as input (get any Apache log file available online)
##### 2. Parses the rows using map functions and get the result as RDD
##### 3. Filters out the rows that do no have a URL available
##### 4. Groups the data on IP addresses
##### 5. Calculates the average session length against every IP address
##### 6. Saves the output on a disk.

In [1]:
import sys
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext, SparkConf
import os
from pyspark.sql.functions import col, max as max_, min as min_
from pyspark.sql import SparkSession

In [2]:
conf = SparkConf().setAppName("Read Log files").setMaster("local[*]")
sc = SparkContext(conf = conf)
spark = SparkSession(sc)

In [3]:
# A regular expression pattern to extract fields from the log line
#APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (.*?) (\S+)" (\d{3}) (\S+) "(.*?)" "(.*?)"$'

In [4]:
import re
import datetime

from pyspark.sql import Row

month_map = {'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
    'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12}

def parse_time(s):
    """ Used to convert Apache time format into a Python datetime object
    Args:
        s (str): date and time in Apache time format
    Returns:
        datetime: datetime object (ignore timezone for now)
    """
    return datetime.datetime(int(s[7:11]),
                             month_map[s[3:6]],
                             int(s[0:2]),
                             int(s[12:14]),
                             int(s[15:17]),
                             int(s[18:20]))


def parseApacheLogLine(log_file_line):
    """ Parse a line in the Apache Common Log format
    Args:
        log_file_line (str): a line of text in the Apache Common Log format
    Returns:
        tuple: either a dictionary containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0
    """
    match_result = re.search(APACHE_ACCESS_LOG_PATTERN, log_file_line)
    if match_result is None:
        return (log_file_line, 0)
    size_field = match_result.group(9)
    if size_field == '-':
        size = 0
    else:
        size = match_result.group(9)
    return (Row(
        host_ip          = match_result.group(1),
        client_identd = match_result.group(2),
        user_id       = match_result.group(3),
        date_time     = parse_time(match_result.group(4)),
        method        = match_result.group(5),
        endpoint      = match_result.group(6),
        protocol      = match_result.group(7),
        response_code = int(match_result.group(8)),
        content_size  = size,
        referrer     = match_result.group(10),
        userAgent    = match_result.group(11)
    ), 1)



In [5]:
logFile = "apache_logs.txt"

def parseLogs():
    """ Read and parse log file """
    parsed_logs = (sc.textFile(logFile).map(parseApacheLogLine).cache())

    access_logs = (parsed_logs
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())

    failed_logs = (parsed_logs
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print ('Number of invalid logline: %d' % failed_logs.count())
        for line in failed_logs.take(20):
            print ('Invalid logline: %s' % line)

    print ('Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count()))
    return parsed_logs, access_logs, failed_logs


parsed_logs, access_logs, failed_logs = parseLogs()




Number of invalid logline: 2
Invalid logline: 83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
Invalid logline: 46.118.127.106 - - [20/May/2015:12:05:17 +0000] "GET /scripts/grok-py-test/configlib.py HTTP/1.1" 200 235 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html
Read 10000 lines, successfully parsed 9998 lines, failed to parse 2 lines


In [6]:
print(access_logs.take(2))

[Row(host_ip='83.149.9.216', client_identd='-', user_id='-', date_time=datetime.datetime(2015, 5, 17, 10, 5, 43), method='GET', endpoint='/presentations/logstash-monitorama-2013/images/kibana-dashboard3.png', protocol='HTTP/1.1', response_code=200, content_size='171717', referrer='http://semicomplete.com/presentations/logstash-monitorama-2013/', userAgent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36'), Row(host_ip='83.149.9.216', client_identd='-', user_id='-', date_time=datetime.datetime(2015, 5, 17, 10, 5, 47), method='GET', endpoint='/presentations/logstash-monitorama-2013/plugin/highlight/highlight.js', protocol='HTTP/1.1', response_code=200, content_size='26185', referrer='http://semicomplete.com/presentations/logstash-monitorama-2013/', userAgent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36')]


In [7]:
#Convert access_logs rdd to dataframe
access_logs_df = access_logs.toDF()

In [8]:
## filter empty urls with null value
access_logs_df = access_logs_df.filter(access_logs_df.host_ip != "")
access_logs_df.count()

9998

In [9]:
#groupBy based on ip requests made
access_logs_df_ip_grouped = access_logs_df.groupBy('host_ip').count()
access_logs_df_ip_grouped.show()

+---------------+-----+
|        host_ip|count|
+---------------+-----+
|  87.169.99.232|    1|
|  99.188.185.40|    1|
|   31.45.226.43|    1|
|    5.39.15.151|    2|
|  131.114.11.55|    2|
|188.238.146.131|    1|
| 208.43.243.244|    8|
|    180.76.5.74|    1|
| 203.84.135.120|    6|
| 76.164.234.106|    2|
| 128.179.155.97|    6|
| 173.192.238.44|    1|
| 86.185.215.203|    1|
| 107.170.40.205|    4|
|  213.180.27.58|    1|
| 197.187.26.144|    1|
|  173.213.76.77|    2|
|  109.74.154.79|    7|
|   200.10.161.5|    4|
|    82.60.18.23|    2|
+---------------+-----+
only showing top 20 rows



In [10]:
import pyspark.sql.functions as func

#average session length against every IP address
access_logs_df_session_time = access_logs_df.groupBy('host_ip').agg(func.avg('date_time').alias('average_session_time'))
access_logs_df_session_time.show()

+---------------+--------------------+
|        host_ip|average_session_time|
+---------------+--------------------+
|  87.169.99.232|       1.431839159E9|
|  99.188.185.40|        1.43186071E9|
|   31.45.226.43|       1.431889548E9|
|    5.39.15.151|      1.4319471255E9|
|  131.114.11.55|      1.4320317275E9|
|188.238.146.131|       1.432119939E9|
| 208.43.243.244|      1.4321271305E9|
|    180.76.5.74|       1.431903959E9|
| 203.84.135.120|1.4319471486666667E9|
| 76.164.234.106|        1.43195431E9|
| 128.179.155.97|1.4321043326666667E9|
| 173.192.238.44|       1.432109148E9|
| 86.185.215.203|       1.432116354E9|
| 107.170.40.205|     1.43197232825E9|
|  213.180.27.58|         1.4319831E9|
| 197.187.26.144|       1.431997523E9|
|  173.213.76.77|      1.4320119225E9|
|  109.74.154.79|1.4320587325714285E9|
|   200.10.161.5|       1.432096537E9|
|    82.60.18.23|      1.4321163315E9|
+---------------+--------------------+
only showing top 20 rows



In [11]:
access_logs_df_session_time.coalesce(1).write.format('csv').option("header", "true").save('access_logs_df_session_time.csv')

In [12]:
type(access_logs_df)

pyspark.sql.dataframe.DataFrame

In [13]:
#find session active for urls which has been requested
access_logs_df_gr = access_logs_df.withColumn("date_time", col("date_time").cast("timestamp")) \
    .groupBy("host_ip") \
    .agg((max_("date_time") - min_("date_time")).alias("session_length"))

In [14]:
access_logs_df_gr.show()

+---------------+--------------------+
|        host_ip|      session_length|
+---------------+--------------------+
|  87.169.99.232|           0 seconds|
|  99.188.185.40|           0 seconds|
|   31.45.226.43|           0 seconds|
|    5.39.15.151|           7 seconds|
|  131.114.11.55| 31 hours 17 seconds|
|188.238.146.131|           0 seconds|
| 208.43.243.244|          53 seconds|
|    180.76.5.74|           0 seconds|
| 203.84.135.120|          26 seconds|
| 76.164.234.106|          10 seconds|
| 128.179.155.97|  2 hours 30 seconds|
| 173.192.238.44|           0 seconds|
| 86.185.215.203|           0 seconds|
| 107.170.40.205|69 hours 59 minut...|
|  213.180.27.58|           0 seconds|
| 197.187.26.144|           0 seconds|
|  173.213.76.77|          43 seconds|
|  109.74.154.79|          43 seconds|
|   200.10.161.5|  1 hours 33 seconds|
|    82.60.18.23|           1 seconds|
+---------------+--------------------+
only showing top 20 rows



In [15]:
#saving apache log file parsed data
access_logs_df.coalesce(1).write.format('csv').option("header", "true").save('log_file_parsed_data.csv')