In [1]:
!pip uninstall -y pyspark

Found existing installation: pyspark 3.5.3
Uninstalling pyspark-3.5.3:
  Successfully uninstalled pyspark-3.5.3


In [2]:
!pip install pyspark==3.2.4

Collecting pyspark==3.2.4
  Downloading pyspark-3.2.4.tar.gz (281.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.5/281.5 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:02[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m842.3 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.2.4-py2.py3-none-any.whl size=282040922 sha256=6af5e52f8069ffec131b5a05cf529be1cda1a750924383f4afc97270498cf7d1
  Stored in directory: /home/jovyan/.cache/pip/wheels/e7/e3/c8/c358dac750f2b6a4b03328d10e05a5c69501664bd6504b6c3e
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existin

In [3]:
import pyspark
print(pyspark.__version__)

3.2.4


 # Spark SQL Solution

In [9]:
spark = SparkSession.builder.appName("UserRoutes").getOrCreate()

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import json
from collections import OrderedDict

df = spark.read.option("header", "true") \
    .option("sep", "\t") \
    .csv("hdfs:/data/clickstream.csv", inferSchema=True)
df.createOrReplaceTempView("df")

sql_query = """
WITH error_times AS (
    SELECT user_id, session_id, MIN(timestamp) AS error_timestamp
    FROM df
    WHERE event_type LIKE '%error%'
    GROUP BY user_id, session_id
),
filtered_events AS (
    SELECT df.user_id, df.session_id, df.event_type, df.event_page, df.timestamp
    FROM df
    LEFT JOIN error_times ON df.user_id = error_times.user_id AND df.session_id = error_times.session_id
    WHERE error_times.error_timestamp IS NULL OR df.timestamp < error_times.error_timestamp
),
page_events AS (
    SELECT
        user_id,
        session_id,
        event_page,
        timestamp,
        LAG(event_page) OVER (PARTITION BY user_id, session_id ORDER BY timestamp) AS prev_event_page
    FROM filtered_events
    WHERE event_type = 'page'
),
non_duplicate_pages AS (
    SELECT
        user_id,
        session_id,
        event_page,
        timestamp
    FROM page_events
    WHERE event_page != prev_event_page OR prev_event_page IS NULL
),
routes_per_session AS (
    SELECT
        user_id,
        session_id,
        CONCAT_WS('-', TRANSFORM(
            SORT_ARRAY(
                COLLECT_LIST(NAMED_STRUCT('timestamp', timestamp, 'event_page', event_page))
            ),
            x -> x.event_page
        )) AS route
    FROM non_duplicate_pages
    GROUP BY user_id, session_id
),
route_counts AS (
    SELECT route, COUNT(*) AS count
    FROM routes_per_session
    GROUP BY route
)
SELECT route, count
FROM route_counts
ORDER BY count DESC
LIMIT 10
"""

result_df = spark.sql(sql_query)
top_routes = result_df.collect()

# create a json, print and save to file
result_dict = OrderedDict()
for idx, row in enumerate(top_routes, start=1):    
    result_dict[row['route']] = row['count']
    
print(json.dumps(result_dict, indent=4))

with open('result.json', 'w') as f:
    json.dump(result_dict, f, indent=4)




{
    "main": 8184,
    "main-archive": 1112,
    "main-rabota": 1047,
    "main-internet": 896,
    "main-bonus": 870,
    "main-news": 769,
    "main-tariffs": 677,
    "main-online": 587,
    "main-vklad": 518,
    "main-rabota-archive": 170
}


                                                                                

# Spark DataFrame Solution

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import json
from collections import OrderedDict

spark = SparkSession.builder.appName("UserRoutes_DataFrame").getOrCreate()

df = spark.read.option("header", "true") \
    .option("sep", "\t") \
    .csv("hdfs:/data/clickstream.csv", inferSchema=True)

df = df.withColumn('is_error', df.event_type.contains('error'))

error_times = df.filter(col('is_error')) \
    .groupBy('user_id', 'session_id') \
    .agg(min('timestamp').alias('error_timestamp'))

df = df.join(error_times, on=['user_id', 'session_id'], how='left')

df_filtered = df.filter(
    col('error_timestamp').isNull() | (col('timestamp') < col('error_timestamp'))
)

page_events = df_filtered.filter(col('event_type') == 'page')

window_spec = Window.partitionBy('user_id', 'session_id').orderBy('timestamp')

page_events = page_events.withColumn(
    'prev_event_page',
    lag('event_page').over(window_spec)
)

non_duplicate_pages = page_events.filter(
    (col('event_page') != col('prev_event_page')) | col('prev_event_page').isNull()
)

routes_df = non_duplicate_pages.groupBy('user_id', 'session_id') \
    .agg(
        collect_list(
            struct(col('timestamp'), col('event_page'))
        ).alias('pages')
    )

routes_df = routes_df.withColumn(
    'sorted_pages',
    sort_array(col('pages'))
).withColumn(
    'route',
    concat_ws(
        '-',
        expr("transform(sorted_pages, x -> x.event_page)")
    )
)

route_counts = routes_df.groupBy('route').agg(count('*').alias('count'))

top_routes_df = route_counts.orderBy(desc('count')).limit(10)

top_routes = top_routes_df.collect()

# create a json, print and save to file
result_dict = OrderedDict()
for idx, row in enumerate(top_routes, start=1):    
    result_dict[row['route']] = row['count']
    
print(json.dumps(result_dict, indent=4))

with open('result.json', 'w') as f:
    json.dump(result_dict, f, indent=4)




{
    "main": 8184,
    "main-archive": 1112,
    "main-rabota": 1047,
    "main-internet": 896,
    "main-bonus": 870,
    "main-news": 769,
    "main-tariffs": 677,
    "main-online": 587,
    "main-vklad": 518,
    "main-rabota-archive": 170
}


                                                                                

# Spark RDD Solution

In [18]:
from pyspark.sql import SparkSession
from collections import OrderedDict
import json
import builtins  # Import builtins to access the built-in min function

spark = SparkSession.builder.appName("UserRoutes_RDD").getOrCreate()
sc = spark.sparkContext

df = spark.read.option("header", "true") \
    .option("sep", "\t") \
    .csv("hdfs:/data/clickstream.csv", inferSchema=True)

rdd = df.rdd

def map_row(row):
    user_id = row['user_id']
    session_id = row['session_id']
    event_type = row['event_type']
    event_page = row['event_page']
    timestamp = row['timestamp']
    is_error = 'error' in event_type.lower()
    return (user_id, session_id, event_type, event_page, timestamp, is_error)

parsed_rdd = rdd.map(map_row)

# Use builtins.min to ensure we're using the built-in min function
error_timestamps = parsed_rdd.filter(lambda x: x[5]) \
    .map(lambda x: ((x[0], x[1]), x[4])) \
    .reduceByKey(builtins.min)

session_events = parsed_rdd.map(lambda x: ((x[0], x[1]), x))

joined_rdd = session_events.leftOuterJoin(error_timestamps)

def filter_before_error(x):
    ((user_id, session_id), (event, error_timestamp)) = x
    timestamp = event[4]
    if error_timestamp is None:
        return True
    return timestamp < error_timestamp

filtered_rdd = joined_rdd.filter(filter_before_error)

page_events = filtered_rdd.filter(lambda x: x[1][0][2] == 'page')

def extract_pages(x):
    ((user_id, session_id), (event, _)) = x
    return ((user_id, session_id), [(event[4], event[3])])

session_pages = page_events.map(extract_pages).reduceByKey(lambda a, b: a + b)

def build_route(x):
    pages = sorted(x[1], key=lambda y: y[0])
    deduped_pages = []
    last_page = None
    for timestamp, page in pages:
        if page != last_page:
            deduped_pages.append(page)
            last_page = page
    route = '-'.join(deduped_pages)
    return (route, 1)

routes = session_pages.map(build_route)

route_counts = routes.reduceByKey(lambda a, b: a + b)

top_routes = route_counts.takeOrdered(10, key=lambda x: -x[1])

# create a json, print and save to file
result_dict = OrderedDict()
for route_str, frequency in top_routes:
    result_dict[route_str] = frequency
    
print(json.dumps(result_dict, indent=4))

with open('result.json', 'w') as f:
    json.dump(result_dict, f, indent=4)


                                                                                

{
    "main": 8184,
    "main-archive": 1113,
    "main-rabota": 1047,
    "main-internet": 897,
    "main-bonus": 870,
    "main-news": 769,
    "main-tariffs": 677,
    "main-online": 587,
    "main-vklad": 518,
    "main-rabota-archive": 170
}


# Submit

In [19]:
#!curl -F file=@result.json 51.250.123.136:80/MDS-LSML1/<>/w6/2

0.9999999999999999
Correct main answer!
Correct main-archive answer!
Correct main-rabota answer!
Correct main-internet answer!
Correct main-bonus answer!
Correct main-news answer!
Correct main-tariffs answer!
Correct main-online answer!
Correct main-vklad answer!
Correct main-rabota-archive answer!
