# User routes on the site
## Description
**Clickstream** is a sequence of user actions on a website. It allows you to understand how users interact with the site. In this task, you need to find the most frequent user routes.

## Input data
The input data is a table with clickstream data in the file `hdfs:/data/clickstream.csv`.

### Table structure
* `user_id (int)` - Unique user identifier.
* `session_id (int)` - Unique identifier for the user session. The user's session lasts until the identifier changes.
* `event_type (string)` - Event type from the list:
    * **page** - visit to the page
    * **event** - any action on the page
    * <b>&lt;custom&gt;</b> - string with any other type
* `event_page (string)` - Page on the site.
* `timestamp (int)` - Unix timestamp of the action.

### Browser errors
Errors can sometimes occur in the user's browser. Once such an error appears, the data of that session can no longer be trusted: the error line itself, every line recorded at the same time as it, and all lines after it are considered corrupted and **should not be counted** in statistics.

When an error occurs on the page, a random string containing the word **error** will be written to the `event_type` field.

### Sample of user session
<pre>
+-------+----------+------------+----------+----------+
|user_id|session_id|  event_type|event_page| timestamp|
+-------+----------+------------+----------+----------+
|    562|       507|        page|      main|1620494781|
|    562|       507|       event|      main|1620494788|
|    562|       507|       event|      main|1620494798|
|    562|       507|        page|    family|1620494820|
|    562|       507|       event|    family|1620494828|
|    562|       507|        page|      main|1620494848|
|    562|       507|wNaxLlerrorU|      main|1620494865|
|    562|       507|       event|      main|1620494873|
|    562|       507|        page|      news|1620494875|
|    562|       507|        page|   tariffs|1620494876|
|    562|       507|       event|   tariffs|1620494884|
|    562|       514|        page|      main|1620728918|
|    562|       514|       event|      main|1620729174|
|    562|       514|        page|   archive|1620729674|
|    562|       514|        page|     bonus|1620729797|
|    562|       514|        page|   tariffs|1620731090|
|    562|       514|       event|   tariffs|1620731187|
+-------+----------+------------+----------+----------+
</pre>

#### Correct user routes for a given user:
* **Session 507**: main-family-main
* **Session 514**: main-archive-bonus-tariffs

Route elements are ordered by the time they appear in the clickstream, from earliest to latest.

A route covers every page visited up to the end of the session or up to the first error in it, whichever comes first; nothing recorded at or after the error is included.
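The rules above can be checked on the sample session with a small pure-Python sketch (no Spark). It is only an illustration under stated assumptions: rows arrive as `(user_id, session_id, event_type, event_page, timestamp)` tuples, routes are built from `page` rows only, any `event_type` containing `error` counts as an error, and every row at or after a session's first error is dropped. The helper name `session_routes` is hypothetical.

```python
from itertools import groupby

# Sample rows from the table above:
# (user_id, session_id, event_type, event_page, timestamp)
SAMPLE = [
    (562, 507, "page", "main", 1620494781),
    (562, 507, "event", "main", 1620494788),
    (562, 507, "event", "main", 1620494798),
    (562, 507, "page", "family", 1620494820),
    (562, 507, "event", "family", 1620494828),
    (562, 507, "page", "main", 1620494848),
    (562, 507, "wNaxLlerrorU", "main", 1620494865),
    (562, 507, "event", "main", 1620494873),
    (562, 507, "page", "news", 1620494875),
    (562, 507, "page", "tariffs", 1620494876),
    (562, 507, "event", "tariffs", 1620494884),
    (562, 514, "page", "main", 1620728918),
    (562, 514, "event", "main", 1620729174),
    (562, 514, "page", "archive", 1620729674),
    (562, 514, "page", "bonus", 1620729797),
    (562, 514, "page", "tariffs", 1620731090),
    (562, 514, "event", "tariffs", 1620731187),
]


def session_routes(rows):
    """Hypothetical helper: map (user_id, session_id) -> route string."""
    def session_key(row):
        return row[0], row[1]

    routes = {}
    for session, events in groupby(sorted(rows, key=session_key), key=session_key):
        # Order the session's events from earliest to latest.
        events = sorted(events, key=lambda row: row[4])
        # The first error timestamp cuts the session; rows at or after it are ignored.
        error_times = [row[4] for row in events if "error" in row[2]]
        cutoff = min(error_times) if error_times else None
        pages = [
            row[3]
            for row in events
            if row[2] == "page" and (cutoff is None or row[4] < cutoff)
        ]
        if pages:  # a session that starts with an error has no route
            routes[session] = "-".join(pages)
    return routes


print(session_routes(SAMPLE))
# {(562, 507): 'main-family-main', (562, 514): 'main-archive-bonus-tariffs'}
```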

## Task
Using the Spark SQL, Spark RDD, and Spark DF interfaces, create a solution file whose lines contain **the 30 most frequent user routes** on the site.

Each line of the file should contain the `route` and `count` values **separated by tabs**, where:
* `route` - route on the site, consisting of pages separated by "-".
* `count` - the number of user sessions that followed this route.

The lines must be **ordered in descending order** of the `count` field.
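Given one route string per session, the counting and ordering rules amount to the pure-Python sketch below. Breaking ties by route name is an assumption added here to make the result deterministic; the task itself only requires descending `count`. The name `top_routes_lines` is hypothetical.

```python
from collections import Counter


def top_routes_lines(routes, n=30):
    """Count sessions per route and return the top-n lines as 'route<TAB>count'."""
    counts = Counter(routes)
    # Sort by count descending, then by route name so ties are deterministic.
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return [f"{route}\t{count}" for route, count in ranked[:n]]


routes = ["main", "main-news", "main", "main-bonus", "main-news", "main"]
print(top_routes_lines(routes, n=2))
# ['main\t3', 'main-news\t2']
```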

## Criteria
You can get a maximum of 3.5 points (final grade) for this assignment, depending on the number of interfaces you manage to use. The criteria are as follows:

* 0.5 points – Spark SQL solution with 1 query
* 0.5 points – Spark SQL solution with <=2 queries
* 0.5 points – Spark RDD solution
* 0.5 points – Spark DF solution
* 0.5 points – your solution algorithm is reasonably optimized, i.e.: no O(n^2) or O(n^3) complexity, appropriate object usage, no data leaks, etc. This is evaluated by staff.
* 1 point – 1-on-1 screening session. During this session, a staff member may ask you questions about your solution logic, framework usage, questionable parts of your code, etc. If your code is clean enough, the staff member may simply ask you to solve a theoretical problem related to Spark.


```python
import findspark
findspark.init()
```

```python
import pandas as pd

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
```

```python
ram_size = 6
num_cores = 6
spark = (
    SparkSession.builder.master(f"local[{num_cores}]")
    .config("spark.local.dir", "spark_tmp")
    .config("spark.driver.memory", f"{ram_size}g")
    .config("spark.driver.maxResultSize", f"{ram_size}g")
    .config("spark.storage.memoryFraction", "1")
    # Allow more fields in string representations of plans and schemas (debug output).
    .config("spark.sql.debug.maxToStringFields", "2000")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.ui.port", "8082")
    # Use Apache Arrow to speed up toPandas() and createDataFrame() from pandas.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)
```


## Data loading

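```python
# Load the clickstream with pandas (the file is tab-separated).
# spark.read.csv(..., sep="\t", header=True, inferSchema=True) would instead let
# Spark read the file in parallel without passing it through the driver.
df = pd.read_csv("clickstream.csv", sep='\t')
```

```python
df = spark.createDataFrame(df)
# Register the DataFrame as a SQL view (registerTempTable is deprecated).
df.createOrReplaceTempView('clickstream')
```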
```python
df.printSchema()
```

<pre>
root
 |-- user_id: long (nullable = true)
 |-- session_id: long (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_page: string (nullable = true)
 |-- timestamp: long (nullable = true)
</pre>


## Spark SQL solution with 1 query (also meets the <= 2 queries criterion)

```python
# One Spark SQL query: truncate each session at its first error, drop
# consecutive repeats of the same page, and count the resulting routes.
df_single_query = """
WITH error_data AS (
    -- Earliest error timestamp per session
    SELECT
        user_id,
        session_id,
        MIN(timestamp) AS error_time
    FROM
        clickstream
    WHERE
        event_type LIKE '%error%'
    GROUP BY
        user_id,
        session_id
),
unique_sessions AS (
    SELECT
        *,
        CONCAT(user_id, '_', session_id) AS unique_id
    FROM
        clickstream
),
filtered_sessions AS (
    -- Keep rows strictly before the first error (or all rows if there is none)
    SELECT
        u.*,
        e.error_time
    FROM
        unique_sessions u
    LEFT JOIN
        error_data e ON u.user_id = e.user_id AND u.session_id = e.session_id
    WHERE
        e.session_id IS NULL OR u.timestamp < e.error_time
),
distinct_pages AS (
    SELECT
        unique_id,
        timestamp,
        event_page,
        LAG(event_page, 1) OVER (PARTITION BY unique_id ORDER BY timestamp) AS prev_page
    FROM
        filtered_sessions
),
no_duplicates AS (
    -- Drop rows that repeat the previous page of the same session
    SELECT
        unique_id,
        timestamp,
        event_page
    FROM
        distinct_pages
    WHERE
        (prev_page IS NULL OR event_page <> prev_page)
),
route_list AS (
    -- COLLECT_LIST order is not guaranteed, so sort by timestamp explicitly
    SELECT
        unique_id,
        TRANSFORM(
            SORT_ARRAY(COLLECT_LIST(STRUCT(timestamp, event_page))),
            x -> x.event_page
        ) AS pages
    FROM
        no_duplicates
    GROUP BY
        unique_id
),
route_counts AS (
    SELECT
        CONCAT_WS('-', pages) AS route,
        COUNT(*) AS count
    FROM
        route_list
    GROUP BY
        route
)
SELECT
    route,
    count
FROM
    route_counts
ORDER BY
    count DESC,
    route ASC
LIMIT 30;
"""

# Execute the query
result_df = spark.sql(df_single_query)
```

```python
result_df.show()
```

<pre>
+--------------------+-----+
|               route|count|
+--------------------+-----+
|                main| 8184|
|        main-archive| 1113|
|         main-rabota| 1047|
|       main-internet|  896|
|          main-bonus|  870|
|           main-news|  769|
|        main-tariffs|  677|
|         main-online|  587|
|          main-vklad|  518|
| main-rabota-archive|  170|
| main-archive-rabota|  167|
|  main-bonus-archive|  143|
|   main-rabota-bonus|  139|
|   main-bonus-rabota|  135|
|    main-news-rabota|  135|
|main-archive-inte...|  132|
|    main-rabota-news|  130|
|main-internet-rabota|  129|
|   main-archive-news|  126|
|main-rabota-internet|  124|
+--------------------+-----+
only showing top 20 rows
</pre>

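The `no_duplicates` step keeps a row only when `LAG(event_page)` is `NULL` or differs from the current page, which collapses runs of the same page (for example, a `page` row followed by `event` rows on that page). On a list that is already in time order, plain Python's `itertools.groupby` does the same thing; `collapse_repeats` below is a hypothetical helper for illustration only.

```python
from itertools import groupby


def collapse_repeats(pages):
    """Keep the first page of every run of identical consecutive pages."""
    return [page for page, _run in groupby(pages)]


# Pages of session 507 before its error (page and event rows, in time order)
print(collapse_repeats(["main", "main", "main", "family", "family", "main"]))
# ['main', 'family', 'main']
```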
## Spark RDD solution

```python
def extract_routes(events):
    """Build a route from one session's (timestamp, is_error, page) tuples.

    Events are sorted by timestamp (an error sorts before a page with the same
    timestamp), the route stops at the first error, and consecutive repeats of
    the same page are collapsed.
    """
    route = []
    for _timestamp, is_error, page in sorted(events, key=lambda e: (e[0], not e[1])):
        if is_error:
            break
        if not route or route[-1] != page:
            route.append(page)
    return "-".join(route)
```

```python
# Keep page visits and error rows; key each row by its session.
events_rdd = (
    df.rdd
    .filter(lambda r: r.event_type == 'page' or 'error' in str(r.event_type).lower())
    .map(lambda r: ((r.user_id, r.session_id),
                    (r.timestamp, 'error' in str(r.event_type).lower(), r.event_page)))
)

# groupByKey gathers each session's events; extract_routes restores time order.
routes_rdd = (
    events_rdd
    .groupByKey()
    .map(lambda kv: extract_routes(kv[1]))
    .filter(bool)  # sessions that start with an error have no route
    .map(lambda route: (route, 1))
)

# Count sessions per route; takeOrdered gets the top 30 without a full sort.
top_routes = (
    routes_rdd
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(30, key=lambda x: (-x[1], x[0]))
)
```

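`reduceByKey` expects an associative and commutative function, and the order in which values from different partitions are merged is not fixed. List concatenation is associative but not commutative, so a session collected by concatenating lists can come out of order. The toy example below uses plain Python in place of Spark: it mimics two partial lists of session 514 arriving in either order and shows that sorting by timestamp gives the same route both times. The variable and function names are illustrative only.

```python
# Partial (timestamp, page) lists for one session, as two partitions might hold them.
part_a = [(1620728918, "main"), (1620729674, "archive")]
part_b = [(1620729797, "bonus"), (1620731090, "tariffs")]


def route_of(events):
    """Join pages in the order they are given."""
    return "-".join(page for _ts, page in events)


# The merge order is not guaranteed, so both concatenations are possible.
for merged in (part_a + part_b, part_b + part_a):
    print(route_of(merged), "->", route_of(sorted(merged)))
# main-archive-bonus-tariffs -> main-archive-bonus-tariffs
# bonus-tariffs-main-archive -> main-archive-bonus-tariffs
```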
## Spark DF solution

```python
# DataFrame API version of the same pipeline.
session_window = Window.partitionBy("unique_id").orderBy("timestamp")

# Earliest error timestamp per session
error_data = (
    df
    .filter(F.col("event_type").like("%error%"))
    .groupBy("user_id", "session_id")
    .agg(F.min("timestamp").alias("error_time"))
)

unique_sessions = (
    df
    .withColumn("unique_id", F.concat("user_id", F.lit("_"), "session_id"))
)

# Keep rows strictly before the first error (or all rows if there is none)
filtered_sessions = (
    unique_sessions
    .join(error_data, on=["user_id", "session_id"], how="left")
    .filter(F.col("error_time").isNull() | (F.col("timestamp") < F.col("error_time")))
)

distinct_pages = (
    filtered_sessions
    .select(
        "unique_id",
        "timestamp",
        "event_page",
        F.lag("event_page").over(session_window).alias("prev_page"),
    )
)

# Drop rows that repeat the previous page of the same session
no_duplicates = (
    distinct_pages
    .filter(F.col("prev_page").isNull() | (F.col("event_page") != F.col("prev_page")))
    .select("unique_id", "timestamp", "event_page")
)

# collect_list order is not guaranteed, so sort (timestamp, page) structs explicitly
route_list = (
    no_duplicates
    .groupBy("unique_id")
    .agg(
        F.sort_array(F.collect_list(F.struct("timestamp", "event_page")))
        .getField("event_page")
        .alias("pages")
    )
)

route_counts = (
    route_list
    .groupBy(F.concat_ws("-", "pages").alias("route"))
    .agg(F.count("*").alias("count"))
)

result_df = (
    route_counts
    .select("route", "count")
    .orderBy(F.desc("count"), F.asc("route"))
    .limit(30)
)

# Show the result DataFrame
result_df.show()
```

<pre>
+--------------------+-----+
|               route|count|
+--------------------+-----+
|                main| 8184|
|        main-archive| 1113|
|         main-rabota| 1047|
|       main-internet|  896|
|          main-bonus|  870|
|           main-news|  769|
|        main-tariffs|  677|
|         main-online|  587|
|          main-vklad|  518|
| main-rabota-archive|  170|
| main-archive-rabota|  167|
|  main-bonus-archive|  143|
|   main-rabota-bonus|  139|
|   main-bonus-rabota|  135|
|    main-news-rabota|  135|
|main-archive-inte...|  132|
|    main-rabota-news|  130|
|main-internet-rabota|  129|
|   main-archive-news|  126|
|main-rabota-internet|  124|
+--------------------+-----+
only showing top 20 rows
</pre>



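The cells above only display results, while the task asks for a file of tab-separated `route` and `count` lines. One way to produce it, sketched here under the assumption that the top 30 rows are small enough to collect on the driver, is to write `(route, count)` pairs with plain Python. `write_solution` and the file name `solution.tsv` are hypothetical. `result_df.collect()` returns `Row` objects, which are tuples, and the RDD solution's `top_routes` is already a list of `(route, count)` pairs, so either could be passed in.

```python
import os
import tempfile


def write_solution(rows, path):
    """Write (route, count) pairs as 'route<TAB>count' lines, in the given order."""
    with open(path, "w", encoding="utf-8") as out:
        for route, count in rows:
            out.write(f"{route}\t{count}\n")


# Usage with plain tuples; in the notebook this could be result_df.collect() or top_routes.
rows = [("main", 8184), ("main-archive", 1113)]
path = os.path.join(tempfile.mkdtemp(), "solution.tsv")
write_solution(rows, path)
with open(path, encoding="utf-8") as f:
    print(f.read(), end="")
```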