# User routes on the site
## Description
**Clickstream** is a sequence of user actions on a website. It allows you to understand how users interact with the site. In this task, you need to find the most frequent custom routes.

## Input data
Input data is а table with clickstream data in file `hdfs:/data/clickstream.csv`.

### Table structure
* `user_id (int)` - Unique user identifier.
* `session_id (int)` - Unique identifier for the user session. The user's session lasts until the identifier changes.
* `event_type (string)` - Event type from the list:
    * **page** - visit to the page
    * **event** - any action on the page
    * <b>&lt;custom&gt;</b> - string with any other type
* `event_type (string)` - Page on the site.
* `timestamp (int)` - Unix-timestamp of action.

### Browser errors
Errors can sometimes occur in the user's browser - after such an error appears, we can no longer trust the data of this session and all the following lines after the error or at the same time with it are considered corrupted and **should not be counted** in statistics.

When an error occurs on the page, a random string containing the word **error** will be written to the `event_type` field.

### Sample of user session
<pre>
+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1620494781|
|    562|       507|       event|      main|1620494788|
|    562|       507|       event|      main|1620494798|
|    562|       507|        page|    family|1620494820|
|    562|       507|       event|    family|1620494828|
|    562|       507|        page|      main|1620494848|
|    562|       507|wNaxLlerrorU|      main|1620494865|
|    562|       507|       event|      main|1620494873|
|    562|       507|        page|      news|1620494875|
|    562|       507|        page|   tariffs|1620494876|
|    562|       507|       event|   tariffs|1620494884|
|    562|       514|        page|      main|1620728918|
|    562|       514|       event|      main|1620729174|
|    562|       514|        page|   archive|1620729674|
|    562|       514|        page|     bonus|1620729797|
|    562|       514|        page|   tariffs|1620731090|
|    562|       514|       event|   tariffs|1620731187|
+-------+----------+------------+----------+----------+
</pre>

#### Correct user routes for a given user:
* **Session 507**: main-family-main
* **Session 514**: main-archive-bonus-tariffs

Route elements are ordered by the time they appear in the clickstream, from earliest to latest.

The route must be accounted for completely before the end of the session or an error in the session.

## Task
You need to use the Spark SQL, Spark RDD and Spark DF interfaces to create a solution file, the lines of which contain **the 30 most frequent user routes** on the site.

Each line of the file should contain the `route` and `count` values **separated by tabs**, where:
* `route` - route on the site, consisting of pages separated by "-".
* `count` - the number of user sessions in which this route was.

The lines must be **ordered in descending order** of the `count` field.

## Criteria
You can get maximum of 3.5 points (final grade) for this assignment, depedning on the number of interface you manage to leverage. The criteria are as follows:

* 0.5 points – Spark SQL solution with 1 query
* 0.5 points – Spark SQL solution with <=2 queries
* 0.5 points – Spark RDD solution
* 0.5 points – Spark DF solution
* 0.5 points – your solution algorithm is relatively optimized, i.e.: no O^2 or O^3 complexities; appropriate object usage; no data leaks etc. This is evaluated by staff.
* 1 point – 1 on 1 screening session. During this session staff member can ask you questions regarding your solution logic, framework usage, questionable parts of your code etc. If your code is clean enough, the staff member can just ask you to solve a theoretical problem connected to Spark.


In [1]:
!pip install pyspark==3.2.4

Collecting pyspark==3.2.4
  Downloading pyspark-3.2.4.tar.gz (281.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.5/281.5 MB[0m [31m531.0 kB/s[0m eta [36m0:00:00[0m00:01[0m00:07[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m164.8 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.2.4-py2.py3-none-any.whl size=282040922 sha256=9dd24e8b850cf1696c0099f7d9a6488cfc85768c92cca089002fff7a5ef587e5
  Stored in directory: /home/jovyan/.cache/pip/wheels/e7/e3/c8/c358dac750f2b6a4b03328d10e05a5c69501664bd6504b6c3e
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3

# DF-based approach

In [14]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession

def initialize_spark(app_name="ClickstreamAnalysisJSON"):
    """Initializes and returns a SparkSession."""
    spark_context = SparkContext.getOrCreate()
    spark_context.setLogLevel("WARN")  # Set log level to WARN to reduce verbosity
    spark = SparkSession.builder \
        .appName(app_name) \
        .getOrCreate()
    return spark

spark = initialize_spark()

data_path = "hdfs:/data/clickstream.csv"

# Load the clickstream data with tab delimiter and header
clickstream_data = spark.read.csv(data_path, sep="\t", header=True)

# schema to verify correct loading
clickstream_data.printSchema()

# sample of the data
clickstream_data.show(5, truncate=False)



                                                                                

root
 |-- user_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_page: string (nullable = true)
 |-- timestamp: string (nullable = true)

+-------+----------+------------+----------+----------+
|user_id|session_id|event_type  |event_page|timestamp |
+-------+----------+------------+----------+----------+
|562    |507       |page        |main      |1695584127|
|562    |507       |event       |main      |1695584134|
|562    |507       |event       |main      |1695584144|
|562    |507       |event       |main      |1695584147|
|562    |507       |wNaxLlerrorU|main      |1695584154|
+-------+----------+------------+----------+----------+
only showing top 5 rows



In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def construct_route(event_sequence):
    """
    Constructs a user route from a sequence of events.
    
    Args:
        event_sequence (list of tuples): Each tuple contains (event_type, event_page).
        
    Returns:
        str: The constructed route as a hyphen-separated string.
    """
    navigation = []
    for event in event_sequence:
        event_type, event_page = event
        if "error" in event_type.lower():
            break  # terminate route construction on encountering an error event
        if event_type.lower() == "page":
            if not navigation or navigation[-1] != event_page:
                navigation.append(event_page)
    return "-".join(navigation)

# register the function as a UDF with StringType return
route_construction_udf = udf(construct_route, StringType())


In [16]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, collect_list, struct

# a window specification partitioned by user and session, ordered by timestamp
window_spec = Window.partitionBy("user_id", "session_id").orderBy("timestamp")

# aggregate event_type and event_page into a list per user-session
aggregated_df = clickstream_data \
    .withColumn("event_struct", struct("event_type", "event_page")) \
    .withColumn("events", collect_list("event_struct").over(window_spec)) \
    .groupBy("user_id", "session_id") \
    .agg(F.max("events").alias("events"))

# apply the UDF to construct routes
routes_df = aggregated_df.withColumn("route", route_construction_udf(col("events")))

# the resulting DataFrame
routes_df.select("user_id", "session_id", "route").show(5, truncate=False)


[Stage 58:>                                                         (0 + 1) / 1]

+-------+----------+--------------------------------------------------------------------------------+
|user_id|session_id|route                                                                           |
+-------+----------+--------------------------------------------------------------------------------+
|1      |1026      |main                                                                            |
|10     |762       |main-internet-archive-bonus-internet-main-rabota-main-archive-bonus-news-archive|
|1002   |285       |main-news-internet-rabota-bonus                                                 |
|1007   |82        |main-archive-digital                                                            |
|1009   |639       |main-online-internet-digital-archive                                            |
+-------+----------+--------------------------------------------------------------------------------+
only showing top 5 rows



                                                                                

In [17]:
import json

# the frequency of each unique route
route_frequencies = routes_df.groupBy("route").count().orderBy(F.desc("count"))

# top 10 routes
route_frequencies.show(10, truncate=False)
top_n = 10
top_routes = route_frequencies.limit(top_n).collect()

routes_dict = {row['route']: row['count'] for row in top_routes}

# output JSON file
output_json_path = "top10_routes.json"
with open(output_json_path, 'w') as json_file:
    json.dump(routes_dict, json_file, indent=4)

with open(output_json_path, 'r') as json_file:
    print(json_file.read())


                                                                                

+-------------------+-----+
|route              |count|
+-------------------+-----+
|main               |8185 |
|main-archive       |1113 |
|main-rabota        |1047 |
|main-internet      |897  |
|main-bonus         |870  |
|main-news          |769  |
|main-tariffs       |677  |
|main-online        |587  |
|main-vklad         |518  |
|main-rabota-archive|170  |
+-------------------+-----+
only showing top 10 rows





{
    "main": 8185,
    "main-archive": 1113,
    "main-rabota": 1047,
    "main-internet": 897,
    "main-bonus": 870,
    "main-news": 769,
    "main-tariffs": 677,
    "main-online": 587,
    "main-vklad": 518,
    "main-rabota-archive": 170
}


                                                                                

### Check

In [18]:
!curl -F file=@top10_routes.json 51.250.123.136:80/MDS-LSML1/ntrusova/w6/2

0.9999999999999999
Correct main answer!
Correct main-archive answer!
Correct main-rabota answer!
Correct main-internet answer!
Correct main-bonus answer!
Correct main-news answer!
Correct main-tariffs answer!
Correct main-online answer!
Correct main-vklad answer!
Correct main-rabota-archive answer!


 # SQL-based approach
 ## 1. Define and Register a User-Defined Function (UDF) for Route Construction

In [19]:
from pyspark.sql.types import StringType
from pyspark.sql import functions as F

# a function to build user routes from event sequences
def build_user_route(event_records):
    """
    Constructs a navigation route for a user session based on event records.
    
    Args:
        event_records (list of tuples): Each tuple contains (event_type, event_page, sequence_number).
        
    Returns:
        str: A hyphen-separated string representing the user's navigation route.
    """
    # sort events by their sequence number to ensure chronological order
    sorted_events = sorted(event_records, key=lambda record: record[2])
    
    navigation_steps = []
    for event_type, event_page, _ in sorted_events:
        if "error" in event_type.lower():
            break  # stop processing if an error
        
        if event_type.lower() == "page" and (not navigation_steps or navigation_steps[-1] != event_page):
            navigation_steps.append(event_page)
    
    # a single route string
    route = "-".join(navigation_steps)
    return route

# the Python function as a Spark UDF
route_builder_udf = F.udf(build_user_route, StringType())
spark.udf.register("route_builder_udf", build_user_route, StringType())


2024-10-17 13:26:25,940 WARN analysis.SimpleFunctionRegistry: The function route_builder_udf replaced a previously registered function.


<function __main__.build_user_route(event_records)>

## 2. Create Temporary View and Execute SQL Query to Generate Routes

In [20]:
# a temporary view for the clickstream data
clickstream_data.createOrReplaceTempView("clickstream_data")

# SQL query to construct user routes using the registered UDF
route_generation_query = """
    SELECT
        route_builder_udf(collect_list(struct(event_type, event_page, seq_num))) AS user_route
    FROM (
        SELECT
            user_id,
            session_id,
            event_type,
            event_page,
            ROW_NUMBER() OVER (PARTITION BY user_id, session_id ORDER BY timestamp) AS seq_num
        FROM
            clickstream_data
    ) ordered_events
    GROUP BY
        user_id, session_id
"""

generated_routes_df = spark.sql(route_generation_query)

generated_routes_df.show(5, truncate=False)


[Stage 73:>                                                         (0 + 1) / 1]

+--------------------------------------------------------------------------------+
|user_route                                                                      |
+--------------------------------------------------------------------------------+
|main                                                                            |
|main-internet-archive-bonus-internet-main-rabota-main-archive-bonus-news-archive|
|main-news-internet-rabota-bonus                                                 |
|main-archive-digital                                                            |
|main-online-internet-digital-archive                                            |
+--------------------------------------------------------------------------------+
only showing top 5 rows



                                                                                

## 3. Aggregate and Count Unique Routes

In [21]:
# temporary view for the generated routes
generated_routes_df.createOrReplaceTempView("routes_view")

# count the frequency of each unique route
route_frequency_query = """
    SELECT
        user_route AS route,
        COUNT(*) AS occurrence_count
    FROM
        routes_view
    GROUP BY
        user_route
    ORDER BY
        occurrence_count DESC
"""

route_frequencies_df = spark.sql(route_frequency_query)

# top 10 most frequent routes
route_frequencies_df.show(10, truncate=False)




+-------------------+----------------+
|route              |occurrence_count|
+-------------------+----------------+
|main               |8185            |
|main-archive       |1113            |
|main-rabota        |1047            |
|main-internet      |897             |
|main-bonus         |870             |
|main-news          |769             |
|main-tariffs       |677             |
|main-online        |587             |
|main-vklad         |518             |
|main-rabota-archive|170             |
+-------------------+----------------+
only showing top 10 rows



                                                                                

## 4. Extract Top Routes and Export to JSON

In [22]:
import json

top_n_routes = 10

# collect the top N routes into a list of Row objects
top_routes = route_frequencies_df.limit(top_n_routes).collect()

top_routes_dict = {row['route']: row['occurrence_count'] for row in top_routes}

output_json_file = "top10_routes_sql.json"

with open(output_json_file, 'w') as json_file:
    json.dump(top_routes_dict, json_file, indent=4)

with open(output_json_file, 'r') as json_file:
    print(json_file.read())




{
    "main": 8185,
    "main-archive": 1113,
    "main-rabota": 1047,
    "main-internet": 897,
    "main-bonus": 870,
    "main-news": 769,
    "main-tariffs": 677,
    "main-online": 587,
    "main-vklad": 518,
    "main-rabota-archive": 170
}


                                                                                

# RDD approach

In [23]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()


clickstream_rdd = clickstream_data.rdd

# extract relevant event information
def extract_event(row):
    """
    Extracts user ID, session ID, event type, event page, and timestamp from a row.
    
    Args:
        row (Row): A Spark Row object containing clickstream data.
        
    Returns:
        tuple: A tuple containing ((user_id, session_id), (event_type, event_page, timestamp))
    """
    return ((row['user_id'], row['session_id']),
            (row['event_type'], row['event_page'], row['timestamp']))

# group events by user_id and session_id
grouped_events_rdd = clickstream_rdd.map(extract_event) \
                                   .groupByKey()

# construct routes from grouped events
def construct_route(events):
    """
    Constructs a navigation route from a sequence of events.
    
    Args:
        events (Iterable[tuple]): An iterable of tuples containing (event_type, event_page, timestamp).
        
    Returns:
        str: A hyphen-separated string representing the user's navigation route.
    """
    # sort based on timestamp to ensure chronological order
    sorted_events = sorted(events, key=lambda event: event[2])
    
    navigation_path = []
    for event_type, event_page, _ in sorted_events:
        if "error" in event_type.lower():
            break  # stop processing if an error event is encountered
        if event_type.lower() == "page":
            if not navigation_path or navigation_path[-1] != event_page:
                navigation_path.append(event_page)
    
    return "-".join(navigation_path)

# apply the route construction function to each group
routes_rdd = grouped_events_rdd.mapValues(construct_route)

# count the occurrences of each unique route
route_counts_rdd = routes_rdd.map(lambda route: (route[1], 1)) \
                             .reduceByKey(lambda count1, count2: count1 + count2)

# sort the routes by their count in descending order
sorted_route_counts_rdd = route_counts_rdd.sortBy(lambda x: x[1], ascending=False)

# collect the sorted route counts to the driver for further processing
collected_route_counts = sorted_route_counts_rdd.collect()


                                                                                

In [24]:
top_n = 10

top_routes_rdd = collected_route_counts[:top_n]
top_routes_dict = {route: count for route, count in top_routes_rdd}

print("Top Routes Dictionary from RDD Approach:")
print(top_routes_dict)

top_routes_dict


Top Routes Dictionary from RDD Approach:
{'main': 8185, 'main-archive': 1113, 'main-rabota': 1047, 'main-internet': 897, 'main-bonus': 870, 'main-news': 769, 'main-tariffs': 677, 'main-online': 587, 'main-vklad': 518, 'main-rabota-archive': 170}


{'main': 8185,
 'main-archive': 1113,
 'main-rabota': 1047,
 'main-internet': 897,
 'main-bonus': 870,
 'main-news': 769,
 'main-tariffs': 677,
 'main-online': 587,
 'main-vklad': 518,
 'main-rabota-archive': 170}