![](https://raw.githubusercontent.com/databricks/spark-training/master/website/img/matrix_factorization.png)

In [1]:
import os
import sys
from pyspark import SparkContext
from pyspark import SparkConf


# Configure the Spark application and obtain the driver-side SparkContext.
conf = SparkConf()
conf.setAppName("spark-env")

# getOrCreate() reuses an already-running context, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)


from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark import SparkContext


In [6]:
# Start (or reuse) a SparkSession and load the ratings CSV into a DataFrame.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('recommend')
    .getOrCreate()
)

# The first row of the file is the header; column types are inferred from the data.
data = spark.read.csv(
    './data/movielens-20m-dataset/rating.csv',
    header=True,
    inferSchema=True,
)
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [8]:
# Count the null values in every column.
# The functions module is imported under an alias rather than via
# `from ... import sum`, which would shadow Python's built-in sum()
# for the rest of the notebook.
from pyspark.sql import functions as F

null_counts = [
    F.sum(F.col(c).isNull().cast("int")).alias(c)  # isNull() cast to 0/1, summed per column
    for c in data.columns
]
data.select(*null_counts).show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+



In [9]:
# Row count and summary statistics (count, mean, stddev, min, max) of the ratings table
from pyspark.sql.functions import lit, col

summary = data.describe()
rows = data.count()  # total number of rows in the dataset

summary.show()

+-------+-----------------+------------------+------------------+
|summary|           userId|           movieId|            rating|
+-------+-----------------+------------------+------------------+
|  count|         20000263|          20000263|          20000263|
|   mean|69045.87258292554| 9041.567330339605|3.5255285642993797|
| stddev|40038.62665316145|19789.477445413166|1.0519889192942444|
|    min|                1|                 1|               0.5|
|    max|           138493|            131262|               5.0|
+-------+-----------------+------------------+------------------+



In [10]:
# Preview the first five ratings
data.show(n=5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+
only showing top 5 rows



In [11]:
# Split the dataset into train and test sets.
# NOTE(review): only 10% of the ratings go to training and 90% to testing;
# presumably this keeps the ALS fit fast -- use [0.8, 0.2] for a conventional split.
# A fixed seed makes the split, and every number computed from it, repeatable.
train_data, test_data = data.randomSplit([0.1, 0.9], seed=42)

## Alternating Least Squares (ALS)

- Spark's MLlib machine-learning library provides a collaborative-filtering implementation based on Alternating Least Squares (ALS). The MLlib implementation has these parameters:

    - numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
    - rank is the number of latent factors in the model.
    - iterations is the number of iterations to run (`maxIter` in the DataFrame-based `pyspark.ml` API used below).
    - lambda specifies the regularization parameter in ALS (`regParam` in `pyspark.ml`).
    - implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
    - alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.


In [12]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [13]:
# Configure the ALS recommender and fit it on the training split.
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    rank=8,  # number of latent factors
    maxIter=10,
    regParam=0.1,
    nonnegative=True,
    coldStartStrategy="drop",
)
model = als.fit(train_data)

In [14]:
# Inspect the learned factor matrices: first five rows of the user factors...
print('Factorized user matrix with rank = %d' % model.rank)
model.userFactors.show(n=5)

print('-'*50)

# ...and first five rows of the item factors.
print('Factorized item matrix with rank = %d' % model.rank)
model.itemFactors.show(n=5)

Factorized user matrix with rank = 8
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.5212919, 1.131...|
| 20|[0.0, 0.001707591...|
| 30|[0.1865501, 0.0, ...|
| 40|[0.0, 0.16109952,...|
| 50|[1.0250514, 0.901...|
+---+--------------------+
only showing top 5 rows

--------------------------------------------------
Factorized item matrix with rank = 8
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.72092277, 0.33...|
| 20|[0.020011581, 0.3...|
| 30|[0.059172858, 0.1...|
| 40|[0.61517817, 0.21...|
| 50|[0.37421164, 0.88...|
+---+--------------------+
only showing top 5 rows



In [15]:
# Top-1 recommended user for every movie (first five movies shown)
print('Recommended top users (e.g. 1 top user) for all items with the corresponding predicted ratings:')
top_users_per_item = model.recommendForAllItems(numUsers=1)
top_users_per_item.show(n=5)

print('-'*50)

# Top-1 recommended movie for every user (first five users shown)
print('Recommended top items (e.g. 1 top item) for all users with the corresponding predicted ratings:')
top_items_per_user = model.recommendForAllUsers(numItems=1)
top_items_per_user.show(n=5)

Recommended top users (e.g. 1 top user) for all items with the corresponding predicted ratings:
+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580| [[96311, 6.142189]]|
|   4900| [[25990, 5.686511]]|
|   5300| [[50025, 5.850458]]|
|   6620|[[129644, 5.50455...|
|   7240|[[26606, 5.0060287]]|
+-------+--------------------+
only showing top 5 rows

--------------------------------------------------
Recommended top items (e.g. 1 top item) for all users with the corresponding predicted ratings:
+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   148|[[101425, 5.76914...|
|   463|[[101425, 5.89643...|
|   471|[[101425, 5.59479...|
|   496|[[101425, 7.220426]]|
|   833|[[74754, 4.4274783]]|
+------+--------------------+
only showing top 5 rows



In [16]:
# See how the model performs: add a `prediction` column to the held-out ratings
predictions = model.transform(test_data)
predictions.show(20)

+------+-------+------+-------------------+----------+
|userId|movieId|rating|          timestamp|prediction|
+------+-------+------+-------------------+----------+
| 74757|    148|   3.5|2003-09-29 16:35:35|  2.599261|
| 96393|    148|   3.0|2000-09-28 19:41:30| 3.1067977|
| 53338|    148|   1.0|1996-06-09 11:30:25| 2.5434332|
| 22684|    148|   4.0|1996-05-14 07:10:00| 2.7635562|
| 97435|    148|   4.0|2003-01-13 18:48:42| 3.0464308|
|137949|    148|   4.0|2000-02-18 21:37:43| 3.0111864|
| 19067|    148|   2.0|1996-05-30 19:07:44| 1.2519557|
| 87301|    148|   2.0|2000-11-23 02:05:35| 3.3392105|
| 88527|    148|   2.0|2000-08-07 14:48:44|  2.563173|
|108726|    148|   3.0|2000-01-25 20:23:13| 2.0031128|
| 92852|    148|   3.0|1996-08-12 01:23:51| 2.6814404|
|123246|    148|   3.0|1996-05-25 09:37:36|  2.161244|
| 20132|    148|   3.0|2002-05-19 02:36:33| 2.3097687|
| 22884|    148|   3.0|1999-12-11 21:31:08|  2.385243|
| 96427|    148|   3.0|1997-04-03 23:47:22|  3.200615|
| 10303|  

## Evaluate the predictions

- Evaluate the model by computing the RMSE on the test data

In [17]:
# Compare predicted and actual ratings using the root mean squared error (RMSE)
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(predictions)
print('Root mean squared error of the test_data: %.4f' % rmse)



Root mean squared error of the test_data: 0.9222


In [18]:
# Ratings that user 11 contributed to the training split
user_history = train_data.where(train_data.userId == 11)
user_history.show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|    11|    253|   4.5|2009-01-02 01:16:36|
|    11|    441|   1.5|2009-01-01 23:52:42|
|    11|    500|   4.5|2009-01-02 01:15:49|
|    11|    616|   4.0|2009-01-01 04:02:22|
|    11|    631|   2.0|2009-01-01 04:32:18|
|    11|    741|   4.5|2009-01-01 05:44:19|
|    11|    858|   2.5|2009-01-02 01:15:24|
|    11|   1210|   4.5|2009-01-02 01:13:45|
|    11|   1291|   4.5|2009-01-02 01:16:41|
|    11|   1527|   4.5|2009-01-02 01:17:56|
|    11|   1688|   3.0|2009-01-01 04:44:35|
|    11|   2011|   5.0|2009-01-01 04:58:45|
|    11|   2134|   5.0|2009-01-01 05:56:38|
|    11|   2378|   3.0|2009-01-02 00:24:51|
|    11|   2688|   3.5|2009-01-01 04:04:16|
|    11|   2717|   4.5|2009-01-01 05:35:10|
|    11|   2793|   5.0|2009-01-01 04:42:38|
|    11|   2916|   5.0|2009-01-01 05:47:50|
|    11|   3702|   4.0|2009-01-01 05:45:26|
|    11|   3897|   4.0|2009-01-0

In [19]:
# Candidate movies to offer user 11: the (movieId, userId) pairs of that user
# in the test split.
# The filter uses test_data's own column rather than a column object taken from
# train_data, which belongs to a different DataFrame and only resolves because
# both splits share the same parent.
user_suggest = test_data.filter(test_data['userId'] == 11).select(['movieId', 'userId'])
user_suggest.show()

+-------+------+
|movieId|userId|
+-------+------+
|      1|    11|
|     10|    11|
|     19|    11|
|     32|    11|
|     39|    11|
|     65|    11|
|    110|    11|
|    145|    11|
|    150|    11|
|    153|    11|
|    158|    11|
|    160|    11|
|    165|    11|
|    170|    11|
|    172|    11|
|    173|    11|
|    185|    11|
|    208|    11|
|    231|    11|
|    256|    11|
+-------+------+
only showing top 20 rows



In [20]:
# Predict ratings for the candidate movies and list the highest predictions first
user_offer = model.transform(user_suggest)
user_offer.orderBy(user_offer['prediction'].desc()).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   2571|    11| 4.8414764|
|   2959|    11|  4.773497|
|  59315|    11| 4.6840982|
|  58559|    11|  4.678283|
|   1196|    11|   4.65102|
|  64034|    11| 4.6153884|
|    318|    11| 4.6108084|
|  33794|    11| 4.5910625|
|   1198|    11| 4.5810275|
|  60069|    11| 4.5555177|
|   6539|    11|   4.54848|
|    356|    11| 4.5424957|
|   1036|    11|  4.539179|
|  51662|    11|  4.516956|
|   5903|    11| 4.5067563|
|   7254|    11|  4.504723|
|   2762|    11| 4.5040073|
|    589|    11| 4.4953265|
|    260|    11|  4.486622|
|    593|    11|  4.484823|
+-------+------+----------+
only showing top 20 rows



# Reference

[spark-dataframe-and-operations](https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/)

![](https://www.analyticsvidhya.com/wp-content/uploads/2016/10/DataFrame-in-Spark.png)