# Construcción de un recomendador con Spark

## 1. Introducción
El objetivo general de la actividad será construir un recomendador de películas utilizando el método ALS (Alternating Least Squares), con un dataset que contiene las puntuaciones otorgadas por los usuarios a distintas películas.

Dividiremos la actividad en dos apartados, en el primero prepararemos y entrenaremos el modelo, y en el segundo evaluaremos el resultado y realizaremos algunas recomendaciones utilizando el modelo de recomendación.

## 2. Entrenamiento del modelo

Comenzamos importando las liberías y configurando Spark para que se ejecute en modo local así como establecemos el nombre de nuestra aplicación, que en este caso será "movies_recommender". Tras esto, inicializamos Spark.

In [30]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, explode

spark_conf = SparkConf().setMaster("local").setAppName("movies_recommender")

spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

Empezamos cargando los datos de puntuación de los usuarios, el cual se trata de un dataset en csv con cuatro columnas:
- El usuario (user_id).
- El id de la película (item_id).
- La puntuación (rating).
- La fecha (timestamp), que en este caso no nos será necesaria.

Mostramos unas líneas del dataframe y el esquema, para asegurarnos de que se ha cargado correctamente.

In [64]:
data = spark.read.options(delimiter="\t", header=False, inferSchema=True).csv("./ml-100k/u.data").toDF("user_id", "item_id", "rating", "timestamp")

data.show(5)
data.printSchema()

+-------+-------+------+---------+
|user_id|item_id|rating|timestamp|
+-------+-------+------+---------+
|    196|    242|     3|881250949|
|    186|    302|     3|891717742|
|     22|    377|     1|878887116|
|    244|     51|     2|880606923|
|    166|    346|     1|886397596|
+-------+-------+------+---------+
only showing top 5 rows

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)



También cargamos el archivo que contiene información detallada de las películas, que aunque no es necesario para entrenar el recomendador, lo utilizaremos más adelante para poder visualizarlo de una forma más "human-friendly", utilizando los títulos de las películas en lugar del id.

In [65]:
movies = spark.read.options(delimiter="|", header=False, inferSchema=True).csv("./ml-100k/u.item").select(col("_c0").alias("item_id"), col("_c1").alias("tittle"))
movies.show(5)

+-------+-----------------+
|item_id|           tittle|
+-------+-----------------+
|      1| Toy Story (1995)|
|      2| GoldenEye (1995)|
|      3|Four Rooms (1995)|
|      4|Get Shorty (1995)|
|      5|   Copycat (1995)|
+-------+-----------------+
only showing top 5 rows



El siguiente paso es separar el conjunto de datos en dos subconjuntos, uno de entrenamiento y otro de test que utilizaremos respectivamente para entrenar el modelo y para evaluarlo. Utilizamos `.cache()` sobre el dataframe de entrenamiento para mantenerlo en memoria y mejorar el rendimiento.

In [68]:
splits= data.randomSplit([0.7, 0.3], 1234)

training_df = splits[0]
training_df.cache()
test_df = splits[1]

Una vez realizado todo lo anterior, ya podemos preparar el entrenamiento de nuestro recomendador. Para ello debemos indicarle qué columna del dataframe representa a los usuarios (user_id), qué columna representa los elementos a recomendar (item_id) y qué columna representa las puntuaciones (rating). Por otro lado, utilizamos el parámetro `coldStartStrategy="drop"` para evitar que aparezcan valores "nan" en caso de que existan usuarios del conjunto de pruebas que no aparezcan en el de entrenamiento durante la validación cruzada.

Para el proceso de validación cruzada, preparamos un grid con varios valores para los parámetros `rank` y `regParam`, un evaluador en base a la métrica RMSE, y finalmente, entrenamos el modelo.

In [69]:
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy="drop") 
als_grid = ParamGridBuilder().addGrid(als.rank, [10, 50, 100]).addGrid(als.regParam, [0.01, 0.05, 0.1]).build()
als_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
cv = CrossValidator(estimator=als, estimatorParamMaps=als_grid, evaluator=als_evaluator, numFolds=5)

#entrenamos
cv_model = cv.fit(training_df)


## 3. Evaluación y predicciones

Teniendo ya entrenado a nuestro recomendador, pasaremos a aplicar el modelo sobre el conjunto de test y evaluar la calidad del resultado mediante el valor de RMSE.

Al ejecutar el bloque de código que aparece a continuación podremos comprobar que obtenemos un valor de 0.93, que teniendo en cuenta el rango de las puntuaciones (de 1 a 5), parece un margen de error aceptable de cara a obtener recomendaciones que serán del agrado de los usuarios.

In [71]:
prediction_test = cv_model.transform(test_df)

prediction_test.select("rating", "prediction").show(5)

print("RMSE en test: {}".format(als_evaluator.evaluate(prediction_test, {als_evaluator.metricName: "rmse"})))

+------+----------+
|rating|prediction|
+------+----------+
|     3| 1.9691514|
|     4| 4.1209908|
|     5|  3.698054|
|     2|  3.153656|
|     4|  3.020012|
+------+----------+
only showing top 5 rows

RMSE en test: 0.9255832687243486


Una vez validado el resultado, podemos utilizar la función `recommendForAllUsers` para generar un nuevo dataframe que contenga recomendaciones para los usuarios.

In [72]:
recommendations = cv_model.bestModel.recommendForAllUsers(3)

recommendations.show(truncate=False)

+-------+-------------------------------------------------------+
|user_id|recommendations                                        |
+-------+-------------------------------------------------------+
|471    |[[102, 4.5626817], [140, 4.5159774], [8, 4.4912596]]   |
|463    |[[887, 4.380709], [19, 4.2427382], [221, 4.145413]]    |
|833    |[[1597, 4.6237516], [1187, 4.492218], [1019, 4.444743]]|
|496    |[[56, 4.549359], [320, 4.4464393], [190, 4.2504497]]   |
|148    |[[169, 4.957275], [408, 4.806561], [173, 4.714294]]    |
|540    |[[1449, 4.706043], [318, 4.627708], [169, 4.6172495]]  |
|392    |[[1463, 4.9152503], [483, 4.8581896], [199, 4.7037654]]|
|243    |[[1449, 4.47582], [511, 4.351343], [582, 4.343897]]    |
|623    |[[496, 4.719715], [318, 4.5986795], [50, 4.5773196]]   |
|737    |[[127, 4.6581316], [60, 4.651312], [175, 4.6235495]]   |
|897    |[[64, 4.870203], [318, 4.7335334], [496, 4.637326]]    |
|858    |[[127, 4.267202], [689, 4.177561], [272, 4.0020847]]   |
|31     |[

Vemos que las recomendaciones aparecen en una columna de tipo array, podemos utilizar `explode` para separar ese array en filas y poder hacer un `join` con el dataframe que cargamos al principio con los títulos de las películas.

In [73]:
single_recommendations = recommendations \
    .withColumn("rec", explode("recommendations")) \
    .select("user_id", col("rec.item_id"), col("rec.rating"))

single_recommendations.join(movies, on="item_id").show(5, truncate=False)

+-------+-------+---------+---------------------------------------------+
|item_id|user_id|rating   |tittle                                       |
+-------+-------+---------+---------------------------------------------+
|102    |471    |4.5626817|Aristocats, The (1970)                       |
|140    |471    |4.5159774|Homeward Bound: The Incredible Journey (1993)|
|8      |471    |4.4912596|Babe (1995)                                  |
|887    |463    |4.380709 |Eve's Bayou (1997)                           |
|19     |463    |4.2427382|Antonia's Line (1995)                        |
+-------+-------+---------+---------------------------------------------+
only showing top 5 rows



Por último, podemos filtrar el dataframe de entrenamiento utilizando uno de los user_id que vemos en las recomendaciones y utilizar de nuevo un join, para poder comparar las películas recomendadas para ese usuario con las que ha visto anteriormente. En este caso (user_id = 471) vemos que sus peliculas más puntuadas son películas infantiles como Aladdin o Home Alone, que en general cuadran con el tipo de película que aparece en las recomendaciones.

In [74]:
training_df.join(movies, on="item_id").filter(data.user_id == 471).orderBy("rating").show(n=100,truncate=False)

+-------+-------+------+---------+---------------------------------------------+
|item_id|user_id|rating|timestamp|tittle                                       |
+-------+-------+------+---------+---------------------------------------------+
|627    |471    |1     |889827881|Robin Hood: Prince of Thieves (1991)         |
|588    |471    |1     |889827881|Beauty and the Beast (1991)                  |
|420    |471    |1     |889828027|Alice in Wonderland (1951)                   |
|596    |471    |1     |889827881|Hunchback of Notre Dame, The (1996)          |
|432    |471    |1     |889827822|Fantasia (1940)                              |
|71     |471    |3     |889828154|Lion King, The (1994)                        |
|50     |471    |3     |889827757|Star Wars (1977)                             |
|95     |471    |4     |889827822|Aladdin (1992)                               |
|1      |471    |4     |889827881|Toy Story (1995)                             |
|172    |471    |4     |8898