Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE". You can run all the tests with the validate button. If the validate command takes too long, you can also confirm that you pass all the tests if you can run through the whole notebook without getting validation errors.

For this problem set, we'll be using the Jupyter notebook:

![](jupyter.png)

## Music recommender exercises

In this notebook you will make recommendations based on users' music preferences. You will train an alternating least squares (ALS) model and use it to recommend songs to users.

https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering

https://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/recommendation.html

dataset by:  
Thierry Bertin-Mahieux, Daniel P.W. Ellis, Brian Whitman, and Paul Lamere.   
The Million Song Dataset. In Proceedings of the 12th International Society  
for Music Information Retrieval Conference (ISMIR 2011), 2011.  

We use a sample of "unique_tracks.txt" from http://millionsongdataset.com/sites/default/files/AdditionalFiles/unique_tracks.txt.

In [1]:
import pyspark.mllib
from pyspark.sql import *
from pyspark import *
from pyspark.rdd import *
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.sql.types import *
from pyspark.mllib.recommendation import *
import random


sc = SparkContext("local","music")
spark = SparkSession(sc)

sampleUsersPath = "sampleUsers.txt"
sampleTracksPath = "sampleTracks.txt"

## Load

Load data from path and create a dataframe from it. It should have three (3) columns:  
-> `user`: String (user_id)  
-> `song`: String (song_id)  
-> `count`: Int (number of times listened)  

Columns are separated by "\t".  
Counts higher than 20 should be capped at 20.  
Defining the schema with `StructType` and `StructField` can help.

param `path` path to file (users.txt)  
`return` DataFrame
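
The parsing and capping rule described above does not depend on Spark, so it can be checked on a single line first. A minimal sketch in plain Python, where `parse_line` and `MAX_COUNT` are hypothetical names that are not part of the assignment, and the ids are made up:

```python
MAX_COUNT = 20  # cap taken from the task description


def parse_line(line):
    """Split one tab-separated line into (user, song, capped count)."""
    user, song, count = line.rstrip("\n").split("\t")
    return user, song, min(int(count), MAX_COUNT)


# Made-up ids, for illustration only.
print(parse_line("user_a\tsong_x\t3"))
print(parse_line("user_a\tsong_y\t57"))
```

Using `min()` keeps the cap in one place and avoids parsing the count twice.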

In [2]:
def load(path):
    # Explicit schema: user and song ids are strings, the play count is an integer.
    schema = StructType([
        StructField("user", StringType()),
        StructField("song", StringType()),
        StructField("count", IntegerType())
    ])
    # Read raw lines, split them on tabs, and cap each play count at 20.
    lines = spark.read.text(path)
    rows = lines.rdd.map(lambda row: row.value.split('\t'))
    rows = rows.map(lambda f: [f[0], f[1], min(int(f[2]), 20)])
    return spark.createDataFrame(rows, schema)

In [3]:
loaded = load(sampleUsersPath).persist()
loaded.show()

+--------------------+------------------+-----+
|                user|              song|count|
+--------------------+------------------+-----+
|b80344d063b5ccb32...|SOBBMDR12A8C13253B|    2|
|b80344d063b5ccb32...|SODZWFT12A8C13C0E4|    1|
|b80344d063b5ccb32...|SOHQWYZ12A6D4FA701|    1|
|b80344d063b5ccb32...|SOJNNUA12A8AE48C7A|    1|
|b80344d063b5ccb32...|SOLXHAI12A6D4FD6F3|    1|
|b80344d063b5ccb32...|SOOSIVQ12A6D4F8AE0|    1|
|b80344d063b5ccb32...|SORJNVW12A8C13BF90|    1|
|85c1f87fea955d09b...|SODJTHN12AF72A8FCD|    2|
|85c1f87fea955d09b...|SOIDFHN12A8C13ABAC|    2|
|4bd88bfb25263a75b...|SOWEHOM12A6BD4E09E|    1|
|9d6f0ead607ac2a6c...|SOCLQES12A58A7BB1D|    2|
|9d6f0ead607ac2a6c...|SOKLRPJ12A8C13C3FE|    2|
|9bb911319fbc04f01...|SOXBXBI12A8C13C71C|    5|
|b64cdd1a0bd907e5e...|SOBDWET12A6701F114|    2|
|b64cdd1a0bd907e5e...|SOLQYOG12B0B80BA71|    2|
|b64cdd1a0bd907e5e...|SOZPQES12A6D4F8E57|    2|
|17aa9f6dbdf753831...|SODHHEG12A58A779FB|    2|
|17aa9f6dbdf753831...|SODUANR12A6D4F5036

In [4]:
'''load test'''
correctCols = StructType([\
StructField("user",StringType(),True),\
StructField("song",StringType(),True),\
StructField("count",IntegerType(),True)])

fakeData = [("abc123", "123abc", 2)]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert loaded.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, loaded.dtypes)

assert loaded.filter('count>20').count() == 0, "counts higher than 20 was expected to be 0 but it was %s" % loaded.filter('count>20').count()

test1 = str(loaded.sample(False, 0.01, seed=123).limit(1).first())
correct1 = "Row(user='a58de017cbeda1763ea002fe027ed41b4ed53109', song='SOJTLHS12A8C13F633', count=3)"
assert test1 == correct1, "the row was expected to be %s but it was %s" % (correct1, test1)


## Convert

Convert the user and song fields into doubles using `StringIndexer` from `pyspark.ml.feature`. Final schema:  
-> `user`: String  
-> `song`: String  
-> `count`: Int  
-> `user_indexed`: Double  
-> `song_indexed`: Double  

param `df` Dataframe which has been created using method `load()`  
`return` Dataframe
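
By default `StringIndexer` orders labels by descending frequency, so the most frequent label gets index 0.0; in Spark 3.0 and later, labels with equal frequency are further sorted alphabetically. A rough plain-Python sketch of that default ordering, where `frequency_index` is a hypothetical helper and the labels are made up:

```python
from collections import Counter


def frequency_index(labels):
    """Map each label to a float index, most frequent first, ties alphabetical."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


mapping = frequency_index(["u2", "u1", "u2", "u3", "u1", "u2"])
print(mapping)
```

This is why a user with many rows in the sample tends to receive a small index, and why the indexed columns are doubles rather than integers.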


In [5]:
def convert(df):
    # Index both string columns in one pass (multi-column StringIndexer needs Spark 3.0+).
    indexer = StringIndexer(inputCols=["user", "song"],
                            outputCols=["user_indexed", "song_indexed"])
    return indexer.fit(df).transform(df)

In [6]:
converted = convert(loaded).persist()
converted.show()

+--------------------+------------------+-----+------------+------------+
|                user|              song|count|user_indexed|song_indexed|
+--------------------+------------------+-----+------------+------------+
|b80344d063b5ccb32...|SOBBMDR12A8C13253B|    2|       162.0|       577.0|
|b80344d063b5ccb32...|SODZWFT12A8C13C0E4|    1|       162.0|      1053.0|
|b80344d063b5ccb32...|SOHQWYZ12A6D4FA701|    1|       162.0|      1646.0|
|b80344d063b5ccb32...|SOJNNUA12A8AE48C7A|    1|       162.0|      1945.0|
|b80344d063b5ccb32...|SOLXHAI12A6D4FD6F3|    1|       162.0|      2306.0|
|b80344d063b5ccb32...|SOOSIVQ12A6D4F8AE0|    1|       162.0|      2702.0|
|b80344d063b5ccb32...|SORJNVW12A8C13BF90|    1|       162.0|      3124.0|
|85c1f87fea955d09b...|SODJTHN12AF72A8FCD|    2|       810.0|       951.0|
|85c1f87fea955d09b...|SOIDFHN12A8C13ABAC|    2|       810.0|      1728.0|
|4bd88bfb25263a75b...|SOWEHOM12A6BD4E09E|    1|      1151.0|      3824.0|
|9d6f0ead607ac2a6c...|SOCLQES12A58A7BB

In [7]:
'''convert test'''
correctCols = StructType([\
StructField("user",StringType(),True),\
StructField("song",StringType(),True),\
StructField("count",IntegerType(),True),\
StructField("user_indexed",DoubleType(),True),\
StructField("song_indexed",DoubleType(),True)])

fakeData = [("abc123", "123abc", 2, 1.0, 2.0)]

fakeDf = spark.createDataFrame(fakeData, correctCols)

assert converted.dtypes == fakeDf.dtypes, "the schema was expected to be %s but it was %s" % (fakeDf.dtypes, converted.dtypes)

test2 = str(converted.sample(False, 0.1, seed=1234).limit(1).first())
correct2 = "Row(user='5a905f000fc1ff3df7ca807d57edb608863db05d', song='SOCHPFL12AF72A3F64', count=2, user_indexed=5.0, song_indexed=767.0)"
assert test2 == correct2, "the row was expected to be %s but it was %s" % (correct2, test2)


## To Rating

Create an RDD of `Rating` objects. See the `Rating` class documentation:  
http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.recommendation.Rating.html#pyspark.mllib.recommendation.Rating

The arguments to `Rating` should be user=`user_indexed`, product=`song_indexed`, and rating=`count`.

param `df` DataFrame which has `user_indexed` and `song_indexed` fields (output of the `convert()` method)  
`return` RDD containing `Rating` objects
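
The output shown in this notebook has integer `user` and `product` values and a float `rating`, although the source columns are doubles and an integer. A small sketch that makes those casts explicit, using a hypothetical `RatingSketch` namedtuple as a stand-in for the real `Rating` class and a plain dict in place of a Spark row:

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.mllib.recommendation.Rating.
RatingSketch = namedtuple("RatingSketch", ["user", "product", "rating"])


def to_rating(row):
    """Build a rating triple from a row with indexed user and song columns."""
    return RatingSketch(user=int(row["user_indexed"]),
                        product=int(row["song_indexed"]),
                        rating=float(row["count"]))


# Values taken from the first row of the converted output in this notebook.
row = {"count": 2, "user_indexed": 162.0, "song_indexed": 577.0}
print(to_rating(row))  # RatingSketch(user=162, product=577, rating=2.0)
```

Looking columns up by name rather than by position keeps the mapping correct if the column order changes.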


In [8]:
def toRating(df):
    # Access columns by name so the mapping does not depend on column order.
    return df.rdd.map(lambda row: Rating(user=row["user_indexed"],
                                         product=row["song_indexed"],
                                         rating=row["count"]))

In [9]:
rated = toRating(converted).persist()
rated.take(10)

[Rating(user=162, product=577, rating=2.0),
 Rating(user=162, product=1053, rating=1.0),
 Rating(user=162, product=1646, rating=1.0),
 Rating(user=162, product=1945, rating=1.0),
 Rating(user=162, product=2306, rating=1.0),
 Rating(user=162, product=2702, rating=1.0),
 Rating(user=162, product=3124, rating=1.0),
 Rating(user=810, product=951, rating=2.0),
 Rating(user=810, product=1728, rating=2.0),
 Rating(user=1151, product=3824, rating=1.0)]

In [10]:
'''toRating tests'''
rows = [Rating(user=162, product=577, rating=2.0),
 Rating(user=162, product=1053, rating=1.0),
 Rating(user=162, product=1646, rating=1.0),
 Rating(user=162, product=1945, rating=1.0),
 Rating(user=162, product=2306, rating=1.0)]
assert rated.take(5) == rows, "the first 5 rows were expected to be %s but they were %s" % (rows, rated.take(5))

random.seed(54321)
r = random.randint(100, 2000)

test3 = str(toRating(converted).collect()[r])
correct3 = "Rating(user=599, product=1321, rating=1.0)"
assert test3 == correct3, "the row was expected to be %s but it was %s" % (correct3, test3)


## Train ALS

Train an ALS model. For testing purposes you must pass a seed, e.g. `seed=seed`. Use the parameter `rank=10`.

https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering

param `data` RDD of `Rating` objects created using the `toRating()` method  
param `seed` value used for testing purposes  
`return` trained ALS model
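
ALS factorizes the ratings into user and item factors by fixing one set of factors, solving a regularized least-squares problem for the other, and then swapping. The sketch below shows that alternation in plain Python for rank 1 with simple L2 regularization. It is an illustration only, not Spark's implementation (which uses rank 10 here and its own regularization scheme); `als_rank1`, `objective`, and the toy ratings are made up.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, iterations=10):
    """Alternate closed-form updates of scalar user and item factors."""
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(iterations):
        # Fix the item factors and solve for each user factor.
        for i in range(n_users):
            seen = [(item, r) for user, item, r in ratings if user == i]
            u[i] = (sum(r * v[item] for item, r in seen)
                    / (reg + sum(v[item] ** 2 for item, _ in seen)))
        # Fix the user factors and solve for each item factor.
        for j in range(n_items):
            seen = [(user, r) for user, item, r in ratings if item == j]
            v[j] = (sum(r * u[user] for user, r in seen)
                    / (reg + sum(u[user] ** 2 for user, _ in seen)))
    return u, v


def objective(ratings, u, v, reg=0.1):
    """Squared error on observed ratings plus the L2 penalty."""
    fit = sum((r - u[user] * v[item]) ** 2 for user, item, r in ratings)
    return fit + reg * (sum(x * x for x in u) + sum(x * x for x in v))


# Toy (user, item, rating) triples, made up for illustration.
toy = [(0, 0, 2.0), (0, 1, 1.0), (1, 0, 4.0), (1, 1, 2.0)]
before = objective(toy, [1.0, 1.0], [1.0, 1.0])
u, v = als_rank1(toy, n_users=2, n_items=2)
after = objective(toy, u, v)
print(before, after)
```

Each half-step minimizes the objective exactly for one set of factors, so the objective never increases from one iteration to the next.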

In [13]:
def trainALS(data, seed):
    # rank=10 is fixed by the exercise; the seed makes the factorization reproducible.
    return ALS.train(data, rank=10, seed=seed)

In [14]:
random.seed(123)
rSeed = random.randint(0, 10000)
model = trainALS(rated, rSeed)

## Recommend Songs

Recommend the 5 most suitable songs for a given user, i.e. the 5 songs with the highest predicted ratings.

param `model` trained ALS model  
param `user` the user's `user_indexed` value (from `convert()`), converted to an integer  
`return` list of recommendations (`Rating` objects)  
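
Conceptually, a factorization model scores a song for a user as the dot product of their factor vectors, and the recommendation is the handful of songs with the highest scores. A minimal plain-Python sketch of that selection step, assuming made-up factor vectors and a hypothetical `top_n` helper:

```python
import heapq


def top_n(user_vector, item_vectors, n=5):
    """Return the n (item, score) pairs with the highest dot-product score."""
    scored = ((sum(a * b for a, b in zip(user_vector, vec)), item)
              for item, vec in item_vectors.items())
    return [(item, score) for score, item in heapq.nlargest(n, scored)]


# Made-up two-dimensional factors, for illustration only.
items = {10: [1.0, 0.0], 11: [0.0, 1.0], 12: [1.0, 1.0]}
best = top_n([2.0, 1.0], items, n=2)
print(best)
```

`heapq.nlargest` avoids sorting every score when only a few top entries are needed.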


In [15]:
def recommendSongs(model, user):
    # recommendProducts returns the top-n products for the user as Rating objects.
    return model.recommendProducts(user, 5)

In [16]:
recommends = recommendSongs(model, 162)
recommends

[Rating(user=162, product=157, rating=12.384455745744464),
 Rating(user=162, product=4074, rating=12.091127878041066),
 Rating(user=162, product=2310, rating=12.091127878041066),
 Rating(user=162, product=3986, rating=11.486571886780661),
 Rating(user=162, product=1669, rating=11.480036775598396)]

In [17]:
'''model and recommendSongs tests'''
assert type(recommends[0]) == pyspark.mllib.recommendation.Rating, "the type was expected to be pyspark.mllib.recommendation.Rating  but it was %s" % type(recommends[0]) 
assert recommends[0].user == 162, "the user was expected to be 162 but it was %s" % recommends[0].user
assert len(recommends) == 5, "the amount of recommendations was expected to be 5 but it was %s" % len(recommends)

## Get Song Names

Get the song name and the artist for a given Rating object

Example

`[Rating(182412,218057,29.471691093542958),  
Rating(182412,206693,28.39630617887081),  
Rating(182412,230654,28.090021579109706),  
Rating(182412,200542,27.43900484648816),  
Rating(182412,254771,24.82362529344695)] ` 

You should create something like this  

`[["My Business","Guy"],  
["The Man With The Bag","Floyd/Animal/Zoot"],  
["Challenger","Growing"],  
["Una Ragazza In Due", "I Giganti"],
["That Is Why", "Say Anything"]]`  

Each entry lists the song name first and then the name of the band.

You should start by converting the unique_tracks data into a DataFrame. Columns are separated by `<SEP>`. The correct schema is:

-> `track_id`: String  
-> `song_id`: String  
-> `artist`: String  
-> `title`: String

Next, filter the `converted` DataFrame to the rows whose `song_indexed` value appears in the array of Rating objects. Then join the two DataFrames and select the "title" and "artist" values, excluding duplicates. Finally, convert the result into an array (e.g. convert it to an RDD and call `collect()`).


param `converted` Dataframe created using `convert()` method  
param `ar` Array of Rating object produced using `recommendSongs()`  
param `path` path to unique track names file  
`return` corresponding song + artist names inside array, e.g., [['Our Song', 'Taylor Swift'],
 ['Boom (2006 Remastered Album Version)', 'P.O.D.']]
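
The parse, join, and de-duplicate steps described above can be tried without Spark. A minimal sketch in plain Python, assuming each line has exactly four `<SEP>`-separated fields; `parse_track`, `song_names`, and the sample lines are made up for illustration:

```python
def parse_track(line):
    """Split one <SEP>-delimited line into a dict matching the schema."""
    track_id, song_id, artist, title = line.rstrip("\n").split("<SEP>")
    return {"track_id": track_id, "song_id": song_id,
            "artist": artist, "title": title}


def song_names(track_lines, wanted_song_ids):
    """Return distinct [title, artist] pairs for the wanted song ids."""
    wanted = set(wanted_song_ids)
    seen = set()
    result = []
    for line in track_lines:
        track = parse_track(line)
        pair = (track["title"], track["artist"])
        if track["song_id"] in wanted and pair not in seen:
            seen.add(pair)
            result.append([track["title"], track["artist"]])
    return result


# Made-up lines; the second repeats a song under another track id.
lines = [
    "TR_1<SEP>SO_1<SEP>Artist A<SEP>Title A",
    "TR_2<SEP>SO_1<SEP>Artist A<SEP>Title A",
    "TR_3<SEP>SO_2<SEP>Artist B<SEP>Title B",
    "TR_4<SEP>SO_3<SEP>Artist C<SEP>Title C",
]
names = song_names(lines, ["SO_1", "SO_3"])
print(names)
```

The duplicate check matters because one song id can appear under several track ids, which would otherwise yield repeated pairs after the join.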


In [18]:
def getSongNames(converted, ar, path):
    schema = StructType([
        StructField("track_id", StringType()),
        StructField("song_id", StringType()),
        StructField("artist", StringType()),
        StructField("title", StringType())
    ])
    # Parse the <SEP>-delimited track metadata into a DataFrame.
    tracks = spark.read.text(path)
    tracksRDD = tracks.rdd.map(lambda row: row.value.split('<SEP>'))
    tracksDF = spark.createDataFrame(tracksRDD, schema)
    # Keep only the rows whose song index (column 4) was recommended.
    arr = [each[1] for each in ar]
    songs = converted.rdd.filter(lambda song: song[4] in arr).toDF()
    # Join on the song id and return distinct [title, artist] pairs.
    joinedSong = tracksDF.join(songs, tracksDF["song_id"] == songs["song"], how="inner")
    joinedSong = joinedSong.select("title", "artist").distinct()
    return joinedSong.rdd.map(lambda row: [row[0], row[1]]).collect()

In [19]:
songNames = getSongNames(converted, recommends, sampleTracksPath)
songNames

[['Cordeiro De Nana', 'João Gilberto / Gilberto Gil / Caetano Veloso'],
 ['Limbo (Remastered LP Version)', 'Rush'],
 ['Whataya Want From Me', 'Adam Lambert'],
 ['Awakenings', 'Symphony X'],
 ['Inferno (unleash The Fire)', 'Symphony X']]

In [20]:
'''getSongNames test'''
assert len(songNames) == 5, "the amount of song names was expected to be 5 but it was %s" % len(songNames)
assert type(songNames[0]) == list, "the type of a songNames element was expected to be list but it was %s" % type(songNames[0])
test5 = songNames[3]
correct5 = ['Awakenings', 'Symphony X']
assert test5 == correct5, "the row was expected to be %s but it was %s" % (correct5, test5)

## Recommend

`recommend()` returns 5 song recommendations for a given user_id. The output should consist of arrays of size 2: the first element is the song name and the second is the artist name. You mostly just need to pass the parameters through the methods you have already created. Remember the type of parameter that `recommendSongs()` accepts.

Example return

`[["My Business","Guy"],  
["The Man With The Bag","Floyd/Animal/Zoot"],  
["Challenger","Growing"],  
["Una Ragazza In Due", "I Giganti"],
["That Is Why", "Say Anything"]]`   

param `path` path to user data  
param `userId` user_id (String) that can be found from user dataset  
param `tracksPath` path to unique song names dataset   
param `seed` used for testing, pass it to the `trainALS()` method   
`return` corresponding song + artist names inside array
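
One detail worth handling explicitly is the lookup from the string user_id to the integer index that `recommendSongs()` expects, including the case where the user is not in the dataset. A small plain-Python sketch of that step, where `lookup_user_index` and the mapping are hypothetical:

```python
def lookup_user_index(index_by_user, user_id):
    """Return the integer index for a user id, or raise if it is unknown."""
    try:
        return int(index_by_user[user_id])
    except KeyError:
        raise ValueError(f"unknown user id: {user_id!r}") from None


# Made-up mapping from user id to the double produced by indexing.
index_by_user = {"user_a": 162.0, "user_b": 810.0}
print(lookup_user_index(index_by_user, "user_a"))
```

Failing with a clear message is easier to debug than an attribute error raised when a lookup returns no row.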

In [35]:
def recommend(path, userId, tracksPath, seed):
    # Rebuild the pipeline end to end: load, index, convert to ratings, train.
    convDF = convert(load(path))
    model = trainALS(toRating(convDF), seed)
    # Look up the indexed id for this user; recommendSongs() expects an int.
    user = convDF.where(convDF["user"] == userId).first().user_indexed
    recos = recommendSongs(model, int(user))
    return getSongNames(convDF, recos, tracksPath)

In [36]:
recom = recommend(sampleUsersPath, "b80344d063b5ccb3212f76538f3d9e43d87dca9e" ,sampleTracksPath, rSeed)
recom

[['Cordeiro De Nana', 'João Gilberto / Gilberto Gil / Caetano Veloso'],
 ['Limbo (Remastered LP Version)', 'Rush'],
 ['Whataya Want From Me', 'Adam Lambert'],
 ['Awakenings', 'Symphony X'],
 ['Inferno (unleash The Fire)', 'Symphony X']]

In [37]:
'''recommend test'''
assert len(recom) == 5, "the amount of recommendations was expected to be 5 but it was %s" % len(recom)
assert type(recom[0]) == list, "the type of a 'recommend' element was expected to be list but it was %s" % type(recom[0])
#test if the same user and seed returns the same as songNames
assert recom == songNames, "the song names were expected to be %s but they were %s" % (songNames, recom)

In [None]:
spark.catalog.clearCache()
sc.stop()
spark.stop()