In [87]:
# Pre requisite for workinmg with google colab 
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls

import findspark
findspark.init()

# Import the libraries
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.39)] [Connected to cloud.r-proj                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Waiting for headers] [Connecting to ppa.launchpad.net (185.125.190.52)] [Co0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Connecting to ppa.launchpa                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Connecting to ppa.launchpa                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [1 InRelease gpgv 88.7 kB] [4 InRelease 15.6 kB/88.7 kB 18%] [Connecting to                                                          

In [88]:
# Spark Session
spark = SparkSession.builder.master("local").appName("Data Engineering Challenge").getOrCreate()
sc = spark.sparkContext

In [89]:
# the log file has been renamed to weblog.txt and uploaded in the google colab session
# Load the data by creating rdd
rdd = sc.textFile('weblog.txt')
# split the data from log text file on whitespaces into different columns
rdd = rdd.map(lambda line: line.split(" "))

In [90]:
# Taking the values from 1st, 3rd and 12th columns and RDD is converted to Dataframe
df_final = rdd.map(lambda line: Row(timestamp=line[0], ipaddress=line[2].split(':')[0],url=line[12])).toDF()
df_final.show(5)

+---------------+--------------------+--------------------+
|      ipaddress|           timestamp|                 url|
+---------------+--------------------+--------------------+
|123.242.248.130|2015-07-22T09:00:...|https://paytm.com...|
|  203.91.211.44|2015-07-22T09:00:...|https://paytm.com...|
|    1.39.32.179|2015-07-22T09:00:...|https://paytm.com...|
| 180.179.213.94|2015-07-22T09:00:...|https://paytm.com...|
| 120.59.192.208|2015-07-22T09:00:...|https://paytm.com...|
+---------------+--------------------+--------------------+
only showing top 5 rows



In [91]:
# timestamp string field is converted to TimestampType for basic arithmetic
df_final = df_final.withColumn('timestamp', df_final['timestamp'].cast(TimestampType()))
df_final.show(5)

+---------------+--------------------+--------------------+
|      ipaddress|           timestamp|                 url|
+---------------+--------------------+--------------------+
|123.242.248.130|2015-07-22 09:00:...|https://paytm.com...|
|  203.91.211.44|2015-07-22 09:00:...|https://paytm.com...|
|    1.39.32.179|2015-07-22 09:00:...|https://paytm.com...|
| 180.179.213.94|2015-07-22 09:00:...|https://paytm.com...|
| 120.59.192.208|2015-07-22 09:00:...|https://paytm.com...|
+---------------+--------------------+--------------------+
only showing top 5 rows



In [92]:
# Q1: SESSIONIZE THE WEB LOG BY IP.
# sessionize data based on 15 min fixed window time 
#assign an sequentially increasing Id as SessionId to each session
SessionDF = df_final.select(window("timestamp", "15 minutes").alias('FixedTimeWindow'),'timestamp',"ipaddress").groupBy('FixedTimeWindow','ipaddress').count().withColumnRenamed('count', 'NumberHitsInSessionForIp')
#SessionDF.show(2)
SessionDF = SessionDF.withColumn("SessionId", monotonically_increasing_id())
SessionDF.show(5,False)

+------------------------------------------+--------------+------------------------+---------+
|FixedTimeWindow                           |ipaddress     |NumberHitsInSessionForIp|SessionId|
+------------------------------------------+--------------+------------------------+---------+
|[2015-07-22 09:00:00, 2015-07-22 09:15:00]|1.38.17.231   |14                      |0        |
|[2015-07-22 09:00:00, 2015-07-22 09:15:00]|161.51.16.10  |1                       |1        |
|[2015-07-22 09:00:00, 2015-07-22 09:15:00]|117.213.93.103|3                       |2        |
|[2015-07-22 09:00:00, 2015-07-22 09:15:00]|165.225.104.65|35                      |3        |
|[2015-07-22 09:00:00, 2015-07-22 09:15:00]|1.39.46.218   |7                       |4        |
+------------------------------------------+--------------+------------------------+---------+
only showing top 5 rows



In [93]:
# join the time stamps and url to the Sessionized DF
dfWithTimeStamps = df_final.select(window("timestamp", "15 minutes").alias('FixedTimeWindow'),'timestamp',"ipaddress","url")
SessionDF = dfWithTimeStamps.join(SessionDF,['FixedTimeWindow','ipaddress'])
SessionDF.show(2)

+--------------------+---------------+--------------------+--------------------+------------------------+---------+
|     FixedTimeWindow|      ipaddress|           timestamp|                 url|NumberHitsInSessionForIp|SessionId|
+--------------------+---------------+--------------------+--------------------+------------------------+---------+
|[2015-07-22 02:30...|  106.51.141.73|2015-07-22 02:44:...|https://paytm.com...|                       1|      217|
|[2015-07-22 02:30...|107.167.109.115|2015-07-22 02:43:...|http://www.paytm....|                       1|      211|
+--------------------+---------------+--------------------+--------------------+------------------------+---------+
only showing top 2 rows



In [94]:
# Finding the first hit time of each ip for each session and join in to our session df
FirstHitTimeStamps = SessionDF.groupBy("SessionId").agg(min("timestamp").alias('FirstHitTime'))
SessionDF = FirstHitTimeStamps.join(SessionDF,['SessionId'])
SessionDF.select(col("SessionId"),col("ipaddress"),col("FirstHitTime")).show(20)

+---------+--------------+--------------------+
|SessionId|     ipaddress|        FirstHitTime|
+---------+--------------+--------------------+
|       26|  218.248.82.9|2015-07-22 09:02:...|
|       26|  218.248.82.9|2015-07-22 09:02:...|
|       26|  218.248.82.9|2015-07-22 09:02:...|
|       26|  218.248.82.9|2015-07-22 09:02:...|
|       29|  27.62.30.188|2015-07-22 09:02:...|
|       29|  27.62.30.188|2015-07-22 09:02:...|
|       29|  27.62.30.188|2015-07-22 09:02:...|
|       29|  27.62.30.188|2015-07-22 09:02:...|
|      474|101.221.128.95|2015-07-22 10:35:...|
|      474|101.221.128.95|2015-07-22 10:35:...|
|      474|101.221.128.95|2015-07-22 10:35:...|
|      474|101.221.128.95|2015-07-22 10:35:...|
|      474|101.221.128.95|2015-07-22 10:35:...|
|      474|101.221.128.95|2015-07-22 10:35:...|
|      474|101.221.128.95|2015-07-22 10:35:...|
|      474|101.221.128.95|2015-07-22 10:35:...|
|      474|101.221.128.95|2015-07-22 10:35:...|
|      474|101.221.128.95|2015-07-22 10:

In [97]:
# TIME DURATION OF SESSION
# Duration of a session = time difference of first and last hit in a session 
# if there is only one hit in a session the duration is zero
timeDiff = (unix_timestamp(SessionDF.timestamp)-unix_timestamp(SessionDF.FirstHitTime))
SessionDF = SessionDF.withColumn("timeDiffwithFirstHit", timeDiff)
tmpdf = SessionDF.groupBy("SessionId").agg(max("timeDiffwithFirstHit").alias("SessionDuration"))
SessionDF = SessionDF.join(tmpdf,['SessionId'])
SessionDF.select(col("SessionId"),col("ipaddress"),col("SessionDuration")).show(20)

+---------+--------------+---------------+
|SessionId|     ipaddress|SessionDuration|
+---------+--------------+---------------+
|       26|  218.248.82.9|             13|
|       26|  218.248.82.9|             13|
|       26|  218.248.82.9|             13|
|       26|  218.248.82.9|             13|
|       29|  27.62.30.188|             33|
|       29|  27.62.30.188|             33|
|       29|  27.62.30.188|             33|
|       29|  27.62.30.188|             33|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
|      474|101.221.128.95|            226|
+---------+

In [99]:
#Q2: AVEARAGE SESSION TIME
meandf = SessionDF.groupBy().avg('SessionDuration')
meandf.show(3)

+--------------------+
|avg(SessionDuration)|
+--------------------+
|  141.58578161415625|
+--------------------+



In [100]:
#Q3: UNIQUE URL VISITS PER SESSION
# Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session
dfURL = SessionDF.groupBy("SessionId","URL").count().distinct().withColumnRenamed('count', 'hitURLcount')
dfURL.show(20)

+----------+--------------------+-----------+
| SessionId|                 URL|hitURLcount|
+----------+--------------------+-----------+
|        26|https://paytm.com...|          2|
|        26|http://www.paytm....|          2|
|        29|https://paytm.com...|          1|
|        29|https://paytm.com...|          1|
|        29|https://paytm.com...|          1|
|        29|https://paytm.com...|          1|
|       474|https://paytm.com...|          2|
|       474|https://paytm.com...|          2|
|       474|https://paytm.com...|          2|
|       474|https://paytm.com...|          5|
|       474|https://paytm.com...|          3|
|       474|https://paytm.com...|          2|
|       474|https://paytm.com...|          1|
|       474|https://paytm.com...|          1|
|       474|https://paytm.com...|          1|
|       474|https://paytm.com...|          1|
|8589934658|https://paytm.com...|          1|
|8589934965|https://paytm.com...|          1|
|8589934965|https://paytm.com...| 

In [101]:
#Q4: MOST ENGAGED USER
#find the IPs with the longest session times
EngagedUsers = SessionDF.select("ipaddress","SessionID","SessionDuration").sort(col("SessionDuration").desc()).distinct()
EngagedUsers.show(3)

+---------------+------------+---------------+
|      ipaddress|   SessionID|SessionDuration|
+---------------+------------+---------------+
| 164.100.96.254|249108103236|            847|
| 111.119.199.22|283467841590|            839|
|117.220.186.227|755914244158|            804|
+---------------+------------+---------------+
only showing top 3 rows

