# Music Recommender System using Apache Spark and Python


## Necessary Package Imports

In [1]:
import csv 
import random
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

## Loading data

In [2]:
data = list()
included_cols = [12, 13, 11]
with open('../Sample Data/merged_BR3.csv') as csvfile:
    reader = csv.reader(csvfile)
    next(reader, None)
    for row in reader:
        if row[3] == 'Huntersville':
            content = (int(float(row[12])), int(float(row[13])), float(row[11]))
            data.append(tuple(content))
dataParallelized = sc.parallelize(data)
#dataParallelized.collect()

####  Splitting Data for Testing

In [3]:
#splitting the RDD into training and test datasets [.6, .4]
training_set, testing_set = dataParallelized.randomSplit([.6,.4], 13)
training_set.cache()
testing_set.cache()

print training_set.take(3)
print testing_set.take(3)


[(22, 3, 1.85), (23, 3, 2.0), (24, 3, 0.644444444444)]
[(26, 3, 5.0), (27, 3, 4.0), (1551, 68, 5.0)]


## The Recommender Model

For this project, we will train the model with implicit feedback. You can read more information about this from the collaborative filtering page: [http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html](http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html). The [function you will be using](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS.trainImplicit) has a few tunable parameters that will affect how the model is built. 

### Model Evaluation

Although there may be several ways to evaluate a model, we will use a simple method here. Suppose we have a model and some dataset of *true* artist plays for a set of users. This model can be used to predict the top X artist recommendations for a user and these recommendations can be compared the artists that the user actually listened to (here, X will be the number of artists in the dataset of *true* artist plays). Then, the fraction of overlap between the top X predictions of the model and the X artists that the user actually listened to can be calculated. This process can be repeated for all users and an average value returned.

For example, suppose a model predicted [1,2,4,8] as the top X=4 artists for a user. Suppose, that user actually listened to the artists [1,3,7,8]. Then, for this user, the model would have a score of 2/4=0.5. To get the overall score, this would be performed for all users, with the average returned.

In [10]:
import math

def score(predict, actual):
    MSE = []
    count = 0.0
    for a in actual:
        for p in predict:
            if a[0] == p[0]:
                #print str(p[1]) + " " + str(a[1])
                count += 1
                SE = (a[1] - p[1])**2
                MSE.append(SE)
    if count == 0.0:
        return -1
    else:
        return sum(MSE)/count

def modelEval(mod, trainData, testData):
    test_userIDs = testData.map(lambda p: p[0]).distinct().collect()
    #print test_userIDs
    test_companyIDs = dataParallelized.map(lambda p: p[1]).distinct().collect()
    #print test_companyIDs
    trainSet = trainData.map(lambda x: (x[0], x[1])).filter(lambda x: x[0] in test_userIDs)
    trainSet = trainSet.groupByKey().map(lambda x: (x[0], list(x[1])))
    #print trainSet.take(3)
    #if bid not in [y[0] for y in x[1]]
    validationSet = trainSet.flatMap(lambda x: [(x[0],bid) for bid in test_companyIDs])
    #print validationSet.take(3)
    actualD = testData.map(lambda x: (x[0], (x[1], x[2]))).groupByKey()
    actualD = actualD.map(lambda x: (x[0], list(x[1]))).collectAsMap()
    print actualD
    predictD = mod.predictAll(validationSet).map(lambda x: (x[0], (x[1], x[2])))
    predictD = predictD.groupByKey().map(lambda x: (x[0], sorted(list(x[1]), key=lambda score: score[1], reverse=True)))
    maxList = predictD.map(lambda x: x[1][0][1])
    minList = predictD.map(lambda x: x[1][-1][1])
    maxVal = max(maxList.collect())
    minVal = min(minList.collect())
    scale = maxVal - minVal
    predictD_scale = predictD.map(lambda x: (x[0], [(y[0],((y[1]-minVal)/scale)*6) for y in x[1]]))
    
    #print predictD.take(1)
    scores = []
    for entry in predictD_scale.collect():
        score_pe = score(entry[1], actualD[entry[0]])
        #print score_pe
        if score_pe != -1:
            scores.append(score_pe)
    MSE_score = sum(scores)/float(len(scores))
    RMSE_score = math.sqrt(MSE_score)
    return RMSE_score

    
    


### Model Construct
Now we can build the best model possibly using the validation set of data and the `modelEval` function. Loop through the values [2, 10, 20] and figure out which one produces the highest scored based on your model evaluation function.

In [12]:
ranks = [2]
for r in ranks:
    model = ALS.trainImplicit(training_set, rank=r, seed=345)
    scorer = modelEval(model, training_set, testing_set)
    print "The model score for rank %d is %f" % (r, scorer)


{2: [(3293, 3.0), (7667, 1.7)], 3: [(551, 1.7), (1343, 5.4), (3333, 3.05), (3565, 3.0), (3809, 3.0), (3996, 5.4), (4175, 3.0), (4694, 3.42), (7279, 3.45), (7802, 3.45), (7853, 4.4), (9877, 3.15), (9886, 5.0)], 7: [(3398, 5.15)], 11: [(3844, 5.0), (6073, 0.625)], 65548: [(9905, 1.0)], 8207: [(5506, 4.25)], 16405: [(3802, 3.0), (4212, 4.3)], 22: [(3483, 4.3)], 23: [(1370, 3.3), (1527, 5.4), (1805, 3.0), (2249, 0.5375), (2271, 4.0), (2558, 3.0), (2563, 3.3), (3287, 3.0), (3902, 4.0), (3914, 3.0), (4609, 3.0), (4949, 1.85), (6912, 4.0), (7041, 1.7), (7579, 3.0), (7625, 4.0), (8133, 4.3), (8263, 4.3), (9877, 4.15)], 24: [(308, 5.28571428571), (1371, 4.325), (2533, 2.0), (2589, 1.76666666667), (2645, 0.578571428571), (3423, 1.68666666667), (3876, 4.35555555556), (3914, 4.38648648649), (3917, 4.0), (4504, 4.353125), (4508, 2.0), (4672, 3.38), (5985, 4.35), (6061, 4.3), (7602, 4.35227272727), (7689, 5.36666666667), (9840, 3.0), (9886, 4.32), (9887, 3.34375), (9910, 1.57368421053)], 25: [(1230,

Now, using the bestModel, we will check the results over the test data. 

In [7]:
bestModel = ALS.trainImplicit(trainData, rank=10, seed=345)
modelEval(bestModel, testData)

0.05918491594853054

## Trying Some Businesses  Recommendations
Using the best model above, predict the top 5 artists for user `1059637` using the [recommendProducts](http://spark.apache.org/docs/1.5.2/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.MatrixFactorizationModel.recommendProducts) function. Map the results (integer IDs) into the real artist name using `artistAlias`. Print the results. The output should look as follows:

In [16]:
recommend_artists = bestModel.recommendProducts(1059637, 5)
for i in range(5):
    print "Artist %s : %s" % (i, artist_data_map.get(recommend_artists[i].product))

Artist 0 : The Used
Artist 1 : blink-182
Artist 2 : Taking Back Sunday
Artist 3 : Brand New
Artist 4 : Jimmy Eat World
