# Recommender systems

## Movielens 100k dataset

### Caricamento librerie

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

### Avvio sessione Spark

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

### Caricamento csv

In [3]:
df = spark.read.csv("data/ratings.csv", sep=",", header=True, inferSchema=True)

In [4]:
df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



### Visualizzazione schema dataset
* user -> integer
* item -> integer
* rating -> double

In [5]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



### Divisione dataset in training e test set

In [6]:
training, test = df.randomSplit(weights=[0.8, 0.2], seed=42)

### Creazione del modello ALS e fit sui dati di training
* paper: https://www.researchgate.net/publication/220788980_Large-Scale_Parallel_Collaborative_Filtering_for_the_Netflix_Prize
* documentazione: https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS
* codice sorgente: https://spark.apache.org/docs/2.2.0/api/python/_modules/pyspark/ml/recommendation.html#ALS
* tuning degli iperparametri: https://gist.github.com/KevinLiao159/9f69049d6d3d8a096c0ea08dbc29591b#file-tune_als-py

In [7]:
# impostando il parametro implicitPrefs=True è possibile effettuare previsioni nei casi in cui le valutazioni
# sono incluse in un intervallo tra 0 ed 1 (ovviamente la metrica di riferimento non sarà più il rmse)
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(training)

### Previsione sui dati di test

In [8]:
predictions = model.transform(test)
predictions.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   133|    471|   4.0| 843491793| 2.3973393|
|   599|    471|   2.5|1498518822| 2.2770236|
|   387|    471|   3.0|1139047519|  2.641001|
|   520|    471|   5.0|1326609921| 3.7206843|
|   287|    471|   4.5|1110231536| 2.1928751|
|   469|    471|   5.0| 965425364| 3.0696914|
|   414|    471|   5.0| 961514069| 3.1914182|
|   260|    471|   4.5|1109409455| 3.0017877|
|   373|    471|   5.0| 846830388| 2.6672652|
|   357|    471|   3.5|1348627082|  3.872034|
|   492|    833|   4.0| 863976674| 2.8625958|
|   463|   1088|   3.5|1145460096| 3.1891673|
|    41|   1088|   1.5|1458939142|   2.90814|
|   594|   1088|   4.5|1109035643|  4.220158|
|   391|   1088|   1.0|1030824424|  3.216228|
|    10|   1088|   3.0|1455619275|  3.429026|
|   414|   1088|   3.0| 961514273| 3.4413023|
|    68|   1088|   3.5|1158534614| 3.3752687|
|    19|   1238|   3.0| 965705784|

### Valutazione dei risultati sui dati di test

In [9]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [10]:
rmse = evaluator.evaluate(predictions)
rmse

0.8851835186708817

### Generazione di 10 raccomandazioni di film per ogni utente

In [11]:
user_recs = model.recommendForAllUsers(10)
user_recs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[26326, 4.760275...|
|   463|[[7842, 5.078032]...|
|   496|[[51931, 4.832078...|
|   148|[[98491, 4.695774...|
|   540|[[69524, 5.145454...|
|   392|[[166534, 5.30136...|
|   243|[[33834, 5.870501...|
|    31|[[33649, 5.233990...|
|   516|[[4429, 4.843769]...|
|   580|[[141718, 4.91951...|
|   251|[[3925, 5.719379]...|
|   451|[[177593, 5.23877...|
|    85|[[51931, 5.444975...|
|   137|[[6650, 4.6907825...|
|    65|[[3925, 4.7745056...|
|   458|[[932, 5.252403],...|
|   481|[[3451, 4.6622014...|
|    53|[[33649, 6.738617...|
|   255|[[33834, 6.289697...|
|   588|[[96004, 4.825836...|
+------+--------------------+
only showing top 20 rows



>### Conversione dei risultati in dataframe pandas

In [12]:
df_user_recs = user_recs.toPandas()
df_user_recs.head()

Unnamed: 0,userId,recommendations
0,471,"[(26326, 4.760274887084961), (945, 4.674196243..."
1,463,"[(7842, 5.07803201675415), (171495, 4.84267044..."
2,496,"[(51931, 4.832077980041504), (3473, 4.73790550..."
3,148,"[(98491, 4.695774078369141), (5666, 4.61504220..."
4,540,"[(69524, 5.145453929901123), (28, 5.0389614105..."


### Generazione di 10 raccomandazioni di utenti per ogni film

In [13]:
movie_recs = model.recommendForAllItems(10)
movie_recs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[53, 5.071892], ...|
|   4900|[[53, 4.7231035],...|
|   5300|[[53, 4.1715555],...|
|   6620|[[236, 4.8972793]...|
|   7340|[[413, 4.6364503]...|
|  32460|[[53, 5.598545], ...|
|  54190|[[53, 5.5879984],...|
|    471|[[43, 5.113896], ...|
|   1591|[[53, 4.0478888],...|
| 140541|[[576, 4.386676],...|
|   1342|[[373, 3.9129126]...|
|   2122|[[295, 4.6826873]...|
|   2142|[[99, 3.7577298],...|
|   7982|[[547, 5.1920986]...|
|  44022|[[543, 4.892355],...|
| 141422|[[99, 2.9341936],...|
| 144522|[[53, 2.913804], ...|
|    833|[[461, 3.4337633]...|
|   5803|[[53, 4.141196], ...|
|   7833|[[393, 4.143931],...|
+-------+--------------------+
only showing top 20 rows



>### Conversione dei risultati in dataframe pandas

In [14]:
df_movie_recs = movie_recs.toPandas()
df_movie_recs.head()

Unnamed: 0,movieId,recommendations
0,1580,"[(53, 5.071891784667969), (475, 4.370511531829..."
1,4900,"[(53, 4.7231035232543945), (154, 4.53876638412..."
2,5300,"[(53, 4.171555519104004), (191, 4.139414310455..."
3,6620,"[(236, 4.897279262542725), (53, 4.887851238250..."
4,7340,"[(413, 4.636450290679932), (53, 4.115792751312..."


### Generazione di 10 raccomandazioni per degli utenti specifici

In [15]:
users_list = (1, 20, 300)
users = df.select("userId").filter(f"userId in {users_list}")
user_sub_recs = model.recommendForUserSubset(users, 10)
user_sub_recs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   300|[[26326, 5.308459...|
|     1|[[25771, 5.681392...|
|    20|[[1262, 5.473758]...|
+------+--------------------+



### Generazione di 10 raccomandazioni per dei film specifici

In [16]:
movies_list = (1, 20, 300)
movies = df.select("movieId").filter(f"movieId in {movies_list}")
movie_sub_recs = model.recommendForItemSubset(movies, 10)
movie_sub_recs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|    300|[[53, 4.994001], ...|
|      1|[[53, 5.289158], ...|
|     20|[[53, 4.2887683],...|
+-------+--------------------+

