# Motor de recomendación con PySpark

___
## Introducción

Para la realización de esta tarea se utilizará el [dataset](http://grouplens.org/datasets/movielens/100k/) que contiene 100000 puntuaciones de 1000 usuarios sobre 1700 películas de Movielens. Se recomienda leer el fichero README.txt para entender el dataset.

El objetivo es **crear un recomendador** utilizando PySpark para futuros usuarios basandose en los datos historicos. Se utilizara el algoritmo **ALS**.

Los pasos a seguir son:
1. Cargar los datos en un dataframe
2. Transformar el dataframe para dejarlo listo si fuese necesario
3. Crear un modelo para predecir la puntuacion de cada usuario-película
4. Valorar la calidad con el conjunto de test

Como extra se intetará sacar conclusiones sobre las puntuaciones y una serie características.

Para la ejecución de la tarea se ha utilizado el **entorno de Databricks con Spark 2.2.1**

___
## Preparación del entorno: paquetes y variables

Lo primero que vamos hacer es **importar todos los paquetes** necesarios y crear el contexto para los data frames (SQLContext)

In [4]:
# -*- coding: utf-8 -*-
%matplotlib inline

import re
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.recommendation import ALS, ALSModel

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

sqlc = SQLContext(sc)

A continuación se crean las **variables** para:
* Las rutas y nombres de los siguientes ficheros: puntuaciones usuario-película, información de los usuarios, información de las películas, información de las categorías de las películas
* El separador de los campos en los ficheros que pueden ser un pipe o un tabulador
* El nombre y posición de las columnas de los ficheros; además del tipo de dato que será en el dataframe

In [6]:
# Rutas y nombres de los ficheros y separador por defecto que utiliza
RUTA_FICHERO_PUNTUACION = '/FileStore/tables/u.data'
RUTA_FICHERO_USUARIO = '/FileStore/tables/u.user'
RUTA_FICHERO_ITEM = '/FileStore/tables/u.item'
RUTA_FICHERO_GENERO = '/FileStore/tables/u.genre'
SEPARADOR_TABULADOR = '\t'
SEPARADOR_PIPE = '|'

# Variables con información de las columnas de los ficheros: 
#    - posicion 0: nombre de la columna en el fichero
#    - posicion 1: posicion de la columna en el fichero
#    - posicion 2: tipo dataframe
#    - posicion 3: True si la columna puede ser nullable. Se utilizara al convertir de RDD a dataframe.
# Se crea también una lista con ellos dentro por si se quiere iterar
# Fichero puntuaciones
COL_PUNTUACION_USERID = ('userid', 0, IntegerType(), True)
COL_PUNTUACION_ITEMID = ('itemid', 1, IntegerType(), True)
COL_PUNTUACION_RATING = ('rating', 2, FloatType(), True)
COL_PUNTUACION_TIMESTAMP = ('timestamp', 3, IntegerType(), True)
COL_PUNTUACION_PREDICTION = ('prediction', None, None, None)
COL_PUNTUACION_NUMEROPUNTUACIONES = ('numero_puntuaciones', None, None, None)
COLS_PUNTUACION = (
    COL_PUNTUACION_USERID, 
    COL_PUNTUACION_ITEMID, 
    COL_PUNTUACION_RATING, 
    COL_PUNTUACION_TIMESTAMP)
# Fichero usuario
COL_USUARIO_ID = ('user_userid', 0, IntegerType(), True)
COL_USUARIO_AGE = ('age', 1, IntegerType(), True)
COL_USUARIO_GENDER = ('gender', 2, StringType(), True)
COL_USUARIO_OCCUPATION = ('occupation', 3, StringType(), True)
COL_USUARIO_ZIPCODE = ('zipcode', 4, StringType(), True)
COLS_USUARIO = (
    COL_USUARIO_ID, 
    COL_USUARIO_AGE, 
    COL_USUARIO_GENDER, 
    COL_USUARIO_OCCUPATION,
    COL_USUARIO_ZIPCODE)
# Fichero genero
COL_GENERO_NAME = ('name', 0, StringType(), False)
COL_GENERO_ID = ('genre_genreid', 1, IntegerType(), False)
COLS_GENERO = (
    COL_GENERO_NAME, 
    COL_GENERO_ID)
# Fichero item
COL_ITEM_ID = ('item_itemid', 0, IntegerType(), False)
COL_ITEM_TITLE = ('title', 1, StringType(), False)
COL_ITEM_RELEASEDATE = ('releasedate', 2, StringType(), False)
COL_ITEM_VIDERELEASEDATE = ('videoreleasedate', 3, StringType(), False)
COL_ITEM_IMDBURL = ('imdburl', 4, StringType(), False)
COL_ITEM_UNKNOWN = ('unknown', 5, IntegerType(), False)
COL_ITEM_ACTION = ('action', 6, IntegerType(), False)
COL_ITEM_ADVENTURE = ('adventure', 7, IntegerType(), False)
COL_ITEM_ANIMATION = ('animation', 8, IntegerType(), False)
COL_ITEM_CHILDRENS = ('childrens', 9, IntegerType(), False)
COL_ITEM_COMEDY = ('comedy', 10, IntegerType(), False)
COL_ITEM_CRIME = ('crime', 11, IntegerType(), False)
COL_ITEM_DOCUMENTARY = ('documentary', 12, IntegerType(), False)
COL_ITEM_DRAMA = ('drama', 13, IntegerType(), False)
COL_ITEM_FANTASY = ('fantasy', 14, IntegerType(), False)
COL_ITEM_FILMNOIR = ('filmnoir', 15, IntegerType(), False)
COL_ITEM_HORROR = ('horror', 16, IntegerType(), False)
COL_ITEM_MUSICAL = ('musical', 17, IntegerType(), False)
COL_ITEM_MYSTERY = ('mystery', 18, IntegerType(), False)
COL_ITEM_ROMACE = ('Romance', 19, IntegerType(), False)
COL_ITEM_SCIFI = ('scifi', 20, IntegerType(), False)
COL_ITEM_THRILLER = ('thriller', 21, IntegerType(), False)
COL_ITEM_WAR = ('war', 22, IntegerType(), False)
COL_ITEM_WESTERN = ('western', 23, IntegerType(), False)
COL_ITEM_GENEROS = ('generos', None, ArrayType(StringType()), False)
COL_ITEM_GENERO = ('genero', None, None, None)
COLS_ITEM = (
    COL_ITEM_ID, 
    COL_ITEM_TITLE,
    COL_ITEM_RELEASEDATE,
    COL_ITEM_VIDERELEASEDATE,
    COL_ITEM_IMDBURL)
COLS_ITEM_TODOSGENEROS = (
    COL_ITEM_UNKNOWN,
    COL_ITEM_ACTION, 
    COL_ITEM_ADVENTURE,
    COL_ITEM_ANIMATION,
    COL_ITEM_CHILDRENS,
    COL_ITEM_COMEDY,
    COL_ITEM_CRIME,
    COL_ITEM_DOCUMENTARY,
    COL_ITEM_DRAMA,
    COL_ITEM_FANTASY,
    COL_ITEM_FILMNOIR,
    COL_ITEM_HORROR,
    COL_ITEM_MUSICAL,
    COL_ITEM_MYSTERY,
    COL_ITEM_ROMACE,
    COL_ITEM_SCIFI,
    COL_ITEM_THRILLER,
    COL_ITEM_WAR,
    COL_ITEM_WESTERN)


___
## Carga del dataframe y acciones

Lo primero es cargar los dataframes:
* **Puntuaciones**: se elimina el timestamp ya que se considera que no es importante.
* **Usuarios**: no se eliminará ningún campo ya que todos los atributos (edad, sexo, profesión, código postal) pueden ser importantes a la hora de hacer un posterior análisis. Se podría discutir si el código postal es importante pero para mí sí lo es ya que dependiendo de la zona podrían tener gustos distintos.
* **Item**: se eliminará release date, video release date y IMDb URL ya son atributos que no aportan mucha información. De nuevo, se podría debatir si la fecha es importante o no ya que puede haber usuarios a los que les guste sólo las películas antiguas; yo no lo he considerado así.
* **Género**: los dos atributos son necesarios ya que uno es el id o posición en el fichero Item y el otro el nombre del género.

In [8]:
# Se carga las puntuaciones utilizando COLS_PUNTUACION y después se elimina el timestamp
esquemaPuntuacion = StructType()
for col in COLS_PUNTUACION:
    esquemaPuntuacion.add(StructField(col[0], col[2], col[3]))
dataframePuntuacion = sqlc.read.format('com.databricks.spark.csv'). \
                option('delimiter', SEPARADOR_TABULADOR). \
                option('header', 'false'). \
                load(RUTA_FICHERO_PUNTUACION, schema=esquemaPuntuacion)
cabecerasReducidas = list()
for col in COLS_PUNTUACION:
    if col[0] != COL_PUNTUACION_TIMESTAMP[0]:
        cabecerasReducidas.append(col[0])
dataframePuntuacion = dataframePuntuacion.select(cabecerasReducidas)

# Se carga los usuarios utilizando COLS_USUARIO
esquemaUsuario = StructType()
for col in COLS_USUARIO:
    esquemaUsuario.add(StructField(col[0], col[2], col[3]))
dataframeUsuario = sqlc.read.format('com.databricks.spark.csv'). \
                option('delimiter', SEPARADOR_PIPE). \
                option('header', 'false'). \
                load(RUTA_FICHERO_USUARIO, schema=esquemaUsuario)

# Se carga los items utilizando COLS_ITEM y COLS_ITEM_TODOSGENEROS
esquemaItem = StructType()
for col in COLS_ITEM:
    esquemaItem.add(StructField(col[0], col[2], col[3]))
for col in COLS_ITEM_TODOSGENEROS:
    esquemaItem.add(StructField(col[0], col[2], col[3]))
dataframeItem = sqlc.read.format('com.databricks.spark.csv'). \
                option('delimiter', SEPARADOR_PIPE). \
                option('header', 'false'). \
                load(RUTA_FICHERO_ITEM, schema=esquemaItem)
cabecerasReducidas = list()
for col in COLS_ITEM:
    if col[0] == COL_ITEM_ID[0] or col[0] == COL_ITEM_TITLE[0]:
        cabecerasReducidas.append(col[0])
for col in COLS_ITEM_TODOSGENEROS:
    cabecerasReducidas.append(col[0])
dataframeItem = dataframeItem.select(cabecerasReducidas)

# Se carga los géneros utilizando COLS_GENERO
esquemaGenero = StructType()
for col in COLS_GENERO:
    esquemaGenero.add(StructField(col[0], col[2], col[3]))
dataframeGenero = sqlc.read.format('com.databricks.spark.csv'). \
                option('delimiter', SEPARADOR_PIPE). \
                option('header', 'false'). \
                load(RUTA_FICHERO_GENERO, schema=esquemaGenero)

print dataframePuntuacion.show(4)
print dataframeUsuario.show(4)
print dataframeItem.show(4)
print dataframeGenero.show(4)

Se mira que no haya ningún NULL o NAN en rating y después se añade los atributos de los usuarios y de las películas al dataframe de puntuaciones por si los atributos pueden ser de utilidad al modelo ALS.

In [10]:
# Número de NAN y NULL en rating
print 'Número de filas con NAN en las puntuaciones en la columna rating: {0}', dataframePuntuacion.where(isnan(COL_PUNTUACION_RATING[0])).count()
print 'Número de filas con NULL en las puntuaciones en la columna rating: {0}', dataframePuntuacion.where(isnull(COL_PUNTUACION_RATING[0])).count()

# Join puntuaciones-usuarios-item
dataframePuntuacionUsuarioItem = dataframePuntuacion. \
                                  join(dataframeUsuario, dataframePuntuacion.userid==dataframeUsuario.user_userid, 'left_outer'). \
                                  join(dataframeItem, dataframePuntuacion.itemid==dataframeItem.item_itemid, 'left_outer')
# Se eliminan las columnas duplicadas
cabecerasReducidas = list()
for col in dataframePuntuacionUsuarioItem.columns:
    if col != COL_USUARIO_ID[0] and col !=COL_ITEM_ID[0] :
        cabecerasReducidas.append(col)
dataframePuntuacionUsuarioItem = dataframePuntuacionUsuarioItem.select(cabecerasReducidas)

print 'Columnas con el dataframe con joins: {0}', dataframePuntuacionUsuarioItem.columns
print 'Número de filas: {0}', dataframePuntuacionUsuarioItem.count()


Se divide el dataframe entre entrenamiento y test.

In [12]:
dataframePuntuacionUsuarioItem.cache()
# Se divide el dataframe en entrenamiento y test
dataframePuntuacionUsuarioItemDividido = dataframePuntuacionUsuarioItem.randomSplit([0.7, 0.3], 1234)
dataframePuntuacionUsuarioItemEntrenamiento = dataframePuntuacionUsuarioItemDividido[0]
dataframePuntuacionUsuarioItemTest = dataframePuntuacionUsuarioItemDividido[1]

___
## Creación del modelo ALS
Para crear el modelo se utilizará cross validator en el que se variarán varios parámetros: rank, maxIter, alpha y regParam.

Se ha elegido la estrategia de eliminar la fila que cuyo valor sea NAN en la predicción por los siguientes motivos:
* Se ha ejecutado 1 vez sin cross validation y se ha visto que de los 29874 puntuaciones del conjunto de test, sólo 57 no tienen valor (0,19%), por lo que no tiene que tener impacto a la hora de valorar el modelo.
* Se utiliza cross validation por lo que se tiene que utilizar este método por que si no se obtendrá NAN para el RMSE cuando no se disponga del valor de un elemento.

Se utiliza el dataframe con todos los datos de puntuaciones-usuarios-películas como se ha comentado antes. Tiene la desventajas: 1) tarda más en ejecutarse, 2) se necesitan todos estos datos cuando se quiere obtener la predicción de una puntuación para un usuario y 3) si se añade más géneros para las películas se tiene que volver a entrenar. Por su parte, se tendrían que obtener mejores resultados que sólo con el dato de usuario, item y puntuación. Dependiendo de las características del negocio (¿se tiene todos los datos de un usuario?, ¿las categorías son frecuentemente modificadas?) se reducirá o no las columnas del dataframe.

In [14]:
# Evaluador y cross validation
evaluatorRegression = RegressionEvaluator(labelCol=COL_PUNTUACION_RATING[0])

als = ALS(userCol=COL_PUNTUACION_USERID[0], itemCol=COL_PUNTUACION_ITEMID[0], ratingCol=COL_PUNTUACION_RATING[0], coldStartStrategy='drop')
# als = ALS(userCol=COL_PUNTUACION_USERID[0], itemCol=COL_PUNTUACION_ITEMID[0], ratingCol=COL_PUNTUACION_RATING[0])
grid = ParamGridBuilder(). \
        addGrid(als.rank, [5, 10, 15, 20]). \
        addGrid(als.maxIter, [5, 10]). \
        addGrid(als.alpha, [1.0, 2.0]). \
        addGrid(als.regParam, [0.1, 0.5, 1.0]). \
        build()
# grid = ParamGridBuilder().addGrid(als.rank, [5, 20]).build()
crossValidator = CrossValidator(estimator=als, estimatorParamMaps=grid, evaluator=evaluatorRegression, numFolds=2)
crossValidatorModel = crossValidator.fit(dataframePuntuacionUsuarioItemEntrenamiento)

___
## Valoración del modelo

Como se ha dicho, el modelo descarta las filas que no tienen niguna puntuación en el entrenamiento, por tanto el conjunto de test tendrá 0 NAN. A continuación se muestra como se haría si se hubiese elegido la opción NA y el describe para el de entrenamiento y test para el rating y la predicción:

In [16]:
# Se obtiene la predicción para entrenamiento y test
dataframePuntuacionUsuarioItemEntrenamientoPrediccion = crossValidatorModel.bestModel.transform(dataframePuntuacionUsuarioItemEntrenamiento)
dataframePuntuacionUsuarioItemTestPrediccion = crossValidatorModel.bestModel.transform(dataframePuntuacionUsuarioItemTest)

In [17]:
# Se muestra el número de filas y las que son NAN en el conjunto de entrenamiento y de test
print 'Conjunto de entrenamiento predicción -> Número de filas: {0}, número con NAN en predicción: {1}'.format(dataframePuntuacionUsuarioItemEntrenamientoPrediccion.count(), dataframePuntuacionUsuarioItemEntrenamientoPrediccion.where(isnan(COL_PUNTUACION_PREDICTION[0])).count())
print 'Conjunto de test predicción -> Número de filas: {0}, número con NAN en predicción: {1}'.format(dataframePuntuacionUsuarioItemTestPrediccion.count(), dataframePuntuacionUsuarioItemTestPrediccion.where(isnan(COL_PUNTUACION_PREDICTION[0])).count())

# Describe método para rating y prediction
print dataframePuntuacionUsuarioItemEntrenamientoPrediccion.select(COL_PUNTUACION_RATING[0], COL_PUNTUACION_PREDICTION[0]).describe().show()
print dataframePuntuacionUsuarioItemTestPrediccion.select(COL_PUNTUACION_RATING[0], COL_PUNTUACION_PREDICTION[0]).describe().show()

**RMSE** para el conjunto de entrenamiento y el de test. Como se puede observar es más pequeño el primero como era de esperar. No obstante el valor es bastante alto en ambos casos:

In [19]:
# Se obtiene el RMSE sobre los dos conjuntos
rmseEntrenamiento = evaluatorRegression.evaluate(dataframePuntuacionUsuarioItemEntrenamientoPrediccion, {evaluatorRegression.metricName: 'rmse'})
rmseTest = evaluatorRegression.evaluate(dataframePuntuacionUsuarioItemTestPrediccion, {evaluatorRegression.metricName: 'rmse'})

print 'RMSE en training: {0}'.format(rmseEntrenamiento)
print 'RMSE en test: {0}'.format(rmseTest)

Por último se representan en un gráfico donde el modelo ideal tendría todos los puntos en la línea oblícua.
Se tienen que hacer varias observaciones:
* Mientras la puntuación real es discreta, la predicha es continua
* El máximo de las predicciones es mayor que 5 y el mínimo menor que 1. Se podrían modificar para que estuviesen en ese rango
* No se ajustan a la línea que se ha dicho

In [21]:
# Se crea la lista con la puntuación real y la predicha
xEntrenamiento, yEntrenamiento = list(), list()
for entrenamientoPrediccion in dataframePuntuacionUsuarioItemEntrenamientoPrediccion.collect():
    xEntrenamiento.append(entrenamientoPrediccion[COL_PUNTUACION_RATING[0]])
    yEntrenamiento.append(entrenamientoPrediccion[COL_PUNTUACION_PREDICTION[0]])
xTest, yTest = list(), list()
for testPrediccion in dataframePuntuacionUsuarioItemTestPrediccion.collect():
    xTest.append(testPrediccion[COL_PUNTUACION_RATING[0]])
    yTest.append(testPrediccion[COL_PUNTUACION_PREDICTION[0]])

# Se crea el gráfico y se muestra
plt.clf()
plt.xlim(-1, 6)
plt.ylim(-1, 6)
plt.xlabel('Puntuacion real')
plt.ylabel('Puntuacion segun el modelo')
plt.title('Puntuacion real vs prediccion')

plt.plot([0, 20], [0, 20], 'b')
# Se pasan los datos de entrenamiento y test al gráfico
plt.plot(xEntrenamiento, yEntrenamiento, 'go', label='Entrenamiento')
plt.plot(xTest, yTest, 'ro', label='Test')
plt.legend(loc='lower right')

plt.show()
display()

___
## Análisis más exhaustivos

### Género de películas con mejor puntuación
Se obtendrá un dataframe donde cada fila tendrá los valores item_id-película-género. Por cada película, habrá tantas filas como géneros tenga.

Para ello:
1. Se obtendrá un diccionario con los géneros donde la clave es el id o posición en el fichero de item y la clave será el género.
2. El siguiente paso es añadir una columna con un array con los géneros que tiene cada película. Para ello se utilizará RDDs para posteriormente trasformalo en otro dataframe
3. El último paso es hacer un explode.

In [24]:
def trasformarFilaColumnasgeneroColumageneros(fila, diccionarionGenero, ultimoIndiceNoGenero=1):
    """
    Esta función trasforma la fila con atributos donde los últimos son los géneros en la forma 0/1 en otra donde los géneros con valor 1
    son unidos en una lista que contiene los nombres de los generos.

    :param fila: fila del RDD con el formato atributo1, atributo2, ..., genero1 (0/1), genero2 (0/1)...
    :param diccionarionGenero: diccionario donde el valor es la posicion del genero en la fila a partir del primer genero en la fila 
    y el valor el el nombre de ese genero. Por ejemplo genero1 sera el 0 en el diccionario, genero2 será el 1 en el diccionario...
    :param ultimoIndiceNoGenero: ultimo índice de la fila que no es género. Por defecto es 1
    :return: lista con los valores tal cual hasta ultimoIndiceNoGenero, y el último elemento es una lista con el nombre de los géneros
    de este item
    """
    indice = 0
    indiceGenero = 0
    lineaNueva = list()
    generos = list()
    for item in fila:
        if indice>ultimoIndiceNoGenero:
            if item==1:
                genero = diccionarionGenero.get(indiceGenero)
                if genero:
                    generos.append(genero)
            indiceGenero += 1
        else:
            lineaNueva.append(item)
        indice += 1
    lineaNueva.append(generos)
    return lineaNueva
  
# Se crea un diccionario con los géneros donde la clave es la columna id y el valor el nombre del género
diccionarioGenero = dict()
for row in dataframeGenero.collect():
    diccionarioGenero[row.genre_genreid] = row.name
print diccionarioGenero

# Se obtiene el nombre de todos las columnas de los géneros en el dataframe de item
columnas = list()
columnas.append(COL_ITEM_ID[0])
columnas.append(COL_ITEM_TITLE[0])
for col in COLS_ITEM_TODOSGENEROS:
    columnas.append(col[0])
# Se seleccionan solo las columnas que nos interesa: itemid, titulo, generos
dataframeItemGenero = dataframeItem.select(columnas)
rddItemGenero = dataframeItemGenero.rdd.map(lambda fila: trasformarFilaColumnasgeneroColumageneros(fila, diccionarionGenero=diccionarioGenero))

# Se crea el dataframe desde el RDD
columnas = list()
columnas.append(COL_ITEM_ID[0])
columnas.append(COL_ITEM_TITLE[0])
columnas.append(COL_ITEM_GENEROS[0])
dataframeItemGenero = rddItemGenero.toDF(columnas)
# Se utiliza explode para obtener todas las categorías en filas distintas
dataframeItemGenero = dataframeItemGenero.select(COL_ITEM_ID[0], COL_ITEM_TITLE[0], explode(dataframeItemGenero.generos).alias(COL_ITEM_GENERO[0]))
print 'Número de filas antes de hacer el explode {0} y después {1}'.format(dataframeItem.count(), dataframeItemGenero.count())
dataframeItemGenero.cache()

El siguiente paso es coger el dataframe con las predicciones de test (se podría haber cogido el de entrenamiento también pero por cuestión de tiempo de ejecución sólo se hará con el de test), eliminar las filas correspondientes a las películas y sustituirlos por los que se acaba de obtener.

In [26]:
cabecerasReducidas = list()
for col in COLS_PUNTUACION:
    if col[0] != COL_PUNTUACION_TIMESTAMP[0]:
        cabecerasReducidas.append(col[0])
for col in COLS_USUARIO:
    if col[0] != COL_USUARIO_ID[0]:
        cabecerasReducidas.append(col[0])
cabecerasReducidas.append(COL_PUNTUACION_PREDICTION[0])
dataframePuntuacionUsuarioItemTestPrediccionGenero = dataframePuntuacionUsuarioItemTestPrediccion.select(cabecerasReducidas)

# Se hace el join con el otro dataframe y se elimina itemid
dataframePuntuacionUsuarioItemTestPrediccionGenero = dataframePuntuacionUsuarioItemTestPrediccionGenero.join(dataframeItemGenero, dataframePuntuacionUsuarioItemTestPrediccionGenero.itemid==dataframeItemGenero.item_itemid, 'inner')

print 'Número de filas antes en el conjunto de test antes de hacer el join {0} y después {1}'.format(dataframePuntuacionUsuarioItemTestPrediccion.count(), dataframePuntuacionUsuarioItemTestPrediccionGenero.count())

dataframeItemGenero.unpersist()
dataframePuntuacionUsuarioItemTestPrediccionGenero.cache()

Vamos a ver cual son los **géneros mejor valorados** de media. Para ello, agrupamos por género y obtenemos la media tanto de rating como de predicción. Además se incluye el número de puntuaciones:

In [28]:
print dataframePuntuacionUsuarioItemTestPrediccionGenero.groupBy(COL_ITEM_GENERO[0]).agg(avg(COL_PUNTUACION_RATING[0]).alias(COL_PUNTUACION_RATING[0]), avg(COL_PUNTUACION_PREDICTION[0]).alias(COL_PUNTUACION_PREDICTION[0]), count('*').alias(COL_PUNTUACION_NUMEROPUNTUACIONES[0])).sort(COL_PUNTUACION_RATING[0], ascending=False).show()

Como se puede ver Film-Noir es el género como mejor valoración aunque es de las que menos opiniones se tiene.

Se podría hacer también una agrupación por **género y código postal** para ver si en una zona predomina las buenas puntuaciones de un determinado género. Como puede haber películas que sólo lo hayan valorado 1 usuario por código postal, se ha decidido en poner un mínimo de votaciones que será 60:

In [30]:
dataframePuntuacionUsuarioItemTestPrediccionGeneroGroup = dataframePuntuacionUsuarioItemTestPrediccionGenero.groupBy([COL_ITEM_GENERO[0], COL_USUARIO_ZIPCODE[0]]).agg(avg(COL_PUNTUACION_RATING[0]).alias(COL_PUNTUACION_RATING[0]), avg(COL_PUNTUACION_PREDICTION[0]).alias(COL_PUNTUACION_PREDICTION[0]), count('*').alias(COL_PUNTUACION_NUMEROPUNTUACIONES[0]))
# .where(dataframePuntuacionUsuarioItemTestPrediccionGenero.numero_puntuaciones>20).sort(COL_PUNTUACION_RATING[0], ascending=False).show()
print dataframePuntuacionUsuarioItemTestPrediccionGeneroGroup.where(dataframePuntuacionUsuarioItemTestPrediccionGeneroGroup.numero_puntuaciones>60).sort(COL_PUNTUACION_RATING[0], ascending=False).show()

### Películas mejor valoradas
A continuación se va a obtener las **películas mejor valoradas** de media en el conjunto de test volviendo a coger el dataframe fruto de la transformación con el modelo con un mínimo de 60 votaciones:

In [32]:
dataframePuntuacionUsuarioItemTestPrediccionGroup = dataframePuntuacionUsuarioItemTestPrediccion.groupBy(COL_ITEM_TITLE[0]).agg(avg(COL_PUNTUACION_RATING[0]).alias(COL_PUNTUACION_RATING[0]), avg(COL_PUNTUACION_PREDICTION[0]).alias(COL_PUNTUACION_PREDICTION[0]), count('*').alias(COL_PUNTUACION_NUMEROPUNTUACIONES[0]))
print dataframePuntuacionUsuarioItemTestPrediccionGroup.where(dataframePuntuacionUsuarioItemTestPrediccionGroup.numero_puntuaciones>60).sort(COL_PUNTUACION_RATING[0], ascending=False).show()

No están nada mal las películas que aparecen en las primeras posiciones: Schindler's List, Casablanca, Star Wars (1977)...

---
## Conclusiones

Como se ha visto es muy fácil construir un modelo de recomendación de películas, lo más difícil es de donde obtener los datos para contruirlo (cold start).

A partir de los datos, se pueden obtener información muy atractiva para la empresas de este sector y poder recomendar películas, por ejemplo, dependiendo de la zona donde vive.

Siguientes pasos:
1. Mejorar la programación ya que en algunos casos se utilizan el nombre de la columnas y en otros la forma datafram.columna. Es preferible la primera forma.
2. Hacer un estudio más exhaustivo sobre los datos y ver, por ejemplo, si el género de la película y el sexo de la persona tiene relación y mejorar el motor de recomendación.