### Load data

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import isnull
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import numpy as np

In [2]:
df = spark.read.csv('../data/event_ds.csv',header=True,inferSchema=True).cache()
df

DataFrame[uid: int, event: string, song_id: decimal(20,0), date: timestamp]

### Extract features

In [3]:
D_record = df.filter(F.col('event')=='D').groupBy('uid','event','song_id').count().orderBy('count')
P_record = df.filter(F.col('event')=='P').groupBy('uid','event','song_id').count().orderBy('count')
D_record = D_record.select(F.col('uid'), F.col('event'), F.col('song_id'), F.col('count').alias('count_D'))
P_record = P_record.select(F.col('uid'), F.col('event'), F.col('song_id'), F.col('count').alias('count_P'))

In [4]:
rating = P_record.join(D_record,on=['uid', 'song_id'],how='full')

In [5]:
rating = rating.select(F.col('uid'), F.col('song_id'), F.col('count_P'), F.col('count_D'))

In [6]:
rating.count()

3066686

In [7]:
rating.filter(isnull('count_P')).count()

164309

In [8]:
rating.filter(isnull('count_D')).count()

2621780

In [6]:
rating = rating.na.fill(0) 
# fill the cell with null, which means no (play/download)action about this user for this song

### Create Label

In [8]:
# create rating
rating = rating.withColumn('rating', rating['count_P']*0.5+rating['count_D']*2)

In [24]:
# use when().otherwise() to modify specific value in the column
from pyspark.sql.functions import when
rating = rating.withColumn('rating',when((F.col('rating')>5), 5).otherwise(F.col('rating')))

### Clean Data

In [29]:
# clean the bot
threshold = np.percentile(rating.toPandas()['count_P'],99.9)
print(threshold)
rating = rating.filter(F.col('count_P') < threshold)

141.0


In [30]:
# change 'song_id' type
rating = rating.withColumn('song_id',F.col('song_id').cast('int'))
threshold = np.percentile(rating.toPandas()['song_id'], 99.9)
rating = rating.filter((F.col('song_id') < threshold)&(F.col('song_id')>=0))

#rating.groupBy().max("song_id").show() # find the max number of specific column
#rating.describe(['song_id']).show()

In [31]:
#rating.describe(['rating']).show()
rating = rating.orderBy(F.col('uid'))

### Build model and predict

In [32]:
(train, test) = rating.randomSplit([0.8, 0.2], seed = 1)

In [38]:
from pyspark.sql import SQLContext
from pyspark.ml.linalg import Vectors, VectorUDT
als = ALS(rank = 30, maxIter=10, regParam=0.01, userCol="uid", itemCol="song_id", ratingCol="rating", implicitPrefs=False, 
          coldStartStrategy="drop")
model = als.fit(train)


In [39]:
prediction = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(prediction)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.4849623532656782


In [47]:
userRecs = model.recommendForAllUsers(5)
userRecs.show()

+---------+--------------------+
|      uid|     recommendations|
+---------+--------------------+
|124990565|[[4501803, 3.4186...|
|149954144|[[6742904, 10.503...|
|150174271|[[4302200, 2.4715...|
|164304672|[[4842234, 8.2191...|
|165428768|[[6816986, 4.9931...|
|167077244|[[1215971, 4.3852...|
|167571369|[[4501803, 3.5530...|
|167572567|[[4501803, 8.6694...|
|167574357|[[4573901, 6.0769...|
|167577928|[[1215971, 3.9367...|
|167579462|[[6332667, 6.7785...|
|167579971|[[20868331, 0.999...|
|167580596|[[7548, 4.618738]...|
|167583357|[[1215971, 1.8285...|
|167583863|[[4501803, 3.5242...|
|167587488|[[282692, 4.04056...|
|167589105|[[474851, 4.35586...|
|167593356|[[1215971, 3.3696...|
|167593840|[[1215971, 3.7090...|
|167595576|[[4501803, 2.4906...|
+---------+--------------------+
only showing top 20 rows



### Try to recommend top 5 songs for one user

In [48]:
a = userRecs.filter(F.col('uid')==124990565).toPandas()
a['recommendations'][0]

[Row(song_id=4501803, rating=3.4186224937438965),
 Row(song_id=6403632, rating=2.595127820968628),
 Row(song_id=1215971, rating=2.5286459922790527),
 Row(song_id=316421, rating=2.4918084144592285),
 Row(song_id=7033556, rating=2.4749271869659424)]

In [49]:
# We could see that predicted rating of song 316421 is accurate
rating.filter(F.col('uid')==124990565).show()

+---------+-------+-------+-------+------+
|      uid|song_id|count_P|count_D|rating|
+---------+-------+-------+-------+------+
|124990565|4544935|      1|      0|   0.5|
|124990565|7185706|      1|      0|   0.5|
|124990565|4396732|      2|      0|   1.0|
|124990565|6586179|      2|      0|   1.0|
|124990565|4318660|      1|      0|   0.5|
|124990565|  77260|      3|      0|   1.5|
|124990565| 316421|      5|      0|   2.5|
|124990565| 897468|      1|      0|   0.5|
|124990565| 591718|      1|      0|   0.5|
+---------+-------+-------+-------+------+



In [None]:
# prediction.describe(['prediction']).show()
# prediction.describe(['rating']).show()