## 1. Explicit example

### 1.1 Preparing the data

In [0]:
from pyspark.sql.functions import split, to_timestamp
ratings = (spark.read.text("/databricks-datasets/definitive-guide/data/sample_movielens_ratings.txt")
         .select(split("value", "::").alias("column"))
#               .selectExpr("split(value, '::') as col")
         .selectExpr(
             "cast(column[0] as int) as userId",
             "cast(column[1] as int) as movieId",
             "cast(column[2] as float) as rating",
             "cast(column[3] as long) as ts",
         ).withColumn("timestamp", to_timestamp('ts')))
# pay attention it is not read.format('txt')
ratings.show()
ratings.count()

+------+-------+------+----------+-------------------+
|userId|movieId|rating|        ts|          timestamp|
+------+-------+------+----------+-------------------+
|     0|      2|   3.0|1424380312|2015-02-19 21:11:52|
|     0|      3|   1.0|1424380312|2015-02-19 21:11:52|
|     0|      5|   2.0|1424380312|2015-02-19 21:11:52|
|     0|      9|   4.0|1424380312|2015-02-19 21:11:52|
|     0|     11|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     12|   2.0|1424380312|2015-02-19 21:11:52|
|     0|     15|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     17|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     19|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     21|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     23|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     26|   3.0|1424380312|2015-02-19 21:11:52|
|     0|     27|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     28|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     29|   1.0|1424380312|2015-02-19 21:11:52|
|     0|  

### 1.2 Building ALS Model

### 1.2.1 Building a first Model

In [0]:
train.show()

+------+-------+------+----------+-------------------+
|userId|movieId|rating|        ts|          timestamp|
+------+-------+------+----------+-------------------+
|     0|      2|   3.0|1424380312|2015-02-19 21:11:52|
|     0|      3|   1.0|1424380312|2015-02-19 21:11:52|
|     0|      9|   4.0|1424380312|2015-02-19 21:11:52|
|     0|     11|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     12|   2.0|1424380312|2015-02-19 21:11:52|
|     0|     17|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     21|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     23|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     26|   3.0|1424380312|2015-02-19 21:11:52|
|     0|     27|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     29|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     30|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     31|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     34|   1.0|1424380312|2015-02-19 21:11:52|
|     0|     37|   1.0|1424380312|2015-02-19 21:11:52|
|     0|  

In [0]:
from pyspark.ml.recommendation import ALS
train, test = ratings.randomSplit([0.8, 0.2], seed=42)

als = (ALS(nonnegative=True)
      .setMaxIter(10)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")
      .setColdStartStrategy("drop")       
      )
# Use .setColdStartStrategy("drop") so that the model can deal with missing values. If not the metric will be NaN
# ne melanger pas missing value in train set and User-item matrix. Here we are in the train set, there is no reason to have missing value, or objective is to fill missing value in User-item matrix

print(als.explainParams())

als_fitted = als.fit(train)

predictions = als_fitted.transform(test)
type(als)


In [0]:
from pyspark.sql.functions import col
predictions.filter(col('userId') ==0 ).show(20)

predictions.count() # 258
test.count() # 258

+------+-------+------+----------+-------------------+----------+
|userId|movieId|rating|        ts|          timestamp|prediction|
+------+-------+------+----------+-------------------+----------+
|     0|      5|   2.0|1424380312|2015-02-19 21:11:52| 1.2601709|
|     0|     15|   1.0|1424380312|2015-02-19 21:11:52|  0.667108|
|     0|     19|   1.0|1424380312|2015-02-19 21:11:52| 2.2614455|
|     0|     28|   1.0|1424380312|2015-02-19 21:11:52|  4.976384|
|     0|     41|   2.0|1424380312|2015-02-19 21:11:52|0.53107774|
|     0|     47|   1.0|1424380312|2015-02-19 21:11:52| 1.6459627|
|     0|     59|   2.0|1424380312|2015-02-19 21:11:52|  2.034288|
|     0|     71|   1.0|1424380312|2015-02-19 21:11:52| 1.6430756|
|     0|     95|   2.0|1424380312|2015-02-19 21:11:52| 1.1799268|
|     0|     96|   1.0|1424380312|2015-02-19 21:11:52| 1.1346163|
|     0|     98|   1.0|1424380312|2015-02-19 21:11:52|0.90535057|
+------+-------+------+----------+-------------------+----------+

Out[53]: 

In [0]:
#### 1.2.2 tuning our hyperparameters

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# create the parameter grid
params = ParamGridBuilder().addGrid(als.regParam, [.01, .05, .1, .15]).addGrid(als.rank, [10, 50, 100, 150]).build()
#instantiating crossvalidator estimator
cv = CrossValidator(estimator=als, estimatorParamMaps=params, evaluator=evaluator, parallelism=4)
best_model = cv.fit(ratings)
model = best_model.bestModel
type(model)

list(zip(best_model.avgMetrics, params))

predictions = model.transform(test)

### 1.3 Evaluators for Recommendation
#### 1.3.1 Regression Metrics

In [0]:
predictions.show(5)

+------+-------+------+----------+-------------------+----------+
|userId|movieId|rating|        ts|          timestamp|prediction|
+------+-------+------+----------+-------------------+----------+
|    28|     13|   2.0|1424380312|2015-02-19 21:11:52| 2.0092368|
|    28|     49|   4.0|1424380312|2015-02-19 21:11:52|  3.498866|
|    28|     50|   1.0|1424380312|2015-02-19 21:11:52| 1.1469705|
|    28|     56|   1.0|1424380312|2015-02-19 21:11:52| 1.1156723|
|    28|     57|   3.0|1424380312|2015-02-19 21:11:52| 2.5464206|
+------+-------+------+----------+-------------------+----------+
only showing top 5 rows



In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = (RegressionEvaluator()
            .setMetricName("rmse")
            .setLabelCol("rating")
            .setPredictionCol("prediction"))

evaluator.evaluate(predictions) # 1.354594931101075

# After tuning, much better 0.42416345097443475

Out[115]: 0.42416345097443475

In [0]:
# [OPTIONNAL] une autre facon de faire mais cest exactement la meme chose et meme resultat
# sauf ici on recupere un objet "metrics" qui nous permet de prendre plusieurs metric. Dans la methode en haut il faudrait changer .setMetricName("rmse")
from pyspark.mllib.evaluation import RegressionMetrics
regComparison = predictions.select("rating", "prediction").rdd.map(lambda x : (x[0], x[1]))
metrics = RegressionMetrics(regComparison)
metrics.rootMeanSquaredError # 0.42416345097443475
metrics.meanAbsoluteError # 0.3069587398407071

Out[118]: 0.3069587398407071

#### 1.3.2 Regression Metrics
* More interesting, RankingMetric does not focus on the value of the rating but rather whether or not our algorithm recommends an already ranked item again to a user.
* However, Ranking Metrics is implemented for RDD but not DF

In [0]:
from pyspark.mllib.evaluation import RankingMetrics, RegressionMetrics
from pyspark.sql.functions import collect_list, desc

In [0]:
perUserActual = (predictions.where(col("rating")>1)
                 .orderBy("userId", desc("rating"))
                 .groupby("userId").agg(collect_list("movieId").alias("movies")))

perUserActual.filter(col("userId") ==2).show(5, False) #2     |[8, 40, 66, 71

# for testing and showing
(predictions.where(col("rating")>1).filter(col("userId") ==2)
                 .orderBy("userId", desc("rating")).show())



+------+---------------+
|userId|movies         |
+------+---------------+
|2     |[8, 40, 66, 71]|
+------+---------------+

+------+-------+------+----------+-------------------+----------+
|userId|movieId|rating|        ts|          timestamp|prediction|
+------+-------+------+----------+-------------------+----------+
|     2|      8|   5.0|1424380312|2015-02-19 21:11:52| 4.2671285|
|     2|     40|   4.0|1424380312|2015-02-19 21:11:52|  2.889827|
|     2|     66|   3.0|1424380312|2015-02-19 21:11:52|  2.737658|
|     2|     71|   3.0|1424380312|2015-02-19 21:11:52| 2.6057885|
+------+-------+------+----------+-------------------+----------+



In [0]:
perUserprediction = (predictions.where(col("prediction")>1)
                 .orderBy("userId", desc("prediction"))
                 .groupby("userId").agg(collect_list("movieId").alias("movies")))

perUserprediction.show(5, False)

# for testing and showing
(predictions.where(col("prediction")>1).filter(col("userId") ==2)
                 .orderBy("userId", desc("prediction")).show())

+------+---------------------------------------------------------------+
|userId|movies                                                         |
+------+---------------------------------------------------------------+
|28    |[49, 57, 13, 95, 94, 59, 50, 56]                               |
|26    |[88, 68, 54, 36, 18, 40, 35, 76]                               |
|27    |[90, 28, 93, 91, 98, 40]                                       |
|12    |[64, 35, 50, 16, 7, 72, 52, 83]                                |
|22    |[51, 74, 88, 68, 69, 18, 90, 70, 6, 40, 10, 78, 37, 82, 99, 44]|
+------+---------------------------------------------------------------+
only showing top 5 rows

+------+-------+------+----------+-------------------+----------+
|userId|movieId|rating|        ts|          timestamp|prediction|
+------+-------+------+----------+-------------------+----------+
|     2|      8|   5.0|1424380312|2015-02-19 21:11:52| 4.2671285|
|     2|     40|   4.0|1424380312|2015-02-19 21:11:52|

In [0]:
perUserActual.join(perUserprediction, ['userId']).where("userId ==2").show(5, False)

# Only implemented in RDD so we have to transforme or df to rdd
perUserActualvPred = perUserActual.join(perUserprediction, ['userId']).rdd.map(lambda row : (row[1], row[2]))

type(perUserActualvPred)

+------+---------------+---------------------------+
|userId|movies         |movies                     |
+------+---------------+---------------------------+
|2     |[8, 40, 66, 71]|[8, 40, 66, 71, 47, 35, 50]|
+------+---------------+---------------------------+

Out[121]: pyspark.rdd.PipelinedRDD

In [0]:
# how to show a Rdd
dataColl=perUserActualvPred.collect()
for row in dataColl:
    print(row[0] , row[1])

[49, 57, 13, 95] [94, 49, 57, 13, 59, 56, 95, 50]
[88, 36, 54, 68, 18] [88, 76, 18, 36, 68, 54, 40, 35]
[35, 64, 16, 50, 7] [64, 50, 35, 77, 41, 7]
[51, 74, 88, 68, 69, 18, 70, 6, 90] [90, 68, 51, 18, 88, 74, 37, 6, 69, 10, 70, 78, 40, 44, 66, 99]
[2, 4] [4, 47, 2, 63]
[18, 72, 14, 88] [88, 18, 98, 71, 14, 72, 15]
[2, 71] [2, 26, 89, 0, 71, 68]
[54] [46, 57]
[51] [81, 51, 43, 58]
[51, 2, 45] [51, 45, 26, 76, 93, 78, 2, 40]
[90, 13, 48, 21] [90, 88, 48, 13, 21, 28, 39, 84, 31]
[90, 94, 98, 74] [90, 94, 96, 74, 72, 98, 45, 34, 33]
[64, 22] [51, 85, 6, 34, 22]
[32, 43, 64, 90, 95, 37, 77] [90, 64, 32, 58, 88, 95, 37]
[46, 10, 49] [49, 10, 84, 11, 59, 37]
[52, 87] [87, 22, 52]
[60, 3, 92] [92, 45, 3, 86, 18, 60]
[73, 33] [73, 33, 38, 0, 4]
[39, 76] [26, 48]
[4, 16] [10, 17, 4]
[12, 16, 84] [2, 40, 43, 12, 16]
[69, 90, 77, 68] [77, 68, 95, 69, 90, 87, 98]
[19, 7, 51] [7, 19, 51, 70, 85, 83]
[87, 40] [12, 40, 45, 38, 87, 54]
[48, 69, 66, 35, 51, 71, 6] [47, 48, 66, 35, 51, 6, 71, 11, 69, 77,

In [0]:
ranks = RankingMetrics(perUserActualvPred)
ranks.meanAveragePrecision # 0.4027830646365129
ranks.precisionAt(3) # 0.7011494252873564 # Compute the average precision of all the queries, truncated at ranking position k

# Apres tuning, better performence 0.48202654625068414
# ranks.precisionAt(3) = 0.8390804597701149

Out[124]: 0.8390804597701149

#### 1.4 Making recommendation

In [0]:
als_fitted.recommendForAllUsers(10).orderBy('userId').show(20, False) ## output the top 10 recommendations for each user

+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                          |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0     |[{28, 4.976384}, {53, 4.6533184}, {9, 3.689191}, {76, 3.688112}, {92, 3.5214987}, {2, 3.058579}, {74, 2.996165}, {26, 2.7998185}, {81, 2.5798297}, {12, 2.5289855}]      |
|1     |[{22, 3.9353995}, {68, 3.6685915}, {62, 3.3762326}, {77, 3.0090818}, {28, 2.9866629}, {90, 2.8740778}, {94, 2.8349586}, {9, 2.5197878}, {85, 2.5134313}, {75, 2.489832}] |
|2     |[{2, 7.161218}, {93, 5.000541}, {71, 4.9900684}, {83, 4.886592}, {39, 4.807678}, {11, 4.597951}, 

In [0]:
als_fitted.recommendForAllItems(5).show(20, False) # output the top 5 recommendations user for movies

+-------+------------------------------------------------------------------------------------+
|movieId|recommendations                                                                     |
+-------+------------------------------------------------------------------------------------+
|20     |[{17, 4.5484686}, {12, 3.623176}, {23, 3.5507388}, {5, 3.0708005}, {10, 2.5509143}] |
|40     |[{10, 3.8552082}, {12, 3.348205}, {17, 3.2712836}, {6, 3.0019913}, {4, 2.8416793}]  |
|10     |[{17, 4.4298344}, {12, 4.285824}, {23, 3.7058458}, {10, 2.8956482}, {28, 2.7350574}]|
|50     |[{23, 4.087554}, {11, 3.9969647}, {29, 3.3832972}, {5, 3.04032}, {27, 2.956647}]    |
|80     |[{26, 5.909457}, {3, 3.9757066}, {22, 3.1074533}, {13, 3.0845206}, {11, 2.9974697}] |
|70     |[{4, 3.873875}, {6, 3.4831345}, {7, 2.946424}, {26, 2.7985535}, {28, 2.6751142}]    |
|60     |[{26, 3.4489312}, {3, 3.0222995}, {22, 2.6350856}, {13, 2.589878}, {4, 2.3404317}]  |
|90     |[{23, 4.9897566}, {5, 4.80008}, {16, 4.79

## 2. Implicit example (in chapitre 28 recommendation)

In [0]:
raw_user_artist_data = spark.read.text("/FileStore/tables/user_artist_data_small.txt")

raw_user_artist_data.count()

raw_user_artist_data.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1550011534196587>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mraw_user_artist_data[0m [0;34m=[0m [0mspark[0m[0;34m.[0m[0mread[0m[0;34m.[0m[0mtext[0m[0;34m([0m[0;34m"/FileStore/tables/user_artist_data_small.txt"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0;34m[0m[0m
[1;32m      3[0m [0mraw_user_artist_data[0m[0;34m.[0m[0mcount[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m [0;34m[0m[0m
[1;32m      5[0m [0mraw_user_artist_data[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m