---
# INFO-H600 - Computing Foundations of Data Sciences

## Team 14:

Roman Lešický, Theo Abraham, Kevin Straatman, Lara Hansen, Grégoire Van den Eynde and Nicolas Roux

Python version: 3.11.14 (packaged by conda-forge)

---

# Library:

###### The `environment.txt` file is available in the project's GitHub repository: https://github.com/RomanLesicky/Data_Science_Project_INFO_H600

### How paths are handled in this project:

In [1]:
from pathlib import Path # We are using the pathlib library for our paths 

# First, we locate the project's root (Path.cwd() is the current working directory, so the notebook is assumed to be run from the repository root)
project_root = Path.cwd().resolve()

# Then we define the data directory path, which everyone in this project uses
# for Steps 2 to 5 (included).

data_dir = project_root / "data" 

# Simple print as a sanity check
print("Project root:", project_root)
print("Data dir:", data_dir)

Project root: /mnt/c/Users/larah/Documents/GitHub/Data_Science_Project_INFO_H600
Data dir: /mnt/c/Users/larah/Documents/GitHub/Data_Science_Project_INFO_H600/data


### Rest of the library:

In [2]:
import pyspark
from pyspark.sql import SparkSession, functions 
import matplotlib.pyplot as plt

#! To be continued

---

# Overview of the project:

# `WIP`

---

# Step 1:

`TBD`

# DO NOT FORGET TO DO THE DOCUMENTATION in the `PDF`

### 1.0 Set-up of the SparkSession

In [3]:
# In this cell we initialise a SparkSession, which can be reused throughout the notebook.
# With master("local[*]"), Spark runs locally and uses all available CPU cores for every Spark job,
# so users should be aware that running Spark-related cells will use their whole CPU. This gives us parallelism without a cluster.
# For larger inputs we allocate a 16 GB JVM heap. In local mode the executors run inside the driver JVM, so spark.driver.memory
# is the setting that actually applies; spark.executor.memory only matters when running on a cluster.
# 200 shuffle partitions (Spark's default) works reasonably well for the larger inputs too.

spark = (
    SparkSession.builder
    .appName("MillionPlaylistProject")
    .master("local[*]")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "16g")
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

spark  # For posterity, we display the session

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/04 14:49:46 WARN Utils: Your hostname, LaraYoga, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/04 14:49:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/04 14:49:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### 1.1 Reading JSON slices into raw DataFrames using Spark

In [4]:
"""
This is an important part of the project which needs to be addressed. 

The question "How many slice files do we want to read?" needs to be asked, since it determines the trade-off between scalability and practical runtime.
Essentially, we want enough data for our metrics to be meaningful, without the computation taking forever.

As a group we decided to hardcode the value 5 for demonstration purposes: we only use the files mpd.slice.0-999.json to mpd.slice.4000-4999.json, i.e. 5,000 playlists (each slice holds 1,000 playlists).
The sole reason for this specific value is that it is small enough to run very fast while still demonstrating that the pipeline works.
Users can change this number via the global variable `NUMBER_OF_SLICES`, keeping in mind that Spark uses all the cores of their CPU for this.

That being said, for practical reasons concerning Tasks 3, 4 and 5, we use a dataset of 50 slices, i.e. 50,000 playlists.
This provides enough data to obtain stable aggregate statistics and similarity scores while keeping computation times manageable on a single machine.

We do not pick the slices at random, since the data at hand is not ordered, and we are not worried about a selection bias here because
the actual metrics are computed on the 50k version.

For context:

Running the 5k version uses only a couple of GB of RAM, whereas the 50k version takes about 14 GB of RAM.

"""

# This global variable can be increased if one wants to process more slices.
# IMPORTANT: it should not exceed 1000, since the dataset only contains 1000 slices
# (larger values do not raise an error, because list slicing simply stops at the last file, but they load nothing extra).
# Setting it to 1000 means processing the entire 32.1 GB of data, i.e. the full dataset.
NUMBER_OF_SLICES = 5

# Path to the original Million Playlist Dataset (MPD). It is only used in this Task; the rest of the project uses `data_dir`.
# This dataset is never pushed to GitHub, since it is listed in .gitignore.
data_dir_MGD_Original_Data = project_root / "data_Million_Playlist_Dataset" 

# Helper for numeric (rather than lexicographic string) sorting: it is passed as the `key` argument of `sorted`.

def slice_start_key_numeric(path: Path) -> int:
    """
    Extract the numeric starting index from an MPD slice filename,
    so that slices can be sorted in the correct numeric order.
    """
    name = path.name # e.g. mpd.slice.1000-1999.json
    
    # Split on "." to get ["mpd", "slice", "1000-1999", "json"]; we want "1000-1999", i.e. index [2]
    the_numeric_value = name.split(".")[2]
    # Split again on "-" to get ["1000", "1999"]; we only want the starting value, i.e. index [0]
    starting_string = the_numeric_value.split("-")[0]
    
    # Convert the string into an integer to get a proper numeric sort
    return int(starting_string) 

# Load all "mpd.slice.*.json" files and sort them by their numeric starting index, using the helper
# slice_start_key_numeric defined above. This yields an ordered list of Path objects for all MPD slices.
all_slices_of_the_playlists = sorted(data_dir_MGD_Original_Data.glob("mpd.slice.*.json"), key=slice_start_key_numeric)

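# Build the list of input paths for Spark; we also print them as a precaution
input_paths = [str(path) for path in all_slices_of_the_playlists[:NUMBER_OF_SLICES]]

# Fail early with a clear message if no slice file was found (otherwise Spark raises UNABLE_TO_INFER_SCHEMA, as in the output below)
if not input_paths:
    raise FileNotFoundError(f"No mpd.slice.*.json files found in {data_dir_MGD_Original_Data}")

# Print the selected files
print(f"\nThe number of slices which was inputted by the user = {NUMBER_OF_SLICES}:")
for path in input_paths:
    print("  ", path)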

# Here the actual PySpark pipeline starts: we read the selected slice files into a single Spark DataFrame.
# Each file has the structure: {"info": {...}, "playlists": [ {...}, {...}, ... ]}
playlists_unedited_DataFrame = spark.read.option("multiLine", True).json(input_paths)

# The multiLine option lets Spark parse each nested, multi-line JSON file directly into a structured DataFrame,
# so we do not need to write manual parsing code, as we would with a lower-level tool such as Dask.


The number of slices which was inputted by the user = 5:


AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for JSON. It must be specified manually. SQLSTATE: 42KD9
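To illustrate why `slice_start_key_numeric` is needed, here is a small self-contained check. It re-declares the helper in compact form and sorts a few hypothetical file names that follow the MPD pattern `mpd.slice.<start>-<end>.json` (nothing is read from disk). A plain string sort places `10000-10999` before `2000-2999`, whereas the numeric key gives the intended order.

```python
from pathlib import Path


def slice_start_key_numeric(path: Path) -> int:
    # "mpd.slice.1000-1999.json" -> "1000-1999" -> "1000" -> 1000
    return int(path.name.split(".")[2].split("-")[0])


# Hypothetical file names following the MPD naming pattern
names = [
    "mpd.slice.10000-10999.json",
    "mpd.slice.2000-2999.json",
    "mpd.slice.0-999.json",
    "mpd.slice.1000-1999.json",
]

# Plain string (lexicographic) sort: compares character by character
lexicographic = sorted(names)

# Numeric sort on the starting index, as done in Section 1.1
numeric = [p.name for p in sorted((Path(n) for n in names), key=slice_start_key_numeric)]

print(lexicographic)
print(numeric)
```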

### 1.2 Flattening pipeline 

# TODO: add more information here, this is a bit meager

> Note: Since our pipeline does not run on the entire 32.1 GB but on a sub-sample of a few slices (50 at most), we rely on Spark's built-in schema inference. If we processed all 32.1 GB or ran on a cluster, we would define an explicit StructType schema to avoid an extra pass over the data.

In [None]:
"""
This second step of Task 1 consists of a flattening pipeline: we take the slice-level JSON files, extract all playlists, 
and then transform them into a final table with one row per (playlist, track).

From Section 1.1 we obtained `playlists_unedited_DataFrame`, a single DataFrame whose size depends on the `NUMBER_OF_SLICES`
chosen by the user: it contains one row per JSON slice file.

Each row has:
- an `info` struct, and

- a `playlists` array, where each element is a playlist with its own fields (pid, name, num_tracks, tracks, ...) 

For the remainder of the Tasks in this project, it is much more convenient to work with "flat" tables rather 
than with nested arrays and structs. Therefore we build two tables:

1) The first one, named the playlist-level table, contains: 

   - one row per playlist
   - 9 columns: pid, name, collaborative, modified_at, num_tracks, num_albums, num_followers, duration_ms and tracks 
   - This last `tracks` column still stores, for each playlist, the full array of its track entries. We use it in the next step 
     to create one row per (playlist, track), and it also lets us easily reconstruct the original order of tracks if needed.


2) And a second one, named the playlist-track table, which consists of:

   - one row per (playlist, track)
   - 15 columns: the playlist-level fields except `collaborative` and `tracks` (with `name` and `duration_ms` renamed to
                 `playlist_name` and `playlist_duration_ms`), plus track_pos, track_uri, track_name, artist_uri, artist_name,
                 album_uri, album_name and track_duration_ms

Essentially, these 2 tables give us a "normalized" view of the original data that we downloaded, where:

- `playlists_flat_DataFrame` is a "playlist-type" table which mainly contains information about the playlists themselves, 
                             from which playlist-specific metrics can be derived

- `playlist_track_DataFrame` follows the same idea, but for track-related information 

This helps us to:
- compute aggregate statistics (Task 2),
- build track similarity from co-occurrences in playlists (Task 3),
- build playlist similarity (Task 4),
- and build a playlist continuation model (Task 5).

As briefly discussed in the markdown cell for section 1.2, we rely on Spark's built-in schema inference for the JSON input. The library inspects the JSON 
files and automatically infers field names and data types (string, long, array, struct, ...). We opted for this approach rather than defining an explicit 
JSON schema because we are not processing the full 32.1 GB of data, and therefore we do not strictly need very fine-grained control over every data type.

We are aware that if this project needed all the data, an explicit schema (a `StructType`) would be preferable: it avoids the
extra schema-inference pass and makes the job more robust against slight variations in the input.

In our particular setting, the inferred schema is stable across slices (even for a 50k-playlist subset), so we accept this trade-off between convenience and strictness.
"""

# Firstly we build the `playlists_flat_DataFrame` table: we explode the `playlists` array so that each playlist in each slice becomes its own row,
# and then we select only the playlist-level fields we need (pid, name, collaborative, modified_at, counts, duration, tracks).

playlists_DataFrame = playlists_unedited_DataFrame.select(
    functions.explode("playlists").alias("playlist"))

# From each `playlist` struct, we will only select the "playlist" related fields and `tracks` too.

playlists_flat_DataFrame = playlists_DataFrame.select(
    functions.col("playlist.pid").alias("pid"),                 # unique playlist id
    functions.col("playlist.name").alias("name"),               # playlist name
    functions.col("playlist.collaborative").alias("collaborative"),
    functions.col("playlist.modified_at").alias("modified_at"), # last modification timestamp
    functions.col("playlist.num_tracks").alias("num_tracks"),   # number of tracks in the playlist
    functions.col("playlist.num_albums").alias("num_albums"),   # number of distinct albums
    functions.col("playlist.num_followers").alias("num_followers"),
    functions.col("playlist.duration_ms").alias("duration_ms"), # total duration of the playlist
    functions.col("playlist.tracks").alias("tracks")            # still an array of track structs
)

# Secondly, we build the `playlist_track_DataFrame` table in a similar spirit: we explode the `tracks` array inside each playlist 
# and then flatten the nested `track` struct into explicit columns so that each row corresponds to a single (playlist, track) pair with all its metadata.

playlist_track_DataFrame = playlists_flat_DataFrame.select(
    functions.col("pid"),
    functions.col("name").alias("playlist_name"),
    functions.col("num_tracks"),
    functions.col("num_albums"),
    functions.col("num_followers"),
    functions.col("modified_at"),
    functions.col("duration_ms").alias("playlist_duration_ms"),
    functions.explode("tracks").alias("track")   # so here we explode the tracks array to make one row per track
)

# Here the `track` column is still a struct (pos, track_uri, track_name, artist_name, album_name, ...), so we flatten it into
# individual columns, making each row a fully explicit (playlist, track) record with all relevant information.

playlist_track_DataFrame = playlist_track_DataFrame.select(
    functions.col("pid"),
    functions.col("playlist_name"),
    functions.col("num_tracks"),
    functions.col("num_albums"),
    functions.col("num_followers"),
    functions.col("modified_at"),
    functions.col("playlist_duration_ms"),

    functions.col("track.pos").alias("track_pos"),             # position of the track in the playlist
    functions.col("track.track_uri").alias("track_uri"),       # Spotify track URI
    functions.col("track.track_name").alias("track_name"),     # track title
    functions.col("track.artist_uri").alias("artist_uri"),     # Spotify artist URI
    functions.col("track.artist_name").alias("artist_name"),   # artist name
    functions.col("track.album_uri").alias("album_uri"),       # Spotify album URI
    functions.col("track.album_name").alias("album_name"),     # album name
    functions.col("track.duration_ms").alias("track_duration_ms")  # duration of the specific track
)
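The two `explode` steps above can be mirrored in plain Python on a toy slice, which makes the row counts easy to check by hand. This is only a sketch: the slice content below is made up, only a few of the real fields are kept, and list comprehensions stand in for Spark's `explode`.

```python
# A toy slice with the same nesting as an MPD slice file (values are made up)
toy_slice = {
    "info": {"slice": "0-1"},
    "playlists": [
        {"pid": 0, "name": "skate", "num_tracks": 2,
         "tracks": [
             {"pos": 0, "track_uri": "spotify:track:A", "artist_name": "X"},
             {"pos": 1, "track_uri": "spotify:track:B", "artist_name": "Y"},
         ]},
        {"pid": 1, "name": "study", "num_tracks": 1,
         "tracks": [
             {"pos": 0, "track_uri": "spotify:track:A", "artist_name": "X"},
         ]},
    ],
}


def flatten_playlists(slices):
    """One row per playlist (mirrors explode("playlists") followed by a select)."""
    return [
        {"pid": p["pid"], "name": p["name"], "num_tracks": p["num_tracks"], "tracks": p["tracks"]}
        for s in slices
        for p in s["playlists"]
    ]


def flatten_playlist_tracks(playlist_rows):
    """One row per (playlist, track) (mirrors explode("tracks") and flattening the track struct)."""
    return [
        {"pid": row["pid"], "playlist_name": row["name"], "num_tracks": row["num_tracks"],
         "track_pos": t["pos"], "track_uri": t["track_uri"], "artist_name": t["artist_name"]}
        for row in playlist_rows
        for t in row["tracks"]
    ]


playlists_rows = flatten_playlists([toy_slice])
playlist_track_rows = flatten_playlist_tracks(playlists_rows)
print(len(playlists_rows), len(playlist_track_rows))
```

As with Spark's `explode` (as opposed to `explode_outer`), a playlist with an empty `tracks` list would simply produce no rows in the playlist-track table.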


#### 1.2.1 Schema of the `playlist-level table` and the `playlist–track table`:


In [6]:
print("Schema of playlist-level table:" + "\n")
playlists_flat_DataFrame.printSchema()

print("\nExample of 5 playlists:")
playlists_flat_DataFrame.show(5, truncate=False)

Schema of playlist-level table:

root
 |-- pid: long (nullable = true)
 |-- name: string (nullable = true)
 |-- collaborative: string (nullable = true)
 |-- modified_at: long (nullable = true)
 |-- num_tracks: long (nullable = true)
 |-- num_albums: long (nullable = true)
 |-- num_followers: long (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- tracks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- album_name: string (nullable = true)
 |    |    |-- album_uri: string (nullable = true)
 |    |    |-- artist_name: string (nullable = true)
 |    |    |-- artist_uri: string (nullable = true)
 |    |    |-- duration_ms: long (nullable = true)
 |    |    |-- pos: long (nullable = true)
 |    |    |-- track_name: string (nullable = true)
 |    |    |-- track_uri: string (nullable = true)


Example of 5 playlists:
+----+--------+-------------+-----------+----------+----------+-------------+-----------+--------------------------------------

In [7]:
print("\nSchema of playlist-track table:" + "\n")
playlist_track_DataFrame.printSchema()

print("\nExample of 5 playlist-track rows:")
playlist_track_DataFrame.show(5, truncate=False)


Schema of playlist-track table:

root
 |-- pid: long (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- num_tracks: long (nullable = true)
 |-- num_albums: long (nullable = true)
 |-- num_followers: long (nullable = true)
 |-- modified_at: long (nullable = true)
 |-- playlist_duration_ms: long (nullable = true)
 |-- track_pos: long (nullable = true)
 |-- track_uri: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- artist_uri: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- album_uri: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_duration_ms: long (nullable = true)


Example of 5 playlist-track rows:
+----+-------------+----------+----------+-------------+-----------+--------------------+---------+------------------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------

### 1.3 Saving flattened DataFrames locally 

`Do not run this cell if you do not want to save the DataFrames locally.`

Additionally, on Windows this cell only works if winutils.exe and hadoop.dll are installed locally. They can be found on this GitHub page: 

- https://github.com/cdarlint/winutils

The version which was used for this project was hadoop-3.3.6.

In [None]:
"""
This third sub-part of Task 1 simply saves the flattened DataFrames for the later Tasks, so that we do not have to 
re-run the whole Step 1 pipeline more than once. This pipeline is very fast for the 5k example, but it gets heavier 
as `NUMBER_OF_SLICES` increases.
"""

# NUMBER_OF_SLICES is defined in Section 1.1

# A simple if/else to choose the folder's name based on NUMBER_OF_SLICES
if NUMBER_OF_SLICES >= 1000: # 1000 slices corresponds to the full dataset
    folder_name = "Full_Playlist" 
else: # otherwise the folder is named after the number of thousands of playlists, e.g. 5 -> "5k_Playlists"
    folder_name = f"{NUMBER_OF_SLICES}k_Playlists"

# Here we define the correct path 
post_task1_dir = project_root / "data_post_Task_1" / folder_name 

# Create the folder (and any missing parent folders) if it does not exist yet
post_task1_dir.mkdir(parents=True, exist_ok=True)

# Here we save the exact DataFrames produced in Sections 1.1 and 1.2:

# `playlists_flat_DataFrame` contains one row per playlist and still includes the `tracks` array.
# We could technically drop the `tracks` array, since the playlist-centered DataFrame does not need it, but saving exactly
# the same DataFrame as the one obtained by running the cells seems like good coding practice.
# In the same manner we save `playlist_track_DataFrame`, which has one row per (playlist, track).

playlists_flat_out = post_task1_dir / "playlists_flat"
playlist_track_out = post_task1_dir / "playlist_track"

playlists_flat_DataFrame.write.mode("overwrite").parquet(str(playlists_flat_out))

playlist_track_DataFrame.write.mode("overwrite").parquet(str(playlist_track_out))
# mode("overwrite") replaces whatever is already stored at these paths each time this cell is run, so the user should be cautious.


# Just to make things very clear 
print("\nSaved playlist tables to:")
print(playlists_flat_out)
print(playlist_track_out)


Saved playlist tables to:
C:\Users\roman\Desktop\Master - ULB\2nd year\Q1\Intro Data Sc\Data_Science_Project_INFO_H600\data_post_Task_1\5k_Playlists\playlists_flat
C:\Users\roman\Desktop\Master - ULB\2nd year\Q1\Intro Data Sc\Data_Science_Project_INFO_H600\data_post_Task_1\5k_Playlists\playlist_track


----

# Step 2:

# Do the whole pipeline for 5k and 50k to see if there are any differences !!!

### 2.1 Loading processed data from Task 1:

---

# Step 3: Track-Track Co-Occurrence Analysis
### Identify which tracks tend to appear together in playlists.

In [None]:
import sys
sys.executable

# Note to self before beginning: set the kernel to the INFO_H600 environment, open the project in VS Code from Linux, then run the library cells and the SparkSession set-up (Step 1.0)

'/home/larah/miniconda3/envs/INFO_H600_Project/bin/python'

### 3.1 Locate directories, load parquet parts.

In [5]:
from pathlib import Path
import pandas as pd

project_root = Path.cwd().resolve()
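# Should match the folder name written in Section 1.3 ("5k_Playlists"); folder names are case-sensitive on most Linux file systems
data_root = project_root / "data" / "5k_playlists"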

playlists_flat_path = data_root / "playlists_flat"
playlist_track_path = data_root / "playlist_track"

# Only pick the actual parquet part files, ignore .crc etc.
playlists_flat_parts = sorted(playlists_flat_path.glob("*.parquet"))
print("Number of playlists_flat part files:", len(playlists_flat_parts))

playlist_track_parts = sorted(playlist_track_path.glob("*.parquet"))
print("Number of playlist_track part files:", len(playlist_track_parts))

# Load and concatenate into a single pandas DataFrame
playlists_flat_pd = pd.concat(
    (pd.read_parquet(p) for p in playlists_flat_parts),
    ignore_index=True
)

playlist_track_pd = pd.concat(
    (pd.read_parquet(p) for p in playlist_track_parts),
    ignore_index=True
)

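# Preview the loaded tables (a notebook only displays the last expression of a cell automatically, so we print them)
print(playlists_flat_pd.head())
print(playlist_track_pd.head())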

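# Convert them into Spark DataFrames.
# Note: createDataFrame ships the local pandas data to Spark inside the tasks, which is most likely what triggers the
# "task of very large size" warnings below; reading the folders directly, e.g. spark.read.parquet(str(playlists_flat_path)),
# would avoid this.
playlists_flat_DataFrame = spark.createDataFrame(playlists_flat_pd)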
playlists_flat_DataFrame.printSchema()
playlists_flat_DataFrame.show(5, truncate=False)

playlist_track_DataFrame = spark.createDataFrame(playlist_track_pd)
playlist_track_DataFrame.printSchema()
playlist_track_DataFrame.show(5, truncate=False)



Number of playlists_flat part files: 5
Number of playlist_track part files: 5
root
 |-- pid: long (nullable = true)
 |-- name: string (nullable = true)
 |-- collaborative: string (nullable = true)
 |-- modified_at: long (nullable = true)
 |-- num_tracks: long (nullable = true)
 |-- num_albums: long (nullable = true)
 |-- num_followers: long (nullable = true)
 |-- duration_ms: long (nullable = true)



                                                                                

+----+--------+-------------+-----------+----------+----------+-------------+-----------+
|pid |name    |collaborative|modified_at|num_tracks|num_albums|num_followers|duration_ms|
+----+--------+-------------+-----------+----------+----------+-------------+-----------+
|4000|skate   |true         |1432252800 |70        |62        |1            |16539160   |
|4001|Work out|false        |1493510400 |12        |11        |1            |3053039    |
|4002|Study   |false        |1462665600 |19        |14        |2            |4419474    |
|4003|HOLA    |false        |1493424000 |104       |89        |2            |23594821   |
|4004|House   |false        |1508889600 |79        |70        |2            |30240074   |
+----+--------+-------------+-----------+----------+----------+-------------+-----------+
only showing top 5 rows
root
 |-- pid: long (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- num_tracks: long (nullable = true)
 |-- num_albums: long (nullable = true)
 |-

25/12/04 14:50:47 WARN TaskSetManager: Stage 1 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.


+----+-------------+----------+----------+-------------+-----------+--------------------+---------+------------------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+-----------------------------------------------------------------------------+-----------------+
|pid |playlist_name|num_tracks|num_albums|num_followers|modified_at|playlist_duration_ms|track_pos|track_uri                           |track_name                  |artist_uri                           |artist_name                 |album_uri                           |album_name                                                                   |track_duration_ms|
+----+-------------+----------+----------+-------------+-----------+--------------------+---------+------------------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------

### 3.2 Building the (Playlist, Track) relation and filtering tracks.
#### This block prepares the core dataset needed for the co-occurrence analysis by constructing a clean (pid, track_uri) relation and reducing it to a manageable subset of tracks.

In [None]:
from pyspark.sql import functions as F

# Build the (playlist, track) relation: only the playlist ID and track URI are kept, and distinct() removes duplicates
# when the same track appears several times in one playlist.

track_in_playlists = playlist_track_DataFrame.select("pid", "track_uri").distinct()
track_in_playlists.show(5, truncate=False)

# per-track playlist counts

track_counts = (
    track_in_playlists
    .groupBy("track_uri")
    .agg(F.countDistinct("pid").alias("playlist_count"))
)

track_counts.show(5, truncate=False)

# Keep only "popular" tracks, i.e. tracks appearing in at least 2 playlists, so that the data stays small enough
# to run on a laptop without Spark crashing.

popular_tracks = (
    track_counts
    .filter(F.col("playlist_count") >= 2)
    .select("track_uri")
)

# Reconstruct the filtered incidence relation.
track_in_playlists_pop = (
    track_in_playlists
    .join(popular_tracks, on="track_uri")
    .cache()
)

track_in_playlists_pop.count()  # count() is an action: it triggers the computation and fills the cache



25/12/04 14:52:27 WARN TaskSetManager: Stage 2 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+----+------------------------------------+
|pid |track_uri                           |
+----+------------------------------------+
|4000|spotify:track:6NGxGmKo0w0zQE9HuaKzd9|
|4003|spotify:track:7FrPSk0rZdCPR50pfzdzMK|
|4003|spotify:track:5HRcFL5LZb6PoE17CDNTU0|
|4008|spotify:track:2zcps4ZXVu0tN1uAKcQRqg|
|4008|spotify:track:0qHVFGgYU93vZOj17MHIhd|
+----+------------------------------------+
only showing top 5 rows


25/12/04 14:52:31 WARN TaskSetManager: Stage 5 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+------------------------------------+--------------+
|track_uri                           |playlist_count|
+------------------------------------+--------------+
|spotify:track:2x7cxHkkKF4afADOkrxe6J|4             |
|spotify:track:1SKPmfSYaPsETbRHaiA18G|27            |
|spotify:track:5dI1yHSqgmilFEqpGbqxHh|16            |
|spotify:track:2gZpW5pTZkimGG98loFSl2|1             |
|spotify:track:6ltPEsP4edATzvinHOzvk2|59            |
+------------------------------------+--------------+
only showing top 5 rows


25/12/04 14:52:34 WARN TaskSetManager: Stage 11 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.
25/12/04 14:52:35 WARN TaskSetManager: Stage 12 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

257548
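The logic of this cell (deduplicate, count distinct playlists per track, keep tracks with a count of at least 2, join back) can be sketched in plain Python on a handful of hypothetical (pid, track_uri) rows. The comments map each step to the Spark operation it imitates; this is an illustration, not a replacement for the Spark code.

```python
from collections import defaultdict

# Hypothetical (pid, track_uri) rows; (10, "t1") appears twice, since a track can be repeated in a playlist
rows = [(10, "t1"), (10, "t1"), (10, "t2"), (11, "t1"), (11, "t3"), (12, "t1"), (12, "t2")]

# select("pid", "track_uri").distinct()
track_in_playlists = set(rows)

# groupBy("track_uri").agg(countDistinct("pid"))
playlists_per_track = defaultdict(set)
for pid, track in track_in_playlists:
    playlists_per_track[track].add(pid)
track_counts = {track: len(pids) for track, pids in playlists_per_track.items()}

# filter(playlist_count >= 2), then the join back onto the (pid, track_uri) relation
popular = {track for track, count in track_counts.items() if count >= 2}
track_in_playlists_pop = {(pid, track) for pid, track in track_in_playlists if track in popular}

print(track_counts, len(track_in_playlists_pop))
```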

### 3.2.1 Optional restriction step (limit the analysis to the top-N most frequent tracks)

In [None]:
# Keep only the N most frequent tracks for a test run (e.g. N = 500),
# because the `track_in_playlists_pop` subset is too large to run on my laptop

subset_tracks = (
    track_counts
    .orderBy(F.desc("playlist_count"))
    .limit(500)              # adjust as needed (smaller = safer)
    .select("track_uri")
)

# Restrict the (playlist, track) relation to only those top-N tracks

track_in_playlists_sub = (
    track_in_playlists
    .join(subset_tracks, on="track_uri")
    .cache()
)
track_in_playlists_sub.count()


25/12/04 14:53:18 WARN TaskSetManager: Stage 30 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.
25/12/04 14:53:18 WARN TaskSetManager: Stage 31 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

41783
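One detail worth noting: `orderBy(F.desc("playlist_count")).limit(500)` does not specify which tracks are kept when several share the same count at the cut-off, so the subset may vary between runs; adding a secondary key such as `F.asc("track_uri")` to `orderBy` would make it reproducible. The plain-Python sketch below shows this top-N selection with a URI tie-breaker on hypothetical counts (the helper `top_n_tracks` is illustrative only).

```python
# Hypothetical per-track playlist counts (same role as track_counts above)
track_counts = {"t1": 27, "t2": 59, "t3": 4, "t4": 27, "t5": 1}


def top_n_tracks(counts, n):
    # Sort by descending count, breaking ties by URI so the result is deterministic
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [track for track, _ in ranked[:n]]


# Restrict a hypothetical (pid, track_uri) relation to the top-N tracks
subset = set(top_n_tracks(track_counts, 3))
rows = [(1, "t1"), (1, "t3"), (2, "t2"), (2, "t4"), (3, "t5")]
rows_sub = [(pid, track) for pid, track in rows if track in subset]

print(top_n_tracks(track_counts, 3), rows_sub)
```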

### 3.3 Co-Occurrence on a filtered (Playlist, Track) Table
#### Here I use `track_in_playlists_sub` as an example, but any similar (playlist, track) table can be substituted.

In [None]:
tip = track_in_playlists_sub  # or track_in_playlists_pop

#   1. Self-join `tip` on pid:
#        - `a` and `b` are two aliases of the same table.
#        - After the join, each row represents a pair of tracks that
#          appear together in the same playlist.

#  2. Filter with (a.track_uri < b.track_uri):
#        - Ensures an ordering on the pair (track_a, track_b).
#        - Eliminates:
#             * self-pairs (T, T)
#             * duplicate pairs (T1, T2) and (T2, T1)

#  3. Group by (track_a, track_b) and count:
#        - `cooc_count` = number of playlists in which this pair
#          co-occurs.
#        - This forms the core co-occurrence statistic used later
#          to define similarity between tracks.

pairs = (
    tip.alias("a")
    .join(tip.alias("b"), on="pid")
    .where(F.col("a.track_uri") < F.col("b.track_uri"))  # avoid duplicates & self-pairs
    .groupBy(
        F.col("a.track_uri").alias("track_a"),
        F.col("b.track_uri").alias("track_b")
    )
    .agg(F.count("*").alias("cooc_count"))
)

pairs.show(5, truncate=False)



+------------------------------------+------------------------------------+----------+
|track_a                             |track_b                             |cooc_count|
+------------------------------------+------------------------------------+----------+
|spotify:track:0v9Wz8o0BT8DU38R4ddjeH|spotify:track:1yxgsra98r3qAtxqiGZPiX|13        |
|spotify:track:04KTF78FFg8sOHC1BADqbY|spotify:track:0LWQWOFoz5GJLqcHk1fRO2|10        |
|spotify:track:4qikXelSRKvoCqFcHLB2H2|spotify:track:7jslhIiELQkgW9IHeYNOWE|6         |
|spotify:track:1jNOi6m3Hn8nLEeHCp5Msr|spotify:track:2aibwv5hGXSgw7Yru8IYTO|6         |
|spotify:track:0KKkJNfGyhkQ5aFogxQAPU|spotify:track:6JV2JOEocMgcZxYSZelKcc|15        |
+------------------------------------+------------------------------------+----------+
only showing top 5 rows
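The self-join with the `a.track_uri < b.track_uri` filter enumerates every unordered pair of distinct tracks within a playlist. The same count can be sketched in plain Python with `itertools.combinations` on a toy, already deduplicated relation (the data is hypothetical).

```python
from collections import Counter, defaultdict
from itertools import combinations

# Hypothetical (pid, track_uri) relation, already deduplicated
tip = [(1, "A"), (1, "B"), (1, "C"), (2, "A"), (2, "B"), (3, "B"), (3, "C")]

# Group tracks by playlist (the role of the join key "pid")
tracks_by_pid = defaultdict(set)
for pid, track in tip:
    tracks_by_pid[pid].add(track)

# combinations() over a sorted list yields each pair once with track_a < track_b,
# the same effect as the self-join filter a.track_uri < b.track_uri (no self-pairs, no mirrored duplicates)
cooc = Counter()
for tracks in tracks_by_pid.values():
    for track_a, track_b in combinations(sorted(tracks), 2):
        cooc[(track_a, track_b)] += 1

print(dict(cooc))
```

This also makes the cost visible: a playlist with k tracks contributes k(k-1)/2 pairs (e.g. 250 tracks give 31,125 pairs), which is why the relation is filtered before the self-join.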


                                                                                

### 3.4 Computing Cosine Similarity between track pairs.
#### This metric rescales the raw co-occurrence counts so that the similarity is not dominated by globally popular tracks: it measures how often two tracks appear together relative to how often each of them appears overall. This produces a normalised track-track similarity.
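As a worked check of the formula used in the next cell, cosine(A, B) = cooc_count / sqrt(count_a * count_b) is the cosine between the binary playlist-membership vectors of A and B, i.e. |P(A) ∩ P(B)| / sqrt(|P(A)| * |P(B)|), where P(X) is the set of playlists containing X. The sketch below computes it both from counts and from hypothetical playlist sets, and plugs in the counts of the top pair in the output of Section 3.4 (57, 87, 97), which should reproduce its `cosine_cooc` of about 0.6205.

```python
import math


def cosine_cooc(cooc_count: int, count_a: int, count_b: int) -> float:
    """Co-occurrence count normalised by the standalone playlist counts of both tracks."""
    return cooc_count / math.sqrt(count_a * count_b)


# Equivalent set-based form on hypothetical playlist sets
playlists_a = {1, 2, 3, 4}
playlists_b = {3, 4, 5}
sim_sets = len(playlists_a & playlists_b) / math.sqrt(len(playlists_a) * len(playlists_b))

# Counts of the top pair shown in the output of Section 3.4
sim_row = cosine_cooc(57, 87, 97)

# Two tracks that always appear together get the maximum similarity of 1.0
sim_max = cosine_cooc(5, 5, 5)

print(sim_sets, sim_row, sim_max)
```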

In [None]:
# Prepare marginal frequency tables for tracks A and B

track_counts_a = track_counts.withColumnRenamed("track_uri", "track_a") \
                             .withColumnRenamed("playlist_count", "count_a")
track_counts_b = track_counts.withColumnRenamed("track_uri", "track_b") \
                             .withColumnRenamed("playlist_count", "count_b")

# Join counts and compute cosine similarity:

# For each track pair (A, B), we have:
#          cooc_count = |playlists containing both A and B|
#          count_a     = |playlists containing A|
#          count_b     = |playlists containing B|

# Cosine similarity normalizes the co-occurrence by the standalone frequencies of each track:
#         cosine(A, B) = cooc_count / sqrt(count_a * count_b)

cooc_sim_df = (
    pairs
    .join(track_counts_a, on="track_a")
    .join(track_counts_b, on="track_b")
    .withColumn(
        "cosine_cooc",
        F.col("cooc_count") / F.sqrt(F.col("count_a") * F.col("count_b"))
    )
)

#  Examine the top-scoring most similar track pairs

cooc_sim_df.orderBy(F.desc("cosine_cooc")).show(20, truncate=False)

25/12/04 14:57:08 WARN TaskSetManager: Stage 52 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.

+------------------------------------+------------------------------------+----------+-------+-------+------------------+
|track_b                             |track_a                             |cooc_count|count_a|count_b|cosine_cooc       |
+------------------------------------+------------------------------------+----------+-------+-------+------------------+
|spotify:track:2ANLarE8yHVsLWW21nj79M|spotify:track:0B8QzDH7YWih85V5SEMnyJ|57        |87     |97     |0.620482249978366 |
|spotify:track:5HGibWoxnkYSkl6mHmAlOE|spotify:track:03fT3OHB9KyMtGMt2zwqCT|56        |105    |78     |0.6187939745230095|
|spotify:track:5kjyiH6but1t2UDXq15aeS|spotify:track:1yEwEiTpsaPhQi9lb5EVV4|45        |78     |70     |0.6089984572058625|
|spotify:track:4dGJf1SER1T6ooX46vwzRB|spotify:track:1yEwEiTpsaPhQi9lb5EVV4|57        |78     |113    |0.6071392818424457|
|spotify:track:6fwdbPMwP1zVStm8FybmkO|spotify:track:4gmmRb6bZJffOOiww1JGTO|52        |72     |107    |0.5924411367547081|
|spotify:track:6s9ICeczY

                                                                                
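As a quick sanity check of the formula, the top row of the output above can be recomputed by hand from its own count columns. The three numbers are copied from the table; nothing else is assumed.

```python
from math import sqrt

# Values copied from the first row of the output above
cooc_count, count_a, count_b = 57, 87, 97

# cosine(A, B) = cooc_count / sqrt(count_a * count_b) = 57 / sqrt(8439)
cosine = cooc_count / sqrt(count_a * count_b)
print(f"{cosine:.6f}")  # should agree with the reported cosine_cooc (0.620482...)
```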

### 3.4.1 Inspection of the most similar track pairs

In [None]:
# Which pairs have the highest similarity?

cooc_sim_df.orderBy(F.desc("cosine_cooc")).show(20, truncate=False)


### 3.4.2 Adding track metadata
#### Joining each track URI to its name and artist makes the similarity output easier to interpret and inspect.
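
The dictionary-plus-join logic of the next cell can be mirrored on a few hypothetical rows in plain Python. The URIs, names and helper functions below are illustrative assumptions. Two behaviours carry over to the Spark version: rows with a null URI are discarded, and because `.join(..., on=...)` is an inner join by default, any pair whose track has no metadata row silently disappears. One difference: the sketch keeps the first row seen per URI, whereas Spark's `dropDuplicates` makes no documented promise about which duplicate row it keeps.

```python
# Hypothetical rows as (track_uri, track_name, artist_name); values are illustrative
rows = [
    ("uri:1", "Song One", "Artist X"),
    ("uri:1", "Song One", "Artist X"),  # same track listed in another playlist
    (None, "Orphan", "Artist Y"),       # null URI, dropped like dropna(subset=["track_uri"])
    ("uri:2", "Song Two", "Artist Z"),
]

def build_track_meta(rows):
    """One (name, artist) entry per non-null URI; the first occurrence wins."""
    meta = {}
    for uri, name, artist in rows:
        if uri is None:
            continue
        meta.setdefault(uri, (name, artist))
    return meta

def attach_names(pairs, meta):
    """Inner-join metadata onto both sides of each (track_a, track_b, score) pair."""
    out = []
    for a, b, score in pairs:
        if a in meta and b in meta:  # inner join: pairs lacking metadata are dropped
            out.append((a, b, score, *meta[a], *meta[b]))
    return out

track_meta = build_track_meta(rows)
with_meta = attach_names([("uri:1", "uri:2", 0.5), ("uri:1", "uri:3", 0.4)], track_meta)
print(with_meta)  # the ("uri:1", "uri:3") pair is gone because uri:3 has no metadata
```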

In [None]:
# Join track names back on for easier interpretation

# Build a small “dictionary” track_uri → (name, artist) metadata table

track_meta = (
    playlist_track_DataFrame
    .select("track_uri", "track_name", "artist_name")
    .dropna(subset=["track_uri"])
    .dropDuplicates(["track_uri"])
)

#   - dropna ensures no null URIs are included.
#   - dropDuplicates ensures exactly one row per track_uri.

# Join metadata to the cosine similarity matrix

cooc_with_meta = (
    cooc_sim_df
    .join(track_meta.withColumnRenamed("track_uri", "track_a"), on="track_a")
    .withColumnRenamed("track_name", "track_a_name")
    .withColumnRenamed("artist_name", "artist_a_name")
    .join(track_meta.withColumnRenamed("track_uri", "track_b"), on="track_b")
    .withColumnRenamed("track_name", "track_b_name")
    .withColumnRenamed("artist_name", "artist_b_name")
)

# Display highest-scoring track pairs with names for interpretation

cooc_with_meta.orderBy(F.desc("cosine_cooc")).show(20, truncate=False)

25/12/04 14:59:11 WARN TaskSetManager: Stage 98 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.
25/12/04 14:59:12 WARN TaskSetManager: Stage 99 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.

+------------------------------------+------------------------------------+----------+-------+-------+------------------+------------------------------------------------------+----------------------------+-----------------------+----------------------------+
|track_b                             |track_a                             |cooc_count|count_a|count_b|cosine_cooc       |track_a_name                                          |artist_a_name               |track_b_name           |artist_b_name               |
+------------------------------------+------------------------------------+----------+-------+-------+------------------+------------------------------------------------------+----------------------------+-----------------------+----------------------------+
|spotify:track:2ANLarE8yHVsLWW21nj79M|spotify:track:0B8QzDH7YWih85V5SEMnyJ|57        |87     |97     |0.620482249978366 |Money Longer                                          |Lil Uzi Vert                |You Was Right     

                                                                                

### 3.5 Querying the most similar tracks to a given target
#### Given a target track (identified by its URI), retrieve the top-k tracks that are most similar to it according to the cosine co-occurrence similarity.
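
The filter-then-pick-the-other-side logic of `most_similar_tracks` in the next cell can be sketched in plain Python on hypothetical `(track_a, track_b, cosine, cooc_count)` tuples. The function name `most_similar_local` and the toy values are illustrative assumptions. `heapq.nlargest` returns the k best rows without sorting the whole candidate list, playing the role of `.orderBy(...).limit(k)`.

```python
import heapq

def most_similar_local(pairs, target, k=10):
    """pairs: iterable of (track_a, track_b, cosine, cooc_count) tuples.
    Returns up to k (other_track, cosine, cooc_count) tuples, best first."""
    candidates = []
    for a, b, cos, cooc in pairs:
        # Keep rows where the target sits on either side, recording the *other* track
        if a == target:
            candidates.append((b, cos, cooc))
        elif b == target:
            candidates.append((a, cos, cooc))
    # Top-k by cosine score, in descending order
    return heapq.nlargest(k, candidates, key=lambda row: row[1])

# Hypothetical toy pairs
toy_pairs = [
    ("t1", "t2", 0.67, 2),
    ("t2", "t3", 0.82, 2),
    ("t1", "t4", 0.58, 1),
    ("t3", "t4", 0.10, 1),
]
print(most_similar_local(toy_pairs, "t2", k=2))
```

The sketch assumes each unordered pair is stored once; whether `pairs` is built that way is not shown in this section. If both orientations were stored, the either-side filter here (and in the Spark function) would list each neighbour twice.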

In [None]:
# Retrieve the most similar tracks to a given track:

from pyspark.sql import functions as F

# Method:
#   1. Filter cooc_with_meta to rows where the target track appears on either side of the pair: 
#          track_a == target_uri OR track_b == target_uri.
#
#   2. For each such row, derive the "other" track in the pair:
#         - other_track_uri  = the non-target track (track_a or track_b).
#         - other_track_name = corresponding track name.
#         - other_artist_name = corresponding artist.
#
#   3. Select:
#         other_track_uri, other_track_name, other_artist_name,
#         cosine_cooc (similarity score),
#         cooc_count  (raw co-occurrence frequency).
#
#   4. Order by cosine_cooc in descending order and take the top-k.

def most_similar_tracks(target_uri, k=10):
    """
    Print top-k tracks most similar to `target_uri` based on cosine_cooc.
    Uses the cooc_with_meta DataFrame.
    """

    sims = (
        cooc_with_meta
        .filter(
            (F.col("track_a") == target_uri) |
            (F.col("track_b") == target_uri)
        )
        .withColumn(
            "other_track_uri",
            F.when(F.col("track_a") == target_uri, F.col("track_b"))
             .otherwise(F.col("track_a"))
        )
        .withColumn(
            "other_track_name",
            F.when(F.col("track_a") == target_uri, F.col("track_b_name"))
             .otherwise(F.col("track_a_name"))
        )
        .withColumn(
            "other_artist_name",
            F.when(F.col("track_a") == target_uri, F.col("artist_b_name"))
             .otherwise(F.col("artist_a_name"))
        )
        .select(
            "other_track_uri",
            "other_track_name",
            "other_artist_name",
            "cosine_cooc",
            "cooc_count",
        )
        .orderBy(F.desc("cosine_cooc"))
        .limit(k)
    )

    sims.show(truncate=False)

In [14]:
# For example, pick one of the subset tracks used earlier
example_uri = subset_tracks.first()["track_uri"]
print(example_uri)

most_similar_tracks(example_uri, k=10)

25/12/04 15:06:45 WARN TaskSetManager: Stage 141 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

spotify:track:7BKLCZ1jbUBVqRi2FVlTVw


25/12/04 15:06:48 WARN TaskSetManager: Stage 147 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.
25/12/04 15:06:49 WARN TaskSetManager: Stage 148 contains a task of very large size (2372 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+------------------------------------+-------------------------------------+-----------------+-------------------+----------+
|other_track_uri                     |other_track_name                     |other_artist_name|cosine_cooc        |cooc_count|
+------------------------------------+-------------------------------------+-----------------+-------------------+----------+
|spotify:track:6DNtNfH8hXkqOX1sjqmI7p|Cold Water (feat. Justin Bieber & MØ)|Major Lazer      |0.42948826719817906|70        |
|spotify:track:4pdPtRcBmOSQDlJ3Fk945m|Let Me Love You                      |DJ Snake         |0.4097075322821556 |62        |
|spotify:track:6875MeXyCW0wLyT72Eetmo|Starving                             |Hailee Steinfeld |0.4087084100217135 |59        |
|spotify:track:0QsvXIfqM0zZoerQfsI9lm|Don't Let Me Down                    |The Chainsmokers |0.38829712917457143|71        |
|spotify:track:2rizacJSyD9S1IQUxUxnsK|All We Know                          |The Chainsmokers |0.32214906772185625|39  

---