# User routes on the site
## Description
**Clickstream** is a sequence of user actions on a website. It shows how users interact with the site. In this task, you need to find the most frequent user routes.

## Input data
The input data is a table with clickstream data in the file `hdfs:/data/clickstream.csv`.

### Table structure
* `user_id (int)` - Unique user identifier.
* `session_id (int)` - Unique identifier for the user session. The user's session lasts until the identifier changes.
* `event_type (string)` - Event type from the list:
    * **page** - visit to the page
    * **event** - any action on the page
    * <b>&lt;custom&gt;</b> - any other event type, given as an arbitrary string
* `event_page (string)` - Page on the site.
* `timestamp (int)` - Unix timestamp of the action.
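
The notebook below reads this file as tab-separated text with a header row. Assuming that layout, a minimal plain-Python sketch of turning rows into the typed fields listed above might look like this; `ClickEvent` and `parse_clickstream` are hypothetical names, not part of the assignment.

```python
import csv
import io
from typing import List, NamedTuple


class ClickEvent(NamedTuple):
    """One clickstream row with the column types listed above."""
    user_id: int
    session_id: int
    event_type: str
    event_page: str
    timestamp: int


def parse_clickstream(text: str) -> List[ClickEvent]:
    """Parse tab-separated text with a header row (assumed layout) into ClickEvent rows."""
    reader = csv.DictReader(io.StringIO(text), delimiter="\t")
    return [
        ClickEvent(
            user_id=int(row["user_id"]),
            session_id=int(row["session_id"]),
            event_type=row["event_type"],
            event_page=row["event_page"],
            timestamp=int(row["timestamp"]),
        )
        for row in reader
    ]


# Two rows from the sample session, written in the assumed tab-separated layout
sample = (
    "user_id\tsession_id\tevent_type\tevent_page\ttimestamp\n"
    "562\t507\tpage\tmain\t1620494781\n"
    "562\t507\twNaxLlerrorU\tmain\t1620494865\n"
)
events = parse_clickstream(sample)
```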

### Browser errors
Errors can sometimes occur in the user's browser. Once such an error appears, the rest of that session's data can no longer be trusted: the error row itself, every row with the same timestamp, and every row after it are considered corrupted and **should not be counted** in statistics.

When an error occurs on the page, a random string containing the word **error** will be written to the `event_type` field.
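
The rule can be applied per session: find the earliest timestamp whose `event_type` contains `error`, then keep only rows strictly earlier than it. A minimal plain-Python sketch, assuming each session is a list of `(event_type, event_page, timestamp)` tuples (the helper name `drop_corrupted` is hypothetical):

```python
from typing import List, Tuple

Event = Tuple[str, str, int]  # (event_type, event_page, timestamp)


def drop_corrupted(session: List[Event]) -> List[Event]:
    """Return the rows that happened strictly before the session's first error.

    The error row and every row with the same or a later timestamp are dropped;
    a session without errors is returned unchanged.
    """
    error_times = [ts for event_type, _, ts in session if "error" in event_type]
    if not error_times:
        return list(session)
    cutoff = min(error_times)
    return [event for event in session if event[2] < cutoff]


# Session 507 from the sample table below
session_507 = [
    ("page", "main", 1620494781),
    ("event", "main", 1620494788),
    ("event", "main", 1620494798),
    ("page", "family", 1620494820),
    ("event", "family", 1620494828),
    ("page", "main", 1620494848),
    ("wNaxLlerrorU", "main", 1620494865),
    ("event", "main", 1620494873),
    ("page", "news", 1620494875),
    ("page", "tariffs", 1620494876),
    ("event", "tariffs", 1620494884),
]
kept = drop_corrupted(session_507)
```

Only the six rows before `1620494865` survive, which is why `news` and `tariffs` do not appear in the route for session 507.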

### Sample of user session
<pre>
+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1620494781|
|    562|       507|       event|      main|1620494788|
|    562|       507|       event|      main|1620494798|
|    562|       507|        page|    family|1620494820|
|    562|       507|       event|    family|1620494828|
|    562|       507|        page|      main|1620494848|
|    562|       507|wNaxLlerrorU|      main|1620494865|
|    562|       507|       event|      main|1620494873|
|    562|       507|        page|      news|1620494875|
|    562|       507|        page|   tariffs|1620494876|
|    562|       507|       event|   tariffs|1620494884|
|    562|       514|        page|      main|1620728918|
|    562|       514|       event|      main|1620729174|
|    562|       514|        page|   archive|1620729674|
|    562|       514|        page|     bonus|1620729797|
|    562|       514|        page|   tariffs|1620731090|
|    562|       514|       event|   tariffs|1620731187|
+-------+----------+------------+----------+----------+
</pre>

#### Correct user routes for a given user:
* **Session 507**: main-family-main
* **Session 514**: main-archive-bonus-tariffs

Route elements are ordered by the time they appear in the clickstream, from earliest to latest.

A route includes every page visited from the start of the session until the session ends or an error occurs, whichever comes first.
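
Putting the ordering and error rules together, the two expected routes above can be reproduced from the sample table with a short plain-Python sketch. It groups rows by `(user_id, session_id)`, sorts each session by timestamp, drops rows at or after the first error, and joins the remaining `page` visits with `-`. `build_routes` is a hypothetical helper; sessions left with no pages are skipped, and consecutive repeats of a page are kept as-is here (the notebook's solutions additionally merge them).

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int, str, str, int]  # user_id, session_id, event_type, event_page, timestamp


def build_routes(rows: List[Row]) -> Dict[Tuple[int, int], str]:
    """Map each (user_id, session_id) to its route, e.g. 'main-family-main'."""
    sessions: Dict[Tuple[int, int], List[Tuple[int, str, str]]] = defaultdict(list)
    for user_id, session_id, event_type, page, ts in rows:
        sessions[(user_id, session_id)].append((ts, event_type, page))

    routes: Dict[Tuple[int, int], str] = {}
    for key, events in sessions.items():
        events.sort(key=lambda event: event[0])  # earliest first
        error_times = [ts for ts, event_type, _ in events if "error" in event_type]
        cutoff = min(error_times) if error_times else None
        pages = [
            page
            for ts, event_type, page in events
            if event_type == "page" and (cutoff is None or ts < cutoff)
        ]
        if pages:  # a session with no valid page visits has no route
            routes[key] = "-".join(pages)
    return routes


# The sample session table above
SAMPLE: List[Row] = [
    (562, 507, "page", "main", 1620494781),
    (562, 507, "event", "main", 1620494788),
    (562, 507, "event", "main", 1620494798),
    (562, 507, "page", "family", 1620494820),
    (562, 507, "event", "family", 1620494828),
    (562, 507, "page", "main", 1620494848),
    (562, 507, "wNaxLlerrorU", "main", 1620494865),
    (562, 507, "event", "main", 1620494873),
    (562, 507, "page", "news", 1620494875),
    (562, 507, "page", "tariffs", 1620494876),
    (562, 507, "event", "tariffs", 1620494884),
    (562, 514, "page", "main", 1620728918),
    (562, 514, "event", "main", 1620729174),
    (562, 514, "page", "archive", 1620729674),
    (562, 514, "page", "bonus", 1620729797),
    (562, 514, "page", "tariffs", 1620731090),
    (562, 514, "event", "tariffs", 1620731187),
]
routes = build_routes(SAMPLE)
```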

## Task
Use the Spark SQL, Spark RDD, and Spark DataFrame interfaces to create a solution file whose lines contain **the 30 most frequent user routes** on the site.

Each line of the file should contain the `route` and `count` values **separated by tabs**, where:
* `route` - route on the site, consisting of pages separated by "-".
* `count` - the number of user sessions that followed this route.

The lines must be **ordered in descending order** of the `count` field.
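
For a local check of the required format, here is a minimal sketch that turns a `{route: count}` mapping into tab-separated lines, highest count first. Ties are broken alphabetically by route, which is an assumption since the task does not specify a tie order; `format_top_routes` and `write_solution` are hypothetical helpers.

```python
from typing import Dict, List


def format_top_routes(route_counts: Dict[str, int], n: int = 30) -> List[str]:
    """Return up to n lines of the form 'route<TAB>count', ordered by count descending."""
    ranked = sorted(route_counts.items(), key=lambda item: (-item[1], item[0]))
    return [f"{route}\t{count}" for route, count in ranked[:n]]


def write_solution(path: str, lines: List[str]) -> None:
    """Write one 'route<TAB>count' line per route to the solution file."""
    with open(path, "w", encoding="utf-8") as out:
        out.write("\n".join(lines) + "\n")


lines = format_top_routes({"main-news": 769, "main": 8184, "main-archive": 1113}, n=2)
```

Calling `write_solution("solution.tsv", lines)` would then produce the file; `solution.tsv` is only an example name.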

## Criteria
You can get a maximum of 3.5 points (final grade) for this assignment, depending on how many interfaces you manage to use. The criteria are as follows:

* 0.5 points – Spark SQL solution with 1 query
* 0.5 points – Spark SQL solution with <=2 queries
* 0.5 points – Spark RDD solution
* 0.5 points – Spark DF solution
* 0.5 points – your solution algorithm is reasonably optimized, i.e. no O(n²) or O(n³) complexity, appropriate object usage, no data leaks, etc. This is evaluated by staff.
* 1 point – a one-on-one screening session. During this session, a staff member can ask you questions about your solution logic, framework usage, questionable parts of your code, etc. If your code is clean enough, the staff member may simply ask you to solve a theoretical problem related to Spark.


In [1]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='SGA_FABRICE_jupyter')

from pyspark.sql import SparkSession, Row
se = SparkSession(sc)

from pyspark.sql.context import SQLContext
sqlContext = SQLContext(sc)



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [3]:
file_path = 'hdfs:/data/clickstream.csv'

In [4]:
# Read the tab-separated clickstream file (with a header row) from the HDFS path defined above
sql_tab = sqlContext.read.option("delimiter", "\t").option("header", True).csv(file_path)

# Spark SQL

In [5]:
# Convert the timestamp column to integer type  
converted_data = sql_tab.withColumn("timestamp", sql_tab.timestamp.cast("int"))  

# Find the minimum timestamp for events containing 'error' for each user and session  
min_error_ts_df = (  
    converted_data  
    .filter(converted_data.event_type.contains("error"))  
    .groupBy('user_id', 'session_id')  
    .agg(F.min('timestamp').alias('min_error_ts'))  
)  

# Define a window specification to order events by timestamp within each user and session  
event_window_spec = Window.partitionBy('user_id', 'session_id').orderBy('timestamp')  

# Join the original DataFrame with the DataFrame containing minimum error timestamps  
result_df = (  
    converted_data  
    .join(min_error_ts_df, ['user_id', 'session_id'], 'left')  
    .filter(  
        (min_error_ts_df.min_error_ts.isNull()) |  
        (converted_data.timestamp < min_error_ts_df.min_error_ts)  
    )  
    .filter(converted_data.event_type == 'page')  
    .select('user_id', 'session_id', 'event_page', 'timestamp')  
    .withColumn('previous_event_page', F.lag('event_page', 1).over(event_window_spec))  
    .filter(  
        (F.col('previous_event_page').isNull()) |  
        (F.col('event_page') != F.col('previous_event_page'))  
    )  
)

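# Collect (timestamp, page) pairs, sort them, and keep the pages in time order
# (collect_list alone does not guarantee the order of the collected rows)
navigation_route_df = (
    result_df
    .groupBy('user_id', 'session_id')
    .agg(F.sort_array(F.collect_list(F.struct('timestamp', 'event_page'))).alias('visits'))
    .select('user_id', 'session_id', F.col('visits.event_page').alias('page_list'))
)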

# Create navigation routes by joining pages with '-'
navigation_counts_df = ( 
    navigation_route_df 
    .select('user_id', 'session_id', F.expr("array_join(page_list, '-') as navigation_route"))  # Use expr for array_join
    .groupBy('navigation_route') 
    .count() 
    .orderBy(F.desc('count')) 
) 

# Show the result 
navigation_counts_df.show(30)

+--------------------+-----+
|    navigation_route|count|
+--------------------+-----+
|                main| 8184|
|        main-archive| 1113|
|         main-rabota| 1047|
|       main-internet|  897|
|          main-bonus|  870|
|           main-news|  769|
|        main-tariffs|  676|
|         main-online|  587|
|          main-vklad|  518|
| main-rabota-archive|  170|
| main-archive-rabota|  167|
|  main-bonus-archive|  143|
|   main-rabota-bonus|  139|
|   main-bonus-rabota|  135|
|    main-news-rabota|  135|
|main-archive-inte...|  132|
|    main-rabota-news|  130|
|main-internet-rabota|  129|
|   main-archive-news|  126|
|main-rabota-internet|  124|
|main-internet-arc...|  123|
|  main-archive-bonus|  117|
| main-internet-bonus|  115|
|main-tariffs-inte...|  114|
|   main-news-archive|  113|
|  main-news-internet|  109|
|main-archive-tariffs|  104|
|  main-internet-news|  103|
|main-tariffs-archive|  103|
|    main-rabota-main|   94|
+--------------------+-----+
only showing top 30 rows
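
The `F.lag` filter in the cell above keeps a page only when it differs from the page immediately before it in the same session, so repeated consecutive visits to one page collapse into a single route step. A plain-Python sketch of that rule for one session's time-ordered pages (`drop_repeated_pages` is a hypothetical name):

```python
from typing import List, Optional


def drop_repeated_pages(pages: List[str]) -> List[str]:
    """Keep each page only if it differs from the previous page (the lag() comparison)."""
    kept: List[str] = []
    previous: Optional[str] = None
    for page in pages:
        if previous is None or page != previous:
            kept.append(page)
        previous = page  # lag() always looks at the previous row, whether or not it was kept
    return kept


route = "-".join(drop_repeated_pages(["main", "main", "rabota", "rabota", "main"]))
```

Non-adjacent repeats survive, so `main-rabota-main` stays a three-step route, which matches rows such as `main-rabota-main` in the output above.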

# Spark DataFrame

In [6]:
# Create a DataFrame from the existing RDD and schema
data_frame = se.createDataFrame(sql_tab.rdd, sql_tab.schema) 
data_frame.show(20)


+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
|    562|       507|       event|      main|1695584154|
|    562|       507|       event|      main|1695584154|
|    562|       507|       event|      main|1695584160|
|    562|       507|        page|    rabota|1695584166|
|    562|       507|       event|    rabota|1695584174|
|    562|       507|       event|    rabota|1695584181|
|    562|       507|       event|    rabota|1695584189|
|    562|       507|        page|      main|1695584194|
|    562|       507|       event|      main|1695584204|
|    562|       507|       event|      main|1695

In [7]:
# Convert the 'timestamp' column to integer type
data_frame_converted = data_frame.withColumn("timestamp", data_frame['timestamp'].cast("int")) 

# Find the minimum timestamp for events containing 'error' for each user and session
min_timestamp_df = (
    data_frame_converted
    .select("user_id", "session_id", "timestamp")
    .filter(data_frame_converted.event_type.contains("error"))
    .groupBy('user_id', 'session_id')
    .agg(F.min('timestamp').alias('min_timestamp')) 
)

# Define a window specification to order events by timestamp within each user and session
window_spec = Window.partitionBy('user_id', 'session_id').orderBy('timestamp') 

# Process the converted DataFrame to find navigation routes
navigation_routes_df = (
    data_frame_converted
    .join(min_timestamp_df, ['user_id', 'session_id'], 'left') 
    .filter((min_timestamp_df.min_timestamp.isNull()) | (data_frame_converted.timestamp < min_timestamp_df.min_timestamp)) 
    .filter(data_frame_converted.event_type == 'page') 
    .select('user_id', 'session_id', 'event_page', 'timestamp') 
    .withColumn('previous_event_page', F.lag('event_page', 1).over(window_spec)) 
    .filter((F.col('previous_event_page').isNull()) | (F.col('event_page') != F.col('previous_event_page'))) 
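    .groupBy('user_id', 'session_id')
    # Sort (timestamp, page) pairs so the route follows time order, not collect_list's row order
    .agg(F.sort_array(F.collect_list(F.struct('timestamp', 'event_page'))).alias('visits'))
    # Build the route string directly; a second groupBy on the page list is not needed
    .select(F.array_join(F.col('visits.event_page'), '-').alias('navigation_route'))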
    .groupBy('navigation_route') 
    .agg(F.count('*').alias('route_count')) 
    .orderBy(F.desc('route_count'))
)

# Display the top 30 navigation routes
navigation_routes_df.show(30)


+--------------------+-----------+
|    navigation_route|route_count|
+--------------------+-----------+
|                main|       8184|
|        main-archive|       1113|
|         main-rabota|       1047|
|       main-internet|        897|
|          main-bonus|        870|
|           main-news|        769|
|        main-tariffs|        676|
|         main-online|        587|
|          main-vklad|        518|
| main-rabota-archive|        170|
| main-archive-rabota|        167|
|  main-bonus-archive|        143|
|   main-rabota-bonus|        139|
|   main-bonus-rabota|        135|
|    main-news-rabota|        135|
|main-archive-inte...|        132|
|    main-rabota-news|        130|
|main-internet-rabota|        129|
|   main-archive-news|        126|
|main-rabota-internet|        124|
|main-internet-arc...|        123|
|  main-archive-bonus|        117|
| main-internet-bonus|        115|
|main-tariffs-inte...|        114|
|   main-news-archive|        113|
|  main-news-interne
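
Routes in the DataFrame pipelines are assembled with `collect_list`, which Spark documents as non-deterministic in ordering because collected rows may arrive in a different order after a shuffle. Sorting `(timestamp, page)` structs, for example with `F.sort_array(F.collect_list(F.struct('timestamp', 'event_page')))`, makes the time order explicit. The plain-Python sketch below shows why sorting the pairs recovers the route regardless of arrival order; `route_from_pairs` is a hypothetical helper.

```python
from typing import Iterable, Tuple


def route_from_pairs(visits: Iterable[Tuple[int, str]]) -> str:
    """Join pages in ascending timestamp order, regardless of input order.

    Pairs are compared as tuples, so equal timestamps fall back to the page name,
    the same way sorted (timestamp, page) structs would be ordered.
    """
    return "-".join(page for _, page in sorted(visits))


# Session 514's page visits from the task's sample, deliberately shuffled
shuffled = [
    (1620729797, "bonus"),
    (1620728918, "main"),
    (1620731090, "tariffs"),
    (1620729674, "archive"),
]
route = route_from_pairs(shuffled)
```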

# RDD

In [8]:
# Convert the DataFrame to an RDD
rdd_events = sql_tab.rdd

In [9]:
# Filter for error events, extract user-session pairs with their timestamps,
# and find the minimum timestamp for each user-session pair
error_events_map = (
    rdd_events
    .filter(lambda event: 'error' in event['event_type'])  # Keep only error events
    .map(lambda event: (f"{event['user_id']}_{event['session_id']}", int(event['timestamp'])))  # Create (user_session, timestamp) pairs
    .reduceByKey(min)  # Keep the earliest error timestamp per user-session (combined map-side before the shuffle)
    .collectAsMap()  # Collect results to the driver as a dictionary
)

In [10]:
# Display the first 10 items of the error events dictionary
# (print is needed because only a cell's last expression is displayed automatically)
print(list(error_events_map.items())[:10])

# Broadcast the error events dictionary to all nodes in the cluster
broadcast_error_events = sc.broadcast(error_events_map)  # Send to cluster
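
The broadcast dictionary is then consulted row by row: a row survives if its session has no recorded error, or if it happened strictly before that session's first error. A plain-Python sketch of both steps, assuming tuple keys instead of the notebook's `"user_session"` strings (`first_error_times` and `keep_clean_pages` are hypothetical helpers, and the toy rows are made up for illustration):

```python
from typing import Dict, List, Tuple

Event = Tuple[int, int, str, str, int]  # user_id, session_id, event_type, event_page, timestamp
SessionKey = Tuple[int, int]


def first_error_times(events: List[Event]) -> Dict[SessionKey, int]:
    """Earliest error timestamp per session, the role played by error_events_map."""
    earliest: Dict[SessionKey, int] = {}
    for user_id, session_id, event_type, _, ts in events:
        if "error" in event_type:
            key = (user_id, session_id)
            earliest[key] = min(ts, earliest.get(key, ts))
    return earliest


def keep_clean_pages(events: List[Event], errors: Dict[SessionKey, int]) -> List[Event]:
    """Keep page visits from error-free sessions or from before the session's first error."""
    return [
        event
        for event in events
        if event[2] == "page"
        and ((event[0], event[1]) not in errors or event[4] < errors[(event[0], event[1])])
    ]


# Hypothetical toy rows: session (1, 1) has two errors, session (1, 2) has none
toy = [
    (1, 1, "page", "main", 10),
    (1, 1, "xxerror", "main", 20),
    (1, 1, "errorB", "main", 15),
    (1, 1, "page", "news", 12),
    (1, 1, "page", "bonus", 30),
    (1, 2, "page", "main", 5),
]
errors = first_error_times(toy)
clean = keep_clean_pages(toy, errors)
```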

In [11]:
from itertools import groupby

# Function to remove consecutive duplicate pages from a session
def remove_consecutive_duplicates(user_session):
    # Sort the pages based on their timestamps
    sorted_pages = [page[0] for page in sorted(user_session[1], key=lambda entry: entry[1])]
    # Use groupby to extract unique consecutive pages
    unique_pages = [key for key, _ in groupby(sorted_pages)]
    return (user_session[0], unique_pages)

# Process the main data to filter and create the desired output
filtered_sessions = (
    rdd_events
    .map(lambda entry: (f"{entry['user_id']}_{entry['session_id']}", [entry['event_type'], entry['event_page'], int(entry['timestamp'])]))  # Create user-session pairs with event details
    .filter(lambda entry: entry[0] not in broadcast_error_events.value or entry[1][2] < broadcast_error_events.value[entry[0]])  # Drop rows at or after the session's first error
    .filter(lambda entry: "page" == entry[1][0])  # Keep only page events
    .map(lambda entry: (entry[0], (entry[1][1], entry[1][2])))  # Map to (user_session, (page, timestamp))
    .groupByKey()  # Group by user-session
    .map(remove_consecutive_duplicates)  # Remove consecutive duplicates
    .map(lambda session: (session[0], "-".join(list(session[1]))))  # Join pages into a single string
    .map(lambda session: (session[1], 1))  # Prepare for counting occurrences
    .reduceByKey(lambda count1, count2: count1 + count2)  # Count occurrences of each unique page sequence
    .takeOrdered(30, key=lambda route_count: -route_count[1])  # Top 30 by count (descending) without a full sort
)
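
The counting and ranking at the end of the pipeline amount to a word count over route strings followed by a top-N selection. A plain-Python sketch of that step, using `collections.Counter` and `heapq.nlargest` as stand-ins for `reduceByKey` and a bounded top-N such as `RDD.takeOrdered` (`top_routes` is a hypothetical helper):

```python
import heapq
from collections import Counter
from typing import Iterable, List, Tuple


def top_routes(routes: Iterable[str], n: int = 30) -> List[Tuple[str, int]]:
    """Count how many sessions produced each route and return the n most frequent."""
    counts = Counter(routes)  # one route string per session, like map(route -> 1) + reduceByKey(add)
    # nlargest avoids a full sort when n is smaller than the number of distinct routes
    return heapq.nlargest(n, counts.items(), key=lambda item: item[1])


session_routes = ["main", "main-news", "main", "main-bonus", "main-news", "main"]
best = top_routes(session_routes, n=2)
```

The same reasoning applies on the cluster: a bounded top-N only keeps N candidates per partition, whereas `sortBy` orders every distinct route before `take` discards most of them.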


In [12]:
filtered_sessions  # Result

[('main', 8184),
 ('main-archive', 1113),
 ('main-rabota', 1047),
 ('main-internet', 897),
 ('main-bonus', 870),
 ('main-news', 769),
 ('main-tariffs', 677),
 ('main-online', 587),
 ('main-vklad', 518),
 ('main-rabota-archive', 170),
 ('main-archive-rabota', 167),
 ('main-bonus-archive', 143),
 ('main-rabota-bonus', 139),
 ('main-news-rabota', 135),
 ('main-bonus-rabota', 135),
 ('main-archive-internet', 132),
 ('main-rabota-news', 130),
 ('main-internet-rabota', 129),
 ('main-archive-news', 126),
 ('main-rabota-internet', 124),
 ('main-internet-archive', 123),
 ('main-archive-bonus', 117),
 ('main-internet-bonus', 115),
 ('main-tariffs-internet', 114),
 ('main-news-archive', 113),
 ('main-news-internet', 109),
 ('main-archive-tariffs', 104),
 ('main-internet-news', 103),
 ('main-tariffs-archive', 103),
 ('main-rabota-main', 94)]

# JSON file

In [14]:
# Keep the top 10 routes as a {route: count} dictionary
res = dict(filtered_sessions[:10])

In [15]:
import json

# Convert the dictionary to a JSON string
result_SGA_Fabrice = json.dumps(res)


In [16]:
# Write the JSON string to a file; the with-block closes the file automatically
with open("result_SGA_Fabrice.json", "w") as f:
    f.write(result_SGA_Fabrice)