# Motor de recomendación con PySpark

___
## Tarea 1 - Módulo 8

La práctica consiste en construir un recomendador con el módulo de recomendación de Spark (en la versión 1.6 solamente implementa el método ALS) sobre un conjunto de entrenamiento, realizar predicciones sobre un conjunto de test y posteriormente evaluar su rendimiento aplicando un RegressionEvaluator con metricName=”rmse”, dado que los valores finales son las valoraciones que se supone que daría el usuario a una determinada película que aún no ha visto, y por tanto numéricos.

El paquete de pySpark de recomendaciones se encuentra en:

http://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS

El recomendador se construirá utilizando el dataset de 100000 puntuaciones de 1000 usuarios sobre 1700 películas de Movielens

http://grouplens.org/datasets/movielens/100k/  (user id | item id | rating | timestamp)

Se recomienda leer el archivo README.txt para entender bien los datos:

http://files.grouplens.org/datasets/movielens/ml-100k-README.txt


El entregable será un notebook de Python con el código pyspark correspondiente.

___
## Preparación del entorno: paquetes y variables

Lo primero que vamos hacer es **importar todos los paquetes** necesarios y crear el contexto para los data frames (SQLContext)

In [4]:
import re
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.recommendation import ALS, ALSModel

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

sqlc = SQLContext(sc)

A continuación se crean las **variables** para:
* Las rutas y nombres de los siguientes ficheros: puntuaciones usuario-película, información de los usuarios, información de las películas, información de las categorías de las películas
* El separador de los campos en los ficheros que pueden ser un pipe o un tabulador
* El nombre y posición de las columnas de los ficheros; además del tipo de dato que será en el dataframe

In [6]:
# Rutas, nombres de los datasets y separadores que se usan

RUTA_FICHERO_PUNTUACION = '/FileStore/tables/u.data'
RUTA_FICHERO_USUARIO = '/FileStore/tables/u.user'
RUTA_FICHERO_ITEM = '/FileStore/tables/u.item'
RUTA_FICHERO_GENERO = '/FileStore/tables/u.genre'
SEPARADOR_TABULADOR = '\t'
SEPARADOR_PIPE = '|'

# Variables con información de las columnas de los ficheros: 
#    - posicion 0: nombre de la columna en el fichero
#    - posicion 1: posicion de la columna en el fichero
#    - posicion 2: tipo dataframe
#    - posicion 3: True si la columna puede ser nullable. Se utilizara al convertir de RDD a dataframe.
# Se crea también una lista con ellos dentro por si se quiere iterar


# Dataset de puntuaciones

COL_PUNTUACION_USERID = ('userid', 0, IntegerType(), True)
COL_PUNTUACION_ITEMID = ('itemid', 1, IntegerType(), True)
COL_PUNTUACION_RATING = ('rating', 2, FloatType(), True)
COL_PUNTUACION_TIMESTAMP = ('timestamp', 3, IntegerType(), True)
COL_PUNTUACION_PREDICTION = ('prediction', None, None, None)
COL_PUNTUACION_NUMEROPUNTUACIONES = ('numero_puntuaciones', None, None, None)
COLS_PUNTUACION = (
    COL_PUNTUACION_USERID, 
    COL_PUNTUACION_ITEMID, 
    COL_PUNTUACION_RATING, 
    COL_PUNTUACION_TIMESTAMP)

# Dataset de usuarios

COL_USUARIO_ID = ('user_userid', 0, IntegerType(), True)
COL_USUARIO_AGE = ('age', 1, IntegerType(), True)
COL_USUARIO_GENDER = ('gender', 2, StringType(), True)
COL_USUARIO_OCCUPATION = ('occupation', 3, StringType(), True)
COL_USUARIO_ZIPCODE = ('zipcode', 4, StringType(), True)
COLS_USUARIO = (
    COL_USUARIO_ID, 
    COL_USUARIO_AGE, 
    COL_USUARIO_GENDER, 
    COL_USUARIO_OCCUPATION,
    COL_USUARIO_ZIPCODE)

#  Dataset de generos

COL_GENERO_NAME = ('name', 0, StringType(), False)
COL_GENERO_ID = ('genre_genreid', 1, IntegerType(), False)
COLS_GENERO = (
    COL_GENERO_NAME, 
    COL_GENERO_ID)

#  Dataset de items

COL_ITEM_ID = ('item_itemid', 0, IntegerType(), False)
COL_ITEM_TITLE = ('title', 1, StringType(), False)
COL_ITEM_RELEASEDATE = ('releasedate', 2, StringType(), False)
COL_ITEM_VIDERELEASEDATE = ('videoreleasedate', 3, StringType(), False)
COL_ITEM_IMDBURL = ('imdburl', 4, StringType(), False)
COL_ITEM_UNKNOWN = ('unknown', 5, IntegerType(), False)
COL_ITEM_ACTION = ('action', 6, IntegerType(), False)
COL_ITEM_ADVENTURE = ('adventure', 7, IntegerType(), False)
COL_ITEM_ANIMATION = ('animation', 8, IntegerType(), False)
COL_ITEM_CHILDRENS = ('childrens', 9, IntegerType(), False)
COL_ITEM_COMEDY = ('comedy', 10, IntegerType(), False)
COL_ITEM_CRIME = ('crime', 11, IntegerType(), False)
COL_ITEM_DOCUMENTARY = ('documentary', 12, IntegerType(), False)
COL_ITEM_DRAMA = ('drama', 13, IntegerType(), False)
COL_ITEM_FANTASY = ('fantasy', 14, IntegerType(), False)
COL_ITEM_FILMNOIR = ('filmnoir', 15, IntegerType(), False)
COL_ITEM_HORROR = ('horror', 16, IntegerType(), False)
COL_ITEM_MUSICAL = ('musical', 17, IntegerType(), False)
COL_ITEM_MYSTERY = ('mystery', 18, IntegerType(), False)
COL_ITEM_ROMACE = ('Romance', 19, IntegerType(), False)
COL_ITEM_SCIFI = ('scifi', 20, IntegerType(), False)
COL_ITEM_THRILLER = ('thriller', 21, IntegerType(), False)
COL_ITEM_WAR = ('war', 22, IntegerType(), False)
COL_ITEM_WESTERN = ('western', 23, IntegerType(), False)
COL_ITEM_GENEROS = ('generos', None, ArrayType(StringType()), False)
COL_ITEM_GENERO = ('genero', None, None, None)
COLS_ITEM = (
    COL_ITEM_ID, 
    COL_ITEM_TITLE,
    COL_ITEM_RELEASEDATE,
    COL_ITEM_VIDERELEASEDATE,
    COL_ITEM_IMDBURL)
COLS_ITEM_TODOSGENEROS = (
    COL_ITEM_UNKNOWN,
    COL_ITEM_ACTION, 
    COL_ITEM_ADVENTURE,
    COL_ITEM_ANIMATION,
    COL_ITEM_CHILDRENS,
    COL_ITEM_COMEDY,
    COL_ITEM_CRIME,
    COL_ITEM_DOCUMENTARY,
    COL_ITEM_DRAMA,
    COL_ITEM_FANTASY,
    COL_ITEM_FILMNOIR,
    COL_ITEM_HORROR,
    COL_ITEM_MUSICAL,
    COL_ITEM_MYSTERY,
    COL_ITEM_ROMACE,
    COL_ITEM_SCIFI,
    COL_ITEM_THRILLER,
    COL_ITEM_WAR,
    COL_ITEM_WESTERN)


___
## Creación de los dataframes

Creamos los dataframes con las siguientes particularidades:

* **Puntuaciones**: Se elimina el atributo timestamp, ya que no se cree que aporte nada al modelo que queremos construir.

* **Item**: Se eliminarán los atributos video_release_date e IMDb_URL, por la misma razón que anteriormente.

Finalmente uniremos todos los dataframes en uno solo para hacer la modelización.

In [8]:
# Se carga las puntuaciones utilizando COLS_PUNTUACION y después se elimina el timestamp
esquemaPuntuacion = StructType()
for col in COLS_PUNTUACION:
    esquemaPuntuacion.add(StructField(col[0], col[2], col[3]))
dataframePuntuacion = sqlc.read.format('com.databricks.spark.csv'). \
                option('delimiter', SEPARADOR_TABULADOR). \
                option('header', 'false'). \
                load(RUTA_FICHERO_PUNTUACION, schema=esquemaPuntuacion)
cabecerasReducidas = list()

# Número de NAN y NULL en rating
print('Número de filas con NAN en las puntuaciones en la columna rating: {0}', dataframePuntuacion.where(isnan(COL_PUNTUACION_RATING[0])).count())
print('Número de filas con NULL en las puntuaciones en la columna rating: {0}', dataframePuntuacion.where(isnull(COL_PUNTUACION_RATING[0])).count())

for col in COLS_PUNTUACION:
    if col[0] != COL_PUNTUACION_TIMESTAMP[0]:
        cabecerasReducidas.append(col[0])
dataframePuntuacion = dataframePuntuacion.select(cabecerasReducidas)

# Se carga los usuarios utilizando COLS_USUARIO
esquemaUsuario = StructType()
for col in COLS_USUARIO:
    esquemaUsuario.add(StructField(col[0], col[2], col[3]))
dataframeUsuario = sqlc.read.format('com.databricks.spark.csv'). \
                option('delimiter', SEPARADOR_PIPE). \
                option('header', 'false'). \
                load(RUTA_FICHERO_USUARIO, schema=esquemaUsuario)

# Se carga los items utilizando COLS_ITEM y COLS_ITEM_TODOSGENEROS
esquemaItem = StructType()
for col in COLS_ITEM:
    esquemaItem.add(StructField(col[0], col[2], col[3]))
for col in COLS_ITEM_TODOSGENEROS:
    esquemaItem.add(StructField(col[0], col[2], col[3]))
dataframeItem = sqlc.read.format('com.databricks.spark.csv'). \
                option('delimiter', SEPARADOR_PIPE). \
                option('header', 'false'). \
                load(RUTA_FICHERO_ITEM, schema=esquemaItem)
cabecerasReducidas = list()
for col in COLS_ITEM:
    if col[0] == COL_ITEM_ID[0] or col[0] == COL_ITEM_TITLE[0] or col[0] == COL_ITEM_RELEASEDATE[0]:
        cabecerasReducidas.append(col[0])
for col in COLS_ITEM_TODOSGENEROS:
    cabecerasReducidas.append(col[0])
dataframeItem = dataframeItem.select(cabecerasReducidas)

# Se carga los géneros utilizando COLS_GENERO
esquemaGenero = StructType()
for col in COLS_GENERO:
    esquemaGenero.add(StructField(col[0], col[2], col[3]))
dataframeGenero = sqlc.read.format('com.databricks.spark.csv'). \
                option('delimiter', SEPARADOR_PIPE). \
                option('header', 'false'). \
                load(RUTA_FICHERO_GENERO, schema=esquemaGenero)

print(dataframePuntuacion.show(4))
print(dataframeUsuario.show(4))
print(dataframeItem.show(4))
print(dataframeGenero.show(4))

Creamos el Dataframe completo, basado en los dataframes anteriores a través de las claves, y se divide en Train y Test el dataframe a fin de preparar los datos para el aprendizaje automático, con la división estandar 70/30.

Se utiliza el dataframe completo por la razón de que ofrece una visión mucho más amplia al visualizarlo, pero principalmente porque se obtendrán mejores resultados que con el resto de dataframes, principalmente a que se poseen más datos. Al fin y al cabo, el uso del método de factorización de matriz para matrices de calificación se nutre de la mayor cantidad de productos posibles (long tail), y será de ese modo como podamos obtener un modelo más completo.

Si se tratase de un trabajo más específico, y estuviese orientado a un fin concreto o linea de negocio, se suprimirían los atributos que no aportasen valor, y de esta manera aumentaríamos la eficiencia en la ejecución del programa, así como la capacidad del dataframe de soportar cambios o adiciones de atributos, pero no lo consideraremos así para este ejercicio. En definitiva, el único problema que observaremos utilizando este dataframe será el largo tiempo necesario para ejecutar el modelo

In [10]:
# Creamos el dataframe completo con un join de los dataframes puntuación-usuario-item:

dataframePuntuacionUsuarioItem = dataframePuntuacion. \
                                  join(dataframeUsuario, dataframePuntuacion.userid==dataframeUsuario.user_userid, 'left_outer'). \
                                  join(dataframeItem, dataframePuntuacion.itemid==dataframeItem.item_itemid, 'left_outer')
# Se eliminan las columnas duplicadas
cabecerasReducidas = list()
for col in dataframePuntuacionUsuarioItem.columns:
    if col != COL_USUARIO_ID[0] and col !=COL_ITEM_ID[0]:
        cabecerasReducidas.append(col)
dataframePuntuacionUsuarioItem = dataframePuntuacionUsuarioItem.select(cabecerasReducidas)

# Comprobamos que el dataframe resultante está conforme a lo proyectado:

print('Columnas del dataframe: {0}', dataframePuntuacionUsuarioItem.columns)
print('Número de filas: {0}', dataframePuntuacionUsuarioItem.count())

# Se divide el dataframe en entrenamiento y test

dataframePuntuacionUsuarioItem.cache()

dataframePuntuacionUsuarioItemDividido = dataframePuntuacionUsuarioItem.randomSplit([0.7, 0.3], 1234)
dataframePuntuacionUsuarioItemEntrenamiento = dataframePuntuacionUsuarioItemDividido[0]
dataframePuntuacionUsuarioItemTest = dataframePuntuacionUsuarioItemDividido[1]

## Implementación del modelo ALS

Para crear el modelo se utilizará CrossValidator, en el que se variarán varios parámetros: rank, maxIter, alpha y regParam, dentro de unos márgenes normalizados y estándares, que tras varios intentos resultan ofrecer el cálculo del modelo ALS sin aumentar excesivamente el consumo de recursos.

La premisa principal que justifica el uso del CrossValidator es que se ha comprobado que, bajo los parámetros escogidos, sólo un 0,20% de las entradas del dataset del test no tienen valor, y eliminarlos (que es nuestra pretensión), bien puede suponerse que no afectaría en absoluto al modelo, y nos garantizaría evitar tener Nan para el cálculo de la Raíz del error cuadrático medio (rmse).

In [12]:
# Evaluador y cross validation
evaluatorRegression = RegressionEvaluator(labelCol=COL_PUNTUACION_RATING[0])

als = ALS(userCol=COL_PUNTUACION_USERID[0], itemCol=COL_PUNTUACION_ITEMID[0], ratingCol=COL_PUNTUACION_RATING[0], coldStartStrategy='drop')
# als = ALS(userCol=COL_PUNTUACION_USERID[0], itemCol=COL_PUNTUACION_ITEMID[0], ratingCol=COL_PUNTUACION_RATING[0])
grid = ParamGridBuilder(). \
        addGrid(als.rank, [5, 10, 15, 20]). \
        addGrid(als.maxIter, [5, 10]). \
        addGrid(als.alpha, [1.0, 2.0]). \
        addGrid(als.regParam, [0.1, 0.5, 1.0]). \
        build()
# grid = ParamGridBuilder().addGrid(als.rank, [5, 20]).build()
crossValidator = CrossValidator(estimator=als, estimatorParamMaps=grid, evaluator=evaluatorRegression, numFolds=2)
crossValidatorModel = crossValidator.fit(dataframePuntuacionUsuarioItemEntrenamiento)

## Valoración del modelo

Describimos el método de predicción y valoramos que, efectivamente, no existen NaN en el modelo propuesto gracias al CrossValidation aplicado anteriormente.

In [14]:
# Se obtiene la predicción para entrenamiento y test

dataframePuntuacionUsuarioItemEntrenamientoPrediccion = crossValidatorModel.bestModel.transform(dataframePuntuacionUsuarioItemEntrenamiento)
dataframePuntuacionUsuarioItemTestPrediccion = crossValidatorModel.bestModel.transform(dataframePuntuacionUsuarioItemTest)

dataframePuntuacionUsuarioItemEntrenamiento.show(5)

# Se muestra el número de filas y las que son NAN en el conjunto de entrenamiento y de test

print('Conjunto de entrenamiento predicción -> Número de filas: {0}, número con NAN en predicción: {1}'.format(dataframePuntuacionUsuarioItemEntrenamientoPrediccion.count(), dataframePuntuacionUsuarioItemEntrenamientoPrediccion.where(isnan(COL_PUNTUACION_PREDICTION[0])).count()))
print('Conjunto de test predicción -> Número de filas: {0}, número con NAN en predicción: {1}'.format(dataframePuntuacionUsuarioItemTestPrediccion.count(), dataframePuntuacionUsuarioItemTestPrediccion.where(isnan(COL_PUNTUACION_PREDICTION[0])).count()))

# Describe método para rating y prediction

print(dataframePuntuacionUsuarioItemEntrenamientoPrediccion.select(COL_PUNTUACION_RATING[0], COL_PUNTUACION_PREDICTION[0]).describe().show())
print(dataframePuntuacionUsuarioItemTestPrediccion.select(COL_PUNTUACION_RATING[0], COL_PUNTUACION_PREDICTION[0]).describe().show())

**RMSE** para el conjunto de entrenamiento y el de test. Como se puede observar es más pequeño el Train que el Test, como es lógico. Llama la atención que son valores relativamente altos si los consideramos en el rango 0-1.

In [16]:
# Se obtiene el RMSE sobre los dos conjuntos
rmseEntrenamiento = evaluatorRegression.evaluate(dataframePuntuacionUsuarioItemEntrenamientoPrediccion, {evaluatorRegression.metricName: 'rmse'})
rmseTest = evaluatorRegression.evaluate(dataframePuntuacionUsuarioItemTestPrediccion, {evaluatorRegression.metricName: 'rmse'})

print('RMSE en training: {0}'.format(rmseEntrenamiento))
print('RMSE en test: {0}'.format(rmseTest))

Se observa en un gráfico la comparativa de las puntuaciones reales y según modelo:

* Se observa que, como en cierto modo es lógico, el modelo no se ajusta a la línea oblicua que debería de seguir. Si observáramos los resultados en un boxplots veríamos que la aparente diferencia del modelo continuo predicho que hemos construido y la evolución real(como ya vimos anteriormente, el error del dataset de entrenamiento es menor),  veríamos que la densidad de puntos es mucho mayor en torno a la zona que rodea la recta que los puntos que se dispersan en su vertical. 

** Se podría jugar con los parámetros del modelo ALS y también haciendo una limpieza previa de datos, para eliminar outliers.

In [18]:
# Se crea la lista con la puntuación real y la predicha en Train y Test

xEntrenamiento, yEntrenamiento = list(), list()
for entrenamientoPrediccion in dataframePuntuacionUsuarioItemEntrenamientoPrediccion.collect():
    xEntrenamiento.append(entrenamientoPrediccion[COL_PUNTUACION_RATING[0]])
    yEntrenamiento.append(entrenamientoPrediccion[COL_PUNTUACION_PREDICTION[0]])
    
xTest, yTest = list(), list()
for testPrediccion in dataframePuntuacionUsuarioItemTestPrediccion.collect():
    xTest.append(testPrediccion[COL_PUNTUACION_RATING[0]])
    yTest.append(testPrediccion[COL_PUNTUACION_PREDICTION[0]])

# Ploteamos

plt.clf()
plt.xlim(-1, 6)
plt.ylim(-1, 6)
plt.xlabel('Puntuacion real')
plt.ylabel('Puntuacion segun el modelo')
plt.title('Puntuacion real vs prediccion')

plt.plot([0, 20], [0, 20], 'b')
# Se pasan los datos de entrenamiento y test al gráfico

plt.plot(xTest, yTest, 'ro', label='Test')
plt.legend(loc='lower right')

plt.show()
display()

In [19]:
plt.clf()
plt.xlim(-1, 6)
plt.ylim(-1, 6)
plt.xlabel('Puntuacion real')
plt.ylabel('Puntuacion segun el modelo')
plt.title('Puntuacion real vs prediccion')

plt.plot([0, 20], [0, 20], 'b')
# Se pasan los datos de entrenamiento y test al gráfico
plt.plot(xEntrenamiento, yEntrenamiento, 'go', label='Train')

plt.legend(loc='lower right')

plt.show()
display()

## Análisis de datos

### Género de películas mejor valorados
Se obtendrá un dataframe donde cada fila tendrá los valores item_id-película-género. Por cada película, habrá tantas filas como géneros tenga.

Para ello:
1. Se obtendrá un diccionario con los géneros donde la clave es el id o posición en el fichero de item y el valor será el género.
2. El siguiente paso es añadir una columna con un array con los géneros que tiene cada película. Para ello se utilizará RDDs para posteriormente trasformalo en otro dataframe
3. El último paso es hacer un explode.

In [22]:
def trasformarFilaColumnasgeneroColumageneros(fila, diccionarionGenero, ultimoIndiceNoGenero=1):

    indice = 0
    indiceGenero = 0
    lineaNueva = list()
    generos = list()
    for item in fila:
        if indice>ultimoIndiceNoGenero:
            if item==1:
                genero = diccionarionGenero.get(indiceGenero)
                if genero:
                    generos.append(genero)
            indiceGenero += 1
        else:
            lineaNueva.append(item)
        indice += 1
    lineaNueva.append(generos)
    return lineaNueva
  
# Se crea un diccionario con los géneros donde la clave es la columna id y el valor el nombre del género

diccionarioGenero = dict()
for row in dataframeGenero.collect():
    diccionarioGenero[row.genre_genreid] = row.name
print(diccionarioGenero)

# Se obtiene el nombre de todas las columnas de los géneros en el dataframe de item

columnas = list()
columnas.append(COL_ITEM_ID[0])
columnas.append(COL_ITEM_TITLE[0])
for col in COLS_ITEM_TODOSGENEROS:
    columnas.append(col[0])
    
# Seleccionamos sólo itemid, titulo, generos

dataframeItemGenero = dataframeItem.select(columnas)
rddItemGenero = dataframeItemGenero.rdd.map(lambda fila: trasformarFilaColumnasgeneroColumageneros(fila, diccionarionGenero=diccionarioGenero))

# Se crea el dataframe desde el RDD

columnas = list()
columnas.append(COL_ITEM_ID[0])
columnas.append(COL_ITEM_TITLE[0])
columnas.append(COL_ITEM_GENEROS[0])
dataframeItemGenero = rddItemGenero.toDF(columnas)

# Hacemos 'explode' para distribuir todas las categorias en filas, y posteriormente comparamos el número de filas

dataframeItemGenero = dataframeItemGenero.select(COL_ITEM_ID[0], COL_ITEM_TITLE[0], explode(dataframeItemGenero.generos).alias(COL_ITEM_GENERO[0]))

print('Nº filas antes de explode {0} y después {1}'.format(dataframeItem.count(), dataframeItemGenero.count()))

dataframeItemGenero.cache()

dataframeItemGenero.show(10)

Se transforma el dataframe del Test sustituyendo filas por el dataframe que se acaba de generar

In [24]:
cabecerasReducidas = list()

for col in COLS_PUNTUACION:
    if col[0] != COL_PUNTUACION_TIMESTAMP[0]:
        cabecerasReducidas.append(col[0])
for col in COLS_USUARIO:
    if col[0] != COL_USUARIO_ID[0]:
        cabecerasReducidas.append(col[0])
        
cabecerasReducidas.append(COL_PUNTUACION_PREDICTION[0])

dataframePuntuacionUsuarioItemTestPrediccionGenero = dataframePuntuacionUsuarioItemTestPrediccion.select(cabecerasReducidas)

# Se hace el join con el otro dataframe y se elimina itemid

dataframePuntuacionUsuarioItemTestPrediccionGenero = dataframePuntuacionUsuarioItemTestPrediccionGenero.join(dataframeItemGenero, dataframePuntuacionUsuarioItemTestPrediccionGenero.itemid==dataframeItemGenero.item_itemid, 'inner')

print('Número de filas antes en el conjunto de test antes de hacer el join {0} y después {1}'.format(dataframePuntuacionUsuarioItemTestPrediccion.count(), dataframePuntuacionUsuarioItemTestPrediccionGenero.count()))

dataframeItemGenero.unpersist()
dataframePuntuacionUsuarioItemTestPrediccionGenero.cache()

dataframePuntuacionUsuarioItemTestPrediccionGenero.show(10)

Valoración de películas por genéro: Para valorar los géneros mejor valorados, vamos a hacer una consulta con las medias de rating y predicción por género

In [26]:
print(dataframePuntuacionUsuarioItemTestPrediccionGenero.groupBy(COL_ITEM_GENERO[0]).agg(avg(COL_PUNTUACION_RATING[0]).alias(COL_PUNTUACION_RATING[0]), avg(COL_PUNTUACION_PREDICTION[0]).alias(COL_PUNTUACION_PREDICTION[0]), count('*').alias(COL_PUNTUACION_NUMEROPUNTUACIONES[0])).sort(COL_PUNTUACION_RATING[0], ascending=False).show())

Film-noir es el género que tiene mejor valoración, pero hace sospechar el hecho de que es uno de los géneros menos votados, lo que significa que está orientado a un público más exclusivo, y esto podría, según como decíamos en los inicios, no ser un análisis suficiente o concluyente si el fin último fuera aumentar beneficios generando contenido basado en género. 

Dadas esas circunstancias, valoraremos de nuevo el ranking eliminando aquellas cuyo número de puntiaciones esté por debajo de 1000:

In [28]:
dataframePuntuacionUsuarioItemTestPrediccionGeneroGroup= dataframePuntuacionUsuarioItemTestPrediccionGenero.groupBy(COL_ITEM_GENERO[0]).agg(avg(COL_PUNTUACION_RATING[0]).alias(COL_PUNTUACION_RATING[0]), avg(COL_PUNTUACION_PREDICTION[0]).alias(COL_PUNTUACION_PREDICTION[0]), count('*').alias(COL_PUNTUACION_NUMEROPUNTUACIONES[0]))

print(dataframePuntuacionUsuarioItemTestPrediccionGeneroGroup.where(dataframePuntuacionUsuarioItemTestPrediccionGeneroGroup.numero_puntuaciones>1000).sort(COL_PUNTUACION_RATING[0], ascending=False).show())

Y como se puede comprobar, es el género de guerra quien gana en mayor rating. 

Se podría hacer también una agrupación por **género y código postal** para ver si en una zona predomina las buenas puntuaciones de un determinado género, filtrando aquellas que tengan un número mínimo de puntuaciones mayor que 50.

In [30]:
dataframePuntuacionUsuarioItemTestPrediccionGeneroGroup = dataframePuntuacionUsuarioItemTestPrediccionGenero.groupBy([COL_ITEM_GENERO[0], COL_USUARIO_ZIPCODE[0]]).agg(avg(COL_PUNTUACION_RATING[0]).alias(COL_PUNTUACION_RATING[0]), avg(COL_PUNTUACION_PREDICTION[0]).alias(COL_PUNTUACION_PREDICTION[0]), count('*').alias(COL_PUNTUACION_NUMEROPUNTUACIONES[0]))

print(dataframePuntuacionUsuarioItemTestPrediccionGeneroGroup.where(dataframePuntuacionUsuarioItemTestPrediccionGeneroGroup.numero_puntuaciones>50).sort(COL_PUNTUACION_RATING[0], ascending=False).show())

### Películas mejor valoradas
Al igual que con el género, se valoran el número de películas mejor valoradas por título:

In [32]:
dataframePuntuacionUsuarioItemTestPrediccionGroup = dataframePuntuacionUsuarioItemTestPrediccion.groupBy(COL_ITEM_TITLE[0]).agg(avg(COL_PUNTUACION_RATING[0]).alias(COL_PUNTUACION_RATING[0]), avg(COL_PUNTUACION_PREDICTION[0]).alias(COL_PUNTUACION_PREDICTION[0]), count('*').alias(COL_PUNTUACION_NUMEROPUNTUACIONES[0]))
print(dataframePuntuacionUsuarioItemTestPrediccionGroup.where(dataframePuntuacionUsuarioItemTestPrediccionGroup.numero_puntuaciones>60).sort(COL_PUNTUACION_RATING[0], ascending=False).show())

La lista de Schindler, Casablanca, Star Wars, Alguien voló sobre el nido del cuco.... 

Pese a que el análisis podría complejizarse mucho más, lo interesante es que vemos que la pridicción se ajusta bastante bien a los resultados por agrupación, aunque un poco por debajo