### Music Artist Recommender Engine built with Apache Spark and Python

In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql.functions import concat, col, lit
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator

#### Load data
Load the two datasets of music artist information and user ratings.

In [4]:
# Read the artist lookup table and the user-rating table from CSV,
# treating the first row of each file as the column names.
artist = spark.read.csv("/FileStore/tables/artist.csv", header=True)
music = spark.read.csv("/FileStore/tables/music.csv", header=True)

In [5]:
# Keep only the artist identifier and name, then preview the first 10 rows.
artist = artist.select(col("artistID"), col("artist"))
artist.show(n=10)

In [6]:
# Keep only the (user, artist, rating) triple, then preview the first 10 rows.
music = music.select(col("userID"), col("artistID"), col("rating"))
music.show(n=10)

#### Data processing

In [8]:
# Convert the ID columns to integers and the rating column to float so the
# ALS estimator and the numeric filters below operate on numeric types.
artist = artist.withColumn('artistID', col('artistID').cast('int'))
music = music.withColumn('userID', col('userID').cast('int'))
music = music.withColumn('artistID', col('artistID').cast('int'))
music = music.withColumn('rating', col('rating').cast('float'))

In [9]:
# Print the column names and types of the ratings table to confirm the casts above.
music.printSchema()

In [10]:
# Print the column names and types of the artist table to confirm the cast above.
artist.printSchema()

In [11]:
# Valid ratings lie in 0-100; the value 255 means "do not play anymore",
# so keep only rows whose rating is at most 100.
music = music.where(col('rating') <= 100)

In [12]:
# Summary statistics of the filtered ratings table (sanity check after the rating filter).
music.describe().show()

#### Data Exploration
Check the number of artists and users.
Identify the artists with the most ratings and the users who made the most ratings.

In [14]:
# Number of ratings per artist and per user, largest counts first.
top_artist = music.groupBy("artistID").count().sort(col("count").desc())
top_user = music.groupBy("userID").count().sort(col("count").desc())

# Expose both rankings to Spark SQL under the same names.
# NOTE: no temp view named `artist` is registered in this cell.
top_artist.createOrReplaceTempView("top_artist")
top_user.createOrReplaceTempView("top_user")

In [15]:
# Attach artist names to the per-artist rating counts, most-rated artists first.
# The join goes through the DataFrame API instead of Spark SQL because no temp
# view named `artist` is ever registered, so the SQL form fails on a fresh
# kernel. Joining on the column name also keeps a single `artistID` column
# rather than two ambiguous ones.
pop_artist = (
    top_artist
    .join(artist, on="artistID", how="inner")
    .orderBy(col("count").desc())
)

In [16]:
# Show the 20 artists that have received the most ratings.
pop_artist.show(20)

In [17]:
# Show the 20 users who have submitted the most ratings.
top_user.show(20)

In [18]:
def count_singletons(ratings, key):
    """Count how many `key` values occur exactly once in `ratings`.

    The filtering and counting are done in Spark, so only two integers are
    returned to the driver instead of collecting the whole grouped table.

    Returns a tuple ``(n_with_one_row, n_distinct_keys)``.
    """
    per_key = ratings.groupBy(key).count()
    n_single = per_key.filter(col("count") == 1).count()
    n_total = per_key.count()  # one group per distinct key value
    return n_single, n_total


# How sparse is the data? Artists rated only once / users who rated only once.
single_rating_artists, total_artists = count_singletons(music, "artistID")
print ('{} out of {} artist with one rating'.format(single_rating_artists, total_artists))
single_rating_users, total_users = count_singletons(music, "userID")
print ('{} out of {} user rate one artist'.format(single_rating_users, total_users))

#### Split data into train and test.

In [20]:
# 80/20 random split with a fixed seed. Both parts are reused many times
# (model fitting, evaluation, baseline), so mark them to be kept in memory.
train, test = music.randomSplit(weights=[0.8, 0.2], seed=1234)
train.cache()
test.cache()

#### Build the recommender model using ALS

In [22]:
# ALS estimator on the explicit (userID, artistID, rating) triples.
# - If the ratings were inferred from other signals rather than given
#   explicitly, setting implicitPrefs=True may give better results.
# - coldStartStrategy='drop' is set so rows that cannot be scored do not
#   produce NaN predictions that would break the RMSE evaluation.
# - seed is fixed so the fitted model is reproducible across runs, matching
#   the seeded train/test split.
als = ALS(userCol='userID', itemCol='artistID', ratingCol='rating',
          coldStartStrategy='drop', seed=1234)

In [23]:
# CrossValidator needs three things: an Estimator, a list of ParamMaps to try,
# and an Evaluator. ParamGridBuilder produces the ParamMaps as the Cartesian
# product of the values below: 2 (maxIter) x 5 (regParam) x 4 (rank) = 40 settings.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.maxIter, [5, 10])
    .addGrid(als.regParam, [0.05, 0.1, 0.2, 0.4, 0.8])
    .addGrid(als.rank, [6, 8, 10, 12])
    .build()
)

In [24]:
# 2-fold cross-validation over the parameter grid, scored by RMSE between the
# `rating` label and the model's `prediction` column.
# seed is fixed so the assignment of rows to folds (and therefore the selected
# parameters) is reproducible across runs.
cv_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                   predictionCol="prediction")
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=cv_evaluator,
                          numFolds=2,
                          seed=1234)

In [25]:
# Run cross-validation on the training split and choose the best set of parameters.
# This fits one model per (parameter setting, fold), so it is the expensive cell of the notebook.
cvModel = crossval.fit(train)

In [26]:
# Model fitted with the best-performing parameter combination found by cross-validation.
bestModel = cvModel.bestModel

In [27]:
# Score the held-out test split with the best cross-validated model
# and preview a handful of the resulting predictions.
predictions = bestModel.transform(test)
predictions.show(n=5)

In [28]:
# RMSE between the true `rating` and the model's `prediction` on the test split.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse_test = evaluator.evaluate(predictions)

In [29]:
# RMSE of the ALS model on the test dataset
rmse_test

Compare the RMSE of the ALS model with a benchmark that uses the mean rating of each artist as the prediction.

In [31]:
# Baseline predictor: each artist's mean rating in the training split.
# - The grouping key is spelled `artistID`, exactly as in the schema, so the
#   cell does not rely on Spark's case-insensitive column resolution.
# - Only `rating` is averaged; averaging every numeric column would also
#   compute a meaningless mean of `userID`.
mean_df = (
    train
    .groupBy('artistID')
    .agg({'rating': 'avg'})
    .withColumnRenamed('avg(rating)', 'prediction')
)
mean_df.show(5)

In [32]:
# Register the test split and the per-artist mean ratings as temp views so they can be joined in SQL.
test.createOrReplaceTempView("test")
mean_df.createOrReplaceTempView("mean_df")
# Attach the baseline `prediction` to every test row by artist. This is an inner join,
# so test rows whose artist has no entry in `mean_df` are dropped.
mean_sql = spark.sql("select test.*, mean_df.prediction \
                       from test join mean_df \
                       on test.artistID = mean_df.artistID")

In [33]:
# RMSE of the mean-rating baseline, computed with the same evaluator used for the ALS model
rmse_mean = evaluator.evaluate(mean_sql)
rmse_mean

The ALS model has a much lower RMSE than the benchmark that uses each artist's mean rating as the prediction.

#### Make recommendations for a selected user

In [36]:
# Generate the top 10 artist recommendations for one example user.
# - The user is drawn from `train`, the data the model was fitted on, so the
#   model is able to produce recommendations for them.
# - The candidates are ordered before `limit(1)`: `users` is lazy and is
#   evaluated twice below (recommendation and show), and an unordered limit
#   is not guaranteed to return the same row each time.
user_col = als.getUserCol()
users = train.select(user_col).distinct().orderBy(user_col).limit(1)
userSubsetRecs = bestModel.recommendForUserSubset(users, 10)
users.show()

In [37]:
# Look up the names of the artists recommended for the selected user.
# Convert to pandas only if that has not already happened, so the cell can be
# re-run without failing on a second `.toPandas()` call.
if not isinstance(userSubsetRecs, pd.DataFrame):
    userSubsetRecs = userSubsetRecs.toPandas()

# Each recommendation is an (artistID, predicted rating) pair; take the IDs.
# Iterating over whatever was returned (instead of a fixed range(10)) avoids an
# IndexError when the model yields fewer than 10 recommendations or none at all.
recommended = userSubsetRecs['recommendations'].iloc[0] if len(userSubsetRecs) else []
ans = [rec[0] for rec in recommended]
artist.where(col("artistID").isin(ans)).show()