<a href="https://colab.research.google.com/github/DomizianoScarcelli/big-data-project/blob/dev/data_preparation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Configuration

In [1]:
#@title Download necessary libraries
!pip install pyspark -qq
!pip install -U -q PyDrive -qq
!apt install openjdk-8-jdk-headless -qq

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 38 not upgraded.
Need to get 36.5 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 122541 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1) 

In [2]:
#@title Imports
import os
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
import plotly

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, FloatType, LongType
from pyspark import SparkContext, SparkConf
from pyspark.ml.linalg import SparseVector, DenseVector, VectorUDT

from tqdm.notebook import tqdm
import time
import gc

from google.colab import drive

from typing import Tuple
from functools import reduce
import pickle

In [3]:
#@title Set up variables
# JDK location exported as JAVA_HOME at the bottom of this cell.
JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64"
# Google Drive mount point (used by drive.mount) and the project's dataset folder inside it.
GDRIVE_DIR = "/content/drive"
GDRIVE_HOME_DIR = GDRIVE_DIR + "/MyDrive"
GDRIVE_DATA_DIR = GDRIVE_HOME_DIR + "/Big Data/datasets"
# Input datasets, all located under GDRIVE_DATA_DIR.
DATASET_FILE = os.path.join(GDRIVE_DATA_DIR, "pyspark_friendly_spotify_playlist_dataset")
AUDIO_FEATURES_FILE = os.path.join(GDRIVE_DATA_DIR, "pyspark_track_features")
LITTLE_SLICE_FILE = os.path.join(GDRIVE_DATA_DIR, "little_slice")
# NOTE(review): "FLIE" is a typo for "FILE"; the name is kept because the data-loading cell references it.
SMALL_SLICE_FLIE = os.path.join(GDRIVE_DATA_DIR, "small_slice")
LITTLE_SLICE_AUDIO_FEATURES = os.path.join(GDRIVE_DATA_DIR, "little_slice_audio_features")
MICRO_SLICE_AUDIO_FEATURES = os.path.join(GDRIVE_DATA_DIR, "micro_slice_audio_features")
SPLITTED_SLICE_AUDIO_FEATURES = os.path.join(GDRIVE_DATA_DIR, "splitted_pyspark_track_features")
# Folder where the DataFrames and vector lengths produced by this notebook are written.
SAVED_DFS_PATH = os.path.join(GDRIVE_DATA_DIR, "saved_dfs")
RANDOM_SEED = 42 # for reproducibility
# Environment variables set for the Spark/Java runtime.
os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PYSPARK_PYTHON"]="python"

In [4]:
#@title Create the session
# The UI port (4050) is the one the ngrok tunnel set up further down forwards to.
conf = SparkConf().\
                set('spark.ui.port', "4050").\
                set('spark.executor.memory', '12G').\
                set('spark.driver.memory', '12G').\
                set('spark.driver.maxResultSize', '100G').\
                set("spark.executor.extraJavaOptions", "-XX:+UseG1GC").\
                setAppName("PySparkTutorial").\
                setMaster("local[*]")

# Create (or reuse) the context. getOrCreate keeps this cell idempotent: re-running it
# no longer fails because a SparkContext is already active in the kernel. Note that
# when a context already exists it is returned as-is and `conf` is not re-applied.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [5]:
drive.mount(GDRIVE_DIR, force_remount=True)

Mounted at /content/drive


## Setup ngrok

In [6]:
!pip install pyngrok

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyngrok
  Downloading pyngrok-6.0.0.tar.gz (681 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m681.2/681.2 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyngrok
  Building wheel for pyngrok (setup.py) ... [?25l[?25hdone
  Created wheel for pyngrok: filename=pyngrok-6.0.0-py3-none-any.whl size=19867 sha256=e939c7f73451d3c62d603447e97213f733b65ac2e689f8f991e891c6c0e82e9f
  Stored in directory: /root/.cache/pip/wheels/5c/42/78/0c3d438d7f5730451a25f7ac6cbf4391759d22a67576ed7c2c
Successfully built pyngrok
Installing collected packages: pyngrok
Successfully installed pyngrok-6.0.0


In [7]:
# The ngrok authtoken is a credential and must never be committed with the notebook.
# It is taken from the NGROK_AUTHTOKEN environment variable, falling back to an
# interactive prompt. The token that used to be hardcoded in this cell has been
# published and should be revoked from the ngrok dashboard.
from getpass import getpass
from pyngrok import ngrok

ngrok.set_auth_token(os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok authtoken: "))

Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml


In [8]:
from pyngrok import ngrok

# Open a ngrok tunnel on the port 4050 where Spark is running
port = '4050'
public_url = ngrok.connect(port).public_url



In [9]:
print("To access the Spark Web UI console, please click on the following link to the ngrok tunnel \"{}\" -> \"http://127.0.0.1:{}\"".format(public_url, port))

To access the Spark Web UI console, please click on the following link to the ngrok tunnel "https://bf1c-35-231-212-214.ngrok-free.app" -> "http://127.0.0.1:4050"


In [10]:
#@title Check if everything is ok
spark, sc._conf.getAll()


(<pyspark.sql.session.SparkSession at 0x7f8bfc5422c0>,
 [('spark.executor.extraJavaOptions',
   '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -XX:+UseG1GC'),
  ('spark.app.name', 'PySparkTutorial'),
  ('spark.app.startTime', '1686220359212'

# Data acquisition

In [11]:
from pyspark.ml.linalg import VectorUDT
# All schemas below declare every field as nullable (third StructField argument is True).

# One track entry inside a playlist.
song_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("pos", IntegerType()),
        ("artist_name", StringType()),
        ("track_uri", StringType()),
        ("artist_uri", StringType()),
        ("track_name", StringType()),
        ("album_uri", StringType()),
        ("duration_ms", LongType()),
        ("album_name", StringType()),
    ]
])

# One playlist, with its tracks stored as an array of song_schema structs.
playlist_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("name", StringType()),
        ("collaborative", StringType()),
        ("pid", IntegerType()),
        ("modified_at", IntegerType()),
        ("num_tracks", IntegerType()),
        ("num_albums", IntegerType()),
        ("num_followers", IntegerType()),
        ("tracks", ArrayType(song_schema)),
        ("num_edits", IntegerType()),
        ("duration_ms", IntegerType()),
        ("num_artists", IntegerType()),
    ]
])

# Same fields as playlist_schema, except that "tracks" holds an ML vector.
playlist_schema_mapped = StructType([
    StructField(
        field.name,
        VectorUDT() if field.name == "tracks" else field.dataType,
        True,
    )
    for field in playlist_schema.fields
])

# One audio-features record of a track.
audio_features_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("danceability", FloatType()),
        ("energy", FloatType()),
        ("key", IntegerType()),
        ("loudness", FloatType()),
        ("mode", IntegerType()),
        ("speechiness", FloatType()),
        ("acousticness", FloatType()),
        ("instrumentalness", FloatType()),
        ("liveness", FloatType()),
        ("valence", FloatType()),
        ("tempo", FloatType()),
        ("type", StringType()),
        ("id", StringType()),
        ("uri", StringType()),
        ("track_href", StringType()),
        ("analysis_url", StringType()),
        ("duration_ms", LongType()),
        ("time_signature", IntegerType()),
    ]
])

In [12]:
playlist_df = spark.read.schema(playlist_schema).json(DATASET_FILE, multiLine=True)
slice_df = spark.read.schema(playlist_schema).json(SMALL_SLICE_FLIE, multiLine=True)
audio_df = spark.read.schema(audio_features_schema).json(SPLITTED_SLICE_AUDIO_FEATURES, multiLine=True) #has less songs than expected

In [13]:
# slice_df = slice_df.limit(10_000).cache()
NUM_PLAYLISTS = slice_df.count()

# Dataset Train-Test split

## Simple Train-Test split
For the user-based and item-based collaborative filtering, the train-test split is done by splitting the songs inside each playlist with a ratio of 80% train and 20% test. This means that the playlists are the same for the train and test sets, but the songs inside them are different. In this way we can use the train set to recommend songs, and use the test set to evaluate the results.

In [14]:
from sklearn.model_selection import train_test_split as sklearn_split

def train_test_split(playlist: Row) -> Tuple[Row, Row]:
    """
    Split the tracks of a single playlist into a train part (80%) and a test part (20%).

    Args:
        playlist: a Row with the playlist fields (name, collaborative, pid, modified_at,
            num_tracks, num_albums, num_followers, tracks, num_edits, duration_ms,
            num_artists), where `tracks` is a list of track rows.

    Returns:
        (playlist_train, playlist_test): two Rows with the same metadata as the input
        and only the `tracks` field replaced. The other fields (e.g. `num_tracks`) are
        copied unchanged, so they keep describing the original playlist.

    Playlists with fewer than two tracks cannot be split: all their tracks go to the
    train row and the test row gets an empty track list.
    """
    tracks = list(playlist.tracks or [])

    if len(tracks) < 2:
        # sklearn raises when one of the two resulting sets would be empty.
        train_rows, test_rows = tracks, []
    else:
        # test_size must be explicit: sklearn's default is 0.25, not the intended 0.2.
        # The fixed random_state makes the split reproducible.
        train_rows, test_rows = sklearn_split(tracks, test_size=0.2, random_state=42)

    def with_tracks(track_subset) -> Row:
        # Field order is kept identical to the playlist schema on purpose.
        return Row(
            name=playlist.name,
            collaborative=playlist.collaborative,
            pid=playlist.pid,
            modified_at=playlist.modified_at,
            num_tracks=playlist.num_tracks,
            num_albums=playlist.num_albums,
            num_followers=playlist.num_followers,
            tracks=track_subset,
            num_edits=playlist.num_edits,
            duration_ms=playlist.duration_ms,
            num_artists=playlist.num_artists,
        )

    return with_tracks(train_rows), with_tracks(test_rows)

In [15]:
from pyspark.sql.functions import udf, struct
import shutil

def divide_whole_dataset(playlist_df: DataFrame) -> Tuple[DataFrame, DataFrame]:
  """
  Apply train_test_split to every playlist of `playlist_df`.

  Returns (train_df, test_df): two DataFrames with the same columns as the input,
  built from the first and second Row returned by train_test_split respectively.
  """
  # The UDF returns a 2-element array of structs shaped like the input rows.
  playlist_struct = StructType(playlist_df.schema.fields)
  split_udf = udf(train_test_split, returnType=ArrayType(playlist_struct))

  whole_playlist = struct(*playlist_df.columns)
  split_col = F.col('divided')
  halves_df = (
      playlist_df
      .withColumn("divided", split_udf(whole_playlist))
      .select(split_col.getItem(0).alias('train'), split_col.getItem(1).alias('test'))
  )

  return halves_df.select("train.*"), halves_df.select("test.*")

In [16]:
TRAIN_DF_PATH = os.path.join(SAVED_DFS_PATH, f"train_df-{NUM_PLAYLISTS}.json")
TEST_DF_PATH = os.path.join(SAVED_DFS_PATH, f"test_df-{NUM_PLAYLISTS}.json")

split_paths = (TRAIN_DF_PATH, TEST_DF_PATH)

if all(os.path.exists(saved_path) for saved_path in split_paths):
  # Both halves were saved by a previous run: reload them.
  train_df = spark.read.schema(playlist_schema).json(TRAIN_DF_PATH)
  test_df = spark.read.schema(playlist_schema).json(TEST_DF_PATH)
else:
  # In order to avoid [PATH_ALREADY_EXISTS] errors, drop any half that is left over.
  for stale_path in split_paths:
    if os.path.exists(stale_path):
      shutil.rmtree(stale_path)

  train_df, test_df = divide_whole_dataset(slice_df)
  train_df, test_df = train_df.cache(), test_df.cache()
  train_df.write.mode("overwrite").json(TRAIN_DF_PATH)
  test_df.write.mode("overwrite").json(TEST_DF_PATH)

## Neural Network Train-Test split
Regarding the Neural Network approach, we cannot use the same train-test split. This is because we need a training set that contains some playlists in order to train the model, and then we need a test set with different playlists in order to evaluate its performance. The test set will also be split with the approach above, meaning some songs will be removed in order to evaluate the recommendations. This approach is needed in order to not evaluate the model on playlists that were in the training set.

In [17]:
def nn_train_test_split(playlists: DataFrame) -> Tuple[DataFrame, DataFrame]:
    """
    Randomly partition whole playlists into a train and a test DataFrame,
    using weights 0.8 / 0.2 and the fixed seed 42.
    """
    split_weights = [0.8, 0.2]
    train_part, test_part = playlists.randomSplit(split_weights, 42)
    return train_part, test_part

In [34]:
NN_TRAIN_DF_PATH = os.path.join(SAVED_DFS_PATH, f"nn_train_df-{NUM_PLAYLISTS}.json")
NN_TEST_DF_PATH = os.path.join(SAVED_DFS_PATH, f"nn_test_df-{NUM_PLAYLISTS}.json")
NN_TEST_DF_TRAIN_PATH = os.path.join(SAVED_DFS_PATH, f"nn_test_df-train-{NUM_PLAYLISTS}.json")
NN_TEST_DF_TEST_PATH = os.path.join(SAVED_DFS_PATH, f"nn_test_df-test-{NUM_PLAYLISTS}.json")

nn_split_paths = (NN_TRAIN_DF_PATH, NN_TEST_DF_PATH, NN_TEST_DF_TRAIN_PATH, NN_TEST_DF_TEST_PATH)

if all(os.path.exists(saved_path) for saved_path in nn_split_paths):
  # NOTE: files saved by earlier versions of this cell contain a split of the wrong
  # DataFrame (see below); delete them from SAVED_DFS_PATH to have them regenerated.
  nn_train_df = spark.read.schema(playlist_schema).json(NN_TRAIN_DF_PATH)
  nn_test_df = spark.read.schema(playlist_schema).json(NN_TEST_DF_PATH)
  nn_test_train_df = spark.read.schema(playlist_schema).json(NN_TEST_DF_TRAIN_PATH)
  nn_test_test_df = spark.read.schema(playlist_schema).json(NN_TEST_DF_TEST_PATH)
else:
  # In order to avoid [PATH_ALREADY_EXISTS] errors.
  for stale_path in nn_split_paths:
    if os.path.exists(stale_path):
      shutil.rmtree(stale_path)

  # Playlist-level split first; it is cached before being reused so that the
  # song-level split below is derived from one materialisation of the test playlists.
  nn_train_df, nn_test_df = nn_train_test_split(slice_df)
  nn_train_df, nn_test_df = nn_train_df.cache(), nn_test_df.cache()

  # The held-out playlists (nn_test_df) are the ones whose songs get split. This used to
  # split `test_df` (the collaborative-filtering test set) by mistake.
  nn_test_train_df, nn_test_test_df = divide_whole_dataset(nn_test_df)
  nn_test_train_df, nn_test_test_df = nn_test_train_df.cache(), nn_test_test_df.cache()

  nn_train_df.write.mode("overwrite").json(NN_TRAIN_DF_PATH)
  nn_test_df.write.mode("overwrite").json(NN_TEST_DF_PATH)
  nn_test_train_df.write.mode("overwrite").json(NN_TEST_DF_TRAIN_PATH)
  nn_test_test_df.write.mode("overwrite").json(NN_TEST_DF_TEST_PATH)


# Data Preparation

## Get the artists vector

In [21]:
def get_all_artists(playlist_df: DataFrame, filter_rare: bool = False) -> DataFrame:
  """
  Given a playlist dataframe, returns a dataframe with one row per distinct artist_uri
  found in the `tracks` of the playlists.

  Args:
    playlist_df: playlists with a `tracks` array column whose elements have an `artist_uri` field.
    filter_rare: when True, only artists occurring in at least 3 track entries are kept and
      the result carries an extra `count` column with the number of occurrences.

  Returns:
    A DataFrame with an `artist_uri` column (plus `count` when filter_rare is True).
  """
  artist_uris = playlist_df.select(F.explode("tracks.artist_uri").alias("artist_uri"))

  if not filter_rare:
    return artist_uris.distinct()

  return artist_uris \
          .groupBy("artist_uri") \
          .agg(F.count("*").alias("count")) \
          .filter(F.col("count") >= 3)

def create_artists_pos_mapping(playlist_df: DataFrame, filter_rare: bool = False) -> Tuple[DataFrame, int]:
  """
  Builds the mapping artist_uri -> pos used to place each artist inside the embedding vector.

  Args:
    playlist_df: playlists from which the artists are extracted.
    filter_rare: forwarded to get_all_artists; when True rare artists are left out of the mapping.

  Returns:
    (artists_df, vector_length): the artists dataframe with an added `pos` column
    (numbered from 1) sorted by artist_uri, and the number of mapped artists.

  Side effect: (re)registers the temporary view "ARTISTS".
  """
  # filter_rare used to be dropped here, so the "filtered" mapping was never actually filtered.
  artists_df = get_all_artists(playlist_df, filter_rare)
  artists_df.createOrReplaceTempView("ARTISTS")

  # Numbering follows artist_uri so that positions are reproducible every time the
  # (lazy) dataframe is evaluated; ordering by a constant left the numbering arbitrary.
  artists_df = spark.sql("""
  SELECT 
      row_number() OVER (
          ORDER BY artist_uri 
      ) as pos,
      *
  FROM 
      ARTISTS
  """)

  artists_df = artists_df.sort("artist_uri")

  ARTIST_VECTOR_LENGTH = artists_df.count()

  return artists_df, ARTIST_VECTOR_LENGTH


def create_artists_vector(playlist_df: DataFrame, artist_uri_to_id: dict, vector_length) -> DataFrame:
    """
    Replaces the `tracks` column of each playlist with a binary sparse vector
    marking the positions (taken from `artist_uri_to_id`) of the artists it contains.
    Artists that are missing from the mapping are ignored.
    """

    @udf(returnType=VectorUDT())
    def extract_vector(tracks):
      # Distinct positions of the artists of this playlist that have a mapping.
      known_positions = {
          artist_uri_to_id.get(track.artist_uri)
          for track in tracks
          if track.artist_uri in artist_uri_to_id
      }
      active_indices = sorted(known_positions)

      # The vector has vector_length + 1 slots; every active index holds a 1.
      return SparseVector(vector_length + 1, active_indices, [1] * len(active_indices))

    # Overwrite "tracks" with its vector representation.
    return playlist_df.withColumn('tracks', extract_vector(F.col('tracks')))

def artist_pipeline(playlist_df: DataFrame, is_train: bool = True, is_train_train: bool = False) -> DataFrame:
  """
  Builds the artist embedding of every playlist in `playlist_df`, saves it as JSON
  under SAVED_DFS_PATH and returns it (cached).

  Relies on the module-level `artist_uri_to_id` and `FILTERED_ARTIST_VECTOR_LENGTH`,
  which must be defined before this function is called.

  Args:
    playlist_df: playlists to embed.
    is_train: selects the "train" / "test" part of the output file name.
    is_train_train: only used when is_train is False; selects the second
      "train" / "test" part of the output file name.

  Returns:
    The DataFrame produced by create_artists_vector.
  """
  train_string = "train" if is_train else "test"
  if is_train:
    train_train_string = ""
  else:
    train_train_string = "train" if is_train_train else "test"

  ARTISTS_EMBEDDINGS = os.path.join(SAVED_DFS_PATH, f"nn_artists_embeddings-{train_string}-{train_train_string}-{NUM_PLAYLISTS}.json")

  artists_slice_df = create_artists_vector(playlist_df,
                                           artist_uri_to_id,
                                           FILTERED_ARTIST_VECTOR_LENGTH).cache()
  artists_slice_df.write.mode("overwrite").json(ARTISTS_EMBEDDINGS)

  return artists_slice_df


# Build the artist -> position mapping on the whole slice and persist the vector length.
filtered_artist_mapping, FILTERED_ARTIST_VECTOR_LENGTH = create_artists_pos_mapping(slice_df, filter_rare=True)
ARTIST_VECTOR_LENGTH_PATH = os.path.join(SAVED_DFS_PATH, f"nn_artist_vector_length-{NUM_PLAYLISTS}.txt")
with open(ARTIST_VECTOR_LENGTH_PATH, "w") as f:
  f.write('%d' % FILTERED_ARTIST_VECTOR_LENGTH)

# Collect the mapping as a dict artist_uri -> pos; artist_pipeline reads it as a global,
# so it must be defined before the calls below.
artist_uri_to_id = filtered_artist_mapping.select('artist_uri', 'pos').rdd.collectAsMap()

# Artist embeddings for the NN training playlists.
artist_slice_df_train = artist_pipeline(nn_train_df)

# Artist embeddings for the two song-level halves of the NN test playlists.
artist_slice_df_test_train = artist_pipeline(nn_test_train_df, is_train=False, is_train_train=True)
artist_slice_df_test_test = artist_pipeline(nn_test_test_df, is_train=False, is_train_train=False)

## Get the tracks vector

In [35]:
def get_all_songs(playlist_df: DataFrame, filter_rare: bool = False) -> DataFrame:
  """
  Returns one row per distinct track_uri found in the `tracks` of the given playlists.
  With filter_rare=True only tracks occurring at least 5 times are kept, and the
  result also carries their occurrence `count`.
  """
  track_uris = playlist_df.select(F.explode("tracks.track_uri").alias("track_uri"))

  if not filter_rare:
    return track_uris.distinct()

  occurrences = track_uris.groupBy("track_uri").agg(F.count("*").alias("count"))
  return occurrences.filter(F.col("count") >= 5)

def create_songs_pos_mapping(playlist_df: DataFrame, filter_rare: bool = False) -> Tuple[DataFrame, int]:
  """
  Builds the mapping track_uri -> pos used to place each track inside the embedding vector.

  Args:
    playlist_df: playlists from which the tracks are extracted.
    filter_rare: forwarded to get_all_songs; when True rare tracks are left out of the mapping.

  Returns:
    (songs_df, vector_length): the tracks dataframe with an added `pos` column
    (numbered from 1), and the number of mapped tracks.

  Side effect: (re)registers the temporary view "SONGS_INFO".
  """
  songs_df = get_all_songs(playlist_df, filter_rare)
  songs_df.createOrReplaceTempView("SONGS_INFO")

  # The returned dataframe is lazy and gets evaluated several times by its callers
  # (count, collect, write). Numbering by track_uri guarantees that every evaluation
  # assigns the same position to the same track; ordering by a constant did not.
  songs_df = spark.sql("""
  SELECT 
      row_number() OVER (
          ORDER BY track_uri 
      ) as pos,
      *
  FROM 
      SONGS_INFO
  """)

  RATING_VECTOR_LENGTH = songs_df.count()

  return songs_df, RATING_VECTOR_LENGTH


#TODO: Since the .rdd is very slow, the position of each track could be embedded inside the track itself,
# so that the UDF can just do pos_list.add(row.rating_position) in a few milliseconds.
def map_track_df_to_pos(playlist_df: DataFrame, track_uri_to_id: dict, vector_length: int) -> DataFrame:
    """
    Replaces the `tracks` column of each playlist with a binary sparse vector
    marking the positions (taken from `track_uri_to_id`) of the tracks it contains.
    Tracks that are missing from the mapping are ignored.
    """

    @udf(returnType=VectorUDT())
    def extract_vector(tracks):
      # Distinct positions of the tracks of this playlist that have a mapping.
      known_positions = {
          track_uri_to_id.get(track.track_uri)
          for track in tracks
          if track.track_uri in track_uri_to_id
      }
      active_indices = sorted(known_positions)

      # The vector has vector_length + 1 slots; every active index holds a 1.
      return SparseVector(vector_length + 1, active_indices, [1] * len(active_indices))

    # Overwrite "tracks" with its vector representation.
    return playlist_df.withColumn('tracks', extract_vector(F.col('tracks')))

def track_pipeline(playlist_df: DataFrame, is_train: bool = True, is_train_train: bool = False, filter_rare: bool = False) -> DataFrame:
  """
  Builds the track embedding of every playlist in `playlist_df`, saves it as JSON
  under SAVED_DFS_PATH and returns it (cached).

  Relies on module-level globals that must be defined before the call:
  `track_uri_to_id` / `SONG_VECTOR_LENGTH` when filter_rare is False,
  `filtered_track_uri_to_id` / `FILTERED_SONG_VECTOR_LENGTH` when it is True.

  Args:
    playlist_df: playlists to embed.
    is_train: selects the "train" / "test" part of the output file name.
    is_train_train: only used when is_train is False; selects the second
      "train" / "test" part of the output file name.
    filter_rare: use the filtered mapping and prefix the output file name with "nn-".

  Returns:
    The DataFrame produced by map_track_df_to_pos.
  """
  if filter_rare:
    track_mapping, vector_length = filtered_track_uri_to_id, FILTERED_SONG_VECTOR_LENGTH
  else:
    track_mapping, vector_length = track_uri_to_id, SONG_VECTOR_LENGTH

  song_embeddings_df = map_track_df_to_pos(playlist_df,
                                           track_mapping,
                                           vector_length).cache()

  train_string = "train" if is_train else "test"
  nn_string = "nn-" if filter_rare else ""
  if is_train:
    train_train_string = ""
  else:
    train_train_string = "train" if is_train_train else "test"

  SONGS_EMBEDDINGS = os.path.join(SAVED_DFS_PATH, f"{nn_string}songs_embeddings-{train_string}-{train_train_string}-{NUM_PLAYLISTS}.json")
  song_embeddings_df.write.mode("overwrite").json(SONGS_EMBEDDINGS)

  return song_embeddings_df


SONGS_VECTOR_LENGTH_PATH = os.path.join(SAVED_DFS_PATH, f"songs_vector_length-{NUM_PLAYLISTS}.txt")
FILTERED_SONGS_VECTOR_LENGTH_PATH = os.path.join(SAVED_DFS_PATH, f"nn_songs_vector_length-{NUM_PLAYLISTS}.txt")

# Build both track -> position mappings (complete and rare-filtered) on the whole slice
# and persist their lengths.
song_mapping, SONG_VECTOR_LENGTH = create_songs_pos_mapping(slice_df)
filtered_song_mapping, FILTERED_SONG_VECTOR_LENGTH = create_songs_pos_mapping(slice_df, filter_rare=True)
with open(SONGS_VECTOR_LENGTH_PATH, "w") as f:
  f.write('%d' % SONG_VECTOR_LENGTH)
with open(FILTERED_SONGS_VECTOR_LENGTH_PATH, "w") as f:
  f.write('%d' % FILTERED_SONG_VECTOR_LENGTH)

# track_pipeline reads these dicts as globals, so they must exist before it is called.
track_uri_to_id = song_mapping.select('track_uri', 'pos').rdd.collectAsMap()
filtered_track_uri_to_id = filtered_song_mapping.select('track_uri', 'pos').rdd.collectAsMap()

SONGS_INFO_DF = os.path.join(SAVED_DFS_PATH, f"songs_info_df-{NUM_PLAYLISTS}.json")
FILTERED_SONGS_INFO_DF = os.path.join(SAVED_DFS_PATH, f"nn_songs_info_df-{NUM_PLAYLISTS}.json")

song_mapping.write.mode("overwrite").json(SONGS_INFO_DF)
filtered_song_mapping.write.mode("overwrite").json(FILTERED_SONGS_INFO_DF)

# Collaborative-filtering embeddings (complete mapping).
songs_slice_df_train = track_pipeline(train_df)
songs_slice_df_test = track_pipeline(test_df, is_train=False)

# Neural-network embeddings (rare-filtered mapping).
nn_songs_slice_df_train = track_pipeline(nn_train_df, filter_rare=True)
# The NN test playlists are embedded twice, once per song-level half, mirroring the
# artist pipeline. Both calls used to receive the unsplit `nn_test_df`, which made the
# "train" and "test" embeddings identical.
nn_songs_slice_df_test_train = track_pipeline(nn_test_train_df, is_train=False, is_train_train=True, filter_rare=True)
nn_songs_slice_df_test_test = track_pipeline(nn_test_test_df, is_train=False, is_train_train=False, filter_rare=True)

In [39]:
nn_songs_slice_df_train.count(), nn_songs_slice_df_test_train.count(), nn_songs_slice_df_test_test.count()

(80002, 19998, 19998)

## Get the Item-based dataframe

In [None]:
def create_playlists_pos_mapping(playlist_df: DataFrame) -> Tuple[DataFrame, int]:
  """
  Returns the dataframe that maps each playlist pid to a position, and the total number of playlists in the dataframe.

  Args:
    playlist_df: playlists with a `pid` column.

  Returns:
    (mapping_df, n_playlists): a cached dataframe with columns `pid` and `pos`
    (numbered from 1) sorted by pid, and its row count.

  Side effect: (re)registers the temporary view "PLAYLISTS_INFO".
  """
  playlist_df.createOrReplaceTempView("PLAYLISTS_INFO")

  # Only `pid` goes through the window (the other columns were discarded right after
  # anyway), and the numbering follows pid so that it is reproducible across runs
  # instead of depending on the physical row order.
  playlist_df = spark.sql("""
  SELECT 
      row_number() OVER (
          ORDER BY pid 
      ) as pos,
      pid
  FROM 
      PLAYLISTS_INFO
  """)

  playlist_df = playlist_df.select("pid", "pos").sort("pid").cache()
  PLAYLIST_VECTOR_LENGTH = playlist_df.count()
  return playlist_df, PLAYLIST_VECTOR_LENGTH

playlist_map, PLAYLIST_VECTOR_LENGTH = create_playlists_pos_mapping(train_df)

In [None]:
playlist_pid_to_id = playlist_map.rdd.collectAsMap()

In [None]:
def create_playlist_vector(playlist_df: DataFrame, mapping: DataFrame) -> DataFrame:
    """
    Returns a DataFrame that maps each track_uri to the playlists containing it.
    The playlists are represented as a binary sparse vector in the `embedding` column.

    Args:
      playlist_df: playlists with `pid` and `tracks` columns; only these playlists are used.
      mapping: dataframe with columns `pid` and `pos` (as returned by
        create_playlists_pos_mapping) giving the vector position of each playlist.

    Returns:
      A DataFrame with columns `track_uri` and `embedding`. Playlists whose pid is
      missing from `mapping` are ignored.
    """
    # Both arguments used to be ignored in favour of the globals slice_df,
    # playlist_pid_to_id and NUM_PLAYLISTS; the function now only depends on its inputs.
    pid_to_pos = mapping.select("pid", "pos").rdd.collectAsMap()
    # Positions start at 1, so the vector needs one slot more than the highest position.
    vector_size = max(pid_to_pos.values(), default=0) + 1

    pids_per_track = (
        playlist_df
        .select("pid", F.explode("tracks.track_uri").alias("track_uri"))
        .groupBy("track_uri")
        .agg(F.collect_list("pid").alias("pid"))
    )

    @F.udf(returnType=VectorUDT())
    def extract_vector(pids):
      # Skipping unmapped pids avoids inserting None among the vector indices.
      positions = sorted({pid_to_pos[pid] for pid in pids if pid in pid_to_pos})

      return SparseVector(vector_size, positions, [1] * len(positions))

    # Replace the list of pids with its vector representation.
    mapped_df = pids_per_track.withColumn('embedding', extract_vector(F.col('pid'))).drop("pid")

    return mapped_df

# slice_df is passed explicitly because it is the data the function was effectively using
# before (it read the global and ignored its argument, which was the full `playlist_df`).
# NOTE(review): `train_df` may be the intended input, so that test songs do not leak into
# the item-based embeddings; confirm.
playlist_mapped = create_playlist_vector(slice_df, playlist_map)

In [None]:
# func = udf(lambda x: len(x.indices))
# playlist_mapped.withColumn("count", func(F.col('embedding'))).drop("embedding").orderBy(F.col("count").asc()).show(truncate=False) #This returns strange results, I think the embedding computation is wrong.

In [None]:
playlist_mapped.show(truncate=False)

In [None]:
PLAYLIST_MAP_PATH = os.path.join(SAVED_DFS_PATH, f"playlist_map-{NUM_PLAYLISTS}.json")
PID_TO_ID_PATH = os.path.join(SAVED_DFS_PATH, f"playlist_pid_to_id-{NUM_PLAYLISTS}.json")
# mode("overwrite") matches every other save in this notebook and keeps the cell
# re-runnable: without it the write fails as soon as the output path already exists.
playlist_mapped.write.mode("overwrite").json(PLAYLIST_MAP_PATH)