Install Java, Spark, and findspark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

Set environment variables

In [None]:
import os

# Register the Java 8 install and the unpacked Spark 2.4.7 directory in the
# process environment in a single update.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.7-bin-hadoop2.7",
})

In [None]:

# findspark is the helper package installed in the setup cell.
import findspark
# Called with no arguments; presumably it locates Spark through the SPARK_HOME
# variable set in the environment cell so that `pyspark` becomes importable
# -- TODO confirm against the findspark docs.
findspark.init()

Create the SparkSession and SparkContext

In [None]:
from pyspark.sql import SparkSession

# Build the session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('coding Assignment')
    .getOrCreate()
)

# Keep a handle on the session's underlying SparkContext.
sc = spark.sparkContext

In [None]:
# Path of the access log to analyse (relative, so resolved against the working directory).
input_data = "access.log"

In [None]:
# Read the log as plain text; the printed schema shows a single string column `value`.
text_reader = spark.read
input_df = text_reader.text(input_data)
input_df.printSchema()

root
 |-- value: string (nullable = true)



In [None]:
# Preview the first 10 raw log lines without truncating the long `value` column.
input_df.show(n=10, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                                     

In [None]:
# Bring the first 15 log lines to the driver as plain strings so the regular
# expressions can be tried out with Python's `re` before being used in Spark.
# Note: despite its name, `sample_df` is a Python list, not a DataFrame.
sample_rows = input_df.take(15)
sample_df = [row['value'] for row in sample_rows]
sample_df

['',
 '13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] "GET /index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53 HTTP/1.1" 200 32653 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"',
 '157.48.153.185 - - [19/Dec/2020:14:08:06 +0100] "GET /apache-log/access.log HTTP/1.1" 200 233 "-" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"',
 '157.48.153.185 - - [19/Dec/2020:14:08:08 +0100] "GET /favicon.ico HTTP/1.1" 404 217 "http://www.almhuette-raith.at/apache-log/access.log" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"',
 '216.244.66.230 - - [19/Dec/2020:14:14:26 +0100] "GET /robots.txt HTTP/1.1" 200 304 "-" "Mozilla/5.0 (compatible; DotBot/1.1; http://www.opensiteexplorer.org/dotbot, help@moz.com)" "-"',
 '54.36.148.92 - - [19/Dec/2020:14:16:44 +0100] "GET /index.php?option=com_phocagallery

In [None]:
import re
from pyspark.sql.context import SQLContext
from pyspark.context import SparkContext
from pyspark.sql.types import TimestampType

In [None]:
# Regular expression for the timestamp field, e.g. 19/Dec/2020:13:57:26.
# The UTC offset that follows in the log line is not part of the match.
ts_pattern = r'\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}'

# Search each sample line once, then turn every match (or miss) into a display value.
ts_matches = [re.search(ts_pattern, item) for item in sample_df]
timestamps = [found.group(0) if found else 'No Match' for found in ts_matches]
timestamps

['No Match',
 '19/Dec/2020:13:57:26',
 '19/Dec/2020:14:08:06',
 '19/Dec/2020:14:08:08',
 '19/Dec/2020:14:14:26',
 '19/Dec/2020:14:16:44',
 '19/Dec/2020:14:29:21',
 '19/Dec/2020:14:58:59',
 '19/Dec/2020:14:58:59',
 '19/Dec/2020:15:09:30',
 '19/Dec/2020:15:09:31',
 '19/Dec/2020:15:16:50',
 '19/Dec/2020:15:22:40',
 '19/Dec/2020:15:23:10',
 '19/Dec/2020:15:23:11']

In [None]:
# Regular expression for the quoted request section: group 1 is the HTTP method,
# group 2 the URI and group 3 the (possibly empty) protocol.
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'

# Search each sample line once, then keep the captured groups or a placeholder.
request_matches = [re.search(method_uri_protocol_pattern, item) for item in sample_df]
method_uri_protocol = [found.groups() if found else 'no match'
                       for found in request_matches]
method_uri_protocol

['no match',
 ('GET',
  '/index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53',
  'HTTP/1.1'),
 ('GET', '/apache-log/access.log', 'HTTP/1.1'),
 ('GET', '/favicon.ico', 'HTTP/1.1'),
 ('GET', '/robots.txt', 'HTTP/1.1'),
 ('GET',
  '/index.php?option=com_phocagallery&view=category&id=2%3Awinterfotos&Itemid=53',
  'HTTP/1.1'),
 ('GET', '/administrator/index.php', 'HTTP/1.1'),
 ('GET', '/apache-log/access.log', 'HTTP/1.1'),
 ('GET', '/favicon.ico', 'HTTP/1.1'),
 ('GET', '/robots.txt', 'HTTP/1.1'),
 ('GET',
  '/index.php?option=com_phocagallery&view=category&id=2%3Awinterfotos&Itemid=53',
  'HTTP/1.1'),
 ('GET', '/apache-log/access.log', 'HTTP/1.1'),
 ('GET', '/administrator/%22', 'HTTP/1.1'),
 ('GET', '/', 'HTTP/1.1'),
 ('GET', '/modules/mod_bowslideshow/tmpl/css/bowslideshow.css', 'HTTP/1.1')]

In [None]:
# Build the two-column DataFrame required for the assignment: the raw timestamp
# string and the HTTP method (capture group 1 of the request pattern).
from pyspark.sql.functions import regexp_extract

timestamp_col = regexp_extract('value', ts_pattern, 0).alias('timestamp')
api_call_col = regexp_extract('value', method_uri_protocol_pattern, 1).alias('API Call')

final_df = input_df.select(timestamp_col, api_call_col)
final_df.show()

# Report the shape as (row count, column count).
final_shape = (final_df.count(), len(final_df.columns))
print(final_shape)

+--------------------+--------+
|           timestamp|API Call|
+--------------------+--------+
|                    |        |
|19/Dec/2020:13:57:26|     GET|
|19/Dec/2020:14:08:06|     GET|
|19/Dec/2020:14:08:08|     GET|
|19/Dec/2020:14:14:26|     GET|
|19/Dec/2020:14:16:44|     GET|
|19/Dec/2020:14:29:21|     GET|
|19/Dec/2020:14:58:59|     GET|
|19/Dec/2020:14:58:59|     GET|
|19/Dec/2020:15:09:30|     GET|
|19/Dec/2020:15:09:31|     GET|
|19/Dec/2020:15:16:50|     GET|
|19/Dec/2020:15:22:40|     GET|
|19/Dec/2020:15:23:10|     GET|
|19/Dec/2020:15:23:11|     GET|
|19/Dec/2020:15:23:11|     GET|
|19/Dec/2020:15:23:11|     GET|
|19/Dec/2020:15:23:11|     GET|
|19/Dec/2020:15:23:11|     GET|
|19/Dec/2020:15:23:11|     GET|
+--------------------+--------+
only showing top 20 rows

(8410, 2)


In [None]:
# Confirm the column types: both extracted columns are still plain strings at this point.
final_df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- API Call: string (nullable = true)



In [None]:
# Check for unparsed rows in the final DataFrame.
# regexp_extract returns an empty string (not null) when the pattern does not
# match, so a null-only check misses lines that failed to parse (such as the
# blank first line of the log). Treat both null and '' as bad values.
bad_rows_df = final_df.filter(final_df['timestamp'].isNull() |
                              (final_df['timestamp'] == '') |
                              final_df['API Call'].isNull() |
                              (final_df['API Call'] == ''))

bad_rows_df.count()

0

In [None]:
from pyspark.sql.functions import to_timestamp

In [None]:
# Parse the extracted string into a real timestamp column named `time`, then
# drop the string version. The format has no zone field and the extracted text
# does not include the log's UTC offset, so the offset is not applied here.
# Rows whose string could not be parsed show up as null in `time`.
parsed_time = to_timestamp(final_df['timestamp'], "dd/MMM/yyyy:HH:mm:ss").alias('time')
time_df = final_df.select('*', parsed_time).drop('timestamp')

In [None]:
# Inspect the parsed frame: schema, row count and a preview.
time_df.printSchema()
# A bare expression in the middle of a cell is not displayed, so the count was
# computed and thrown away; print it so the Spark job has a visible result.
print(time_df.count())
time_df.show()

root
 |-- API Call: string (nullable = true)
 |-- time: timestamp (nullable = true)

+--------+-------------------+
|API Call|               time|
+--------+-------------------+
|        |               null|
|     GET|2020-12-19 13:57:26|
|     GET|2020-12-19 14:08:06|
|     GET|2020-12-19 14:08:08|
|     GET|2020-12-19 14:14:26|
|     GET|2020-12-19 14:16:44|
|     GET|2020-12-19 14:29:21|
|     GET|2020-12-19 14:58:59|
|     GET|2020-12-19 14:58:59|
|     GET|2020-12-19 15:09:30|
|     GET|2020-12-19 15:09:31|
|     GET|2020-12-19 15:16:50|
|     GET|2020-12-19 15:22:40|
|     GET|2020-12-19 15:23:10|
|     GET|2020-12-19 15:23:11|
|     GET|2020-12-19 15:23:11|
|     GET|2020-12-19 15:23:11|
|     GET|2020-12-19 15:23:11|
|     GET|2020-12-19 15:23:11|
|     GET|2020-12-19 15:23:11|
+--------+-------------------+
only showing top 20 rows



In [None]:
# Cleaned DataFrame: replace the exact request time with the start of its hour,
# which is the key used by the hourly aggregations.
from pyspark.sql.functions import date_trunc,col

hour_bucket = date_trunc("hour", col("time").cast("timestamp"))
cleaned_df = time_df.withColumn("hour", hour_bucket).drop('time')
cleaned_df.show()

+--------+-------------------+
|API Call|               hour|
+--------+-------------------+
|        |               null|
|     GET|2020-12-19 13:00:00|
|     GET|2020-12-19 14:00:00|
|     GET|2020-12-19 14:00:00|
|     GET|2020-12-19 14:00:00|
|     GET|2020-12-19 14:00:00|
|     GET|2020-12-19 14:00:00|
|     GET|2020-12-19 14:00:00|
|     GET|2020-12-19 14:00:00|
|     GET|2020-12-19 15:00:00|
|     GET|2020-12-19 15:00:00|
|     GET|2020-12-19 15:00:00|
|     GET|2020-12-19 15:00:00|
|     GET|2020-12-19 15:00:00|
|     GET|2020-12-19 15:00:00|
|     GET|2020-12-19 15:00:00|
|     GET|2020-12-19 15:00:00|
|     GET|2020-12-19 15:00:00|
|     GET|2020-12-19 15:00:00|
|     GET|2020-12-19 15:00:00|
+--------+-------------------+
only showing top 20 rows



In [None]:
# Alternative, long-format result: one row per (hour, method) pair with its count.
# It carries the same numbers but not the one-row-per-hour layout the assignment asks for.
counts_by_hour_method = cleaned_df.groupBy('hour', 'API Call').agg({'API Call':'count'})
counts_by_hour_method.orderBy('hour', ascending=True).show()

+-------------------+--------+---------------+
|               hour|API Call|count(API Call)|
+-------------------+--------+---------------+
|               null|        |              1|
|2020-12-19 13:00:00|     GET|              1|
|2020-12-19 14:00:00|     GET|              7|
|2020-12-19 15:00:00|     GET|             36|
|2020-12-19 16:00:00|     GET|             12|
|2020-12-19 17:00:00|     GET|             45|
|2020-12-19 17:00:00|    POST|             43|
|2020-12-19 18:00:00|     GET|             78|
|2020-12-19 18:00:00|    POST|             73|
|2020-12-19 19:00:00|     GET|            146|
|2020-12-19 19:00:00|    POST|             66|
|2020-12-19 20:00:00|     GET|            123|
|2020-12-19 20:00:00|    POST|             75|
|2020-12-19 21:00:00|    POST|             61|
|2020-12-19 21:00:00|     GET|             88|
|2020-12-19 22:00:00|    POST|             52|
|2020-12-19 22:00:00|     GET|             56|
|2020-12-19 23:00:00|     GET|             90|
|2020-12-19 2

In [None]:
# NOTE(review): the wildcard import is kept because later cells may rely on it;
# it likely shadows Python built-ins with Spark functions of the same name -- confirm.
from pyspark.sql.functions import *

# Hourly GET count: `when` without an `otherwise` leaves non-GET rows null, and
# `count` ignores nulls, so only GET rows are counted in each hour bucket.
is_get = col("API Call") == "GET"
get_df = (
    cleaned_df
    .groupBy("hour")
    .agg(count(when(is_get, 1)).alias("num(GET)"))
    .sort('hour', ascending=True)
)
get_df.show()

+-------------------+--------+
|               hour|num(GET)|
+-------------------+--------+
|               null|       0|
|2020-12-19 13:00:00|       1|
|2020-12-19 14:00:00|       7|
|2020-12-19 15:00:00|      36|
|2020-12-19 16:00:00|      12|
|2020-12-19 17:00:00|      45|
|2020-12-19 18:00:00|      78|
|2020-12-19 19:00:00|     146|
|2020-12-19 20:00:00|     123|
|2020-12-19 21:00:00|      88|
|2020-12-19 22:00:00|      56|
|2020-12-19 23:00:00|      90|
|2020-12-20 00:00:00|      72|
|2020-12-20 01:00:00|      74|
|2020-12-20 02:00:00|      62|
|2020-12-20 03:00:00|      71|
|2020-12-20 04:00:00|      86|
|2020-12-20 05:00:00|      83|
|2020-12-20 06:00:00|      76|
|2020-12-20 07:00:00|      67|
+-------------------+--------+
only showing top 20 rows



In [None]:
# Hourly POST count: `when` without an `otherwise` leaves non-POST rows null, and
# `count` ignores nulls, so only POST rows are counted in each hour bucket.
is_post = col("API Call") == "POST"
post_df = (
    cleaned_df
    .groupBy("hour")
    .agg(count(when(is_post, 1)).alias("num(POST)"))
    .sort('hour', ascending=True)
)
post_df.show()

+-------------------+---------+
|               hour|num(POST)|
+-------------------+---------+
|               null|        0|
|2020-12-19 13:00:00|        0|
|2020-12-19 14:00:00|        0|
|2020-12-19 15:00:00|        0|
|2020-12-19 16:00:00|        0|
|2020-12-19 17:00:00|       43|
|2020-12-19 18:00:00|       73|
|2020-12-19 19:00:00|       66|
|2020-12-19 20:00:00|       75|
|2020-12-19 21:00:00|       61|
|2020-12-19 22:00:00|       52|
|2020-12-19 23:00:00|       83|
|2020-12-20 00:00:00|       69|
|2020-12-20 01:00:00|       63|
|2020-12-20 02:00:00|       55|
|2020-12-20 03:00:00|       67|
|2020-12-20 04:00:00|       77|
|2020-12-20 05:00:00|       77|
|2020-12-20 06:00:00|       69|
|2020-12-20 07:00:00|       65|
+-------------------+---------+
only showing top 20 rows



In [None]:
# Combine the hourly GET and POST counts into one row per hour.
# The inner join on `hour` also drops the null-hour bucket (unparsed lines),
# because null keys never compare equal in a join.
# A join does not guarantee row order, so sort explicitly to keep the result
# chronological instead of relying on how the join happens to be executed.
final_result = get_df.join(post_df, on=['hour'], how='inner').sort('hour')
final_result.show()

+-------------------+--------+---------+
|               hour|num(GET)|num(POST)|
+-------------------+--------+---------+
|2020-12-19 13:00:00|       1|        0|
|2020-12-19 14:00:00|       7|        0|
|2020-12-19 15:00:00|      36|        0|
|2020-12-19 16:00:00|      12|        0|
|2020-12-19 17:00:00|      45|       43|
|2020-12-19 18:00:00|      78|       73|
|2020-12-19 19:00:00|     146|       66|
|2020-12-19 20:00:00|     123|       75|
|2020-12-19 21:00:00|      88|       61|
|2020-12-19 22:00:00|      56|       52|
|2020-12-19 23:00:00|      90|       83|
|2020-12-20 00:00:00|      72|       69|
|2020-12-20 01:00:00|      74|       63|
|2020-12-20 02:00:00|      62|       55|
|2020-12-20 03:00:00|      71|       67|
|2020-12-20 04:00:00|      86|       77|
|2020-12-20 05:00:00|      83|       77|
|2020-12-20 06:00:00|      76|       69|
|2020-12-20 07:00:00|      67|       65|
|2020-12-20 08:00:00|      63|       62|
+-------------------+--------+---------+
only showing top