In [324]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
from pyspark.sql.functions import udf

In [325]:
# Create (or reuse) the notebook's Spark session, running on YARN.
spark = (
    SparkSession.builder
    .appName("sm_sga")
    .master("yarn")
    .getOrCreate()
)

In [326]:
# Load the tab-separated clickstream log: the first line holds column names,
# and column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("delimiter", "\t")
    .csv("/data/lsml/sga/clickstream.csv")
)

In [327]:
#df.show(10)

In [328]:
# Rows whose event_type mentions "error" (case-insensitive) mark failures.
df_errors = df.filter(f.lower(col("event_type")).contains("error"))

In [329]:
#df_errors.show()

In [330]:
# Earliest error timestamp per (user_id, session_id); Spark names the
# aggregate column "min(timestamp)".
df_errors = df_errors.groupBy("user_id", "session_id").agg(f.min("timestamp"))

In [331]:
#df_errors.show()

In [332]:
# Give the error-side columns distinct names so they do not clash with the
# page-side columns in the join that follows.
df_errors = (
    df_errors
    .withColumnRenamed("user_id", "uid")
    .withColumnRenamed("session_id", "sid")
    .withColumnRenamed("min(timestamp)", "mtime")
)

In [333]:
#df_errors.show()

In [334]:
# Keep only page-view events (event_type equal to "page", case-insensitive).
df_pages = df.where(f.lower(col("event_type")) == "page")

In [335]:
#df_pages.show()

In [336]:
# Attach each session's first-error time (mtime) to its page views; the left
# join leaves uid/sid/mtime null for sessions that never hit an error.
df_join = df_pages.join(
    df_errors,
    on=(df_pages.user_id == df_errors.uid) & (df_pages.session_id == df_errors.sid),
    how="left",
)

In [337]:
#df_join.show()

In [338]:
# The error-side key copies are redundant after the join.
df_join = df_join.drop("uid").drop("sid")

In [339]:
#df_join.show()

In [340]:
# Inspect the latest page timestamp present in the joined data.
df_join.agg(f.max("timestamp")).collect()

[Row(max(timestamp)=1646883660)]

In [341]:
# Sessions without an error have a null `mtime` after the left join. Give such
# rows an `mtime` just past their own timestamp so the later
# `timestamp < mtime` filter keeps all of their pages.
# Filling only `mtime` avoids a blanket `na.fill(value)`, which rewrites nulls
# in every numeric column (user_id, session_id, timestamp, ...). Deriving the
# value per row also removes the need for a sentinel constant copied by hand
# from an earlier cell's output.
df_join = df_join.withColumn(
    "mtime", f.coalesce(col("mtime"), col("timestamp") + 1)
)

In [342]:
# df_join.count()

In [343]:
# Keep only page views that happened strictly before `mtime`.
df_join = df_join.where(col("timestamp") < col("mtime"))

In [344]:
#df_join_sorted.show()

In [345]:
# Per-session window in chronological order; because it is ordered, the default
# frame grows from the partition start up to the current row.
window = Window.partitionBy(col("user_id"), col("session_id")).orderBy(col("timestamp").asc())

In [346]:
# Running list of the pages visited so far in the session, per row.
df_concat = df_join.withColumn("concatStr", f.collect_list(col("event_page")).over(window))

In [347]:
#df_concat.show()

In [348]:
# Same sessions, newest event first: used to pick each session's last row.
window = Window.partitionBy("user_id", "session_id").orderBy(col("timestamp").desc())

In [349]:
# Number the rows within each session; row 1 has the latest timestamp.
df_result = df_concat.withColumn("row", row_number().over(window))

In [350]:
# df_result.show()

In [351]:
# The latest row carries the session's full page list; keep it and drop the
# helper column.
df_result = df_result.where(df_result["row"] == 1).drop("row")

In [352]:
# df_result.show()

In [353]:
# How many sessions share each page route, most frequent first.
df_result = (
    df_result
    .groupBy("concatStr")
    .agg(f.count("concatStr").alias("r_count"))
    .orderBy(col("r_count").desc())
)

In [354]:
# df_result.show()

In [355]:
def fn2(x, sep='-'):
    """Join a sequence of page names into a single route string.

    Args:
        x: iterable of strings (e.g. a collected page list), or None.
        sep: separator placed between elements; defaults to '-'.

    Returns:
        The joined string, or None when ``x`` is None. A null array passed to a
        Python UDF arrives as None, and ``str.join(None)`` would raise
        TypeError and fail the whole Spark task.
    """
    if x is None:
        return None
    return sep.join(x)

In [356]:
# Python UDF kept defined in case later cells reference `concat_udf`.
concat_udf = udf(lambda x: fn2(x))
# Render each route array as "page1-page2-...". The built-in concat_ws joins
# array<string> elements inside the JVM, which avoids shipping every row to a
# Python worker as the UDF would.
df_result.withColumn('concatStr', f.concat_ws('-', df_result.concatStr)).show(30)

+--------------------+-------+
|           concatStr|r_count|
+--------------------+-------+
|                main|  39256|
|        main-tariffs|   6536|
|           main-news|   6274|
|        main-archive|   5849|
|         main-family|   4863|
|        main-digital|   4224|
|          main-bonus|   3495|
|   main-tariffs-news|   1189|
|   main-news-tariffs|   1131|
|main-tariffs-archive|   1037|
|   main-news-archive|   1002|
|   main-archive-news|    998|
|main-archive-tariffs|    997|
| main-family-tariffs|    922|
|    main-news-family|    919|
| main-tariffs-family|    918|
|    main-family-news|    880|
| main-archive-family|    818|
|   main-news-digital|    797|
| main-family-archive|    775|
|   main-tariffs-main|    761|
|main-tariffs-digital|    751|
|   main-digital-news|    748|
|main-digital-tariffs|    722|
|main-archive-digital|    720|
|        main-spravka|    708|
|      main-news-main|    686|
|main-digital-archive|    683|
|  main-tariffs-bonus|    667|
|     ma