# Using Spark MLLIB 

In [107]:

import numpy as np
import pandas as pd
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating
from pyspark.sql.types import FloatType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
from pyspark.sql.types import *


In [108]:
#Reading into Pandas dataframe

movie_data = pd.read_csv('C:\\Users\\saikiran\\Downloads\\ml-latest-small\\ml-latest-small\\ratings.csv', sep = ",")

movie_data = movie_data.iloc[:,0:3]
p_schema = StructType([StructField('user',IntegerType(),True),StructField('item',IntegerType(),True),StructField('rating',DoubleType(),True)])
#converting Pandas dataframe to pyspark dataframe
movie_lens = sqlContext.createDataFrame(movie_data,p_schema)
movie_lens.show()




+----+----+------+
|user|item|rating|
+----+----+------+
|   1|  31|   2.5|
|   1|1029|   3.0|
|   1|1061|   3.0|
|   1|1129|   2.0|
|   1|1172|   4.0|
|   1|1263|   2.0|
|   1|1287|   2.0|
|   1|1293|   2.0|
|   1|1339|   3.5|
|   1|1343|   2.0|
|   1|1371|   2.5|
|   1|1405|   1.0|
|   1|1953|   4.0|
|   1|2105|   4.0|
|   1|2150|   3.0|
|   1|2193|   2.0|
|   1|2294|   2.0|
|   1|2455|   2.5|
|   1|2968|   1.0|
|   1|3671|   3.0|
+----+----+------+
only showing top 20 rows



In [109]:
movie_lens.printSchema()

root
 |-- user: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- rating: double (nullable = true)



In [130]:
movie_lens_rdd = movie_lens.rdd
X_train, X_test= movie_lens_rdd.randomSplit([0.7, 0.3])
# Training the model
rank = 5
numIterations = 10
model = ALS.train(X_train, rank, numIterations, lambda_ = 0.1)
testdata = X_test.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = X_test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)

# calculating error
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
RMSE = np.sqrt(MSE)
print("Root Mean Squared Error =" + str(RMSE))

Mean Squared Error = 0.861106985224
Root Mean Squared Error =0.9279585040422882


# Own Implementation

In [111]:
import numpy as np
import pandas as pd
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col
from pyspark.sql.types import *


In [112]:
#grouping the data based on userid
user_item_ratings = movie_lens_rdd.map(lambda r: (int(r[0]),(int(r[1]), r[2])))
user_item_ratings = user_item_ratings.groupByKey()
user_item_ratings.take(1)

[(1, <pyspark.resultiterable.ResultIterable at 0x13586b38>)]

In [113]:
#grouping data based ion itemid
item_user_ratings = movie_lens_rdd.map(lambda r: (int(r[1]),(int(r[0]), r[2])))
item_user_ratings = item_user_ratings.groupByKey()
item_user_ratings.take(1)

[(1, <pyspark.resultiterable.ResultIterable at 0x1356ba20>)]

In [114]:
print "Number of Items (columns of A matrix):", item_user_ratings.count()
print "Number of Users (rows of A matrix):", user_item_ratings.count()


Number of Items (columns of A matrix): 9066
Number of Users (rows of A matrix): 671


In [115]:
# Parameters
lambda_ = sc.broadcast(0.1) # Regularization parameter
n_factors = sc.broadcast(3) # latent factors of User matrix and Item matrix
n_iterations = 10 # How many times to iterate over the user and item matrix calculations.

In [116]:
Items = item_user_ratings.map(lambda line: (line[0], 5 * np.random.rand(1, n_factors.value)))
print Items.take(10)

[(1, array([[1.23171603, 0.24265017, 3.10112495]])), (2, array([[4.37840687, 4.60891084, 0.08161432]])), (3, array([[0.42945383, 1.37201934, 1.06338654]])), (4, array([[0.46110377, 4.71009181, 2.64415859]])), (5, array([[0.43231563, 3.88621269, 4.15218662]])), (6, array([[3.61210956, 3.3653311 , 1.96826261]])), (7, array([[3.12599783, 1.32253927, 1.02734962]])), (8, array([[2.82620379, 4.11641606, 3.32292057]])), (9, array([[1.63786827, 0.76335916, 3.47691966]])), (10, array([[1.68042117, 2.39159696, 0.55746939]]))]


In [117]:
Items_broadcast = sc.broadcast({
  k: v for (k, v) in Items.collect()
})


In [118]:
j = 0
for k, v in {k: v for (k, v) in Items.collect()}.iteritems():
    print k,v
    j+=1
    if j > 10:
        break

1 [[3.86289867 1.00056649 1.97921342]]
2 [[1.14372587 0.90837747 2.77237806]]
3 [[1.2629488  3.88758693 4.04774345]]
4 [[1.40924223 2.54150205 4.71161541]]
5 [[2.64275101 0.62759836 1.1176232 ]]
6 [[0.32824575 3.59807892 0.32324028]]
7 [[3.29227074 1.31497343 4.54320573]]
8 [[3.56184697 1.90792738 4.12617274]]
9 [[0.90165773 3.74701136 3.99214718]]
10 [[0.47373144 1.08796188 3.81162039]]
11 [[2.44485905 4.63160819 4.90666724]]


In [119]:
j = 0
for i in user_item_ratings.take(1)[0][1]:
    print i
    j+=1
    if j > 10:
        break

(31, 2.5)
(1029, 3.0)
(1061, 3.0)
(1129, 2.0)
(1172, 4.0)
(1263, 2.0)
(1287, 2.0)
(1293, 2.0)
(1339, 3.5)
(1343, 2.0)
(1371, 2.5)


In [120]:
def Update_User(userTuple):
    
    Itemssquare = np.zeros([n_factors.value,n_factors.value])
    for matrixA_item_Tuple in userTuple[1]:
        itemRow = Items_broadcast.value[matrixA_item_Tuple[0]][0]
        for i in range(n_factors.value):
            for j in range(n_factors.value):
                Itemssquare[i,j] += float(itemRow[i]) * float(itemRow[j])
    leftMatrix = np.linalg.inv(Itemssquare + lambda_.value * np.eye(n_factors.value))
    rightMatrix = np.zeros([1,n_factors.value])
    for matrixA_item_Tuple in userTuple[1]:
        for i in range(n_factors.value):
            rightMatrix[0][i] += Items_broadcast.value[matrixA_item_Tuple[0]][0][i] * float(matrixA_item_Tuple[1])
    newUserRow = np.dot(leftMatrix, rightMatrix.T).T
    return (userTuple[0], newUserRow)

In [121]:
Users = user_item_ratings.map(Update_User)

In [122]:
print Users.take(1)

[(1, array([[0.44038489, 0.36815262, 0.09943495]]))]


In [123]:
# The item matrix is needed in all partitions when solving for rows of User matrix individually
Users_broadcast = sc.broadcast({
  k: v for (k, v) in Users.collect()
})

In [124]:
def Update_Item(itemTuple):
    
    Userssquare = np.zeros([n_factors.value,n_factors.value])
    for matrixA_user_Tuple in itemTuple[1]:
        userRow = Users_broadcast.value[matrixA_user_Tuple[0]][0]
        for i in range(n_factors.value):
            for j in range(n_factors.value):
                Userssquare[i,j] += float(userRow[i]) * float(userRow[j])
    leftMatrix = np.linalg.inv(Userssquare + lambda_.value * np.eye(n_factors.value))
    rightMatrix = np.zeros([1,n_factors.value])
    for matrixA_user_Tuple in itemTuple[1]:
        for i in range(n_factors.value):
            rightMatrix[0][i] += Users_broadcast.value[matrixA_user_Tuple[0]][0][i] * float(matrixA_user_Tuple[1])
    newItemRow = np.dot(leftMatrix, rightMatrix.T).T
    return (itemTuple[0], newItemRow)

In [125]:
Items = item_user_ratings.map(Update_Item)
print Items.take(1)

[(1, array([[2.72138913, 3.98632499, 2.10678328]]))]


In [126]:
Items_broadcast = sc.broadcast({
  k: v for (k, v) in Items.collect()
})

In [127]:
def getRowSumSquares(userTuple):
    userRow = Users_broadcast.value[userTuple[0]]
    rowSSE = 0.0
    for matrixA_item_Tuple in userTuple[1]:
        predictedRating = 0.0
        for i in range(n_factors.value):
            predictedRating += userRow[0][i] * Items_broadcast.value[matrixA_item_Tuple[0]][0][i]
        SE = (float(matrixA_item_Tuple[1]) - predictedRating) ** 2
        rowSSE += SE
    return rowSSE

In [128]:
SSE = user_item_ratings.map(getRowSumSquares).reduce(lambda a, b: a + b)
Count = movie_lens_rdd.count()
MSE = SSE / Count
RMSE = np.sqrt(MSE)
print "MSE:", MSE
print "RMSE:", RMSE

MSE: 0.6718622349541447
RMSE: 0.8196720288957924


In [129]:
for iter in range(n_iterations):
    Users = user_item_ratings.map(Update_User)
    Users_broadcast = sc.broadcast({k: v for (k, v) in Users.collect()})
    Items = item_user_ratings.map(Update_Item)
    Items_broadcast = sc.broadcast({k: v for (k, v) in Items.collect()})
    SSE = user_item_ratings.map(getRowSumSquares).reduce(lambda a, b: a + b)
    MSE = SSE / Count
    RMSE = np.sqrt(MSE)
    print "For Interation ={},RMSE is {}:".format(iter,RMSE)

For Interation =0,RMSE is 0.765117033876:
For Interation =1,RMSE is 0.745845231106:
For Interation =2,RMSE is 0.734569091586:
For Interation =3,RMSE is 0.727264941407:
For Interation =4,RMSE is 0.722281954318:
For Interation =5,RMSE is 0.71874328772:
For Interation =6,RMSE is 0.716015744796:
For Interation =7,RMSE is 0.713966656331:
For Interation =8,RMSE is 0.712363243951:
For Interation =9,RMSE is 0.71108129002:


# For 10 iterations the algorithm implementation has obtained a RMSE value of 0.711 and using spark MLLib for 10 iterations, RMSE value obtained is 0.928