In [40]:
import pandas as pd
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (IntegerType, StringType, FloatType, StructType, StructField,
                               ArrayType, DateType, TimestampType)
from pyspark.sql.window import Window

# Spark entry point shared by every cell below.
# NOTE(review): "spark.some.config.option" looks like the placeholder from the
# Spark docs example — confirm it is actually needed.
spark = (SparkSession.builder
         .appName("Python Spark SQL basic example")
         .config("spark.some.config.option", "some-value")
         .getOrCreate())
sc = spark.sparkContext

## Read Data

In [41]:
# One entry per space-separated field of a log line, in file order; every
# column is nullable. A generator (not a list comprehension) is used so the
# loop names do not leak into the notebook namespace under Python 2.
schema = StructType(list(
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("timestamp_str", StringType()),
        ("elb", StringType()),
        ("client_port", StringType()),
        ("backend_port", StringType()),
        ("request_processing_time", FloatType()),
        ("backend_processing_time", FloatType()),
        ("response_processing_time", FloatType()),
        ("elb_status_code", StringType()),
        ("backend_status_code", StringType()),
        ("received_bytes", FloatType()),
        ("sent_bytes", FloatType()),
        ("request", StringType()),
        ("user_agent", StringType()),
        ("ssl_cipher", StringType()),
        ("ssl_protocol", StringType()),
    )
))
# The gzipped log has no header row; fields are separated by single spaces.
df = (spark.read
      .option("delimiter", " ")
      .csv("data/2015_07_22_mktplace_shop_web_log_sample.log.gz", schema=schema, header=False))
def format_time(ts):
    """Parse a log timestamp such as ``2015-07-22T09:00:28.019143Z``.

    Args:
        ts: string in ``%Y-%m-%dT%H:%M:%S.%fZ`` form (the trailing ``Z`` is
            matched literally), or None / empty.

    Returns:
        A naive ``datetime`` (no tzinfo attached), or None when ``ts`` is
        None or empty so the Spark UDF yields null instead of raising.

    Raises:
        ValueError: if ``ts`` is non-empty but does not match the format.
    """
    # Null fields (e.g. from malformed log lines) would otherwise make
    # strptime raise TypeError inside the UDF.
    if not ts:
        return None
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")
# Parse the raw timestamp string with the Python UDF defined above.
func_format_time = F.udf(format_time, returnType=TimestampType())
df = df.withColumn("timestamp", func_format_time(df.timestamp_str))

# client_port is split on ':' into ip and port; the second space-separated
# token of request is taken as the URL.
split_col1 = F.split(df['client_port'], ':')
split_col2 = F.split(df['request'], ' ')
df = (df
      .withColumn('client_ip', split_col1.getItem(0))
      .withColumn('port', split_col1.getItem(1))
      .withColumn('url', split_col2.getItem(1))
      .orderBy('client_ip', 'timestamp', 'url'))
df.printSchema()
# Cache the parsed, sorted log; the following cells build on it.
# (df.count() was 1158500 for this sample.)
df.persist()

root
 |-- timestamp_str: string (nullable = true)
 |-- elb: string (nullable = true)
 |-- client_port: string (nullable = true)
 |-- backend_port: string (nullable = true)
 |-- request_processing_time: float (nullable = true)
 |-- backend_processing_time: float (nullable = true)
 |-- response_processing_time: float (nullable = true)
 |-- elb_status_code: string (nullable = true)
 |-- backend_status_code: string (nullable = true)
 |-- received_bytes: float (nullable = true)
 |-- sent_bytes: float (nullable = true)
 |-- request: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- ssl_cipher: string (nullable = true)
 |-- ssl_protocol: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- client_ip: string (nullable = true)
 |-- port: string (nullable = true)
 |-- url: string (nullable = true)



In [42]:
# Cardinality of every column, computed in one aggregation query and pulled
# back as a single-row pandas frame (transposed for readability).
pdf_distinct_vals = df.agg(
    *(F.countDistinct(F.col(col_name)).alias(col_name) for col_name in df.columns)
).toPandas()
pdf_distinct_vals.reset_index().T

Unnamed: 0,0
index,0
timestamp_str,1158272
elb,1
client_port,404391
backend_port,25
request_processing_time,68
backend_processing_time,116722
response_processing_time,244
elb_status_code,27
backend_status_code,28


In [43]:
# Time span covered by the log sample.
df.agg(
    F.min("timestamp").alias("earliest_time"),
    F.max("timestamp").alias("latest_time")
).toPandas().T

Unnamed: 0,0
earliest_time,2015-07-22 02:40:06.499174
latest_time,2015-07-22 21:10:27.993803


In [45]:
from datetime import datetime
def get_time_diff_old(ts, ts_prev):
    """Earlier draft of the time-gap helper; superseded by ``get_time_diff``.

    Not used anywhere in the visible notebook. The unused format string and
    commented-out string parsing from the draft have been removed.

    Args:
        ts: later datetime-like value (must support subtraction).
        ts_prev: earlier datetime-like value, or None.

    Returns:
        ``ts - ts_prev`` in seconds as a float, or -1.0 when ``ts_prev`` is None.
    """
    if ts_prev is None:
        return -1.0
    return (ts - ts_prev).total_seconds()
def get_time_diff(ts, ts_prev):
    """Seconds elapsed from ``ts_prev`` to ``ts``.

    Wrapped as the ``func_get_time_diff`` UDF and used both for the gap
    between consecutive requests (``ts_prev`` = lagged timestamp) and for
    session durations (end - start).

    Args:
        ts: later datetime, or None.
        ts_prev: earlier datetime, or None (e.g. the first row of a window
            partition, where ``lag`` has no previous value).

    Returns:
        Float seconds. Returns the -1.0 sentinel when ``ts_prev`` is None;
        ``session_change_flag`` treats that as "start a new session".
        Returns None when only ``ts`` is None, so the UDF yields null
        rather than raising TypeError.
    """
    # Sentinel is checked first so (None, None) still returns -1.0.
    if ts_prev is None:
        return -1.0
    if ts is None:
        return None
    return (ts - ts_prev).total_seconds()
# Spark UDF wrapper; the Python float result is stored as a FloatType column.
func_get_time_diff = F.udf(get_time_diff, returnType=FloatType())

def session_change_flag(request_secs, time_window):
    """Return 1 if a request starts a new session, otherwise 0.

    Args:
        request_secs: seconds since the previous request from the same IP,
            as produced by ``get_time_diff``. -1.0 means there is no previous
            request; None means the gap could not be computed.
        time_window: inactivity timeout in minutes. A gap of at least this
            many minutes starts a new session.

    Returns:
        int flag (1 = new session). The notebook later takes a running sum of
        these flags per IP to number the sessions.
    """
    # Check the missing/sentinel cases before dividing, so a None gap does not
    # raise TypeError inside the UDF; both are treated as a session start.
    if request_secs is None or request_secs == -1.0:
        return 1
    return 1 if request_secs / 60.0 >= time_window else 0
# Spark UDF wrapper producing an IntegerType 0/1 column.
func_session_change_flag = F.udf(session_change_flag, returnType=IntegerType())

In [46]:
# Requests from one IP in time order; url breaks timestamp ties.
my_window = Window.partitionBy('client_ip').orderBy('timestamp', 'url')

# Gap to the previous request -> new-session flag (10-minute inactivity
# timeout, since session_change_flag compares minutes) -> running sum of the
# flags = per-IP session number -> "<ip>_<n>" session id.
df = (df
      .withColumn("prev_timestamp", F.lag(df.timestamp).over(my_window))
      .withColumn("diff_timestamp_secs",
                  func_get_time_diff(F.col("timestamp"), F.col("prev_timestamp")))
      .withColumn("session_flag",
                  func_session_change_flag(F.col("diff_timestamp_secs"), F.lit(10)))
      .withColumn('session_id_temp', F.sum('session_flag').over(my_window))
      .withColumn('session_id',
                  F.concat(F.col('client_ip'), F.lit('_'), F.col('session_id_temp')))
      .orderBy('client_ip', 'timestamp', 'url'))
df.printSchema()

root
 |-- timestamp_str: string (nullable = true)
 |-- elb: string (nullable = true)
 |-- client_port: string (nullable = true)
 |-- backend_port: string (nullable = true)
 |-- request_processing_time: float (nullable = true)
 |-- backend_processing_time: float (nullable = true)
 |-- response_processing_time: float (nullable = true)
 |-- elb_status_code: string (nullable = true)
 |-- backend_status_code: string (nullable = true)
 |-- received_bytes: float (nullable = true)
 |-- sent_bytes: float (nullable = true)
 |-- request: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- ssl_cipher: string (nullable = true)
 |-- ssl_protocol: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- client_ip: string (nullable = true)
 |-- port: string (nullable = true)
 |-- url: string (nullable = true)
 |-- prev_timestamp: timestamp (nullable = true)
 |-- diff_timestamp_secs: float (nullable = true)
 |-- session_flag: integer (nullable = true)
 |-- session_i

In [77]:
# One row per session: time bounds plus total and distinct URL hit counts.
df_sessions = df.groupBy("session_id").agg(
    F.min("timestamp").alias("session_start_time"),
    F.max("timestamp").alias("session_end_time"),
    F.countDistinct("url").alias("unique_urls"),
    F.count("url").alias("total_urls")
)
df_sessions = df_sessions.withColumn(
    "session_duration_secs",
    func_get_time_diff(F.col("session_end_time"), F.col("session_start_time"))
)
# Recover the IP and per-IP session number from the "<ip>_<n>" id.
split_col3 = F.split(df_sessions['session_id'], '_')
df_sessions = (df_sessions
               .withColumn('client_ip', split_col3.getItem(0))
               .withColumn('session_number', split_col3.getItem(1))
               .orderBy('session_id'))
df_sessions.persist()

DataFrame[session_id: string, session_start_time: timestamp, session_end_time: timestamp, unique_urls: bigint, total_urls: bigint, session_duration_secs: float, client_ip: string, session_number: string]

### Ignore sessions with just one URL

In [66]:
# NOTE: this cell reassigns df_sessions to its filtered version, so it is NOT
# idempotent. Re-running it without first re-running the cell that builds
# df_sessions reports the already-filtered count as the total. Run that cell
# immediately before this one.
count_total_sessions = df_sessions.count()
count_single_urls_sessions = df_sessions.where(F.col('total_urls') == 1).count()
df_sessions = df_sessions.where(F.col('total_urls') != 1)
count_filtered_sessions = df_sessions.count()
# total_urls comes from F.count (never null), so the two filters partition
# the sessions exactly.
assert count_single_urls_sessions + count_filtered_sessions == count_total_sessions
# Single-argument print(...) gives identical output on Python 2 and Python 3.
print('# total entries:  {}'.format(df.count()))
print('# total sessions:  {}'.format(count_total_sessions))
print('# single url sessions:  {}'.format(count_single_urls_sessions))
print('# filtered url sessions:  {}'.format(count_filtered_sessions))


# total entries:  1158500
# total sessions:  114244
# single url sessions:  22721
# filtered url sessions:  91523


## 1. Sessionize the web log by IP. Sessionize = aggregate all page hits by visitor/IP during a session. 

In [47]:
cols = ['client_ip', 'timestamp', 'prev_timestamp', 'diff_timestamp_secs',
        'session_flag', 'session_id', 'url']
# limit() before toPandas() so only 50 rows reach the driver instead of the
# whole ~1.16M-row log. df is already ordered by client_ip, timestamp, url,
# so these are the same first 50 rows.
df.select(cols).limit(50).toPandas()

Unnamed: 0,client_ip,timestamp,prev_timestamp,diff_timestamp_secs,session_flag,session_id,url
0,1.186.101.79,2015-07-22 10:45:55.881199,NaT,-1.0,1,1.186.101.79_1,https://paytm.com:443/shop/wallet/balance?chan...
1,1.186.101.79,2015-07-22 10:45:55.885488,2015-07-22 10:45:55.881199,0.004289,0,1.186.101.79_1,https://paytm.com:443/shop/wallet/txnhistory?p...
2,1.186.101.79,2015-07-22 10:46:27.839734,2015-07-22 10:45:55.885488,31.954247,0,1.186.101.79_1,https://paytm.com:443/shop/v1/frequentorders?c...
3,1.186.101.79,2015-07-22 10:46:56.591943,2015-07-22 10:46:27.839734,28.752209,0,1.186.101.79_1,https://paytm.com:443/papi/v1/expresscart/verify
4,1.186.101.79,2015-07-22 10:47:01.782695,2015-07-22 10:46:56.591943,5.190752,0,1.186.101.79_1,https://paytm.com:443/api/v1/expresscart/check...
5,1.186.101.79,2015-07-22 10:47:06.893987,2015-07-22 10:47:01.782695,5.111292,0,1.186.101.79_1,https://paytm.com:443/shop/summary/1116587591
6,1.186.101.79,2015-07-22 10:47:07.616869,2015-07-22 10:47:06.893987,0.722882,0,1.186.101.79_1,https://paytm.com:443/shop/cart?channel=web&ve...
7,1.186.101.79,2015-07-22 10:47:07.844446,2015-07-22 10:47:07.616869,0.227577,0,1.186.101.79_1,https://paytm.com:443/shop/orderdetail/1116587...
8,1.186.101.79,2015-07-22 10:47:18.072370,2015-07-22 10:47:07.844446,10.227924,0,1.186.101.79_1,https://paytm.com:443/shop/orderdetail/1116587...
9,1.186.101.79,2015-07-22 10:47:28.084661,2015-07-22 10:47:18.072370,10.012291,0,1.186.101.79_1,https://paytm.com:443/shop/orderdetail/1116587...


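* Only the first rows are shown above, and they all belong to session `1.186.101.79_1`; the per-session table below shows that '1.186.101.79' actually has two sessions (`1.186.101.79_1` and `1.186.101.79_2`).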

### 2. Determine the average session time

In [67]:
# Global aggregate -> exactly one row, so first() is enough (no list collect).
avg_session_time = round(
    df_sessions.select(F.avg("session_duration_secs").alias('avg_session_duration'))
    .first()['avg_session_duration'],
    3)
# Single-argument print(...) gives identical output on Python 2 and Python 3.
print('# Average Session Time in Seconds:  {}'.format(avg_session_time))

# Average Session Time in Seconds:  94.537


### 3. Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session.

In [69]:
# Raw hits vs. distinct URLs for the first sessions (df_sessions is ordered by session_id).
df_sessions.select('session_id', 'total_urls', 'unique_urls').limit(20).toPandas()

Unnamed: 0,session_id,total_urls,unique_urls
0,1.186.101.79_1,19,9
1,1.186.101.79_2,14,8
2,1.186.103.240_1,7,5
3,1.186.103.78_1,4,4
4,1.186.108.213_1,2,2
5,1.186.108.230_1,5,5
6,1.186.108.242_1,3,3
7,1.186.108.28_1,80,80
8,1.186.108.29_1,3,3
9,1.186.108.79_1,3,3


### 4. Find the most engaged users, i.e., the IPs with the longest session times

### 4.1 Approach 1: ranking individual sessions by duration

In [78]:
# Every session, longest first; the full ranking is kept in pandas and the top 20 shown.
df_sessions_order_session_time = df_sessions.orderBy(F.desc('session_duration_secs'))
pdf_sessions_order_session_time = df_sessions_order_session_time.toPandas()
pdf_sessions_order_session_time.head(20)

Unnamed: 0,session_id,session_start_time,session_end_time,unique_urls,total_urls,session_duration_secs,client_ip,session_number
0,125.19.44.66_5,2015-07-22 10:30:28.187923,2015-07-22 10:49:52.669247,363,653,1164.481323,125.19.44.66,5
1,52.74.219.71_5,2015-07-22 10:30:28.220275,2015-07-22 10:49:52.557803,7599,8858,1164.337524,52.74.219.71,5
2,119.81.61.166_5,2015-07-22 10:30:28.314707,2015-07-22 10:49:52.643168,1307,1381,1164.328491,119.81.61.166,5
3,106.186.23.95_5,2015-07-22 10:30:28.615543,2015-07-22 10:49:52.305445,2112,2208,1163.689941,106.186.23.95,5
4,182.66.36.72_1,2015-07-22 10:30:28.168381,2015-07-22 10:49:50.986961,10,28,1162.818604,182.66.36.72,1
5,59.144.58.37_5,2015-07-22 10:30:28.992848,2015-07-22 10:49:51.631006,233,438,1162.638184,59.144.58.37,5
6,54.251.151.39_5,2015-07-22 10:30:30.025377,2015-07-22 10:49:52.635245,5,1181,1162.609863,54.251.151.39,5
7,196.15.16.102_4,2015-07-22 10:30:29.295367,2015-07-22 10:49:51.732297,57,84,1162.43689,196.15.16.102,4
8,103.29.159.186_2,2015-07-22 10:30:29.284428,2015-07-22 10:49:51.711237,2,246,1162.426758,103.29.159.186,2
9,180.211.69.209_4,2015-07-22 10:30:28.361971,2015-07-22 10:49:50.188754,53,81,1161.826782,180.211.69.209,4


### 4.2 Approach 2: ranking IPs by average session time

In [80]:
# Per IP: number of sessions and mean session duration, highest mean first.
df_sessions_avg = (df_sessions
                   .groupBy("client_ip")
                   .agg(F.count("session_number").alias("number_of_session"),
                        F.avg("session_duration_secs").alias("avg_session_time"))
                   .orderBy(F.desc('avg_session_time')))
df_sessions_avg.limit(100).toPandas().head(20)


Unnamed: 0,client_ip,number_of_session,avg_session_time
0,182.66.36.72,1,1162.818604
1,49.202.59.215,1,1157.597168
2,59.91.249.99,1,1141.681519
3,180.215.72.113,1,1139.791504
4,61.8.146.66,1,1139.737549
5,1.23.30.226,1,1134.829224
6,60.254.126.213,1,1134.290649
7,117.203.81.246,1,1130.084229
8,112.79.36.86,1,1129.544189
9,125.22.34.83,1,1129.238037
