In [1]:
### Load the raw web log into an RDD; each record is the list of its space-separated fields.
# For a quick smoke test, dataPath can point at "/FileStore/tables/sample_log.txt" instead.
dataPath = "/FileStore/tables/2015_07_22_mktplace_shop_web_log_sample_log-214a9.gz"
raw_data = (
    sc.textFile(dataPath)                     # `sc` is the notebook's SparkContext
      .map(lambda line: line.split(" "))      # split on single spaces (empty fields kept)
)
# Peek at the first parsed record.
raw_data.take(1)

In [2]:
#Functions to clean the timestamp, IP, request type and URL fields
from dateutil.parser import parse
def parseDateTime(data):
  """Parse a log timestamp string into a datetime with whole-second precision.

  The "T" date/time separator is replaced by a space, and everything from the
  first "." onward (presumably fractional seconds plus a zone marker -- TODO
  confirm against the log format) is discarded before calling dateutil's parser.

  NOTE(review): if a value has no ".", any trailing zone designator is not
  stripped, so the parsed datetime may be timezone-aware for such rows.
  """
  whole_seconds, _sep, _fraction = data.replace("T", " ").partition(".")
  return parse(whole_seconds)

def parseIP(ip):
  """Return the host part of a "host:port" field (the text before the first ':').

  A value containing no ':' is returned unchanged.
  """
  host, _sep, _port = ip.partition(":")
  return host

def parseReq(request):
  """Return the request field with every double-quote character removed (e.g. '"GET' -> 'GET')."""
  return "".join(request.split('"'))

def parseURL(raw_url):
  """Return the URL with its query string removed (everything before the first '?')."""
  base, _sep, _query = raw_url.partition("?")
  return base

In [3]:
# Reduce each split log line to (IP_ADDRESS, TIME_STAMP, REQUEST_TYPE, URL).
# Field positions read: 0 = timestamp, 2 = client "host:port",
# 11 = request method (with a leading quote), 12 = requested URL.
time_ip_RDD = raw_data.map(
    lambda fields: (
        parseIP(fields[2]),
        parseDateTime(fields[0]),
        parseReq(fields[11]),
        parseURL(fields[12]),
    )
)
time_ip_RDD.take(10)

In [4]:
# Define the DataFrame schema explicitly and convert the parsed RDD into a DataFrame.
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

fields = [
    StructField("IP_ADDRESS", StringType(), True),
    StructField("TIME_STAMP", TimestampType(), True),
    StructField("REQUEST_TYPE", StringType(), True),
    StructField("URL", StringType(), True),
]
schema = StructType(fields)

df = spark.createDataFrame(time_ip_RDD, schema)
# Print a preview explicitly: a bare expression that is not the cell's last line is never displayed.
df.show(5, truncate=False)
# The default save mode fails when the path already exists; overwrite keeps the cell re-runnable.
df.repartition(1).write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("/FileStore/weblog/raw_logs")

In [5]:
##### Main processing steps
#### 1) Order each IP's hits by timestamp and compute the gap to the previous hit
#### 2) Mark a hit as a session start when that gap exceeds the inactivity threshold
####    (or there is no previous hit), and number the sessions with a running sum of the marks
#### 3) Flag each hit as a GET or POST request
from pyspark.sql import functions as F
from pyspark.sql.window import Window

threshold = 900   ### Inactivity threshold in seconds (900 s = 15 min)

# One window spec shared by lag() and the running sum: all hits of an IP, in time order.
# With orderBy and no explicit frame, Spark's default frame runs from the partition start
# to the current row (range-based), so hits with identical timestamps share a session number.
ip_time_window = Window.partitionBy("IP_ADDRESS").orderBy("TIME_STAMP")

ordered_df = (
    df
    .withColumn("PREV_TIME_STAMP", F.lag("TIME_STAMP", 1).over(ip_time_window))
    # Gap to the previous hit in whole seconds; null for an IP's first hit.
    .withColumn("TIME_DIFF", F.unix_timestamp("TIME_STAMP") - F.unix_timestamp("PREV_TIME_STAMP"))
    # 1 marks the first hit of a session, 0 otherwise.
    .withColumn(
        "Session_Ind",
        F.when((F.col("TIME_DIFF") > threshold) | F.col("TIME_DIFF").isNull(), 1).otherwise(0),
    )
    # Running count of session starts = per-IP session number (1, 2, ...).
    .withColumn("Session_Group", F.sum("Session_Ind").over(ip_time_window))
    .withColumn("Get_Ind", F.when(F.col("REQUEST_TYPE") == "GET", 1).otherwise(0))
    .withColumn("Post_Ind", F.when(F.col("REQUEST_TYPE") == "POST", 1).otherwise(0))
)
ordered_df.show(200)

In [6]:
# Number of hits per request method.
ordered_df.groupBy("REQUEST_TYPE").count().show()

In [7]:
### Master dataset: one row per (IP_ADDRESS, Session_Group) with start/end time,
### GET/POST counts, URL count, distinct URL count, session time and starting URL --
### the attributes used here to profile users at the session level.
ordered_df.createOrReplaceTempView("logs")
master_df = spark.sql("""
    SELECT tbl.*,
           (unix_timestamp(tbl.End_Time) - unix_timestamp(tbl.Start_Time)) AS Session_Time,
           tbl2.URL AS Start_URL
    FROM (
        SELECT IP_ADDRESS,
               Session_Group,
               min(TIME_STAMP) AS Start_Time,
               max(TIME_STAMP) AS End_Time,
               sum(Get_Ind) AS Get_Count,
               sum(Post_Ind) AS Post_Count,
               count(URL) AS URL_Count,
               count(DISTINCT URL) AS Unique_URL_Count
        FROM logs
        GROUP BY IP_ADDRESS, Session_Group
    ) tbl
    LEFT JOIN (
        -- Session_Ind = 1 flags the first hit of each session; reusing the flag keeps
        -- this join consistent with the sessionization threshold instead of repeating it.
        SELECT IP_ADDRESS, URL, Session_Group
        FROM logs
        WHERE Session_Ind = 1
    ) tbl2
    USING (IP_ADDRESS, Session_Group)
""")
master_df.orderBy("IP_ADDRESS", "Session_Group").show(200)
# Overwrite so that re-running the notebook does not fail on an existing output path.
master_df.repartition(1).write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("/FileStore/weblog/master_logs")

In [8]:
### Average session time (in seconds) over all sessions
master_df.createOrReplaceTempView("master_logs")
avg_session_sql = "SELECT avg(Session_Time) FROM master_logs"
results = spark.sql(avg_session_sql)
results.show()

In [9]:
#### Distinct URLs hit in each session
# Sorting by Session_Group as well makes the order of an IP's sessions deterministic.
uniq_hits = spark.sql("select IP_ADDRESS,Session_Group,Unique_URL_Count from master_logs order by IP_ADDRESS, Session_Group")
uniq_hits.show(200)

In [10]:
# repartition(1) is a shuffle and does not preserve the query's ORDER BY, so the rows are
# re-sorted inside the single partition before writing; overwrite keeps the cell re-runnable.
(
    uniq_hits
    .repartition(1)
    .sortWithinPartitions("IP_ADDRESS", "Session_Group")
    .write.format("com.databricks.spark.csv")
    .option("header", True)
    .mode("overwrite")
    .save("/FileStore/weblog/unique_hits")
)

In [11]:
### Sessions ranked by session time, longest first
engaged_users_sql = "select IP_ADDRESS,Session_Group,Session_Time from master_logs ORDER BY Session_Time DESC"
engaged_users = spark.sql(engaged_users_sql)
engaged_users.show(50)
                          

In [12]:
# repartition(1) is a shuffle and does not preserve the longest-first ORDER BY, so the rows
# are re-sorted inside the single partition before writing; overwrite keeps the cell re-runnable.
(
    engaged_users
    .repartition(1)
    .sortWithinPartitions("Session_Time", ascending=False)
    .write.format("com.databricks.spark.csv")
    .option("header", True)
    .mode("overwrite")
    .save("/FileStore/weblog/engaged_users")
)

In [13]:
### Users ranked by the total time summed over all of their sessions
total_engaged_users = spark.sql(
    "SELECT IP_ADDRESS, SUM(Session_Time) AS Total_Session_Time FROM master_logs GROUP BY IP_ADDRESS ORDER BY Total_Session_Time DESC"
)
total_engaged_users.show(50)

In [14]:
#df.createOrReplaceTempView("raw_logs")
#spark.sql("SELECT max(TIME_STAMP),min(TIME_STAMP) from raw_logs").show()

In [15]:
## Mean session time per 30-minute bucket of session start time, to check whether
## session length shows an intrinsic pattern across the hours of the day.
ax = (
    spark.sql("SELECT Start_Time, Session_Time from master_logs")
    .toPandas()
    .set_index('Start_Time')
    .resample('30min')
    .mean()
    .plot(kind='bar', figsize=(30, 20), legend=False,
          title='Mean session time by 30-minute start-time bucket')
)
ax.set_xlabel('Session start time (30-minute bucket)')
ax.set_ylabel('Mean session time (seconds)')
display()
### No intrinsic time-of-day pattern in mean session time is visible. Data covering more
### days would be needed to identify one.

In [16]:

#### Predicting server load: if the time series showed a clear pattern (e.g. a trend or daily seasonality), an ARIMA model could be fitted to it. With only one day of data, however, no clear pattern is visible.


In [17]:
## Per-minute time series of server load (average requests per second in each minute)
import pandas as pd

# Persist the raw (timestamp, URL) pairs; overwrite keeps the cell re-runnable.
df.select("TIME_STAMP","URL").repartition(1).write.format("com.databricks.spark.csv").option("header",True).mode("overwrite").save("/FileStore/weblog/timeseries_df")

# Only the timestamps are needed to count hits, so collect just that column to the driver.
hit_times_pd = df.select("TIME_STAMP").toPandas()
hits_per_minute = (
    hit_times_pd.assign(HITS=1)
    .set_index("TIME_STAMP")["HITS"]
    .resample("1min")
    .sum()          # minutes with no hits get 0
)
ts_df = (hits_per_minute / 60).rename("REQUESTS_PER_SEC")   ### requests per second

In [18]:
# The 100 busiest minutes by average requests per second.
ts_df.sort_values(ascending=False).iloc[:100]

In [19]:
# Total sessions vs. number of distinct session-starting URLs.
start_url_stats = spark.sql("select count(*),count(distinct Start_URL) from master_logs")
start_url_stats.show()
### There are many distinct starting URLs relative to the number of sessions in the master dataset.

In [20]:
#dbutils.fs.rm("/FileStore/weblog/",True)
