# Music Recommender System using Apache Spark and Python

## Description

This project builds a recommender system that suggests new musical artists to a user based on their listening history. Suggesting songs or artists is important to many music streaming services, such as Pandora and Spotify. The same type of recommender system could also be used to suggest TV shows or movies to a user (e.g., Netflix).

To create this system, I used Apache Spark and the collaborative filtering technique.

## Datasets

I used publicly available song data from Audioscrobbler, which can be found [here](http://www-etud.iro.umontreal.ca/~bergstrj/audioscrobbler_data.html). However, the original data files were modified so that the code runs in a reasonable time on a single machine. The reduced data files are suffixed with `_small.txt` and contain only the information relevant to the top 50 most prolific users (those with the highest artist play counts).

The original data file `user_artist_data.txt` contained about 141,000 unique users and 1.6 million unique artists. About 24.2 million user-artist pairs are recorded, each with its play count.

Note that when plays are scrobbled, the client application submits the name of the artist being played. This name could be misspelled or nonstandard, and this may only be detected later. For example, "The Smiths", "Smiths, The", and "the smiths" may appear as distinct artist IDs in the data set, even though they clearly refer to the same artist. For this reason, the data set includes `artist_alias.txt`, which maps artist IDs that are known misspellings or variants to the canonical ID of that artist.

The `artist_data.txt` file then provides a map from the canonical artist ID to the name of the artist.
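
The two lookups described above can be sketched in plain Python without Spark. The IDs and names below are made up for illustration and do not come from the dataset files, and `resolve_artist_name` is a hypothetical helper, not part of the notebook.

```python
# Hypothetical sample data (not taken from the real files):
# alias ID -> canonical ID, and canonical ID -> artist name.
artist_alias = {1001: 1000, 1002: 1000}
artist_names = {1000: "The Smiths", 2000: "Radiohead"}


def resolve_artist_name(artist_id, alias_map, name_map):
    """Map a possibly non-canonical artist ID to the artist's name."""
    # IDs without an alias entry are treated as already canonical.
    canonical_id = alias_map.get(artist_id, artist_id)
    # Returns None when the canonical ID has no known name.
    return name_map.get(canonical_id)


names = [resolve_artist_name(i, artist_alias, artist_names)
         for i in (1001, 1002, 1000)]
```

All three IDs in the example resolve to the same name, which is the effect the alias file is meant to achieve.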

## Necessary Package Imports

In [18]:
from pyspark.mllib.recommendation import *
import random
from operator import *

In [19]:
def parseUserArtist(item):
    item=item.split()
    userId=int(item[0])
    artistId=int(item[1])
    count=int(item[2])
    result_tup=(userId,artistId,count)
    return result_tup
def convertBadToGoodIds(item):
    # Replace a known alias ID with its canonical ID; other IDs pass through unchanged
    artistId=canonicalMap.get(item[1], item[1])
    return (item[0],artistId,item[2])

artistData = sc.textFile('artist_data_small.txt').map(lambda x: x.split('\t')).map(lambda x: [int(x[0]), x[1]])
artistAlias = sc.textFile('artist_alias_small.txt').map(lambda x: x.split('\t')).map(lambda x: [int(x[0]), int(x[1])])
canonicalMap = artistAlias.collectAsMap()
userArtistData = sc.textFile('user_artist_data_small.txt').map(parseUserArtist)
userArtistData=userArtistData.map(convertBadToGoodIds)




## Data Exploration

The code below finds the three users with the highest total play counts (the sum of all counters) and prints each user's ID, total play count, and mean play count (the average number of times the user played an artist).

In [20]:
splitUserData = userArtistData.map(lambda x: (x[0], x[2]))
finalList = splitUserData.reduceByKey(lambda a,b: a + b).map(lambda x: (x[1], x[0])).sortByKey(False)
countMap = splitUserData.countByKey()
for item in finalList.collect()[0:3]:
    print "User %d has a total play count of %d and a mean play count of %d." %(item[1],item[0],(item[0]/countMap[item[1]]))

User 1059637 has a total play count of 674412 and a mean play count of 1878.
User 2064012 has a total play count of 548427 and a mean play count of 9455.
User 2069337 has a total play count of 393515 and a mean play count of 1519.
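
For readers without a Spark session at hand, the same aggregation can be sketched in plain Python. The records below are hypothetical, and `top_users_by_plays` is an illustrative helper rather than part of the notebook. Unlike the notebook output, which formats the mean with `%d` and therefore shows an integer, this sketch keeps the mean as a float.

```python
from collections import defaultdict

# Hypothetical (userId, artistId, playCount) records.
records = [
    (1, 10, 4), (1, 11, 6),
    (2, 10, 3),
    (3, 12, 20), (3, 13, 10), (3, 14, 30),
]


def top_users_by_plays(rows, n=3):
    """Return (userId, totalPlays, meanPlaysPerArtist) for the n heaviest listeners."""
    totals = defaultdict(int)         # userId -> sum of play counts
    artist_counts = defaultdict(int)  # userId -> number of records for that user
    for user_id, _artist_id, count in rows:
        totals[user_id] += count
        artist_counts[user_id] += 1
    ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    return [(user, total, total / float(artist_counts[user]))
            for user, total in ranked[:n]]


summary = top_users_by_plays(records)
```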


### Splitting Data for Testing

Divides the data (`userArtistData`) into:
* A training set, `trainData`, containing 40% of the data and used to train the model.
* A validation set, `validationData`, containing 40% of the data and used for parameter tuning.
* A test set, `testData`, containing 20% of the data and used for a final evaluation of the model.

A random seed value of 13 is used. Since these datasets are used repeatedly, they are persisted in memory with `cache()`.

In [21]:
trainData, validationData, testData = userArtistData.randomSplit([0.4, 0.4, 0.2], 13)
trainData.cache()
validationData.cache()
testData.cache()
print trainData.collect()[0:3]
print validationData.collect()[0:3]
print testData.collect()[0:3]
print trainData.count()
print validationData.count()
print testData.count()

[(1059637, 1000049, 1), (1059637, 1000056, 1), (1059637, 1000113, 5)]
[(1059637, 1000010, 238), (1059637, 1000062, 11), (1059637, 1000112, 423)]
[(1059637, 1000094, 1), (1059637, 1000130, 19129), (1059637, 1000139, 4)]
19817
19633
10031
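
The three counts above are close to, but not exactly, a 40/40/20 split of the records, because each record is assigned to a split at random. A minimal plain-Python sketch of that idea follows. It only illustrates weighted random assignment with a fixed seed; it is not Spark's actual `randomSplit` implementation, and `random_split` is a hypothetical helper.

```python
import random


def random_split(items, weights, seed):
    """Assign each item to one split at random, in proportion to the weights."""
    rng = random.Random(seed)  # fixed seed makes the split reproducible
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.4, 0.8, 1.0] for weights [0.4, 0.4, 0.2].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            # The last split catches any rounding error in the final bound.
            if draw < bound or index == len(bounds) - 1:
                splits[index].append(item)
                break
    return splits


train, validation, test = random_split(range(1000), [0.4, 0.4, 0.2], seed=13)
```

Every item lands in exactly one split, and re-running with the same seed reproduces the same assignment, but the split sizes only approximate the requested proportions.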


## The Recommender Model

The model is trained with implicit feedback. More information is available on the collaborative filtering page: [http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html](http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html). The training function, [`ALS.trainImplicit`](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS.trainImplicit), has a few tunable parameters that affect how the model is built. Therefore, to get the best model, a small parameter sweep is done to choose the model that performs best on the validation set.

The sweep evaluates each parameter combination on the validation data to choose the optimal set of parameters. The chosen parameters are then used to make predictions on the test data.
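
As background on what "implicit feedback" means here: in the formulation that the Spark collaborative filtering page refers to, a play count is not treated as a rating but as evidence, namely a binary preference plus a confidence that grows with the count. The sketch below illustrates that transformation only. The function name is hypothetical, and the `alpha` value of 0.01 is an assumption chosen for illustration; `alpha` is a tunable parameter of the training function.

```python
def preference_and_confidence(play_count, alpha=0.01):
    """Turn a raw play count into (preference, confidence).

    preference: 1.0 if the user played the artist at all, else 0.0
    confidence: 1 + alpha * play_count, so more plays mean more certainty
    """
    preference = 1.0 if play_count > 0 else 0.0
    confidence = 1.0 + alpha * play_count
    return preference, confidence


# Play counts of 0, 1 and 238 all map to a preference of 0 or 1,
# but with increasing confidence.
pairs = [preference_and_confidence(count) for count in (0, 1, 238)]
```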

### Model Evaluation

Suppose we have a model and some dataset of *true* artist plays for a set of users. This model can be used to predict the top X artist recommendations for a user, and these recommendations can be compared with the artists that the user actually listened to (here, X is the number of artists in the dataset of *true* artist plays for that user). Then, the fraction of overlap between the top X predictions of the model and the X artists that the user actually listened to can be calculated. This process is repeated for all users and the average value is returned.

For example, suppose a model predicted [1,2,4,8] as the top X=4 artists for a user, and suppose that user actually listened to the artists [1,3,7,8]. Then, for this user, the model would have a score of 2/4=0.5. To get the overall score, this is computed for all users and the average is returned.
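
The scoring rule and the worked example above can be checked with a few lines of plain Python. This is a sketch of the metric only, not of the Spark-based `modelEval` function; `overlap_score` and `mean_overlap_score` are hypothetical helper names.

```python
def overlap_score(predicted, actual):
    """Fraction of the top-X predictions that the user actually played,
    where X is the number of artists the user actually played."""
    x = len(actual)
    if x == 0:
        return 0.0
    top_x = predicted[:x]  # predictions are assumed to be sorted best-first
    return len(set(top_x) & set(actual)) / float(x)


def mean_overlap_score(predicted_by_user, actual_by_user):
    """Average the per-user overlap score over all users with true plays."""
    scores = [overlap_score(predicted_by_user[user], actual)
              for user, actual in actual_by_user.items()]
    return sum(scores) / len(scores)


# The example from the text: two of the four predictions are correct.
example = overlap_score([1, 2, 4, 8], [1, 3, 7, 8])  # 0.5
```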

**NOTE: when using the model to predict the top-X artists for a user, the artists listed with that user in the training data are not used.**
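
The exclusion described in the note amounts to a set difference per user. A minimal sketch, using made-up artist IDs and a hypothetical helper name:

```python
def candidate_artists(all_artists, training_artists):
    """Artists that may be recommended: everything the user has no training record for."""
    seen = set(training_artists)  # set lookup keeps the filter fast
    return [artist for artist in all_artists if artist not in seen]


# Hypothetical IDs: the user already has artists 2 and 4 in the training data.
candidates = candidate_artists([1, 2, 3, 4, 5], [2, 4])
```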

`modelEval` takes a model (the output of ALS.trainImplicit) and a dataset as input. For parameter tuning, the dataset parameter is set to the validation data (`validationData`). After parameter tuning, the model is evaluated on the test data (`testData`).

In [22]:
def modelEval(model,dataset):
    # Artists each user actually played in the evaluation dataset: {userId: artists}
    datasetMap = dataset.map(lambda x:(x[0],x[1])).groupByKey().collectAsMap()
    # Artists each user played in the training data; these are excluded from prediction
    trainDataMap = trainData.map(lambda x:(x[0],x[1])).groupByKey().collectAsMap()
    allArtists = artistData.map(lambda x:(x[0])).collect()
    total=0.0
    userCount=0

    for user in datasetMap.keys():
        # A set makes the membership test fast; users absent from trainData get an empty set
        artistsInTrainData=set(trainDataMap.get(user, []))
        # Candidate (user, artist) pairs: every artist the user has no training record for
        result=[(user,x) for x in allArtists if x not in artistsInTrainData]
        finalRDD=sc.parallelize(result)
        trueArtists=datasetMap.get(user)
        X=len(trueArtists)
        # Score the candidates and keep the artist IDs, sorted by predicted rating (descending)
        finalResult=model.predictAll(finalRDD)
        prediction = finalResult.map(lambda x: (x[2], x[1])).sortByKey(False).map(lambda x: x[1])
        # Fraction of the top-X predictions that the user actually played
        total += len(set(prediction.take(X)).intersection(set(trueArtists)))/float(X)
        userCount=userCount+1

    # Note: `rank` is read from the notebook-level (global) scope, not passed as an argument
    print "The model score for rank %d is %f"%(rank,float(total/float(userCount)))

### Model Construction

The best model is chosen using the validation set and the `modelEval` function. Although there are a few parameters that could be optimized, for the sake of time I tried only a few different values for the [rank parameter](http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering) (everything else is left at its default value, **except `seed`, which is set to 345**). The code loops through the values [2, 10, 20] to find which one produces the highest score according to the model evaluation function.

Note: this procedure may take several minutes to run.


In [23]:
ranks=[2,10,20]
for rank in ranks:
    # `rank` is a notebook-level variable, so modelEval can read it when printing the score
    Model = ALS.trainImplicit(trainData, rank=rank, seed=345)
    modelEval(Model, validationData)

The model score for rank 2 is 0.093266
The model score for rank 10 is 0.097496
The model score for rank 20 is 0.083883
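
Picking the winner from these scores is a one-line comparison. The sketch below simply copies the three validation scores printed above into a dictionary; the variable names are illustrative.

```python
# Validation scores reported above, keyed by rank.
validation_scores = {2: 0.093266, 10: 0.097496, 20: 0.083883}

# The best rank is the key with the highest score.
best_rank = max(validation_scores, key=validation_scores.get)
```

With these numbers, rank 10 scores highest, although the differences between the three ranks are small.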


The top X recommended artists for a particular user can be computed as shown below using the best model (`bestModel`, the model trained with the best-scoring rank).

In [28]:
topFive = bestModel.recommendProducts(1059637,5)
artistMap=artistData.collectAsMap()
i = 0
for artist in topFive:
    print "Artist " + str(i) + ": " + artistMap.get(artist[1])
    i += 1

Artist 0: blink-182
Artist 1: Elliott Smith
Artist 2: Taking Back Sunday
Artist 3: Incubus
Artist 4: Death Cab for Cutie
