In [0]:
# Importo los elementos necesarios para la tarea
from pyspark.sql.types import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
import pandas as pd
import pyspark.sql.functions as F

# Creación dataframe ratings
ratings_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())])

ratingsDF=spark.read.option("header","false").option("delimiter",";").option("inferSchema","true")\
                          .schema(ratings_schema).csv("/FileStore/tables/udata.csv")

ratingsDF.show(10)

In [0]:
# Creación dataframe películas

pelis_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType())])

peliculasDF=spark.read.option("header","false").option("delimiter",";").option("inferSchema","true")\
                          .schema(pelis_schema).csv("/FileStore/tables/uitem.csv")
peliculasDF.show(10)

In [0]:
# Hago un Inner Join de los dataframes películas y ratings (ya que comparten los valores de movieId) para ver el título de las películas y entender mejor la información y las predicciones

movielensDF = ratingsDF.join(peliculasDF, ratingsDF.movieId == peliculasDF.ID).select("userId","movieId","title","rating")
movielensDF.show(8)


In [0]:
# Creación del modelo predictivo con ALS. 

# Creo sets de train y test
(training_data, test_data) = movielensDF.randomSplit([0.8, 0.2])

# Construyo el modelo a partir del algoritmo ALS

als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True,
coldStartStrategy="drop", implicitPrefs=False)

# Definición del paramGridbuilder

param_grid=ParamGridBuilder()\
           .addGrid(als.rank,[12,13,14])\
           .addGrid(als.maxIter,[18,19,20])\
           .addGrid(als.regParam, [.17,.18,.19])\
           .build()     

# Definición como Evaluator del RMSE

evaluator= RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
                    
# Cross validation con Train validation split

tvs= TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator)
                    
# Aplicamos el modelo al subset de training
model = tvs.fit(training_data)

# Definición del Mejor modelo
mejor_modelo=model.bestModel

# Basandondos en ese mejor modelo encontrado generamos las predicciones para la muestra de test (test_data)
predictions = mejor_modelo.transform(test_data)

# Imprimimos el resultado final del RMSE resultante con los mejores parametros del modelo
rmse = evaluator.evaluate(predictions)
print ("RMSE: "), rmse
print("Con estos parámetros, el mejor modelo es:")
print("           Rank:"),mejor_modelo.rank
print("           MaxIter:"),mejor_modelo._java_obj.parent().getMaxIter()
print("           RegParam:"),mejor_modelo._java_obj.parent().getRegParam()


In [0]:
# Mejor resultado RMSE=  Tardó unos 40 minutos (137 jobs)
RMSE:  0.920267958862
# Con estos parámetros, el mejor modelo es:
           Rank: 13 (latent factors)
           MaxIter: 20
           RegParam: 0.17

In [0]:
# Obtengo unas cuantas predicciones y las comparo con la puntuación real dada por los usuarios:

predictions.show(10) 

In [0]:
# Comparamos las predicciones con los ratings para cada usuario mejorando la tabla visualmente con la opción display

display(predictions.sort("userId","rating"))

userId,movieId,title,rating,prediction
1,247,Turbo: A Power Rangers Movie (1997),1.0,1.1001503
1,260,Event Horizon (1997),1.0,2.3963456
1,140,Homeward Bound: The Incredible Journey (1993),1.0,2.504637
1,112,Flipper (1996),1.0,2.081625
1,237,Jerry Maguire (1996),2.0,3.5295262
1,167,Private Benjamin (1980),2.0,2.7352147
1,149,Jude (1996),2.0,3.2702394
1,101,Heavy Metal (1981),2.0,3.2997768
1,126,"Spitfire Grill, The (1996)",2.0,3.8093193
1,233,Under Siege (1992),2.0,3.0048223


In [0]:
# Tras investigar las posibilidades que ofrece ALS en pyspark.ml.recommendation module, voy a poner en práctica varias funciones que ofrecen recomendaciones de películas personalizadas para todos los usuarios y que también recomiendan usarios a los que probablemente más les gustaría una película concreta, como son recommendForAllUsers() y recommendForAllItems()

# Ofrecemos por ejemplo 2 recomendaciones de peliculas por usuario. Junto al Id de cada usario se ofrece una lista con los 2 movieIds de peliculas recomendadas y su predicción

recomusers=mejor_modelo.recommendForAllUsers(2).toPandas()
recomusers


In [0]:
#Hago lo mismo con recommendForAllItems, función que recomienda a que usarios le puede gustar una película concreta que elijamos. En el ejemplo recomienda 2 usuarios por cada película

recomitems=mejor_modelo.recommendForAllItems(2).toPandas()
recomitems


In [0]:
# Pruebo también otras funciones explorando el dataset final y los usuarios, por ejemplo:
# Los 10 usuarios que más veces puntúan películas
usuarios_mas_activos = movielensDF.groupBy("userId").count().sort(F.col("count").desc()).limit(50000)
usuarios_mas_activos = usuarios_mas_activos.withColumnRenamed("userId", "Usuario_muy_activo")
usuarios_mas_activos.show(10)

In [0]:
# Los 10 usuarios que más alto puntúan de media
puntuacion_media = movielensDF.groupBy("userId").agg(F.avg('rating').alias('valoracion_media')).sort(F.col("valoracion_media").desc())
puntuacion_media.show(10)