# Music Recommender System using ALS Algorithm with Apache Spark and Python


## /!\ Disclaimer : Installation /!\ 
This notebook was run in the Google Colab environment.

To run this notebook, you need Spark installed. If you are running it on Google Colab, you first have to install Java and Spark by executing the following cell.

If you already have Spark correctly installed, you can skip this cell and go directly to the next section.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip3 install -q findspark

In [2]:
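import os
# Uncomment the next line if Spark cannot find Java; this is where the cell above installs OpenJDK 8
#os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Location of the Spark distribution extracted by the cell above
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"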

## Description: 

The goal of this project is to create a recommender system that will recommend new musical artists to a user based on their listening history.

To achieve this, we use Spark and the collaborative filtering technique, relying on the built-in functions provided by the Spark API. The same technique can also be used to build a film recommender system.


### Dataset : 

We will be using some publicly available song data from Audioscrobbler. However, we modified the original data files so that the code runs in a reasonable time on a single machine. The reduced data files contain only the information relevant to the top 50 most prolific users (those with the highest artist play counts).


## Import libraries

In [3]:
# Import libraries
import findspark
findspark.init()  # make the Spark installation in SPARK_HOME importable from Python

from pyspark.mllib.recommendation import ALS

## Initialize Spark context

In [4]:
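# Initialize Spark Context: stop any context that already exists, then start a local one.
# Note: in this notebook `spark` holds a SparkContext (often named `sc`), not a SparkSession.
from pyspark import SparkContext
spark = SparkContext.getOrCreate()
spark.stop()
spark = SparkContext('local', 'Recommender')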

Load the three datasets into RDDs named `artistData`, `artistAlias`, and `userArtistData`. The files are read from a Google Drive folder mounted in Colab.

In [5]:
dataDir = '/content/drive/MyDrive/Colab Datasets/data_spark/'

# artist_data: "<artist id>\t<artist name>" -> (artist id, artist name)
artistData = spark.textFile(dataDir + 'artist_data_small.txt') \
    .map(lambda s: s.split("\t")) \
    .map(lambda f: (int(f[0]), f[1]))
# Raw lines for now; they are parsed in the next cell
artistAlias = spark.textFile(dataDir + 'artist_alias_small.txt')
userArtistData = spark.textFile(dataDir + 'user_artist_data_small.txt')
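The parsing code above implies simple line formats: `artist_data_small.txt` holds an artist ID and an artist name separated by a tab, `artist_alias_small.txt` holds an alias ID and a canonical ID separated by a tab, and `user_artist_data_small.txt` holds `<user id> <artist id> <play count>` separated by spaces. If a file might contain malformed lines (an assumption; the small files used here parse without errors), a defensive alternative is to return an empty list for any line that does not match, so the parser can be passed to `RDD.flatMap`. A minimal plain-Python sketch, with hypothetical function names:

```python
def parse_artist(line):
    """Tab-separated 'id, name' -> [(id, name)], or [] if the line is malformed."""
    parts = line.split("\t", 1)
    if len(parts) != 2 or not parts[1].strip():
        return []
    try:
        return [(int(parts[0]), parts[1].strip())]
    except ValueError:
        return []


def parse_alias(line):
    """Tab-separated 'alias id, canonical id' -> [(alias, canonical)], or []."""
    parts = line.split("\t")
    if len(parts) != 2:
        return []
    try:
        return [(int(parts[0]), int(parts[1]))]
    except ValueError:
        return []


def parse_play(line):
    """Space-separated 'user artist count' -> [(user, artist, count)], or []."""
    parts = line.split()
    if len(parts) != 3:
        return []
    try:
        return [(int(parts[0]), int(parts[1]), int(parts[2]))]
    except ValueError:
        return []


print(parse_artist("1000010\tExample Artist"))  # [(1000010, 'Example Artist')]
print(parse_artist("not a valid line"))         # []
print(parse_play("1059637 1000010 238"))        # [(1059637, 1000010, 238)]
```

With Spark, such a parser would be applied as `spark.textFile(path).flatMap(parse_artist)`, which drops malformed lines instead of failing the whole job.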

## Data Exploration:

In this section we compute each user's total play count (the sum of all their play counters) and mean play count per artist, then find and print the three users with the highest total play counts.

In [6]:
# Split each line into its three fields (user, artist, play count) and store them as ints
userArtistData = userArtistData.map(lambda s: s.split(" ")) \
    .map(lambda f: (int(f[0]), int(f[1]), int(f[2])))

# Build a dictionary {alias artist ID: canonical artist ID} from the 'artistAlias' dataset
artistAliasDictionary = artistAlias.map(lambda s: s.split("\t")) \
    .map(lambda f: (int(f[0]), int(f[1]))) \
    .collectAsMap()

# If the artist ID has an alias, replace it with the canonical ID; otherwise keep the original
userArtistData = userArtistData.map(lambda x: (x[0], artistAliasDictionary.get(x[1], x[1]), x[2]))

# Per user: total play count and number of (artist) records
userSum = userArtistData.map(lambda x: (x[0], x[2]))
playCount1 = userSum.reduceByKey(lambda a, b: a + b)
playCount2 = userSum.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
playSumAndCount = playCount1.join(playCount2)

# (user, total play count, mean play count per artist truncated to an int)
playSumAndCount = playSumAndCount.map(lambda x: (x[0], x[1][0], int(x[1][0] / x[1][1])))

# Find and display the three users with the highest total play count, with their mean play count
TopThree = playSumAndCount.top(3, key=lambda x: x[1])
for i in TopThree:
    print('User ' + str(i[0]) + ' has a total play count of ' + str(i[1]) + ' and a mean play count of ' + str(i[2]) + '.')

User 1059637 has a total play count of 674412 and a mean play count of 1878.
User 2064012 has a total play count of 548427 and a mean play count of 9455.
User 2069337 has a total play count of 393515 and a mean play count of 1519.
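The cell above uses two `reduceByKey` passes and a join to get each user's total and record count. The plain-Python sketch below computes the same statistics in a single pass, which can be handy for checking the Spark results on a small sample; the function names and toy data are hypothetical. The alias replacement is left out because it only changes artist IDs, not a user's total or record count. In Spark, a comparable single pass could use `RDD.aggregateByKey` with a `(sum, count)` accumulator, a variant not used in this notebook.

```python
import heapq
from collections import defaultdict


def user_play_stats(triples):
    """Map each user to (total play count, mean play count per record, truncated).

    triples: iterable of (user, artist, play count), like userArtistData.
    """
    totals = defaultdict(int)
    records = defaultdict(int)
    for user, _artist, count in triples:
        totals[user] += count
        records[user] += 1
    return {user: (totals[user], int(totals[user] / records[user])) for user in totals}


def top_users(stats, k=3):
    """Return the k (user, (total, mean)) items with the highest total."""
    return heapq.nlargest(k, stats.items(), key=lambda item: item[1][0])


# Made-up toy data, not taken from the Audioscrobbler files
triples = [(1, 10, 5), (1, 11, 7), (2, 10, 100), (3, 12, 1)]
stats = user_play_stats(triples)
print(stats)                # {1: (12, 6), 2: (100, 100), 3: (1, 1)}
print(top_users(stats, 2))  # [(2, (100, 100)), (1, (12, 6))]
```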


## Splitting Data for Testing

In [7]:
# Split 'userArtistData' into training (40%), validation (40%) and test (20%) datasets, and cache them for frequent access
trainData, validationData, testData = userArtistData.randomSplit((0.4,0.4,0.2),seed=16)
trainData.cache()
validationData.cache()
testData.cache()

# Display the first 3 records of each dataset, followed by the number of records in each dataset
print(trainData.take(3))
print(validationData.take(3))
print(testData.take(3))
print(trainData.count())
print(validationData.count())
print(testData.count())

[(1059637, 1000010, 238), (1059637, 1000049, 1), (1059637, 1000056, 1)]
[(1059637, 1000094, 1), (1059637, 1000113, 5), (1059637, 1000130, 19129)]
[(1059637, 1000062, 11), (1059637, 1000112, 423), (1059637, 1000427, 20)]
19764
19854
9863
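`randomSplit` normalizes the weights and assigns each record to one split at random, so the split sizes are only approximately 40/40/20 (here 19,764 / 19,854 / 9,863 records), and the fixed seed makes the split reproducible. The plain-Python sketch below illustrates the idea with cumulative weight boundaries and one uniform draw per record. It is a conceptual model with a hypothetical function name, not Spark's per-partition sampler, so it will not reproduce Spark's exact assignments.

```python
import random
from bisect import bisect_right
from itertools import accumulate


def random_split(records, weights, seed=None):
    """Assign each record to one split, with probability proportional to its weight."""
    total = float(sum(weights))
    bounds = list(accumulate(w / total for w in weights))  # (0.4, 0.4, 0.2) -> ~[0.4, 0.8, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for record in records:
        index = bisect_right(bounds, rng.random())
        splits[min(index, len(splits) - 1)].append(record)  # guard against rounding near 1.0
    return splits


train, validation, test = random_split(range(10000), (0.4, 0.4, 0.2), seed=16)
print(len(train), len(validation), len(test))  # roughly 4000, 4000 and 2000
```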


## The Recommender Model
For this project, we train the model with implicit feedback: play counts indicate how strongly a user likes an artist, but they are not explicit ratings. You can read more about this on the <strong>collaborative filtering</strong> page: http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html. 
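With implicit feedback, play counts are treated as signals of preference rather than as ratings to reproduce. The Spark documentation linked above bases its implicit variant on the approach of Hu, Koren and Volinsky: each count `r_ui` becomes a binary preference `p_ui` (1 if `r_ui > 0`, else 0) weighted by a confidence `c_ui = 1 + alpha * r_ui`, and ALS alternately solves a weighted, regularized least-squares problem for every user vector with the artist vectors fixed, then for every artist vector with the user vectors fixed. The NumPy sketch below is a small dense version of that textbook algorithm, for intuition only; the function name and default hyperparameters are hypothetical, and Spark's distributed implementation differs in details such as how the regularization is scaled.

```python
import numpy as np


def implicit_als(R, rank=2, alpha=1.0, lam=0.1, iterations=10, seed=0):
    """Dense implicit-feedback ALS on a small (users x items) play-count matrix R.

    Returns user factors X, item factors Y, and the objective after each sweep.
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    X = 0.1 * rng.standard_normal((n_users, rank))
    Y = 0.1 * rng.standard_normal((n_items, rank))
    P = (R > 0).astype(float)  # preference: 1 if the user played the item at all
    C = 1.0 + alpha * R        # confidence: grows with the play count
    I = np.eye(rank)

    def objective():
        residual = C * (P - X @ Y.T) ** 2
        return residual.sum() + lam * ((X ** 2).sum() + (Y ** 2).sum())

    history = []
    for _ in range(iterations):
        # Fix Y: each user vector solves (Y^T C_u Y + lam I) x_u = Y^T C_u p_u
        for u in range(n_users):
            YtCu = Y.T * C[u]
            X[u] = np.linalg.solve(YtCu @ Y + lam * I, YtCu @ P[u])
        # Fix X: each item vector solves (X^T C_i X + lam I) y_i = X^T C_i p_i
        for i in range(n_items):
            XtCi = X.T * C[:, i]
            Y[i] = np.linalg.solve(XtCi @ X + lam * I, XtCi @ P[:, i])
        history.append(objective())
    return X, Y, history


# Toy play counts: users 0-1 mostly play items 0-1, users 2-3 mostly play items 2-3
R = np.array([[5, 3, 0, 0],
              [4, 0, 0, 1],
              [0, 0, 4, 5],
              [0, 1, 5, 4]], dtype=float)
X, Y, history = implicit_als(R, rank=2)
scores = X @ Y.T  # predicted preference for every (user, item) pair
```

Because each sub-step solves its least-squares problem exactly, the objective in `history` can only decrease from one sweep to the next.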

To find the best combination of model parameters, we do a small parameter sweep and choose the model that performs best on the validation set.

Therefore, we must first devise a way to evaluate models. Once we have one, we can run a parameter sweep, evaluate each combination of parameters on the validation data, and choose the optimal set of parameters, which can then be used to make predictions on the test data.

### Model Evaluation
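To evaluate a model, we compare, for each user, the model's top X predictions with the X artists that the user actually listened to in the evaluation dataset, where X is the number of those artists and artists already in the user's training data are excluded from the predictions. The user's score is the fraction of overlap; we repeat this for all users and return the average.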

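Our function, `modelEval`, takes a model (the output of `ALS.trainImplicit`) and a dataset as input and prints the model's average score on that dataset. We use it on the validation data (`validationData`) to choose parameters, and on the test data (`testData`) to evaluate the final model.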
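In other words, a user with X held-out artists scores (number of held-out artists among the top X predictions) / X. For example, if 1 of the model's top 4 predictions is among the user's 4 held-out artists, that user scores 1/4 = 0.25. A plain-Python sketch of this score and its average over users, using hypothetical in-memory dictionaries instead of RDDs:

```python
def overlap_score(predicted, actual):
    """Fraction of a user's held-out artists that appear among the predictions.

    predicted: the top len(actual) artist IDs, with training artists already removed.
    actual: the user's artist IDs in the validation or test set.
    """
    return len(set(predicted) & set(actual)) / len(actual)


def mean_overlap(predictions, held_out):
    """Average overlap_score over all users in held_out ({user: [artist IDs]})."""
    total = sum(overlap_score(predictions.get(user, []), artists)
                for user, artists in held_out.items())
    return total / len(held_out)


# Hypothetical toy data: user 1 has 4 held-out artists, user 2 has 2
held_out = {1: [10, 11, 12, 13], 2: [20, 21]}
predictions = {1: [10, 99, 98, 97], 2: [21, 20]}
print(overlap_score(predictions[1], held_out[1]))  # 0.25
print(mean_overlap(predictions, held_out))          # 0.625
```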

In [8]:
def modelEval(model, dataset):
    """Print and return the average overlap score of `model` on `dataset`.

    `dataset` is the validation or test RDD of (user, artist, play count).
    For each user, the model's top-N predictions (excluding artists the user
    already has in `trainData`) are compared with the user's N artists in
    `dataset`; the user's score is the fraction of those N artists predicted.
    """
    # All distinct artists in the 'userArtistData' dataset (the candidate pool)
    AllArtists = userArtistData.map(lambda x: x[1]).distinct()

    # {user: [artists]} for the current (validation/test) dataset, built in one Spark job
    ValidationAndTestingDictionary = dataset.map(lambda x: (x[0], x[1])) \
        .groupByKey().mapValues(list).collectAsMap()

    # {user: set of artists} for the training dataset
    TrainingDictionary = trainData.map(lambda x: (x[0], x[1])) \
        .groupByKey().mapValues(set).collectAsMap()

    # Calculate the prediction score for each user
    PredictionScore = 0.0
    for user, actualArtists in ValidationAndTestingDictionary.items():
        seenArtists = TrainingDictionary.get(user, set())
        # Predict a score for every (user, artist) pair, then drop artists seen in training
        ArtistPrediction = AllArtists.map(lambda artist: (user, artist))
        ModelPrediction = model.predictAll(ArtistPrediction) \
            .filter(lambda r: r.product not in seenArtists)
        topPredictions = ModelPrediction.top(len(actualArtists), key=lambda r: r.rating)
        predictedArtists = set(r.product for r in topPredictions)
        PredictionScore += len(predictedArtists.intersection(actualArtists)) / len(actualArtists)

    # Average score of the model over all users for the specified rank
    averageScore = PredictionScore / len(ValidationAndTestingDictionary)
    print("The model score for rank " + str(model.rank) + " is ~" + str(averageScore))
    return averageScore

### Model Construction

Now we try a few different values for the rank parameter; a broader search over other parameters could come later. We loop through the values [2, 8, 10, 11, 15] and find the one that produces the highest score according to our model evaluation function.

Note: this procedure may take several minutes to run.


In [9]:
rankList = [2, 8, 10, 11, 15]
for rank in rankList:
    model = ALS.trainImplicit(trainData, rank, seed=345)
    modelEval(model, validationData)

The model score for rank 2 is ~0.08271570722731275
The model score for rank 8 is ~0.08649870403564805
The model score for rank 10 is ~0.09606702451185019
The model score for rank 11 is ~0.09248519196233718
The model score for rank 15 is ~0.09133240312585972
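The sweep above varies only `rank`. `ALS.trainImplicit` also takes `iterations`, `lambda_` (regularization) and `alpha` (confidence scaling) parameters, so a broader search could cover combinations of those as well. The sketch below is a hypothetical, library-agnostic helper that scores every combination in a grid with a caller-supplied function and keeps the best one. Fed with the validation scores printed above, it picks rank 10, the value used for `bestModel` in the next cell.

```python
from itertools import product


def grid_search(param_grid, score_fn):
    """Return (best_params, best_score) over every combination in param_grid.

    param_grid maps a parameter name to a list of candidate values; score_fn
    takes those parameters as keyword arguments and returns a number (higher is better).
    """
    names = list(param_grid)
    best_params, best_score = None, float("-inf")
    for values in product(*(param_grid[name] for name in names)):
        params = dict(zip(names, values))
        score = score_fn(**params)
        if score > best_score:
            best_params, best_score = params, score
    return best_params, best_score


# Validation scores printed by the rank sweep above, keyed by rank
recorded = {2: 0.08271570722731275, 8: 0.08649870403564805,
            10: 0.09606702451185019, 11: 0.09248519196233718,
            15: 0.09133240312585972}
best, score = grid_search({"rank": sorted(recorded)}, lambda rank: recorded[rank])
print(best, score)  # {'rank': 10} 0.09606702451185019
```

To use it with Spark, `score_fn` could train with `ALS.trainImplicit(trainData, rank, iterations=..., lambda_=..., alpha=..., seed=345)` and return the validation score as a number. Every extra parameter multiplies the number of models to train, so a small grid is advisable given how long the rank sweep alone takes.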


In [10]:
# Rank 10 scored highest on the validation set: retrain with it and evaluate on the test set
bestModel = ALS.trainImplicit(trainData, rank=10, seed=345)
modelEval(bestModel, testData)

The model score for rank 10 is ~0.058094084259049265


## Trying Some Artist Recommendations
Using the best model above, predict the top 5 artists for user 1059637 using the <strong>recommendProducts</strong> function.

In [11]:
# Top 5 artists for a particular user and list their names
TopFive = bestModel.recommendProducts(1059637, 5)
for item, rating in enumerate(TopFive):
    # Look up the name of the recommended artist ID in artistData
    print("Artist " + str(item) + ": " + artistData.lookup(rating.product)[0])

Artist 0: My Chemical Romance
Artist 1: Evanescence
Artist 2: Brand New
Artist 3: Death Cab for Cutie
Artist 4: Avril Lavigne
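Conceptually, `recommendProducts(user, num)` scores every artist by the dot product of the user's latent vector with that artist's latent vector and returns the `num` highest-scoring `Rating` objects; the factors themselves are exposed as RDDs by `model.userFeatures()` and `model.productFeatures()`. The plain-Python sketch below mirrors that ranking on hypothetical in-memory factors. It also takes an optional `exclude` set, a hypothetical addition for skipping artists the user already played: as far as we can tell, `recommendProducts` applies no such filter, so its results can include artists already in the user's history, unlike the filtered predictions scored by `modelEval`.

```python
import heapq


def recommend_top_n(user_vec, item_factors, n, exclude=()):
    """Return the n (item_id, score) pairs with the highest dot-product score.

    user_vec: list of floats; item_factors: {item_id: list of floats};
    exclude: item IDs to skip, e.g. artists the user already played.
    """
    excluded = set(exclude)
    scored = (
        (item, sum(u * v for u, v in zip(user_vec, vec)))
        for item, vec in item_factors.items()
        if item not in excluded
    )
    return heapq.nlargest(n, scored, key=lambda pair: pair[1])


# Hypothetical 2-dimensional factors for one user and three artists
user_vec = [1.0, 0.5]
item_factors = {101: [0.5, 0.5], 102: [0.0, 1.0], 103: [1.0, 1.0]}
print(recommend_top_n(user_vec, item_factors, 2))                 # [(103, 1.5), (101, 0.75)]
print(recommend_top_n(user_vec, item_factors, 2, exclude={103}))  # [(101, 0.75), (102, 0.5)]
```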


Now we predict the top 5 artists for user **2069337**.

In [12]:
# Top 5 artists for a particular user and list their names
TopFive = bestModel.recommendProducts(2069337, 5)
for item, rating in enumerate(TopFive):
    # Look up the name of the recommended artist ID in artistData
    print("Artist " + str(item) + ": " + artistData.lookup(rating.product)[0])

Artist 0: Give up the Ghost
Artist 1: Whatever It Takes
Artist 2: Keepsake
Artist 3: Kid Dynamite
Artist 4: Counter Action
