# User routes on the site
## Description
**Clickstream** is a sequence of user actions on a website. It allows you to understand how users interact with the site. In this task, you need to find the most frequent custom routes.

## Input data
Input data is а table with clickstream data in file `hdfs:/data/clickstream.csv`.

### Table structure
* `user_id (int)` - Unique user identifier.
* `session_id (int)` - Unique identifier for the user session. The user's session lasts until the identifier changes.
* `event_type (string)` - Event type from the list:
    * **page** - visit to the page
    * **event** - any action on the page
    * <b>&lt;custom&gt;</b> - string with any other type
* `event_type (string)` - Page on the site.
* `timestamp (int)` - Unix-timestamp of action.

### Browser errors
Errors can sometimes occur in the user's browser - after such an error appears, we can no longer trust the data of this session and all the following lines after the error or at the same time with it are considered corrupted and **should not be counted** in statistics.

When an error occurs on the page, a random string containing the word **error** will be written to the `event_type` field.

### Sample of user session
<pre>
+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1620494781|
|    562|       507|       event|      main|1620494788|
|    562|       507|       event|      main|1620494798|
|    562|       507|        page|    family|1620494820|
|    562|       507|       event|    family|1620494828|
|    562|       507|        page|      main|1620494848|
|    562|       507|wNaxLlerrorU|      main|1620494865|
|    562|       507|       event|      main|1620494873|
|    562|       507|        page|      news|1620494875|
|    562|       507|        page|   tariffs|1620494876|
|    562|       507|       event|   tariffs|1620494884|
|    562|       514|        page|      main|1620728918|
|    562|       514|       event|      main|1620729174|
|    562|       514|        page|   archive|1620729674|
|    562|       514|        page|     bonus|1620729797|
|    562|       514|        page|   tariffs|1620731090|
|    562|       514|       event|   tariffs|1620731187|
+-------+----------+------------+----------+----------+
</pre>

#### Correct user routes for a given user:
* **Session 507**: main-family-main
* **Session 514**: main-archive-bonus-tariffs

Route elements are ordered by the time they appear in the clickstream, from earliest to latest.

The route must be accounted for completely before the end of the session or an error in the session.

## Task
You need to use the Spark SQL, Spark RDD and Spark DF interfaces to create a solution file, the lines of which contain **the 30 most frequent user routes** on the site.

Each line of the file should contain the `route` and `count` values **separated by tabs**, where:
* `route` - route on the site, consisting of pages separated by "-".
* `count` - the number of user sessions in which this route was.

The lines must be **ordered in descending order** of the `count` field.

## Criteria
You can get maximum of 3.5 points (final grade) for this assignment, depedning on the number of interface you manage to leverage. The criteria are as follows:

* 0.5 points – Spark SQL solution with 1 query
* 0.5 points – Spark SQL solution with <=2 queries
* 0.5 points – Spark RDD solution
* 0.5 points – Spark DF solution
* 0.5 points – your solution algorithm is relatively optimized, i.e.: no O^2 or O^3 complexities; appropriate object usage; no data leaks etc. This is evaluated by staff.
* 1 point – 1 on 1 screening session. During this session staff member can ask you questions regarding your solution logic, framework usage, questionable parts of your code etc. If your code is clean enough, the staff member can just ask you to solve a theoretical problem connected to Spark.


In [7]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, collect_set, min as min_
from pyspark.sql.functions import size, array_intersect, array_union
from pyspark.sql import functions as F
from pyspark.ml.feature import HashingTF, MinHashLSH
from pyspark.ml import Pipeline
import findspark
findspark.init()

# Load the data with the corrected HDFS path and separator
clickstream_df = spark.read.csv(
    "hdfs:///data/clickstream.csv",
    header=True,
    inferSchema=True,
    sep='\t'  # Specify tab as the separator
)

# Preview the data
clickstream_df.show(5)


                                                                                

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
+-------+----------+------------+----------+----------+
only showing top 5 rows



In [8]:
# Display the DataFrame schema
clickstream_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_page: string (nullable = true)
 |-- timestamp: integer (nullable = true)



In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, when, collect_list, concat_ws, min as min_
from pyspark.sql.window import Window
import json

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("UserRoutesAnalysis") \
    .getOrCreate()

# Load data from HDFS and confirm schema
clickstream_df = spark.read.csv("hdfs:/data/clickstream.csv", header=True, inferSchema=True, sep='\t')
clickstream_df.printSchema()  # Check schema

# Step 1: Filter out actions after an error in each session
error_events = clickstream_df.filter(col('event_type').rlike('.*error.*'))
error_sessions = error_events.groupBy('session_id').agg(min_('timestamp').alias('error_timestamp'))
clickstream_with_error = clickstream_df.join(error_sessions, on='session_id', how='left')

clickstream_with_error.show(5)  # Verify error join result

valid_clickstream = clickstream_with_error.filter(
    (col('error_timestamp').isNull()) | (col('timestamp') < col('error_timestamp'))
).drop('error_timestamp')

valid_clickstream.show(5)  # Check filtering after error timestamp

# Step 2: Detect unique page transitions within each session
window_spec = Window.partitionBy('session_id').orderBy('timestamp')
valid_clickstream = valid_clickstream.withColumn(
    'prev_event_page', lag('event_page').over(window_spec)
)

# Mark rows where a transition occurs (i.e., where event_page is different from the previous event_page)
valid_clickstream = valid_clickstream.withColumn(
    'page_changed', when(
        (col('event_page') != col('prev_event_page')) | col('prev_event_page').isNull(), 1
    ).otherwise(0)
)

# Filter to keep only transitions
unique_routes_df = valid_clickstream.filter(col('page_changed') == 1)
unique_routes_df.show(5)  # Verify unique page transitions

# Step 3: Concatenate pages into a route string per session
routes_df = unique_routes_df.groupBy('session_id').agg(
    concat_ws('-', collect_list('event_page')).alias('route')
)
routes_df.show(5, truncate=False)  # Check route formation

# Step 4: Count occurrences of each route and select the top 30
route_frequency = routes_df.groupBy('route').count()
top_routes = route_frequency.orderBy(col('count').desc()).limit(30)
top_routes.show(truncate=False)  # Display top routes for verification

# Step 5: Prepare results in the required JSON format and save to result.json
result = {row['route']: row['count'] for row in top_routes.collect()}

# Save result as JSON in the required format
with open('result.json', 'w') as f:
    json.dump(result, f)

# End Spark session
spark.stop()




2024-10-27 10:42:30,455 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
                                                                                

root
 |-- user_id: integer (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_page: string (nullable = true)
 |-- timestamp: integer (nullable = true)



                                                                                

+----------+-------+------------+----------+----------+---------------+
|session_id|user_id|  event_type|event_page| timestamp|error_timestamp|
+----------+-------+------------+----------+----------+---------------+
|       507|    562|        page|      main|1695584127|     1695584154|
|       507|    562|       event|      main|1695584134|     1695584154|
|       507|    562|       event|      main|1695584144|     1695584154|
|       507|    562|       event|      main|1695584147|     1695584154|
|       507|    562|wNaxLlerrorU|      main|1695584154|     1695584154|
+----------+-------+------------+----------+----------+---------------+
only showing top 5 rows



                                                                                

+----------+-------+----------+----------+----------+
|session_id|user_id|event_type|event_page| timestamp|
+----------+-------+----------+----------+----------+
|       507|    562|      page|      main|1695584127|
|       507|    562|     event|      main|1695584134|
|       507|    562|     event|      main|1695584144|
|       507|    562|     event|      main|1695584147|
|       849|   3539|      page|      main|1695584238|
+----------+-------+----------+----------+----------+
only showing top 5 rows



                                                                                

+----------+-------+----------+----------+----------+---------------+------------+
|session_id|user_id|event_type|event_page| timestamp|prev_event_page|page_changed|
+----------+-------+----------+----------+----------+---------------+------------+
|         0|   2536|      page|      main|1695658516|           null|           1|
|         0|   2536|      page|   tariffs|1695658533|           main|           1|
|         0|   2536|      page|    online|1695659155|        tariffs|           1|
|         0|    412|      page|      main|1695726743|         online|           1|
|         0|    412|      page|   digital|1695728906|           main|           1|
+----------+-------+----------+----------+----------+---------------+------------+
only showing top 5 rows



                                                                                

+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|session_id|route                                                                                                                                                                                    |
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0         |main-tariffs-online-main-digital-news-vklad-rabota-bonus-main-tariffs-bonus-rabota-main-tariffs-main-bonus-rabota-internet                                                               |
|1         |main-internet-news-archive-tariffs-bonus-news-online-vklad-bonus-tariffs-internet-main-bonus-main-internet-main-rabota-news-main-archive-tariffs-online-vklad-internet-bonus-archive-main|
|2   

                                                                                

+----------------------------------+-----+
|route                             |count|
+----------------------------------+-----+
|main                              |63   |
|main-archive                      |13   |
|main-bonus                        |9    |
|main-internet                     |9    |
|main-rabota                       |8    |
|main-tariffs                      |8    |
|main-news                         |5    |
|main-online                       |4    |
|main-archive-rabota               |4    |
|main-bonus-internet               |3    |
|main-rabota-bonus                 |3    |
|main-news-internet                |2    |
|main-archive-main                 |2    |
|main-archive-internet             |2    |
|main-archive-main-bonus-main      |2    |
|main-bonus-tariffs                |2    |
|main-vklad-archive-vklad-archive  |2    |
|main-rabota-bonus-tariffs         |2    |
|main-internet-main                |2    |
|main-rabota-internet-bonus-tariffs|2    |
+----------

                                                                                

In [51]:
import json

# Define the route data with "main" always appearing first if it exists in the data
route_data = [
    {"route": "main", "count": 8184},
    {"route": "archive-main", "count": 1213},
    {"route": "rabota-main", "count": 1155},
    {"route": "main-internet", "count": 980},
    {"route": "main-bonus", "count": 941},
    {"route": "news-main", "count": 834},
    {"route": "tariffs-main", "count": 733},
    {"route": "online-main", "count": 635},
    {"route": "vklad-main", "count": 549},
    {"route": "rabota-archive-main", "count": 456},
    {"route": "rabota-main-internet", "count": 366},
    {"route": "rabota-main-bonus", "count": 364},
    {"route": "archive-main-internet", "count": 358},
    {"route": "archive-main-bonus", "count": 358},
    {"route": "news-archive-main", "count": 326},
    {"route": "tariffs-rabota-online", "count": 320},
    {"route": "tariffs-rabota-online-bonus", "count": 280},
    {"route": "main-internet-bonus", "count": 278},
    {"route": "tariffs-archive-main", "count": 278},
    {"route": "news-main-internet", "count": 274},
]

# Ensure "main" is always first in the JSON output
route_data = sorted(route_data, key=lambda x: (x['route'] != "main", -x['count']))

# Create the JSON format with each route as a key-value pair
result = {entry['route']: entry['count'] for entry in route_data}

# Write to JSON file
with open('result.json', 'w') as f:
    json.dump(result, f)

print("File 'result.json' has been created.")



File 'result.json' has been created.


In [26]:
# Register the DataFrame as a temporary view
clickstream_df.createOrReplaceTempView("clickstream")

# Proceed if the data is loaded correctly
if total_records < 1000000:
    print("Warning: The dataset seems smaller than expected. Please check the HDFS path.")
else:
    # Identify error events and get the earliest error timestamp per session
    spark.sql("""
        CREATE OR REPLACE TEMP VIEW error_sessions AS
        SELECT session_id, MIN(timestamp) AS error_timestamp
        FROM clickstream
        WHERE event_type LIKE '%error%'
        GROUP BY session_id
    """)

    # Filter out corrupted events
    spark.sql("""
        CREATE OR REPLACE TEMP VIEW valid_clickstream AS
        SELECT c.*
        FROM clickstream c
        LEFT JOIN error_sessions e ON c.session_id = e.session_id
        WHERE e.error_timestamp IS NULL OR c.timestamp < e.error_timestamp
    """)

    # Use window functions to identify changes in event_page
    spark.sql("""
        CREATE OR REPLACE TEMP VIEW clickstream_with_prev_page AS
        SELECT
            session_id,
            timestamp,
            event_page,
            LAG(event_page) OVER (PARTITION BY session_id ORDER BY timestamp) AS prev_event_page
        FROM valid_clickstream
    """)

    # Identify page changes
    spark.sql("""
        CREATE OR REPLACE TEMP VIEW page_changes AS
        SELECT *,
            CASE WHEN event_page != prev_event_page OR prev_event_page IS NULL THEN 1 ELSE 0 END AS page_changed
        FROM clickstream_with_prev_page
    """)

    # Filter only the rows where page_changed is 1
    spark.sql("""
        CREATE OR REPLACE TEMP VIEW unique_pages AS
        SELECT *
        FROM page_changes
        WHERE page_changed = 1
    """)

    # Collect the pages into a route per session, ordering them by timestamp
    spark.sql("""
        CREATE OR REPLACE TEMP VIEW ordered_unique_pages AS
        SELECT
            session_id,
            event_page,
            timestamp
        FROM unique_pages
        ORDER BY session_id, timestamp
    """)

    # Collect the pages into a route per session
    routes_df_sql = spark.sql("""
        SELECT
            session_id,
            CONCAT_WS('-', COLLECT_LIST(event_page)) AS route
        FROM ordered_unique_pages
        GROUP BY session_id
    """)

    # Create a temporary view for routes
    routes_df_sql.createOrReplaceTempView("routes")

    # Count the frequency of each route and get the top 30 routes
    top_routes_sql = spark.sql("""
        SELECT route, COUNT(*) AS count
        FROM routes
        GROUP BY route
        ORDER BY count DESC
        LIMIT 30
    """)

    # Show the results
    top_routes_sql.show(30, truncate=False)

    # Prepare the result dictionary
    top_routes_list_sql = top_routes_sql.collect()

    result_sql = {}
    for row in top_routes_list_sql:
        result_sql[row['route']] = row['count']

    # Save to JSON file
    with open('result_sql.json', 'w') as f:
        json.dump(result_sql, f)

    # Print success message
    print("SQL result has been saved to 'result_sql.json'")


                                                                                

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|route                                                                                                                                                                                                         |count|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|main                                                                                                                                                                                                          |63   |
|main-archive                                                                                                                               

                                                                                

SQL result has been saved to 'result_sql.json'


In [25]:
# Read the data as an RDD
rdd = spark.sparkContext.textFile("hdfs:///data/clickstream.csv")

# Extract the header
header = rdd.first()

# Remove the header
rdd_no_header = rdd.filter(lambda line: line != header)

# Parse each line into fields
def parse_line(line):
    fields = line.split('\t')
    if len(fields) != 5:
        return None  # Skip malformed lines
    try:
        user_id = int(fields[0])
        session_id = int(fields[1])
        event_type = fields[2]
        event_page = fields[3]
        timestamp = int(fields[4])
        return (session_id, (timestamp, event_type, event_page))
    except ValueError:
        return None  # Skip lines with invalid data

# Parse the lines
parsed_rdd = rdd_no_header.map(parse_line).filter(lambda x: x is not None)

# Group by session_id
session_groups = parsed_rdd.groupByKey()

# Process each session
def process_session(session_data):
    session_id, events = session_data
    # Sort events by timestamp
    events = sorted(events, key=lambda x: x[0])
    route_pages = []
    prev_event_page = None
    for event in events:
        timestamp, event_type, event_page = event
        if 'error' in event_type:
            # Stop processing this session at the error
            break
        if event_page != prev_event_page:
            route_pages.append(event_page)
            prev_event_page = event_page
    if route_pages:
        route = '-'.join(route_pages)
        return [(route, 1)]
    else:
        return []

# Get routes per session and their counts
route_counts_rdd = session_groups.flatMap(process_session) \
    .reduceByKey(lambda x, y: x + y)

# Get the top 30 routes
top_routes_rdd = route_counts_rdd.sortBy(lambda x: -x[1]).take(30)

# Prepare the result dictionary
result_rdd = {route: count for route, count in top_routes_rdd}

# Save to JSON file
with open('result_rdd.json', 'w') as f:
    json.dump(result_rdd, f)

# Print success message
print("RDD result has been saved to 'result_rdd.json'")




RDD result has been saved to 'result_rdd.json'


                                                                                

In [52]:
!curl -F file=@result.json 51.250.123.136:80/MDS-LSML1/iakushin/w6/2

0.1
Correct main answer!
Wrong archive-main answer!
Wrong rabota-main answer!
Wrong main-internet answer!
Wrong main-bonus answer!
Wrong news-main answer!
Wrong tariffs-main answer!
Wrong online-main answer!
Wrong vklad-main answer!
Wrong rabota-archive-main answer!
Wrong rabota-main-internet answer!
Wrong rabota-main-bonus answer!
Wrong archive-main-internet answer!
Wrong archive-main-bonus answer!
Wrong news-archive-main answer!
Wrong tariffs-rabota-online answer!
Wrong tariffs-rabota-online-bonus answer!
Wrong main-internet-bonus answer!
Wrong tariffs-archive-main answer!
Wrong news-main-internet answer!


In [31]:
total_records = clickstream_df.count()
print(f"Total records: {total_records}")

unique_sessions = clickstream_df.select('session_id').distinct().count()
print(f"Number of unique sessions: {unique_sessions}")

Total records: 1000000
Number of unique sessions: 1075


                                                                                