# Predicting Song Listens Using Apache Spark

One of the most common uses of big data is to predict what users want.  This allows Google to show you relevant ads, Amazon to recommend relevant products, and Netflix to recommend movies that you might like. We will start with some basic techniques, and then use the [Spark ML][sparkml] library's Alternating Least Squares method to make more sophisticated predictions.

We use triplets from 1 million users from the [million song dataset](http://labrosa.ee.columbia.edu/millionsong/pages/getting-dataset), along with metadata for the songs that includes information such as the song title, artist name and tempo. The triplets com in the form (User ID, Song ID, Number of Plays). 
[sparkml]: https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html

### Load and Cache

The Databricks File System (DBFS) sits on top of S3.  We use both the song plays csv and the song metadata in memory.

In [3]:
# import libraries
import os
from pyspark.sql.types import *
from pyspark.sql import functions as F

# list directories
base_dir = '/FileStore/'
triplets_filename = base_dir + 'train_triplets.txt'

#base_dir2 = '/FileStore/tables/qgz865tr1472080731816/'
songs2tracks_filename = base_dir + 'taste_profile_song_to_tracks.txt'

#base_dir3 = '/FileStore/tables/9204dwdp1472230135186/'
metadata_filename = base_dir + 'track_metadata.csv'

#lyrics_file = base_dr + 'lyrics.txt'

if os.path.sep != '/':
  # Handle Windows.
  triplets_filename = triplets_filename.replace('/', os.path.sep)
  songs2tracks_filename = songs2tracks_filename.replace('/', os.path.sep)
  metadata_filename = metadata_filename.replace('/', os.path.sep)
  #lyrics_file = lyrics_file.replace('/', os.path.sep)
  

In [4]:
# Create schema so the cluster only runs through the data once
plays_df_schema = StructType(
  [StructField('userId', StringType()),
   StructField('songId', StringType()),
   StructField('Plays', IntegerType())]
)

songs2tracks_df_schema = StructType(
  [StructField('songId', StringType()),
   StructField('trackId', StringType())]
)

metadata_df_schema = StructType(
  [StructField('', IntegerType()),
   StructField('analyzer_version', IntegerType()),
   StructField('artist_7digitalid', IntegerType()),
   StructField('artist_familiarity', DoubleType()),
   StructField('artist_hotttness', DoubleType()),
   StructField('artist_id', StringType()),
   StructField('artist_latitude', DoubleType()),
   StructField('artist_location', StringType()),
   StructField('artist_longitude', DoubleType()),
   StructField('artist_mbid', StringType()),
   StructField('artist_name', StringType()),
   StructField('artist_playmeid', IntegerType()),
   StructField('genre', StringType()),
   StructField('idx_artist_terms', IntegerType()),
   StructField('idx_similar_artists', IntegerType()),
   StructField('release', StringType()),
   StructField('release_7digitalid', IntegerType()),
   StructField('song_hotttnesss', DoubleType()),
   StructField('songId', StringType()),
   StructField('title', StringType()),
   StructField('track_7digitalid', IntegerType())
   #StructField('index', IntegerType()),
   #StructField('trackId', StringType()),
   #StructField('duration', DoubleType()),
   #StructField('year', IntegerType()),
   #StructField('shs_perf', DoubleType()),
   #StructField('shs_work', DoubleType())
  ]
)

In [5]:
# load in data
raw_plays_df = sqlContext.read.format('com.databricks.spark.csv') \
                              .options(delimiter = '\t', header=True,inferSchema=False) \
                              .schema(plays_df_schema) \
                              .load(triplets_filename)

songs2tracks_df = sqlContext.read.format('com.databricks.spark.csv') \
                                 .options(delimiter = '\t', header=True,inferSchema=False) \
                                 .schema(songs2tracks_df_schema) \
                                 .load(songs2tracks_filename)

metadata_df = sqlContext.read.format('com.databricks.spark.csv') \
                             .options(delimiter = ',', header=True,inferSchema=False) \
                             .schema(metadata_df_schema) \
                             .load(metadata_filename)

#lyrics_df = sqlContext.read.format('com.databricks.spark.csv') \
#                             .options(delimiter = ',', header=True,inferSchema=True) 
#                            

In [6]:
from pyspark.sql.functions import avg
display(metadata_df.select("artist_familiarity","artist_hotttness","artist_name").groupBy("artist_name").agg(avg("artist_familiarity"),avg("artist_hotttness")))

artist_name,avg(artist_familiarity),avg(artist_hotttness)
Eurythmics,0.7510076200369779,0.5236990777653908
Pery Ribeiro,0.3854174467927126,0.312131221495582
Silverstein,0.7975761098264473,0.6055071356905753
Michelle & Vickie,0.3684043267322525,0.3037906980148842
Marga Gomez,0.3107765232758942,0.2284286668103824
DJ Taz,0.4099894915499974,0.240551990468017
Rhian Sheehan,0.5580077519194866,0.414780659073497
Brian Tyler,0.6752954694563452,0.4426967195814664
Faith Yang,0.3740246330351284,0.1872132902492053
Generation X,0.6111809474469422,0.3990337695919728


In [7]:
# cache the dataframes
raw_plays_df.cache()
raw_plays_df.show(5)

songs2tracks_df.cache()
songs2tracks_df.show(5)

metadata_df.cache()
metadata_df.show(5)

In [8]:
# change ids from strings to integers - this is needed for ALS
userId_change = raw_plays_df.select('userId').distinct()
songId_change = raw_plays_df.select('songId').distinct()
artistId_change = metadata_df.select('artist_id').distinct()

print('Number of unique users: {0}'.format(userId_change.count()))
print('Number of unique songs: {0}'.format(songId_change.count()))
print('Number of unique artists: {0}'.format(artistId_change.count()))

In [9]:
# change ids from strings to integers - this is needed for ALS
userId_change.createOrReplaceTempView('u')
userId_change = spark.sql('select row_number() over (order by "userId") as new_userId, * from u')

songId_change.createOrReplaceTempView('s')
songId_change = spark.sql('select row_number() over (order by "songId") as new_songId, * from s')

artistId_change.createOrReplaceTempView('a')
artistId_change = spark.sql('select row_number() over (order by "artist_id") as new_artistId, * from a')

In [10]:
userId_change.cache()
userId_change.show(5)

songId_change.cache()
songId_change.show(5)

artistId_change.cache()
artistId_change.show(5)

In [11]:
unique_users = userId_change.count()
unique_songs = songId_change.count()
unique_artists = artistId_change.count()
print('Number of unique users: {0}'.format(unique_users))
print('Number of unique songs: {0}'.format(unique_songs))
print('Number of unique artists: {0}'.format(unique_artists))

In [12]:
# join dataframes - append new Ids
raw_plays_df_with_int_ids = raw_plays_df.join(userId_change, 'userId').join(songId_change, 'songId')

# join dataframes - append original artist Ids and then new Ids
raw_plays_df_with_int_ids = raw_plays_df_with_int_ids.join(metadata_df.select('artist_id', 'songId'), 'songId')
raw_plays_df_with_int_ids = raw_plays_df_with_int_ids.join(artistId_change, 'artist_id')

# remove half users to make more manageable -- SKIP THIS STEP
#raw_plays_df_with_int_ids = raw_plays_df_with_int_ids.filter(raw_plays_df_with_int_ids.new_userId < unique_users/2)

print('Number of unique users: {0}'.format(unique_users))
print('Number of unique songs: {0}'.format(unique_songs))
print('Number of unique artists: {0}'.format(unique_artists))

# cache
raw_plays_df_with_int_ids.cache()
raw_plays_df_with_int_ids.show(5)

## Basic Recommendations

One way to recommend songs is to simply always recommend the songs with that are most listened to. In this part, We use Spark to find the name, total number of plays, filter those that have been listened to by more that 200 unique users to account for a broad range of appeal

### For Songs

In [15]:
song_ids_with_total_listens = raw_plays_df_with_int_ids.groupBy('songId') \
                                                       .agg(F.count(raw_plays_df_with_int_ids.Plays).alias('User_Count'),
                                                            F.sum(raw_plays_df_with_int_ids.Plays).alias('Total_Plays')) \
                                                       .orderBy('Total_Plays', ascending = False)

song_ids_with_total_listens.show(3, truncate=False)

In [16]:
song_names_with_plays_df = song_ids_with_total_listens.join(metadata_df, 'songId' ) \
                                                      .filter('User_Count >= 200') \
                                                      .select('artist_name', 'title', 'songId', 'User_Count','Total_Plays') \
                                                      .orderBy('Total_Plays', ascending = False)

song_names_with_plays_df.show(3, truncate = False)

### For Artists

In [18]:
# Basic recommendation (popularity) -- ARTISTS

artist_ids_with_total_listens = raw_plays_df_with_int_ids.groupBy('artist_id') \
                                                       .agg(F.count(raw_plays_df_with_int_ids.Plays).alias('User_Count'),
                                                            F.sum(raw_plays_df_with_int_ids.Plays).alias('Total_Plays')) \
                                                       .orderBy('Total_Plays', ascending = False)
artist_ids_with_total_listens.show(5)

In [19]:
# Join with metadata to get artist details
artist_names_with_plays_df = artist_ids_with_total_listens.join(metadata_df, 'artist_id' ) \
                                                      .filter('User_Count >= 200') \
                                                      .select('artist_name', 'artist_id', 'User_Count','Total_Plays') \
                                                      .distinct() \
                                                      .orderBy('Total_Plays', ascending = False)

artist_names_with_plays_df.show(5)

In [20]:
artist_ids_with_total_listens.show(3)

## Collaborative Filtering

[Collaborative filtering][collab] is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption of the collaborative filtering approach is that if a person A has the same opinion as a person B on an issue, A is more likely to have B's opinion on a different issue x than to have the opinion on x of a person chosen randomly. 

----

For song recommendations, we start with a matrix whose entries are number of song plays by users.

Since not all users have listened to all songs, we do not know all of the entries in this matrix, which is precisely why we need collaborative filtering.  For each user, we have number of song plays for only a subset of the songs.  With collaborative filtering, the idea is to approximate the matrix by factorizing it as the product of two matrices: one that describes properties of each user, and one that describes properties of each song.


We want to select these two matrices such that the error for the users/songs pairs where we know the correct number of plays is minimized.  The Alternating Least Squares (ALS) algorithm does this by first randomly filling the users matrix with values and then optimizing the value of the songs such that the error is minimized.  Then, it holds the songs matrix constant and optimizes the value of the user's matrix.  This alternation between which matrix to optimize is the reason for the "alternating" in the name.

Given a fixed set of user factors (i.e., values in the users matrix), we use the known number of plays to find the best values for the songs factors using the least squares optimization.  Then we "alternate" and pick the best user factors given fixed songs factors.

In [22]:
# We'll hold out 60% for training, 20% of our data for validation, and leave 20% for testing
seed = 1
(split_60_df, split_a_20_df, split_b_20_df) = raw_plays_df_with_int_ids.randomSplit([0.6, 0.2, 0.2], seed = seed)

# Let's cache these datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(3)
validation_df.show(3)
test_df.show(3)


In [23]:
#Number of plays needs to be double type, not integers
validation_df = validation_df.withColumn("Plays", validation_df["Plays"].cast(DoubleType()))
validation_df.show(10)

In [24]:
artist_plays_df = raw_plays_df_with_int_ids.groupBy("artist_id", "userId", "new_userId", "new_artistId").agg(F.sum(raw_plays_df_with_int_ids.Plays).alias('Plays'))

artist_plays_df.show(5)

In [25]:
# create test, trian, validation for Artist

# We'll hold out 60% for training, 20% of our data for validation, and leave 20% for testing
seed = 1
(split_60_artist, split_a_20_artist, split_b_20_artist) = artist_plays_df.randomSplit([0.6, 0.2, 0.2], seed = seed)

# Let's cache these datasets for performance
training_artist = split_60_artist.cache()
validation_artist = split_a_20_artist.cache()
test_artist = split_b_20_artist.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_artist.count(), validation_artist.count(), test_artist.count())
)

#Number of plays needs to be double type, not integers
training_artist = training_artist.withColumn("Plays", training_artist["Plays"].cast(DoubleType()))
validation_artist = validation_artist.withColumn("Plays", validation_artist["Plays"].cast(DoubleType()))
test_artist = test_artist.withColumn("Plays", test_artist["Plays"].cast(DoubleType()))

training_artist.show(3)
validation_artist.show(3)
test_artist.show(3)

### Alternating Least Squares

In this part, we will use the Apache Spark ML Pipeline implementation of Alternating Least Squares, [ALS](http://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS). ALS takes a training dataset (DataFrame) and several parameters that control the model creation process. To determine the best values for the parameters, we will use ALS to train several models, and then we will select the best model and use the parameters from that model in the rest of the analysis.

The process we will use for determining the best model is as follows:
1. Pick a set of model parameters. The most important parameter to model is the *rank*, which is the number of columns in the Users matrix or the number of rows in the Movies matrix (blue in the diagram above). In general, a lower rank will mean higher error on the training dataset, but a high rank may lead to [overfitting](https://en.wikipedia.org/wiki/Overfitting).  We will train models with ranks of 4, 8, 12 and 16 using the `training_df` dataset, as well as iterating over the regularization parameter. Because most of the number of songs are have only be listened to once, I expect a higher regularization parameter to be more effective.

2. We set the appropriate parameters on the `ALS` object:
    * The "User" column will be set to the values in our `new_userId` DataFrame column.
    * The "Item" column will be set to the values in our `new_songId` or `new_artistId` DataFrame column.
    * The "Rating" column will be set to the values in our `Plays` DataFrame column.
    * We'll set the max number of iterations to be 5.

3. Have the ALS output transformation (i.e., the result of [ALS.fit()](http://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS.fit)) produce a _new_ column
   called "prediction" that contains the predicted value.

4. Create multiple models using [ALS.fit()](http://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS.fit), one for each of our rank values. We'll fit
   against the training data set (`training_df`).

5. For each model, we'll run a prediction against our validation data set (`validation_df`) and check the error.

6. We'll keep the model with the best error rate.

In [27]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Let's initialize our ALS learner
als = ALS()

# Now set the parameters for the method
als.setMaxIter(5)\
   .setSeed(seed)\
   .setItemCol("new_songId")\
   .setRatingCol("Plays")\
   .setUserCol("new_userId")

# Now let's compute an evaluation metric for our test dataset
# We Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Plays", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12, 16]
regParams = [0.15, 0.2, 0.25]
errors = [[0]*len(ranks)]*len(regParams)
models = [[0]*len(ranks)]*len(regParams)
err = 0
min_error = float('inf')
best_rank = -1
i = 0
for regParam in regParams:
  j = 0
  for rank in ranks:
    # Set the rank here:
    als.setParams(rank = rank, regParam = regParam)
    # Create the model with these parameters.
    model = als.fit(training_df)
    # Run the model to create a prediction. Predict against the validation_df.
    predict_df = model.transform(validation_df)

    # Remove NaN values from prediction (due to SPARK-14489)
    predicted_plays_df = predict_df.filter(predict_df.prediction != float('nan'))
    predicted_plays_df = predicted_plays_df.withColumn("prediction", F.abs(F.round(predicted_plays_df["prediction"],0)))
    # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
    error = reg_eval.evaluate(predicted_plays_df)
    errors[i][j] = error
    models[i][j] = model
    print ('For rank %s, regularization parameter %s the RMSE is %s' % (rank, regParam, error))
    if error < min_error:
      min_error = error
      best_params = [i,j]
      j += 1
  i += 1

als.setRegParam(regParams[best_params[0]])
als.setRank(ranks[best_params[1]])
print ('The best model was trained with regularization parameter %s' % regParams[best_params[0]])
print ('The best model was trained with rank %s' % ranks[best_params[1]])
my_model = models[best_params[0]][best_params[1]]

In [28]:
# Let's initialize our ALS learner
als = ALS()

# Now set the parameters for the method
als.setMaxIter(5)\
   .setSeed(seed)\
   .setItemCol("new_songId")\
   .setRatingCol("Plays")\
   .setUserCol("new_userId")

# Now let's compute an evaluation metric for our test dataset
# We Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Plays", metricName="rmse")

als.setParams(rank = 4, regParam = 0.25)
model = als.fit(training_df)
predict_df = model.transform(validation_df)
predict_test = model.transform(test_df)

In [29]:
# Remove NaN values from prediction (due to SPARK-14489)
predicted_plays_df = predict_df.filter(predict_df.prediction != float('nan'))
predicted_plays_df = predicted_plays_df.withColumn("prediction", F.abs(F.round(predicted_plays_df["prediction"],0)))

In [30]:
#Example of predicted plays
predicted_plays_df.show(10)

Because we're only looking for the nonzero plays, one way to increase the accuracy would be to convert predicted listens that are zero to one. However this would not really help the model overall, since our ultimate goal is for song recommendation and if the predicted number of plays is zero that is okay, this means that the user does o want to listen to the song.

### Testing The Model

So far, we used the `training_df` and `validation_df` datasets to select the best model.  Since we used these two datasets to determine what model is best, we cannot use them to test how good the model is; otherwise, we would be very vulnerable to [overfitting](https://en.wikipedia.org/wiki/Overfitting).  To decide how good the model is, we need to use the `test_df` dataset.  We will use the best rank, and best regularization parameter in the list `best_params` previously determined to create a model for predicting songs for the test dataset and then we will compute the RMSE.

In [33]:
# In ML Pipelines, this next step has a bug that produces unwanted NaN values. We
# have to filter them out. See https://issues.apache.org/jira/browse/SPARK-14489

test_df = test_df.withColumn("Plays", test_df["Plays"].cast(DoubleType()))
predict_test_df = my_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))

# Round floats to whole numbers
predicted_test_df = predicted_test_df.withColumn("prediction", F.abs(F.round(predicted_test_df["prediction"],0)))
# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))

### Comparing the Model

Looking at the RMSE for the results predicted by the model versus the values in the test set is one way to evalute the quality of our model. Another way to evaluate the model is to evaluate the error from a test set where every rating is the average number of plays from the training set.

In [35]:
avg_plays_df = training_df.groupBy().avg('Plays').select(F.round('avg(Plays)'))

avg_plays_df.show(3)
# Extract the average rating value. (This is row 0, column 0.)
training_avg_plays = avg_plays_df.collect()[0][0]

print('The average number of plays in the dataset is {0}'.format(training_avg_plays))

# Add a column with the average rating
test_for_avg_df = test_df.withColumn('prediction', F.lit(training_avg_plays))

# Run the previously created RMSE evaluator, reg_eval, on the test_for_avg_df DataFrame
test_avg_RMSE = reg_eval.evaluate(test_for_avg_df)

print("The RMSE on the average set is {0}".format(test_avg_RMSE))

Our model performs slightly better than simply predicting 3 songs for all in the test case.

## Prediction for a User

We can make a prediction for a user. To do this we make a list of all the songs that the user has not already listened to, and feed it into to model. The model will generate it the number of predicted plays, which we can arange in descending order.

In [38]:
spark.conf.set("spark.sql.crossJoin.enabled", "true")

In [39]:
# actual listens
raw_plays_df_with_int_ids.filter(raw_plays_df_with_int_ids.new_userId == 3543).join(metadata_df, 'songId').select('artist_name', 'title'). distinct().orderBy('Plays', ascending=False).show()

In [40]:
UserID = 3543
listened_songs = raw_plays_df_with_int_ids.filter(raw_plays_df_with_int_ids.new_userId == UserID) \
                                          .join(metadata_df, 'songId') \
                                          .select('new_songId', 'artist_name', 'title') \
                                          
# generate list of listened songs
listened_songs_list = []
for song in listened_songs.collect():
  listened_songs_list.append(song['new_songId'])

print('Songs user has listened to:')
listened_songs.select('artist_name', 'title').show()

# generate dataframe of unlistened songs
unlistened_songs = raw_plays_df_with_int_ids.filter(~ raw_plays_df_with_int_ids['new_songId'].isin(listened_songs_list)) \
                                            .select('new_songId').withColumn('new_userId', F.lit(UserID)).distinct()

# feed unlistened songs into model
predicted_listens = model.transform(unlistened_songs)

# remove NaNs
predicted_listens = predicted_listens.filter(predicted_listens['prediction'] != float('nan'))

# print output
print('Predicted Songs:')
predicted_listens.join(raw_plays_df_with_int_ids, 'new_songId') \
                 .join(metadata_df, 'songId') \
                 .select('artist_name', 'title', 'prediction') \
                 .distinct() \
                 .orderBy('prediction', ascending = False) \
                 .show(10)

### ALS for Artists

In [42]:
# Let's initialize our ALS learner
als_artist = ALS()

# Now set the parameters for the method
als_artist.setMaxIter(5)\
   .setSeed(seed)\
   .setItemCol("new_artistId")\
   .setRatingCol("Plays")\
   .setUserCol("new_userId")

# Now let's compute an evaluation metric for our test dataset
# We Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Plays", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12, 16]
regParams = [0.15, 0.2, 0.25]
errors_artist = [[0]*len(ranks)]*len(regParams)
models_artist = [[0]*len(ranks)]*len(regParams)
err = 0
min_error = float('inf')
best_rank = -1
i = 0
for regParam in regParams:
  j = 0
  for rank in ranks:
    # Set the rank here:
    als_artist.setParams(rank = rank, regParam = regParam)
    # Create the model with these parameters.
    model_artist = als_artist.fit(training_artist)
    # Run the model to create a prediction. Predict against the validation_df.
    predict_artist = model_artist.transform(validation_artist)

    # Remove NaN values from prediction (due to SPARK-14489)
    predicted_plays_artist = predict_artist.filter(predict_artist.prediction != float('nan'))
    predicted_plays_artist = predicted_plays_artist.withColumn("prediction", F.abs(F.round(predicted_plays_artist["prediction"],0)))
    # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
    error_artist = reg_eval.evaluate(predicted_plays_artist)
    errors_artist[i][j] = error_artist
    models_artist[i][j] = model_artist
    print ('For rank %s, regularization parameter %s the RMSE is %s' % (rank, regParam, error_artist))
    if error_artist < min_error:
      min_error = error_artist
      best_params = [i,j]
      j += 1
  i += 1

als_artist.setRegParam(regParams[best_params[0]])
als_artist.setRank(ranks[best_params[1]])
print ('The best model was trained with regularization parameter %s' % regParams[best_params[0]])
print ('The best model was trained with rank %s' % ranks[best_params[1]])
my_model_artist = models_artist[best_params[0]][best_params[1]]

In [43]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Let's initialize our ALS learner
als_artist = ALS()

# Now set the parameters for the method
als_artist.setMaxIter(5)\
   .setSeed(seed)\
   .setItemCol("new_artistId")\
   .setRatingCol("Plays")\
   .setUserCol("new_userId")

# Now let's compute an evaluation metric for our test dataset
# We Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Plays", metricName="rmse")

als_artist.setParams(rank = 4, regParam = 0.2)
model_artist = als_artist.fit(training_artist)
predict_artist = model_artist.transform(validation_artist)
predict_artist_test = model_artist.transform(test_artist)

In [44]:
# Remove NaN values from prediction
predicted_plays_artist = predict_artist.filter(predict_artist.prediction != float('nan'))
predicted_plays_artist = predicted_plays_artist.withColumn("prediction", F.abs(F.round(predicted_plays_artist["prediction"],0)))

# Remove NaN values from prediction
predicted_plays_artist_test = predict_artist_test.filter(predict_artist_test.prediction != float('nan'))
predicted_plays_artist_test = predicted_plays_artist_test.withColumn("prediction", F.abs(F.round(predicted_plays_artist_test["prediction"],0)))

predicted_plays_artist.show(10)
predicted_plays_artist_test.show(10)

### Acutal vs Prediction for a User

In [46]:
# actual
artist_plays_df.filter(artist_plays_df.new_userId == 3543).join(metadata_df, 'artist_id').select('artist_name', 'Plays').distinct().orderBy('Plays', ascending=False).show(10)

In [47]:
artist_plays_df.filter(artist_plays_df.artist_id == 'ARAZCWP129462A97F1').orderBy('Plays', ascending=False).show()

In [48]:
artist_plays_df.filter(artist_plays_df.new_userId == 780942).join(metadata_df, 'artist_id').select('artist_name', 'new_artistId', 'Plays').distinct().show()

In [49]:
artist_plays_df.filter((artist_plays_df.new_artistId == 3784) | (artist_plays_df.new_artistId == 453) | (artist_plays_df.new_artistId == 88) | (artist_plays_df.new_artistId == 2214)).join(metadata_df, 'artist_id').sort('Plays', ascending=False).select('artist_name').distinct().show()

In [50]:
UserID = 980727
listened_artists = artist_plays_df.filter(artist_plays_df.new_userId == UserID) \
                                          .join(metadata_df, 'artist_id') \
                                          .select('new_artistId', 'artist_name', 'title') \
                                          
# generate list of listened artists
listened_artists_list = []
for song in listened_artists.collect():
  listened_artists_list.append(song['new_artistId'])

print('Artists user has listened to:')
listened_artists.select('artist_name').distinct().show()

# generate dataframe of unlistened songs
unlistened_artists = artist_plays_df.filter(~ artist_plays_df['new_artistId'].isin(listened_artists_list)) \
                                            .select('new_artistId').withColumn('new_userId', F.lit(UserID)).distinct()

# feed unlistened songs into model
predicted_artists = model_artist.transform(unlistened_artists)

# remove NaNs
predicted_artists = predicted_artists.filter(predicted_artists['prediction'] != float('nan'))

# print output
print('Predicted Songs:')
predicted_artists.join(artist_plays_df, 'new_artistId') \
                 .join(metadata_df, 'artist_id') \
                 .select('artist_name', 'title', 'prediction') \
                 .distinct() \
                 .orderBy('prediction', ascending = False) \
                 .show(10)

In [51]:
listened_songs.select('artist_name', 'title').show()
unlistened_songs = raw_plays_df_with_int_ids.filter(~ raw_plays_df_with_int_ids['new_songId'].isin(listened_songs_list)) \
                                            .select('new_songId').withColumn('new_userId', F.lit(UserID)).distinct()

predicted_listens_2 = model_2.transform(unlistened_songs)
predicted_listens_2 = predicted_listens_2.filter(predicted_listens_2['prediction'] != float('nan'))

print('Predicted Songs:')
predicted_listens_2.join(raw_plays_df_with_int_ids, 'new_songId') \
                 .join(metadata_df, 'songId') \
                 .select('artist_name', 'title', 'prediction') \
                 .distinct() \
                 .orderBy('prediction', ascending = False) \
                 .show(10)

In [52]:
## Testing
# In ML Pipelines, this next step has a bug that produces unwanted NaN values

predicted_test_artist = my_model_artist.transform(test_artist)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_artist = predicted_test_artist.filter(predicted_test_artist.prediction != float('nan'))

# Round floats to whole numbers
predicted_test_artist = predicted_test_artist.withColumn("prediction", F.abs(F.round(predicted_test_artist["prediction"],0)))
# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame
test_artist_RMSE = reg_eval.evaluate(predicted_test_artist)

print('The model had a RMSE on the test set of {0}'.format(test_artist_RMSE))

<br>

## Making Predictions Based on Songs Listened to at Least Twice

In order to lower the error I hypothesize that if a user has only listened to a song once, it is not representative of their listening profile. Listening to songs at least twice may be more representative and yield lower errors in the model predictions. Also since we want to recommend songs that the user likes, and will hopefully listen to more than once, it makes sense to use train on songs that have been listened to more than once.

In [54]:
raw_plays_df_2more_plays = raw_plays_df.join(userId_change, 'userId') \
                                       .join(songId_change, 'songId') \
                                       .filter(raw_plays_df.Plays >= 2)\
                                       .distinct()

tot_entries_2more = raw_plays_df_2more_plays.count()
print('Total enties with two or more plays: {0}'.format(tot_entries_2more))

raw_plays_df_2more_plays = raw_plays_df_2more_plays.filter(raw_plays_df_2more_plays.new_userId < (userId_change.count())*0.8) \
                                                   .select('new_userId', 'new_songId', 'Plays')
raw_plays_df_2more_plays.cache() 

We still see one number (2) which dominates, at around 37%, however this is much better than for one plays, which had almost 60%

Split into training, validation, and test datasets

In [57]:
# We'll hold out 60% for training, 20% of our data for validation, and leave 20% for testing
#seed = 1800083193L
seed = 1
(split_60_df_2more, split_a_20_df_2more, split_b_20_df_2more) = raw_plays_df_2more_plays.randomSplit([0.6, 0.2, 0.2], seed = seed)

# Let's cache these datasets for performance
training_2more_df = split_60_df_2more.cache()
validation_2more_df = split_a_20_df_2more.cache()
test_2more_df = split_b_20_df_2more.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_2more_df.count(), validation_2more_df.count(), test_2more_df.count())
)
validation_2more_df = validation_2more_df.withColumn("Plays", validation_2more_df["Plays"].cast(DoubleType()))
test_2more_df = test_2more_df.withColumn("Plays", test_2more_df["Plays"].cast(DoubleType()))

training_2more_df.show(3)
validation_2more_df.show(3)
test_2more_df.show(3)

### Train Using Alternating Least Squares

In [59]:
# Let's initialize our ALS learner
als_2more = ALS()

# Now set the parameters for the method
als_2more.setMaxIter(2)\
   .setSeed(seed)\
   .setItemCol("new_songId")\
   .setRatingCol("Plays")\
   .setUserCol("new_userId")

# Now let's compute an evaluation metric for our test dataset
# We Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Plays", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12, 16]
regParams = [0.1, 0.15, 0.2, 0.25]
errors = [[0]*len(ranks)]*len(regParams)
models = [[0]*len(ranks)]*len(regParams)
err = 0
min_error = float('inf')
best_rank = -1
i = 0
for regParam in regParams:
  j = 0
  for rank in ranks:
    # Set the rank here:
    als_2more.setParams(rank = rank, regParam = regParam)
    # Create the model with these parameters.
    model = als_2more.fit(training_2more_df)
    # Run the model to create a prediction. Predict against the validation_df.
    predict_df = model.transform(validation_2more_df)

    # Remove NaN values from prediction (due to SPARK-14489)
    predicted_plays_df = predict_df.filter(predict_df.prediction != float('nan'))
    predicted_plays_df = predicted_plays_df.withColumn("prediction", F.abs(F.round(predicted_plays_df["prediction"],0)))
    # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
    error = reg_eval.evaluate(predicted_plays_df)
    errors[i][j] = error
    models[i][j] = model
    print ('For rank %s, regularization parameter %s the RMSE is %s' % (rank, regParam, error))
    if error < min_error:
      min_error = error
      best_params = [i,j]
    j += 1
  i += 1

als_2more.setRegParam(regParams[best_params[0]])
als_2more.setRank(ranks[best_params[1]])
print ('The best model was trained with regularization parameter %s' % regParams[best_params[0]])
print ('The best model was trained with rank %s' % ranks[best_params[1]])
my_model_2more = models[best_params[0]][best_params[1]]

In [60]:
# Let's initialize our ALS learner
als_2 = ALS()

# Now set the parameters for the method
als_2.setMaxIter(5)\
   .setSeed(seed)\
   .setItemCol("new_songId")\
   .setRatingCol("Plays")\
   .setUserCol("new_userId")

# Now let's compute an evaluation metric for our test dataset
# We Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Plays", metricName="rmse")

als_2.setParams(rank = 16, regParam = 0.25)
model_2 = als_2.fit(training_2more_df)
predict_2 = model_2.transform(validation_2more_df)
predict_2_test = model_2.transform(test_2more_df)

### Test the Model on the Test Dataset

In [62]:
predict2_df = model_2.transform(test_2more_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted2_test_df = predict2_df.filter(predict2_df.prediction != float('nan'))

# Round floats to whole numbers
predicted2_test_df = predicted2_test_df.withColumn("prediction", F.abs(F.round(predicted2_test_df["prediction"],0)))
# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame
test2_RMSE = reg_eval.evaluate(predicted2_test_df)

print('The model had a RMSE on the test set of {0}'.format(test2_RMSE))

### Comparing the Model

We again compare to selecting the average number of plays from the training dataset

In [64]:
avg_plays2_df = training_2more_df.groupBy().avg('Plays').select(F.round('avg(Plays)'))

avg_plays2_df.show(3)
# Extract the average rating value. (This is row 0, column 0.)
training_avg_plays2 = avg_plays2_df.collect()[0][0]

print('The average number of plays in the dataset is {0}'.format(training_avg_plays2))

# Add a column with the average rating
test_for_avg2_df = test_2more_df.withColumn('prediction', F.lit(training_avg_plays2))

# Run the previously created RMSE evaluator, reg_eval, on the test_for_avg_df DataFrame
test_avg2_RMSE = reg_eval.evaluate(test_for_avg2_df)

print("The RMSE on the average set is {0}".format(test_avg2_RMSE))

Though we see higher RMSEs than when we included songs with 1 play, this is to be expected since the model is likely to there is larger mean and larger variation.

We can also see that, compared to average number of plays in the dataset, excluding one play songs the model performs better.

### Prediction for a User

We use an example user to visually see the kind of songs the model predicts

In [67]:
listened_songs.select('artist_name', 'title').show()
unlistened_songs = raw_plays_df_with_int_ids.filter(~ raw_plays_df_with_int_ids['new_songId'].isin(listened_songs_list)) \
                                            .select('new_songId').withColumn('new_userId', F.lit(UserID)).distinct()

predicted_listens_2 = model_2.transform(unlistened_songs)
predicted_listens_2 = predicted_listens_2.filter(predicted_listens_2['prediction'] != float('nan'))

print('Predicted Songs:')
predicted_listens_2.join(raw_plays_df_with_int_ids, 'new_songId') \
                 .join(metadata_df, 'songId') \
                 .select('artist_name', 'title', 'prediction') \
                 .distinct() \
                 .orderBy('prediction', ascending = False) \
                 .show(10)

## Clustering -- K-Means Clustering for Song clusters -- Madhavi

In [69]:
# Step 1 join dataframes - append song hottness & artist hottness
df_for_clustering = raw_plays_df_with_int_ids.join(metadata_df.select('song_hotttnesss', 'artist_name', 'title','artist_familiarity', 'genre','artist_hotttness','songId'), 'songId')

#df_for_clustering = df_for_clustering.join
#df_for_clustering.na.fill(0).show(5)

In [70]:
from pyspark.sql import functions as F

df_temp = df_for_clustering.select(
  "artist_familiarity",
  F.col("artist_familiarity").cast("int").isNotNull().alias("Value")
)


df_temp.where(df_temp.Value == 'false').select("artist_familiarity").count()


In [71]:
# Step2 - Assemble features
from pyspark.ml.feature import VectorAssembler

df = df_for_clustering
vecAssembler = VectorAssembler(inputCols=["Plays", "artist_hotttness"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show(5)

In [72]:
# Scale data
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
scaler_model = scaler.fit(new_df)
scaled_data = scaler_model.transform(new_df)

In [73]:
scaled_data.show(5)

In [74]:
# Step 2 - fit  KMeans model
from pyspark.ml.clustering import KMeans

In [75]:
k_means_10 = KMeans(featuresCol='scaledFeatures', k=10)
k_means_15 = KMeans(featuresCol='scaledFeatures', k=15)
model_k10 = k_means_10.fit(scaled_data)
model_k15 = k_means_15.fit(scaled_data)

In [76]:
model_k15_data = model_k15.transform(scaled_data)
#model_k15_data.groupBy('prediction').count().show()


In [77]:
UserID = 177833
listened_songs_cl = model_k15_data.filter(model_k15_data.new_userId == UserID) \
                                          .select('new_songId', 'artist_name', 'title','Plays','Prediction')  
                                          


listened_songs_cl.show()


In [78]:
UserID = 3543
listened_songs_cl = model_k15_data.filter(model_k15_data.new_userId == UserID) \
                                          .select('new_songId', 'artist_name', 'title','Plays','Prediction')  
                                          


listened_songs_cl.show()

In [79]:
# Find other songs in the same cluster
other_songs_cl0 = model_k15_data.filter(model_k15_data.prediction == 0) \
                                          .select('new_songId', 'artist_name', 'title','Plays','Prediction')  
                                          

  # Find other songs in the same cluster
other_songs_cl11 = model_k15_data.filter(model_k15_data.prediction == 11) \
                                          .select('new_songId', 'artist_name', 'title','Plays','Prediction')  

In [80]:
# Other songs in the same cluster
other_songs_cl0.toPandas().drop_duplicates().sort_values('Plays', ascending=False ).head(5)


Unnamed: 0,new_songId,artist_name,title,Plays,Prediction
135356,305745,The Crystal Method,Keep Hope Alive,3,0
108615,257480,The Cardigans,Iron Man,3,0
45924,163994,The Presets,Are You The One?,3,0
163481,356041,The Misfits,Halloween II,3,0
146655,319127,Phil Collins,Two Hearts,3,0


In [81]:
other_songs_cl11.toPandas().drop_duplicates().sort_values('Plays', ascending=False ).head(5)

Unnamed: 0,new_songId,artist_name,title,Plays,Prediction
50349,184410,Kelly Clarkson,Cry,5,11
97579,290467,Madonna,Papa Don't Preach,5,11
62673,253709,The White Stripes,Im Slowly Turning Into You,5,11
111546,332564,Jack Johnson,Monsoon,5,11
47464,171211,Radiohead,Fake Plastic Trees (Acoustic Version),5,11


In [82]:
other_songs_cl9 = model_k15_data.filter(model_k15_data.prediction == 9) \
                                          .select('new_songId', 'artist_name', 'title','Plays','Prediction') 
other_songs_cl9.toPandas().drop_duplicates().sort_values('Plays', ascending=False ).head(5)

Unnamed: 0,new_songId,artist_name,title,Plays,Prediction
94957,300455,Ziggy Marley And The Melody Makers,Namibia,3,9
85706,257635,Plies,I Am The Club (Explicit Album Version),3,9
90517,50061,Lupe Fiasco feat. GemStones,Go Baby (feat GemStones) (Explicit Album Version),3,9
89323,92454,Lupe Fiasco feat. Matthew Santos,Fighters (feat. Matthew Santos) (Amended Album...,3,9
89305,300018,Lupe Fiasco feat. Matthew Santos,Fighters (feat. Matthew Santos) (Amended Album...,3,9


In [83]:
other_songs_cl1 = model_k15_data.filter(model_k15_data.prediction == 1) \
                                          .select('new_songId', 'artist_name', 'title','Plays','Prediction') 
other_songs_cl1.toPandas().drop_duplicates().sort_values('Plays', ascending=False ).head(5)

Unnamed: 0,new_songId,artist_name,title,Plays,Prediction
9414,88189,Usher Featuring The Nu Beginning,Only Human,6,1
9119,151644,Usher Featuring The Nu Beginning,Only Human,6,1
17515,51993,Usher,Hey Daddy (Daddy's Home),6,1
17402,23145,Usher,Hey Daddy (Daddy's Home),6,1
60803,340735,Usher,I'll Show You Love,6,1


## Clustering -- K-Means Clustering Elbow Plot -- Annie

In [85]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets.samples_generator import make_blobs
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext

### Optimize choice of k

One disadvantage of KMeans compared to more advanced clustering algorithms is that the algorithm must be told how many clusters, k, it should try to find. To optimize k we cluster a fraction of the data for different choices of k and look for an "elbow" in the cost function.

In [87]:
cost = np.zeros(20)
for k in range(2,20):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(new_df.sample(False,0.1, seed=42))
    cost[k] = model.computeCost(new_df)

In [88]:
fig, ax = plt.subplots(1,1, figsize =(10,8))
ax.plot(range(2,20),cost[2:20])
ax.set_xlabel('k')
ax.set_ylabel('cost')
display(fig)

Look like there is very little gain after k=15, so we stick to that choice when processing the full data set.