# Create a collaborative filtering model for song recommendations

I use PySpark to build a collaborative filtering recommender that suggests songs based on a single liked song. The input is a dataset of (playlist, track) pairs (see the Preprocess notebook). The heuristic is that songs which often appear together in playlists are similar and make good recommendations, so I measure similarity with the Jaccard similarity between the sets of playlists that contain each song. It is infeasible to compute this exactly for every pair of songs, so instead I use Locality-Sensitive Hashing (LSH) to get approximate answers. This is implemented by MinHashLSH in PySpark. Its approxSimilarityJoin method returns all sufficiently similar pairs. I save all of these to the Recommendations table in recommendations.db. The choice of which recommendations to show is made when serving results.
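
As a small illustration of the heuristic, the sketch below computes the exact Jaccard similarity on a handful of made-up (playlist_id, track_id) pairs. The toy data and the helper name `jaccard_similarity` are assumptions for illustration only and are not part of the pipeline in this notebook.

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical toy data: (playlist_id, track_id) pairs.
pairs = [(0, 10), (0, 11), (1, 10), (1, 11), (1, 12), (2, 10), (2, 12), (3, 13)]

# Map each track to the set of playlists that contain it.
playlists_by_track = defaultdict(set)
for playlist_id, track_id in pairs:
    playlists_by_track[track_id].add(playlist_id)

def jaccard_similarity(a, b):
    """Return |A ∩ B| / |A ∪ B| for two sets, or 0.0 if both are empty."""
    union = a | b
    return len(a & b) / len(union) if union else 0.0

# Exact similarity for every unordered pair of tracks.
similarities = {
    (s, t): jaccard_similarity(playlists_by_track[s], playlists_by_track[t])
    for s, t in combinations(sorted(playlists_by_track), 2)
}

for (s, t), sim in similarities.items():
    print(f"tracks {s} and {t}: similarity {sim:.3f}, distance {1 - sim:.3f}")
```

The Jaccard distance is one minus the similarity, so a smaller distance means a stronger recommendation. The exact computation visits every pair of tracks, which is the cost the LSH approach is meant to avoid.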

In [62]:
import numpy as np
import pandas as pd
import os
import json
from time import time

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import MinHashLSH


# Location of the processed data, stored as (playlist_id, track_id) pairs
csv_name = 'spotify_dataset_top10000.csv'

# Set up the Spark session
os.environ['JAVA_HOME'] = r"C:\Program Files\Java\jdk-15.0.1"  # This is only required on my machine
spark = SparkSession.builder.config("spark.driver.memory", "10g").getOrCreate()  # The default memory limit isn't sufficient on my machine

# Read the pairs with an explicit schema and drop rows with missing values
df = spark.read.option("header", "true").schema('playlist_id integer, track_id integer').csv(csv_name).na.drop()


In [63]:
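# The sparse vectors are indexed by playlist_id, so their length must exceed the largest id.
# Using max + 1 also works if the ids are not contiguous.
max_len = df.agg(F.max("playlist_id")).first()[0] + 1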

In [64]:

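# Function to create a sparse vector with a 1.0 at each playlist index
def encode(arr, length):
    # Deduplicate the playlist ids so that each index appears only once
    arr = list(set(arr))
    return Vectors.sparse(length, [(x, 1.0) for x in arr])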

# Collect the playlists of each track into a sparse vector which can be processed by MinHashLSH
vecs = df.groupBy('track_id').agg(F.collect_list("playlist_id").alias("vec")).sort('track_id')\
    .withColumn('sparse', F.udf(encode, VectorUDT())(F.col("vec"),F.lit(max_len)))\
    .select('track_id','sparse').cache()


# Create the Locality-Sensitive Hashing model.
# Increasing numHashTables gives more accurate results but increases computation time.
# It has to be large enough to give sufficiently many recommendations for all songs.
mh = MinHashLSH(inputCol="sparse", outputCol="hashes", numHashTables=10)
model = mh.fit(vecs)
transformed = model.transform(vecs).cache()
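
To give some intuition for the numHashTables trade-off, here is a minimal pure-Python sketch of MinHash with OR-amplification. It is not Spark's implementation: the hash family `(a * x + b) % PRIME`, the helper names, and the toy sets are all assumptions made for illustration. It relies on the textbook approximation that a single min-hash matches with probability equal to the Jaccard similarity, and that a pair becomes a join candidate when at least one hash table matches.

```python
import random

PRIME = 2_147_483_647  # 2**31 - 1; item ids must be non-negative and smaller than this

def make_hash_functions(num_hashes, seed=0):
    """Draw random (a, b) coefficients for hashes of the form (a * x + b) % PRIME."""
    rng = random.Random(seed)
    return [(rng.randrange(1, PRIME), rng.randrange(0, PRIME)) for _ in range(num_hashes)]

def minhash_signature(items, hash_functions):
    """One min-hash value per hash function: the smallest hash over the set's items."""
    return [min((a * x + b) % PRIME for x in items) for a, b in hash_functions]

def is_candidate_pair(sig_a, sig_b):
    """OR-amplification: a pair is a candidate if any hash table agrees."""
    return any(x == y for x, y in zip(sig_a, sig_b))

def candidate_probability(similarity, num_hash_tables):
    """Idealised chance that a pair with this similarity matches in at least one table."""
    return 1.0 - (1.0 - similarity) ** num_hash_tables

hash_functions = make_hash_functions(num_hashes=10)
track_a = {0, 1, 2, 3}   # hypothetical playlist ids containing track A
track_c = {6, 7}         # shares no playlists with track A
sig_a = minhash_signature(track_a, hash_functions)
sig_c = minhash_signature(track_c, hash_functions)

print(is_candidate_pair(sig_a, sig_a))  # identical sets always match
print(is_candidate_pair(sig_a, sig_c))  # disjoint sets never match with this hash family

for n in (1, 5, 10, 20):
    print(n, round(candidate_probability(0.1, n), 3))
```

Under this idealised model, a pair with similarity 0.1 is found with probability 0.1 using one table and about 0.65 using ten, which is why more tables recover more weakly similar songs at the cost of more hashing and more candidate pairs to check.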

In [65]:

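# Perform an approximate similarity join to find similar songs.
# With a threshold of 1, every candidate pair with a Jaccard distance below 1 is kept;
# the filter removes pairs that match a track with itself.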
sim = model.approxSimilarityJoin(transformed, transformed,1).filter('DatasetA.track_id <> DatasetB.track_id').cache()

# Extract (track, recommendation, distance) triples and save to pandas.
# This is used to save to SQL. TODO: replace with pure PySpark
df_rec = sim.rdd.map(lambda r: (r[0][0],r[1][0],r[2])).toDF(('track_id','recommended_id','distance')).toPandas()
df_rec.index.name = 'index'


In [66]:
df_rec.head(10)

index,track_id,recommended_id,distance
0,0,5308,0.992933
1,0,9298,0.984496
2,0,4010,0.971616
3,0,1799,0.960396
4,1,5058,0.929134
5,1,2821,0.971138
6,1,845,0.94964
7,1,6439,0.965753
8,1,3848,0.97561
9,1,2759,0.964549


In [67]:
# Save results to SQL database

import sqlite3
conn = sqlite3.connect('recommendations.db')

cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS Recommendations")

df_rec.to_sql('Recommendations',conn)

# Save (commit) the changes
conn.commit()

# We can also close the connection if we are done with it.
# Just be sure any changes have been committed or they will be lost.
conn.close()
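
Since the choice of which recommendations to show is deferred to serving time, the sketch below shows one way that lookup could work: return the k recommendations with the smallest distance for a given track. The function name `top_recommendations`, the default `k=3`, and the in-memory database are assumptions for illustration. The rows are the ten shown in the `df_rec.head(10)` output above, and the table and column names match the ones written by `to_sql`.

```python
import sqlite3

# In-memory stand-in for recommendations.db, so the sketch runs on its own.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Recommendations (track_id INTEGER, recommended_id INTEGER, distance REAL)"
)
rows = [
    (0, 5308, 0.992933), (0, 9298, 0.984496), (0, 4010, 0.971616), (0, 1799, 0.960396),
    (1, 5058, 0.929134), (1, 2821, 0.971138), (1, 845, 0.94964), (1, 6439, 0.965753),
    (1, 3848, 0.97561), (1, 2759, 0.964549),
]
conn.executemany("INSERT INTO Recommendations VALUES (?, ?, ?)", rows)
conn.commit()

def top_recommendations(conn, track_id, k=3):
    """Return up to k recommended track ids for track_id, closest first."""
    cursor = conn.execute(
        "SELECT recommended_id FROM Recommendations "
        "WHERE track_id = ? ORDER BY distance ASC LIMIT ?",
        (track_id, k),
    )
    return [row[0] for row in cursor.fetchall()]

print(top_recommendations(conn, 1))
```

Ordering by ascending distance puts the songs with the highest Jaccard similarity first. A track with no stored pairs simply returns an empty list, so the serving code would need its own fallback for that case.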