# Music Recommender




The purpose of this notebook is to build a recommendation engine for record recommendations.

## Spark Bootstrapping for Google Colab

Run this before working with the notebooks from the Spark course.
When you start a new (fresh) notebook in Colab, Google Cloud creates a new Docker container just for your use.

Executing this notebook installs the required software into the container. The container is reused by the user until it is destroyed due to inactivity.


In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark

## Set Environment Variables
Set the locations where Spark and Java are installed.

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[2] pyspark-shell"

## Cloning our Github repo

In [0]:
!rm -rf /content/SparkCourse2019/
!git clone https://github.com/bazarum/SparkCourse2019.git

Cloning into 'SparkCourse2019'...
remote: Enumerating objects: 227, done.
remote: Counting objects: 100% (227/227), done.
remote: Compressing objects: 100% (59/59), done.
remote: Total 2298 (delta 175), reused 214 (delta 162), pack-reused 2071
Receiving objects: 100% (2298/2298), 324.21 MiB | 14.93 MiB/s, done.
Resolving deltas: 100% (478/478), done.
Checking out files: 100% (1818/1818), done.


## Getting the data
This downloads and unpacks the dataset. The local Spark session is started in the section that follows.

In [0]:
!wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
!tar xvf profiledata_06-May-2005.tar.gz
!mv profiledata_06-May-2005/* .
!rmdir profiledata_06-May-2005


--2019-03-07 17:19:09--  http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
Resolving www.iro.umontreal.ca (www.iro.umontreal.ca)... 132.204.26.36
Connecting to www.iro.umontreal.ca (www.iro.umontreal.ca)|132.204.26.36|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 135880312 (130M) [application/x-gzip]
Saving to: ‘profiledata_06-May-2005.tar.gz.2’


2019-03-07 17:19:20 (13.6 MB/s) - ‘profiledata_06-May-2005.tar.gz.2’ saved [135880312/135880312]

profiledata_06-May-2005/
profiledata_06-May-2005/artist_data.txt
profiledata_06-May-2005/README.txt
profiledata_06-May-2005/user_artist_data.txt
profiledata_06-May-2005/artist_alias.txt


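## Initialize the Spark session

In [0]:
import findspark
# Locate Spark via SPARK_HOME and make pyspark importable.
findspark.init()

from pyspark.sql import SparkSession

# Request a local master and keep a handle on the SparkContext.
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext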


## Helper functions to set up Spark schemas for DataFrames

In [0]:
from pyspark.sql.types import *

def get_structtype(txt_):
  # Build a nullable StructField from a "Name:Type" string; StringType is the default.
  details = txt_.split(':')
  struct_ = StructField(details[0], StringType(), True)
  
  if details[1]=="IntegerType":
    struct_ = StructField(details[0], IntegerType(), True)

  if details[1]=="DoubleType":
    struct_ = StructField(details[0], DoubleType(), True)
    
  return struct_
    
def get_structure(txt_):
  # Build a StructType from a "Name:Type|Name:Type" string.
  fieldList = txt_.split('|')
  fields_ = []
  for field_name in fieldList:
    fields_.append(get_structtype(field_name))
  
  return StructType(fields_)
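
The schema strings passed to `get_structure` follow a `Name:Type|Name:Type` pattern. As a rough, Spark-free sketch of that parsing step, the same split can be seen on plain tuples. The function name `parse_schema_spec` is hypothetical and not part of the notebook, and defaulting to `StringType` when the `:Type` part is missing is an assumption of this sketch.

```python
def parse_schema_spec(spec):
    """Split a 'Name:Type|Name:Type' string into (name, type_name) pairs."""
    pairs = []
    for field in spec.split("|"):
        # partition() tolerates a missing ':' and leaves type_name empty.
        name, _, type_name = field.partition(":")
        pairs.append((name, type_name or "StringType"))
    return pairs


fields = parse_schema_spec("UserId:IntegerType|ArtistId:IntegerType|Counter:IntegerType")
for name, type_name in fields:
    print(name, type_name)
```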

## Import the data.

user_artist_data.txt is a space-separated file, and artist_data.txt initially looked to be one as well. Reading it that way caused problems when joining and filtering later on. Once artist_data.txt was read as a tab-separated ("\t") file, everything worked.
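
To see why the separator matters, here is a small plain-Python sketch. The sample line is an assumption modelled on an artist row from this dataset (an ID, a tab, then a title that contains a space); it is not read from the file.

```python
# Hypothetical line in the style of artist_data.txt: "<id><TAB><title>".
line = "1134999\t06Crazy Life"

by_space = line.split(" ")
by_tab = line.split("\t")

print(by_space)
print(by_tab)
```

Splitting on a space breaks the title apart and leaves the ID glued to its first word, which is consistent with the join and filter problems described above.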

In [0]:
artist_data = 'artist_data.txt'
user_artist_data = 'user_artist_data.txt'

user_schema = get_structure("UserId:IntegerType|ArtistId:IntegerType|Counter:IntegerType")
artist_schema = get_structure("ArtistID:IntegerType|Title:StringType")

# Read the raw files: artist_data.txt is tab-separated, user_artist_data.txt is space-separated.
artists=spark.read.schema(artist_schema).option("header", "false").csv(artist_data, sep="\t")
users=spark.read.schema(user_schema).option("header", "false").csv(user_artist_data, sep=" ")
user_dd = spark.read.csv(user_artist_data, sep="\t")  # not used further in this notebook

# Convert numeric values where necessary.
artists = artists.withColumn("ArtistId", artists["ArtistId"].cast("double"))
users = users.withColumn("UserId", users["UserId"].cast("double"))
users = users.withColumn("ArtistId", users["ArtistId"].cast("double"))
users = users.withColumn("Counter", users["Counter"].cast("double")) # was integer, but I think it needs to be float for ALS

artists.show(5)



+-----------+--------------------+
|   ArtistId|               Title|
+-----------+--------------------+
|  1134999.0|        06Crazy Life|
|  6821360.0|        Pang Nakarin|
|1.0113088E7|Terfel, Bartoli- ...|
|1.0151459E7| The Flaming Sidebur|
|  6826647.0|   Bodenstandig 3000|
+-----------+--------------------+
only showing top 5 rows



## This was to give an idea of the Artists Listened to by UserId 1000002

In [0]:

users.filter(users["UserId"]==1000002).show()

+---------+---------+-------+
|   UserId| ArtistId|Counter|
+---------+---------+-------+
|1000002.0|      1.0|   55.0|
|1000002.0|1000006.0|   33.0|
|1000002.0|1000007.0|    8.0|
|1000002.0|1000009.0|  144.0|
|1000002.0|1000010.0|  314.0|
|1000002.0|1000013.0|    8.0|
|1000002.0|1000014.0|   42.0|
|1000002.0|1000017.0|   69.0|
|1000002.0|1000024.0|  329.0|
|1000002.0|1000025.0|    1.0|
|1000002.0|1000028.0|   17.0|
|1000002.0|1000031.0|   47.0|
|1000002.0|1000033.0|   15.0|
|1000002.0|1000042.0|    1.0|
|1000002.0|1000045.0|    1.0|
|1000002.0|1000054.0|    2.0|
|1000002.0|1000055.0|   25.0|
|1000002.0|1000056.0|    4.0|
|1000002.0|1000059.0|    2.0|
|1000002.0|1000062.0|   71.0|
+---------+---------+-------+
only showing top 20 rows



## This was a sample of the Artists data

This was to ensure that the merge and filter would work later.

In [0]:
artists.filter(artists["ArtistId"]==1).show()

+--------+----------+
|ArtistId|     Title|
+--------+----------+
|     1.0|Portishead|
+--------+----------+



In [0]:

artists.show(5)

+-----------+--------------------+
|   ArtistId|               Title|
+-----------+--------------------+
|  1134999.0|        06Crazy Life|
|  6821360.0|        Pang Nakarin|
|1.0113088E7|Terfel, Bartoli- ...|
|1.0151459E7| The Flaming Sidebur|
|  6826647.0|   Bodenstandig 3000|
+-----------+--------------------+
only showing top 5 rows



## This was to identify if any records did not contain ArtistIds

In [0]:
no_artists = users.filter(users.ArtistId.isNull())
# No empty artists in the dataset.
no_artists.count()

0

## The subsequent section created some temporary views for the Users and Artists.  

I did not use them, but will keep the code here for the moment.

In [0]:
import os
from pyspark.sql.types import *
from pyspark.sql import functions as F


In [0]:
users.createOrReplaceTempView("users")
df1 = spark.sql("select distinct UserId from users")
df1.show(5)

+---------+
|   UserId|
+---------+
|1000075.0|
|1000148.0|
|1000452.0|
|1000691.0|
|1000714.0|
+---------+
only showing top 5 rows



In [0]:
df1.count()

artists.createOrReplaceTempView("artists")
df2 = spark.sql("select * from artists")
df2.show(5)

+-----------+--------------------+
|   ArtistId|               Title|
+-----------+--------------------+
|  1134999.0|        06Crazy Life|
|  6821360.0|        Pang Nakarin|
|1.0113088E7|Terfel, Bartoli- ...|
|1.0151459E7| The Flaming Sidebur|
|  6826647.0|   Bodenstandig 3000|
+-----------+--------------------+
only showing top 5 rows



In [0]:
df2.count()

1848671

In [0]:
df1.count()

148111

## Splitting the data for Train / Test

The data was split into training, validation and test sets.

In [0]:
# Smaller dataset, so we will use 0.7 / 0.15 / 0.15. May use users_over_2 if processing takes too long.
# users_over_2 = users.filter(users.Counter >= 2)

In [0]:
(training_df, validation_df, test_df) = users.randomSplit([0.7, 0.15, 0.15])

training_df = training_df.cache()
validation_df = validation_df.cache()
test_df = test_df.cache()

## Modelling for Recommendation Engine

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator


# Let's initialize our ALS learner
als = ALS()

# Now set the parameters for the method
als.setMaxIter(5)\
   .setSeed(42)\
   .setItemCol("ArtistId")\
   .setRatingCol("Counter")\
   .setUserCol("UserId")

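# Now let's compute an evaluation metric for our test dataset
# We create an RMSE evaluator using the label and predicted columns.
# predictionCol must be "prediction" (the column added by model.transform), not "ArtistId".
# The RMSE figures recorded in this notebook were produced with predictionCol="ArtistId".
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Counter", metricName="rmse")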

# We have run this before with these results, so to save future time we are changing the ranks and regParams
# The best model was trained with regularization parameter 0.15
# The best model was trained with rank 4
# For rank 4, regularization parameter 0.15 the RMSE is 2408928.61238154

tolerance = 0.03
ranks = [4]
regParams = [0.15]

# Original ranks and params tested.
# ranks = [4, 8, 12, 16]
# regParams = [0.15, 0.2, 0.25]

# Build each row separately; [[0]*n]*m would make every row the same list object.
errors = [[0]*len(ranks) for _ in regParams]
models = [[0]*len(ranks) for _ in regParams]

err = 0
min_error = float('inf')
best_rank = -1
i = 0
for regParam in regParams:
  j = 0
  for rank in ranks:
    # Set the rank here:
    als.setParams(rank = rank, regParam = regParam)
    # Create the model with these parameters.
    model = als.fit(training_df)
    # Run the model to create a prediction. Predict against the validation_df.
    predict_df = model.transform(validation_df)

    # Remove NaN values from prediction (due to SPARK-14489). The issue has been resolved, but I am not sure in which version, so the filter is kept.
    predicted_plays_df = predict_df.filter(predict_df.prediction != float('nan'))
    predicted_plays_df = predicted_plays_df.withColumn("prediction", F.abs(F.round(predicted_plays_df["prediction"],0)))
    
    # Run the previously created RMSE evaluator, reg_eval, on the predicted_plays_df DataFrame
    error = reg_eval.evaluate(predicted_plays_df)
    errors[i][j] = error
    models[i][j] = model
    print( 'For rank %s, regularization parameter %s the RMSE is %s' % (rank, regParam, error))
    if error < min_error:
      min_error = error
      best_params = [i,j]
    j += 1
  i += 1

als.setRegParam(regParams[best_params[0]])
als.setRank(ranks[best_params[1]])
print('The best model was trained with regularization parameter %s' % (regParams[best_params[0]]))
print('The best model was trained with rank %s' % (ranks[best_params[1]]))
my_model = models[best_params[0]][best_params[1]]

For rank 4, regularization parameter 0.15 the RMSE is 2412978.306293676
The best model was trained with regularization parameter 0.15
The best model was trained with rank 4
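
For reference, RMSE is the square root of the mean squared difference between a prediction column and a label column. The plain-Python sketch below uses the first three play counts shown earlier for UserId 1000002 together with made-up predictions; `rmse` is a hypothetical helper, not the Spark evaluator. It also shows that feeding an ID column in place of predictions yields an error on the scale of the IDs.

```python
import math


def rmse(predictions, labels):
    """Root mean squared error between two equal-length sequences."""
    if len(predictions) != len(labels):
        raise ValueError("predictions and labels must have the same length")
    squared = [(p - y) ** 2 for p, y in zip(predictions, labels)]
    return math.sqrt(sum(squared) / len(squared))


# Play counts taken from the first rows shown for UserId 1000002.
labels = [55.0, 33.0, 8.0]
# Made-up predictions, for illustration only.
predictions = [50.0, 30.0, 12.0]
# Artist IDs of the same rows, which live on a completely different scale.
artist_ids = [1.0, 1000006.0, 1000007.0]

print(rmse(predictions, labels))
print(rmse(artist_ids, labels))
```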


## The model has been trained with regularization parameter of 0.15 and rank 4.

Using my_model with the chosen parameters, we now evaluate against the test dataset, run predictions, and check listened / unlistened artists.

In [0]:
# In ML Pipelines, this next step has a bug that produces unwanted NaN values. We
# have to filter them out. See https://issues.apache.org/jira/browse/SPARK-14489
# Like I said earlier, I think the bug has been removed, but filtering NaN is not an issue.

test_df = test_df.withColumn("Counter", test_df["Counter"].cast(DoubleType()))
predict_df = my_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))

# Round floats to whole numbers
predicted_test_df = predicted_test_df.withColumn("prediction", F.abs(F.round(predicted_test_df["prediction"],0)))
# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))

The model had a RMSE on the test set of 2409405.592605308
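
A note on the NaN filter: Spark SQL treats NaN as equal to NaN, so `prediction != float('nan')` does remove NaN rows inside a DataFrame filter. Plain Python follows IEEE 754 instead, where NaN is not equal to anything, including itself, so the same comparison keeps every value. A minimal sketch with made-up values:

```python
import math

# Made-up prediction values; NaN stands for a user or artist unseen during training.
predictions = [12.5, float("nan"), 3.0]

# In plain Python, NaN != NaN is True, so this comparison filters nothing out.
kept_by_comparison = [p for p in predictions if p != float("nan")]

# math.isnan is the reliable check outside Spark.
kept_by_isnan = [p for p in predictions if not math.isnan(p)]

print(len(kept_by_comparison), kept_by_isnan)
```

Within Spark, `ALS` also accepts `coldStartStrategy="drop"` (available since Spark 2.2), which may be a simpler way to drop such rows from the output of `transform`.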


## Looking at some statistics ...

In [0]:
avg_plays_df = training_df.groupBy().avg('Counter').select(F.round('avg(Counter)'))

avg_plays_df.show(3)
# Extract the average rating value. (This is row 0, column 0.)
training_avg_plays = avg_plays_df.collect()[0][0]

print('The average number of plays in the dataset is {0}'.format(training_avg_plays))

# Add a column with the average rating
test_for_avg_df = test_df.withColumn('prediction', F.lit(training_avg_plays))

# Run the previously created RMSE evaluator, reg_eval, on the test_for_avg_df DataFrame
test_avg_RMSE = reg_eval.evaluate(test_for_avg_df)

print("The RMSE on the average set is {0}".format(test_avg_RMSE))


+----------------------+
|round(avg(Counter), 0)|
+----------------------+
|                  15.0|
+----------------------+

The average number of plays in the dataset is 15.0
The RMSE on the average set is 3063831.526930576


In [0]:
# This is the top of the UserId file, to give some values to test on.
# It should not matter which users go into the model, as the idea is to 
# train on what they do not have rather than what they do have, so the fact that 
# these rows may be in one or other of the 3 splits is not an issue.

# +-------+--------+-------+
# | UserId|ArtistId|Counter|
# +-------+--------+-------+
# |1000002|       1|   55.0|
# |1000002| 1000006|   33.0|
# |1000002| 1000007|    8.0|
# |1000002| 1000009|  144.0|
# |1000002| 1000010|  314.0|
# +-------+--------+-------+

## Showing the Listened Artists by the chosen user.

In [0]:
UserId = 1000002

listened_artists = users.filter(users.UserId == UserId) \
                                          .join(artists, 'ArtistId') \
                                          .select('UserId', 'ArtistId', 'Title', 'Counter')

listened_artists.show()

+---------+---------+--------------------+-------+
|   UserId| ArtistId|               Title|Counter|
+---------+---------+--------------------+-------+
|1000002.0|1000716.0|                Seal|    1.0|
|1000002.0|1001152.0|             Embrace|    2.0|
|1000002.0|1001270.0|    Shanks & Bigfoot|    2.0|
|1000002.0|1003495.0|               Maysa|    3.0|
|1000002.0|1003518.0|     Eric Marienthal|    2.0|
|1000002.0|1022997.0|Rainy Days and Mo...|    1.0|
|1000002.0|1068317.0|      Gabriel Ananda|    1.0|
|1000002.0|6737629.0|    The Gillmor Gang|    1.0|
|1000002.0|    782.0|      Weather Report|   48.0|
|1000002.0|1000631.0|             Madness|   27.0|
|1000002.0|1000632.0|       Goo Goo Dolls|  119.0|
|1000002.0|1002401.0|              Divers|    3.0|
|1000002.0|    735.0|         Perez Prado|    1.0|
|1000002.0|   5761.0|          Status Quo|    8.0|
|1000002.0|1003413.0|         Warren Hill|    4.0|
|1000002.0|2089141.0|      Slamin' Gladys|   12.0|
|1000002.0|   3009.0|         K

## Getting the full list of artists heard by the chosen user.

This was used to filter out the artists not heard before.

In [0]:
listened_artists_list =  [row.ArtistId for row in listened_artists.collect()]

print('Artists user has listened to:')
print(listened_artists_list)

Artists user has listened to:
[1000716.0, 1001152.0, 1001270.0, 1003495.0, 1003518.0, 1022997.0, 1068317.0, 6737629.0, 782.0, 1000631.0, 1000632.0, 1002401.0, 735.0, 5761.0, 1003413.0, 2089141.0, 3009.0, 3085.0, 1000024.0, 1001855.0, 1003477.0, 2167.0, 1002457.0, 1003610.0, 1004058.0, 758.0, 1226.0, 1007753.0, 1003263.0, 1069200.0, 1007347.0, 3321.0, 1000056.0, 1004378.0, 1008164.0, 1003650.0, 1003409.0, 1003628.0, 1053817.0, 1448.0, 4267.0, 1000014.0, 1003158.0, 1003404.0, 1003475.0, 1009970.0, 1002287.0, 1004316.0, 1280943.0, 1289.0, 5702.0, 1000107.0, 1000315.0, 1000853.0, 1002577.0, 1015566.0, 1022098.0, 1098856.0, 1284697.0, 4137.0, 1000764.0, 1003379.0, 1023192.0, 1495.0, 1000438.0, 1004039.0, 3328.0, 1042262.0, 1000201.0, 1002586.0, 1008850.0, 1022862.0, 1000317.0, 1000597.0, 1041136.0, 831.0, 1195.0, 1000283.0, 1000515.0, 1001804.0, 1021499.0, 4241.0, 1001727.0, 1003083.0, 1003342.0, 1003417.0, 1003421.0, 1005615.0, 1242169.0, 1000848.0, 1002736.0, 1003511.0, 1004637.0, 1029770

## Getting the list of artists not heard by the chosen user.

In [0]:
unlistened_artists = users.filter(~users.ArtistId.isin(listened_artists_list))
unlistened_artists.count()

20825470
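
The filtering idea can be sketched in plain Python as a membership test against the set of artists already heard. The rows below are a hand-picked mix of values that appear in this notebook's outputs, and the variable names are hypothetical.

```python
# (UserId, ArtistId, Counter) rows; a tiny sample for illustration only.
rows = [
    (1000002, 1, 55.0),
    (1000002, 1000006, 33.0),
    (1000019, 1000036, 5.0),
    (1000019, 1000069, 3.0),
]

chosen_user = 1000002
# Artists the chosen user has already played.
listened = {artist for user, artist, _ in rows if user == chosen_user}
# Keep only rows whose artist is not in that set.
unlistened_rows = [row for row in rows if row[1] not in listened]

print(unlistened_rows)
```

As in the DataFrame version, the rows that remain come largely from other users' listening history.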

In [0]:
unlistened_artists.show(5) # Just a sample.

+---------+---------+-------+
|   UserId| ArtistId|Counter|
+---------+---------+-------+
|1000019.0|1000036.0|    5.0|
|1000019.0|1000069.0|    3.0|
|1000019.0|1000071.0|    2.0|
|1000019.0|1000076.0|   10.0|
|1000019.0|1000080.0|    3.0|
+---------+---------+-------+
only showing top 5 rows



## Merging the dataset with the Artists, in order to get the Artist Title from ID.

In [0]:
# Merging the unlistened Artists and Totalling / Sorting by Counter to get total number of plays.

unlistened_artists_list = unlistened_artists.join(artists, 'ArtistId').select('UserId', 'ArtistId', 'Title', 'Counter')
grouped_unlistened_artists_list = unlistened_artists_list.groupBy("Title").sum("Counter")

# Renaming the total column for sorting.
grouped_unlistened_artists_list = grouped_unlistened_artists_list.withColumnRenamed("sum(Counter)", "TotalPlays")
# Top 10 Artists not heard by UserId 1000002

print("Top 10 Played Artists")
grouped_unlistened_artists_list.orderBy(["TotalPlays"], ascending=[0]).show(10)

+-------------------+----------+
|              Title|TotalPlays|
+-------------------+----------+
|       Modest Mouse| 1328869.0|
|        Bright Eyes| 1234387.0|
|Death Cab for Cutie| 1117143.0|
|      Elliott Smith| 1080412.0|
|                 U2| 1015064.0|
|          Nightwish| 1010807.0|
|           Interpol|  979539.0|
|Something Corporate|  921263.0|
|               Beck|  906576.0|
|           The Cure|  895458.0|
+-------------------+----------+
only showing top 10 rows
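
The group, sum and sort steps above can be mirrored in plain Python with `collections.Counter`. The play counts here are invented for illustration and are unrelated to the totals in the table.

```python
from collections import Counter

# (Title, Counter) pairs; invented sample values.
plays = [
    ("Modest Mouse", 10.0),
    ("Bright Eyes", 7.0),
    ("Modest Mouse", 5.0),
    ("U2", 9.0),
]

total_plays = Counter()
for title, count in plays:
    total_plays[title] += count  # plays summed per title, like groupBy("Title").sum("Counter")

# most_common sorts by total in descending order.
top_two = total_plays.most_common(2)
print(top_two)
```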



## Now to use the prediction portion

Running the Unlistened Artists through the model to come up with the predictions based on the unlistened artists.

In [0]:
# Again filtering on the NaN ... but should not be necessary

predicted_listens = my_model.transform(unlistened_artists)
predicted_listens = predicted_listens.filter(predicted_listens['prediction'] != float('nan'))

predictions = predicted_listens.join(artists, 'ArtistId') \
                 .select('ArtistId', 'Title', 'prediction') \
                 .distinct() \
                 .orderBy('prediction', ascending = False)

grouped_predictions = predictions.groupBy("Title").sum("prediction")
grouped_predictions = grouped_predictions.withColumnRenamed("sum(prediction)", "Prediction")

print('Top 10 Predicted Artists:')
grouped_predictions = grouped_predictions.orderBy("Prediction", ascending = False)
grouped_predictions.show(10)

Top 10 Predicted Artists:
+-------------------+------------------+
|              Title|        Prediction|
+-------------------+------------------+
|        Bright Eyes|1087848.5356666297|
|      Elliott Smith|1053005.4397909641|
|Death Cab for Cutie| 921259.4432001486|
|               Tool| 897752.3415871225|
|           Interpol| 883973.1147891581|
|       Modest Mouse|  868206.573631011|
|    Jimmy Eat World| 847365.1839979813|
|               Beck| 801374.5849915743|
|Something Corporate| 750667.3168459535|
|          The Shins| 738299.7268771473|
+-------------------+------------------+
only showing top 10 rows



Proposed predictions for the chosen user
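
Note that `unlistened_artists` holds rows from many users, so the totals above sum predictions over everyone who played those artists. One possible way to produce recommendations for the chosen user alone is to pair that user's ID with every artist they have not heard, score each pair, and keep the highest scores. The plain-Python sketch below illustrates the idea; `score` is a hypothetical stand-in for the trained ALS model, and the scores and the names "Artist A" and "Artist B" are invented.

```python
import heapq

chosen_user = 1000002
# ArtistId -> Title; the last two entries are invented placeholders.
all_artists = {1: "Portishead", 782: "Weather Report", 2: "Artist A", 3: "Artist B"}
listened = {1, 782}

# Hypothetical scores standing in for model predictions.
fake_scores = {2: 4.2, 3: 7.9}


def score(user_id, artist_id):
    """Stand-in for a model prediction; returns an invented score."""
    return fake_scores[artist_id]


# Candidate pairs: the chosen user combined with each unheard artist.
candidates = [(chosen_user, a) for a in all_artists if a not in listened]
# Keep the single best-scoring candidate.
top = heapq.nlargest(1, candidates, key=lambda pair: score(*pair))
recommended_titles = [all_artists[a] for _, a in top]
print(recommended_titles)
```

In Spark 2.3 and later, `ALSModel.recommendForUserSubset` may be worth trying as a built-in route to per-user recommendations.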
