In [32]:
#Importing libraries
from pyspark.sql import SparkSession #to connect to spark cluster/core
from pyspark import SparkContext  #to read file aptly

import math
import re
import numpy as np
import itertools
import datetime

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("RecommendationSystems") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
        
sc=SparkContext.getOrCreate()

In [4]:
#Creating pointer to the file 
userArtistsFull = sc.textFile("lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv")

In [5]:
#Reading 5 rows from the memory to the disk 
userArtistsFull.take(5)

['00000c289a1829a808ac09c00daf10bc3c4e223b\t3bd73256-3905-4f3a-97e2-8b341527f805\tbetty blowtorch\t2137',
 '00000c289a1829a808ac09c00daf10bc3c4e223b\tf2fb0ff0-5679-42ec-a55c-15109ce6e320\tdie Ärzte\t1099',
 '00000c289a1829a808ac09c00daf10bc3c4e223b\tb3ae82c2-e60b-4551-a76d-6620f1b456aa\tmelissa etheridge\t897',
 '00000c289a1829a808ac09c00daf10bc3c4e223b\t3d6bbeb7-f90e-4d10-b440-e153c0d10b53\telvenking\t717',
 '00000c289a1829a808ac09c00daf10bc3c4e223b\tbbd2ffd7-17f4-4506-8572-c1ea58c3f9a8\tjuliette & the licks\t706']

In [6]:
#Extracting only the required columns "UserID","ArtistID" and "plays"
userArtists = userArtistsFull.map(lambda line: line.split("\t")).map(lambda tokens: (tokens[0],tokens[1],tokens[3])).cache()

In [23]:
print ("length of data -",userArtists.count())

length of data - 17559530


In [11]:
#Converting the user and artists strings into numerical IDs
users = userArtists.map(lambda x: x[0]).distinct().zipWithIndex()
artists = userArtists.map(lambda x: x[1]).distinct().zipWithIndex()

In [12]:
print("Number of distinct Users :", users.count())
print("Number of distinct Artists :", artists.count())

Number of distinct Users : 359349
Number of distinct Artists : 160168


In [24]:
#data2 = userArtists.map(lambda r: (r[0], (r[1], r[2])))
#data3 = userArtists.map(lambda r: (r[0], (r[1], r[2]))).join(users).map(lambda r: (r[1][1], r[1][0][0], r[1][0][1]))

In [14]:
#data2.take(1)

[('00000c289a1829a808ac09c00daf10bc3c4e223b',
  ('3bd73256-3905-4f3a-97e2-8b341527f805', '2137'))]

In [25]:
#data3.take(1)

[(0, 'a74b1b7f-71a5-4011-9441-d0b5e4122711', '609')]

In [26]:
#Substituting the users and artists with their respective IDs
userArtists = userArtists.map(lambda r: (r[0], (r[1], r[2]))).join(users).map(lambda r: (r[1][1], r[1][0][0], r[1][0][1]))
userArtists = userArtists.map(lambda r: (r[1], (r[0], r[2]))).join(artists).map(lambda r: (r[1][0][0], r[1][1], r[1][0][1]))

In [29]:
#Using the rating function(from mllib.recommendation) to create object of better format (int, int, float)
userArtists = userArtists.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
userArtists.count()

17559530

In [44]:
#Deviding the data into train,validation and test - "SDF Object"
userArtistSDF = spark.createDataFrame(userArtists, ['UserId', 'ArtistId', 'plays'] )
training_RDD, validation_RDD, test_RDD = userArtistsDF.randomSplit([0.6, 0.2, 0.2],seed=2)
validation_userArtists = validation_RDD.select("UserId","ArtistID")
test_userArtists = test_RDD.select("UserId","ArtistID")

In [45]:
#Deviding the data into train,validation and test - "Ratings Object"

training_RDDr, validation_RDDr, test_RDDr = userArtists.randomSplit([6, 2, 2],seed=2)
validation_userArtistsR = validation_RDDr.map(lambda x: (x[0], x[1]))
test_userArtistsR = test_RDDr.map(lambda x: (x[0], x[1]))


### Elementory Training

In [47]:
#Training without  specific hyper parameters
model = ALS(userCol="UserId", itemCol="ArtistId", ratingCol="plays").fit(training_RDD)

In [48]:
predictions = model.transform(training_RDD)
predictions.show()

+------+--------+-----+----------+
|UserId|ArtistId|plays|prediction|
+------+--------+-----+----------+
|109622|     148|776.0| 509.52637|
|262597|     148|140.0| 192.65195|
|302550|     148|229.0|  329.1944|
| 11280|     148|224.0|  357.5207|
| 71984|     148|201.0| 209.24959|
|121812|     148|109.0| 208.89005|
|129627|     148| 77.0| 148.16809|
|212128|     148| 69.0|  97.50506|
|288522|     148| 73.0|106.645226|
|266184|     148| 20.0| 44.081608|
| 40824|     148|548.0| 365.89624|
|290570|     148|153.0| 109.49223|
| 15292|     148|334.0| 340.38077|
|139486|     148| 96.0| 333.65973|
|176799|     148|319.0|  198.7132|
|349692|     148|238.0| 198.41357|
| 72464|     148|118.0| 182.11542|
| 81532|     148|206.0| 93.044106|
|207532|     148| 19.0| 25.378273|
| 36528|     148|156.0| 123.07705|
+------+--------+-----+----------+
only showing top 20 rows



In [50]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="plays", predictionCol="prediction")
print ("The root mean squared error for our model is: ", str(evaluator.evaluate(predictions)))

The root mean squared error for our model is:  208.18046340202068
