In [2]:
#spark_session.stop()

In [3]:
from pyspark import SparkContext
from pyspark.sql import SparkSession ,Row
from pyspark.sql.functions import col
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

from pyspark.mllib.recommendation import *
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode

from operator import *
import os
import random

In [4]:
# initializing a spark session
spark_session = SparkSession.builder.appName('GRP10_MusicRec').getOrCreate()
# creating spark context for sql
SQL_context = SQLContext(spark_session.sparkContext)


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/12 17:23:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# reading csv data from local FS
# THIS PART NEEDS TO BE CHANGED TO HDFS VERSION
fPATH = 'train_triplets.txt'
triplets = spark_session.read.csv(fPATH, header = False, sep=r'\t')

# alter col name
triplets = triplets.withColumnRenamed('_c0','User').withColumnRenamed('_c1','Song').withColumnRenamed('_c2','Count')
triplets.show(n=10)
#print((triplets.count(), len(triplets.columns)))

                                                                                

+--------------------+------------------+-----+
|                User|              Song|Count|
+--------------------+------------------+-----+
|b80344d063b5ccb32...|SOAKIMP12A8C130995|    1|
|b80344d063b5ccb32...|SOAPDEY12A81C210A9|    1|
|b80344d063b5ccb32...|SOBBMDR12A8C13253B|    2|
|b80344d063b5ccb32...|SOBFNSP12AF72A0E22|    1|
|b80344d063b5ccb32...|SOBFOVM12A58A7D494|    1|
|b80344d063b5ccb32...|SOBNZDC12A6D4FC103|    1|
|b80344d063b5ccb32...|SOBSUJE12A6D4F8CF5|    2|
|b80344d063b5ccb32...|SOBVFZR12A6D4F8AE3|    1|
|b80344d063b5ccb32...|SOBXALG12A8C13C108|    1|
|b80344d063b5ccb32...|SOBXHDL12A81C204C0|    1|
+--------------------+------------------+-----+
only showing top 10 rows



In [6]:
# initializing a new dataframe
# generate hash for the upcoming processing
tripletsDF = triplets.withColumn('UserID', F.hash(col('User')))
tripletsDF = tripletsDF.withColumn('SongID', F.hash(col('Song')))
tripletsDF = tripletsDF.withColumn('CountNum', col('Count').cast(IntegerType()))


# This DF should have 48M entries, if we want to just have a test,
# limit it to 10k or 100k (still takes hours on single node)
tripletsDF = tripletsDF.limit(50000)

# export a csv file for a glance view of tripletsDF
tripletsDF.limit(200).write.csv('tripletsDF_preview')

                                                                                

In [7]:
tripletsDF.show(5)



+--------------------+------------------+-----+----------+-----------+--------+
|                User|              Song|Count|    UserID|     SongID|CountNum|
+--------------------+------------------+-----+----------+-----------+--------+
|b80344d063b5ccb32...|SOAKIMP12A8C130995|    1|1365117428| 1315780877|       1|
|b80344d063b5ccb32...|SOAPDEY12A81C210A9|    1|1365117428|-1623759929|       1|
|b80344d063b5ccb32...|SOBBMDR12A8C13253B|    2|1365117428|-1218290021|       2|
|b80344d063b5ccb32...|SOBFNSP12AF72A0E22|    1|1365117428|-1227648141|       1|
|b80344d063b5ccb32...|SOBFOVM12A58A7D494|    1|1365117428| 2054460487|       1|
+--------------------+------------------+-----+----------+-----------+--------+
only showing top 5 rows



                                                                                

In [8]:
# train test split
# make this seprately if we skip the cv process
(train, test) = tripletsDF.randomSplit([0.8,0.2], seed = 42)
# to choose if CV best result will be used, 0 for use our own hyperparameters, 1 for auto
CV = 0
# initializing ALS model
ALS_model = ALS(maxIter = 3, userCol = "UserID", itemCol = 'SongID', ratingCol = 'CountNum', coldStartStrategy = 'drop')

In [9]:
# grid seraching for the best parameter, adding more costs exponential time
grid = ParamGridBuilder().addGrid(ALS_model.rank, [10]).addGrid(ALS_model.regParam, [0.01]).build()

# initialzie a RMSE evaluator
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'CountNum', predictionCol = 'prediction')

if CV == 1:
    # set up cross validation process
    CrossVal = CrossValidator(numFolds = 5, estimator = ALS_model, estimatorParamMaps = grid, evaluator = evaluator)

    model = CrossVal.fit(train)

    # get the best model from cross validation
    TopModel = model.bestModel

In [10]:
# using the best hyperparameters from cv process
if CV == 1:
    estimator1 = ALS(rank = TopModel._java_obj.parent().getRank(), regParam = TopModel._java_obj.parent().getRegParam(), maxIter = 10, userCol = "UserID", itemCol = 'SongID', ratingCol = 'CountNum', coldStartStrategy = 'drop')
else:
    estimator1 = ALS(rank = 10, regParam = 0.01, maxIter = 3, userCol = "UserID", itemCol = 'SongID', ratingCol = 'CountNum', coldStartStrategy = 'drop')
model1 = estimator1.fit(train)


# make predictions from this model and see the RMSE
predictions1 = model1.transform(test)
print('The RMSE is:', evaluator.evaluate(predictions1))

23/03/12 17:24:30 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/03/12 17:24:30 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/03/12 17:24:31 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

The RMSE is: 7.600019281104039


                                                                                

In [11]:
# generating recommendations (TOP 10)
Top10Rec = model1.recommendForAllUsers(10)
Top10Rec.printSchema()
#Top10Rec.show(30)

Top10RecExploded = Top10Rec.withColumn('rec', explode("recommendations")).select('UserID', col("rec.SongID"), col("rec.Rating"))
Top10RecExploded.show(30)

# export a glance view of Top10Recommendations
Top10RecExploded.limit(100).write.csv('Top10Recommendations')



root
 |-- UserID: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- SongID: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



                                                                                

+-----------+-----------+---------+
|     UserID|     SongID|   Rating|
+-----------+-----------+---------+
|-2142932360| 1714300901| 71.20638|
|-2142932360|-1040959179| 49.89726|
|-2142932360|  800926323|46.211792|
|-2142932360| -129211381|39.312992|
|-2142932360|-1319581813| 30.28074|
|-2142932360|  155649641|29.927979|
|-2142932360| 1458280062|28.692703|
|-2142932360|  -14208851|28.170979|
|-2142932360| 1491754181|27.321358|
|-2142932360| 1539449717| 25.38141|
|-2137966073|   81446206|4174.1694|
|-2137966073| 1458280062| 447.3444|
|-2137966073| 1930078612| 172.2673|
|-2137966073| -613834292|144.28098|
|-2137966073|-1839976387|129.85287|
|-2137966073|  614659202|128.24974|
|-2137966073| -649334583|121.97104|
|-2137966073| 1491754181|120.93936|
|-2137966073|  367117523|107.33791|
|-2137966073| -450919340|94.838036|
|-2137573205|-1040959179| 86.33725|
|-2137573205| -129211381|68.023285|
|-2137573205|  434524559|52.839947|
|-2137573205| -164725097|  38.4372|
|-2137573205| -150803833|31.

                                                                                

In [15]:
# export a glance view of Top10Recommendations
Top10RecExploded.write.csv('Top10Recommendations_50000')

                                                                                

In [12]:
fPATH2 = 'unique_tracks.txt'
# read another file with tracks and artist info
uTracks = spark_session.read.csv(fPATH2, sep = '<SEP>')
# renaming columns
uTracks = uTracks.withColumnRenamed("_c1","Song").withColumnRenamed("_c2","ArtistName").withColumnRenamed("_c3","Title")
# same as before
uTracks= uTracks.withColumn("SongID", F.hash(col("Song")))

uTracks.show(10)
#print((tracks.count(), len(tracks.columns)))

+------------------+------------------+--------------------+--------------------+-----------+
|               _c0|              Song|          ArtistName|               Title|     SongID|
+------------------+------------------+--------------------+--------------------+-----------+
|TRAAAAW128F429D538|SOMZWCG12A8C13C480|              Casual|    I Didn't Mean To| 1552522404|
|TRAAABD128F429CF47|SOCIWDW12A8C13D406|        The Box Tops|           Soul Deep| -496217085|
|TRAAADZ128F9348C2E|SOXVLOJ12AB0189215|    Sonora Santanera|     Amor De Cabaret| 1426331605|
|TRAAAEF128F4273421|SONHOTT12A8C13493C|            Adam Ant|     Something Girls|  544571125|
|TRAAAFD128F92F423A|SOFSOCN12A8C143F5D|                 Gob|      Face the Ashes| 1666660322|
|TRAAAMO128F1481E7F|SOYMRWW12A6D4FAB14|Jeff And Sheri Ea...|The Moon And I (O...|  104420301|
|TRAAAMQ128F1460CD3|SOMJBYD12A6D4F8557|             Rated R|Keepin It Real (S...| -105457899|
|TRAAAPK128E0786D96|SOHKNRJ12A6701D1F8|Tweeterfriendly M...|

In [17]:
# join two dataframes so that we can match song title with id
preview1 = Top10RecExploded.join(uTracks, on = 'SongID').sort('Rating', ascending = False)

# if nothing shows here, you need to use a larger limit for tripletsDF, e.g. 100,000
preview1.show(10)

                                                                                

+----------+-----------+---------+------------------+------------------+--------------+--------------------+
|    SongID|     UserID|   Rating|               _c0|              Song|    ArtistName|               Title|
+----------+-----------+---------+------------------+------------------+--------------+--------------------+
|-946922627|-2132194054|30.251984|TRAWLYS12903CCFEFC|SOVAGPG12AB0189963|Joa~o Gilberto|Samba De Una Nota...|
|-946922627| -412531544| 22.15806|TRAWLYS12903CCFEFC|SOVAGPG12AB0189963|Joa~o Gilberto|Samba De Una Nota...|
+----------+-----------+---------+------------------+------------------+--------------+--------------------+



                                                                                

In [14]:
AVGtripletDF = tripletsDF.groupby('Song').avg('CountNum')
AVGtripletDF = tripletsDF.join(AVGtripletDF,on = 'Song')
AVGtripletDF = AVGtripletDF.withColumn('Rating',F.col('CountNum') / F.col('avg(CountNum)'))
AVGtripletDF.limit(10).show()

                                                                                

+------------------+--------------------+-----+----------+-----------+--------+------------------+-------------------+
|              Song|                User|Count|    UserID|     SongID|CountNum|     avg(CountNum)|             Rating|
+------------------+--------------------+-----+----------+-----------+--------+------------------+-------------------+
|SOAKIMP12A8C130995|b80344d063b5ccb32...|    1|1365117428| 1315780877|       1|1.6666666666666667|                0.6|
|SOAPDEY12A81C210A9|b80344d063b5ccb32...|    1|1365117428|-1623759929|       1|               4.5| 0.2222222222222222|
|SOBBMDR12A8C13253B|b80344d063b5ccb32...|    2|1365117428|-1218290021|       2|              23.0|0.08695652173913043|
|SOBFNSP12AF72A0E22|b80344d063b5ccb32...|    1|1365117428|-1227648141|       1|               1.0|                1.0|
|SOBFOVM12A58A7D494|b80344d063b5ccb32...|    1|1365117428| 2054460487|       1|               1.0|                1.0|
|SOBNZDC12A6D4FC103|b80344d063b5ccb32...|    1|1

In [18]:
# train test split
# make this seprately if we skip the cv process
(train, test) = AVGtripletDF.randomSplit([0.8,0.2], seed = 42)
# to choose if CV best result will be used, 0 for use our own hyperparameters, 1 for auto
CV = 0
# initializing ALS model
ALS_model = ALS(maxIter = 3, userCol = "UserID", itemCol = 'SongID', ratingCol = 'CountNum', coldStartStrategy = 'drop')


# grid seraching for the best parameter, adding more costs exponential time
grid = ParamGridBuilder().addGrid(ALS_model.rank, [10]).addGrid(ALS_model.regParam, [0.01]).build()

# initialzie a RMSE evaluator
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'CountNum', predictionCol = 'prediction')

if CV == 1:
    # set up cross validation process
    CrossVal = CrossValidator(numFolds = 5, estimator = ALS_model, estimatorParamMaps = grid, evaluator = evaluator)

    model = CrossVal.fit(train)

    # get the best model from cross validation
    TopModel = model.bestModel
    

# using the best hyperparameters from cv process
if CV == 1:
    estimator2 = ALS(rank = TopModel._java_obj.parent().getRank(), regParam = TopModel._java_obj.parent().getRegParam(), maxIter = 10, userCol = "UserID", itemCol = 'SongID', ratingCol = 'CountNum', coldStartStrategy = 'drop')
else:
    estimator2 = ALS(rank = 10, regParam = 0.01, maxIter = 3, userCol = "UserID", itemCol = 'SongID', ratingCol = 'CountNum', coldStartStrategy = 'drop')
model2 = estimator2.fit(train)


# make predictions from this model and see the RMSE
predictions2 = model2.transform(test)
print('The RMSE is:', evaluator.evaluate(predictions2))

                                                                                

The RMSE is: 8.201614445747547


In [19]:
Top10NewRec = model2.recommendForAllUsers(10)
Top10NewRecExploded = Top10NewRec.withColumn('rec', explode("recommendations")).select('UserID', col("rec.SongID"), col("rec.Rating"))
Top10NewRecExploded.show(20)



+-----------+-----------+---------+
|     UserID|     SongID|   Rating|
+-----------+-----------+---------+
|-2142932360| 1714300901| 60.71674|
|-2142932360| -129211381|41.585102|
|-2142932360|  461088686| 37.29334|
|-2142932360| 1539449717| 30.09113|
|-2142932360| 1491754181| 29.64621|
|-2142932360| 1829551989| 26.67107|
|-2142932360| 1659373507|25.137074|
|-2142932360| 1597669298| 24.55417|
|-2142932360| -634560697|24.495926|
|-2142932360|-1510434197|24.495926|
|-2137966073| 1930078612| 86.14151|
|-2137966073| 1597669298|79.077965|
|-2137966073| 1814487439| 70.98833|
|-2137966073|-1185505407| 67.59891|
|-2137966073|  802546531| 55.71886|
|-2137966073|  692732998|45.635353|
|-2137966073| -474679864|   43.288|
|-2137966073|-1556289008| 42.63865|
|-2137966073|  167025416|42.444782|
|-2137966073| -323877619|41.578884|
+-----------+-----------+---------+
only showing top 20 rows



                                                                                

In [24]:
preview2 = Top10NewRecExploded.join(uTracks, on='SongID').filter('UserID = -1935480550').sort('Rating',ascending=False).select('SongID','UserID','Rating','Title')
preview2.show()

                                                                                

+-----------+-----------+--------+------------+
|     SongID|     UserID|  Rating|       Title|
+-----------+-----------+--------+------------+
|-2127621405|-1935480550|94.13689|This Lullaby|
+-----------+-----------+--------+------------+



In [22]:
preview3 = AVGtripletDF.join(uTracks,on="SongID").filter('UserID = -1935480550').sort('CountNum', ascending=False).select('SongID','UserID','CountNum','Title')
preview3.show()



+-----------+-----------+--------+--------------------+
|     SongID|     UserID|CountNum|               Title|
+-----------+-----------+--------+--------------------+
|-1632100307|-1935480550|       3|Angie (1993 Digit...|
| 1681856507|-1935480550|       1|     Call Of Da Wild|
+-----------+-----------+--------+--------------------+



                                                                                

In [25]:
preview2.write.csv('Top10NewRecommendations_All')
preview3.write.csv('AVGtripletDF_All')

                                                                                