In [41]:
import pyspark
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f

In [98]:
sc = SparkSession.builder.master("local") \
                 .getOrCreate()

In [99]:
clickstream = sc.read.csv("clickstream.csv", sep="\t", header=True)

In [100]:
clickstream.show()

+-------+----------+----------+-------------+----------+
|user_id|session_id|event_type|   event_page| timestamp|
+-------+----------+----------+-------------+----------+
|     10|         1|      page|         main|1619563674|
|     10|         1|     event|         main|1619563683|
|      1|       308|      page|         main|1619563751|
|      1|       308|     event|         main|1619563759|
|      1|       308|     event|         main|1619563760|
|      1|       317|      page|         main|1619563805|
|      1|       317|     event|         main|1619563811|
|      0|       862|      page|         main|1619563877|
|      0|       862|     event|         main|1619563878|
|      0|       862|     event|         main|1619563886|
|      0|       862|     event|         main|1619563894|
|      0|       862|      page|      archive|1619563897|
|      0|       862|     event|      archive|1619563906|
|      0|       862|      page|         main|1619563913|
|      0|       862|      page|

In [101]:
# список пользователей с ошибками
users_with_error = clickstream.filter(f.col("event_type").rlike(".*error.*")) \
                              .groupBy("user_id", "session_id") \
                              .agg(f.min("timestamp").alias("min_error_timestamp"))
users_with_error.show()

+-------+----------+-------------------+
|user_id|session_id|min_error_timestamp|
+-------+----------+-------------------+
|      3|       807|         1619577711|
|      5|       139|         1619574897|
|      9|      1027|         1619583200|
|      9|       969|         1619571039|
|      6|       985|         1619595904|
|      8|       491|         1619567943|
|      9|       943|         1619564185|
|     10|        63|         1619581483|
|     10|       132|         1619606542|
|     10|       176|         1619617542|
|      5|       177|         1619584670|
|      6|       924|         1619579940|
|      8|       591|         1619596008|
|      0|      1098|         1619620160|
|      3|       778|         1619570321|
|      3|       978|         1619620541|
|      4|       420|         1619607416|
|      9|       945|         1619566944|
|      0|      1070|         1619614804|
|      2|       377|         1619580535|
+-------+----------+-------------------+
only showing top

In [102]:
# удаление событий после ошибок
clickstream_correct = clickstream.join(users_with_error, 
                                       on=["user_id", "session_id"],
                                       how="left_outer") \
                                 .filter((f.col("timestamp") < f.col("min_error_timestamp")) | 
                                          f.col("min_error_timestamp").isNull()) \
                                 .drop("min_error_timestamp")

clickstream_correct.show()

+-------+----------+----------+-------------+----------+
|user_id|session_id|event_type|   event_page| timestamp|
+-------+----------+----------+-------------+----------+
|     10|         1|      page|         main|1619563674|
|     10|         1|     event|         main|1619563683|
|      1|       308|      page|         main|1619563751|
|      1|       308|     event|         main|1619563759|
|      1|       308|     event|         main|1619563760|
|      1|       317|      page|         main|1619563805|
|      1|       317|     event|         main|1619563811|
|      0|       862|      page|         main|1619563877|
|      0|       862|     event|         main|1619563878|
|      0|       862|     event|         main|1619563886|
|      0|       862|     event|         main|1619563894|
|      0|       862|      page|      archive|1619563897|
|      0|       862|     event|      archive|1619563906|
|      0|       862|      page|         main|1619563913|
|      0|       862|      page|

In [103]:
# удаление случайных страниц
w = Window().partitionBy("user_id", "session_id") \
            .orderBy("timestamp") 

clickstream_nonrandom = clickstream_correct.withColumn("next_event_type", 
                                                      f.lead("event_type", default="page").over(w)) \
                                           .filter(~((f.col("event_type") == "page") & 
                                                    (f.col("next_event_type") == "page"))) \
                                           .drop("next_event_type")
clickstream_nonrandom.show()

+-------+----------+----------+-----------+----------+
|user_id|session_id|event_type| event_page| timestamp|
+-------+----------+----------+-----------+----------+
|      6|      1069|      page|    archive|1619615653|
|      6|      1069|     event|    archive|1619615673|
|      6|      1069|     event|    archive|1619615682|
|      6|      1069|     event|    archive|1619615692|
|      6|      1069|     event|    archive|1619615706|
|      6|      1069|      page|       main|1619615717|
|      6|      1069|     event|       main|1619615719|
|      6|      1069|     event|       main|1619615721|
|      6|      1069|     event|       main|1619615789|
|      6|      1069|     event|       main|1619615792|
|      6|      1069|     event|       main|1619615840|
|      6|      1069|      page|autopayment|1619615886|
|      6|      1069|     event|autopayment|1619615896|
|      6|      1069|      page|       main|1619615996|
|      6|      1069|     event|       main|1619616002|
|      6| 

In [104]:
print(clickstream.count())
print(clickstream_correct.count())
print(clickstream_nonrandom.count())

10000
7375
6150


In [105]:
# группировка сессий и составление пользовательского пути
user_journey_map = clickstream_nonrandom.filter("event_type = 'page'") \
                                        .groupBy("user_id", "session_id") \
                                        .agg(f.concat_ws("-", f.collect_list("event_page")).alias("journey"))
user_journey_map.show()

+-------+----------+--------------------+
|user_id|session_id|             journey|
+-------+----------+--------------------+
|      6|      1069|archive-main-auto...|
|      1|       502|main-main-archive...|
|      3|       807|autopayment-bezna...|
|      4|       319|       beznal_cc_rus|
|      5|       139|main-autopayment-...|
|      7|       501|main-beznal_cc_ru...|
|      0|       969|main-beznal_cc_ru...|
|      8|       632|main-autopayment-...|
|      3|       756|main-beznal_cc_ru...|
|      9|      1027|           main-main|
|      0|      1053|                main|
|     10|        76|autopayment-main-...|
|      0|       923|  main-beznal_cc_rus|
|     10|        34|beznal_cc_rus-aut...|
|      4|       432|                main|
|      6|       909|             archive|
|      9|      1092|archive-main-main...|
|      9|       993|main-beznal_cc_ru...|
|      4|       446|main-autopayment-...|
|      1|       516|                main|
+-------+----------+--------------

In [106]:
# статистика по самым частым пользовательским маршрутам
journey_map_stats = user_journey_map.groupBy("journey").count() \
                                    .orderBy(f.col("count").desc()) \
                                    .limit(100)
journey_map_stats.show()

+--------------------+-----+
|             journey|count|
+--------------------+-----+
|                main|   70|
|        main-archive|   20|
|  main-beznal_cc_rus|   17|
|    main-autopayment|   17|
|       beznal_cc_rus|   10|
|             archive|    9|
|           main-main|    9|
|   main-archive-main|    7|
|main-autopayment-...|    7|
|main-archive-auto...|    6|
|  beznal_cc_rus-main|    6|
|main-autopayment-...|    6|
|         autopayment|    5|
|main-beznal_cc_ru...|    5|
|        archive-main|    4|
|autopayment-autop...|    4|
| autopayment-archive|    4|
|main-beznal_cc_ru...|    3|
|main-beznal_cc_ru...|    3|
|main-beznal_cc_ru...|    2|
+--------------------+-----+
only showing top 20 rows

