# Recomendador de películas:

Para el sistema de recomendación se va a implentar el enfoque de filtrado colaborativo que consiste en contruir un modelo a partir de las calificaciones dadas por cada usuario y los usuraios similares a éste. Con este modelo se podrán predecir las calificaciones de las películas no vistas por el usuario para así, sugerirle las que obtengan mayor puntuación en dicha predicción.

## Carga de librerías e inicio de la sesión de Spark:

In [2]:

import numpy as np
import pandas as pd

from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StructType, StructField, StringType, DoubleType

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler



In [3]:
spark = SparkSession.builder\
       .appName("Recomendador peliculas")\
       .config("spark.some.config.option", "config-value")\
       .getOrCreate()\

## Carga de datos:

Se va a utilizar el archivo `ratings.dat` que consta de tres campos por registro; el id del usuario, el id de la película y la calificación del usuario a dicha película respectivamente.

Para facilitar la carga del archivo, previamente, se ha reemplazado el separador `::` por `:`.

In [4]:
# Parse dating agency ratings data as a Spark dataframe
ratings = "ratings.dat"
schema = StructType([StructField("user_id", IntegerType(), False),
                     StructField("movie_id", IntegerType(), False),
                     StructField("rating", DoubleType(), True)])
ratings_df = spark.read.format("csv").option("header", "false").option("delimiter", ":").schema(schema).load(ratings)
ratings_df = ratings_df.na.drop(how="any")
ratings_df.show()
ratings_df.printSchema()

+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|      1|    1193|   5.0|
|      1|     661|   3.0|
|      1|     914|   3.0|
|      1|    3408|   4.0|
|      1|    2355|   5.0|
|      1|    1197|   3.0|
|      1|    1287|   5.0|
|      1|    2804|   5.0|
|      1|     594|   4.0|
|      1|     919|   4.0|
|      1|     595|   5.0|
|      1|     938|   4.0|
|      1|    2398|   4.0|
|      1|    2918|   4.0|
|      1|    1035|   5.0|
|      1|    2791|   4.0|
|      1|    2687|   3.0|
|      1|    2018|   4.0|
|      1|    3105|   5.0|
|      1|    2797|   4.0|
+-------+--------+------+
only showing top 20 rows

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: double (nullable = true)



## Modelo:

La forma adecuada para introducir estos datos en un modelo es con una matriz de iteración donde las filas representan las películas y cada columna corresponde a un usuario o viceversa. Se trata de una matriz dispersa ya que encontramos muchos datos faltantes. Esta dispersión se puede calcular de la siguiente forma:

In [34]:
# Contamos las valoraciones: 
numerator = ratings_df.select("rating").count()

# Contamos el total de usuarios y peliculas únicos:
num_users = ratings_df.select("user_id").distinct().count()
num_movies = ratings_df.select("movie_id").distinct().count()

denominator = num_users * num_movies

# Dispersión: 
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("Tenemos una dispersión del  ", "%.2f" % sparsity + "% ")

Tenemos una dispersión del   95.53% 


Para este tipo de datos se aplican algoritmos de factorización matricial. Se va a utilizar el algoritmo ALS (Alternating Least Square) proporcinado por la libreria ML de pyspark que se ejecuta de forma paralela.

In [39]:
# Definimos el algoritmo ALS:

als = ALS(userCol="user_id",itemCol="movie_id",ratingCol="rating",coldStartStrategy='drop',nonnegative = True,implicitPrefs = False)


Se van a ajustar los hiperparamentros entrenando 12 modelos. Mediante la validación cruzada se obtendrá el modelo que mejor se ajusta a los datos utilizando el RMSE como medida.

In [40]:
#Separación en train y test:
train, test = ratings_df.randomSplit([0.8, 0.2], seed = 47)

In [41]:
# Hiperparametros a probar:

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50]) \
            .addGrid(als.maxIter, [5,10]) \
            .addGrid(als.regParam, [.01, .05, .1])\
            .build()
         
# Evaluador:

evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction'
)


#Definimos la validación cruzada:
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5
)


In [42]:
# Entrenamos entrenamos los modelos y validamos:
model = cv.fit(train)

# Mejor modelo:
best_model = model.bestModel

Se obtienen y muestran las predicciones del modelo que mejor se ajusta:

In [43]:
test_predictions = best_model.transform(test)
test_predictions.show()

+-------+--------+------+----------+
|user_id|movie_id|rating|prediction|
+-------+--------+------+----------+
|   3184|     148|   4.0| 2.9953053|
|   3539|     148|   3.0|  3.045854|
|   3053|     148|   3.0| 2.5148985|
|    660|     463|   3.0| 2.5178556|
|   3753|     463|   2.0| 2.5155425|
|   5047|     463|   3.0| 2.3473194|
|   3709|     463|   3.0| 2.2141466|
|    202|     463|   3.0|  2.616348|
|   2777|     463|   3.0| 2.9701884|
|    721|     463|   4.0| 3.2723997|
|   4252|     463|   3.0| 2.3923914|
|    934|     463|   3.0| 2.1712956|
|    516|     471|   2.0|   2.37645|
|   1303|     471|   4.0| 3.3341773|
|   3704|     471|   5.0| 4.4221826|
|   1884|     471|   2.0| 3.4690475|
|   3986|     471|   4.0| 4.1356173|
|   4186|     471|   3.0| 3.0597675|
|   5222|     471|   4.0| 3.0094497|
|   1199|     471|   3.0|  2.342823|
+-------+--------+------+----------+
only showing top 20 rows



In [44]:
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.8571958718805348
