# Question 3

> Assumption: The required output format is `tsv`, matching the input format.


Project Setup

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

# Build (or reuse) the Spark session for this notebook; driver memory is set to 4g.
spark = (
    SparkSession.builder
    .appName("question_3")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Output directories, one per part of the question.
PART_A_OUTPUT = "./output/part_a"
PART_B_OUTPUT = "./output/part_b"
PART_C_OUTPUT = "./output/part_c"

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/07 17:24:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Initial analysis of tsv file

In [2]:
# Location of the raw listening-history file (tab-separated, no header row).
track_path = "../../data/userid-timestamp-artid-artname-traid-traname.tsv"

# Readable names for Spark's positional default columns (_c0 ... _c5), in file order.
track_columns = ["user_id", "timestamp", "artist_id", "artist_name", "track_id", "track_name"]

# Read the file with schema inference, then swap the default column names for readable ones.
# NOTE(review): the reader's default quote character is left in place; confirm the raw
# file does not contain stray double quotes that could affect field splitting.
raw_track_df = spark.read.csv(track_path, sep=r"\t", header=False, inferSchema=True)
track_df = raw_track_df.withColumnsRenamed(
    {f"_c{position}": name for position, name in enumerate(track_columns)}
)

# First look at the data: sample rows, inferred schema and per-column summary statistics.
track_df.show(5, truncate=False)
track_df.printSchema()
track_df.describe().show(truncate=False)

                                                                                

+-----------+-------------------+------------------------------------+-----------+--------+------------------------------------------+
|user_id    |timestamp          |artist_id                           |artist_name|track_id|track_name                                |
+-----------+-------------------+------------------------------------+-----------+--------+------------------------------------------+
|user_000001|2009-05-05 00:08:57|f1b1cf71-bd35-4e99-8624-24a6e15f133a|Deep Dish  |null    |Fuck Me Im Famous (Pacha Ibiza)-09-28-2007|
|user_000001|2009-05-04 14:54:10|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一   |null    |Composition 0919 (Live_2009_4_15)         |
|user_000001|2009-05-04 14:52:04|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一   |null    |Mc2 (Live_2009_4_15)                      |
|user_000001|2009-05-04 14:42:52|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一   |null    |Hibari (Live_2009_4_15)                   |
|user_000001|2009-05-04 14:42:11|a7f7df4a-77d8-4f12-8acd-5c60c93f4d

23/07/07 17:24:39 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-----------+------------------------------------+-----------------------------+------------------------------------+-----------------------------+
|summary|user_id    |artist_id                           |artist_name                  |track_id                            |track_name                   |
+-------+-----------+------------------------------------+-----------------------------+------------------------------------+-----------------------------+
|count  |19150868   |18548702                            |19150868                     |16982280                            |19150867                     |
|mean   |null       |null                                |Infinity                     |null                                |NaN                          |
|stddev |null       |null                                |NaN                          |null                                |NaN                          |
|min    |user_000001|00010eb3-ebfe-4965-81ef-0ac64cd49fde|! Euro

                                                                                

## Dataframe cleanup
The previous `describe` output shows different count values across the columns, indicating null values.
Let's remove rows with any null values, providing a complete dataset for analysis.

In [3]:
# Keep only rows where every column is populated.
# NOTE(review): this also discards plays whose artist_id or track_id is missing even
# though the track name is present, so later counts are based on the reduced dataset.
track_df = track_df.dropna(how="any")

# Re-run the summary to confirm that every column now has the same count.
track_df.describe().show(truncate=False)



+-------+-----------+------------------------------------+-------------+------------------------------------+-----------------+
|summary|user_id    |artist_id                           |artist_name  |track_id                            |track_name       |
+-------+-----------+------------------------------------+-------------+------------------------------------+-----------------+
|count  |16982280   |16982280                            |16982280     |16982280                            |16982280         |
|mean   |null       |null                                |Infinity     |null                                |NaN              |
|stddev |null       |null                                |NaN          |null                                |NaN              |
|min    |user_000001|00011101-560b-4c98-8cec-60b545a160b5|!Action Pact!|00000891-ca9c-490c-9fae-fff04957c9ef|!                |
|max    |user_001000|fffed9ff-98c6-458a-8379-47e7fb4ba6ec|푸른새벽     |ffffff64-4a90-4350-9281-c9dc10aa9d30

                                                                                

## Part A
"Create a list of user IDs, along with the number of distinct songs each user has played."

A `groupBy` on `user_id` with a distinct count of `track_id`, ordered by that count in descending order.

In [4]:
# Count the distinct tracks played by each user, most prolific listeners first.
distinct_songs_per_user_df = (
    track_df.groupBy("user_id")
    .agg(f.countDistinct("track_id").alias("distinct_song_count"))
    .orderBy(f.col("distinct_song_count").desc())
)

distinct_songs_per_user_df.show(5)

# Export the dataframe to a single tsv file.
# mode("overwrite") keeps the cell re-runnable: with the default save mode the write
# fails as soon as the output directory exists from a previous run.
(
    distinct_songs_per_user_df.coalesce(1)
    .write
    .mode("overwrite")
    .option("sep", "\t")
    .option("encoding", "UTF-8")
    .csv(PART_A_OUTPUT)
)

                                                                                

+-----------+-------------------+
|    user_id|distinct_song_count|
+-----------+-------------------+
|user_000691|              59850|
|user_000861|              43860|
|user_000681|              36746|
|user_000800|              31872|
|user_000774|              29997|
+-----------+-------------------+
only showing top 5 rows



                                                                                

## Part B
"Create a list of the 100 most popular songs (artist and title) in the dataset, with the number
of times each was played."

A `groupBy` on `artist_name` and `track_name` with an aggregate count of all rows, sorted by count in descending order and limited to the top 100.

In [5]:

# Count plays per (artist, title) pair and keep the 100 most played songs.
popular_songs_df = (
    track_df.select("artist_name", "track_name")
    .groupBy("artist_name", "track_name")
    .agg(f.count("*").alias("count"))
    .orderBy(f.col("count").desc())
    .limit(100)
)

popular_songs_df.show(5)

# Export the dataframe to a single tsv file.
# mode("overwrite") keeps the cell re-runnable: with the default save mode the write
# fails as soon as the output directory exists from a previous run.
(
    popular_songs_df.coalesce(1)
    .write
    .mode("overwrite")
    .option("sep", "\t")
    .option("encoding", "UTF-8")
    .csv(PART_B_OUTPUT)
)


                                                                                

+-------------------+--------------------+-----+
|        artist_name|          track_name|count|
+-------------------+--------------------+-----+
| The Postal Service|  Such Great Heights| 3992|
|       Joy Division|Love Will Tear Us...| 3663|
|          Radiohead|        Karma Police| 3534|
|               Muse|Supermassive Blac...| 3483|
|Death Cab For Cutie|     Soul Meets Body| 3479|
+-------------------+--------------------+-----+
only showing top 5 rows



                                                                                

## Part C
"Say we define a user’s “session” of Last.fm usage to be comprised of one or more songs
played by that user, where each song is started within 20 minutes of the previous song’s
start time.
Create a list of the top 10 longest sessions (by elapsed time), with the following information
about each session: userid, timestamp of first and last songs in the session, and the list of
songs played in the session (in order of play)."

This query requires a window function to group plays into user sessions while retaining the other required fields.
The code below is commented step by step; in summary:

1. Determine the time difference between each played track and the previous track played by the same user, using a window function partitioned by user and ordered by timestamp.
2. Label the start track of a new session using a session flag where the previous track was played more than 20 minutes ago.
3. Add a new column tracking the session_id (per user) that increments for every session flag using `sum`.
4. Aggregate dataframe to return user sessions with start and end times and songs played.
5. Calculate the elapsed time for a session and order by this value in descending order, limiting to the top 10 longest sessions.


In [6]:
# Maximum gap, in seconds, between consecutive track starts for both plays to belong
# to the same session (20 minutes).
SESSION_DURATION_S = 1200

# Limit the dataframe to only the required columns aiming to increase performance.
session_df = track_df.select("user_id", "timestamp", "track_name")

# Per-user window ordered by play time; used for both the lag and the running sum.
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

# Seconds elapsed since the same user's previous play (null for the user's first play).
timestamp_seconds = f.col("timestamp").cast("long")
session_df = session_df.withColumn(
    "time_diff_seconds",
    timestamp_seconds - f.lag(timestamp_seconds, 1).over(window_spec),
)

# Session flag: 1 when the play starts a new session (first play of the user, or the
# gap since the previous play exceeds the threshold), 0 otherwise.
session_flag_calculator = f.when(
    f.col("time_diff_seconds").isNull() | (f.col("time_diff_seconds") > SESSION_DURATION_S),
    1,
).otherwise(0)
session_df = session_df.withColumn("session_flag", session_flag_calculator)

# A running sum of the flags over the user window yields a per-user session ID.
session_df = session_df.withColumn("session_id", f.sum("session_flag").over(window_spec))

# Remove unnecessary columns
session_df = session_df.drop("session_flag")

# One row per user session with its start and end times and the plays it contains.
# collect_list gives no ordering guarantee after the groupBy shuffle, so each play is
# collected together with its timestamp and the array is sorted; structs sort by their
# first field, which puts the plays in order of play.
longest_sessions_df = session_df.groupBy("user_id", "session_id").agg(
    f.min("timestamp").alias("session_start_time"),
    f.max("timestamp").alias("session_end_time"),
    f.sort_array(f.collect_list(f.struct("timestamp", "track_name"))).alias("plays"),
)

# Calculate the elapsed time (in seconds) for each user session
longest_sessions_df = longest_sessions_df.withColumn(
    "elapsed_time",
    f.col("session_end_time").cast("long") - f.col("session_start_time").cast("long"),
)

# Sort the sessions by elapsed time in descending order and select the top 10 longest sessions
longest_sessions_df = longest_sessions_df.orderBy(f.col("elapsed_time").desc()).limit(10)

# An array column cannot be written to a tsv file, so render the ordered track names as a
# single "[a,b,c]" string. Built-in column functions are used instead of a Python UDF so
# the conversion runs inside Spark's own expression engine.
longest_sessions_df = longest_sessions_df.withColumn(
    "songs_played",
    f.concat(f.lit("["), f.concat_ws(",", f.col("plays.track_name")), f.lit("]")),
)

# Remove unnecessary columns
longest_sessions_df = longest_sessions_df.drop("elapsed_time", "session_id", "plays")

longest_sessions_df.show(5)

# Export the dataframe to a single tsv file.
# mode("overwrite") keeps the cell re-runnable: with the default save mode the write
# fails as soon as the output directory exists from a previous run.
(
    longest_sessions_df.coalesce(1)
    .write
    .mode("overwrite")
    .option("sep", "\t")
    .option("encoding", "UTF-8")
    .csv(PART_C_OUTPUT)
)


                                                                                

+-----------+-------------------+-------------------+--------------------+
|    user_id| session_start_time|   session_end_time|        songs_played|
+-----------+-------------------+-------------------+--------------------+
|user_000949|2006-02-12 17:49:31|2006-02-27 11:29:37|[Chained To You,T...|
|user_000949|2007-05-01 03:41:15|2007-05-14 01:05:52|[White Daisy Pass...|
|user_000949|2005-12-09 08:26:38|2005-12-18 04:40:04|[Neighborhood #2 ...|
|user_000949|2005-11-11 03:30:37|2005-11-18 22:47:26|[Excuse Me Miss A...|
|user_000949|2006-03-18 23:04:14|2006-03-26 19:13:45|[Disco Science,He...|
+-----------+-------------------+-------------------+--------------------+
only showing top 5 rows



                                                                                