In [1]:
import findspark
findspark.init()  # locate the local Spark installation; kept ahead of the pyspark imports

from pyspark import SparkContext
from pyspark.sql import SparkSession, Row

# getOrCreate() reuses a session that is already running in this kernel, so the
# cell can be re-executed safely. Constructing SparkContext(...) directly raises
# "Cannot run multiple SparkContexts at once" on the second run.
spark = SparkSession.builder.appName('sgatg').getOrCreate()
sc = spark.sparkContext

# Number of most popular routes reported by each solution below.
LIMIT = 30

24/01/21 11:07:16 WARN Utils: Your hostname, Tatianas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.15 instead (on interface en0)
24/01/21 11:07:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/21 11:07:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Peek at the header line and the first five data rows of the tab-separated input file.
! head -n 6 clickstream.csv

user_id	session_id	event_type	event_page	timestamp
0	874	page	main	1696371064
0	874	event	main	1696372696
0	874	event	main	1696373564
0	874	page	rabota	1696374894
0	874	event	rabota	1696377393


# SQL Solution

In [3]:
# SQL solution: the LIMIT most frequent page routes over all sessions.
#
# A route is the sequence of pages visited in one (user_id, session_id),
# ordered by timestamp, with consecutive duplicates collapsed and everything
# from the first 'error' event onwards discarded.

# No schema is inferred, so every column (including timestamp) is a string;
# the query casts timestamp to BIGINT wherever it is used for ordering.
data = spark.read.option("header", True).csv("clickstream.csv", sep = '\t')
data.createOrReplaceTempView("clickstream")

# Notes on the query:
#  * `step` (row_number over the session window) records each event's position
#    in the session. collect_list() gives no ordering guarantee after the
#    GROUP BY shuffle, so the route is rebuilt by sorting (step, event_page)
#    structs and then projecting the page back out.
#  * `prev` is a running flag: true once an error event has been seen in the
#    session (current row included), so those rows are filtered out.
#  * `event_page IS NOT NULL` keeps the previous behaviour, where
#    collect_list(event_page) silently skipped NULL pages.
#  * `route` is a secondary sort key so that ties in `count` come out in a
#    stable order.
sql_result = spark.sql(f"""
SELECT
    concat_ws("-", route_list) AS route, count(*) AS count
FROM
(
    SELECT
        transform(
            sort_array(collect_list(struct(step, event_page))),
            visit -> visit.event_page
        ) AS route_list
    FROM
    (
        SELECT
            user_id, session_id, event_page,
            row_number() OVER session_events AS step,
            lag(event_page) OVER session_events AS lag_page,
            any(contains(event_type, 'error')) OVER session_events AS prev
        FROM clickstream
        WINDOW session_events AS (
            PARTITION BY user_id, session_id ORDER BY CAST(timestamp AS BIGINT)
        )
    )
    WHERE NOT prev
      AND event_page IS NOT NULL
      AND ((event_page != lag_page) OR isnull(lag_page))
    GROUP BY user_id, session_id
)
GROUP BY route_list
ORDER BY count DESC, route
LIMIT {int(LIMIT)}
""")
sql_result.write.option("sep","\t").mode('overwrite').csv('sql_results')

sql_result.show(n=LIMIT, truncate=False)

+---------------------+-----+
|route                |count|
+---------------------+-----+
|main                 |8184 |
|main-archive         |1113 |
|main-rabota          |1047 |
|main-internet        |897  |
|main-bonus           |870  |
|main-news            |769  |
|main-tariffs         |677  |
|main-online          |587  |
|main-vklad           |518  |
|main-rabota-archive  |170  |
|main-archive-rabota  |167  |
|main-bonus-archive   |143  |
|main-rabota-bonus    |139  |
|main-news-rabota     |135  |
|main-bonus-rabota    |135  |
|main-archive-internet|132  |
|main-rabota-news     |130  |
|main-internet-rabota |129  |
|main-archive-news    |126  |
|main-rabota-internet |124  |
|main-internet-archive|123  |
|main-archive-bonus   |117  |
|main-internet-bonus  |115  |
|main-tariffs-internet|114  |
|main-news-archive    |113  |
|main-news-internet   |109  |
|main-archive-tariffs |104  |
|main-internet-news   |103  |
|main-tariffs-archive |103  |
|main-rabota-main     |94   |
+---------

# RDD Solution

In [4]:
# RDD solution, step 1: build, for every session, its list of pages in
# chronological order, with the sentinel 'error' in place of the page for
# events whose type contains the word 'error'.
lines = spark.sparkContext.textFile("clickstream.csv")

# Drop the header row explicitly instead of letting it through as a bogus
# "user_id-session_id" session.
header = lines.first()
events = (
    lines
    .filter(lambda line: line != header)
    .map(lambda line: line.split("\t"))
    # key:   "user_id-session_id"
    # value: (timestamp, page-or-'error')
    .map(lambda f: (f[0] + '-' + f[1],
                    (int(f[4]), 'error' if 'error' in f[2] else f[3])))
)

# groupByKey() does not promise any order of the values after the shuffle, and
# nothing guarantees the file itself is sorted, so each session's events are
# ordered by timestamp here. sorted() is stable, so events sharing a timestamp
# keep the order in which they arrived. Timestamps are then stripped so that
# downstream code receives a plain list of pages per session.
rdd = events.groupByKey().mapValues(
    lambda session_events: [page for _, page in sorted(session_events, key=lambda e: e[0])]
)

def drop_starting_from_error(line):
    """Turn one session's page list into a ``(route, session_key)`` pair.

    Args:
        line: a ``(session_key, pages)`` pair, where ``pages`` is an iterable
            of page names in which the string ``'error'`` marks an error event.

    Returns:
        A tuple ``(route, session_key)``. ``route`` is the pages seen before
        the first ``'error'`` marker, with consecutive repeats collapsed,
        joined by ``'-'``. It is the empty string when nothing precedes the
        first error or when ``pages`` is empty.
    """
    session_key, visited = line
    route = []
    last_seen = None
    for current in visited:
        if current == 'error':
            # Everything from the first error onwards is ignored.
            break
        if current == last_seen:
            # Still on the same page: not a new step of the route.
            continue
        route.append(current)
        last_seen = current
    return ('-'.join(route), session_key)

# RDD solution, step 2: count sessions per route and report the LIMIT most
# frequent routes.

# (session_key, pages) -> (route, session_key); pages from the first error onwards are dropped.
rdd = rdd.map(drop_starting_from_error)

# Every session key appears exactly once here (it was the key of the preceding
# groupByKey), so counting records per route equals counting distinct sessions.
# reduceByKey combines the counts without materialising the list of session
# ids for each route.
rdd = rdd.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)

# Most frequent first; the route name breaks ties so that the ordering (and
# which routes make the cut at LIMIT) is reproducible between runs.
rdd = rdd.sortBy(lambda x: (-x[1], x[0]))
rdd = rdd.map(lambda x: x[0] + '\t' + str(x[1]))

# Run the Spark job once and reuse the result for both the file and the
# printout; calling take() twice would recompute the whole pipeline.
top_lines = rdd.take(LIMIT)
with open('rdd_results.csv', 'w') as file:
    file.write('\n'.join(top_lines))
for line in top_lines:
    print(line)

                                                                                

main	8185
main-archive	1113
main-rabota	1047
main-internet	897
main-bonus	870
main-news	769
main-tariffs	677
main-online	587
main-vklad	518
main-rabota-archive	170
main-archive-rabota	167
main-bonus-archive	143
main-rabota-bonus	139
main-bonus-rabota	135
main-news-rabota	135
main-archive-internet	132
main-rabota-news	130
main-internet-rabota	129
main-archive-news	126
main-rabota-internet	124
main-internet-archive	123
main-archive-bonus	117
main-internet-bonus	115
main-tariffs-internet	113
main-news-archive	113
main-news-internet	109
main-archive-tariffs	104
main-internet-news	103
main-tariffs-archive	103
main-rabota-main	94


# DF Solution

In [5]:
from pyspark.sql.functions import lower, when, udf, collect_list
from pyspark.sql.types import StringType

# Load the tab-separated clickstream with its header row; no schema is
# inferred, so every column is read as a string.
df = spark.read.csv("clickstream.csv", sep="\t", header=True)
df.show(6)

+-------+----------+----------+----------+----------+
|user_id|session_id|event_type|event_page| timestamp|
+-------+----------+----------+----------+----------+
|      0|       874|      page|      main|1696371064|
|      0|       874|     event|      main|1696372696|
|      0|       874|     event|      main|1696373564|
|      0|       874|      page|    rabota|1696374894|
|      0|       874|     event|    rabota|1696377393|
|      0|       874|      page|    online|1696378229|
+-------+----------+----------+----------+----------+
only showing top 6 rows



In [6]:
# Replace the page with the sentinel 'error' on every event whose type contains
# the word 'error', then keep only the columns needed to build routes.
page_or_error = when(df['event_type'].contains('error'), 'error').otherwise(df["event_page"])
df_small = (
    df
    .withColumn("event_page", page_or_error)
    .select('user_id', 'session_id', 'event_page', 'timestamp')
)
df_small.show(6)

+-------+----------+----------+----------+
|user_id|session_id|event_page| timestamp|
+-------+----------+----------+----------+
|      0|       874|      main|1696371064|
|      0|       874|      main|1696372696|
|      0|       874|      main|1696373564|
|      0|       874|    rabota|1696374894|
|      0|       874|    rabota|1696377393|
|      0|       874|    online|1696378229|
+-------+----------+----------+----------+
only showing top 6 rows



In [7]:
from pyspark.sql.functions import sort_array, struct

# One row per session holding its pages (or 'error' markers) in chronological order.
#
# collect_list() alone gives no ordering guarantee after the groupBy shuffle,
# so each page is collected together with its numeric timestamp, the structs
# are sorted (timestamp first; events sharing a timestamp fall back to page
# name), and the page field is projected back out of the sorted array.
# The when(...) guard yields NULL for rows without a page, which collect_list
# skips -- the same as collecting event_page directly did.
timed_page = when(
    df_small['event_page'].isNotNull(),
    struct(df_small['timestamp'].cast('long').alias('ts'), df_small['event_page']),
)
pages_in_time_order = sort_array(collect_list(timed_page))['event_page']

df_pages_w_errors = (
    df_small
    .groupby(['user_id', 'session_id'])
    .agg(pages_in_time_order.alias('route'))
    .select('route')
)
df_pages_w_errors.show(6)

+--------------------+
|               route|
+--------------------+
|              [main]|
|              [main]|
|[main, internet, ...|
|[main, archive, e...|
|[main, main, main...|
|[main, news, news...|
+--------------------+
only showing top 6 rows



In [8]:
def route_from_pages(pages):
    """Collapse a session's page list into a '-'-joined route string.

    Args:
        pages: iterable of page names in which the string ``'error'`` marks
            an error event.

    Returns:
        The pages seen before the first ``'error'`` marker, with consecutive
        repeats removed, joined by ``'-'``. Empty string if there are none.
    """
    route_steps = []
    last_page = None
    for current_page in pages:
        if current_page == 'error':
            # The route ends at the first error event.
            break
        is_new_step = current_page != last_page
        if is_new_step:
            route_steps.append(current_page)
            last_page = current_page
    return '-'.join(route_steps)

# Wrap the pure-Python helper as a Spark UDF that returns a string.
udf_route_from_pages = udf(route_from_pages, returnType=StringType())

# Overwrite the array-valued 'route' column with its '-'-joined route string.
route_as_string = udf_route_from_pages('route')
df_routes = df_pages_w_errors.withColumn('route', route_as_string)

df_routes.show(6)

+--------------------+
|               route|
+--------------------+
|                main|
|                main|
|main-internet-arc...|
|        main-archive|
|main-bonus-intern...|
|main-news-interne...|
+--------------------+
only showing top 6 rows



In [9]:
# Count sessions per route and keep the LIMIT most frequent ones.
# 'route' is a secondary sort key: without it, routes with equal counts come
# out in arbitrary order, which also makes the cut-off at LIMIT and the
# written file differ between runs.
df_routes_w_counts = (
    df_routes
    .groupBy('route')
    .count()
    .sort(['count', 'route'], ascending=[False, True])
    .limit(LIMIT)
)

df_routes_w_counts.write.option("sep","\t").mode('overwrite').csv('df_results')

df_routes_w_counts.show(LIMIT)

+--------------------+-----+
|               route|count|
+--------------------+-----+
|                main| 8185|
|        main-archive| 1113|
|         main-rabota| 1047|
|       main-internet|  896|
|          main-bonus|  870|
|           main-news|  769|
|        main-tariffs|  677|
|         main-online|  587|
|          main-vklad|  518|
| main-rabota-archive|  170|
| main-archive-rabota|  167|
|  main-bonus-archive|  143|
|   main-rabota-bonus|  139|
|   main-bonus-rabota|  135|
|    main-news-rabota|  135|
|main-archive-inte...|  132|
|    main-rabota-news|  130|
|main-internet-rabota|  129|
|   main-archive-news|  126|
|main-rabota-internet|  124|
|main-internet-arc...|  123|
|  main-archive-bonus|  117|
| main-internet-bonus|  115|
|main-tariffs-inte...|  113|
|   main-news-archive|  113|
|  main-news-internet|  109|
|main-archive-tariffs|  104|
|  main-internet-news|  103|
|main-tariffs-archive|  103|
|    main-rabota-main|   94|
+--------------------+-----+



24/01/21 11:07:30 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
