Para la realizacion de esta tarea se ha empleado las siguentes paginas como tutorial, siendo la principal el curso de datacamp de creación de un recomendador empleando pyspark.
#### Building Recommendation Engines with PySpark
https://www.datacamp.com/courses/recommendation-engines-in-pyspark

#### Building a Movie Recommendation Service with Apache Spark & Flask - Part 1
https://www.codementor.io/jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw


## Cargamos las librerias

In [1]:
#Paquetes usuales.
import numpy as np
import pandas as pd

#Paquetes de pyspark.
from pyspark.sql.functions import *
from pyspark.sql import SQLContext, SparkSession
from pyspark import SparkContext
 
#Paquetes para la creación del recomendador.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.sql import Row



In [2]:
# Iniciamos Spark y el contexto de SQL
spark = SparkContext()
sqlContext = SQLContext(spark)

sparky=SparkSession.builder.appName('movielensdb_recsys')
# Comprobacion de que Spark funciona y mostramos la version

print(spark.version)


2.4.0


## Cargamos los datos

In [3]:
datos = sqlContext.read.csv(
    'ratings.dat',
    header=False,
    sep=':'
    ,inferSchema=True
)

## Acondicionamiento de los datos

Mostramos los datos, para ver que forma tiene.

In [4]:
datos.show()

+---+----+----+----+---+----+---------+
|_c0| _c1| _c2| _c3|_c4| _c5|      _c6|
+---+----+----+----+---+----+---------+
|  1|null|1193|null|  5|null|978300760|
|  1|null| 661|null|  3|null|978302109|
|  1|null| 914|null|  3|null|978301968|
|  1|null|3408|null|  4|null|978300275|
|  1|null|2355|null|  5|null|978824291|
|  1|null|1197|null|  3|null|978302268|
|  1|null|1287|null|  5|null|978302039|
|  1|null|2804|null|  5|null|978300719|
|  1|null| 594|null|  4|null|978302268|
|  1|null| 919|null|  4|null|978301368|
|  1|null| 595|null|  5|null|978824268|
|  1|null| 938|null|  4|null|978301752|
|  1|null|2398|null|  4|null|978302281|
|  1|null|2918|null|  4|null|978302124|
|  1|null|1035|null|  5|null|978301753|
|  1|null|2791|null|  4|null|978302188|
|  1|null|2687|null|  3|null|978824268|
|  1|null|2018|null|  4|null|978301777|
|  1|null|3105|null|  5|null|978301713|
|  1|null|2797|null|  4|null|978302039|
+---+----+----+----+---+----+---------+
only showing top 20 rows



Como vemos será necesario procesarlos para quedarnos con los datos limpios de forma que podamos crear el recomendador. Eliminaremos la última variable ya que el timestamp no nos afecta a la hora de realizar las recomendaciones.

In [5]:
datos.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: integer (nullable = true)



In [6]:
sel=['_c0', '_c2', '_c4']
datos = datos.select(sel)
newcol=['userId','MovieId','Rating']
for i in range(len(sel)):
    datos = datos.withColumnRenamed(sel[i], newcol[i])

In [7]:
datos.show()

+------+-------+------+
|userId|MovieId|Rating|
+------+-------+------+
|     1|   1193|     5|
|     1|    661|     3|
|     1|    914|     3|
|     1|   3408|     4|
|     1|   2355|     5|
|     1|   1197|     3|
|     1|   1287|     5|
|     1|   2804|     5|
|     1|    594|     4|
|     1|    919|     4|
|     1|    595|     5|
|     1|    938|     4|
|     1|   2398|     4|
|     1|   2918|     4|
|     1|   1035|     5|
|     1|   2791|     4|
|     1|   2687|     3|
|     1|   2018|     4|
|     1|   3105|     5|
|     1|   2797|     4|
+------+-------+------+
only showing top 20 rows



Estudiamos la matriz dispersa, ya que el metodo ALS funciona bien con datos vacios.

In [8]:
# Contamos las valoraciones totales en el dataset. 
numerator = datos.select("rating").count()

# Contamos el numero total de userUd y movieId , que son únicos. 

num_users = datos.select("userId").distinct().count()
num_movies = datos.select("movieId").distinct().count()

# El denemoniador sera el numero de usuarios multiplicado por el total
# de peliculas.

denominator = num_users * num_movies

# Obtenemos la dispersion. 

sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("El conjunto de datos de valoraciones esta vacio al  ", "%.2f" % sparsity + "% ")

El conjunto de datos de valoraciones esta vacio al   95.53% 


In [9]:
# Convertimos las columnas a los datos correctos. 
datos = datos.select(
    datos.userId.cast("integer"), 
    datos.MovieId.cast("integer"),
    datos.Rating.cast("double")
)

In [10]:
datos.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- MovieId: integer (nullable = true)
 |-- Rating: double (nullable = true)



## Creacion del modelo

Separamos los datos en conjunto de entrenamiento y conjunto de test.

In [11]:
train, test = datos.randomSplit([0.8, 0.2], seed = 23)


In [12]:
# Creamos el modelo ALS. 

als = ALS(
    userCol="userId",
    itemCol="MovieId",
    ratingCol="Rating",
    coldStartStrategy='drop',
    nonnegative = True,
    implicitPrefs = False
)

# Confirmamos que el modelo llamado "als" se ha creado. 

type(als)


# Añadimos hyperparametros y sus respectivos valores a param_grid 

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50]) \
            .addGrid(als.maxIter, [5,10]) \
            .addGrid(als.regParam, [.01, .05, .1])\
            .build()
print ("Número de modelos a probar: ", len(param_grid))          
# Definimos el evaluador como RMSE y mostramos la longitud del mismo

evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='Rating',
    predictionCol='prediction'
)
print ("El evaluador : ",evaluator)


cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5
)
print ("El crossValidator : ",cv)


Número de modelos a probar:  12
El evaluador :  RegressionEvaluator_d67386d76db2
El crossValidator :  CrossValidator_061d83fb4315


## Entrenamos el modelo

Emplearemos unicamente 12 modelos ya que si se toman un número superior, el tiempo de entrenamiento puede requerir de bastante tiempo.
(Se probó inicialmente con 64 modelos, tras una hora no había finalizado)

In [13]:
# Entrenamos el modelo de validacion cruzacda con los datos de entrenamiento.

model = cv.fit(train)

# Extraemos el mejor modelo de los obtenidos.
best_model = model.bestModel

In [14]:
# Obtenemos las predicciones y mostramos unas pocas.
test_predictions = best_model.transform(test)
test_predictions.show()

+------+-------+------+----------+
|userId|MovieId|Rating|prediction|
+------+-------+------+----------+
|   673|    148|   5.0| 3.2892156|
|  4169|    148|   3.0| 2.8381042|
|  4227|    148|   2.0|  2.057883|
|  4387|    148|   1.0| 2.3310013|
|  1605|    148|   2.0| 2.4195395|
|  4169|    463|   2.0| 2.6698651|
|  4277|    463|   4.0|  3.347712|
|  4510|    463|   2.0|   2.16754|
|  3709|    463|   3.0| 2.4040918|
|  5249|    463|   3.0| 2.8984437|
|   721|    463|   4.0| 3.2624204|
|  4613|    463|   4.0|  4.033521|
|  3618|    463|   3.0|  2.698788|
|  1980|    463|   2.0| 2.4382515|
|   934|    463|   3.0|  2.194287|
|  5300|    471|   4.0| 4.1296616|
|   392|    471|   4.0| 3.5982165|
|  5614|    471|   5.0| 3.8690703|
|  3704|    471|   5.0| 4.3825336|
|    78|    471|   4.0|  3.318017|
+------+-------+------+----------+
only showing top 20 rows



In [15]:
# Calculamos y mostramos el RMSE para las predicciones para el conjunto de test.
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.8692385753494289


In [16]:
spark.stop()