In [1]:
from pyspark.sql.functions import *
from pyspark.sql.window import *
from numpy import inf

In [2]:
# Location of the raw clickstream dump that the notebook reads.
path_to_csv = "/data/lsml/sga/clickstream.csv"

In [3]:
# Read the tab-separated file with a header row, letting Spark infer column types.
# NOTE(review): `spark` is not created in this notebook; presumably the kernel
# provides the SparkSession -- confirm.
clickstream = (
    spark.read
    .option("header", True)
    .option("delimiter", "\t")
    .option("inferSchema", True)
    .csv(path_to_csv)
)

In [4]:
# Add a `stop_time` column: for rows whose event_type matches the regex "error"
# it holds that row's timestamp; every other row gets +inf, so it can never be
# the minimum when the earliest error per session is computed later.
clickstream = clickstream.withColumn(
    "stop_time",
    when(
        clickstream["event_type"].rlike("error"),
        clickstream["timestamp"],
    ).otherwise(inf),
)

In [5]:
# Earliest error time per (session_id, user_id). Sessions without any error row
# end up with +inf. The aggregated column is named `min(stop_time)`.
stop_time = (
    clickstream
    .groupBy("session_id", "user_id")
    .agg({"stop_time": "min"})
)

In [6]:
# Attach the per-session earliest error time (`min(stop_time)`) to every event row.
clickstream = clickstream.join(
    stop_time,
    on=["session_id", "user_id"],
    how="left",
)

In [7]:
# Shift the timestamp of error rows forward by 1 and leave all other rows as is.
# `min(stop_time)` was computed from the unshifted timestamps, so after this
# step an error row is always strictly later than its session's error time and
# the `<=` filter applied next removes it.
clickstream = clickstream.withColumn(
    "timestamp",
    when(
        clickstream["event_type"].rlike("error"),
        clickstream["timestamp"] + 1,
    ).otherwise(clickstream["timestamp"]),
)

In [8]:
# Keep only the rows that happened no later than the session's first error;
# everything after the error occurred is discarded.
clickstream = clickstream.where(
    clickstream["timestamp"] <= clickstream["min(stop_time)"]
)

In [9]:
# Per-session window ordered by time; `LAG` holds the event_page of the previous
# row in that window (null for the first row of a session).
partition = (
    Window
    .partitionBy("session_id", "user_id")
    .orderBy("timestamp")
)
clickstream = clickstream.withColumn(
    "LAG",
    lag("event_page").over(partition),  # default offset is 1
)

In [10]:
# Remove consecutive duplicate pages: keep a row only when its event_page differs
# from the previous one. `is distinct from` is null-safe, so the first row of a
# session (LAG is null) is kept.
clickstream = clickstream.where("event_page is distinct from LAG")

In [11]:
# Build the route of every session: its visited pages joined with "-" in
# chronological order.
# collect_list() on its own gives no ordering guarantee after the shuffle that
# groupBy triggers, so the route could come out scrambled. Each page is therefore
# collected together with its timestamp, the (timestamp, event_page) structs are
# sorted (structs compare field by field, so timestamp decides first), and only
# then are the page names extracted and concatenated.
clickstream = (
    clickstream
    .groupBy("session_id", "user_id")
    .agg(
        sort_array(
            collect_list(struct("timestamp", "event_page"))
        ).alias("ordered_events")
    )
    .select(
        "session_id",
        "user_id",
        # Selecting a field of an array<struct> yields the array of that field.
        concat_ws("-", col("ordered_events.event_page")).alias("route"),
    )
)

In [12]:
# Count how many sessions share each route and keep the 30 most frequent ones.
output = (
    clickstream
    .groupBy("route")
    .count()
    .orderBy(desc("count"))
    .limit(30)
)

In [13]:
# Collect the (at most 30-row) result to the driver as a pandas DataFrame.
# Note: `output` is rebound from a Spark DataFrame to a pandas one here.
output = output.toPandas()

In [14]:
# Write the result as a tab-separated file without header row or index column.
output.to_csv(
    "/home/aabilov/output.csv",
    sep="\t",
    index=False,
    header=False,
)

In [15]:
# Print the exported file so the result can be inspected in the notebook.
!cat /home/aabilov/output.csv

main	39752
main-tariffs	6606
main-news	6349
main-archive	5924
main-family	4910
main-digital	4277
main-bonus	3535
main-tariffs-news	1201
main-news-tariffs	1146
main-tariffs-archive	1050
main-news-archive	1026
main-archive-tariffs	1009
main-archive-news	1008
main-news-family	932
main-family-tariffs	927
main-tariffs-family	923
main-family-news	890
main-archive-family	826
main-news-digital	803
main-tariffs-main	791
main-family-archive	777
main-tariffs-digital	761
main-digital-news	755
main-digital-tariffs	731
main-archive-digital	724
main-spravka	712
main-news-main	701
main-digital-archive	688
main-tariffs-bonus	671
main-archive-main	631


In [16]:
# Send the contents of the exported file as the POST body to the sga/task_hive endpoint.
! curl -d "$(cat /home/aabilov/output.csv)" hadoop2-00.yandex.ru:8008/sga/task_hive

Great job! Secret keyword is 'HiveMind'
