# User routes on the site
## Description
**Clickstream** is a sequence of user actions on a website. It allows you to understand how users interact with the site. In this task, you need to find the most frequent custom routes.

## Input data
Input data is а table with clickstream data in file `hdfs:/data/clickstream.csv`.

### Table structure
* `user_id (int)` - Unique user identifier.
* `session_id (int)` - Unique identifier for the user session. The user's session lasts until the identifier changes.
* `event_type (string)` - Event type from the list:
    * **page** - visit to the page
    * **event** - any action on the page
    * <b>&lt;custom&gt;</b> - string with any other type
* `event_type (string)` - Page on the site.
* `timestamp (int)` - Unix-timestamp of action.

### Browser errors
Errors can sometimes occur in the user's browser - after such an error appears, we can no longer trust the data of this session and all the following lines after the error or at the same time with it are considered corrupted and **should not be counted** in statistics.

When an error occurs on the page, a random string containing the word **error** will be written to the `event_type` field.

### Sample of user session
<pre>
+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1620494781|
|    562|       507|       event|      main|1620494788|
|    562|       507|       event|      main|1620494798|
|    562|       507|        page|    family|1620494820|
|    562|       507|       event|    family|1620494828|
|    562|       507|        page|      main|1620494848|
|    562|       507|wNaxLlerrorU|      main|1620494865|
|    562|       507|       event|      main|1620494873|
|    562|       507|        page|      news|1620494875|
|    562|       507|        page|   tariffs|1620494876|
|    562|       507|       event|   tariffs|1620494884|
|    562|       514|        page|      main|1620728918|
|    562|       514|       event|      main|1620729174|
|    562|       514|        page|   archive|1620729674|
|    562|       514|        page|     bonus|1620729797|
|    562|       514|        page|   tariffs|1620731090|
|    562|       514|       event|   tariffs|1620731187|
+-------+----------+------------+----------+----------+
</pre>

#### Correct user routes for a given user:
* **Session 507**: main-family-main
* **Session 514**: main-archive-bonus-tariffs

Route elements are ordered by the time they appear in the clickstream, from earliest to latest.

The route must be accounted for completely before the end of the session or an error in the session.

## Task
You need to use the Spark SQL, Spark RDD and Spark DF interfaces to create a solution file, the lines of which contain **the 30 most frequent user routes** on the site.

Each line of the file should contain the `route` and `count` values **separated by tabs**, where:
* `route` - route on the site, consisting of pages separated by "-".
* `count` - the number of user sessions in which this route was.

The lines must be **ordered in descending order** of the `count` field.

## Criteria
You can get maximum of 3.5 points (final grade) for this assignment, depedning on the number of interface you manage to leverage. The criteria are as follows:

* 0.5 points – Spark SQL solution with 1 query
* 0.5 points – Spark SQL solution with <=2 queries
* 0.5 points – Spark RDD solution
* 0.5 points – Spark DF solution
* 0.5 points – your solution algorithm is relatively optimized, i.e.: no O^2 or O^3 complexities; appropriate object usage; no data leaks etc. This is evaluated by staff.
* 1 point – 1 on 1 screening session. During this session staff member can ask you questions regarding your solution logic, framework usage, questionable parts of your code etc. If your code is clean enough, the staff member can just ask you to solve a theoretical problem connected to Spark.


# Solution

## Data 

In [1]:
!hdfs dfs -ls /data

Found 2 items
-rw-r--r--   1 root supergroup   32241574 2023-09-24 20:38 /data/clickstream.csv
drwxr-xr-x   - root supergroup          0 2023-09-24 20:38 /data/transactions


In [2]:
!hdfs dfs -head /data/clickstream.csv

user_id	session_id	event_type	event_page	timestamp
562	507	page	main	1695584127
562	507	event	main	1695584134
562	507	event	main	1695584144
562	507	event	main	1695584147
562	507	wNaxLlerrorU	main	1695584154
562	507	event	main	1695584154
562	507	event	main	1695584154
562	507	event	main	1695584160
562	507	page	rabota	1695584166
562	507	event	rabota	1695584174
562	507	event	rabota	1695584181
562	507	event	rabota	1695584189
562	507	page	main	1695584194
562	507	event	main	1695584204
562	507	event	main	1695584211
562	507	event	main	1695584211
562	507	event	main	1695584219
562	507	page	bonus	1695584221
562	507	page	online	1695584222
562	507	event	online	1695584230
3539	849	page	main	1695584238
3539	849	event	main	1695584252
3539	849	page	online	1695584261
3539	849	page	bonus	1695584269
3539	849	event	bonus	1695584278
3539	849	page	news	1695584285
3539	849	page	main	1695584291
3539	849	event	main	1695584301
3539	849	page	news	1695584306
3539	849	event	news	1695584307
3539	849	page	vklad	169558

In [3]:
# Function to print results in the file and on the screen

def print_result(name, result):
    file = open(f'{name}_approach.txt', 'w')
    for path, count in result[:-1]:
        file.write(f"{path}\t{str(count)}\n")
        print(f"{path}\t{str(count)}")
    path, count = result[-1]
    file.write(f"{path}\t{str(count)}")
    file.close()
    print(f"{path}\t{str(count)}")
    return 'Done!'

In feature `event_type` there are 3 main types: event, page and errors of different kinds. `event_type` == 'event' is redundant for our task, thus, for analysis we will take into account only rows where `event_type` is 'page' or contains word 'error'.

---
# 1. `PANDAS` approach: *This is for reference only!*

In [4]:
import pandas as pd
from collections import Counter

In [5]:
df = pd.read_csv('clickstream.csv', sep='\t')
display(df.head())
display(df.shape)

Unnamed: 0,user_id,session_id,event_type,event_page,timestamp
0,562,507,page,main,1695584127
1,562,507,event,main,1695584134
2,562,507,event,main,1695584144
3,562,507,event,main,1695584147
4,562,507,wNaxLlerrorU,main,1695584154


(1000000, 5)

Let us drop out rows where `event_type` == 'event'

In [6]:
df = df[df['event_type'] != 'event']
display(df.head(3))
display(df.shape)

Unnamed: 0,user_id,session_id,event_type,event_page,timestamp
0,562,507,page,main,1695584127
4,562,507,wNaxLlerrorU,main,1695584154
8,562,507,page,rabota,1695584166


(421610, 5)

In [7]:
df['error'] = 0
df['error'].where(df['event_type'] == 'page', 1, inplace=True)
display(df.head(3))
display(df.shape)

Unnamed: 0,user_id,session_id,event_type,event_page,timestamp,error
0,562,507,page,main,1695584127,0
4,562,507,wNaxLlerrorU,main,1695584154,1
8,562,507,page,rabota,1695584166,0


(421610, 6)

In [8]:
error_table = df[['user_id', 'session_id', 'timestamp']][df['error'] == 1]
error_table.rename(columns={'timestamp': 'error_timestamp'}, inplace=True)
display(error_table.head(3))
display(error_table.shape)

Unnamed: 0,user_id,session_id,error_timestamp
4,562,507,1695584154
39,4567,514,1695584351
70,461,174,1695584529


(20899, 3)

In [9]:
print('With duplicates')
error_table[(error_table['user_id'] == 4567) & (error_table['session_id'] == 514)]

With duplicates


Unnamed: 0,user_id,session_id,error_timestamp
39,4567,514,1695584351
379,4567,514,1695586124
774,4567,514,1695588205


By dropping duplicates we will get the first error in a session

In [10]:
error_table1 = error_table.drop_duplicates(subset=['user_id', 'session_id'], keep='first')
display(error_table1.head(3))
print('Without duplicates')
error_table1[(error_table1['user_id'] == 4567) & (error_table1['session_id'] == 514)]

Unnamed: 0,user_id,session_id,error_timestamp
4,562,507,1695584154
39,4567,514,1695584351
70,461,174,1695584529


Without duplicates


Unnamed: 0,user_id,session_id,error_timestamp
39,4567,514,1695584351


In [11]:
result = pd.merge(df, error_table1, how='left', on=['user_id','session_id'])
display(result.head(3))
display(result.shape)

Unnamed: 0,user_id,session_id,event_type,event_page,timestamp,error,error_timestamp
0,562,507,page,main,1695584127,0,1695584000.0
1,562,507,wNaxLlerrorU,main,1695584154,1,1695584000.0
2,562,507,page,rabota,1695584166,0,1695584000.0


(421610, 7)

In [12]:
result = result.drop(result[(result['error_timestamp'] - result['timestamp'] <= 0)].index)
result.head(3)

Unnamed: 0,user_id,session_id,event_type,event_page,timestamp,error,error_timestamp
0,562,507,page,main,1695584127,0,1695584000.0
6,3539,849,page,main,1695584238,0,
7,3539,849,page,online,1695584261,0,


In [13]:
result.shape

(294335, 7)

In [14]:
routes = result.groupby(['user_id', 'session_id'])[['event_page']].agg(lambda x: x)['event_page'].values.tolist()

In [15]:
from collections import defaultdict
d = defaultdict(int)
for route in routes:
    if not isinstance(route, str):
        route = '-'.join(route)
    d[route] += 1


In [16]:
res = sorted(d.items(), key=lambda x: x[1], reverse=True)[:30]

In [17]:
print_result('REF1_pure_pandas', res)

main	8090
main-archive	1096
main-rabota	1039
main-internet	880
main-bonus	865
main-news	760
main-tariffs	669
main-online	584
main-vklad	514
main-rabota-archive	167
main-archive-rabota	167
main-bonus-archive	139
main-rabota-bonus	137
main-news-rabota	134
main-bonus-rabota	133
main-archive-internet	131
main-rabota-news	129
main-internet-rabota	128
main-archive-news	125
main-internet-archive	123
main-rabota-internet	123
main-archive-bonus	117
main-tariffs-internet	114
main-internet-bonus	114
main-news-archive	112
main-news-internet	108
main-archive-tariffs	103
main-tariffs-archive	102
main-internet-news	102
main-main	94


'Done!'

### Brute force `pandas`

In [18]:
def pandas_approach(row):
    
    user = row['user_id']
    session = row['session_id']
    event_type = row['event_type']
    event_page = row['event_page']
    time = row['timestamp']
    
    if user not in user_session:
        user_session[user] = {'stop_list': []}
    if session not in user_session[user]:
        user_session[user][session] = {'path': [],
                                       'time': []}                                       
    
    if session not in user_session[user]['stop_list']:
        if event_type in ['page', 'event']:
            if event_type == 'page':                
                user_session[user][session]['path'].append(event_page)
                user_session[user][session]['time'].append(time)
        else:
            user_session[user]['stop_list'].append(session)
            if time == user_session[user][session]['time'][-1]:
                user_session[user][session]['path'] = user_session[user][session]['path'][:-1] 
                
    return 'Done!'

In [19]:
user_session = {}
df.apply(pandas_approach, axis=1);

In [20]:
paths = ["-".join(session['path']) for user in user_session.values() for session in user.values() if not isinstance(session, list)]
result = sorted(Counter(paths).items(), key=lambda x: (x[1], list(x[0])), reverse=True)[:30]

In [21]:
print_result('REF2_dict', result)

main	8090
main-archive	1096
main-rabota	1039
main-internet	880
main-bonus	865
main-news	760
main-tariffs	669
main-online	584
main-vklad	514
main-rabota-archive	167
main-archive-rabota	167
main-bonus-archive	139
main-rabota-bonus	137
main-news-rabota	134
main-bonus-rabota	133
main-archive-internet	131
main-rabota-news	129
main-internet-rabota	128
main-archive-news	125
main-rabota-internet	123
main-internet-archive	123
main-archive-bonus	117
main-tariffs-internet	114
main-internet-bonus	114
main-news-archive	112
main-news-internet	108
main-archive-tariffs	103
main-tariffs-archive	102
main-internet-news	102
main-main	94


'Done!'

---
# 2. `Spark SQL` approach

## Spark SQL in ONE query

In [22]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

from pyspark.sql import SparkSession, Row
se = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-10-10 16:42:02,712 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [23]:
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import col

In [24]:
path = '/data/clickstream.csv'

df = se.read.options(delimiter="\t", header=True).csv(path)
df.registerTempTable("df")
df.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
+-------+----------+------------+----------+----------+
only showing top 5 rows



                                                                                

In [25]:
result = se.sql( \
'''
 SELECT route, COUNT(route) as count
    FROM (
    SELECT concat_ws('-', (collect_list(struct(joined_sql.timestamp, joined_sql.event_page))).event_page) as route, joined_sql.user_id, joined_sql.session_id
        FROM (
            SELECT *
                FROM (
                    SELECT INT(user_id), INT(session_id), event_type, event_page, INT(timestamp)
                        FROM df
                        WHERE df.event_type != 'event'
                        order by user_id, session_id, timestamp) as spark_sql
                LEFT JOIN (
                            SELECT err_table.err_user_id, err_table.err_session_id, min(err_table.error_timestamp) as error_timestamp
                                FROM (
                                SELECT user_id as err_user_id, session_id as err_session_id, timestamp as error_timestamp
                                    FROM (
                                    SELECT INT(user_id), INT(session_id), event_type, event_page, INT(timestamp), INT(event_type like '%error%') as error
                                FROM df
                                WHERE df.event_type != 'event'
                                order by user_id, session_id, timestamp
                                    ) as spark_sql
                                    WHERE error == 1
                                    order by user_id, session_id, error_timestamp) as err_table
                                GROUP BY err_table.err_user_id, err_table.err_session_id
                           ) as error_table
                 ON spark_sql.user_id == error_table.err_user_id and 
                   spark_sql.session_id == error_table.err_session_id
                WHERE spark_sql.timestamp < error_table.error_timestamp or isnull(error_timestamp)
                ORDER BY user_id, session_id, timestamp) as joined_sql
        GROUP BY joined_sql.user_id, joined_sql.session_id
        ORDER BY joined_sql.user_id, joined_sql.session_id) as final
    GROUP BY route
    ORDER BY count desc        
''')

In [26]:
print_result('spark_sql', result.collect()[:30])

                                                                                

main	8090
main-archive	1096
main-rabota	1039
main-internet	880
main-bonus	865
main-news	760
main-tariffs	669
main-online	584
main-vklad	514
main-archive-rabota	167
main-rabota-archive	167
main-bonus-archive	139
main-rabota-bonus	137
main-news-rabota	134
main-bonus-rabota	133
main-archive-internet	131
main-rabota-news	129
main-internet-rabota	128
main-archive-news	125
main-rabota-internet	123
main-internet-archive	123
main-archive-bonus	117
main-tariffs-internet	114
main-internet-bonus	114
main-news-archive	112
main-news-internet	108
main-archive-tariffs	103
main-internet-news	102
main-tariffs-archive	102
main-main	94


'Done!'

In [27]:
sc.stop()

## DRAFTS for SPARK SQL

Let us drop out rows where `event_type` == 'event'

In [28]:
spark_sql = se.sql( \
'''
SELECT INT(user_id), INT(session_id), event_type, event_page, INT(timestamp), INT(event_type like '%error%') as error
    FROM df
    WHERE df.event_type != 'event'
    order by user_id, session_id, timestamp
    
''')
spark_sql.registerTempTable('spark_sql')
spark_sql.show(25)
spark_sql.count()

Py4JJavaError: An error occurred while calling o41.showString.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:118)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1522)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:102)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues(FileFormat.scala:131)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues$(FileFormat.scala:122)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:177)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:426)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:417)
	at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:504)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:526)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:497)
	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:237)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:50)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:750)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:204)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
error_table = se.sql( \
'''
SELECT err_table.err_user_id, err_table.err_session_id, 
    min(err_table.error_timestamp) as error_timestamp
    FROM (
    SELECT user_id as err_user_id, session_id as err_session_id, timestamp as error_timestamp
        FROM (
        SELECT INT(user_id), INT(session_id), event_type, event_page, INT(timestamp), INT(event_type like '%error%') as error
    FROM df
    WHERE df.event_type != 'event'
    order by user_id, session_id, timestamp
        ) as spark_sql
        WHERE error == 1
        order by user_id, session_id, error_timestamp) as err_table
    GROUP BY err_table.err_user_id, err_table.err_session_id
    
       
''')
error_table.registerTempTable('error_table')
error_table.show(15)
error_table.count()

In [None]:
joined_sql = se.sql( \
'''
                SELECT *
                    FROM spark_sql
                    LEFT JOIN error_table on spark_sql.user_id == error_table.err_user_id and 
                                        spark_sql.session_id == error_table.err_session_id
                    where timestamp < error_timestamp or isnull(error_timestamp)
                    order by user_id, session_id, timestamp
                     
    
''')
joined_sql.registerTempTable('joined_sql')
joined_sql.show(15)
joined_sql.count()

Let's check that there is no errors

In [None]:
joined_sql.select(['error']).distinct().rdd.map(lambda x: x.error).collect()

In [None]:
result1 = se.sql( \
'''
    SELECT concat_ws('-', (collect_list(struct(Timestamp, event_page))).event_page) as route, user_id, session_id
    FROM joined_sql
    GROUP BY user_id, session_id
    order by user_id, session_id
''')
result1.registerTempTable('result1')
result1.show(50)
result1.count()

In [None]:
final_result = se.sql( \
    '''
    SELECT route, COUNT(route) as count
        FROM result1
        GROUP BY route
        order by count desc
'''
      )

In [None]:
final_result.show(30)
final_result.count()

In [None]:
sc.stop()

---
# 3. `Spark RDD` approach

In [29]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

from pyspark.sql import SparkSession, Row
se = SparkSession(sc)

2023-10-10 16:43:10,803 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [30]:
path = '/data/clickstream.csv'

df = se.read.options(delimiter="\t", header=True).csv(path)
df.registerTempTable("df")
df.show(5)

                                                                                

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
+-------+----------+------------+----------+----------+
only showing top 5 rows



In [31]:
rdd_df = df.rdd
rdd_df.take(3)

                                                                                

[Row(user_id='562', session_id='507', event_type='page', event_page='main', timestamp='1695584127'),
 Row(user_id='562', session_id='507', event_type='event', event_page='main', timestamp='1695584134'),
 Row(user_id='562', session_id='507', event_type='event', event_page='main', timestamp='1695584144')]

In [32]:
rdd_df_filtered = rdd_df.filter(lambda x: 'event' not in x[2])
rdd_df_filtered.take(3)

[Row(user_id='562', session_id='507', event_type='page', event_page='main', timestamp='1695584127'),
 Row(user_id='562', session_id='507', event_type='wNaxLlerrorU', event_page='main', timestamp='1695584154'),
 Row(user_id='562', session_id='507', event_type='page', event_page='rabota', timestamp='1695584166')]

In [33]:
def tick_error(row):
    if 'error' in row['event_type']:
        return (row['user_id'], row['session_id']), (row['event_page'], row['timestamp']), 1 
    return (row['user_id'], row['session_id']), (row['event_page'], row['timestamp']), 0         

In [34]:
rdd_df_err = rdd_df_filtered.map(tick_error)

In [35]:
rdd_df_err.take(5)

[(('562', '507'), ('main', '1695584127'), 0),
 (('562', '507'), ('main', '1695584154'), 1),
 (('562', '507'), ('rabota', '1695584166'), 0),
 (('562', '507'), ('main', '1695584194'), 0),
 (('562', '507'), ('bonus', '1695584221'), 0)]

In [36]:
def get_error_table(row):
    if row[2] == 1:
        return row[0], row[1][1]        

In [37]:
def first_error(row):
    return min(row[1])

In [38]:
err_table = rdd_df_err.map(get_error_table).filter(lambda x: x is not None).groupBy(lambda x: x[0]).map(first_error)

In [39]:
err_table.collect()[:3]

                                                                                

[(('3940', '740'), '1696271012'),
 (('3434', '641'), '1696271432'),
 (('2092', '521'), '1696274764')]

In [40]:
rdd_df_joined = rdd_df_err.leftOuterJoin(err_table)

In [41]:
rdd_df_joined.collect()[:5]

                                                                                

[(('2569', '54'), (('online', '1696270680'), '1696285133')),
 (('2569', '54'), (('bonus', '1696272716'), '1696285133')),
 (('2569', '54'), (('online', '1696274201'), '1696285133')),
 (('2569', '54'), (('bonus', '1696274460'), '1696285133')),
 (('2569', '54'), (('main', '1696275080'), '1696285133'))]

In [42]:
# test filtration
x = (('2569', '54'), (('online', '1696270680'), '1696285133'))
print(int(x[1][0][1]), int(x[1][1]), int(x[1][0][1]) < int(x[1][1]))

1696270680 1696285133 True


In [43]:
routes = rdd_df_joined.filter(lambda x: (x[1][1] is None or int(x[1][0][1]) < int(x[1][1]))) 

In [44]:
routes.collect()[:3]

                                                                                

[(('2569', '54'), (('online', '1696270680'), '1696285133')),
 (('2569', '54'), (('bonus', '1696272716'), '1696285133')),
 (('2569', '54'), (('online', '1696274201'), '1696285133'))]

In [45]:
def route(row):   
    return [e for e in row[1]]

In [46]:
result = routes.groupBy(lambda x: x[0]).map(route)

In [47]:
routes = []
for line in result.collect():
    paths = []
    for path in line:
        paths.append(path[1][0][0])
    routes.append("-".join(paths))   

                                                                                

In [48]:
from collections import Counter
answer = sorted(Counter(routes).items(), key=lambda x: x[1], reverse=True)[:30]

In [49]:
print_result('spark_rdd', answer)

main	8090
main-archive	1089
main-rabota	1038
main-internet	879
main-bonus	865
main-news	759
main-tariffs	668
main-online	584
main-vklad	513
main-rabota-archive	167
main-archive-rabota	167
main-bonus-archive	139
main-rabota-bonus	137
main-news-rabota	134
main-bonus-rabota	133
main-archive-internet	131
main-rabota-news	129
main-internet-rabota	128
main-archive-news	125
main-rabota-internet	123
main-internet-archive	123
main-archive-bonus	117
main-tariffs-internet	114
main-internet-bonus	114
main-news-archive	112
main-news-internet	108
main-archive-tariffs	103
main-internet-news	102
main-tariffs-archive	102
main-main	94


'Done!'

In [50]:
sc.stop()

---
# 4. `Spark DataFrame` approach 

In [51]:
# connect, context, session

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

from pyspark.sql import SparkSession, Row
se = SparkSession(sc)

2023-10-10 16:43:47,752 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [52]:
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import col

In [53]:
path = '/data/clickstream.csv'

In [54]:
spark_df = se.read.options(delimiter="\t", header=True).csv(path)
spark_df.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
+-------+----------+------------+----------+----------+
only showing top 5 rows



                                                                                

In [55]:
spark_df =  spark_df[spark_df['event_type'] != 'event']

In [56]:
spark_df.count()

                                                                                

421610

In [57]:
spark_df = spark_df.withColumn('error', spark_df.event_type.like("%error%").cast('Integer'))
spark_df = spark_df.drop('event_type')
spark_df.show()

+-------+----------+----------+----------+-----+
|user_id|session_id|event_page| timestamp|error|
+-------+----------+----------+----------+-----+
|    562|       507|      main|1695584127|    0|
|    562|       507|      main|1695584154|    1|
|    562|       507|    rabota|1695584166|    0|
|    562|       507|      main|1695584194|    0|
|    562|       507|     bonus|1695584221|    0|
|    562|       507|    online|1695584222|    0|
|   3539|       849|      main|1695584238|    0|
|   3539|       849|    online|1695584261|    0|
|   3539|       849|     bonus|1695584269|    0|
|   3539|       849|      news|1695584285|    0|
|   3539|       849|      main|1695584291|    0|
|   3539|       849|      news|1695584306|    0|
|   3539|       849|     vklad|1695584317|    0|
|   3539|       849|    rabota|1695584319|    0|
|   3539|       849|     bonus|1695584324|    0|
|    461|       174|      main|1695584343|    0|
|   4567|       514|      main|1695584345|    0|
|   4567|       514|

In [58]:
error_table = spark_df[['user_id', 'session_id', 'timestamp']][spark_df['error'] == 1]
error_table = error_table.withColumnRenamed('timestamp', 'error_timestamp')
error_table.show(5)
error_table.count()

+-------+----------+---------------+
|user_id|session_id|error_timestamp|
+-------+----------+---------------+
|    562|       507|     1695584154|
|   4567|       514|     1695584351|
|    461|       174|     1695584529|
|    844|       258|     1695584652|
|    461|       174|     1695584698|
+-------+----------+---------------+
only showing top 5 rows



                                                                                

20899

Let us leave only the first error entry in each `user-session` pair.

For example, 3 errors occured during the session_id = 514 for user_id = 4567. After dropping the duplicates we will have only the first error occurence left.

In [59]:
# Before
error_table[(error_table['user_id'] == 4567) & (error_table['session_id'] == 514)].show()
error_table.count()

+-------+----------+---------------+
|user_id|session_id|error_timestamp|
+-------+----------+---------------+
|   4567|       514|     1695584351|
|   4567|       514|     1695586124|
|   4567|       514|     1695588205|
+-------+----------+---------------+



20899

In [60]:
# After
error_table = error_table.dropDuplicates(['user_id', 'session_id'])
error_table[(error_table['user_id'] == 4567) & (error_table['session_id'] == 514)].show()
error_table.count()

                                                                                

+-------+----------+---------------+
|user_id|session_id|error_timestamp|
+-------+----------+---------------+
|   4567|       514|     1695584351|
+-------+----------+---------------+



                                                                                

14569

In [63]:
spark_df = spark_df.join(error_table, ['user_id', 'session_id'], 'left')


In [66]:
spark_df.show(15)
spark_df.count()

                                                                                

+-------+----------+----------+----------+-----+---------------+
|user_id|session_id|event_page| timestamp|error|error_timestamp|
+-------+----------+----------+----------+-----+---------------+
|    562|       507|      main|1695584127|    0|     1695584154|
|    562|       507|      main|1695584154|    1|     1695584154|
|    562|       507|    rabota|1695584166|    0|     1695584154|
|    562|       507|      main|1695584194|    0|     1695584154|
|    562|       507|     bonus|1695584221|    0|     1695584154|
|    562|       507|    online|1695584222|    0|     1695584154|
|    461|       174|      main|1695584343|    0|     1695584529|
|   3539|       849|      main|1695584238|    0|           null|
|   3539|       849|    online|1695584261|    0|           null|
|   3539|       849|     bonus|1695584269|    0|           null|
|   3539|       849|      news|1695584285|    0|           null|
|   3539|       849|      main|1695584291|    0|           null|
|   3539|       849|     

                                                                                

421610

In [67]:
from pyspark.sql.functions import col

In [68]:
result = spark_df[(col("error_timestamp").isNull()) | (col("timestamp").cast('Integer')  < col("error_timestamp").cast('Integer'))]

In [69]:
result = result.groupBy("user_id", "session_id").agg(collect_list("event_page").alias('path')).groupBy("path").count().orderBy(['count','path'], ascending=False).collect()

                                                                                

In [70]:
answer = [['-'.join(path[0]), path[1]] for path in result]

In [72]:
sc.stop()

In [73]:
print_result('spark_df', answer[:30])

main	8076
main-archive	1088
main-rabota	1034
main-internet	878
main-bonus	863
main-news	759
main-tariffs	668
main-online	580
main-vklad	513
main-rabota-archive	166
main-archive-rabota	166
main-bonus-archive	139
main-rabota-bonus	136
main-news-rabota	133
main-bonus-rabota	133
main-archive-internet	131
main-rabota-news	129
main-internet-rabota	128
main-archive-news	125
main-internet-archive	123
main-rabota-internet	122
main-archive-bonus	117
main-tariffs-internet	114
main-news-archive	112
main-internet-bonus	112
main-news-internet	108
main-internet-news	102
main-archive-tariffs	102
main-tariffs-archive	101
main-main	94


'Done!'