# User routes on the site
### Part 1. Spark DF solution

In [None]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window
import numpy as np
import json
from pyspark.sql.types import ArrayType, StringType
import sys
import os
np.random.seed(0)

# Start (or reuse) the Spark session used by the DataFrame-based solution.
spark = (
    SparkSession.builder
    .appName("Clickstream_DF")
    .getOrCreate()
)

# Switch Spark's own logging off so that only the notebook output is shown.
spark.sparkContext.setLogLevel("OFF")

# Read the tab-separated clickstream from HDFS; the first line holds the column
# names and the column types are inferred from the data.
clickstream_data = spark.read.csv(
    "hdfs:///data/clickstream.csv",
    sep='\t',
    header=True,
    inferSchema=True,
)

# Quick inspection: inferred schema, a five-row sample and the total row count.
clickstream_data.printSchema()
print("Original data:")
clickstream_data.show(5)
print(clickstream_data.count())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-10-24 10:00:10,160 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
                                                                                

root
 |-- user_id: integer (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_page: string (nullable = true)
 |-- timestamp: integer (nullable = true)

Original data:


                                                                                

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
+-------+----------+------------+----------+----------+
only showing top 5 rows



                                                                                

1000000


In [None]:
# A session is identified by the pair (user_id, session_id).
session_keys = ['user_id', 'session_id']

# Select every event whose type contains the substring "error".
print("Searching for rows with errors")
error_dataframe = clickstream_data.where(F.col('event_type').contains("error"))
error_dataframe.show(5)

# Earliest error timestamp of each session that has at least one error.
print("Finding the minimum timestamp of errors for each session (user_id + session_id)...")
error_min_timestamp = (
    error_dataframe
    .groupBy(*session_keys)
    .agg(F.min('timestamp').alias('min_error_timestamp'))
)
error_min_timestamp.show(5)

# Left join, so sessions without any error keep a NULL min_error_timestamp.
print("Joining with main data and filtering events after the first error")
joined_dataframe = clickstream_data.join(error_min_timestamp, on=session_keys, how='left')
joined_dataframe.show(5)
print(joined_dataframe.count())

# Keep an event when its session never failed, or when the event happened
# strictly before the first error of that session; the helper column is then
# no longer needed.
session_has_no_error = F.col('min_error_timestamp').isNull()
event_precedes_error = F.col('timestamp') < F.col('min_error_timestamp')
filtered_dataframe = (
    joined_dataframe
    .where(session_has_no_error | event_precedes_error)
    .drop('min_error_timestamp')
)

print("Data after filtering out rows after the first error:")
filtered_dataframe.show(5)
print(filtered_dataframe.count())

# Only 'page' events take part in route construction.
print("Filtering only events of type 'page'")
page_events_dataframe = filtered_dataframe.where(F.col('event_type') == 'page')
page_events_dataframe.show(5)

# Function to remove consecutive duplicates
def remove_consecutive_duplicates(pages):
    """Collapse runs of equal neighbouring items into a single item.

    Args:
        pages: Sequence of page names in visiting order. May be empty or None.

    Returns:
        A new list in which no two adjacent items are equal. A falsy input
        (None or an empty sequence) is returned unchanged.
    """
    if not pages:
        return pages
    collapsed = []
    for current in pages:
        # The first item is always kept; later ones only when they differ
        # from the most recently kept item.
        if not collapsed or current != collapsed[-1]:
            collapsed.append(current)
    return collapsed

# Wrap the plain-Python helper as a Spark UDF mapping array<string> -> array<string>.
remove_consecutive_duplicates_udf = F.udf(remove_consecutive_duplicates, ArrayType(StringType()))

# Build exactly one time-ordered page list per session.
# Each session's (timestamp, event_page) pairs are collected once and sorted;
# sort_array compares structs field by field, so events are ordered by
# timestamp and events sharing a timestamp are ordered by page name, which
# makes the result deterministic. This replaces the previous "running
# collect_list over a window + max()" trick, which materialised every prefix
# of every session (quadratic in the session length).
print("Grouping events by user_id and session_id, collecting pages in the correct order...")
page_sequences_dataframe = (
    page_events_dataframe
    .groupBy('user_id', 'session_id')
    .agg(
        F.sort_array(
            F.collect_list(F.struct('timestamp', 'event_page'))
        ).alias('ordered_events')
    )
    # Selecting a struct field through an array column yields an array of that field.
    .select(
        'user_id',
        'session_id',
        F.col('ordered_events.event_page').alias('page_sequence'),
    )
)

# Collapse consecutive visits of the same page (e.g. main, main -> main).
page_sequences_dataframe = page_sequences_dataframe.withColumn(
    'page_sequence',
    remove_consecutive_duplicates_udf(F.col('page_sequence')),
)
page_sequences_dataframe.show(5, truncate=False)

# Turn each page list into a route string such as "main-news-archive".
print("Creating string routes through '-' for each session (user_id + session_id)")
page_sequences_dataframe = page_sequences_dataframe.withColumn('route', F.concat_ws("-", F.col('page_sequence')))
page_sequences_dataframe.show(5, truncate=False)

# Number of sessions that followed each distinct route.
print("Counting sessions for each unique route")
route_counts_dataframe = page_sequences_dataframe.groupBy('route').count()
route_counts_dataframe.show(5, truncate=False)

# Rank routes by frequency; the route name is a secondary key so that routes
# with equal counts always come out in the same order (and the cut at 30 is
# reproducible between runs).
print("Sorting by count and selecting top-30 routes")
top_routes_dataframe = route_counts_dataframe.orderBy(F.desc('count'), F.asc('route')).limit(30)
top_routes_dataframe.show(30, truncate=False)


Searching for rows with errors


                                                                                

+-------+----------+-----------------+----------+----------+
|user_id|session_id|       event_type|event_page| timestamp|
+-------+----------+-----------------+----------+----------+
|    562|       507|     wNaxLlerrorU|      main|1695584154|
|   4567|       514|    mAXExoCXerror|      main|1695584351|
|    461|       174|uvjferrorYYwYlubX|  internet|1695584529|
|    844|       258|kfIpzqTUaerrorSQD|      main|1695584652|
|    461|       174|       iVerrornrA|      news|1695584698|
+-------+----------+-----------------+----------+----------+
only showing top 5 rows

Finding the minimum timestamp of errors for each session (user_id + session_id)...


                                                                                

+-------+----------+-------------------+
|user_id|session_id|min_error_timestamp|
+-------+----------+-------------------+
|   3513|        68|         1695623875|
|   4332|       766|         1695633583|
|   4757|       611|         1695653221|
|   2009|       827|         1695747863|
|   1731|       193|         1695798006|
+-------+----------+-------------------+
only showing top 5 rows

Joining with main data and filtering events after the first error


                                                                                

+-------+----------+------------+----------+----------+-------------------+
|user_id|session_id|  event_type|event_page| timestamp|min_error_timestamp|
+-------+----------+------------+----------+----------+-------------------+
|    562|       507|        page|      main|1695584127|         1695584154|
|    562|       507|       event|      main|1695584134|         1695584154|
|    562|       507|       event|      main|1695584144|         1695584154|
|    562|       507|       event|      main|1695584147|         1695584154|
|    562|       507|wNaxLlerrorU|      main|1695584154|         1695584154|
+-------+----------+------------+----------+----------+-------------------+
only showing top 5 rows



                                                                                

1000000
Data after filtering out rows after the first error:


                                                                                

+-------+----------+----------+----------+----------+
|user_id|session_id|event_type|event_page| timestamp|
+-------+----------+----------+----------+----------+
|   1889|       140|      page|      main|1695614937|
|   1889|       140|      page|  internet|1695614956|
|   1889|       140|     event|  internet|1695614971|
|   1889|       140|      page|   archive|1695614980|
|   1889|       140|     event|   archive|1695615006|
+-------+----------+----------+----------+----------+
only showing top 5 rows





698547
Filtering only events of type 'page'


                                                                                

+-------+----------+----------+----------+----------+
|user_id|session_id|event_type|event_page| timestamp|
+-------+----------+----------+----------+----------+
|   1889|       140|      page|      main|1695614937|
|   1889|       140|      page|  internet|1695614956|
|   1889|       140|      page|   archive|1695614980|
|   1889|       140|      page|      main|1695615032|
|   3513|        68|      page|      main|1695619122|
+-------+----------+----------+----------+----------+
only showing top 5 rows

Grouping events by user_id and session_id, collecting pages in the correct order...


                                                                                

+-------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|session_id|page_sequence                                                                                                                                           |
+-------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|0      |874       |[main, rabota, online]                                                                                                                                  |
|0      |898       |[main, news, tariffs, rabota, bonus, tariffs, bonus, internet, news, tariffs, online, archive]                                                          |
|0      |901       |[main, internet, bonus, main, internet, vklad, main, rabota, online, main, internet, rabota, tariffs, bonus, o

                                                                                

+-------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+
|user_id|session_id|page_sequence                                                                                                                                           |route                                                                                                                              |
+-------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+
|0      |874       |[main, rabota, online]                                        

                                                                                

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|route                                                                                                                                                            |count|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|main-archive-internet-archive                                                                                                                                    |21   |
|main-archive-vklad-internet-main-archive-tariffs-rabota-bonus-archive-digital-rabota-news-internet-archive-main-news-main-news-bonus-vklad-internet-tariffs-bonus|1    |
|main-internet-bonus-main                                                                                                                             

[Stage 72:>                                                         (0 + 1) / 1]

+---------------------+-----+
|route                |count|
+---------------------+-----+
|main                 |8184 |
|main-archive         |1113 |
|main-rabota          |1047 |
|main-internet        |897  |
|main-bonus           |870  |
|main-news            |769  |
|main-tariffs         |677  |
|main-online          |587  |
|main-vklad           |518  |
|main-rabota-archive  |170  |
|main-archive-rabota  |167  |
|main-bonus-archive   |143  |
|main-rabota-bonus    |139  |
|main-bonus-rabota    |135  |
|main-news-rabota     |135  |
|main-archive-internet|132  |
|main-rabota-news     |130  |
|main-internet-rabota |129  |
|main-archive-news    |126  |
|main-rabota-internet |124  |
|main-internet-archive|123  |
|main-archive-bonus   |117  |
|main-internet-bonus  |115  |
|main-tariffs-internet|114  |
|main-news-archive    |113  |
|main-news-internet   |109  |
|main-archive-tariffs |104  |
|main-internet-news   |103  |
|main-tariffs-archive |103  |
|main-rabota-main     |94   |
+---------

                                                                                

In [None]:
# Persist the top routes on HDFS as tab-separated text with a header row,
# replacing whatever a previous run left at the same location.
output_path = "hdfs:///output/top_routes_df.csv"
print(f"Writing to the CSV file: {output_path}")

routes_writer = top_routes_dataframe.write.mode('overwrite')
routes_writer.csv(output_path, sep='\t', header=True)


Writing to the CSV file: hdfs:///output/top_routes_df.csv


                                                                                

In [None]:
import pandas as pd

# Bring the ranked routes to the driver as a pandas frame and keep a local,
# tab-delimited copy (header row included, no index column).
output_csv_path = "top_routes_df.csv"
top_routes_pd = top_routes_dataframe.toPandas()
top_routes_pd.to_csv(output_csv_path, sep='\t', header=True, index=False)

# Shut the session down; the next part of the notebook creates its own.
spark.stop()


                                                                                

### Part 2. Spark SQL solution

In [None]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
import numpy as np
np.random.seed(0)

# Start (or reuse) the Spark session used by the SQL-based solution.
spark = (
    SparkSession.builder
    .appName("Clickstream_SQL")
    .getOrCreate()
)

# Spark's own logging is switched off; use "WARN" instead while debugging.
spark.sparkContext.setLogLevel("OFF")

# Read the tab-separated clickstream from HDFS; the first line holds the column
# names and the column types are inferred from the data.
clickstream_data = spark.read.csv(
    "hdfs:///data/clickstream.csv",
    sep='\t',
    header=True,
    inferSchema=True,
)

print("Original data:")
clickstream_data.show(5)

# Make the data addressable from SQL under the name "clickstream".
clickstream_data.createOrReplaceTempView("clickstream")

                                                                                

Original data:


                                                                                

+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1695584127|
|    562|       507|       event|      main|1695584134|
|    562|       507|       event|      main|1695584144|
|    562|       507|       event|      main|1695584147|
|    562|       507|wNaxLlerrorU|      main|1695584154|
+-------+----------+------------+----------+----------+
only showing top 5 rows



In [None]:
# Query 1: drop everything from the first error of a session onwards, keep only
# 'page' events and remove consecutive repeats of the same page.
#
# Every surviving page visit also carries `page_order`, its position inside the
# session (ordered by timestamp, then page name so that equal timestamps are
# resolved deterministically). The position is required by query 2: COLLECT_LIST
# after GROUP BY gives no guarantee about element order, so without an explicit
# sort key the pages of a route could be concatenated in the wrong order.
# ROW_NUMBER and LAG use the same window so they see the same row order.
query1 = """
WITH error_min_timestamp AS (
    SELECT user_id, session_id, MIN(timestamp) AS min_error_timestamp
    FROM clickstream
    WHERE event_type LIKE '%error%'
    GROUP BY user_id, session_id
),
filtered_data AS (
    SELECT c.*
    FROM clickstream c
    LEFT JOIN error_min_timestamp e
    ON c.user_id = e.user_id AND c.session_id = e.session_id
    WHERE e.min_error_timestamp IS NULL OR c.timestamp < e.min_error_timestamp
),
page_events AS (
    SELECT
        user_id,
        session_id,
        event_page,
        ROW_NUMBER() OVER (PARTITION BY user_id, session_id ORDER BY timestamp, event_page) AS page_order,
        LAG(event_page, 1) OVER (PARTITION BY user_id, session_id ORDER BY timestamp, event_page) AS previous_page
    FROM filtered_data
    WHERE event_type = 'page'
),
unique_page_events AS (
    SELECT user_id, session_id, event_page, page_order
    FROM page_events
    WHERE event_page != previous_page OR previous_page IS NULL
)
SELECT user_id, session_id, event_page, page_order
FROM unique_page_events
"""

# Executing the query
page_events_dataframe = spark.sql(query1)

print("Data after query 1:")
page_events_dataframe.show(5)

# Expose the de-duplicated page visits to the second query.
page_events_dataframe.createOrReplaceTempView("page_events")

# Query 2: rebuild each session's route in visiting order and rank the routes.
# The (page_order, event_page) pairs are collected, sorted by page_order and
# then reduced to the page names. Routes with the same number of sessions are
# ordered by name so that the top-30 list is reproducible.
query2 = """
WITH page_sequences AS (
    SELECT
        user_id,
        session_id,
        TRANSFORM(
            SORT_ARRAY(COLLECT_LIST(NAMED_STRUCT('page_order', page_order, 'event_page', event_page))),
            visit -> visit.event_page
        ) AS page_sequence
    FROM page_events
    GROUP BY user_id, session_id
),
route_sequences AS (
    SELECT user_id, session_id, CONCAT_WS('-', page_sequence) AS route
    FROM page_sequences
)
SELECT route, COUNT(*) AS count
FROM route_sequences
GROUP BY route
ORDER BY count DESC, route ASC
LIMIT 30
"""

# Executing the query
top_routes_dataframe = spark.sql(query2)

print("Top-30 unique routes:")
top_routes_dataframe.show(30, truncate=False)


Data after query 1:


                                                                                

+-------+----------+----------+
|user_id|session_id|event_page|
+-------+----------+----------+
|      0|       874|      main|
|      0|       874|    rabota|
|      0|       874|    online|
|      0|       898|      main|
|      0|       898|      news|
+-------+----------+----------+
only showing top 5 rows

Top-30 unique routes:


                                                                                

+---------------------+-----+
|route                |count|
+---------------------+-----+
|main                 |8184 |
|main-archive         |1113 |
|main-rabota          |1047 |
|main-internet        |897  |
|main-bonus           |870  |
|main-news            |769  |
|main-tariffs         |677  |
|main-online          |587  |
|main-vklad           |518  |
|main-rabota-archive  |170  |
|main-archive-rabota  |167  |
|main-bonus-archive   |143  |
|main-rabota-bonus    |139  |
|main-bonus-rabota    |135  |
|main-news-rabota     |135  |
|main-archive-internet|132  |
|main-rabota-news     |130  |
|main-internet-rabota |129  |
|main-archive-news    |126  |
|main-rabota-internet |124  |
|main-internet-archive|122  |
|main-archive-bonus   |117  |
|main-internet-bonus  |115  |
|main-tariffs-internet|114  |
|main-news-archive    |113  |
|main-news-internet   |109  |
|main-archive-tariffs |104  |
|main-internet-news   |103  |
|main-tariffs-archive |103  |
|main-rabota-main     |94   |
+---------

In [None]:
# Persist the top routes on HDFS as tab-separated text with a header row,
# replacing whatever a previous run left at the same location.
output_path = "hdfs:///output/top_routes_sql.csv"
print(f"Writing to the file: {output_path}")
routes_writer = top_routes_dataframe.write.mode('overwrite')
routes_writer.csv(output_path, sep='\t', header=True)


Writing to the file: hdfs:///output/top_routes_sql.csv


                                                                                

In [None]:
import pandas as pd

# Bring the ranked routes to the driver as a pandas frame and keep a local,
# tab-delimited copy (header row included, no index column).
output_csv_path = "top_routes_sql.csv"
top_routes_pd = top_routes_dataframe.toPandas()

top_routes_pd.to_csv(output_csv_path, sep='\t', header=True, index=False)

# Shut the session down; the next part of the notebook creates its own.
spark.stop()

                                                                                

### Part 3. Spark RDD solution

In [None]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
import numpy as np
np.random.seed(0)

# Obtain the session first and take the context from it. Constructing
# SparkContext(...) directly raises an error when a context is already active
# (for example when this cell is executed a second time); getOrCreate() reuses
# the active one instead, matching how Parts 1 and 2 start Spark.
spark = SparkSession.builder.appName("Clickstream_RDD").getOrCreate()
spark_context = spark.sparkContext

# Switch Spark's own logging off so that only the notebook output is shown.
spark_context.setLogLevel("OFF")

# Loading data from HDFS: every line is split on tabs into a list of strings.
print("Loading data from HDFS")
raw_rows_rdd = spark_context.textFile("hdfs:///data/clickstream.csv").map(lambda line: line.split('\t'))

# The first line holds the column names; drop every row equal to it.
header = raw_rows_rdd.first()
clickstream_rdd = raw_rows_rdd.filter(lambda row: row != header)

print(f"Total number of data rows: {clickstream_rdd.count()}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-10-24 11:03:10,516 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


Loading data from HDFS...


                                                                                

Total number of data rows: 1000000


In [None]:
# Row layout after splitting: [user_id, session_id, event_type, event_page, timestamp]
# (all fields are strings; the timestamp is converted with int() where needed).

# Select every event whose type contains the substring 'error'.
print("Filtering rows with errors")
error_rdd = clickstream_rdd.filter(lambda row: 'error' in row[2])
print(f"Errors found: {error_rdd.count()}")

# Earliest error timestamp per session, keyed by (user_id, session_id).
print("Finding the minimum error timestamp for each session")
error_min_timestamp = error_rdd.map(lambda row: ((row[0], row[1]), int(row[4]))) \
                                .reduceByKey(min)
print(f"Number of unique user_id and session_id combinations with errors: {error_min_timestamp.count()}")

# Left outer join: (user_id, session_id) -> ((event_type, event_page, timestamp), min_error_timestamp or None)
print("Joining data and filtering events after the first error")
clickstream_with_errors = clickstream_rdd.map(lambda row: ((row[0], row[1]), (row[2], row[3], int(row[4]))))
joined_rdd = clickstream_with_errors.leftOuterJoin(error_min_timestamp)

# Keep an event when its session has no error, or when it happened strictly
# before the first error of the session.
filtered_rdd = joined_rdd.filter(lambda x: x[1][1] is None or x[1][0][2] < x[1][1])
print(f"Number of rows after filtering: {filtered_rdd.count()}")

# Only 'page' events take part in route construction.
print("Filtering only 'page' events")
page_events_rdd = filtered_rdd.filter(lambda x: x[1][0][0] == 'page')
print(f"Number of 'page' events: {page_events_rdd.count()}")

# Group the page visits of each session and put them in visiting order.
# groupByKey gives no guarantee about the order of the values inside a group
# (the data went through shuffles in the join and in the grouping itself), so
# the timestamp is carried along and every group is sorted explicitly. Sorting
# the (timestamp, event_page) tuples orders by timestamp and resolves equal
# timestamps by page name, which keeps the result deterministic.
print("Grouping events by user_id and session_id")
page_sequences = page_events_rdd.map(lambda x: ((x[0][0], x[0][1]), (x[1][0][2], x[1][0][1])))  # ((user_id, session_id), (timestamp, event_page))
grouped_rdd = page_sequences.groupByKey().mapValues(lambda visits: [page for _, page in sorted(visits)])  # ((user_id, session_id), [event_page, ...])

# Removing consecutive duplicates
print("Removing consecutive duplicate pages")
def remove_consecutive_duplicates(pages):
    """Return the pages with every run of equal neighbours reduced to one item.

    Args:
        pages: Indexable sequence of page names in visiting order. May be
            empty or None.

    Returns:
        A new list without adjacent repeats. A falsy input (None or an empty
        sequence) is handed back as is.
    """
    if not pages:
        return pages
    kept = [pages[0]]
    for position in range(1, len(pages)):
        candidate = pages[position]
        # Append only when the page differs from the last one that was kept.
        if candidate != kept[-1]:
            kept.append(candidate)
    return kept

# Build one route string per session: collapse consecutive repeats of a page
# and join the remaining page names with '-'.
routes_rdd = grouped_rdd.mapValues(lambda pages: '-'.join(remove_consecutive_duplicates(list(pages))))
# routes_rdd holds one (session key, route) pair per session, so this count is
# the number of sessions that produced a route.
print(f"Number of unique routes after removing duplicates: {routes_rdd.count()}")

# Number of sessions that followed each distinct route: (route, session_count).
print("Counting the number of sessions for each unique route")
route_counts = routes_rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)

# Take the 30 most frequent routes. The route name is a secondary sort key so
# that routes with equal counts always come out in the same order and the cut
# at 30 does not depend on partitioning.
print("Sorting routes by frequency and selecting the top 30")
top_routes = route_counts.takeOrdered(30, key=lambda x: (-x[1], x[0]))

print("Top 30 routes:")
for route_with_count in top_routes:
    print(route_with_count)

Filtering rows with errors


                                                                                

Errors found: 20899
Finding the minimum error timestamp for each session


                                                                                

Number of unique user_id and session_id combinations with errors: 14569
Joining data and filtering events after the first error


                                                                                

Number of rows after filtering: 698547
Filtering only 'page' events


                                                                                

Number of 'page' events: 294335
Grouping events by user_id and session_id
Removing consecutive duplicate pages


                                                                                

Number of unique routes after removing duplicates: 48522
Counting the number of sessions for each unique route
Sorting routes by frequency and selecting the top 30


                                                                                

Top 30 routes:
('main', 8184)
('main-archive', 1113)
('main-rabota', 1047)
('main-internet', 897)
('main-bonus', 870)
('main-news', 769)
('main-tariffs', 677)
('main-online', 587)
('main-vklad', 518)
('main-rabota-archive', 170)
('main-archive-rabota', 167)
('main-bonus-archive', 143)
('main-rabota-bonus', 139)
('main-news-rabota', 135)
('main-bonus-rabota', 135)
('main-archive-internet', 132)
('main-rabota-news', 130)
('main-internet-rabota', 129)
('main-archive-news', 126)
('main-rabota-internet', 124)
('main-internet-archive', 123)
('main-archive-bonus', 117)
('main-internet-bonus', 115)
('main-tariffs-internet', 114)
('main-news-archive', 113)
('main-news-internet', 109)
('main-archive-tariffs', 104)
('main-internet-news', 103)
('main-tariffs-archive', 103)
('main-rabota-main', 94)


In [None]:
# Writing to the file
from pyspark.sql import Row

# Convert the (route, count) tuples into a DataFrame with an explicit schema.
# Building Row(route=..., count=...) objects makes the column order depend on
# the Spark version (before 3.0 keyword fields of a Row are sorted
# alphabetically, giving count/route instead of route/count); a declared schema
# fixes both the order and the types and also works for an empty result.
# coalesce(1) keeps the 30 rows in one partition so that a single part file
# with a single header line is written.
top_routes_dataframe = spark.createDataFrame(top_routes, schema="route string, count bigint").coalesce(1)

output_path = "hdfs:///output/top_routes_RDD.csv"
print(f"Writing to the file: {output_path}")
top_routes_dataframe.write.mode("overwrite").csv(output_path, sep='\t', header=True)


Writing to the file: hdfs:///output/top_routes_RDD.csv


                                                                                

In [None]:
import pandas as pd

# Bring the ranked routes to the driver as a pandas frame and keep a local,
# tab-delimited copy (header row included, no index column).
output_csv_path = "top_routes_RDD.csv"
top_routes_pd = top_routes_dataframe.toPandas()

top_routes_pd.to_csv(output_csv_path, sep='\t', header=True, index=False)

# Shut Spark down now that all three parts have produced their output.
spark.stop()

                                                                                