In [10]:
clear all

[H[2J

In [11]:
import findspark

In [12]:
# find spark location

findspark.init('/home/chunyi/spark-2.2.1-bin-hadoop2.7')

In [13]:
import pyspark

In [14]:
from pyspark.sql import SparkSession

In [15]:
spark = SparkSession.builder.appName('rec').getOrCreate()

In [16]:
from pyspark.sql.types import (StructField, StringType,
                              IntegerType, StructType)

In [17]:
def stripSpace(string):
    return string.replace(' ','')

In [18]:
# load all data

data = spark.read.csv('../data/all_play_simple1000000.log.fn', sep='\t')

In [19]:
from pyspark.sql.functions import udf

udf_stripSpace = udf(stripSpace, StringType())

In [20]:
# rename all columns

data = data.withColumn('user_id', udf_stripSpace(data['_c0']))
data = data.withColumn('song_id', udf_stripSpace(data['_c1']))
data = data.withColumn('play_length', udf_stripSpace(data['_c2']))
data = data.withColumn('song_length', udf_stripSpace(data['_c3']))
data = data.withColumn('paid', udf_stripSpace(data['_c4']))

In [21]:
data = data.select(data['user_id'].cast('int'),
                  data['song_id'].cast('int'),
                  data['play_length'].cast('int'),
                  data['song_length'].cast('int'),
                  data['paid'].cast('int'))

In [22]:
data.describe().show()

+-------+--------------------+-----------------+-----------------+-----------------+--------+
|summary|             user_id|          song_id|      play_length|      song_length|    paid|
+-------+--------------------+-----------------+-----------------+-----------------+--------+
|  count|            10000000|          9967890|          9883104|          9968938|10000000|
|   mean| 1.432927969506529E8|5578081.254429874|4634.293739901958|22.68671035971936|     0.0|
| stddev|3.7731686983832605E7|5844308.841806572| 708974.643502864|680151.9550865976|     0.0|
|    min|                   0|               -1|      -2147483648|      -2147483648|       0|
|    max|           156855255|        129163448|        291445468|           487765|       0|
+-------+--------------------+-----------------+-----------------+-----------------+--------+



In [23]:
# filter out null values 

data = data.filter('user_id > 0 and song_id > 0 and play_length >= 0 and song_length >= 0')
#data.describe().show()

In [24]:
data.createOrReplaceTempView('data')

In [25]:
# use max song length of the same song_id to replace original song length 

max_song_length_matrix = spark.sql("SELECT song_id, max(song_length) AS max_length FROM data GROUP BY song_id")

In [26]:
# to check how many songs each user played, the purpose is to check wheather active user or not

user_song_count = spark.sql("SELECT user_id, count(song_id) as Played_song FROM data GROUP BY user_id")
user_song_count.show(5)

+---------+-----------+
|  user_id|Played_song|
+---------+-----------+
|154412037|        181|
|154417891|         95|
|154421973|        348|
|154409364|         74|
|154409339|         70|
+---------+-----------+
only showing top 5 rows



In [27]:
user_song_count.count()

188143

In [28]:
# make a dataframe of inactive users

inactive_user = user_song_count.filter('count(song_id) < 4')
inactive_user.count()

36431

In [29]:
# calculate the most popular songs
popular_songs = spark.sql("SELECT song_id, count(user_id) as user_count FROM data WHERE play_length*2 > song_length GROUP BY song_id ORDER BY user_count DESC")
popular_songs.show(5)

+--------+----------+
| song_id|user_count|
+--------+----------+
|15249349|     73839|
| 9950164|     63331|
|15807836|     34047|
| 5237384|     26379|
| 6468891|     21318|
+--------+----------+
only showing top 5 rows



In [30]:
# use max song length to replace original song length
max_song_length_matrix.createOrReplaceTempView('max_song_length_matrix')
data = spark.sql("SELECT data.user_id, data.song_id, data.play_length,  max_song_length_matrix.max_length AS song_length FROM data LEFT JOIN max_song_length_matrix ON data.song_id = max_song_length_matrix.song_id" )

In [31]:
data.describe().show()

+-------+-------------------+-----------------+------------------+-----------------+
|summary|            user_id|          song_id|       play_length|      song_length|
+-------+-------------------+-----------------+------------------+-----------------+
|  count|            9021772|          9021772|           9021772|          9021772|
|   mean| 1.43803855624193E8|6076455.466154432| 4704.195162325095|462.4028196456306|
| stddev|3.685866101581352E7|5838175.925927465|124421.31650359664|748.9155380395235|
|    min|             113767|                5|                 0|                0|
|    max|          156855255|        129163448|         214410221|            19944|
+-------+-------------------+-----------------+------------------+-----------------+



In [32]:
from pyspark.sql.functions import col, when, round, sum, avg
    

In [33]:
# calculate how many times using play_length and song_length column
data = data.withColumn('play_times', when(col('song_length') == 0, 0).otherwise(round(col('play_length')/col('song_length'))))


In [34]:
# change column to int type
data = data.select(data['user_id'],
                  data['song_id'],
                  data['play_length'],
                  data['song_length'],
                  data['play_times'].cast('int'))

In [35]:
data.createOrReplaceTempView('data')

In [36]:
# for each user, calculate how many times each song plays

data = spark.sql("SELECT user_id, song_id,  sum(play_times) AS total_play_times, sum(play_length) AS total_play_length, max(song_length) AS song_length FROM data GROUP BY user_id, song_id" )

In [37]:
data.show(5)

+---------+-------+----------------+-----------------+-----------+
|  user_id|song_id|total_play_times|total_play_length|song_length|
+---------+-------+----------------+-----------------+-----------+
|154742955|   1591|               2|              705|        300|
|154428230|   1645|               1|              235|        318|
|154618406|   1645|               3|              794|        318|
|154322633|   1645|               0|                0|        318|
|154808361|   1645|              27|             7074|        318|
+---------+-------+----------------+-----------------+-----------+
only showing top 5 rows



In [38]:
data.describe().show()

+-------+--------------------+-----------------+------------------+------------------+-----------------+
|summary|             user_id|          song_id|  total_play_times| total_play_length|      song_length|
+-------+--------------------+-----------------+------------------+------------------+-----------------+
|  count|             4262933|          4262933|           4262933|           4262933|          4262933|
|   mean|1.4811727738671824E8|5809818.124727975|32.346000511854164|   9955.6282489075|431.7727794455132|
| stddev|2.8870189729737677E7|5912551.265410428|  837.011136527386|238920.84508017145|697.9206487124021|
|    min|              113767|                5|                 0|                 0|                0|
|    max|           156855255|        129163448|            847061|         215148056|            19944|
+-------+--------------------+-----------------+------------------+------------------+-----------------+



In [39]:
# based on total play times create ratings
import math
from pyspark.sql.functions import log
data = data.withColumn('rating', when(data["total_play_times"] > 4, 5).otherwise((data["total_play_times"]  ).cast('int') ) )
#data = data.withColumn('rating', when(col('total_play_times') == 0, 0).otherwise(2*log(col('total_play_times')) ))

In [40]:
data.show(5)

+---------+-------+----------------+-----------------+-----------+------+
|  user_id|song_id|total_play_times|total_play_length|song_length|rating|
+---------+-------+----------------+-----------------+-----------+------+
|154742955|   1591|               2|              705|        300|     2|
|154428230|   1645|               1|              235|        318|     1|
|154618406|   1645|               3|              794|        318|     3|
|154322633|   1645|               0|                0|        318|     0|
|154808361|   1645|              27|             7074|        318|     5|
+---------+-------+----------------+-----------------+-----------+------+
only showing top 5 rows



In [41]:
### Recommender system ####
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [42]:
matrix_recommender = data.select('user_id', 'song_id', 'rating')
matrix_recommender = matrix_recommender.filter('rating > 1')

#del(data)

In [43]:
matrix_recommender.show(5)

+---------+-------+------+
|  user_id|song_id|rating|
+---------+-------+------+
|154742955|   1591|     2|
|154618406|   1645|     3|
|154808361|   1645|     5|
|154578420|   1645|     3|
|154656960|   1645|     5|
+---------+-------+------+
only showing top 5 rows



In [66]:
(training, test) = matrix_recommender.randomSplit([0.9, 0.1])

In [46]:
als = ALS(maxIter=10, regParam=0.01, userCol="user_id", itemCol="song_id", ratingCol="rating")

In [47]:
training.describe().show()

+-------+--------------------+-----------------+------------------+
|summary|             user_id|          song_id|            rating|
+-------+--------------------+-----------------+------------------+
|  count|              538089|           538089|            538089|
|   mean|1.4291902020716646E8|6193906.680376295| 3.455383774803053|
| stddev| 3.834459605273708E7|6245690.739111764|1.3315886832368184|
|    min|              113767|              714|                 2|
|    max|           156848238|         21596231|                 5|
+-------+--------------------+-----------------+------------------+



In [48]:
model = als.fit(matrix_recommender)

In [49]:
user_recs = model.recommendForAllUsers(3)

In [50]:
user_recs.show(5)

+--------+--------------------+
| user_id|     recommendations|
+--------+--------------------+
|10226023|[[4550582,14.9629...|
|25933646|[[4660913,6.93507...|
|51492253|[[9559482,5.29850...|
|54697408|[[6234930,19.0983...|
|68845918|[[291041,13.94921...|
+--------+--------------------+
only showing top 5 rows



In [51]:
user_song_count.show(5)

+---------+-----------+
|  user_id|Played_song|
+---------+-----------+
|154412037|        181|
|154417891|         95|
|154421973|        348|
|154409364|         74|
|154409339|         70|
+---------+-----------+
only showing top 5 rows



In [52]:
user_recs.createOrReplaceTempView('user_recs')
user_song_count.createOrReplaceTempView('user_song_count')

user_recs = spark.sql("SELECT user_id, recommendations[0].song_id AS first_choice, recommendations[1].song_id AS second_choice, recommendations[2].song_id AS third_choice FROM user_recs" )

In [53]:
user_recs.show(5)


+--------+------------+-------------+------------+
| user_id|first_choice|second_choice|third_choice|
+--------+------------+-------------+------------+
|10226023|     4550582|      6644296|     7160391|
|25933646|     4660913|       397734|      291041|
|51492253|     9559482|      4427756|     4532756|
|54697408|     6234930|      3995183|      848438|
|68845918|      291041|      6225691|     6202348|
+--------+------------+-------------+------------+
only showing top 5 rows



In [54]:
user_recs.createOrReplaceTempView('user_recs')
user_recs = spark.sql("SELECT c.user_id, r.first_choice, r.second_choice, r.third_choice,  c.Played_song FROM user_recs r RIGHT JOIN user_song_count c ON r.user_id = c.user_id") 

In [55]:
user_recs.show(25)

+---------+------------+-------------+------------+-----------+
|  user_id|first_choice|second_choice|third_choice|Played_song|
+---------+------------+-------------+------------+-----------+
|   885405|        null|         null|        null|          6|
| 10226023|     4550582|      6644296|     7160391|        105|
| 20605158|        null|         null|        null|         18|
| 25933646|     4660913|       397734|      291041|         25|
| 51492253|     9559482|      4427756|     4532756|         95|
| 54697408|     6234930|      3995183|      848438|         63|
| 68845918|      291041|      6225691|     6202348|        491|
| 95438432|        null|         null|        null|          2|
| 96692401|        null|         null|        null|         10|
|101031185|     4381878|       206989|     6297559|        158|
|127185921|     6807387|      4642487|    20856573|        189|
|135471192|     5958025|      4427756|     7015944|          5|
|137840040|     9953188|      4669707|  

In [56]:
pop_song1 = popular_songs.head(3)[0].song_id
pop_song2 = popular_songs.head(3)[1].song_id
pop_song3 = popular_songs.head(3)[2].song_id

In [57]:
#user_recs.createOrReplaceTempView('user_recs')
#user_recs = spark.sql("SELECT user_id,  (IFNULL(first_choice, pop_song1)) AS my_first_choice, (IFNULL(second_choice, pop_song2)) AS my_second_choice, (IFNULL(third_choice, pop_song3)) AS my_third_choice FROM user_recs" )

user_recs = user_recs.withColumn('first_rec', when(user_recs['first_choice'].isNull(), pop_song1).otherwise(user_recs['first_choice']))
user_recs = user_recs.withColumn('second_rec', when(user_recs['second_choice'].isNull(), pop_song2).otherwise(user_recs['second_choice']))
user_recs = user_recs.withColumn('third_rec', when(user_recs['third_choice'].isNull(), pop_song3).otherwise(user_recs['third_choice']))


In [58]:
user_recs.show(10)

+---------+------------+-------------+------------+-----------+---------+----------+---------+
|  user_id|first_choice|second_choice|third_choice|Played_song|first_rec|second_rec|third_rec|
+---------+------------+-------------+------------+-----------+---------+----------+---------+
|   885405|        null|         null|        null|          6| 15249349|   9950164| 15807836|
| 10226023|     4550582|      6644296|     7160391|        105|  4550582|   6644296|  7160391|
| 20605158|        null|         null|        null|         18| 15249349|   9950164| 15807836|
| 25933646|     4660913|       397734|      291041|         25|  4660913|    397734|   291041|
| 51492253|     9559482|      4427756|     4532756|         95|  9559482|   4427756|  4532756|
| 54697408|     6234930|      3995183|      848438|         63|  6234930|   3995183|   848438|
| 68845918|      291041|      6225691|     6202348|        491|   291041|   6225691|  6202348|
| 95438432|        null|         null|        null

In [59]:
user_recs = user_recs.select(user_recs['user_id'],
                 user_recs['first_rec'],
                 user_recs['second_rec'],
                 user_recs['third_rec'])

In [60]:
user_recs.show(10)

+---------+---------+----------+---------+
|  user_id|first_rec|second_rec|third_rec|
+---------+---------+----------+---------+
|   885405| 15249349|   9950164| 15807836|
| 10226023|  4550582|   6644296|  7160391|
| 20605158| 15249349|   9950164| 15807836|
| 25933646|  4660913|    397734|   291041|
| 51492253|  9559482|   4427756|  4532756|
| 54697408|  6234930|   3995183|   848438|
| 68845918|   291041|   6225691|  6202348|
| 95438432| 15249349|   9950164| 15807836|
| 96692401| 15249349|   9950164| 15807836|
|101031185|  4381878|    206989|  6297559|
+---------+---------+----------+---------+
only showing top 10 rows



In [67]:
(training, test) = matrix_recommender.randomSplit([0.8, 0.2])

In [68]:
train_model = als.fit(training)

In [64]:
predictions = train_model.transform(test)
predictions.show()

+---------+-------+------+-----------+
|  user_id|song_id|rating| prediction|
+---------+-------+------+-----------+
|154742955|   1591|     2|        NaN|
|154506618|   1829|     3| -0.9510112|
|154489919|   2866|     5|        NaN|
|  1685126|  69048|     5|  2.9696333|
|154778073|  78064|     2|        NaN|
|154825738|  78113|     2|-0.11484612|
| 51682053|  87656|     2|  1.0502037|
|154508595| 110904|     2|  4.5336943|
|154557905| 118989|     2| -1.0619042|
|154782109| 118989|     2|  2.8061755|
|154602076| 118989|     2| 0.90969324|
|154570059| 121749|     5|  1.6512744|
|154582530| 135976|     4|  5.2731338|
|  1685126| 156296|     2|   4.137638|
|154419334| 156365|     5|  3.0505266|
| 28638487| 156365|     5|  2.7221353|
|154661836| 156365|     3|  2.9482758|
|  1685126| 156365|     2|  2.5658448|
|154496208| 161295|     4|  1.2917211|
|154489212| 200878|     5|  3.9808974|
+---------+-------+------+-----------+
only showing top 20 rows



In [65]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print('RMSE')
print(rmse)

RMSE
nan
