In [1]:
# import the usual
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import numpy as np
import math
import re
import itertools
from scipy.sparse import csr_matrix

%matplotlib inline
pd.set_option('display.max_columns', 500)

In [2]:
import findspark
findspark.init()
#from pyspark.ml.recommendation import ALS
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [3]:
sc = SparkContext()

In [4]:
# sp = SparkSession.builder.appName("s").getOrCreate()

In [5]:
slot1 = sc.textFile(r"F:\Data_Repository\lastfm\df_slot1.tsv")
slot2 = sc.textFile(r"F:\Data_Repository\lastfm\df_slot2.tsv")
slot3 = sc.textFile(r"F:\Data_Repository\lastfm\df_slot3.tsv")
slot4 = sc.textFile(r"F:\Data_Repository\lastfm\df_slot4.tsv")

In [6]:
type(slot1)

pyspark.rdd.RDD

In [7]:
slots = [slot1, slot2, slot3, slot4]

In [8]:
def isNumber(s):
    try:
        float(s)
        return True
    except ValueError:
        pass
 
    return False

def computeRMSE(model,data):
    
    """ Takes ALS models and testing data as input and returns RMSE value """
    
    data_for_predict = data.map(lambda x: (x[0], x[1]))
    
    predictions = model.predictAll(data_for_predict).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = data.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
    return error

In [9]:
%%time
modelnameid = 1
for data in slots:
    path = str('F:\Data_Repository\lastfm')
    modelname = path + "\slot" + str(modelnameid) + ".tsv"
    data = data.map(lambda x: x.split('\t'))
    data2 = data.map(lambda x : [x[i] for i in [0,1,2]]) #only 3 columns exist
    data2 = data2.filter(lambda x: isNumber(x[2])) # Remove faulty rows
    data2 = data2.map(lambda x: [x[0], x[1], float(x[2])]) #Change plays into float
    users = data2.map(lambda x: x[0]).distinct().zipWithIndex()
    artists = data2.map(lambda x: x[1]).distinct().zipWithIndex()
    data2 = data2.map(lambda r: (r[0], (r[1], r[2]))).join(users).map(lambda r: (r[1][1], r[1][0][0], r[1][0][1]))
    data2 = data2.map(lambda r: (r[1], (r[0], r[2]))).join(artists).map(lambda r: (r[1][0][0], r[1][1], r[1][0][1]))
    plays = data2.map(lambda x: x[2])
    data2 = data2.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
    training_RDD, validation_RDD, test_RDD = data2.randomSplit([6, 2, 2], seed = 2)
    validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
    test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
    
    seed = 5
    #iterations = 10
    iterations = 1
    regularization_parameter = 0.1
    ranks = [5,10,15]
    #ranks = [15] #to reduce loop
    errors = [0, 0, 0]
    err = 0
    tolerance = 0.02
    alpha = 0.01

    min_error = float('inf')
    best_rank = -1
    best_iteration = -1

    for rank in ranks:
        e = 0
        #for i in range(5):
        for i in range(1):
            # Split the data
            training_RDD, validation_RDD, test_RDD = data2.randomSplit([6, 2, 2], seed = 2)
            validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
            test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

            model = ALS.trainImplicit(training_RDD, rank, seed=seed, iterations=iterations,
                              lambda_=regularization_parameter,alpha=alpha)
        #     predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
        #     rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
        #     error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
            e += computeRMSE(model,validation_RDD)

        error = e/5.5
        errors[err] = error
        err += 1
        print ('For rank %s the RMSE is %s' % (rank, error))
        if error < min_error:
            min_error = error
            best_rank = rank

    print ('The best model was trained with rank %s' % best_rank)

    ####to save model
    #print("Saving model to the given path")
    #model.save(sc, modelname)
    modelnameid = modelnameid + 1

For rank 5 the RMSE is 1.1681614305385888
For rank 10 the RMSE is 1.1678819944463574
For rank 15 the RMSE is 1.1671706920636706
The best model was trained with rank 15
For rank 5 the RMSE is 1.0204170683254816
For rank 10 the RMSE is 1.020022155681369
For rank 15 the RMSE is 1.0196947585522675
The best model was trained with rank 15
For rank 5 the RMSE is 1.2072978578503877
For rank 10 the RMSE is 1.2066129841598738
For rank 15 the RMSE is 1.2064489655607
The best model was trained with rank 15
For rank 5 the RMSE is 1.1906307046531146
For rank 10 the RMSE is 1.189835280967131
For rank 15 the RMSE is 1.1895705703178825
The best model was trained with rank 15
Wall time: 7min 5s


In [10]:
data = slot1

In [11]:
data = data.map(lambda x: x.split('\t'))
# header = data.first()
# print(header)

In [12]:
# data2 = data.map(lambda x : [x[i] for i in [0,1,3]])
data2 = data.map(lambda x : [x[i] for i in [0,1,2]]) #only 3 columns exist

In [13]:
#print ("length of uncleaned data -",data2.count())

In [14]:
def isNumber(s):
    try:
        float(s)
        return True
    except ValueError:
        pass
 
    return False

In [15]:
data2 = data2.filter(lambda x: isNumber(x[2])) # Remove faulty rows
data2 = data2.map(lambda x: [x[0], x[1], float(x[2])]) #Change plays into float

In [16]:
data2.first()

['user_000001', '15 Step', 2.0]

#### no need to reduce

In [17]:
%%time
#Convert strings into integers
users = data2.map(lambda x: x[0]).distinct().zipWithIndex()
artists = data2.map(lambda x: x[1]).distinct().zipWithIndex()
# int_user = users.map(lambda u: (u[1], u[0]))
# int_artist = artists.map(lambda i: (i[1], i[0]))
# users.collect()
# artists.collect()

Wall time: 4.01 s


In [18]:
%%time
# Substitutes the ObjectIDs in the ratings RDD with the corresponding int values
data2 = data2.map(lambda r: (r[0], (r[1], r[2]))).join(users).map(lambda r: (r[1][1], r[1][0][0], r[1][0][1]))
data2 = data2.map(lambda r: (r[1], (r[0], r[2]))).join(artists).map(lambda r: (r[1][0][0], r[1][1], r[1][0][1]))

Wall time: 53.9 ms


In [19]:
%%time
# data2.filter(lambda x: x[0] == 12).collect()
# plays = data2.map(lambda x: x[2]).collect() ##seems like data2 loses data after .collect is called
plays = data2.map(lambda x: x[2])
# data2.collect()

Wall time: 0 ns


In [20]:
%%time
data2.first()

Wall time: 7.89 s


(1, 931, 11.0)

In [21]:
%%time
# Use 'Rating' function to get the values in the right format
data2 = data2.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
#data2.count()

Wall time: 0 ns


#### Dataframe converted to rating

In [22]:
data2.first()

Rating(user=1, product=931, rating=11.0)

In [23]:
%%time
# Use randomsplit to split the data into train, validation and testing sets

training_RDD, validation_RDD, test_RDD = data2.randomSplit([6, 2, 2], seed = 2)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

Wall time: 0 ns


In [24]:
%%time
# Define computeRMSE

def computeRMSE(model,data):
    
    """ Takes ALS models and testing data as input and returns RMSE value """
    
    data_for_predict = data.map(lambda x: (x[0], x[1]))
    
    predictions = model.predictAll(data_for_predict).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = data.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
    return error

Wall time: 0 ns


#### Model validation

In [44]:
%%time
# Final Model

model = ALS.trainImplicit(training_RDD, 15, seed=5, iterations= 1, lambda_ = regularization_parameter)

#model = ALS.train(data2, 15, seed=5, iterations= 1, lambda_ = regularization_parameter)
#predictions = model.predictAll(validation_for_predict_RDD)
predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))


Wall time: 14.8 s


In [45]:
%%time
#     rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
#     error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
computeRMSE(model,validation_RDD)

Wall time: 41.3 s


6.419438806350188

In [46]:
# rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)

In [47]:
# rates_and_preds.first()

In [48]:
## validation rdd
validation_RDD.first()

Rating(user=23, product=931, rating=4.0)

In [49]:
## validation having just user item
validation_for_predict_RDD.first()

(23, 931)

In [None]:
# validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))


In [30]:
# predictions = model.predictAll(validation_RDD).map(lambda r: ((r[0], r[1]), r[2]))
# ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)

In [31]:
# %%time
# print (validation_RDD.count())
# print (predictions.count()   )
# #computeRMSE(model,data2)

In [32]:
%%time
#computeRMSE(model,data2)

Wall time: 0 ns


#### Recommend Products for top-n

In [35]:
def recommendProducts(self, user, num):
    """
    Recommends the top "num" number of products for a given user and
    returns a list of Rating objects sorted by the predicted rating in
    descending order. """
    pass

In [41]:
%time
n = 2
recos = model.recommendProducts(23, n)

Wall time: 0 ns


In [42]:
recos

[Rating(user=23, product=3021, rating=31.343305724051433),
 Rating(user=23, product=998, rating=21.964829819243604)]

In [None]:
#  def predict(self, user, product):
#         """
#         Predicts rating for the given user and product.
#         """
#         return self._java_model.predict(int(user), int(product)

In [52]:
validation_RDD.first()

[Rating(user=23, product=931, rating=4.0)]

In [50]:
pred_rating = model.predict(23, 931)
pred_rating

0.0032886570342311773

In [None]:
spark = SparkSession(sc) #to convert into df

In [62]:
valid_df = validation_RDD.toDF()

In [64]:
valid_df_pandas = valid_df.toPandas()

In [71]:
valid_df_pandas.head()

Unnamed: 0,user,product,rating
0,23,931,4.0
1,775,931,1.0
2,782,931,4.0
3,4,41,1.0
4,9,41,4.0


In [83]:
valid_df_pandas [valid_df_pandas['user'] == 43]

Unnamed: 0,user,product,rating
8457,43,1755,1.0


In [None]:
for index, row in item.iterrows():
        prediction = algo.predict(row['userid'], row['songid'], verbose=False)
        predicted_rating = prediction.est
        output = output.append(pd.Series([str(row['userid']), str(row['songid']),
                        predicted_rating], index=output.columns), ignore_index=True)

In [86]:
# %%time
# ### to store recommended product with ratings
# #pred_productid = []
# output = pd.DataFrame(columns = ['userid', 'songid','pred_score','actual_score'])
# for index, row in valid_df_pandas.iterrows():
#     if row['user'] == 43 or row['user'] == 439:
#         continue
#     pred_rating = model.predict( row['user'], row['product'] ) 
#     output.append( pd.Series([
#         int(row['user']), int(row['product']),
#         pred_rating, float(row['rating'])
#                         ], index= output.columns), ignore_index=True)