<img style="float: right;" width="200" src="img/kschool.png"></img>
<h1 style="font-size: 2.5em"> Recomendador de Películas MovieLens</h1>

<span style="float: right; text-align: right;">Jorge Ayuso Rejas<br>Abril 2016</span>
<br><br>

---
En el siguiente *notebook* vamos a trabajar con los datos de [MovieLens](https://movielens.org/) y con Spark
para hacer recomendaciones de películas y profundizar en los sistemas de recomendación basados en filtros colaborativos.

Para ello nos basamos en el siguiente [guión](https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html) del Spark Summit 2014.

### Antes de empezar

Necesitamos descargar los datos en la carpeta `../datos`

In [None]:
!ls -l ../datos

**NOTA:** Si no existen las carpetas `ml-1m` y `tag-genome` usamos el script `descargar_movilens.sh` para descargarlos.

## Los datos

En la carpeta `ml-1m`  que contiene: 

> Stable benchmark dataset. 1 million ratings from 6000 users on 4000 movies. Released 2/2003.

Hemos descargado estos datos que son pequeños para hacer las pruebas, pero el sistema que vamos a utilizar con Spark es distribuido y lo podríamos hacer sobre un cluster con el mismo código para datos más grandes.


Los datos que incluye MovieLens son:

* `movies.dat`: Incluye el catálogo de películas separado por `::` cada campo.
* `ratings.dat`: Incluye los ratings entre usuarios y películas en este caso la puntuación (de 1 a 5) que han dado a esa película. Este archivo es nuestra matriz $M_{(n,p)}$ .
* `users.dat`: Incluye información de los usuarios pero en nuestro ejercicio no vamos a utilizar este archivo.


In [None]:
!head ../datos/ml-1m/movies.dat

In [None]:
!head ../datos/ml-1m/ratings.dat

## Incluirnos en el recomendador

Una de las características importantes de los sitemas de recomendación basados en factorización de matrices. Es que desde el entrenamiendo del modelo tendremos que incluir a todos los usuarios a los que vamos a querer recomendar. Al contrario que otros modelos de *machine learning* donde una vez entrenado el modelo podemos predecir a nuevos usuarios.

Para ello vamos a incluir nuestras preferencias como un nuevo usuario y después veremos las recomendaciones que obtenemos para nosotros mismos.

¿Cómo hacemos esto?

El siguiente script en python `spark_als/bin/rateMovies` sirve para generar nuestras recomendaciones.

Una vez ejecutado se crearán nuestros ratings en el archivo `personalRatings.txt`

In [None]:
!cat personalRatings.txt

## Spark y MLlib

Para nuestro recomendador vamos a usar Spark y la librería MLlib que incluye el algoritmo ALS:    

&nbsp;<br>

<center>
<img src="https://databricks-training.s3.amazonaws.com/img/matrix_factorization.png" width="550"><img>
http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html
</center>


Lo primero de todo comprobamos que tenemos creado el `SparkContext`:

In [None]:
sc

Nos tiene que devolver algo del estilo:

```
<pyspark.context.SparkContext at 0x7f4250d6aa10>

```

Además debemos de ver la web del Spark UI en http://127.0.0.1:4040/.

Empezamos con el proceso de Spark:

In [None]:
# Cargamos librería necesarias para los scripts

import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS
from pyspark.sql import *
from pyspark.sql.functions import *

# Cargamos las funciones definidas en el archivo funciones_auxiliares.py

from funciones_auxiliares import *

Usamos la función `loadRatings` para cargar nuestros ratings personales: 

In [None]:
myRatings = loadRatings("personalRatings.txt")
myRatings

Convertimos los datos a un `RDD`:

In [None]:
myRatingsRDD = sc.parallelize(myRatings, 1)

Definimos la carpeta donde se encuentras nuestros archivos y usamos la función `parseRating` de manera distribuida:

In [None]:
movieLensHomeDir = "../datos/ml-1m/"
ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)

In [None]:
ratings.take(5)

Normalemente el tamaño de items, en nuestro caso películas, es un número razonable y que nos cabe bien en la memoría RAM del *driver*. En general es $n$ (el tamaño de usuarios) el que suele tener un tamaño grande $p << n$.    

Por ese motivo podemos hacer un `collect` sobre el `RDD` de las películas:

In [None]:
movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())
len(movies)

Una vez cargados ambos archivos vamos a contar el número de películas, usuarios y ratings que tenemos:

In [None]:
numRatings = ratings.count()
numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
numMovies = ratings.values().map(lambda r: r[1]).distinct().count()
print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)

Luego siguiendo nuestra notación tenemos que:

* $n=6040$
* $p=3706$

Así que la matriz $M$ tiene un tamaño de $6040\cdot3706=22384240$ pero solo tenemos información de $1000209$, es decir un 4%.

### Entrenamiento de los parámetros

Para dedicir qué parámetros utilizar en nuestro algoritmo vamos a dividir la muestra en tres trozos:
entrenamiento (60%), validación (20%) y test (20%). Para ello lo hacemos basado en el último digito del `timestamp` 
(ver la función `parseRating` línea 12)



In [None]:
numPartitions = 4

In [None]:
training = (
      ratings.filter(lambda x: x[0] < 6)
      .values()
      .union(myRatingsRDD)
      .repartition(numPartitions)
      .cache()
)

In [None]:
validation = (
      ratings.filter(lambda x: x[0] >= 6 and x[0] < 8)
      .values()
      .repartition(numPartitions)
      .cache()
)

In [None]:
test = (
    ratings.filter(lambda x: x[0] >= 8)
    .values()
    .cache()
)

In [None]:
numTraining = training.count()
numValidation = validation.count()
numTest = test.count()

print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)

Selecionamos ahora los posibles valores de nuestros parámetros:

In [None]:
ranks = [8, 12]
lambdas = [0.1, 10.0]
numIters = [10, 20]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

Con `itertools.product` generamos la malla de posibilidades:

In [None]:
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(training, rank, numIter, lmbda)
    validationRmse = computeRmse(model, validation, numValidation)
    print "RMSE (validation) = %f for the model trained with " % validationRmse + \
          "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
    if (validationRmse < bestValidationRmse):
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter

Evaluamos el mejor modelo (guardado en `bestModel`) con la partición de test:

In [None]:
testRmse = computeRmse(bestModel, test, numTest)
print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
  + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)

Un ejercicio habitual en *machine learning* es comparar el resultado de nuestro modelo con el *baseline*. En este caso con la media de los ratings y ver si nuestro modelo es mejor y en cuanto

In [None]:
meanRating = training.union(validation).map(lambda x: x[2]).mean()
baselineRmse = sqrt(test.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTest)
improvement = (baselineRmse - testRmse) / baselineRmse * 100
print "The best model improves the baseline by %.2f" % (improvement) + "%."

### Modelo final
Terminamos entrenando el modelo final con todos los datos y los parámetros que hemos elegido

In [None]:
final = (ratings
          .values()
          .union(myRatingsRDD)
          .repartition(numPartitions)
)

In [None]:
finalModel = ALS.train(final, bestRank, bestNumIter, bestLambda)

## Ver nuestras recomendaciones

Vamos a recuperar las recomendaciones según los ratings que pusimos al principio

In [None]:
myRatedMovieIds = set([x[1] for x in myRatings])
candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
predictions = finalModel.predictAll(candidates.map(lambda x: (0, x))).collect()
recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:10]

In [None]:
recommendations

In [None]:
print "Movies recommended for you:"
for i in xrange(len(recommendations)):
    print (u"%2d | (Rating: %.3f) | %s" % (i + 1 ,recommendations[i][2], movies[recommendations[i][1]] ))

## Entendiendo cómo se realizan las predicciones
Vamos a entender qué descomposición se ha realizado y cómo se realizan las predicciones. 
La matriz de ratings tiene tamaño $(6041\times3706)$ como hemos visto y en el entranimiento se ha decidio utilizar 12 variables latentes luego la descomposición que hemos realizado es:
&nbsp;<br>
&nbsp;<br>

$$
{\Large
M_{(6041\times 3706)} = U_{(6041\times 12)}\;V_{(12 \times 3706)}
}
$$

¿Dónde están esas matrices calculadas?

In [None]:
finalModel.userFeatures().take(1)[0]

In [None]:
len(finalModel.userFeatures().take(1)[0][1])

In [None]:
finalModel.userFeatures().count()

In [None]:
finalModel.productFeatures().take(1)[0]

In [None]:
finalModel.productFeatures().count()

Como hemos visto el objeto `finalModel` además contiene varias funciones para hacer las predicciones, pero vamos a hacerlo a mano para entender cómo funciona algebráicamente. 

Para ello vamos a usar de ejemplo los datos de nuestras recomendacions:

In [None]:
myRatings

Nueso id de usuario es el 0 así que podemos quedanos con la fila de la matriz $U$ que hace referencia a nuestro usuario:

In [None]:
user_feature = finalModel.userFeatures().filter(lambda x: x[0]==0).collect()[0]
user_feature

Recuperamos nuestra primera recomendación:

In [None]:
recommendations[0]

Extraemos de la columna $V$ la columna correspondiente con esta película

In [None]:
product_features = finalModel.productFeatures().filter(lambda x: x[0] == recommendations[0][1]).collect()[0]
product_features

Para terminar, es fácil de comprobar que matemáticamente:
$$
{\Large
m_{ij} =\; <u_i,v_j>
}
$$

Es decir, el rating del usuario $i$ y el item $j$ es el producto escalar de la fila  $i$-esima de la matriz $U$ y la columna $j$-esima de la matriz $V$

In [None]:
import numpy as np

In [None]:
np.dot(np.array(user_feature[1]),np.array(product_features[1]))

In [None]:
sc.stop()