## Recomendación por filtrado colabarativo

Como ya sabemos, las recomendaciones por filtrado colaborativo, usan la propia información que los usuarios nos deja al interactuar con nuestros items. Pordemos ver la idea en el siguiente gif:

![](../img/450px-Collaborative_filtering.gif)

Hemos visto que dentro de las técnicas de filtrado colaborativo podemos distinguir entre las "basado en memoria" y las basdas en factorización de matrices.

En este notebook vamos a ver un ejemplo con Spark (pyspark) y este segundo tipo de sistemas

![](../img/memory-model-cf.jpg)

Para ello nos basamos en el siguiente [guión](https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html) (ya han dado de baja la web pero podemos usar el siguiente [link](https://web.archive.org/web/20160316113725/https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html)) del Spark Summit 2014.

### Antes de empezar

Necesitamos descargar los datos en la carpeta `../datos`

In [2]:
!ls -l /datos

ls: cannot access /datos: No such file or directory


**NOTA:** Si no existen las carpetas `ml-1m` y `tag-genome` usamos el script `descargar_movilens.sh` para descargarlos.

## Los datos

En la carpeta `ml-1m`  que contiene: 

> Stable benchmark dataset. 1 million ratings from 6000 users on 4000 movies. Released 2/2003.

Hemos descargado estos datos que son pequeños para hacer las pruebas, pero el sistema que vamos a utilizar con Spark es distribuido y lo podríamos hacer sobre un cluster con el mismo código para datos más grandes.

Los datos que incluye MovieLens son:

* `movies.dat`: Incluye el catálogo de películas separado por `::` cada campo.
* `ratings.dat`: Incluye los ratings entre usuarios y películas en este caso la puntuación (de 1 a 5) que han dado a esa película. Este archivo es nuestra matriz $M_{(n, p)}$ .
* `users.dat`: Incluye información de los usuarios pero en nuestro ejercicio no vamos a utilizar este archivo.


In [3]:
!hadoop fs -text /datos/ml-1m/movies.dat | head

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller
text: Unable to write to output stream.


In [4]:
!hadoop fs -text /datos/ml-1m/ratings.dat | head

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
text: Unable to write to output stream.


## Incluirnos en el recomendador

Una de las características importantes de los sitemas de recomendación basados en factorización de matrices. Es que desde el entrenamiento del modelo tendremos que incluir a todos los usuarios a los que vamos a querer recomendar. Al contrario que otros modelos de *machine learning* donde una vez entrenado el modelo podemos predecir a nuevos usuarios.

Para ello vamos a incluir nuestras preferencias como un nuevo usuario y después veremos las recomendaciones que obtenemos para nosotros mismos.

¿Cómo hacemos esto?

El siguiente script en python `spark_als/bin/rateMovies` sirve para generar nuestras recomendaciones.

Una vez ejecutado se crearán nuestros ratings en el archivo `personalRatings.txt`

In [4]:
!cat personalRatings.txt

0::1::4::1583788918
0::780::4::1583788918
0::1210::5::1583788918
0::648::5::1583788918
0::344::1::1583788918
0::165::5::1583788918
0::153::4::1583788918
0::597::1::1583788918
0::1580::3::1583788918
0::231::1::1583788918


## Spark y MLlib

Para nuestro recomendador vamos a usar Spark y la librería MLlib que incluye el algoritmo ALS:    

&nbsp;<br>

![](../img/matrix_factorization.png)

Lo primero de todo comprobamos que tenemos creado el `SparkContext`:

In [10]:
import os
import sys
import pandas as pd
import numpy as np

from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Cargamos las funciones definidas en el archivo funciones_auxiliares.py
from funciones_auxiliares import *

In [11]:
conf = (

    SparkConf()
    .setAppName(u"Sistemas de Recomendación")
    .set("spark.executor.memory", "4g")
    .set("spark.executor.cores", "2")
    .set("spark.default.parallelism", "800")
    .set("spark.sql.shuffle.partitions", "800")
    .set("spark.submit.pyFiles", "funciones_auxiliares.py")

)

In [12]:
spark = (

    SparkSession.builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()

)

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.io.FileNotFoundException: File file:/C:/Users/jhern/Jupyter%20Notebooks/ML%20II/Recommender%20Systems/04.%20Filtrado%20Colaborativo/funciones_auxiliares.py does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.spark.SparkContext.addFile(SparkContext.scala:1544)
	at org.apache.spark.SparkContext.addFile(SparkContext.scala:1508)
	at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:462)
	at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:462)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:462)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
	at java.lang.reflect.Constructor.newInstance(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)


Usamos la función `loadRatings` para cargar nuestros ratings personales: 

Definimos la carpeta donde se encuentras nuestros archivos y usamos la función `parseRating` de manera distribuida:

In [8]:
ratings_hdfs = '/datos/ml-1m/ratings.dat'

In [9]:
try:
    spark.read.options(header=False, sep="::").csv(ratings_hdfs).show()
except Exception as e: 
    print(e)

'Delimiter cannot be more than one character: ::'


In [10]:
lines = spark.sparkContext.textFile(ratings_hdfs)

In [11]:
lines.take(4)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275']

In [12]:
parts = lines.map(lambda row: row.split("::"))

In [13]:
parts.take(4)

[['1', '1193', '5', '978300760'],
 ['1', '661', '3', '978302109'],
 ['1', '914', '3', '978301968'],
 ['1', '3408', '4', '978300275']]

In [14]:
ratingsRDD = (
    parts
    .map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]), rating=float(p[2]), timestamp=int(p[3])))
)

In [15]:
ratingsRDD.take(4)

[Row(movieId=1193, rating=5.0, timestamp=978300760, userId=1),
 Row(movieId=661, rating=3.0, timestamp=978302109, userId=1),
 Row(movieId=914, rating=3.0, timestamp=978301968, userId=1),
 Row(movieId=3408, rating=4.0, timestamp=978300275, userId=1)]

In [16]:
ratings = ratingsRDD.toDF()

In [17]:
ratings.show(4)

+-------+------+---------+------+
|movieId|rating|timestamp|userId|
+-------+------+---------+------+
|   1193|   5.0|978300760|     1|
|    661|   3.0|978302109|     1|
|    914|   3.0|978301968|     1|
|   3408|   4.0|978300275|     1|
+-------+------+---------+------+
only showing top 4 rows



Pegamos ahora nuestros ratings al mismo `DF`:

In [18]:
myRatings = pd.read_csv(

    "personalRatings.txt",
    sep="::",
    names=["userId", "movieId", "rating", "timestamp"],
    engine='python'

)

In [19]:
myRatings

Unnamed: 0,userId,movieId,rating,timestamp
0,0,1,4,1583788918
1,0,780,4,1583788918
2,0,1210,5,1583788918
3,0,648,5,1583788918
4,0,344,1,1583788918
5,0,165,5,1583788918
6,0,153,4,1583788918
7,0,597,1,1583788918
8,0,1580,3,1583788918
9,0,231,1,1583788918


In [20]:
ratings = (

    ratings
    .union(
        spark.createDataFrame(myRatings).select(ratings.columns)
    )

).cache()

In [21]:
movies_hdfs = '/datos/ml-1m/movies.dat'

In [22]:
movies = (

    spark.sparkContext
    .textFile(movies_hdfs)
    .map(lambda x: x.split("::"))
    .map(lambda x: Row(movieId=x[0], movieTitle=x[1], genres=x[2]))
    .toDF()

).cache()

In [23]:
conteos = (

    ratings
    .select(
        F.count("*").alias("count"),
        F.countDistinct('userId').alias('userId'),
        F.countDistinct('movieId').alias('movieId')
    )

).first()

Una vez cargados ambos archivos vamos a contar el número de películas, usuarios y ratings que tenemos:

In [24]:
print("Got %d ratings from %d users on %d movies." % conteos)

Got 1000219 ratings from 6041 users on 3706 movies.


Luego siguiendo nuestra notación tenemos que:

* $n=6041$
* $p=3706$

Así que la matriz $M$ tiene un tamaño de $6041\cdot3706=22387946$ pero solo tenemos información de $1000218$, es decir un 4%.

### Ejecución ALS

In [25]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [26]:
als = ALS(

    maxIter=5, 
    regParam=0.01, 
    userCol="userId", 
    itemCol="movieId", 
    ratingCol="rating",

)

In [27]:
model = als.fit(ratings)

In [28]:
model

ALS_47a8aa4465b78508473c

In [29]:
predictions = model.transform(ratings)

In [30]:
predictions.printSchema()

root
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- userId: long (nullable = true)
 |-- prediction: float (nullable = false)



In [31]:
predictions.show()

+-------+------+----------+------+----------+
|movieId|rating| timestamp|userId|prediction|
+-------+------+----------+------+----------+
|   1580|   4.0| 963028136|  4777| 3.7535176|
|   1580|   1.0| 958306790|  5758| 2.7779112|
|   1580|   5.0| 958254615|  5759| 3.7450058|
|   1580|   4.0| 977085423|   182| 3.7428849|
|   1580|   4.0| 974330478|  2380| 3.4430072|
|   1580|   3.0| 959097998|  5621|  2.643875|
|   1580|   3.0| 977176300|   168| 3.5407069|
|   1580|   3.0| 975883616|  1952| 3.0650675|
|   1580|   5.0| 962855202|  5167| 3.6679313|
|   1580|   3.0| 977501276|   117|  3.779159|
|   1580|   4.0| 973218090|  2743| 4.0912867|
|   1580|   4.0| 972526890|  2845|  4.117525|
|   1580|   4.0| 970125552|  3056|  3.797142|
|   1580|   4.0| 966303578|  3931| 3.8660393|
|   1580|   4.0| 965352384|  4130| 3.8831992|
|   1580|   4.0| 959290430|  5580| 3.6374292|
|   1580|   5.0| 974559452|  2279| 3.7993147|
|   1580|   4.0|1006656956|  2926| 3.5773067|
|   1580|   4.0| 975600806|   687|

In [32]:
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating",
    predictionCol="prediction"
)

In [33]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.7713123288648368


In [34]:
userRecs = model.recommendForAllUsers(10)

In [35]:
userRecs.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [36]:
userRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  3997|[[2545, 9.269251]...|
|  2122|[[572, 6.5878496]...|
|  1829|[[3867, 9.124575]...|
|  4519|[[1539, 9.595539]...|
|  5156|[[557, 6.2800536]...|
|  3175|[[1857, 7.107347]...|
|  1580|[[2562, 7.295412]...|
|  3918|[[2305, 9.726513]...|
|  1025|[[2964, 12.635993...|
|  2235|[[136, 8.588034],...|
|  1483|[[2964, 9.721584]...|
|  1990|[[1471, 9.474205]...|
|  2580|[[2332, 9.618193]...|
|  1721|[[2192, 7.7569265...|
|  4161|[[2705, 5.9256997...|
|  3179|[[681, 8.902837],...|
|  4219|[[3944, 6.4903746...|
|  4929|[[2209, 7.9432025...|
|  5117|[[136, 8.072485],...|
|  5287|[[572, 6.0133295]...|
+------+--------------------+
only showing top 20 rows



In [37]:
(

    userRecs
    .filter(""" userId = 0 """)
    .withColumn("recommendations",F.explode("recommendations"))
    .withColumn("movieId",F.col('recommendations')['movieId'])
    .withColumn("rating",F.col('recommendations')['rating'])
    .drop("recommendations")
    .join(movies, 'movieId')
    .orderBy(F.desc('rating'))
    
).toPandas()

Unnamed: 0,movieId,userId,rating,genres,movieTitle
0,2545,0,18.208933,Comedy,Relax... It's Just Sex (1998)
1,2984,0,17.591209,Documentary,On Any Sunday (1971)
2,1859,0,17.287172,Drama,Taste of Cherry (1997)
3,751,0,17.187618,Comedy,Careful (1992)
4,3867,0,16.29343,Drama,All the Rage (a.k.a. It's the Rage) (1999)
5,2964,0,15.348584,Drama,Julien Donkey-Boy (1999)
6,2246,0,14.548626,Comedy,Stars and Bars (1988)
7,2483,0,14.534881,Comedy|Horror|Thriller,"Day of the Beast, The (El D�a de la bestia) (1..."
8,3850,0,14.424432,Crime|Thriller,Whatever Happened to Aunt Alice? (1969)
9,2332,0,14.371967,Crime|Drama,Belly (1998)


In [38]:
movieRecs = model.recommendForAllItems(10)

In [39]:
movieRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1829|[[2640, 6.709895]...|
|   3175|[[4565, 6.0995197...|
|   3918|[[0, 6.7730665], ...|
|   2122|[[3587, 7.0087514...|
|   1580|[[5642, 5.815876]...|
|   3179|[[448, 5.555192],...|
|   1990|[[3396, 5.58051],...|
|   2580|[[5328, 5.34707],...|
|   1483|[[0, 10.328633], ...|
|   1721|[[1520, 6.186659]...|
|   1025|[[2441, 7.087336]...|
|   2235|[[2441, 2.5812492...|
|   1139|[[283, 5.258524],...|
|   1322|[[1664, 7.057609]...|
|     85|[[5328, 7.2474275...|
|   2525|[[5642, 6.900554]...|
|   3089|[[0, 6.6785035], ...|
|   3220|[[2441, 2.5812492...|
|   2443|[[2441, 9.088114]...|
|   2923|[[2151, 10.5367],...|
+-------+--------------------+
only showing top 20 rows



In [40]:
users = ratings.select(als.getUserCol()).distinct().limit(3)

In [41]:
users.show()

+------+
|userId|
+------+
|  1508|
|  1766|
|  1776|
+------+



In [42]:
userSubsetRecs = model.recommendForUserSubset(users, 10)

In [43]:
userSubsetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1210|[[3245, 8.203256]...|
|  1756|[[1471, 9.243921]...|
|  1302|[[2569, 10.503956...|
+------+--------------------+



In [44]:
userSubsetRecs.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



### Entrenamiento de los parámetros

Para decidir qué parámetros utilizar en nuestro algoritmo vamos a dividir la muestra en tres trozos:
entrenamiento (60%), validación (20%) y test (20%). Para ello lo hacemos basado en el último dígito del `timestamp` 
(ver la función `parseRating` línea 12)

In [45]:
(training, test) = ratings.randomSplit([0.8, 0.2], seed=1234)

Seleccionamos ahora los posibles valores de nuestros parámetros:

In [46]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [47]:
print(ALS().explainParams())

alpha: alpha for implicit preference (default: 1.0)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: False)
intermediateStorageLevel: StorageLevel for intermediate datasets. Cannot be 'NONE'. (default: MEMORY_AND_DISK)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item)
maxIter: max number of iterations (>= 0). (default: 10)
n

In [48]:
als = ALS(

    userCol="userId", 
    itemCol="movieId", 
    ratingCol="rating",
    coldStartStrategy="drop"

)

In [49]:
paramGrid = (

    ParamGridBuilder()
    .addGrid(als.rank, [10, 100])
    .addGrid(als.regParam, [0.01, 0.1])
    .addGrid(als.maxIter, [5, 10])
    .build()

)

In [50]:
crossval = CrossValidator(

    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=2

) 

**¡Cuidado esta parte tarda mucho!**

Un ejercicio habitual en *machine learning* es comparar el resultado de nuestro modelo con el *baseline*. En este caso con la media de los ratings y ver si nuestro modelo es mejor y en cuanto

### Modelo final
Terminamos entrenando el modelo final con todos los datos y los parámetros que hemos elegido

In [51]:
finalModel = ALS(

    regParam=0.01, #mallado.iloc[0]['regParam'],
    maxIter=20, #mallado.iloc[0]['maxIter'],
    rank=100, #mallado.iloc[0]['rank'],
    userCol="userId", 
    itemCol="movieId", 
    ratingCol="rating",
    coldStartStrategy="drop"

).fit(ratings)

In [52]:
rmse = evaluator.evaluate(finalModel.transform(ratings))
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.363889289559987


## Ver nuestras recomendaciones

Vamos a recuperar las recomendaciones según los ratings que pusimos al principio

In [53]:
userRecs = finalModel.recommendForAllUsers(20)

¿Mejor?

In [54]:
recommendations = (

    userRecs
    .filter(""" userId = 0 """)
    .withColumn("recommendations", F.explode("recommendations"))
    .withColumn("movieId", F.col('recommendations')['movieId'])
    .withColumn("rating", F.col('recommendations')['rating'])
    .drop("recommendations")
    .join(ratings, ['userId', 'movieId'], 'left_anti')
    .join(movies, 'movieId')
    .orderBy(F.desc('rating'))

).toPandas()

In [55]:
recommendations

Unnamed: 0,movieId,userId,rating,genres,movieTitle
0,1196,0,4.985356,Action|Adventure|Drama|Sci-Fi|War,Star Wars: Episode V - The Empire Strikes Back...
1,2628,0,4.852185,Action|Adventure|Fantasy|Sci-Fi,Star Wars: Episode I - The Phantom Menace (1999)
2,260,0,4.82462,Action|Adventure|Fantasy|Sci-Fi,Star Wars: Episode IV - A New Hope (1977)
3,2927,0,4.718616,Drama|Romance,Brief Encounter (1946)
4,3793,0,4.713987,Action|Sci-Fi,X-Men (2000)
5,2116,0,4.704656,Adventure|Animation|Children's|Sci-Fi,"Lord of the Rings, The (1978)"
6,1284,0,4.661451,Film-Noir|Mystery,"Big Sleep, The (1946)"
7,32,0,4.621117,Drama|Sci-Fi,Twelve Monkeys (1995)
8,3213,0,4.606632,Animation|Children's,Batman: Mask of the Phantasm (1993)
9,465,0,4.595791,Action|Drama|War,Heaven & Earth (1993)


## Entendiendo cómo se realizan las predicciones

Vamos a entender qué descomposición se ha realizado y cómo se realizan las predicciones. 
La matriz de ratings tiene tamaño $(6041\times3706)$ como hemos visto y en el entrenamiento se ha decidido utilizar 100 variables latentes luego la descomposición que hemos realizado es:
&nbsp;<br>
&nbsp;<br>

$$
{\Large
M_{(6041\times 3706)} = U_{(6041\times 100)}\;V_{(100 \times 3706)}
}
$$

¿Dónde están esas matrices calculadas?

In [56]:
6041 *  3706

22387946

In [57]:
100 * (6041 +  3706)

974700

In [58]:
finalModel.itemFactors

DataFrame[id: int, features: array<float>]

In [59]:
finalModel.userFactors

DataFrame[id: int, features: array<float>]

In [60]:
len(finalModel.itemFactors.first()['features'])

100

In [61]:
len(finalModel.userFactors.first()['features'])

100

In [62]:
finalModel.itemFactors.count()

3706

In [63]:
finalModel.userFactors.count()

6041

Como hemos visto el objeto `finalModel` además contiene varias funciones para hacer las predicciones, pero vamos a hacerlo a mano para entender cómo funciona algebráicamente. 

Nueso id de usuario es el 0 así que podemos quedarnos con la fila de la matriz $U$ que hace referencia a nuestro usuario:

In [64]:
user_feature = np.array(finalModel.userFactors.filter(""" id==0 """).first()['features'])
user_feature

array([-2.27074966e-01,  1.38208076e-01, -2.64803991e-02,  3.33696641e-02,
        3.75877678e-01,  1.97964418e-03,  3.29350948e-01, -6.45460367e-01,
        5.07412963e-02,  1.86721161e-01,  1.78169698e-01,  1.32799298e-01,
       -1.88942447e-01,  1.98551621e-02, -2.40450785e-01, -9.10611823e-03,
       -2.87065029e-01,  1.95335701e-01, -2.04408228e-01, -8.81161690e-02,
       -8.37703515e-03,  3.46145667e-02, -1.79549024e-01,  1.17066026e-01,
        2.99455255e-01, -1.08999521e-01, -5.09200990e-02,  1.18876822e-01,
       -1.41237780e-01, -9.10440758e-02,  2.96517432e-01,  2.66705062e-02,
       -1.28219515e-01, -4.31135565e-01, -1.59222141e-01,  2.27298751e-01,
        8.62219632e-02, -1.97181344e-01,  1.85575053e-01, -1.23524396e-02,
        1.55094895e-04, -6.17765747e-02, -1.72597855e-01,  3.14780861e-01,
       -2.43565604e-01, -3.39462698e-01,  2.94025224e-02,  2.02893227e-01,
       -1.64375007e-02,  2.98378766e-01, -1.92630947e-01, -1.83136929e-02,
       -3.02985613e-03,  

Recuperamos nuestra primera recomendación:

In [65]:
recommendations.head(1)

Unnamed: 0,movieId,userId,rating,genres,movieTitle
0,1196,0,4.985356,Action|Adventure|Drama|Sci-Fi|War,Star Wars: Episode V - The Empire Strikes Back...


Extraemos de la columna $V$ la columna correspondiente con esta película

In [66]:
product_features = np.array(
    finalModel.itemFactors
    .filter(F.col('id') == int(recommendations.movieId[0]))
    .first()['features']
)
product_features

array([ 0.25193983,  0.4593589 ,  0.69353467, -0.59827894,  0.17555617,
        0.35568225,  0.10605962, -0.7525779 ,  0.09290539, -0.44745201,
        0.14659329,  0.59422117, -1.06495166, -0.37317017, -0.16108066,
        0.06068132,  0.27428964,  0.42094019,  0.36652872, -0.04606232,
        0.14537959, -0.14121681,  0.20360228,  0.19725148,  0.68586254,
        0.29007748, -0.05535936,  0.00187748,  0.36029673, -0.12457822,
        0.40042466, -0.23747288, -0.20063256, -0.49935016, -0.04984265,
        0.19434354, -0.36708063,  0.37541667, -0.027764  ,  0.07531852,
        0.67511994, -0.31004584, -0.02863843,  1.03480291,  0.02380848,
       -0.30676618,  0.09542111, -0.13920975,  0.04061755,  0.33759603,
       -0.0051939 , -0.45956063,  0.00498535,  0.40992159,  0.57044476,
       -0.71875125,  0.35103998,  0.00398565,  0.26617759, -0.67313874,
        0.25990111, -0.10224196,  0.03150572, -0.39903805,  0.79356968,
        0.15292095,  0.50998664, -0.40732798,  0.16901854,  0.18

Para terminar, es fácil de comprobar que matemáticamente:
$$
{\Large
m_{ij} =\; <u_i, v_j>
}
$$

Es decir, el rating del usuario $i$ y el item $j$ es el producto escalar de la fila  $i$-esima de la matriz $U$ y la columna $j$-esima de la matriz $V$

In [67]:
np.dot(user_feature, product_features)

4.985357844265989

### Guardamos las dos matrices de manera local

Para terminar vamos a guardar las dos matrices de manera local para poder usarlas más tarde

In [68]:
item_factors = (

    finalModel
    .itemFactors

).toPandas()

In [69]:
item_factors.to_json('item_factors.json', orient='records')

In [70]:
user_factors = (

    finalModel
    .userFactors

).toPandas()

In [71]:
user_factors.to_json('user_factors.json', orient='records')

## Ejercicio

Crear un sistema de recomendaciones para los datos de **last.fm**

In [72]:
import os
import sys
import pandas as pd
import numpy as np

from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Cargamos las funciones definidas en el archivo funciones_auxiliares.py
from funciones_auxiliares import *

In [73]:
conf = (

    SparkConf()
    .setAppName(u"Sistemas de Recomendación")
    .set("spark.executor.memory", "4g")
    .set("spark.executor.cores", "2")
    .set("spark.default.parallelism", "800")
    .set("spark.sql.shuffle.partitions", "800")
    .set("spark.submit.pyFiles", "funciones_auxiliares.py")

)

In [74]:
spark = (

    SparkSession.builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()

)

<center>

![image](../img/logo_lastfm.png)
    
</center>

http://ocelma.net/MusicRecommendationDataset/lastfm-360K.html

https://musicbrainz.org

In [75]:
!hadoop fs -text /datos/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv | head

00000c289a1829a808ac09c00daf10bc3c4e223b	3bd73256-3905-4f3a-97e2-8b341527f805	betty blowtorch	2137
00000c289a1829a808ac09c00daf10bc3c4e223b	f2fb0ff0-5679-42ec-a55c-15109ce6e320	die Ärzte	1099
00000c289a1829a808ac09c00daf10bc3c4e223b	b3ae82c2-e60b-4551-a76d-6620f1b456aa	melissa etheridge	897
00000c289a1829a808ac09c00daf10bc3c4e223b	3d6bbeb7-f90e-4d10-b440-e153c0d10b53	elvenking	717
00000c289a1829a808ac09c00daf10bc3c4e223b	bbd2ffd7-17f4-4506-8572-c1ea58c3f9a8	juliette & the licks	706
00000c289a1829a808ac09c00daf10bc3c4e223b	8bfac288-ccc5-448d-9573-c33ea2aa5c30	red hot chili peppers	691
00000c289a1829a808ac09c00daf10bc3c4e223b	6531c8b1-76ea-4141-b270-eb1ac5b41375	magica	545
00000c289a1829a808ac09c00daf10bc3c4e223b	21f3573f-10cf-44b3-aeaa-26cccd8448b5	the black dahlia murder	507
00000c289a1829a808ac09c00daf10bc3c4e223b	c5db90c4-580d-4f33-b364-fbaa5a3a58b5	the murmurs	424
00000c289a1829a808ac09c00daf10bc3c4e223b	0639533a-0402-40ba-b6e0-18b067198b73	lunachicks	403
text: Unable to write to ou

In [76]:
esquema = T.StructType([

    T.StructField('user_id',T.StringType(),True),
    T.StructField('artist_id',T.StringType(),True),
    T.StructField('artist_name',T.StringType(),True),
    T.StructField('plays',T.DoubleType(),True)

])

In [77]:
plays = (

    spark.read
    .csv('/datos/lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv', schema=esquema, sep='\t')
    .na.drop()

)

In [78]:
plays.show()

+--------------------+--------------------+--------------------+------+
|             user_id|           artist_id|         artist_name| plays|
+--------------------+--------------------+--------------------+------+
|00000c289a1829a80...|3bd73256-3905-4f3...|     betty blowtorch|2137.0|
|00000c289a1829a80...|f2fb0ff0-5679-42e...|           die Ärzte|1099.0|
|00000c289a1829a80...|b3ae82c2-e60b-455...|   melissa etheridge| 897.0|
|00000c289a1829a80...|3d6bbeb7-f90e-4d1...|           elvenking| 717.0|
|00000c289a1829a80...|bbd2ffd7-17f4-450...|juliette & the licks| 706.0|
|00000c289a1829a80...|8bfac288-ccc5-448...|red hot chili pep...| 691.0|
|00000c289a1829a80...|6531c8b1-76ea-414...|              magica| 545.0|
|00000c289a1829a80...|21f3573f-10cf-44b...|the black dahlia ...| 507.0|
|00000c289a1829a80...|c5db90c4-580d-4f3...|         the murmurs| 424.0|
|00000c289a1829a80...|0639533a-0402-40b...|          lunachicks| 403.0|
|00000c289a1829a80...|a342964d-ca53-4e5...|    walls of jericho|

In [79]:
spark.stop()