# User routes on the site
## Description
**Clickstream** is a sequence of user actions on a website. It shows how users interact with the site. In this task, you need to find the most frequent user routes.

## Input data
Input data is a table with clickstream data in the file `hdfs:/data/clickstream.csv`.

### Table structure
* `user_id (int)` - Unique user identifier.
* `session_id (int)` - Unique identifier for the user session. The user's session lasts until the identifier changes.
* `event_type (string)` - Event type from the list:
    * **page** - visit to the page
    * **event** - any action on the page
    * <b>&lt;custom&gt;</b> - string with any other type
* `event_page (string)` - Page on the site.
* `timestamp (int)` - Unix-timestamp of action.

### Browser errors
Errors can sometimes occur in the user's browser. After such an error appears, the data of this session can no longer be trusted: the error row and all rows of the session that follow it or share its timestamp are considered corrupted and **should not be counted** in the statistics.

When an error occurs on the page, a random string containing the word **error** will be written to the `event_type` field.
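The rule above can be sketched in plain Python. This is only an illustration under assumptions: a session is held in memory as a list of `(event_type, event_page, timestamp)` tuples, and the helper name `drop_after_error` is hypothetical, not part of the task. The rows are taken from the sample session in this document.

```python
def drop_after_error(events):
    """Keep only the events that happen strictly before the first error.

    `events` is a list of (event_type, event_page, timestamp) tuples for one
    session (an assumed in-memory layout, not the task's storage format).
    """
    error_times = [ts for etype, _, ts in events if "error" in etype]
    if not error_times:
        return list(events)
    first_error = min(error_times)
    # Rows sharing the error's timestamp are dropped too ("at the same time").
    return [e for e in events if e[2] < first_error]


session_507 = [
    ("page", "main", 1620494781),
    ("event", "main", 1620494788),
    ("page", "family", 1620494820),
    ("page", "main", 1620494848),
    ("wNaxLlerrorU", "main", 1620494865),
    ("event", "main", 1620494873),
    ("page", "news", 1620494875),
]
trusted = drop_after_error(session_507)
print([page for _, page, _ in trusted])  # ['main', 'main', 'family', 'main']
```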

### Sample of user session
<pre>
+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1620494781|
|    562|       507|       event|      main|1620494788|
|    562|       507|       event|      main|1620494798|
|    562|       507|        page|    family|1620494820|
|    562|       507|       event|    family|1620494828|
|    562|       507|        page|      main|1620494848|
|    562|       507|wNaxLlerrorU|      main|1620494865|
|    562|       507|       event|      main|1620494873|
|    562|       507|        page|      news|1620494875|
|    562|       507|        page|   tariffs|1620494876|
|    562|       507|       event|   tariffs|1620494884|
|    562|       514|        page|      main|1620728918|
|    562|       514|       event|      main|1620729174|
|    562|       514|        page|   archive|1620729674|
|    562|       514|        page|     bonus|1620729797|
|    562|       514|        page|   tariffs|1620731090|
|    562|       514|       event|   tariffs|1620731187|
+-------+----------+------------+----------+----------+
</pre>

#### Correct user routes for a given user:
* **Session 507**: main-family-main
* **Session 514**: main-archive-bonus-tariffs

Route elements are ordered by the time they appear in the clickstream, from earliest to latest.

A route is built from all of a session's page visits up to the end of the session or up to the first error in the session, whichever comes first.
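As a rough sketch of how a route is derived from a session, assume the trusted rows of a session are already in time order and reduced to their page names. The helper name `build_route` is hypothetical; it collapses consecutive repeats of the same page and joins the rest with "-".

```python
from itertools import groupby


def build_route(pages):
    """Collapse consecutive repeats of the same page and join with '-'."""
    return "-".join(page for page, _ in groupby(pages))


# Page names of the sample sessions, in time order, up to the first error.
pages_507 = ["main", "main", "main", "family", "family", "main"]
pages_514 = ["main", "main", "archive", "bonus", "tariffs", "tariffs"]

print(build_route(pages_507))  # main-family-main
print(build_route(pages_514))  # main-archive-bonus-tariffs
```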

## Task
You need to use the Spark SQL, Spark RDD and Spark DF interfaces to create a solution file, the lines of which contain **the 30 most frequent user routes** on the site.

Each line of the file should contain the `route` and `count` values **separated by tabs**, where:
* `route` - route on the site, consisting of pages separated by "-".
* `count` - the number of user sessions in which this route occurred.

The lines must be **ordered in descending order** of the `count` field.
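The expected output format can be illustrated with a small in-memory sketch. It assumes one route string per session has already been computed; the function name `top_routes` and the alphabetical tie-breaking are assumptions, since the task does not specify the order of routes with equal counts.

```python
from collections import Counter


def top_routes(session_routes, limit=30):
    """Return 'route<TAB>count' lines for the `limit` most frequent routes."""
    counts = Counter(session_routes)
    # Count descending; ties broken by route name (an assumption, see above).
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [f"{route}\t{count}" for route, count in ranked[:limit]]


routes = ["main", "main-archive", "main", "main-family-main", "main-archive", "main"]
for line in top_routes(routes):
    print(line)
```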

## Criteria
You can get a maximum of 3.5 points (final grade) for this assignment, depending on the number of interfaces you manage to leverage. The criteria are as follows:

* 0.5 points – Spark SQL solution with 1 query
* 0.5 points – Spark SQL solution with <=2 queries
* 0.5 points – Spark RDD solution
* 0.5 points – Spark DF solution
* 0.5 points – your solution algorithm is relatively optimized, i.e. no O(n^2) or O(n^3) complexity, appropriate object usage, no data leaks, etc. This is evaluated by staff.
* 1 point – 1-on-1 screening session. During this session a staff member can ask you questions about your solution logic, framework usage, questionable parts of your code, etc. If your code is clean enough, the staff member may just ask you to solve a theoretical problem related to Spark.


In [1]:
import pandas as pd
import numpy as np

In [2]:
df = pd.read_csv('clickstream.csv', sep='\t')

In [3]:
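# Making sure clickstream is indeed sorted by timestamp.
# Caveat: pandas aligns both Series by index before subtracting, so this sum is 0
# for any input; df['timestamp'].is_monotonic_increasing is a direct check.
(np.abs(df['timestamp'] - df['timestamp'].sort_values())).sum()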

0

In [4]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

from pyspark.sql import SparkSession, functions as F, Window
se = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-03-04 18:40:18,096 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [5]:
clicks = se.read.csv('hdfs:/data/clickstream.csv', sep='\t', header=True)
clicks.createOrReplaceTempView('clicks')
clicks.limit(5).toPandas()

Unnamed: 0,user_id,session_id,event_type,event_page,timestamp
0,562,507,page,main,1695584127
1,562,507,event,main,1695584134
2,562,507,event,main,1695584144
3,562,507,event,main,1695584147
4,562,507,wNaxLlerrorU,main,1695584154


In [6]:
def save_csv(data, path, limit=30, sep='\t',
             show=True, display=30, **kwargs):
    display = min(limit, display)

    data = [(x[0], x[1]) for x in data.take(limit)]

    df = pd.DataFrame(data, columns=['route', 'count'])
    df.to_csv(path, sep=sep, index=False, **kwargs)
    if show:
        return df.head(display)

def compare_csv(file1, file2, sep='\t', **kwargs):
    df1 = pd.read_csv(file1, sep=sep, **kwargs).reset_index(drop=True)
    df2 = pd.read_csv(file2, sep=sep, **kwargs).reset_index(drop=True)
    
    if df1.shape[0] != df2.shape[0]:
        print('Number of rows are different!')
        return False, None  # keep the (match, diff) tuple shape that callers index into
    
    diff = df1.compare(df2, result_names=("left", "right"))
    if diff.empty:
        print('Dataframes match!')
        return True, None
    return False, diff

## SQL

In [7]:
def sql_join(table, order_by='timestamp'):
    return '''WITH min_errors AS (
                SELECT user_id AS uid, session_id AS sid, MIN(INT(timestamp)) AS min_timestamp
                FROM {table}
                WHERE INSTR(event_type, 'error') > 0
                GROUP BY user_id, session_id
            ), no_errors AS (
                SELECT user_id, session_id, event_page, {order_by},
                       lag(event_page, 1) OVER (PARTITION BY user_id, session_id ORDER BY {order_by}) AS prev_page
                FROM {table}
                LEFT JOIN min_errors ON user_id = uid AND session_id = sid
                WHERE min_timestamp IS NULL OR INT(timestamp) < INT(min_timestamp)
            ), clean AS (
                SELECT user_id, session_id, event_page, {order_by}
                FROM no_errors
                WHERE prev_page IS NULL OR event_page != prev_page
            ), routes AS (
                SELECT user_id, session_id, MAX(route) AS route
                FROM (SELECT user_id, session_id,
                             COLLECT_LIST(event_page) OVER (PARTITION BY user_id, session_id ORDER BY {order_by}) AS route
                      FROM clean) AS temp
                GROUP BY user_id, session_id
            )

            SELECT CONCAT_WS('-', route) as route, COUNT(route) as count
            FROM routes
            GROUP BY route
            ORDER BY count DESC, route DESC'''.format(table=table, order_by=order_by)

sql_routes = se.sql(sql_join('clicks')) # LIMIT 30 is omitted intentionally

In [8]:
save_csv(sql_routes, 'sql.csv')

Unnamed: 0,route,count
0,main,8184
1,main-archive,1113
2,main-rabota,1047
3,main-internet,897
4,main-bonus,870
5,main-news,769
6,main-tariffs,677
7,main-online,587
8,main-vklad,518
9,main-rabota-archive,170


## RDD

In [9]:
rdd_min_errors = (clicks
                  .rdd
                  .filter(lambda x: 'error' in x.event_type)
                  .map(lambda x: ((x.user_id, x.session_id), int(x.timestamp)))
                  .reduceByKey(lambda v1, v2: min(v1, v2)))

rdd_no_errors = (clicks
                 .rdd
                 .map(lambda x: ((x.user_id, x.session_id), x))
                 .leftOuterJoin(rdd_min_errors)
                 .filter(lambda x: x[1][1] is None or int(x[1][0].timestamp) < x[1][1])
                 .map(lambda x: (x[0], x[1][0])))

def clean(vals):
    res = []
    for val in sorted(vals, key=lambda x: int(x.timestamp)):
        if len(res) == 0 or res[-1] != val.event_page:
            res.append(val.event_page)
    return '-'.join(res)

rdd_clean = (rdd_no_errors
             .groupByKey()
             .mapValues(lambda vals: clean(vals)))

rdd_routes = (rdd_clean
              .map(lambda x: (x[1], 1))
              .reduceByKey(lambda x, y: x + y)
              .sortBy(lambda x: (x[1], x[0]), ascending=False))

In [10]:
save_csv(rdd_routes, 'rdd.csv')

Unnamed: 0,route,count
0,main,8184
1,main-archive,1113
2,main-rabota,1047
3,main-internet,897
4,main-bonus,870
5,main-news,769
6,main-tariffs,677
7,main-online,587
8,main-vklad,518
9,main-rabota-archive,170


In [11]:
assert compare_csv('rdd.csv', 'sql.csv')[0]

Dataframes match!


## DF

In [12]:
# I intentionally didn't cast values before to make use of casts embedded into sql and rdd solutions
clicks = clicks.withColumn('user_id', F.col('user_id').cast('int'))
clicks = clicks.withColumn('session_id', F.col('session_id').cast('int'))
clicks = clicks.withColumn('timestamp', F.col('timestamp').cast('int'))
clicks

DataFrame[user_id: int, session_id: int, event_type: string, event_page: string, timestamp: int]

In [13]:
df_min_errors = (clicks
                 .where(F.col('event_type').contains('error'))
                 .groupBy(['user_id', 'session_id'])
                 .agg(F.min('timestamp').alias('min_timestamp'))
                 .select(['user_id', 'session_id', 'min_timestamp']))

df_no_errors = (clicks
                .join(df_min_errors, ['user_id', 'session_id'], 'left')
                .where(F.col('min_timestamp').isNull() | (F.col('timestamp') < F.col('min_timestamp')))
                .withColumn('prev_page', F.lag('event_page', 1).over(Window.partitionBy(['user_id', 'session_id']).orderBy('timestamp')))
                .select(['user_id', 'session_id', 'event_page', 'timestamp', 'prev_page']))

df_clean = (df_no_errors
            .where(F.col('prev_page').isNull() | (F.col('event_page') != F.col('prev_page')))
            .withColumn('route', F.collect_list('event_page').over(Window.partitionBy(['user_id', 'session_id']).orderBy('timestamp')))
            .select(['user_id', 'session_id', 'route']))

df_routes = (df_clean
             .groupBy(['user_id', 'session_id'])
             .agg(F.max('route').alias('route'))
             .groupBy('route')
             .agg(F.count('route').alias('count'))
             .orderBy(['count', 'route'], ascending=False)
             .withColumn('route', F.concat_ws('-', 'route')))

In [14]:
save_csv(df_routes, 'df.csv')

Unnamed: 0,route,count
0,main,8184
1,main-archive,1113
2,main-rabota,1047
3,main-internet,897
4,main-bonus,870
5,main-news,769
6,main-tariffs,677
7,main-online,587
8,main-vklad,518
9,main-rabota-archive,170


In [15]:
assert compare_csv('df.csv', 'sql.csv')[0]

Dataframes match!


## Alternative solution

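Let's assume that the clicks in the file are ordered by fuller timestamps (with ms, ns, etc.), so that the file order is the true event order.

Sorting by the truncated `timestamp` alone does not preserve that order for rows with equal timestamps (ORDER BY is not stable), so we add an `id` index column and order by it instead.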
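The effect of ties can be shown with a toy example. The rows and column values below are made up for illustration. Python's `sorted` is stable, so the input is reversed first to imitate rows arriving in a different physical order, as can happen after a shuffle.

```python
rows = [
    {"id": 0, "page": "main", "timestamp": 100},
    {"id": 1, "page": "news", "timestamp": 100},   # same second as id 0
    {"id": 2, "page": "bonus", "timestamp": 101},
]

# Imitate rows arriving in a different physical order.
arrived = list(reversed(rows))

# The timestamp alone cannot restore the original order of the tied rows.
by_ts = sorted(arrived, key=lambda r: r["timestamp"])
# Adding the id as a tie-breaker makes the order deterministic.
by_ts_id = sorted(arrived, key=lambda r: (r["timestamp"], r["id"]))

print([r["page"] for r in by_ts])     # ['news', 'main', 'bonus']
print([r["page"] for r in by_ts_id])  # ['main', 'news', 'bonus']
```

With the tie swapped, consecutive-duplicate removal can produce a different route, which is one way per-route counts can shift slightly between the two orderings.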

In [16]:
df = pd.read_csv('clickstream.csv', sep='\t')
print((df.sort_values(['timestamp']).index != df.index).sum())  # rows moved by the default (unstable) sort

df['id'] = df.index
df.to_csv('clickstream1.csv', sep='\t', index=False)

24583


In [17]:
! hadoop fs -rm /data/clickstream1.csv
! hadoop fs -copyFromLocal clickstream1.csv /data
! hadoop fs -ls /data

Deleted /data/clickstream1.csv
Found 5 items
-rw-r--r--   1 root   supergroup   32241574 2023-09-24 20:38 /data/clickstream.csv
-rw-r--r--   1 jovyan supergroup   39130467 2024-03-04 18:41 /data/clickstream1.csv
-rw-r--r--   1 jovyan supergroup     321538 2024-03-04 18:37 /data/clickstream10000.csv
-rw-r--r--   1 jovyan supergroup        499 2024-03-05 18:23 /data/clickstream_test.csv
drwxr-xr-x   - root   supergroup          0 2023-09-24 20:38 /data/transactions


In [18]:
clicks1 = se.read.csv('hdfs:/data/clickstream1.csv', sep='\t', header=True)
clicks1.createOrReplaceTempView('clicks1')
clicks1.limit(5).toPandas()

Unnamed: 0,user_id,session_id,event_type,event_page,timestamp,id
0,562,507,page,main,1695584127,0
1,562,507,event,main,1695584134,1
2,562,507,event,main,1695584144,2
3,562,507,event,main,1695584147,3
4,562,507,wNaxLlerrorU,main,1695584154,4


In [19]:
sql_routes1 = se.sql(sql_join('clicks1', 'id')) # LIMIT 30 is omitted intentionally

In [20]:
save_csv(sql_routes1, 'sql1.csv')

Unnamed: 0,route,count
0,main,8184
1,main-archive,1111
2,main-rabota,1047
3,main-internet,896
4,main-bonus,872
5,main-news,769
6,main-tariffs,675
7,main-online,587
8,main-vklad,516
9,main-rabota-archive,170


In [21]:
compare_csv('sql.csv', 'sql1.csv')[1]

Unnamed: 0_level_0,count,count
Unnamed: 0_level_1,left,right
1,1113.0,1111.0
3,897.0,896.0
4,870.0,872.0
6,677.0,675.0
8,518.0,516.0
15,132.0,133.0
16,130.0,129.0
22,115.0,116.0
23,114.0,115.0
24,113.0,112.0


In [22]:
# stop Spark (and YARN application)
sc.stop()

## Given a map-reduce sequence of tasks, what would be the algorithm to convert it into Spark? Can it be made faster?

Below are mapper1.py (removes the header and reorders the columns) and reducer_clean.py (cleans the data, analogous to `clean` in the Spark RDD solution).

In [23]:
%%file mapper1.py
import sys

_ = sys.stdin.readline().strip()  # remove the header; note this drops the first line of every mapper's input split

for line in sys.stdin:
    user_id, session_id, event_type, event_page, timestamp = line.strip().split()
    print('\t'.join([user_id, session_id, timestamp, event_type, event_page]))

Overwriting mapper1.py


In [24]:
%%file reducer_clean.py
import sys


prev_session = (None, None)
prev_page = None
err = False

for line in sys.stdin:
    user_id, session_id, timestamp, event_type, event_page = line.strip().split()
    
    if prev_session != (user_id, session_id):
        prev_page = None
        err = False
    prev_session = (user_id, session_id)
    
    if not err and 'error' in event_type:
        err = True
    
    if not err:
        if prev_page is None or prev_page != event_page:
            print('\t'.join([user_id, session_id, timestamp, event_page]))
        prev_page = event_page

Overwriting reducer_clean.py


Testing map-reduce on small data slice

In [25]:
df = pd.read_csv('clickstream.csv', sep='\t')
df.iloc[33:47].to_csv('clickstream_test.csv', sep='\t', index=False)

In [26]:
df.iloc[33:47]

Unnamed: 0,user_id,session_id,event_type,event_page,timestamp
33,3539,849,page,bonus,1695584324
34,3539,849,event,bonus,1695584324
35,3539,849,event,bonus,1695584333
36,461,174,page,main,1695584343
37,4567,514,page,main,1695584345
38,3539,849,event,bonus,1695584348
39,4567,514,mAXExoCXerror,main,1695584351
40,4567,514,event,main,1695584357
41,3539,849,event,bonus,1695584365
42,4567,514,page,archive,1695584373


In [27]:
! python mapper1.py < clickstream_test.csv | sort -k1,1n -k2,2n -k3,3n | grep "\S" > test.csv
! cat test.csv

461	174	1695584343	page	main
461	174	1695584376	event	main
461	174	1695584376	event	main
3539	849	1695584324	event	bonus
3539	849	1695584324	page	bonus
3539	849	1695584333	event	bonus
3539	849	1695584348	event	bonus
3539	849	1695584365	event	bonus
3539	849	1695584389	page	tariffs
4567	514	1695584345	page	main
4567	514	1695584351	mAXExoCXerror	main
4567	514	1695584357	event	main
4567	514	1695584373	page	archive
4567	514	1695584381	page	internet


In [28]:
! python reducer_clean.py < test.csv

461	174	1695584343	main
3539	849	1695584324	bonus
3539	849	1695584389	tariffs
4567	514	1695584345	main


In [29]:
%%file reducer1.py
import sys


prev_session = (None, None)
prev_page = None
err = False
route = []

for line in sys.stdin:
    user_id, session_id, timestamp, event_type, event_page = line.strip().split()
    
    if prev_session != (user_id, session_id):
        prev_page = None
        err = False
        if len(route) > 0:
            print('-'.join(route))
        route = []
    prev_session = (user_id, session_id)
    
    if not err and 'error' in event_type:
        err = True
    
    if not err:
        if prev_page is None or prev_page != event_page:
            route.append(event_page)
        prev_page = event_page

if len(route) > 0:
    print('-'.join(route))

Overwriting reducer1.py


In [30]:
! python reducer1.py < test.csv > clean.csv
! cat clean.csv

main
bonus-tariffs
main


Works as expected!

We need to do another map-reduce to get the desired result.

In [31]:
%%file mapper2.py
import sys


for line in sys.stdin:
    print(line)  # line still ends with '\n', so print() adds an empty line after every route

Overwriting mapper2.py


In [32]:
%%file reducer2.py
import sys


prev_route = None
count = 0

for line in sys.stdin:
    route = line.strip()
    
    if prev_route is not None and prev_route != route:
        print('\t'.join([prev_route, str(count)]))
        count = 0
    
    count += 1
    prev_route = route

if prev_route is not None:  # guard against empty input
    print('\t'.join([prev_route, str(count)]))

Overwriting reducer2.py


In [33]:
! cat clean.csv | sort -k1,1n | python reducer2.py

bonus-tariffs	1
main	2


Running in hadoop

In [34]:
! hadoop fs -rm -r /routes

! mapred streaming \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1n -k2,2n -k3,3n" \
  -D stream.num.map.output.key.fields=3 \
  -D mapreduce.partition.keypartitioner.options=-k1,3 \
  -input /data/clickstream.csv \
  -output /routes \
  -mapper "/opt/conda/bin/python3.10 mapper1.py" \
  -reducer "/opt/conda/bin/python3.10 reducer1.py" \
  -file mapper1.py \
  -file reducer1.py \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

Deleted /routes
2024-03-04 18:41:34,141 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper1.py, reducer1.py] [/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.4.jar] /tmp/streamjob1288169692454003578.jar tmpDir=null
2024-03-04 18:41:34,837 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2024-03-04 18:41:34,987 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2024-03-04 18:41:35,204 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/jovyan/.staging/job_1709671001671_0018
2024-03-04 18:41:35,497 INFO mapred.FileInputFormat: Total input files to process : 1
2024-03-04 18:41:35,511 INFO net.NetworkTopology: Adding a new node: /default-rack/127.0.0.1:9866
2024-03-04 18:41:35,559 INFO mapreduce.JobSubmitter: number of splits:2
2024-03-04 18:41:35,685 INFO mapreduce.JobSubmitte

In [35]:
%%bash
hadoop fs -cat "/routes/*" | sort -k 1,1 -t $'\t' -n | head -n 10

archive-main-archive	
archive-main-archive-news-vklad-rabota	
bonus-main-archive-internet-news-online-internet-online-tariffs-online-archive-rabota	
bonus-main-bonus-archive-tariffs-archive	
digital-main-bonus-news	
internet-main-internet	
main	
main	
main	
main	


In [36]:
! hadoop fs -rm -r /counts

! mapred streaming \
  -input /routes \
  -output /counts \
  -mapper "/opt/conda/bin/python3.10 mapper2.py" \
  -reducer "/opt/conda/bin/python3.10 reducer2.py" \
  -file mapper2.py \
  -file reducer2.py

Deleted /counts
2024-03-04 18:41:59,791 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper2.py, reducer2.py] [/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.4.jar] /tmp/streamjob9087059577938724142.jar tmpDir=null
2024-03-04 18:42:00,496 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2024-03-04 18:42:00,641 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2024-03-04 18:42:00,834 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/jovyan/.staging/job_1709671001671_0019
2024-03-04 18:42:01,113 INFO mapred.FileInputFormat: Total input files to process : 1
2024-03-04 18:42:01,160 INFO mapreduce.JobSubmitter: number of splits:2
2024-03-04 18:42:01,261 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1709671001671_0019
2024-03-04 18:42:01,261 INFO mapreduce.JobSub

In [37]:
%%bash
hadoop fs -cat "/counts/*" | sort -k 2,2 -t $'\t' -n -r | head -n 10

	48523
main	8185
main-archive	1111
main-rabota	1046
main-internet	896
main-bonus	870
main-news	768
main-tariffs	677
main-online	587
main-vklad	516


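Counts are a little different (I couldn't figure out why), but the route order is correct. The first row, with an empty route, most likely comes from the blank lines that `print(line)` in mapper2.py emits after every route.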

### So, to convert map-reduce to Spark, follow these rules:
1) A mapper translates to select and where (filtering) clauses.
2) A reducer implements sorting (order by), grouping (group by) and any operations that work with multiple rows (aggregate functions).
3) Any aggregate function that requires re-sorting the data needs a separate reducer (it's possible to run multiple map-reduce operations in one call using [hadoop-multiple-streaming](https://github.com/hyonaldo/hadoop-multiple-streaming)).

In terms of speed, Spark will generally be faster, since it optimizes the whole query plan (especially important for aggregate functions), keeps intermediate data in memory between stages instead of writing it to HDFS, and usually makes better use of caching and parallelization.
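The mapper/reducer split described above can be mimicked in plain Python to make the correspondence concrete. This is a toy sketch, not the streaming job itself: the three-column input layout and the function names are assumptions, and `sorted` stands in for the shuffle/sort that Hadoop performs between the two phases.

```python
from itertools import groupby


def mapper(lines):
    # Map phase: row-wise work only (parse, filter, project), like SELECT/WHERE.
    for line in lines:
        user_id, session_id, page = line.split("\t")
        yield (user_id, session_id), page


def reducer(pairs):
    # Reduce phase: sees each key's values together, like GROUP BY + aggregate.
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield key, "-".join(page for _, page in group)


lines = ["1\t10\tmain", "2\t20\tmain", "1\t10\tnews", "2\t20\tbonus"]
# Stable sort by key keeps each session's rows in their input order.
shuffled = sorted(mapper(lines), key=lambda kv: kv[0])
result = dict(reducer(shuffled))
print(result)  # {('1', '10'): 'main-news', ('2', '20'): 'main-bonus'}
```

In Spark the same two steps collapse into a single expression such as a `groupByKey` followed by `mapValues`, as in the RDD solution in this notebook.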

## Can the task be solved without joins? In which case would it be faster?

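Below is a solution without joins. It replaces the join with a running maximum of an error flag, computed by a window function over each session.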

In [38]:
findspark.init()
sc = pyspark.SparkContext(appName='jupyter')
se = SparkSession(sc)

2024-03-04 18:42:18,076 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [39]:
clicks = se.read.csv('hdfs:/data/clickstream.csv', sep='\t', header=True)
clicks.createOrReplaceTempView('clicks')
clicks.limit(5).toPandas()

Unnamed: 0,user_id,session_id,event_type,event_page,timestamp
0,562,507,page,main,1695584127
1,562,507,event,main,1695584134
2,562,507,event,main,1695584144
3,562,507,event,main,1695584147
4,562,507,wNaxLlerrorU,main,1695584154


In [40]:
def sql_count(table, order_by='timestamp'):
    return '''
            WITH min_errors AS (
                SELECT user_id, session_id, event_page, {order_by},
                    max(CASE WHEN INSTR(event_type, 'error') > 0 THEN 1 ELSE 0 END)
                        OVER (PARTITION BY user_id, session_id ORDER BY {order_by}) AS err
                FROM {table}
            ), no_errors AS (
                SELECT user_id, session_id, event_page, {order_by},
                       lag(event_page, 1) OVER (PARTITION BY user_id, session_id ORDER BY {order_by}) AS prev_page
                FROM min_errors
                WHERE err = 0
            ), clean AS (
                SELECT user_id, session_id, event_page, {order_by}
                FROM no_errors
                WHERE prev_page IS NULL OR event_page != prev_page
            )

            SELECT DISTINCT user_id, session_id
            FROM clean'''.format(table=table, order_by=order_by)

def sql_max(table, order_by='timestamp'):
    return '''
            WITH min_errors AS (
                SELECT user_id, session_id, event_page, {order_by},
                    max(CASE WHEN INSTR(event_type, 'error') > 0 THEN 1 ELSE 0 END)
                        OVER (PARTITION BY user_id, session_id ORDER BY {order_by}) AS err
                FROM {table}
            ), no_errors AS (
                SELECT user_id, session_id, event_page, {order_by},
                       lag(event_page, 1) OVER (PARTITION BY user_id, session_id ORDER BY {order_by}) AS prev_page
                FROM min_errors
                WHERE err = 0
            ), clean AS (
                SELECT user_id, session_id, event_page, {order_by}
                FROM no_errors
                WHERE prev_page IS NULL OR event_page != prev_page
            ), routes AS (
                SELECT user_id, session_id, MAX(route) AS route
                FROM (SELECT user_id, session_id,
                             COLLECT_LIST(event_page) OVER (PARTITION BY user_id, session_id ORDER BY {order_by}) AS route
                      FROM clean) AS temp
                GROUP BY user_id, session_id
            )

            SELECT CONCAT_WS('-', route) as route, COUNT(route) as count
            FROM routes
            GROUP BY route
            ORDER BY count DESC, route DESC'''.format(table=table, order_by=order_by)

sql_routes2 = se.sql(sql_max('clicks')) # LIMIT 30 is omitted intentionally

In [41]:
save_csv(sql_routes2, 'sql2.csv')

Unnamed: 0,route,count
0,main,8184
1,main-archive,1113
2,main-rabota,1047
3,main-internet,897
4,main-bonus,870
5,main-news,769
6,main-tariffs,677
7,main-online,587
8,main-vklad,518
9,main-rabota-archive,170


In [42]:
compare_csv('sql.csv', 'sql2.csv')

Dataframes match!


(True, None)

The fewer distinct user sessions there are (`SELECT DISTINCT user_id, session_id`), the faster this solution runs.

In our case, there are 48522 distinct user sessions, and the aggregate (max) solution is actually faster than the join solution (2.89 s vs 4.04 s wall time in the runs below).
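The join-free idea used in `sql_max` can be sketched in plain Python for a single session. The function name `keep_before_error` is hypothetical, and the sketch assumes the event types are already in time order; rows tied on the ordering column are not modeled here.

```python
from itertools import accumulate


def keep_before_error(event_types):
    """Return a keep-mask: True until (not including) the first error row."""
    is_error = [1 if "error" in t else 0 for t in event_types]
    # accumulate(..., max) is a running maximum, like MAX(...) OVER (ORDER BY ...).
    seen_error = accumulate(is_error, max)
    return [flag == 0 for flag in seen_error]


types = ["page", "event", "wNaxLlerrorU", "event", "page"]
print(keep_before_error(types))  # [True, True, False, False, False]
```

Once the flag becomes 1 it stays 1 for the rest of the session, so filtering on `flag == 0` plays the role of the `WHERE err = 0` clause without a separate join against the first error timestamp.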

In [43]:
se.sql(sql_count('clicks')).count()

48522

In [44]:
%%time
_ = se.sql(sql_join('clicks')).take(30)



CPU times: user 13.3 ms, sys: 0 ns, total: 13.3 ms
Wall time: 4.04 s

In [45]:
%%time
_ = sql_routes2.take(30)



CPU times: user 1 ms, sys: 9.25 ms, total: 10.3 ms
Wall time: 2.89 s

If we take only the first 10000 rows of the clickstream, there are 521 distinct user sessions.

The join-free solution is then about twice as fast as the join solution (339 ms vs 684 ms wall time in the runs below).

In [46]:
df = pd.read_csv('clickstream.csv', sep='\t')
df[:10000].to_csv('clickstream10000.csv', sep='\t', index=False)

In [47]:
! hadoop fs -rm /data/clickstream10000.csv
! hadoop fs -copyFromLocal clickstream10000.csv /data
! hadoop fs -ls /data

Deleted /data/clickstream10000.csv
Found 5 items
-rw-r--r--   1 root   supergroup   32241574 2023-09-24 20:38 /data/clickstream.csv
-rw-r--r--   1 jovyan supergroup   39130467 2024-03-04 18:41 /data/clickstream1.csv
-rw-r--r--   1 jovyan supergroup     321538 2024-03-04 18:42 /data/clickstream10000.csv
-rw-r--r--   1 jovyan supergroup        499 2024-03-05 18:23 /data/clickstream_test.csv
drwxr-xr-x   - root   supergroup          0 2023-09-24 20:38 /data/transactions


In [48]:
clicks10000 = se.read.csv('hdfs:/data/clickstream10000.csv', sep='\t', header=True)
clicks10000.createOrReplaceTempView('clicks10000')
clicks10000.limit(5).toPandas()

Unnamed: 0,user_id,session_id,event_type,event_page,timestamp
0,562,507,page,main,1695584127
1,562,507,event,main,1695584134
2,562,507,event,main,1695584144
3,562,507,event,main,1695584147
4,562,507,wNaxLlerrorU,main,1695584154


In [49]:
clicks10000.count()

10000

In [50]:
se.sql(sql_count('clicks10000')).count()

521

In [51]:
%%time
_ = se.sql(sql_join('clicks10000')).take(30) # LIMIT 30 is omitted intentionally

CPU times: user 0 ns, sys: 4.98 ms, total: 4.98 ms
Wall time: 684 ms


In [52]:
%%time
_ = se.sql(sql_max('clicks10000')).take(30) # LIMIT 30 is omitted intentionally

CPU times: user 0 ns, sys: 4.91 ms, total: 4.91 ms
Wall time: 339 ms


In [53]:
# stop Spark (and YARN application)
sc.stop()