# Large Scale ML. Final Project

In [1]:
# Bootstrap step that runs before any pyspark import in this notebook.
# NOTE(review): findspark.init() presumably locates the local Spark install and
# makes `pyspark` importable — keep this cell ahead of the pyspark imports.
import findspark
findspark.init()

In [2]:
from pyspark.sql.window import Window
import os
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
import json

In [3]:
# Pin the Python hash seed in the environment before Spark is started.
# NOTE(review): presumably so that Python worker processes launched later
# inherit the same seed and hash keys consistently — confirm for cluster mode.
HASH_SEED = 232
os.environ["PYTHONHASHSEED"] = str(HASH_SEED)

In [4]:
# Create the Spark entry points used by the rest of the notebook:
#   sc - SparkContext (RDD API)
#   se - SparkSession (DataFrame / SQL API)
#
# getOrCreate() is used instead of the bare constructors so that re-running
# this cell in a live kernel reuses the existing context rather than failing
# because a SparkContext is already running.
sc = SparkContext.getOrCreate(SparkConf().setAppName("final_project"))
se = SparkSession.builder.getOrCreate()

In [5]:
# Echo the SparkContext as the cell's last expression so the notebook renders it.
sc

In [6]:
# Load the raw clickstream: tab-separated file with a header row; column types
# are inferred by Spark rather than declared.
filename = "clickstream.csv"
reader = se.read.options(header=True, sep='\t', inferSchema=True)
sessions_data = reader.csv(filename)

# Preview the first rows to sanity-check the parsed columns.
sessions_data.show(10)

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
|    562|       507|       event|      main|1695584154|
|    562|       507|       event|      main|1695584154|
|    562|       507|       event|      main|1695584160|
|    562|       507|        page|    rabota|1695584166|
|    562|       507|       event|    rabota|1695584174|
+-------+----------+------------+----------+----------+
only showing top 10 rows



## SQL

In [7]:
# Cache the loaded DataFrame (it is re-read by the SQL, RDD and DataFrame
# sections) and expose it to Spark SQL under the name `clickstream`.
# The window spec that used to be defined here was never used by the SQL
# section and is re-created in the DataFrame section before its first use,
# so it has been dropped.
sessions = sessions_data.cache()
sessions.createOrReplaceTempView('clickstream')

In [8]:
# Top-30 most frequent page routes per (user_id, session_id), ignoring
# everything from the first error onwards.
#
#   flagged_events : running count of error events per session, ordered by
#                    timestamp (rows sharing a timestamp with an error are
#                    counted as "after the error" by the default RANGE frame).
#   valid_sessions : page views that happened while the running error count
#                    was still zero.
#   user_routes    : pages of each session joined with '-' in
#                    (timestamp, event_page) order; the full-partition frame
#                    gives every row the complete route, which is then
#                    de-duplicated to one row per session.
#
# The pre-sorted `base` CTE and the `WHERE 1=1` placeholders from the earlier
# version were removed: each window clause sorts its own partition, and
# `1=1` filters nothing.
query = """
WITH flagged_events AS (
    SELECT
        *,
        SUM(CASE WHEN event_type LIKE '%error%' THEN 1 ELSE 0 END)
            OVER (PARTITION BY user_id, session_id ORDER BY timestamp) AS error_count
    FROM clickstream
),
valid_sessions AS (
    SELECT user_id, session_id, event_page, timestamp
    FROM flagged_events
    WHERE error_count = 0
      AND event_type = 'page'
),
user_routes AS (
    SELECT
        user_id,
        session_id,
        CONCAT_WS('-', COLLECT_LIST(event_page) OVER (
            PARTITION BY user_id, session_id
            ORDER BY timestamp, event_page
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        )) AS route
    FROM valid_sessions
)
SELECT route, COUNT(*) AS count
FROM (SELECT DISTINCT * FROM user_routes)
GROUP BY route
ORDER BY count DESC
LIMIT 30
"""

In [9]:
# Execute the SQL route query and print all 30 rows without truncating
# long route strings.
result = se.sql(query)
result.show(n=30, truncate=False)

+---------------------+-----+
|route                |count|
+---------------------+-----+
|main                 |8090 |
|main-archive         |1095 |
|main-rabota          |1039 |
|main-internet        |879  |
|main-bonus           |865  |
|main-news            |760  |
|main-tariffs         |669  |
|main-online          |584  |
|main-vklad           |514  |
|main-archive-rabota  |167  |
|main-rabota-archive  |167  |
|main-bonus-archive   |139  |
|main-rabota-bonus    |136  |
|main-bonus-rabota    |134  |
|main-news-rabota     |134  |
|main-archive-internet|131  |
|main-rabota-news     |129  |
|main-internet-rabota |128  |
|main-archive-news    |125  |
|main-rabota-internet |123  |
|main-internet-archive|123  |
|main-archive-bonus   |117  |
|main-tariffs-internet|114  |
|main-internet-bonus  |114  |
|main-news-archive    |112  |
|main-news-internet   |108  |
|main-archive-tariffs |103  |
|main-internet-news   |102  |
|main-tariffs-archive |102  |
|main-main            |94   |
+---------

## RDD

In [10]:
# RDD section: work on the DataFrame's underlying RDD of rows. It is persisted
# because both the error pipeline and the page-view pipeline below read it.
# Note that `sessions` is rebound here from a DataFrame to an RDD.
sessions = sessions_data.rdd.persist()

In [11]:
# Earliest error timestamp per (user_id, session_id):
#   1. keep events whose type contains "error" (case-insensitive),
#   2. key them by session,
#   3. reduce each session to its minimum timestamp.
errors = (
    sessions
    .filter(lambda event: "error" in event.event_type.lower())
    .map(lambda event: ((event.user_id, event.session_id), event.timestamp))
    .reduceByKey(min)
)
errors.take(10)

[((844, 258), 1695584652),
 ((116, 994), 1695586753),
 ((4152, 646), 1695592170),
 ((165, 481), 1695596660),
 ((3472, 86), 1695598280),
 ((3956, 130), 1695600853),
 ((1817, 253), 1695601060),
 ((3870, 372), 1695611823),
 ((3304, 54), 1695617988),
 ((2564, 10), 1695620464)]

In [13]:
def _is_page_view(event):
    """True for rows whose event_type is "page" (case-insensitive)."""
    return event.event_type.lower() == "page"


def _precedes_first_error(joined):
    """True when the session has no error, or the page view is strictly earlier than it.

    `joined` is one left-outer-join record:
    (session_key, ((event_page, timestamp), first_error_timestamp_or_None)).
    """
    (_, page_ts), first_error_ts = joined[1]
    return first_error_ts is None or page_ts < first_error_ts


# Page views keyed by session, restricted to those before the session's first
# error, then reshaped back to (session_key, (event_page, timestamp)).
correct = (
    sessions
    .filter(_is_page_view)
    .map(lambda event: ((event.user_id, event.session_id), (event.event_page, event.timestamp)))
    .leftOuterJoin(errors)
    .filter(_precedes_first_error)
    .map(lambda joined: (joined[0], joined[1][0]))
    .filter(lambda visit: visit[1][1] is not None)
)
correct.take(10)

[((2209, 541), ('main', 1695585442)),
 ((2209, 541), ('bonus', 1695585553)),
 ((2209, 541), ('online', 1695585593)),
 ((2209, 541), ('internet', 1695585792)),
 ((2209, 541), ('news', 1695585878)),
 ((2209, 541), ('main', 1695585994)),
 ((116, 994), ('main', 1695586629)),
 ((116, 994), ('tariffs', 1695586637)),
 ((116, 994), ('archive', 1695586673)),
 ((116, 994), ('vklad', 1695586703))]

In [14]:
def _route_of(visits):
    """Join a session's pages with '-' in (timestamp, page) order.

    `visits` is an iterable of (event_page, timestamp) pairs.
    """
    chronological = sorted(visits, key=lambda visit: (visit[1], visit[0]))
    return '-'.join([page for page, _ in chronological])


# One route string per session -> count sessions per route -> 30 most frequent.
# The result is a plain Python list of (route, count) pairs, not an RDD.
routes = (
    correct
    .groupByKey()
    .mapValues(_route_of)
    .map(lambda session: (session[1], 1))
    .reduceByKey(lambda left, right: left + right)
    .takeOrdered(30, key=lambda pair: -pair[1])
)

In [15]:
# Print each of the top routes as "<route> - <count>".
for entry in routes:
    path, hits = entry
    print(path, "-", hits)

main - 8090
main-archive - 1095
main-rabota - 1039
main-internet - 879
main-bonus - 865
main-news - 760
main-tariffs - 669
main-online - 584
main-vklad - 514
main-rabota-archive - 167
main-archive-rabota - 167
main-bonus-archive - 139
main-rabota-bonus - 136
main-news-rabota - 134
main-bonus-rabota - 134
main-archive-internet - 131
main-rabota-news - 129
main-internet-rabota - 128
main-archive-news - 125
main-rabota-internet - 123
main-internet-archive - 123
main-archive-bonus - 117
main-tariffs-internet - 114
main-internet-bonus - 114
main-news-archive - 112
main-news-internet - 108
main-archive-tariffs - 103
main-internet-news - 102
main-tariffs-archive - 102
main-main - 94


## DF 

In [16]:
# DataFrame section: start from the loaded data, globally sorted.
# NOTE(review): the window used below orders each session by timestamp on its
# own, so this sort may be redundant — confirm before removing.
sort_columns = ["user_id", "session_id", "event_page", "timestamp"]
sessions = sessions_data.orderBy(*sort_columns)

In [17]:
# Per-session window: one partition per (user_id, session_id), rows in
# timestamp order.
w = Window.partitionBy("user_id", "session_id").orderBy("timestamp")

In [18]:
# 1 for events whose type contains "error", 0 otherwise.
is_error = F.when(F.col("event_type").like("%error%"), 1).otherwise(0)

# Running number of errors seen so far within each session (over window `w`).
sessions = sessions.withColumn("error_count", F.sum(is_error).over(w))
sessions.show(10)

+-------+----------+----------+----------+----------+-----------+
|user_id|session_id|event_type|event_page| timestamp|error_count|
+-------+----------+----------+----------+----------+-----------+
|      0|       874|      page|      main|1696371064|          0|
|      0|       874|     event|      main|1696372696|          0|
|      0|       874|     event|      main|1696373564|          0|
|      0|       874|      page|    rabota|1696374894|          0|
|      0|       874|     event|    rabota|1696377393|          0|
|      0|       874|      page|    online|1696378229|          0|
|      0|       874|     event|    online|1696378928|          0|
|      0|       901|      page|      main|1698989569|          0|
|      0|       901|     event|      main|1698989581|          0|
|      0|       901|      page|  internet|1698989737|          0|
+-------+----------+----------+----------+----------+-----------+
only showing top 10 rows



In [19]:
# Keep only page views that occurred while the session was still error-free.
no_error_yet = F.col("error_count") == 0
is_page_view = F.col("event_type") == "page"
correct = sessions.where(no_error_yet & is_page_view)
correct.show(10)

+-------+----------+----------+----------+----------+-----------+
|user_id|session_id|event_type|event_page| timestamp|error_count|
+-------+----------+----------+----------+----------+-----------+
|      0|       874|      page|      main|1696371064|          0|
|      0|       874|      page|    rabota|1696374894|          0|
|      0|       874|      page|    online|1696378229|          0|
|      0|       901|      page|      main|1698989569|          0|
|      0|       901|      page|  internet|1698989737|          0|
|      0|       901|      page|     bonus|1698989797|          0|
|      0|       901|      page|      main|1698989993|          0|
|      0|       901|      page|  internet|1698990779|          0|
|      0|       901|      page|     vklad|1698991260|          0|
|      0|       901|      page|      main|1698991594|          0|
+-------+----------+----------+----------+----------+-----------+
only showing top 10 rows



In [20]:
# Build one route string per session and rank routes by how many sessions
# followed them.
#
# The window is defined locally and ordered by (timestamp, event_page) so that
# pages sharing a timestamp are concatenated in a deterministic order, matching
# the tie-break used by the SQL and RDD implementations in this notebook.
# The unbounded frame makes every row carry the session's complete route.
route_window = (
    Window.partitionBy("user_id", "session_id")
    .orderBy("timestamp", "event_page")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

routes = (
    correct
    .withColumn("route", F.concat_ws('-', F.collect_list("event_page").over(route_window)))
    .select("user_id", "session_id", "route")
    .distinct()                        # one row per session
    .groupBy("route")
    .count()                           # sessions per route
    .orderBy(F.col("count").desc())
    .limit(30)
)

In [21]:
# Print all 30 top routes without truncating long route strings.
routes.show(30, truncate=False)

+---------------------+-----+
|route                |count|
+---------------------+-----+
|main                 |8090 |
|main-archive         |1095 |
|main-rabota          |1039 |
|main-internet        |879  |
|main-bonus           |865  |
|main-news            |760  |
|main-tariffs         |669  |
|main-online          |584  |
|main-vklad           |514  |
|main-archive-rabota  |167  |
|main-rabota-archive  |167  |
|main-bonus-archive   |139  |
|main-rabota-bonus    |136  |
|main-bonus-rabota    |134  |
|main-news-rabota     |134  |
|main-archive-internet|131  |
|main-rabota-news     |129  |
|main-internet-rabota |128  |
|main-archive-news    |125  |
|main-rabota-internet |123  |
|main-internet-archive|123  |
|main-archive-bonus   |117  |
|main-internet-bonus  |114  |
|main-tariffs-internet|114  |
|main-news-archive    |112  |
|main-news-internet   |108  |
|main-archive-tariffs |103  |
|main-internet-news   |102  |
|main-tariffs-archive |102  |
|main-main            |94   |
+---------

In [22]:
# Export the top routes to result.json as a list of {"route": ..., "count": ...}
# objects.
# Each element produced by toJSON() is a JSON document, so it is parsed with
# json.loads rather than eval: eval would execute arbitrary expressions and
# fails on JSON literals such as true / false / null.
result = routes.toJSON()
output = [json.loads(record) for record in result.collect()]

with open("result.json", 'w') as f:
    json.dump(output, f)