In [1]:
import numpy as np
import pandas as pd
import os
import json

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import MinHashLSH

csv_name = 'spotify_dataset_top10000.csv'


os.environ['JAVA_HOME'] = r"C:\Program Files\Java\jdk-15.0.1" 
spark = SparkSession.builder.config("spark.driver.memory", "10g").getOrCreate()

df = spark.read.option("header", "true").schema('playlist_id integer, track_id integer').csv(csv_name).na.drop()


In [2]:
max_len = df.select("playlist_id").distinct().count()

In [3]:
from pyspark.sql.functions import lit,concat,col
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
from pyspark.ml.linalg import Vectors, VectorUDT
from time import time

def encode(arr, length):
    arr = list(set(arr))    
    return Vectors.sparse(length, [(x,1.0) for x in arr]) 

vecs = df.groupBy('track_id').agg(F.collect_list("playlist_id").alias("vec")).sort('track_id')\
    .withColumn('sparse', F.udf(encode, VectorUDT())(F.col("vec"),F.lit(max_len)))\
    .select('track_id','sparse').cache()


mh = MinHashLSH(inputCol="sparse", outputCol="hashes",numHashTables = 20)
model = mh.fit(vecs)
transformed = model.transform(vecs).cache()

In [None]:
from pyspark.sql.window import Window

#Perform approximate similartiy join to find similar songs

sim = model.approxSimilarityJoin(transformed, transformed,1).filter('DatasetA.track_id <> DatasetB.track_id')

w = Window.partitionBy("datasetA.track_id").orderBy("distCol")
rec = sim.filter('DatasetA.track_id <> DatasetB.track_id')\
.withColumn("rn", F.row_number().over(w)).where(F.col("rn") == 1).drop("rn").collect()



In [14]:
#Check minimum number of reccomendations per 
sim.groupby('DatasetA.track_id').count().agg(F.min(F.col('count'))).collect()
sim.filter('DatasetA.track_id <> DatasetB.track_id').select(sim.datasetA.track_id).distinct().count()

[Row(min(count)=31)]

In [47]:
rec_ids = [[r[0][0],r[1][0]] for r in rec]
df_rec = pd.DataFrame(rec_ids)
df_rec.columns = ['track','rec']

with open('name_to_id.json', 'rb') as d:
    name_to_id = json.load(d)
    id_to_name = {v:k for k,v in name_to_id.items()}

df_rec.index = df_rec.track
df_rec = df_rec.applymap(lambda x: id_to_name[x])
df_rec.index.name = 'track_id'
df_rec

Unnamed: 0_level_0,track,rec
track_id,Unnamed: 1_level_1,Unnamed: 2_level_1
148,Skrillex-Bangarang (feat. Sirah),Skrillex-Kyoto (feat. Sirah)
463,Maia Wilson-Fixer Upper,Josh Gad-In Summer
471,Ed Sheeran-Small Bump,Ed Sheeran-U.N.I.
496,Coldplay-Fix You,Coldplay-The Scientist
833,Azealia Banks-212,Azealia Banks-1991
...,...,...
7874,Jason Mraz-A Beautiful Mess,Jason Mraz-Make It Mine
7999,Friendly Fires-Paris,Friendly Fires-Jump In The Pool
8054,Weezer-El Scorcho,Weezer-The Good Life
9067,Massive Attack-Paradise Circus - Gui Boratto R...,Massive Attack-Girl I Love You


In [49]:
import sqlite3
conn = sqlite3.connect('reccomendations.db')

cursor = conn.cursor()
cursor.execute("DROP IF EXISTS TABLE top_one")

df_rec.to_sql('top_one',conn)

# Save (commit) the changes
conn.commit()

# We can also close the connection if we are done with it.
# Just be sure any changes have been committed or they will be lost.
conn.close()


OperationalError: near "IF": syntax error