In [1]:
import findspark
findspark.init()  # put SPARK_HOME's pyspark on sys.path before importing it

import pyspark
from pyspark.sql import SparkSession

# getOrCreate() reuses an already running session/context, so re-running this
# cell does not fail with "Cannot run multiple SparkContexts at once".
se = SparkSession.builder.appName("jupyter").getOrCreate()
sc = se.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-10-10 19:59:33,561 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
from pyspark.sql import functions as F

In [3]:
! hadoop fs -copyFromLocal clickstream.csv

copyFromLocal: `clickstream.csv': File exists


In [4]:
# read the tab-separated clickstream (first line is a header); column types are inferred

csv_df = (
    se.read
    .option("header", True)
    .option("delimiter", "\t")
    .option("inferSchema", True)
    .csv("clickstream.csv")
)
csv_df.show(5)

                                                                                

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
+-------+----------+------------+----------+----------+
only showing top 5 rows



In [5]:
# keep only page views (rows whose event_type is exactly "page")

clean_df = csv_df.where(csv_df.event_type == "page")
clean_df.show(5)

+-------+----------+----------+----------+----------+
|user_id|session_id|event_type|event_page| timestamp|
+-------+----------+----------+----------+----------+
|    562|       507|      page|      main|1695584127|
|    562|       507|      page|    rabota|1695584166|
|    562|       507|      page|      main|1695584194|
|    562|       507|      page|     bonus|1695584221|
|    562|       507|      page|    online|1695584222|
+-------+----------+----------+----------+----------+
only showing top 5 rows



In [6]:
# earliest error per (user_id, session_id): any event_type containing "error" counts

error_df = (
    csv_df
    .where(csv_df.event_type.contains("error"))
    .groupBy("user_id", "session_id")
    .agg(F.min("timestamp").alias("firsterrortimestamp"))
)
error_df.show(30)



+-------+----------+-------------------+
|user_id|session_id|firsterrortimestamp|
+-------+----------+-------------------+
|   3513|        68|         1695623875|
|   4332|       766|         1695633583|
|   4757|       611|         1695653221|
|   2009|       827|         1695747863|
|   1731|       193|         1695798006|
|   3663|       287|         1695846314|
|   3836|       329|         1696020745|
|   4568|       350|         1696050732|
|   4816|       126|         1696119282|
|    957|       179|         1696199592|
|   4327|       423|         1696258181|
|   1093|       721|         1695711604|
|   3425|        60|         1695910385|
|   1657|       675|         1695913699|
|   1985|        91|         1695980984|
|   4007|       423|         1695986102|
|   3817|       652|         1695999710|
|   2821|       888|         1696151396|
|    114|       354|         1695680693|
|   2633|       287|         1695754663|
|   2979|       852|         1695768939|
|   3156|       

                                                                                

In [7]:
# drop page views at or after the session's first error; sessions without errors
# get a null firsterrortimestamp from the left join and are kept whole

joined_df = clean_df.join(error_df, ["user_id", "session_id"], "left")
faultless_df = joined_df.where(
    joined_df["firsterrortimestamp"].isNull()
    | (joined_df["timestamp"] < joined_df["firsterrortimestamp"])
)
faultless_df.show(30)

                                                                                

+-------+----------+----------+----------+----------+-------------------+
|user_id|session_id|event_type|event_page| timestamp|firsterrortimestamp|
+-------+----------+----------+----------+----------+-------------------+
|   1889|       140|      page|      main|1695614937|               null|
|   1889|       140|      page|  internet|1695614956|               null|
|   1889|       140|      page|   archive|1695614980|               null|
|   1889|       140|      page|      main|1695615032|               null|
|   3513|        68|      page|      main|1695619122|         1695623875|
|   3513|        68|      page|   tariffs|1695619197|         1695623875|
|   3513|        68|      page|     bonus|1695619627|         1695623875|
|   3513|        68|      page|      main|1695622918|         1695623875|
|   3513|        68|      page|   tariffs|1695623309|         1695623875|
|   3513|        68|      page|   archive|1695623808|         1695623875|
|   4332|       766|      page|      m

In [8]:
# build one route per session: pages ordered by timestamp (structs sort field by
# field, so equal timestamps fall back to page name) and joined with '-'

route_df = (
    faultless_df
    .groupBy("user_id", "session_id")
    .agg(
        F.concat_ws(
            "-",
            F.sort_array(
                F.collect_list(F.struct("timestamp", "event_page"))
            ).getField("event_page"),
        ).alias("route")
    )
)
route_df.show(5)

                                                                                

+-------+----------+--------------------+
|user_id|session_id|               route|
+-------+----------+--------------------+
|      0|       874|  main-rabota-online|
|      0|       898|main-news-tariffs...|
|      0|       901|main-internet-bon...|
|      1|       954|          main-bonus|
|      1|       979|main-rabota-archi...|
+-------+----------+--------------------+
only showing top 5 rows



In [9]:
def remove_duplicates(s: "str | None", sep: str = '-') -> "str | None":
    '''Collapse consecutive repetitions of the same page in a route into one.

    Only adjacent repeats are merged; a page revisited later is kept
    (e.g. 'main-rabota-main' is returned unchanged).

    Example:

    Transforms: 'main-main-rabota-online'
    Returns: 'main-rabota-online'

    Args:
        s: route string of page names joined by ``sep``. ``None`` (a Spark
            null reaching the UDF) is returned as ``None`` rather than being
            turned into the literal route ``'None'``. Other non-string values
            are converted with ``str()``.
        sep: page separator; defaults to '-' to match routes built with
            ``concat_ws('-', ...)``.

    Returns:
        The route with adjacent duplicate pages removed, or ``None``.
    '''
    if s is None:
        return None
    filtered_pages = []
    for page in str(s).split(sep):
        # append only when the page differs from the one just kept
        if not filtered_pages or page != filtered_pages[-1]:
            filtered_pages.append(page)
    return sep.join(filtered_pages)

# Spark UDF wrapper around remove_duplicates; the result column is a string
remover = F.udf(remove_duplicates, returnType="string")

In [10]:
# replace each route with its version without consecutive repeated pages
good_routes_df = route_df.withColumn("route", remover("route"))
good_routes_df.show(5)

[Stage 24:>                                                         (0 + 1) / 1]

+-------+----------+--------------------+
|user_id|session_id|               route|
+-------+----------+--------------------+
|      0|       874|  main-rabota-online|
|      0|       898|main-news-tariffs...|
|      0|       901|main-internet-bon...|
|      1|       954|          main-bonus|
|      1|       979|main-rabota-archi...|
+-------+----------+--------------------+
only showing top 5 rows



                                                                                

In [11]:
# number of sessions that followed each distinct route

routes_count_df = (
    good_routes_df
    .groupBy("route")
    .agg(F.count("route").alias("cnt"))
)
routes_count_df.show(5)



+--------------------+---+
|               route|cnt|
+--------------------+---+
|main-archive-inte...| 21|
|main-archive-vkla...|  1|
|main-internet-bon...|  8|
|main-online-tarif...|  1|
|main-archive-bonu...| 12|
+--------------------+---+
only showing top 5 rows



                                                                                

In [12]:
# top 30 routes by session count; ties on cnt are broken alphabetically by
# route, so which rows survive limit(30) (and get written out) is reproducible

res = routes_count_df.orderBy(F.desc("cnt"), F.asc("route")).limit(30)
res.show(30)

                                                                                

+--------------------+----+
|               route| cnt|
+--------------------+----+
|                main|8184|
|        main-archive|1112|
|         main-rabota|1047|
|       main-internet| 896|
|          main-bonus| 870|
|           main-news| 769|
|        main-tariffs| 677|
|         main-online| 587|
|          main-vklad| 518|
| main-rabota-archive| 170|
| main-archive-rabota| 167|
|  main-bonus-archive| 143|
|   main-rabota-bonus| 138|
|   main-bonus-rabota| 136|
|    main-news-rabota| 135|
|main-archive-inte...| 132|
|    main-rabota-news| 130|
|main-internet-rabota| 129|
|   main-archive-news| 126|
|main-rabota-internet| 124|
|main-internet-arc...| 123|
|  main-archive-bonus| 117|
| main-internet-bonus| 115|
|main-tariffs-inte...| 114|
|   main-news-archive| 113|
|  main-news-internet| 109|
|main-archive-tariffs| 104|
|  main-internet-news| 103|
|main-tariffs-archive| 103|
|    main-rabota-main|  94|
+--------------------+----+



                                                                                

In [13]:
# write the top routes as a header-less TSV, one "route<TAB>cnt" line each

res.toPandas().to_csv(
    'df_output.csv',
    sep='\t',
    header=False,
    index=False,
)

                                                                                

In [14]:
# stopping the session stops its SparkContext and also clears the cached
# active/default SparkSession, not just the context
se.stop()