# Computación Avanzada y sus Aplicaciones a Ingeniería

### Máster Universitario en Ingeniería Informática


# Práctica 3 - Parte II - Sistemas de recomendación con collaborative filtering

En esta práctica afrontaremos vamos a crear un sistema de recomendación de películas con la librería de Spark MLib.

Ten en cuenta que una vez tengas en marcha Spark, podrás visualizar la evolución de cada trabajo de Spark en  <http://localhost:4040>

## Predicción de la puntuación a películas

En esta práctica vamos a intentar predecir lo que quieren los usuarios de manera similar a lo que hacen Amazon o Netflix para recomendar los productos o las películas en las que puedes estar interesado. En este caso, vamos a usar Spark para montar un sistema de recomendación que permita recomendar películas a un usuario. Para ello utilizaremos el modelo de filtro colaborativo con el algoritmo Alternating Least Squares (ALS) disponible en Spark MLlib ([sparkml]).

Como conjunto de datos utilizaremos [MovieLens](http://grouplens.org/datasets/movielens/) que contiene 20 millones de puntuaciones de usuarios a películas. Descomprime el fichero ml-20m.zip en tu carpeta de datos. Dentro de esta carpeta debe estar la carpeta ml-20m con todos los csv's incluidos en el fichero zip.

La práctica está dividida en cuatro partes:
* *Parte 0*: Preliminares
* *Parte 1*: Recomendaciones básicas
* *Parte 2*: Filtro colaborativo
* *Parte 3*: Predicciones para ti

Como ya hemos comentado en las prácticas anteriores, cuidado antes de usar `collect()` ya que el dataset con el que trabajamos es bastante grande.

[sparkml]: http://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html

En caso de estar utilizando pySpark, **NO** es necesario inicializar el `SparkSession`, es decir, **no** ejecutar la siguiente celda

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Ejemplo pySparkSQL") \
    .config("spark.sql.warehouse.dir", "file:///D:/tmp/spark-warehouse") \
    .getOrCreate()

sc = spark.sparkContext


En caso de estar usando pySpark, ejecutar el siguiente comando o inciiar pyspark con 

`pyspark --conf spark.sql.warehouse.dir=file:///D:/tmp/spark-warehouse`

Otros imports necesarios:

In [2]:
%matplotlib inline 
import matplotlib.pyplot as plt
from test_helper import Test
from pyspark.sql.functions import *
from pyspark.sql import Row

## Notas

En la práctica solo es necesario usar instrucciones básicas de python y las transformaciones y acciones de DataFrames.

**Ejecutar la siguiente celda:** Establecemos los nombres de los ficheros a utilizar

In [3]:
import os
from test_helper import Test

dbfs_dir = './datos/ml-20m'
ratings_filename = dbfs_dir + '/ratings.csv'
movies_filename = dbfs_dir + '/movies.csv'

## Part 0: Preliminarres

Leemos los dos ficheros que nos interesan como DataFrames.

El conjunto de datos consiste en varios ficheros CSV con sus cabeceras, por lo que podemos parsearslos fácilmente con Spark.

De todos los ficheros disponibles, solo nos interesan dos: ratings.csv y movies.csv. El primero almacena en cada fila la puntuación de un usuario a una película, mientras que el segundo almacena en cada fila la información de una película. 

In [4]:
print "Ficheros en la carpeta:"
os.listdir(dbfs_dir)

Ficheros en la carpeta:


['genome-scores.csv',
 'genome-tags.csv',
 'links.csv',
 'movies.csv',
 'ratings.csv',
 'README.txt',
 'tags.csv']

Ya que conocemos el esquema de los ficheros, lo vamos a especificar explícitamente para acelerar la lectura y que Spark no tenga que leer dos veces cada fichero.

In [5]:
from pyspark.sql.types import *

ratings_df_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())]
)
movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType())]
)

### Cargamos los datos y los cacheamos

**Paciencia**: La lectura puede tardar un poco...

In [6]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *

raw_ratings_df = spark.read.format('csv')\
                    .options(header=True, inferSchema=False)\
                    .schema(ratings_df_schema)\
                    .load(ratings_filename)

# Eliminamos las columnas que no interesan
ratings_df = raw_ratings_df.drop('Timestamp')

raw_movies_df = spark.read.format('csv')\
                    .options(header=True, inferSchema=False)\
                    .schema(movies_df_schema)\
                    .load(movies_filename)

# Eliminamos las columnas que no interesan
movies_df = raw_movies_df.drop('Genres').withColumnRenamed('movieId', 'ID')

# Cacheamos los DataFrames
ratings_df.cache()
movies_df.cache()

assert ratings_df.is_cached
assert movies_df.is_cached

# Contamos el número de elementos en cada DataFrame para forzar la lectura y el cacheo
raw_ratings_count = raw_ratings_df.count()
ratings_count = ratings_df.count()
raw_movies_count = raw_movies_df.count()
movies_count = movies_df.count()

print 'Hay %s puntuaciones y %s películas en el conjunto de datos' % (ratings_count, movies_count)
print 'Puntuaciones:'
ratings_df.show(3)
print 'Películas:'
movies_df.show(3, truncate=False)

assert raw_ratings_count == ratings_count
assert raw_movies_count == movies_count

Hay 20000263 puntuaciones y 27278 películas en el conjunto de datos
Puntuaciones:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
+------+-------+------+
only showing top 3 rows

Películas:
+---+-----------------------+
|ID |title                  |
+---+-----------------------+
|1  |Toy Story (1995)       |
|2  |Jumanji (1995)         |
|3  |Grumpier Old Men (1995)|
+---+-----------------------+
only showing top 3 rows



Verificamos que todo esté correcto, no debería dar ningún error la siguiente celda.

In [8]:
assert ratings_count == 20000263
assert movies_count == 27278
assert movies_df.filter(movies_df.title == 'Toy Story (1995)').count() == 1
assert ratings_df.filter((ratings_df.userId == 6) & (ratings_df.movieId == 1) & (ratings_df.rating == 5.0)).count() == 1

Vamos a revisar como son los DataFrames que acabamos de leer.

**Ejecutar las siguientes celdas**

In [9]:
movies_df.show(5, truncate=False)

+---+----------------------------------+
|ID |title                             |
+---+----------------------------------+
|1  |Toy Story (1995)                  |
|2  |Jumanji (1995)                    |
|3  |Grumpier Old Men (1995)           |
|4  |Waiting to Exhale (1995)          |
|5  |Father of the Bride Part II (1995)|
+---+----------------------------------+
only showing top 5 rows



In [10]:
ratings_df.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
|     1|     50|   3.5|
+------+-------+------+
only showing top 5 rows



## Part 1: Recomendaciones básicas

Antes de utilizar el filtro colaborativo vamos a realizar unas recomendaciones más simples, basadas en las estadísticas globales. Veremos que la ventaja del filtro colaborativo es que en vez de realizar la misma recomendación a todos los usuarios como haríamos aquí, realizamos recomendaciones personalizadas.

Vamos a por la forma simple de recomendación: La forma más simple consiste en recomendar la película con la mejor puntuación media.

Usaremos Spark para encontrar el nombre, el número de puntuaciones y la media de las puntuaciones de las 20 películas con la media más alta de puntuación y que tengan al menos 500 puntuaciones. Eliminamos las puntuaciones con menos de 500 puntuaciones ya que es posible que estas no sean del gusto de todo el mundo.

### (1a) Películas con la media de puntuación más alta

Estos son los pasos que debes seguir:

1. `ratings_df` contiene tres columnas:
    - userID: El ID del usuario que ha dado la puntuación
    - movieID: El ID de la película puntuada
    - rating: la puntuación.

   Primero, transforma `ratings_df` en un segundo DataFrame, `movie_ids_with_avg_ratings` con las siguientes columnas:
    - El ID de la película
    - El número de puntuaciones recibidas por la película
    - La media de las puntuaciones recibidas por la película
   Para ello considera el uso de GroupBy, junto con agg y las funciones count y avg.

2. Transforma `movie_ids_with_avg_ratings` a otro DataFrame, `movie_names_with_avg_ratings_df` que añade el título de la película acada fila. `movie_names_with_avg_ratings_df` contendrá las siguientes columnas:
    - El ID de la película
    - El título de la película
    - El número de puntuaciones recibidas por la película
    - La media de las puntuaciones recibidas por la película

   **Nota**: Considera el uso de join para unir el DataFrame `movie_ids_with_avg_ratings` y el de las películas `movies_df`

El resultado debería ser similar al siguiente:
```
movie_ids_with_avg_ratings_df:
+-------+-----+------------------+
|movieId|count|average           |
+-------+-----+------------------+
|1831   |7463 |2.5785207021305103|
|431    |8946 |3.695059244355019 |
|631    |2193 |2.7273141814865483|
+-------+-----+------------------+
only showing top 3 rows

movie_names_with_avg_ratings_df:
+-------+-----------------------------+-----+-------+
|average|title                        |count|movieId|
+-------+-----------------------------+-----+-------+
|5.0    |Ella Lola, a la Trilby (1898)|1    |94431  |
|5.0    |Serving Life (2011)          |1    |129034 |
|5.0    |Diplomatic Immunity (2009? ) |1    |107434 |
+-------+-----------------------------+-----+-------+
only showing top 3 rows
```

In [21]:
from pyspark.sql import functions

# A partir de ratingsDF, crear movie_ids_with_avg_ratings_df donde tengamos 
# para cada película el conteo de puntuaciones y la media de las mismas
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(count(ratings_df.rating).alias("count"), avg(ratings_df.rating).alias("average"))
print 'movie_ids_with_avg_ratings_df:'
movie_ids_with_avg_ratings_df.show(3, truncate=False)

# Nota: movie_names_df es una variable temporal, la usaremos para guardar la unión del DataFrame que
# acabamos de obtener y el DataFrame movies_df (la unión se debe realizar por los campos movieId e ID)
cond = [movie_ids_with_avg_ratings_df.movieId == movies_df.ID]
movie_names_df = movie_ids_with_avg_ratings_df.join(movies_df, cond)
# En este segundo paso, eliminar el atributo ID usando drop
movie_names_with_avg_ratings_df = movie_names_df.drop('ID')

print 'movie_names_with_avg_ratings_df:'
movie_names_with_avg_ratings_df.show(3, truncate=False)

movie_ids_with_avg_ratings_df:
+-------+-----+------------------+
|movieId|count|average           |
+-------+-----+------------------+
|3997   |2047 |2.0703468490473864|
|1580   |35580|3.55831928049466  |
|3918   |1246 |2.918940609951846 |
+-------+-----+------------------+
only showing top 3 rows

movie_names_with_avg_ratings_df:
+-------+-----+------------------+--------------------------------+
|movieId|count|average           |title                           |
+-------+-----+------------------+--------------------------------+
|3997   |2047 |2.0703468490473864|Dungeons & Dragons (2000)       |
|1580   |35580|3.55831928049466  |Men in Black (a.k.a. MIB) (1997)|
|3918   |1246 |2.918940609951846 |Hellbound: Hellraiser II (1988) |
+-------+-----+------------------+--------------------------------+
only showing top 3 rows



In [22]:
# TEST Películas con la puntuación media más alta(1a)
Test.assertEquals(movie_ids_with_avg_ratings_df.count(), 26744,
                'movie_ids_with_avg_ratings_df.count() incorrecto (se esperaban 26744)')
movie_ids_with_ratings_take_ordered = movie_ids_with_avg_ratings_df.orderBy('MovieID').take(3)
_take_0 = movie_ids_with_ratings_take_ordered[0]
_take_1 = movie_ids_with_ratings_take_ordered[1]
_take_2 = movie_ids_with_ratings_take_ordered[2]
Test.assertTrue(_take_0[0] == 1 and _take_0[1] == 49695,
                'Conteo de puntuaciones incorrecto para la película {0} (se esperaban 49695)'.format(_take_0[0]))
Test.assertEquals(__builtin__.round(_take_0[2], 2), 3.92, "Puntuación media incorecta para la película {0}. Se esperaba 3.92".format(_take_0[0]))

Test.assertTrue(_take_1[0] == 2 and _take_1[1] == 22243,
                'Conteo de puntuaciones incorrecto para la película {0} (se esperaban 22243)'.format(_take_1[0]))
Test.assertEquals(__builtin__.round(_take_1[2], 2), 3.21, "Puntuación media incorecta para la película {0}. Se esperaba 3.21".format(_take_1[0]))

Test.assertTrue(_take_2[0] == 3 and _take_2[1] == 12735,
                'Conteo de puntuaciones incorrecto para la película  {0} (expected 12735)'.format(_take_2[0]))
Test.assertEquals(__builtin__.round(_take_2[2], 2), 3.15, "Puntuación media incorecta para la película {0}. Se esperaba 3.15".format(_take_2[0]))


Test.assertEquals(movie_names_with_avg_ratings_df.count(), 26744,
                  'movie_names_with_avg_ratings_df.count() incorrecto (se esperaban 26744)')
movie_names_with_ratings_take_ordered = movie_names_with_avg_ratings_df.orderBy(['average', 'title']).take(3)
result = [(r['average'], r['title'], r['count'], r['movieId']) for r in movie_names_with_ratings_take_ordered]
Test.assertEquals(result,
                  [(0.5, u'13 Fighting Men (1960)', 1, 109355),
                   (0.5, u'20 Years After (2008)', 1, 131062),
                   (0.5, u'3 Holiday Tails (Golden Christmas 2: The Second Tail, A) (2011)', 1, 111040)],
                  'Las tres primeras Rows de movie_names_with_avg_ratings_df no son correctas')

1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.


### (1b) Películas con la puntuación más alta y al menos 500 puntuaciones recibidas

Ahora que tenemos el DataFrame con las puntuaciones medias podemos usar Spark para obtener las 20 películas con la puntuación media más alta que tengan por lo menos 500 puntuaciones asignadas.

Utilizar una única transformación para limiar el resultado de las películas a aquellas que han sido puntuadas como mínimo 500 veces.

In [24]:
# Reemplaza <RELLENAR> con el código adecuado
movies_with_500_ratings_or_more = movie_names_with_avg_ratings_df.filter('count >= 500').sort(desc("average"))
print 'Películas con las puntuaciones más altas:'
movies_with_500_ratings_or_more.show(20, truncate=False)

Películas con las puntuaciones más altas:
+-------+-----+------------------+---------------------------------------------------------------------------+
|movieId|count|average           |title                                                                      |
+-------+-----+------------------+---------------------------------------------------------------------------+
|318    |63366|4.446990499637029 |Shawshank Redemption, The (1994)                                           |
|858    |41355|4.364732196832306 |Godfather, The (1972)                                                      |
|50     |47006|4.334372207803259 |Usual Suspects, The (1995)                                                 |
|527    |50054|4.310175010988133 |Schindler's List (1993)                                                    |
|1221   |27398|4.275640557704942 |Godfather: Part II, The (1974)                                             |
|2019   |11611|4.2741796572216   |Seven Samurai (Shichinin no samurai)

In [25]:
# TEST Películas con la puntuación más alta y al menos 500 puntuaciones recibidas (1b)

Test.assertEquals(movies_with_500_ratings_or_more.count(), 4489,
                  'movies_with_500_ratings_or_more.count() incorrecto. Se esperaban 4489.')
top_20_results = [(r['average'], r['title'], r['count']) for r in movies_with_500_ratings_or_more.orderBy(desc('average')).take(20)]

Test.assertEquals(top_20_results,
                  [(4.446990499637029, u'Shawshank Redemption, The (1994)', 63366),
                   (4.364732196832306, u'Godfather, The (1972)', 41355),
                   (4.334372207803259, u'Usual Suspects, The (1995)', 47006),
                   (4.310175010988133, u"Schindler's List (1993)", 50054),
                   (4.275640557704942, u'Godfather: Part II, The (1974)', 27398),
                   (4.2741796572216, u'Seven Samurai (Shichinin no samurai) (1954)', 11611),
                   (4.271333600779414, u'Rear Window (1954)', 17449),
                   (4.263182346109176, u'Band of Brothers (2001)', 4305),
                   (4.258326830670664, u'Casablanca (1942)', 24349),
                   (4.256934865900383, u'Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)', 6525),
                   (4.24807897901911, u"One Flew Over the Cuckoo's Nest (1975)", 29932),
                   (4.247286821705426, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 23220),
                   (4.246001523229246, u'Third Man, The (1949)', 6565),
                   (4.235410064157069, u'City of God (Cidade de Deus) (2002)', 12937),
                   (4.2347902097902095, u'Lives of Others, The (Das leben der Anderen) (2006)', 5720),
                   (4.233538107122288, u'North by Northwest (1959)', 15627),
                   (4.2326233183856505, u'Paths of Glory (1957)', 3568),
                   (4.227123123722136, u'Fight Club (1999)', 40106),
                   (4.224281931146873, u'Double Indemnity (1944)', 4909),
                   (4.224137931034483, u'12 Angry Men (1957)', 12934)],
                  'Top 20 de películas con 500 o más puntuaciones incorrecto')

1 test passed.
1 test passed.


## Parte 2: Filtro colaborativo
En esta parte vamos a aprender a usar MLlib para realizar recomendaciones personalizadas utilizando los datos sobre las puntuaciones de las películas.

Para ello vamos a usar el método de Filtro Colaborativo. Este método trata de hacer predicciones automáticas (filtro) sobre los intereses de un usuario a partir de los gustos de otros muchos usuarios (colaborativo). La suposición sobre la que se realiza el filtro colaborativo es que si a una persona A tiene la misma opinión que una persona B en un tema, es más probable que A tenga la misma opinión que B en otro tema x diferente, respecto a que tenga la opinión sobre x de una persona elegida al azar. Más información: [1][collab], [2][collab2].

Para la recomendación de películas, partimos de una matriz cuyas entradas son las puntuaciones dadas a las películas por los usuarios, donde cada columna representa a un usuario y cada fila representa a una película.

Como todos los usuarios no han puntuado todas las películas, tenemos entradas de la matriz con valor desconocido, razón por la que necesitamos aplicar el filtro colaborativo. Para cada usuario, tenemos las puntuaciones para un pequeño subconjunto de películas. Con el filtro colaborativo la idea es aproximar las puntuaciones de la matriz factorizándola como el producto de dos matrices: una que describe las propiedades de cada usuario y otra que describe las de cada película.

Vamos a tratar de seleccionar estas matrices de tal forma que el error para los pares de usuarios/películas que conocemos sea el mínimo. Para estre proósito usamos el método de [Alternating Least Squares][als]. Este algoritmo primero inicializa aleatoriamente la matriz de usuarios y trata de optimizar la matriz de películas. Luego, mantiene la matriz de películas constante y optimiza la de usuarios. Esta alternancia entre la optimización de ambas matrices es la razón por la que se denomina así el método.

[als]: https://en.wikiversity.org/wiki/Least-Squares_Method
[collab]: https://en.wikipedia.org/?title=Collaborative_filtering
[collab2]: http://recommender-systems.org/collaborative-filtering/

### (2a) Creación del conjunto de entrenamiento

Antes de usar Machine Learning, tenemos que dividir el conjunto de `ratings_df` en tres partes:
* Un conjunto de entrenamiento (DataFrame), que usaremos para entrenar los modelos
* Un conjunto de validación(DataFrame), que usaremos para elegir el mejor modelo
* Un conjunto de test (DataFrame), que usaremos para conocer el rendimiento del modelo final

Para dividir el DataFrame en diferentes grupos podemos usar la transformación [randomSplit()](http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) . `randomSplit()` toma un vector con los porcentages de datos que irán a cada partición y una semilla y devuelve tantos DataFrames como elementos tiene el vector.

In [26]:
# Reemplazar <RELLENAR> con los trozos de código correspondientes

# Utilizaremos el 60% de los datos para entrenamiento, el 20% para validación y el 20% para test
# Utilizar randomSplit
seed = 1800009193L
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([0.6, 0.2, 0.2],seed=seed) #<RELLENAR>

# Vamos a cachear los tres DataFrames
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Train: {0}, validación: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(3)
validation_df.show(3)
test_df.show(3)

Train: 11998918, validación: 4001830, test: 3999515

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     47|   3.5|
+------+-------+------+
only showing top 3 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     32|   3.5|
|     1|    253|   4.0|
|     1|    293|   4.0|
+------+-------+------+
only showing top 3 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    112|   3.5|
|     1|    151|   4.0|
|     1|    318|   4.0|
+------+-------+------+
only showing top 3 rows



In [27]:
# TEST Creación del conjunto de entrenamiento (2a)
print "Si los primeros tres test fallan pero los números son similares, no hay problema"
Test.assertEquals(training_df.count(), 11998918, "Conteo de training_df incorrecto. Se esperaban 11998918")
Test.assertEquals(validation_df.count(), 4001830, "Conteo de validation_df incorrecto. Se esperaban 4001830")
Test.assertEquals(test_df.count(), 3999515, "Conteo de test_df incorrecto. Se esperaban 3999515")

Test.assertEquals(training_df.filter((ratings_df.userId == 1) & (ratings_df.movieId == 5952) & (ratings_df.rating == 5.0)).count(), 1)
Test.assertEquals(training_df.filter((ratings_df.userId == 1) & (ratings_df.movieId == 1193) & (ratings_df.rating == 3.5)).count(), 1)
Test.assertEquals(training_df.filter((ratings_df.userId == 1) & (ratings_df.movieId == 1196) & (ratings_df.rating == 4.5)).count(), 1)

Test.assertEquals(validation_df.filter((ratings_df.userId == 1) & (ratings_df.movieId == 296) & (ratings_df.rating == 4.0)).count(), 1)
Test.assertEquals(validation_df.filter((ratings_df.userId == 1) & (ratings_df.movieId == 32) & (ratings_df.rating == 3.5)).count(), 1)
Test.assertEquals(validation_df.filter((ratings_df.userId == 1) & (ratings_df.movieId == 6888) & (ratings_df.rating == 3.0)).count(), 1)

Test.assertEquals(test_df.filter((ratings_df.userId == 1) & (ratings_df.movieId == 4993) & (ratings_df.rating == 5.0)).count(), 1)
Test.assertEquals(test_df.filter((ratings_df.userId == 1) & (ratings_df.movieId == 4128) & (ratings_df.rating == 4.0)).count(), 1)
Test.assertEquals(test_df.filter((ratings_df.userId == 1) & (ratings_df.movieId == 4915) & (ratings_df.rating == 3.0)).count(), 1)

Si los primeros tres test fallan pero los números son similares, no hay problema
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.


Después de dividir el DataFrame, el train debe tener unas 12 millones de instancias y la validación y el test unas 4 millones. (El número exacto de entradas puede variar ligeramente por la aleatoriedad de `randomSplit()`.)

### (2b) Alternating Least Squares

En esta parte vamos a usar la implementación del algoritmo en MLlib [ALS](http://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS). ALS toma un conjunto de entrenamiento (DatAFrame) y varios parámetros que controlan la creación del modelo. Para determinar los mejores valores para los parámetros utilizaremos ALS para entrenar varios modelos y elegiremos aquel con el mejor comportamiento para el resto de la práctica.

Para determinar el mejor modelo usaremos la partición de validación de la siguiente forma:
**NOTA: Idealmente, podríamos utilizar `CrossValidator` o `TrainValidationSplit`de MLlib para la estimación. Sin embargo tiene algunos problemas con ALS como explicamos después.**

1. Seleccionamos un conjunto de parámetros para probar el modelo. El parámetro más importante en ALS es *rank*, que determina el número de columnas en la matriz de usuarios o el número de filas en la matriz de películas. En general, un rank menor conllevará un mayor error en training, pero un rank muy alto puede llevarnos a sobreentrenar y obtener malas predicciones en validación/test. Usaremos los valores de rank = {4, 8, 12} con el DataFrame `training_df`

2. Establecemos los parámetros necesarios para ejecutar ALS:
    * La columna "User" será la columna `userId` de nuestro DataFrame.
    * La columna "Item" será la columna `movieId` de nuestro DataFrame.
    * La columna "Rating" será la columna `rating` de nuestro DataFrame.
    * La columna "Prediction" se llamará `prediction` en nuestro DataFrame generado.
    * Utilizaremos el parámetro de regularización con valor 0.1 (este podría ser otro parámetro a ajustar como rank).

**Nota**: Documentación de [ALS] [ALS](http://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS)


3. Crearemos varios modelos usando [ALS.fit()](http://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS.fit), uno para cada valor de rank. Utilizar el conjunto de entrenamiento (`training_df`).

4. Con cada modelo obtenido, usaremos el método `transform()` para obtener la predicción sobre el conjunto de validación  (`validation_df`) en un nuevo DataFrame con la predicción en una nueva columna llamada "prediction".

5. Comprobaremos el error obtenido.

6. Nos quedaremos con el modelo que obtenga el menor error en validación.

#### ¿Por qué usar nuestra propia validación y no la de MLlib?

Un tema importante del filtro colaborativo es como dar puntuaciones a los nuevos usuarios (usuarios que no han puesto ninguna puntuación). Algunos sistemas dan unas predicciones por defecto y otros no predicen nada para nuevos usuarios. Este segundo es el caso de Spark ALS, produce NaN's para las recomendaciones a usuarios nuevos. 

Sin entrar en mucho detalle, para usar  [CrossValidator](http://spark.apache.org/docs/2.0.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) debemos usar un evaluador como RMSE que no soporta NaN's y a nada que haya un NaN el resultado serán NaN y la elección de parámetros no tendrá sentido. El problema es que al hacer el particionamiento puede haber usuarios sin puntuaciones en training, por lo que sus puntuaciones predichas en validación serán NaN.

Este es un problema con diferentes soluciones. En nuestro caso, simplemente eliminaremos los NaNs antes de calcular el error cuadrático medio (RMSE), y esta es la razón por la que lo hacemos manualmente.

**NOTA: el próximo código tardará un poco en ejecutarse**

In [34]:
# Sustituye <RELLENAR> por el código correspondiente
from pyspark.ml.recommendation import ALS

# Inicializamos el Estimator ALS
als = ALS()

# Establecemos los parámetros para ALS
als.setMaxIter(5)\
    .setSeed(seed)\
    .setRegParam(0.1)\
    .setUserCol("userId")\
    .setItemCol("movieId")
    # <RELLENAR>  # Establecer las columnas de usuario, item, rating y predicción, setUserCol, setItemCol...

# Importamos el Evaluator para el conjunto de validación
from pyspark.ml.evaluation import RegressionEvaluator

# Creamos un evaluator RMSE usando la etiqueta que es rating y la predicción que es prediction
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

tolerance = 0.03
ranks = [4, 8, 12]
errors = [0, 0, 0]
models = [0, 0, 0]
err = 0
min_error = float('inf')
best_rank = -1
for rank in ranks:
    # Establecemos el valor de rank con setRank para ALS
    als.setRank(rank)
    # Entrenamos el modelo con estos parámetros
    model = als.fit(training_df)
    # Ejecutamos el modelo para predecir los valores usando transform() en validación (validation_df)
    predict_df = model.transform(validation_df)

    # Eliminamos los valores NaN
    predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

    # Ejecutamos el evaluador RMSE creado previamente, reg_eval.evaluate, sobre el DataFrame predicted_ratings_df
    error = reg_eval.evaluate(predicted_ratings_df)
    errors[err] = error
    models[err] = model
    print 'Para el rank %s el RMSE es %s' % (rank, error)
    if error < min_error:
        min_error = error
        best_rank = err
    err += 1

als.setRank(ranks[best_rank])
print 'El mejor modelo ha sido entrenado con rank %s' % ranks[best_rank]
my_model = models[best_rank]

Para el rank 4 el RMSE es 0.827825178948
Para el rank 8 el RMSE es 0.815401279892
Para el rank 12 el RMSE es 0.809404403199
El mejor modelo ha sido entrenado con rank 12


In [35]:
# TEST
Test.assertEquals(__builtin__.round(min_error, 2), 0.81, "Valor no esperado para el mejor RMSE. Debía ser 0.81 (redondeado). Obtenido {0}".format(__builtin__.round(min_error, 2)))
Test.assertEquals(ranks[best_rank], 12, "Valor no esperado para el mejor rank. Debía ser 12. Obtenido {0}".format(ranks[best_rank]))
Test.assertEqualsHashed(als.getItemCol(), "18f0e2357f8829fe809b2d95bc1753000dd925a6", "Columna de item en ALS incorrecta {0}.".format(als.getItemCol()))
Test.assertEqualsHashed(als.getUserCol(), "db36668fa9a19fde5c9676518f9e86c17cabf65a", "Columna de user en ALS incorrecta {0}.".format(als.getUserCol()))
Test.assertEqualsHashed(als.getRatingCol(), "3c2d687ef032e625aa4a2b1cfca9751d2080322c", "Columna de rating en ALS incorrecta {0}.".format(als.getRatingCol()))

1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.


### (2c) Testeando el modelo

Para saber lo bueno que es realmente el modelo obtenido debemos utilizar el conjunto de test (`test_df`), que no ha sido utilizado ni para el entrenamiento ni para la elección de parámetros `test_df` dataset. Utilizaremos el modelo obtenido con el mejor rank (`best_rank`) almacenado en `my_model`para realizar las predicciones sobre el conjunto de test y obtendremos el error sobre este conjunto usando la raíz error cuadrático medio RMSE.

Debes seguir los siguientes pasos:
* Utiliza el método `transform()` del modelo `my_model` para predecir las puntuaciones en el conjunto de `test_df`. Obtendremos un nuevo DataFrame `predict_df`.
* Filtramos los valores NaN obtenidos en la predicción. Utilizar el código incluido.
* Usar el evaluador de RMSE `reg_eval` para obtener el error en test.

In [36]:
# Sustituye <RELLENAR> por el código correspondiente

# Predecir los valores para test_df
predict_df = my_model.transform(test_df)

# Eliminamos los valores NaN
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))

# Ejecutamos el evaluador RMSE, reg_eval, sobre predicted_test_df
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('El modelo ha obtenido un RMSE en test de {0}'.format(test_RMSE))

El modelo ha obtenido un RMSE en test de 0.808681089623


In [37]:
# TEST Testeando el modelo (2c)
Test.assertTrue(__builtin__.abs(test_RMSE - 0.809624038485) < tolerance, 'incorrect test_RMSE: {0:.11f}'.format(test_RMSE))

1 test passed.


### (2d) Comparando el modelo obtenido 

Atendiendo solo al RMSE obtenido es difícil saber si el modelo es de calidad o no. Sin embargo, podemos comparar este error frente a lo que sería predecir para todas las películas y usuarios la media de todas las puntuaciones en el entrenamiento. Es evidente que nuestro modelo debe comportarse mejor que esta predicción "tonta".

Pasos a seguir:
* Usar el DataFrame `training_df` para calcular la media de todas las puntuaciones (usar agg o groupBy() + avg).
* Utilizar la puntuación media junto con el DataFrame `test_df` para crear otro DataFrame (`test_for_avg_df`) con una columna `prediction` con el valor de puntuación medio para todas las filas. **Nota**: Utilizar la función `lit()` para crear la columna con dicho valor. Utiliza la transformación `withColumn`.
* Utiliza el evaluador `reg_eval` para obtener el error sobre el nuevo DataFrame `test_for_avg_df`.

In [40]:
# Sustituye <RELLENAR> por el código correspondiente
# Calcula la media de todas las puntuaciones en training_df training_avg_rating
avg_rating_df = training_df.agg({'rating':'avg'})

# Extrae el valor medio de las puntuaciones (fila 0, columna 0)
training_avg_rating = avg_rating_df.collect()[0][0]

print('El valor medio de las puntuaciones en el conjunto de entrenamiento es {0}'.format(training_avg_rating))

# Añade una columna con el valor medio de las puntuaciones
test_for_avg_df = test_df.withColumn('prediction', lit(training_avg_rating)) 

# Ejecuta el evaluador RMSE, reg_eval, sobre el DataFrame test_for_avg_dd
test_avg_RMSE = reg_eval.evaluate(test_for_avg_df)

print("El RMSE de la predicción constante para todos es {0}".format(test_avg_RMSE))

El valor medio de las puntuaciones en el conjunto de entrenamiento es 3.52540083198
El RMSE de la predicción constante para todos es 1.05144966746


In [41]:
# TEST Comparando el modelo obtenido (2d)
Test.assertTrue(__builtin__.abs(training_avg_rating - 3.52547984237) < 0.0001,
                'Valor incorrecto de training_avg_rating (expected 3.52547984237): {0:.11f}'.format(training_avg_rating))
Test.assertTrue(__builtin__.abs(test_avg_RMSE - 1.05190953037) < 0.001,
                'Valor incorrecto de test_avg_RMSE (expected 1.0519743756): {0:.11f}'.format(test_avg_RMSE))

1 test passed.
1 test passed.


¡Ya sabemos cómo predecir puntuaciones para las películas!

## Parte 3: Predicciones para ti
En esta última parte del ejercicio vamos a predecir las películas que habría que recomendarte a ti mismo. Para ello, primero necesitamos saber tus preferencias.

Vamos a crear un DataFrame con tus preferencias denominado `ratings_df`.

**(3a) Tus preferencias**

Para ayudarte de la hora de establecer tus preferencias, a continuación puedes obtener una lista con los nombres y códigos de las 50 películas con mejor puntuación del DataFrame `movies_with_500_ratings_or_more`.

In [56]:
print 'Películas mejor puntuadas:'
print '(ID de la película, número de puntuaciones, puntuación media, título de la película)'
movies_with_500_ratings_or_more.orderBy(movies_with_500_ratings_or_more['average'].desc()).show(5)

Películas mejor puntuadas:
(ID de la película, número de puntuaciones, puntuación media, título de la película)
+-------+-----+-----------------+--------------------+
|movieId|count|          average|               title|
+-------+-----+-----------------+--------------------+
|    318|63366|4.446990499637029|Shawshank Redempt...|
|    858|41355|4.364732196832306|Godfather, The (1...|
|     50|47006|4.334372207803259|Usual Suspects, T...|
|    527|50054|4.310175010988133|Schindler's List ...|
|   1221|27398|4.275640557704942|Godfather: Part I...|
+-------+-----+-----------------+--------------------+
only showing top 5 rows



El ID de usuario 0 no está asignado, por lo que lo usaremos para tus puntuaciones. La variable `my_user_ID` tiene el valor 0 asignado. Utilizándolo, crea un nuevo DataFrame llamado `my_ratings_df` con tus puntuaciones para al menos 10 películas. Cada entrada tiene que estar en el siguiente formato:  `(my_user_id, movieID, rating)`.  

Como en el dataset original, las puntuaciones deben estar entre 1 y 5 (incluidos ambos). Si no has visto al menos 10 de esas películas puedes mostrar más cambiando el parámetro pasado a `take()` hasta que encuentres 10 películas que hayas visto (o sino puedes establecer la puntuación que crees que tendrían las películas para ti).

In [46]:
# Sustituye <RELLENAR> por el código correspondiente
from pyspark.sql import Row
my_user_id = 0

# Ten en cuenta que el ID de las películas es el último número en cada línea
# Un error común es utilizar el número de puntuación como ID de película, ¡ten cuidado!
my_rated_movies = [ 
    (my_user_id, 527, 4.5),
    (my_user_id, 260, 1.0),
    (my_user_id, 1196, 1.0),
    (my_user_id, 2571, 1.0),
    (my_user_id, 2324, 5.0),
    (my_user_id, 2571, 1.0),
    (my_user_id, 7153, 4.0),
    (my_user_id, 4993, 4.0),
    (my_user_id, 1262, 4.0),
    (my_user_id, 3462, 3.5),
    
    #<RELLENAR>
     # El formato de cada línea debe ser (my_user_id, movie ID, tu puntuación)
     # Por ejemplo, para dar a la película "Star Wars: Episode IV - A New Hope (1977)" cinco estrellas, debería añadir la siguiente línea
     #   (my_user_id, 260, 5),
]

my_ratings_df = spark.createDataFrame(my_rated_movies, ['userId','movieId','rating'])
print 'Mis puntuaciones son:'
my_ratings_df.show(10)

Mis puntuaciones son:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     0|    527|   4.5|
|     0|    260|   1.0|
|     0|   1196|   1.0|
|     0|   2571|   1.0|
|     0|   2324|   5.0|
|     0|   2571|   1.0|
|     0|   7153|   4.0|
|     0|   4993|   4.0|
|     0|   1262|   4.0|
|     0|   3462|   3.5|
+------+-------+------+



### (3b) Unión de tus puntuaciones con el conjunto de entrenamiento

Para poder obtener nuevas predicciones para ti, debemos incluir las puntuaciones previas en el conjunto de entrenamiento. Utiliza para ello la transformación [unionAll()](http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.unionAll); utiliza `unionAll()` para crear un nuevo conjunto de entrenamiento con tus puntuaciones (`my_ratings_df`) y el conjunto de train `training_df`.

In [47]:
# Sustituye <RELLENAR> por el código correspondiente

# Utiliza unionAll() para unir my_ratings_df con training_df
training_with_my_ratings_df = training_df.unionAll(my_ratings_df)

print ('El conjunto de entrenamiento tiene ahora %s más filas que el original' %
       (training_with_my_ratings_df.count() - training_df.count()))
assert (training_with_my_ratings_df.count() - training_df.count()) == my_ratings_df.count()

El conjunto de entrenamiento tiene ahora 10 más filas que el original


### (3c) Entrenamiento del modelo con tus puntuaciones

Ahora vamos a entrenar un nuevo modelo considerando tus puntuaciones junto con las que ya teníamos en el training original. Utilizaremos los mismos parámetros que en las partes (2b) y (2c). **Recuerda incluir TODOS los parámetros**.

**NOTA:** Esta ejecución tardará un poco, paciencia.

In [49]:
# Sustituye <RELLENAR> por el código correspondiente

# Establece los parámetros para ALS: 
#    regularización, columna de items, usuarios y puntuaciones y el rank (el mejor obtenido)
als.setPredictionCol("prediction")\
   .setMaxIter(5)\
   .setSeed(seed)\
    .setRegParam(0.1)\
    .setUserCol("userId")\
    .setItemCol("movieId")\
    .setRatingCol("rating")\
    .setRank(12.0)

# Entrenar el modelo con los parámetros establecidos, es decir, utilizar fit() con el nuevo training set: training_with_my_ratings_df
my_ratings_model = als.fit(training_with_my_ratings_df)

### (3d) Comprobar el error RMSE del nuevo modelo

Vamos a calcular el RMSE obtenido para el nuevo modelo para el conjunto de test.
* Utiliza el modelo (`transform()`) para obtener las predicciones para el conjunto de test `test_df`
* Después, utiliza `reg_eval` (el evaluador) para calcular el RMSE (método `evaluate`).

In [None]:
predict_df = my_model.transform(test_df)

# Eliminamos los valores NaN
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))

# Ejecutamos el evaluador RMSE, reg_eval, sobre predicted_test_df
test_RMSE = reg_eval.evaluate(predicted_test_df)


In [50]:
# Sustituye <RELLENAR> por el código correspondiente
my_predict_df = my_ratings_model.transform(test_df)

# Filtramos los valores NaN
predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))

# Obtener el error usando reg_eval y predicted_test_my_ratings_df
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('El modelo tiene un RMSE en el conjunto de test de {0}'.format(test_RMSE_my_ratings))

El modelo tiene un RMSE en el conjunto de test de 0.809872889746


### (3e) Predicción de nuevas puntuaciones para ti

Hasta ahora lo único que hemos hecho es calcular el error del modelo. Ahora, lo que vamos a hacer es predecir que puntuaciones le habrías dado a las películas que todavía no has puntuado.

Debemos seguir los siguientes pasos:
* Filtrar las películas que has puntuado manualmente. Utilizaremos la variable `my_rated_movie_ids` para guardar los IDs de las películas puntuadas y almacenaremos las no puntuadas en un DataFrame `not_rated_df`.

   **Nota**: La función [Column.isin()](http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.Column.isin)
   y el operador lógico "no" `~` pueden ayudar en esta operación. Aquí viene un ejemplo con `isin()`:

```
    > df1 = spark.createDataFrame([("Jim", 10), ("Julie", 9), ("Abdul", 20), ("Mireille", 19)], ["name", "age"])
    > df1.show()
    +--------+---+
    |    name|age|
    +--------+---+
    |     Jim| 10|
    |   Julie|  9|
    |   Abdul| 20|
    |Mireille| 19|
    +--------+---+

    > names_to_delete = ["Julie", "Abdul"] # esto es  una lita de Python
    > df2 = df1.filter(~ df1["name"].isin(names_to_delete)) # "NOT IN"
    > df2.show()
    +--------+---+
    |    name|age|
    +--------+---+
    |     Jim| 10|
    |Mireille| 19|
    +--------+---+
```

* Transformar `not_rated_df` en `my_unrated_movies_df` siguiendo los siguientes pasos:
    - renombrar la columna "ID" por "movieId" utilizando `withColumnRenamed()`
    - añadir una columna "userId" con el valor que tiene la variable `my_user_id` definida anteriormente. Utiliza para ello el método `withColumn()` junto con la función `lit()` para añadir una columna con el mismo valor en todas las filas.

* Crear el DataFrame `predicted_ratings_df` aplicando el modelo aprendindo `my_ratings_model` al DataFrame `my_unrated_movies_df`.

In [53]:
print test_df.columns
print my_unrated_movies_df.columns

['userId', 'movieId', 'rating']
['movieId', 'title', 'my_user_id']


In [55]:
# Sustituye <RELLENAR> por el código correspondiente

# Creamos una lista con los IDs de las películas ya puntuadas
my_rated_movie_ids = [x[1] for x in my_rated_movies] # simplemente cogemos el segundo valor de las tuplas

# Filtrar las películas ya puntuadas - utiliza my_rated_movie_ids con el método isin sobre la columna "ID" - ver ejemplo anterior
not_rated_df = movies_df.filter(~ movies_df["ID"].isin(my_rated_movie_ids))


# Renombrar la columna "ID" por "movieId" y añadir la columna "userId" con my_user_id usando la función lit()
my_unrated_movies_df = not_rated_df.withColumn('userId',lit(my_user_id)).withColumnRenamed('ID','movieId')

# Utilizar el modelo my_rating_model para predecir las puntuaciones dadas a las películas que no han sido puntuadas manualmente
raw_predicted_ratings_df = my_ratings_model.transform(my_unrated_movies_df)

# Filtramos los NaN
predicted_ratings_df = raw_predicted_ratings_df.filter(raw_predicted_ratings_df['prediction'] != float('nan'))

### (3f) Predicción final de las películas recomendadas para ti

Ya hemos predecido las puntuaciones, ahora podemos mostrar las 25 películas con mejores puntuaciones que serían las recomendadas.

Pasos a seguir:
* Unir el DataFrame `predicted_ratings_df` con `movie_names_with_avg_ratings_df` para obtener el conteo de puntuaciones para cada película
* Ordenar el DataFrame resultante (`predicted_with_counts_df`) por la puntuación (descendente) y eliminar las películas con 75 puntuaciones o menos
* Imprimir el top 25 de las películas restantes

In [61]:
# Sustituye <RELLENAR> por el código correspondiente

# Utiliza un join() para unir predicted_ratings_df con movie_names_with_avg_ratings_df (campo movieId)
cond = [predicted_ratings_df.movieId == movie_names_with_avg_ratings_df.movieId]
predicted_with_counts_df = predicted_ratings_df.join(movie_names_with_avg_ratings_df, cond)
# Utiliza un filter para filtrar las películas con al menos de 75 puntuaciones, aprovecha para ordenar el DataFrame por la columna prediction de manera descendente
predicted_highest_rated_movies_df = predicted_with_counts_df.filter('count > 75').sort(desc('prediction'))

# Mostrar el top25 de las películas con show()
print ('El top 25 de películas recomendadas para ti es (películas con al menos 75 puntuaciones):')
predicted_highest_rated_movies_df.show(25)

El top 25 de películas recomendadas para ti es (películas con al menos 75 puntuaciones):
+-------+--------------------+------+----------+-------+-----+------------------+--------------------+
|movieId|               title|userId|prediction|movieId|count|           average|               title|
+-------+--------------------+------+----------+-------+-----+------------------+--------------------+
|   3808|Two Women (Ciocia...|     0|  4.534254|   3808|  250|              3.87|Two Women (Ciocia...|
|  27857|As it is in Heave...|     0|  4.474224|  27857|  184|3.8668478260869565|As it is in Heave...|
|   3746|Butterfly (La len...|     0|  4.468356|   3746|  367|3.7724795640326976|Butterfly (La len...|
|   4459|Alaska: Spirit of...|     0|  4.315805|   4459|   89| 3.443820224719101|Alaska: Spirit of...|
|   1893|Beyond Silence (J...|     0|  4.314941|   1893|  185|  3.67027027027027|Beyond Silence (J...|
|   2197|    Firelight (1997)|     0| 4.2988453|   2197|  141|3.5425531914893615|    Fi