In [89]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Obtain the SparkSession used by every cell below: reuses the session that is
# already active, or creates a new one with default settings if none exists.
spark = SparkSession.builder.getOrCreate()

In [90]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType, LongType

# Path of the space-delimited, header-less web-log sample that is loaded into `df`.
LOG_PATH = "2015_07_22_mktplace_shop_web_log_sample.csv"

# Explicit schema: one nullable field per column, in the order the columns
# appear in the file.
# NOTE(review): "received_byes" looks like a typo for "received_bytes". The name
# is kept unchanged so that anything already referring to the column still works.
schema = StructType([
    StructField("time", TimestampType(), True),
    StructField("elb", StringType(), True),
    StructField("client", StringType(), True),
    StructField("backend", StringType(), True),
    StructField("request_processing_time", FloatType(), True),
    StructField("backend_processing_time", FloatType(), True),
    StructField("response_processing_time", FloatType(), True),
    StructField("elb_status_code", StringType(), True),
    StructField("backend_status_code", StringType(), True),
    StructField("received_byes", IntegerType(), True),
    StructField("sent_bytes", IntegerType(), True),
    StructField("request", StringType(), True),
    StructField("user_agent", StringType(), True),
    StructField("ssl_cipher", StringType(), True),
    StructField("ssl_protocol", StringType(), True),
])

# The schema is supplied explicitly, so `inferSchema` is intentionally not
# passed: asking Spark to infer types while also dictating them is contradictory.
df = spark.read.csv(LOG_PATH, schema=schema, sep=" ", header=False)

In [91]:
# Preview the first 10 rows of the loaded log data as a text table.
df.show(10)

+--------------------+----------------+--------------------+-------------+-----------------------+-----------------------+------------------------+---------------+-------------------+-------------+----------+--------------------+--------------------+--------------------+------------+
|                time|             elb|              client|      backend|request_processing_time|backend_processing_time|response_processing_time|elb_status_code|backend_status_code|received_byes|sent_bytes|             request|          user_agent|          ssl_cipher|ssl_protocol|
+--------------------+----------------+--------------------+-------------+-----------------------+-----------------------+------------------------+---------------+-------------------+-------------+----------+--------------------+--------------------+--------------------+------------+
|2015-07-22 05:00:...|marketpalce-shop|123.242.248.130:5...|10.0.6.158:80|                 2.2E-5|               0.026109|                  2.0E-

In [92]:
# Print each column's name, type and nullability to confirm the schema was applied.
df.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- elb: string (nullable = true)
 |-- client: string (nullable = true)
 |-- backend: string (nullable = true)
 |-- request_processing_time: float (nullable = true)
 |-- backend_processing_time: float (nullable = true)
 |-- response_processing_time: float (nullable = true)
 |-- elb_status_code: string (nullable = true)
 |-- backend_status_code: string (nullable = true)
 |-- received_byes: integer (nullable = true)
 |-- sent_bytes: integer (nullable = true)
 |-- request: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- ssl_cipher: string (nullable = true)
 |-- ssl_protocol: string (nullable = true)



In [93]:
# Project only the `time` column. As the last expression of the cell this shows
# the resulting DataFrame's repr (column name and type), not any rows.
df.select('time')

DataFrame[time: timestamp]

In [94]:
from pyspark.sql.functions import lag

# All requests of one client, ordered chronologically.
client_by_time = Window.partitionBy("client").orderBy("time")

# Timestamp of the same client's previous request. `lag` has no earlier row to
# read for a client's first request, so prev_time is null there.
df = df.withColumn("prev_time", lag(df["time"]).over(client_by_time))

# Gap to the previous request in seconds. Both timestamps are cast to long
# (epoch seconds, fractional part dropped) before subtracting, so the result is
# a whole number of seconds, and null wherever prev_time is null.
df = df.withColumn(
    "time_diff",
    df.time.cast(LongType()) - df.prev_time.cast(LongType()),
)

In [95]:
# Preview the first 10 rows again to inspect the newly added prev_time and time_diff columns.
df.show(10)

+--------------------+----------------+-------------------+-------------+-----------------------+-----------------------+------------------------+---------------+-------------------+-------------+----------+--------------------+--------------------+--------------------+------------+--------------------+---------+
|                time|             elb|             client|      backend|request_processing_time|backend_processing_time|response_processing_time|elb_status_code|backend_status_code|received_byes|sent_bytes|             request|          user_agent|          ssl_cipher|ssl_protocol|           prev_time|time_diff|
+--------------------+----------------+-------------------+-------------+-----------------------+-----------------------+------------------------+---------------+-------------------+-------------+----------+--------------------+--------------------+--------------------+------------+--------------------+---------+
|2015-07-22 05:04:...|marketpalce-shop|1.187.167.214:65

In [98]:
# Import only the names this cell needs; a star import from
# pyspark.sql.functions would silently replace builtins such as `sum`.
from pyspark.sql.functions import col, when

# Inactivity gap that starts a new session: 15 minutes, expressed in seconds.
SESSION_TIMEOUT_SECONDS = 15 * 60

# Flag a request with 1 when it arrives more than the timeout after the same
# client's previous request, otherwise 0. Rows whose time_diff is null (no
# previous request) do not satisfy the comparison and therefore get 0.
df = df.withColumn(
    "is_new_session",
    when(col("time_diff") > SESSION_TIMEOUT_SECONDS, 1).otherwise(0),
)

# Session id = running total of the new-session flags per client in time order,
# so the id starts at 0 and increases by one at every session boundary.
df = df.withColumn(
    "user_session_id",
    F.sum("is_new_session").over(Window.partitionBy("client").orderBy("time")),
)
df.show(10)

+--------------------+----------------+-------------------+-------------+-----------------------+-----------------------+------------------------+---------------+-------------------+-------------+----------+--------------------+--------------------+--------------------+------------+--------------------+---------+--------------+---------------+
|                time|             elb|             client|      backend|request_processing_time|backend_processing_time|response_processing_time|elb_status_code|backend_status_code|received_byes|sent_bytes|             request|          user_agent|          ssl_cipher|ssl_protocol|           prev_time|time_diff|is_new_session|user_session_id|
+--------------------+----------------+-------------------+-------------+-----------------------+-----------------------+------------------------+---------------+-------------------+-------------+----------+--------------------+--------------------+--------------------+------------+--------------------+----

## Average session time

In [105]:
from pyspark.sql.functions import sum, avg, max, min, mean, count, first, last, countDistinct

# Collapse the request-level rows into one row per (client, user_session_id):
# first and last request time, number of requests, and number of distinct
# request strings. session_len is the difference between the last and first
# request time after casting both to long (whole seconds).
grpdf = (
    df.groupby("client", "user_session_id")
    .agg(
        min("time").alias("session_start"),
        max("time").alias("session_end"),
        count("time").alias("request_count"),
        countDistinct("request").alias("unique_request_count"),
    )
    .withColumn(
        "session_len",
        F.col("session_end").cast(LongType()) - F.col("session_start").cast(LongType()),
    )
)
grpdf.show()

+-------------------+---------------+--------------------+--------------------+-------------+--------------------+-----------+
|             client|user_session_id|       session_start|         session_end|request_count|unique_request_count|session_len|
+-------------------+---------------+--------------------+--------------------+-------------+--------------------+-----------+
|1.187.167.214:65257|              0|2015-07-22 05:04:...|2015-07-22 05:04:...|            1|                   1|          0|
| 1.187.170.77:64760|              0|2015-07-22 05:02:...|2015-07-22 05:02:...|            1|                   1|          0|
|1.187.179.217:34549|              0|2015-07-22 12:11:...|2015-07-22 12:12:...|            3|                   3|         44|
|1.187.185.201:46980|              0|2015-07-22 12:23:...|2015-07-22 12:23:...|            1|                   1|          0|
| 1.187.202.35:38668|              0|2015-07-22 07:01:...|2015-07-22 07:02:...|            5|                  

In [106]:
from pyspark.sql.functions import col, mean as _mean, stddev as _stddev

# Average session_len across every row of grpdf, collected to the driver as a
# one-row list.
df_stats = grpdf.agg(_mean(col("session_len")).alias("mean")).collect()

# Pull the scalar out of the single result row and print it.
# Note: this rebinds the name `mean` to a plain number.
mean = df_stats[0]["mean"]
print(mean)

14.487045344712822


## Unique requests

In [107]:
# Show the distinct-request count next to its (client, session) key, first 10 rows.
(
    grpdf
    .select("client", "user_session_id", "unique_request_count")
    .show(10)
)

+-------------------+---------------+--------------------+
|             client|user_session_id|unique_request_count|
+-------------------+---------------+--------------------+
|1.187.167.214:65257|              0|                   1|
| 1.187.170.77:64760|              0|                   1|
|1.187.179.217:34549|              0|                   3|
|1.187.185.201:46980|              0|                   1|
| 1.187.202.35:38668|              0|                   4|
|  1.187.214.14:6921|              0|                   4|
|1.187.224.105:12587|              0|                   1|
| 1.187.224.54:28093|              0|                   3|
|  1.187.51.226:7451|              0|                   2|
|   1.22.16.45:46216|              0|                   6|
+-------------------+---------------+--------------------+
only showing top 10 rows



In [109]:
# Average unique_request_count across every row of grpdf, collected to the
# driver as a one-row list.
df_stats = grpdf.agg(_mean(col("unique_request_count")).alias("mean")).collect()

# Pull the scalar out of the single result row and print it with a label.
mean = df_stats[0]["mean"]
print('Average number of unique requests per session', mean)

Average number of unique requests per session 2.382494004796163


## Longest session times

In [111]:
# Rank sessions from longest to shortest by session_len and show the top 20.
grpdf.orderBy(F.desc("session_len")).show(20)

+--------------------+---------------+--------------------+--------------------+-------------+--------------------+-----------+
|              client|user_session_id|       session_start|         session_end|request_count|unique_request_count|session_len|
+--------------------+---------------+--------------------+--------------------+-------------+--------------------+-----------+
|203.191.34.178:10400|              1|2015-07-22 06:30:...|2015-07-22 07:04:...|          126|                  10|       2066|
|103.29.159.138:57045|              0|2015-07-22 06:30:...|2015-07-22 07:04:...|           96|                   3|       2065|
|213.239.204.204:3...|              0|2015-07-22 06:30:...|2015-07-22 07:04:...|          234|                 234|       2065|
|   78.46.60.71:58504|              0|2015-07-22 06:30:...|2015-07-22 07:04:...|          237|                 237|       2064|
| 54.169.191.85:15462|              0|2015-07-22 06:30:...|2015-07-22 07:04:...|          314|          

In [97]:
# from pyspark.sql.types import ArrayType
# from pyspark.sql.types import *
# from pyspark.sql.functions import udf
# def assign_sessions(time_diffs):
#     i=0
#     sessions=[]
#     if time_diffs is None:
#         return None
#     if isinstance(time_diffs,int):
#         return [1]
#     for time_diff in time_diffs:
# #         sessions.append(i)
#         if (time_diff=='null'):
#             i=i+1
#             sessions.append(i)
#         elif (time_diff<900):
#             sessions.append(i)
#         else:
#             i=i+1
#             sessions.append(i)
#     return sessions

# assign_sessions_udf=udf(lambda y: assign_sessions(y), ArrayType(IntegerType()))
# # spark.udf.register("assign_sessions_udf", assign_sessions, ArrayType(IntegerType()))            
# df = df.withColumn('sessionid',
#                     assign_sessions_udf(df['time_diff'])
#                     .over(Window.partitionBy("client")
#                     .orderBy("time").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))
# # df = df.withColumn('sessionid',
# #                     assign_sessions_udf(df['time_diff'])
# #                     )