# Matrix Factorization with MLlib in Spark

This notebook continues our previous Matrix Factorization notebook, in which we built an MF engine with Keras. Here we use the ALS-based matrix factorization from Spark's MLlib (the RDD-based `pyspark.mllib` API).

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.sql import SparkSession

In [2]:
# set the random state
rs = 123

In [3]:
sc = SparkSession \
    .builder \
    .getOrCreate()

24/06/10 22:22:12 WARN Utils: Your hostname, kemi-macCN7G03L9K7.local resolves to a loopback address: 127.0.0.1; using 192.168.1.134 instead (on interface en0)
24/06/10 22:22:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/10 22:22:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


We load the same sampled DataFrame that we used for the matrix factorization engine built with Keras.

In [4]:
# load in the data

df = sc.read.csv("sample_df.csv.gz",header=True,inferSchema=True)

                                                                                

Let's have a look at our data

In [5]:
df.show(10)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
|     1|     50|   3.5|
|     1|    112|   3.5|
|     1|    151|   4.0|
|     1|    223|   4.0|
|     1|    253|   4.0|
|     1|    260|   4.0|
+------+-------+------+
only showing top 10 rows



We can easily print the schema and the number of rows/columns

In [6]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [7]:
print('number of rows : {}, number of columns :{}'.format(df.count(),len(df.columns)))

number of rows : 5399624, number of columns :3


                                                                                

In [8]:
# split into train and test
train, test = df.randomSplit([0.8, 0.2],seed=rs)
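
The split above is random per row, so the 80/20 proportions are approximate rather than exact. The following is a minimal pure-Python sketch of that idea, not Spark's implementation: the helper `random_split` and the made-up `rows` are assumptions introduced only for illustration.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to one split, with probability
    proportional to that split's weight (hypothetical helper)."""
    rng = random.Random(seed)
    total = sum(weights)
    # cumulative boundaries, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, b in enumerate(bounds):
            # the last split also catches any floating-point leftovers
            if x < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


# made-up (userId, movieId, rating) rows, purely for illustration
rows = [(u, m, 3.5) for u in range(100) for m in range(100)]
train_rows, test_rows = random_split(rows, [0.8, 0.2], seed=123)
print(len(train_rows), len(test_rows))
```

Because every row is assigned independently, the two parts are disjoint and together cover the whole dataset, but their sizes only come close to the requested fractions. Fixing the seed makes the split reproducible.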

We train a matrix factorization model using regularized alternating least squares (`ALS`).

`K` is the number of latent dimensions (the rank of the factorization) and `epochs` is the number of ALS iterations.
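
To make the alternating idea concrete, here is a minimal pure-Python sketch of regularized ALS for the simplest case, `K = 1`, where each update has a closed form. It is a simplified illustration and not how MLlib implements ALS: the function `als_rank1`, the regularization value `lam`, and the `toy` ratings are all assumptions made for this example.

```python
def als_rank1(ratings, n_users, n_items, lam=0.01, iterations=20):
    """Regularized ALS with a single latent factor (K = 1).

    ratings: dict mapping (user, item) -> rating, observed entries only.
    Minimizes sum (r - u[i] * v[j])**2 + lam * (sum u**2 + sum v**2).
    """
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(iterations):
        # fix the item factors and solve the ridge problem for each user
        for i in range(n_users):
            num = sum(r * v[j] for (a, j), r in ratings.items() if a == i)
            den = lam + sum(v[j] ** 2 for (a, j) in ratings if a == i)
            u[i] = num / den
        # fix the user factors and solve the ridge problem for each item
        for j in range(n_items):
            num = sum(r * u[i] for (i, b), r in ratings.items() if b == j)
            den = lam + sum(u[i] ** 2 for (i, b) in ratings if b == j)
            v[j] = num / den
    return u, v


def rmse(ratings, u, v):
    """Root mean squared error of the rank-1 predictions u[i] * v[j]."""
    se = [(r - u[i] * v[j]) ** 2 for (i, j), r in ratings.items()]
    return (sum(se) / len(se)) ** 0.5


# made-up ratings that happen to be exactly rank 1
toy = {(0, 0): 4.0, (0, 1): 2.0, (1, 0): 2.0, (1, 1): 1.0, (2, 1): 3.0}
u, v = als_rank1(toy, n_users=3, n_items=2)
print(rmse(toy, u, v))
```

With `K > 1` each update becomes a small regularized linear system per user or per item instead of a scalar division, but the alternation between the two sets of factors stays the same.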

In [9]:
# train the model
K = 16
epochs = 10
model = ALS.train(train, K, epochs, nonnegative=True, seed=rs)

24/06/10 22:22:23 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/06/10 22:22:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/06/10 22:22:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


### Now we evaluate our model on the train and test sets

In [10]:
# train
x1 = train.rdd.map(lambda p: (p[0], p[1]))
x2 = model.predictAll(x1)
p = x2.map(lambda r: ((r[0], r[1]), r[2]))
# note: we map train to ((user_id, movie_id), rating) so that it has the same key-value shape as p
ratesAndPreds = train.rdd.map(lambda r: ((r[0], r[1]), r[2])).join(p)
# joins on first item: (user_id, movie_id)
# each row of result is: ((user_id, movie_id), (rating, prediction))
mse = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("***** train rmse: %s *****" % mse**0.5)



***** train rmse: 0.6178613873360993 *****
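
The evaluation cell above keys both the true ratings and the predictions by `(user_id, movie_id)`, joins them, and averages the squared differences. The sketch below mirrors that logic with plain Python dictionaries. The ratings and predictions are made-up values, and the names `actual`, `predicted`, and `toy_rmse` are assumptions for illustration only.

```python
# observed ratings keyed by (user_id, movie_id); values are made up
actual = {(1, 2): 3.5, (1, 29): 4.0, (2, 2): 5.0}
# predictions keyed the same way; (2, 2) is deliberately missing
predicted = {(1, 2): 3.0, (1, 29): 4.5}

# inner join on the key: keep only pairs present on both sides,
# giving ((user_id, movie_id), (rating, prediction)) entries
joined = {key: (actual[key], predicted[key]) for key in actual if key in predicted}

squared_errors = [(rating - pred) ** 2 for rating, pred in joined.values()]
mse = sum(squared_errors) / len(squared_errors)
toy_rmse = mse ** 0.5
print(toy_rmse)
```

Note that an inner join silently drops any pair that has no prediction, so the RMSE is computed only over the pairs that survive the join. If that matters, comparing the number of joined rows with the number of input rows is a quick sanity check.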


                                                                                

In [11]:
# test
x1 = test.rdd.map(lambda p: (p[0], p[1]))
x2 = model.predictAll(x1)
p = x2.map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = test.rdd.map(lambda r: ((r[0], r[1]), r[2])).join(p) 
# joins on first item: (user_id, movie_id)
# each row of result is: ((user_id, movie_id), (rating, prediction))
mse = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("***** test rmse: %s *****" % mse**0.5)



***** test rmse: 0.9337275729072203 *****


                                                                                

Recall that our Keras matrix factorization model reached an RMSE of 0.8723 on the test set, so the MLlib model (0.9337) is roughly on par with it, though slightly worse.

----

Below is a quick check of what `x1`, `x2`, `p`, and `ratesAndPreds` look like.

In [12]:
print(x1)
x1.take(5)


PythonRDD[287] at RDD at PythonRDD.scala:53


                                                                                

[(1, 32), (1, 151), (1, 318), (1, 337), (1, 653)]

In [13]:
print(x2)
x2.take(5)

MapPartitionsRDD[278] at mapPartitions at PythonMLLibAPI.scala:1344


[Rating(user=54040, product=170, rating=3.0028685211934585),
 Rating(user=54040, product=595, rating=3.631608037813594),
 Rating(user=54040, product=367, rating=1.0830850704306616),
 Rating(user=54040, product=377, rating=1.6751371063918228),
 Rating(user=54040, product=288, rating=4.341193207379497)]

In [14]:
print(p)
p.take(5)

PythonRDD[290] at RDD at PythonRDD.scala:53


[((54040, 170), 3.0028685211934585),
 ((54040, 595), 3.631608037813594),
 ((54040, 367), 1.0830850704306616),
 ((54040, 377), 1.6751371063918228),
 ((54040, 288), 4.341193207379497)]

In [15]:
print(ratesAndPreds)
ratesAndPreds.take(5)

PythonRDD[292] at RDD at PythonRDD.scala:53


[((104, 180), (1.0, 1.863569166612951)),
 ((305, 1), (5.0, 3.773345251306051)),
 ((347, 905), (5.0, 4.539851368985019)),
 ((393, 47), (5.0, 4.443834535552931)),
 ((516, 542), (3.0, 3.667399265251589))]