# COM6012 Scalable Machine Learning 2019 - Haiping Lu
# Lab 3: Matrix factorisation for collaborative filtering recommender systems

## Objectives

* Task 1: To finish in the lab session. **Essential**
* Task 2: To finish in the lab session. **Essential**
* Task 3: To explore by yourself. **Optional but recommended**

**Suggested reading**: 
* [Collaborative Filtering in Spark](https://spark.apache.org/docs/2.3.2/ml-collaborative-filtering.html)
* [DataBricks movie recommendations tutorial](https://github.com/databricks/spark-training/blob/master/website/movie-recommendation-with-mllib.md). [**DataBricks**](https://en.wikipedia.org/wiki/Databricks) is a company founded by the creators of Apache Spark. Check out their latest packages at [their GitHub page](https://github.com/databricks), e.g., [integration with Scikit-learn](https://github.com/databricks/spark-sklearn) and [Deep Learning Pipelines for Apache Spark including TensorFlow](https://github.com/databricks/spark-deep-learning).
* [Collaborative Filtering on Wiki](http://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering)
* [Python API on ALS for recommender system](https://spark.apache.org/docs/2.3.2/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS)
* Chapter 15 (particularly Section 15.3.2) of [PySpark tutorial](https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf) 

[**Learn PySpark APIs via Pictures**](https://github.com/jkthompson/pyspark-pictures) (**from recommended/discover repositories** in GitHub, i.e., found via **recommender systems**!)

https://github.com/haipinglu/ScalableML/

If running this notebook on HPC via [Jupyter Hub](https://jupyter-sharc.shef.ac.uk/), we need to run the following cell. If we are running this notebook on our local machine, skip the following cell.

In [None]:
import os
import subprocess
def module(*args):        
    if isinstance(args[0], list):        
        args = args[0]        
    else:        
        args = list(args)        
    (output, error) = subprocess.Popen(['/usr/bin/modulecmd', 'python'] + args, stdout=subprocess.PIPE).communicate()
    exec(output)    
module('load', 'apps/java/jdk1.8.0_102/binary')    
os.environ['PYSPARK_PYTHON'] = os.environ['HOME'] + '/.conda/envs/jupyter-spark/bin/python'

## 1. Movie recommendation via collaborative filtering



Basic setup (not needed when using the PySpark shell, which already provides `spark` and `sc`)

In [4]:
#import findspark
#findspark.init()
import pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("COM6012 Collaborative Filtering RecSys") \
    .getOrCreate()

sc = spark.sparkContext

### Collaborative filtering
[Collaborative filtering](http://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) is a classic approach for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix primarily based on the matrix *itself*.  `spark.ml` currently supports **model-based** collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries, using the **alternating least squares (ALS)** algorithm. 

API: `class pyspark.ml.recommendation.ALS(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, implicitPrefs=False, alpha=1.0, userCol='user', itemCol='item', seed=None, ratingCol='rating', nonnegative=False, checkpointInterval=10, intermediateStorageLevel='MEMORY_AND_DISK', finalStorageLevel='MEMORY_AND_DISK', coldStartStrategy='nan')`

The following parameters are available:
- *rank*: the number of latent factors in the model (defaults to 10).
- *maxIter*: the maximum number of iterations to run (defaults to 10).
- *regParam*: the regularization parameter in ALS (defaults to 0.1 in the Python API shown above).
- *numUserBlocks*/*numItemBlocks*: the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
- *implicitPrefs*: whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
- *alpha*: a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
- *nonnegative*: whether or not to use nonnegative constraints for least squares (defaults to false).
- *coldStartStrategy*: can be set to “drop” in order to drop any rows in the DataFrame of predictions that contain NaN values (defaults to "nan", which assigns NaN to predictions for a user and/or item whose factors are not present in the model).
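To make the roles of *rank*, *maxIter* and *regParam* concrete, here is a minimal plain-Python sketch of alternating least squares with a single latent factor (rank 1). It is an illustration under simplifying assumptions, not Spark's implementation: the names `als_rank1` and `objective` and the toy ratings are made up, and Spark's ALS is blocked, distributed, and scales the regularization in its own way.

```python
def als_rank1(observed, n_users, n_items, reg=0.1, max_iter=10):
    """Rank-1 ALS on observed ratings given as {(user, item): rating}."""
    u = [1.0] * n_users  # one latent factor per user
    v = [1.0] * n_items  # one latent factor per item
    for _ in range(max_iter):
        # Hold item factors fixed; each user's factor is a 1-D ridge solution.
        for i in range(n_users):
            rated = [(j, r) for (a, j), r in observed.items() if a == i]
            u[i] = sum(r * v[j] for j, r in rated) / (reg + sum(v[j] ** 2 for j, _r in rated))
        # Hold user factors fixed; update each item's factor the same way.
        for j in range(n_items):
            raters = [(i, r) for (i, b), r in observed.items() if b == j]
            v[j] = sum(r * u[i] for i, r in raters) / (reg + sum(u[i] ** 2 for i, _r in raters))
    return u, v


def objective(observed, u, v, reg):
    """Regularized squared error that each ALS half-step minimizes."""
    fit = sum((r - u[i] * v[j]) ** 2 for (i, j), r in observed.items())
    return fit + reg * (sum(x * x for x in u) + sum(x * x for x in v))


# Toy data (hypothetical): 3 users, 2 items, 4 observed ratings.
observed = {(0, 0): 5.0, (0, 1): 3.0, (1, 0): 4.0, (2, 1): 1.0}
u, v = als_rank1(observed, n_users=3, n_items=2)
prediction = u[1] * v[1]  # user 1 never rated item 1; fill in the missing entry
```

Each half-step has a closed-form solution, so the regularized objective never increases from one iteration to the next. With a larger *rank*, the scalar updates become small linear systems, one per user or item.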

### Movie recommendation

In the cells below, we present a small example of collaborative filtering with data taken from the [MovieLens](http://grouplens.org/datasets/movielens/) project. In this notebook, we use the old 100k dataset (already downloaded in the `Data` folder, but you are encouraged to view the source).

The dataset looks like this:

    196     242     3       881250949
    186     302     3       891717742
    22      377     1       878887116
    244     51      2       880606923
    ...

This is a **tab separated** list of 
    
    user id | item id | rating | timestamp 
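Before bringing Spark in, the record format can be checked with a few lines of plain Python. This is only a sketch: the `Rating` tuple and the `parse_rating` helper are hypothetical names, and the two sample lines are copied from the listing above.

```python
from collections import namedtuple

Rating = namedtuple("Rating", ["user_id", "item_id", "rating", "timestamp"])


def parse_rating(line):
    """Split one tab-separated record and convert each field to a number."""
    user_id, item_id, rating, timestamp = line.rstrip("\n").split("\t")
    return Rating(int(user_id), int(item_id), float(rating), int(timestamp))


sample = ["196\t242\t3\t881250949", "186\t302\t3\t891717742"]
parsed = [parse_rating(line) for line in sample]
```

The same split-then-convert steps are what the RDD `map` calls later in this notebook perform on the full file.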

####  Explicit vs. implicit feedback

The data above is typically viewed as a user-item matrix in which the ratings are the entries and the users and items determine the row and column indices. The ratings are **explicit feedback**. The *Mean Squared Error* of rating prediction can be used to evaluate the recommendation model.

The ratings can also be used differently. We can treat them as numbers representing the strength of observed user actions, i.e., as **implicit feedback**, similar to the number of clicks or the cumulative duration someone spent viewing a movie. Such numbers are then related to the level of confidence in observed user preferences, rather than explicit ratings given to items. The model then tries to find latent factors that can be used to predict the expected preference of a user for an item.
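One common way to turn interaction strengths into preferences and confidences is the formulation from the implicit-feedback literature that the Spark guide cites: a binary preference plus a confidence that grows linearly with the observed strength. The sketch below assumes that form; `to_implicit` is a hypothetical helper, and `alpha` plays the role of the *alpha* parameter described earlier.

```python
def to_implicit(strength, alpha=1.0):
    """Map an observed interaction strength to (preference, confidence)."""
    preference = 1 if strength > 0 else 0  # did the user interact at all?
    confidence = 1.0 + alpha * strength    # stronger signal, higher confidence
    return preference, confidence


pairs = [to_implicit(r) for r in (0.0, 1.0, 5.0)]
```

Here `pairs` is `[(0, 1.0), (1, 2.0), (1, 6.0)]`: an unobserved pair still gets the baseline confidence of 1, while a strength of 5 is trusted six times as much.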

#### Cold-start problem

The cold-start problem refers to the cases when some users and/or items in the test dataset were not present during training the model. In Spark, these users and items are either assigned `NaN` (not a number, default) or dropped (option `drop`).
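The effect of the two options on an evaluation metric can be seen without Spark. In this sketch the prediction pairs are made up, and `rmse` is a hypothetical helper; the point is only that a single `NaN` prediction makes the whole metric `NaN`, while dropping such rows keeps it finite.

```python
import math


def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))


# Hypothetical test-set predictions; the last user was unseen in training.
scored = [(4.0, 3.5), (2.0, 2.5), (5.0, math.nan)]

rmse_nan = rmse(scored)  # NaN: one missing prediction poisons the metric
kept = [(a, p) for a, p in scored if not math.isnan(p)]
rmse_drop = rmse(kept)   # 0.5, computed on the two scorable rows
```

Note that dropping rows changes what is being measured: the metric then covers only users and items the model has seen.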


In [2]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

Read the data in and split each line into fields (tab separated)

In [11]:
lines = spark.read.text(r"C:\Github\ScalableML\Data\MovieLens100k.data").rdd  # raw string keeps the backslashes literal
parts = lines.map(lambda row: row.value.split("\t"))  # split each row's value on tabs into a list of fields

In [17]:
lines.collect()

[Row(value='196\t242\t3\t881250949'),
 Row(value='186\t302\t3\t891717742'),
 Row(value='22\t377\t1\t878887116'),
 Row(value='244\t51\t2\t880606923'),
 Row(value='166\t346\t1\t886397596'),
 Row(value='298\t474\t4\t884182806'),
 Row(value='115\t265\t2\t881171488'),
 Row(value='253\t465\t5\t891628467'),
 Row(value='305\t451\t3\t886324817'),
 Row(value='6\t86\t3\t883603013'),
 Row(value='62\t257\t2\t879372434'),
 Row(value='286\t1014\t5\t879781125'),
 Row(value='200\t222\t5\t876042340'),
 Row(value='210\t40\t3\t891035994'),
 Row(value='224\t29\t3\t888104457'),
 Row(value='303\t785\t3\t879485318'),
 Row(value='122\t387\t5\t879270459'),
 Row(value='194\t274\t2\t879539794'),
 Row(value='291\t1042\t4\t874834944'),
 Row(value='234\t1184\t2\t892079237'),
 Row(value='119\t392\t4\t886176814'),
 Row(value='167\t486\t4\t892738452'),
 Row(value='299\t144\t4\t877881320'),
 Row(value='291\t118\t2\t874833878'),
 Row(value='308\t1\t4\t887736532'),
 Row(value='95\t546\t2\t879196566'),
 Row(value='38\t95\t

In [16]:
parts.collect()

[['196', '242', '3', '881250949'],
 ['186', '302', '3', '891717742'],
 ['22', '377', '1', '878887116'],
 ['244', '51', '2', '880606923'],
 ['166', '346', '1', '886397596'],
 ['298', '474', '4', '884182806'],
 ['115', '265', '2', '881171488'],
 ['253', '465', '5', '891628467'],
 ['305', '451', '3', '886324817'],
 ['6', '86', '3', '883603013'],
 ['62', '257', '2', '879372434'],
 ['286', '1014', '5', '879781125'],
 ['200', '222', '5', '876042340'],
 ['210', '40', '3', '891035994'],
 ['224', '29', '3', '888104457'],
 ['303', '785', '3', '879485318'],
 ['122', '387', '5', '879270459'],
 ['194', '274', '2', '879539794'],
 ['291', '1042', '4', '874834944'],
 ['234', '1184', '2', '892079237'],
 ['119', '392', '4', '886176814'],
 ['167', '486', '4', '892738452'],
 ['299', '144', '4', '877881320'],
 ['291', '118', '2', '874833878'],
 ['308', '1', '4', '887736532'],
 ['95', '546', '2', '879196566'],
 ['38', '95', '5', '892430094'],
 ['102', '768', '2', '883748450'],
 ['63', '277', '4', '875747401

We need to convert the text (`String`) into numbers (`int` or `float`) and then convert RDD to DataFrame

In [18]:
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]), rating=float(p[2]), timestamp=int(p[3])))  # map each field to int or float
ratings = spark.createDataFrame(ratingsRDD)  # convert the RDD to a DataFrame

If you see the warning `RuntimeWarning: numpy.dtype size changed, may indicate binary incompatibility`, [it is benign](https://stackoverflow.com/questions/40845304/runtimewarning-numpy-dtype-size-changed-may-indicate-binary-incompatibility).

Check data

In [19]:
ratings.show(5)

+-------+------+---------+------+
|movieId|rating|timestamp|userId|
+-------+------+---------+------+
|    242|   3.0|881250949|   196|
|    302|   3.0|891717742|   186|
|    377|   1.0|878887116|    22|
|     51|   2.0|880606923|   244|
|    346|   1.0|886397596|   166|
+-------+------+---------+------+
only showing top 5 rows



Check data type

In [21]:
ratings.printSchema()  # printSchema shows the data type of each column

root
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- userId: long (nullable = true)



Prepare the training/test data.

In [22]:
(training, test) = ratings.randomSplit([0.8, 0.2])  # randomly split the data into training and test sets
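
`randomSplit` also accepts an optional `seed` argument, which makes the split reproducible between runs. The plain-Python sketch below mimics the idea with one independent random draw per row; this per-row sampling is our understanding of how the split behaves rather than something stated in this lab, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, train_fraction=0.8, seed=None):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))
train_rows, test_rows = random_split(rows, seed=42)
```

With per-row draws the split sizes are only approximately 80%/20%, and without a fixed seed every run trains on a different subset, so counts and error values change from run to run.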

Build the recommendation model using ALS on the training data. Note we set cold start strategy to `drop` to ensure we don't get NaN evaluation metrics.

In [24]:
# ALS is the recommendation algorithm; coldStartStrategy="drop" discards rows with NaN predictions
als = ALS(maxIter=10, regParam=0.1, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)  # train the model

Evaluate the model by computing the RMSE on the test data

In [25]:
predictions = model.transform(test)  # predict ratings for the test set
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")  # evaluator that computes RMSE
rmse = evaluator.evaluate(predictions)  # compute the RMSE value
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.9279586442759181


Generate top 10 movie recommendations for each user

In [26]:
userRecs = model.recommendForAllUsers(10)  # method of the fitted ALS model: recommend items for every user; the argument is the number of items per user

In [28]:
userRecs.show(5, False)  # each entry is [movieId, predicted rating], sorted from highest to lowest

+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                       |
+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|471   |[[1355, 5.428046], [397, 5.1798944], [680, 5.1275935], [1413, 5.009255], [1260, 4.919072], [862, 4.8222327], [909, 4.8197036], [1217, 4.7607417], [1157, 4.7469654], [1620, 4.658706]]|
|463   |[[320, 4.6432285], [1598, 4.572976], [1233, 4.536373], [19, 4.482874], [1473, 4.456006], [1278, 4.442326], [1463, 4.3482957], [1063, 4.3472424], [884, 4.295187], [190, 4.2924447]]   |
|833   |[[1368, 5.1094384], [1597, 4.469

Generate top 10 user recommendations for each movie

In [12]:
movieRecs = model.recommendForAllItems(10)  # recommend the best-matching users for each item; the argument is the number of users, sorted from highest to lowest
movieRecs.show(5, False)

+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                                      |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1580   |[[127, 1.1051196], [166, 1.0697128], [887, 1.0459104], [688, 1.0255091], [200, 1.0179391], [337, 1.0074975], [374, 1.0005884], [196, 0.9933915], [134, 0.99123067], [97, 0.98901075]]|
|471    |[[688, 5.016009], [810, 4.814686], [849, 4.671538], [907, 4.644399], [939, 4.5389132], [164, 4.538198], [477, 4.517595], [357, 4.50101], [472, 4.490762], [636, 4.459334]]           |
|1591   |[[427, 5.870049], [4, 5.018244]
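
Conceptually, each recommendation list above is a ranking of dot products between one user's factor vector and every item's factor vector (or the reverse for items). The sketch below shows that ranking step in plain Python with made-up rank-2 factors; `top_k` is a hypothetical helper, not the method Spark uses internally.

```python
import heapq


def top_k(user_vec, item_factors, k):
    """Return the k (item_id, score) pairs with the highest dot-product score."""
    scores = (
        (item_id, sum(a * b for a, b in zip(user_vec, vec)))
        for item_id, vec in item_factors.items()
    )
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])


# Hypothetical rank-2 factors, not taken from the fitted model above.
item_factors = {10: [1.0, 0.0], 20: [0.0, 1.0], 30: [1.0, 1.0]}
recs = top_k([2.0, 1.0], item_factors, k=2)
```

For this toy input `recs` is `[(30, 3.0), (10, 2.0)]`. Scores are not clipped to the rating scale, which is why some predicted ratings in the outputs above exceed 5.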

In [38]:
ratings.limit?

In [47]:
myRDD = sc.parallelize([ (1,20, 21), (1,21, 22), (1,21,20), (2,20), (2,22), (2,20), (3,21), (3,22) ])
myRDD.distinct().collect()

[(2, 20), (2, 22), (3, 21), (1, 20, 21), (1, 21, 22), (1, 21, 20), (3, 22)]

Generate top 10 movie recommendations for a specified set of users

In [48]:
users = ratings.select(als.getUserCol()).distinct().limit(3)  # pick 3 distinct users
userSubsetRecs = model.recommendForUserSubset(users, 10)  # recommend 10 movies for each selected user
users.show()
userSubsetRecs.show(3,False)

+------+
|userId|
+------+
|    26|
|    29|
|   474|
+------+

+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                   |
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26    |[[1463, 4.301269], [1449, 4.110512], [1643, 4.0030746], [318, 3.988646], [64, 3.937416], [1064, 3.89215], [127, 3.8710608], [1398, 3.8678012], [515, 3.861407], [963, 3.854049]]  |
|474   |[[1463, 5.4257073], [1643, 5.2696495], [1449, 5.149214], [1064, 4.9274817], [318, 4.873536], [127, 4.865032], [64, 4.8493643], [483, 4.8195662], [98, 4.793346], [1131, 4.786705

Generate top 10 user recommendations for a specified set of movies

In [49]:
movies = ratings.select(als.getItemCol()).distinct().limit(3)  # pick 3 distinct movies
movieSubSetRecs = model.recommendForItemSubset(movies, 10)  # recommend the 10 best-matching users for each selected movie
movies.show()
movieSubSetRecs.show(3,False)

+-------+
|movieId|
+-------+
|    474|
|     29|
|     26|
+-------+

+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                                  |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26     |[[324, 4.528414], [939, 4.4603667], [252, 4.4263177], [270, 4.420497], [691, 4.34266], [801, 4.336977], [770, 4.3261523], [592, 4.3187], [30, 4.316876], [462, 4.3150434]]       |
|474    |[[810, 5.1888857], [808, 5.1277757], [583, 5.08961], [310, 4.9676676], [686, 4.9261856], [794, 4.916385], [118, 4.9160233], [296, 4.8948846], [928, 4.8936057], [355, 4.

In [56]:
dfItemFactors = model.itemFactors  # latent factor vectors learned for the items

In [84]:
dfItemFactors.features

Column<b'features'>

In [77]:
import pandas as pd

In [62]:
dfItemFactors

DataFrame[id: int, features: array<float>]

In [57]:
dfItemFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.36533204, 1.34...|
| 20|[-0.03900454, 1.4...|
| 30|[-0.004661479, 1....|
| 40|[0.12645723, 0.80...|
| 50|[-0.022403317, 1....|
| 60|[0.08731015, 1.41...|
| 70|[0.021964483, 1.2...|
| 80|[0.041436937, 0.2...|
| 90|[0.5424719, 1.024...|
|100|[0.27774057, 1.31...|
|110|[0.512027, 0.7915...|
|120|[0.30575147, 0.31...|
|130|[0.090320274, 0.9...|
|140|[0.60254896, 1.19...|
|150|[0.19938004, 0.92...|
|160|[0.61080897, 0.96...|
|170|[0.06997781, 1.37...|
|180|[0.14831701, 1.06...|
|190|[-0.07998923, 1.4...|
|200|[-0.041158237, 1....|
+---+--------------------+
only showing top 20 rows



**`.describe().show()` is very handy to inspect your (big) data for understanding/debugging. Try to use it more often to see.**

In [17]:
dfItemFactors.describe().show()

+-------+------------------+
|summary|                id|
+-------+------------------+
|  count|              1654|
|   mean| 830.5822249093108|
| stddev|481.47850932410614|
|    min|                 1|
|    max|              1680|
+-------+------------------+



In [52]:
allmovies = ratings.select(als.getItemCol()).distinct()
allmovies.describe().show()

+-------+-----------------+
|summary|          movieId|
+-------+-----------------+
|  count|             1682|
|   mean|            841.5|
| stddev|485.6958925088827|
|    min|                1|
|    max|             1682|
+-------+-----------------+



In [53]:
allmovies.show()

+-------+
|movieId|
+-------+
|    474|
|     29|
|     26|
|    964|
|   1677|
|     65|
|    191|
|   1224|
|    558|
|   1010|
|    418|
|   1277|
|   1258|
|    541|
|   1360|
|    222|
|    938|
|    293|
|    270|
|   1127|
+-------+
only showing top 20 rows



In [19]:
1682-1665

17

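**Question**: Why do the two counts differ? The gap was 17 in the run behind the cell above; it changes with the random split (the `describe()` outputs shown here give 1682 - 1654 = 28).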

## 2. Exercise - Further analysis of the MovieLens data (completing two or more questions is considered as completion of this exercise).
* Consider more parameter settings to observe the effects, e.g., different values of *rank* and/or *regParam*, `nan` vs `drop` for `coldStartStrategy`, etc.
* Use cross validation to select the best model among various parameter settings (reference: Section 15.3.2 of [PySpark tutorial](https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf))
* Create a standalone program that carries out collaborative filtering. Run this on a bigger [MovieLens dataset](http://grouplens.org/datasets/movielens/), e.g., 1M, 10M or 20M.

* Use 10-fold cross validation (with an 80% training and 20% testing split) to find the average mean absolute (or squared) error on your test data. Keep your program as parallel as possible. You can create your splits randomly (or any other way you choose!), and don't forget who has access to various variables and who doesn't...
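
As a starting point for the cross-validation questions, the fold bookkeeping can be sketched in plain Python. `k_fold_indices` is a hypothetical helper, and the sketch only produces row indices; fitting and evaluating a model on each pair is left to the exercise.

```python
import random


def k_fold_indices(n_rows, k=10, seed=0):
    """Yield (train_idx, test_idx) pairs; each row is tested exactly once."""
    idx = list(range(n_rows))
    random.Random(seed).shuffle(idx)
    folds = [idx[f::k] for f in range(k)]  # k interleaved, near-equal folds
    for f in range(k):
        train = [i for g in range(k) if g != f for i in folds[g]]
        yield train, folds[f]


splits = list(k_fold_indices(100, k=10))
```

With k folds the held-out share is 1/k, so 10 folds give a 90%/10% split per round. An 80%/20% split, as worded in the exercise, corresponds either to 5 folds or to repeating a random 80/20 split ten times. In both cases the reported figure is the mean of the per-round errors.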

## 3. More Recommender Systems via ALS (Optional but recommended)

### Databricks tutorial
* Complete the tasks in the [quiz provided by DataBricks](https://github.com/databricks/spark-training/blob/master/machine-learning/python/MovieLensALS.py) on their data or the data from MovieLens directly. A [solution](https://github.com/databricks/spark-training/blob/master/machine-learning/python/solution/MovieLensALS.py) is posted, but you are encouraged to try the tasks before consulting it.

### Santander Kaggle competition on product recommendation
* A recent Kaggle competition, [Santander Product Recommendation](https://www.kaggle.com/c/santander-product-recommendation), offered a prize of **USD 60,000** and attracted **1,787 teams**.
* Follow this [PySpark notebook on an ALS-based solution](https://www.elenacuoco.com/2016/12/22/alternating-least-squares-als-spark-ml/).
* Learn the way to consider **implicit preferences** and do the same for other recommendation problems.


### Stock Portfolio Recommendations
* Follow Chapter 15 of [PySpark tutorial](https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf) to perform [Stock Portfolio Recommendations](https://en.wikipedia.org/wiki/Portfolio_investment).
* The data can be downloaded from [Online Retail Data Set](https://archive.ics.uci.edu/ml/datasets/online+retail) at UCI.
* Please pay attention to the **data cleaning** step that removes rows containing null values. You may need to do the same when you are dealing with real data.
* The data manipulation steps are useful to learn.
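
The null-removal step amounts to keeping only complete rows. A minimal plain-Python sketch is given below; the field names and values are invented for illustration and are not the columns of the UCI dataset. In PySpark, `DataFrame.dropna()` does the equivalent job on a DataFrame.

```python
def drop_null_rows(rows):
    """Keep only rows in which every field has a value."""
    return [row for row in rows if all(value is not None for value in row.values())]


# Hypothetical records with made-up field names and values.
raw = [
    {"customer": 1, "item": "A", "quantity": 3},
    {"customer": None, "item": "B", "quantity": 1},
    {"customer": 2, "item": "C", "quantity": None},
]
clean = drop_null_rows(raw)
```

Only the first record survives. It is worth checking how many rows are lost this way before training, since ALS needs both a user and an item identifier for every rating.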

### Context-aware recommendation

* See the method in [Joint interaction with context operation for collaborative filtering](https://www.sciencedirect.com/science/article/pii/S0031320318304242?dgcid=rss_sd_all) and implement it in PySpark
* Perform the **time split recommendation** as discussed in the paper for the above recommender systems.
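
One simple reading of a time split is to train on the earliest ratings and test on the most recent ones, using the timestamp column. The sketch below assumes that reading; the exact protocol in the paper may differ, `time_split` is a hypothetical helper, and the five sample ratings are the first rows of the MovieLens 100k data shown earlier.

```python
def time_split(ratings, train_fraction=0.8):
    """Train on the earliest ratings and test on the most recent ones."""
    ordered = sorted(ratings, key=lambda r: r[3])  # r = (user, item, rating, timestamp)
    cut = int(len(ordered) * train_fraction)
    return ordered[:cut], ordered[cut:]


ratings = [
    (196, 242, 3.0, 881250949),
    (186, 302, 3.0, 891717742),
    (22, 377, 1.0, 878887116),
    (244, 51, 2.0, 880606923),
    (166, 346, 1.0, 886397596),
]
older, newer = time_split(ratings)
```

Unlike a random split, this never lets the model see ratings made after the ones it is tested on, and it tends to produce more cold-start users and items in the test set.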
