#Sistema de recomendación basado en filtros colaborativos

Estudiantes:
- Carlos Alberto Murillo, cmurill5@eafit.edu.co
- Luz Stella Flórez, lflorezs@eafit.edu.co
- Cindy Paola Guerra cpguerram@eafit.edu.co
- Diana Carolina Benjumea, dcbenjumeh@eafit.edu.co 

Minería de datos para grandes volúmenes de información.




Comenzamos instalando la libreria de pyspark (requerida para trabajar en colab)

In [None]:
!pip install pyspark



Creamos la sesión de spark, tambien requerida para trabajar en colab. En EMR estos pasos no fueron necesarios.

In [None]:
#Correr estas lineas en Colab, no en EMR
#import SparkSession
from pyspark.sql import SparkSession
# en el cluster EMR no hay necesidad de hacer esto, ya viene con AWS EMR / Notebooks

In [None]:
#Correr estas lineas en Colab, no en EMR
#create spark session object
spark=SparkSession.builder.appName('Recommendations').getOrCreate()
# en el cluster EMR no hay necesidad de hacer esto, ya viene con AWS EMR / Notebooks

Importamos las librerias necesarias para trabajar el sistema de recomendación.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.sql import Row
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import substring

A continuación vamos a leer los datos de entrenamiento. En este ejercicio usaremos datos con calificaciones explícitas de productos. El dataset fue tomado de los datos de ejemplo proporcionados por AWS. Del conjunto de datos, se tomó el subconjunto de datos en español. Se tomaron los campos: id del usuario (reviewer_id), el id del producto (product_id) y la calificación (stars).

In [None]:
#Leer en EMR 
#data = spark.read.json('s3://bigdatamining/train/dataset_es_train.json').rdd

#Leer en colab
dataTrain = spark.read.json('/content/dataset_es_train.json').rdd

#Vamos a trabajar con los campos: product_id, reviewer_id, stars
linesTrain = dataTrain.map(lambda x: Row(product_id=x[2],reviewer_id=x[6],stars=x[7]))

#Creamos el dataframe de spark con los ratings para entrenamiento
ratingsTrain = spark.createDataFrame(linesTrain)

#Actualmente, esta API solo admite números enteros para los ID de usuario y artículo. Por lo cual las convertimos a Double.
ratingsTrain = ratingsTrain.withColumn("product_id", substring(ratingsTrain.product_id,12,7).cast(DoubleType()))
ratingsTrain = ratingsTrain.withColumn("reviewer_id", substring(ratingsTrain.reviewer_id,13,7).cast(DoubleType()))
ratingsTrain = ratingsTrain.withColumn("stars", ratingsTrain.stars.cast(DoubleType()))
ratingsTrain.show()

+----------+-----------+-----+
|product_id|reviewer_id|stars|
+----------+-----------+-----+
|  296024.0|   999081.0|  1.0|
|  922286.0|   216771.0|  1.0|
|  474543.0|   929213.0|  1.0|
|  656090.0|   224702.0|  1.0|
|  662544.0|   224827.0|  1.0|
|  103315.0|   304973.0|  1.0|
|  880915.0|   642702.0|  1.0|
|  814677.0|   895784.0|  1.0|
|  654228.0|   789283.0|  1.0|
|  877793.0|   986278.0|  1.0|
|  117076.0|   351011.0|  1.0|
|  843382.0|   783082.0|  1.0|
|  282369.0|   178971.0|  1.0|
|  809544.0|   751317.0|  1.0|
|  727558.0|   483701.0|  1.0|
|  515803.0|   484405.0|  1.0|
|   21385.0|   939700.0|  1.0|
|  477265.0|   488546.0|  1.0|
|  176350.0|   651511.0|  1.0|
|  362329.0|   419830.0|  1.0|
+----------+-----------+-----+
only showing top 20 rows



In [None]:
#Vemos un ejemplo de como eran los valores antes de la transformación
linesTrain.take(5)

[Row(product_id='product_es_0296024', reviewer_id='reviewer_es_0999081', stars='1'),
 Row(product_id='product_es_0922286', reviewer_id='reviewer_es_0216771', stars='1'),
 Row(product_id='product_es_0474543', reviewer_id='reviewer_es_0929213', stars='1'),
 Row(product_id='product_es_0656090', reviewer_id='reviewer_es_0224702', stars='1'),
 Row(product_id='product_es_0662544', reviewer_id='reviewer_es_0224827', stars='1')]

A continuación vamos a leer los datos de test.

In [None]:
#EMR 
#dataTest = spark.read.json('s3://bigdatamining/test/dataset_es_test.json').rdd

#Leer en colab
dataTest = spark.read.json('/content/dataset_es_test.json').rdd

#Vamos a trabajar con los campos: product_id, reviewer_id, stars
linesTest = dataTest.map(lambda x: Row(product_id=x[2],reviewer_id=x[6],stars=x[7]))
#linesTest.take(5)
ratingsTest = spark.createDataFrame(linesTest)

#Convertimos los campos a tipo Double
ratingsTest = ratingsTest.withColumn("product_id", substring(ratingsTest.product_id,12,7).cast(DoubleType()))
ratingsTest = ratingsTest.withColumn("reviewer_id", substring(ratingsTest.reviewer_id,13,7).cast(DoubleType()))
ratingsTest = ratingsTest.withColumn("stars", ratingsTest.stars.cast(DoubleType()))
ratingsTest.show()

+----------+-----------+-----+
|product_id|reviewer_id|stars|
+----------+-----------+-----+
|  113523.0|   580071.0|  1.0|
|   17036.0|   819733.0|  1.0|
|  138642.0|   508607.0|  1.0|
|  170887.0|   491157.0|  1.0|
|  710642.0|     8745.0|  1.0|
|  813312.0|   789216.0|  1.0|
|  260888.0|    22974.0|  1.0|
|  234796.0|   942055.0|  1.0|
|  690174.0|   969485.0|  1.0|
|  624641.0|   681717.0|  1.0|
|  634067.0|   886390.0|  1.0|
|  261612.0|   126506.0|  1.0|
|  531679.0|    84313.0|  1.0|
|   92573.0|   202839.0|  1.0|
|  590815.0|   204210.0|  1.0|
|  689661.0|   200434.0|  1.0|
|  171608.0|   636421.0|  1.0|
|  702463.0|   755775.0|  1.0|
|  984483.0|   867803.0|  1.0|
|  186852.0|   958628.0|  1.0|
+----------+-----------+-----+
only showing top 20 rows





## Construcción del modelo

In [None]:
# Instanciamos el modelo de recomendación usando ALS
# En este caso usaremos la estrategia de cold start: 'drop' para asegurarnos que no obtenemos valores métricas de evaluación con NaN
# Dejamos el parámetro de máximas iteraciones por defecto (10)

als = ALS(
         userCol="reviewer_id", 
         itemCol="product_id",
         ratingCol="stars", 
         nonnegative = True, 
         implicitPrefs = False,
         coldStartStrategy="drop"
)


Ajustamos los hiperparámetros. Usamos un grid con 4 parámetros para rank y 4 parámetros para regParam.

rank: Número de factores latentes en el modelo.

regParam: Parámetro de regularización.

In [None]:
# Añadimos los hiperparámetros y sus valores.
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

In [None]:
# Definimos la métrica: RMSE
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="stars", 
           predictionCol="prediction") 
print ("Número de modelos a probar: ", len(param_grid))

Número de modelos a probar:  16


In [None]:
# Contruimos validación cruzada usando CrossValidator, le pasamos el modelo, los hiperparámetros y k-folds en 5 param_grid
# generar 5 pares de datasets (training y test).
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

In [None]:
# Entrenamos el CrossValidator con el dataset de entrenamiento
model = cv.fit(ratingsTrain)

# Extraemos el mejor modelo del modelo cv entrenado.
best_model = model.bestModel

In [None]:
# Ver las predicciones con el dataset de pruebas
test_predictions = best_model.transform(ratingsTest)

# Evaluamos el modelo, calculando el RMSE con los datos de prueba.
RMSE = evaluator.evaluate(test_predictions)
print("Root-mean-square error = " + str(RMSE))

Root-mean-square error = 2.516386386403228


In [None]:
print("**Mejor modelo**")
# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())
# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

**Mejor modelo**
  Rank: 150
  MaxIter: 10
  RegParam: 0.01


In [None]:
# Generar el top 10  de productos recomendados para cada usuario
userRecs = best_model.recommendForAllUsers(10)
userRecs.show()

+-----------+--------------------+
|reviewer_id|     recommendations|
+-----------+--------------------+
|        463|[{155442, 4.99667...|
|       1238|[{322897, 3.99819...|
|       1342|[{15558, 1.700781...|
|       1959|[{422514, 3.99889...|
|       3918|[{241043, 3.99812...|
|       4519|[{776501, 2.99792...|
|       5518|[{715343, 3.58598...|
|       6620|[{474973, 2.99851...|
|       8389|[{545077, 0.99945...|
|       8638|[{27843, 5.000258...|
|       9465|[{927201, 3.34005...|
|      10623|[{64558, 1.269477...|
|      11033|[{290542, 2.99928...|
|      11317|[{858027, 4.99806...|
|      11748|[{737657, 4.99847...|
|      13289|[{938015, 3.53443...|
|      16861|[{272496, 4.99836...|
|      17753|[{965294, 3.99831...|
|      18866|[{364762, 3.72670...|
|      18911|[{767696, 3.75073...|
+-----------+--------------------+
only showing top 20 rows



In [None]:
# Generar el top 10 de recomendación de usuarios para cada producto.
prodRecs = best_model.recommendForAllItems(10)
prodRecs.show()

+----------+--------------------+
|product_id|     recommendations|
+----------+--------------------+
|       833|[{185613, 2.99766...|
|      1580|[{777168, 2.99717...|
|      1645|[{272263, 5.01140...|
|      4519|[{963832, 3.99825...|
|      5156|[{164445, 2.99780...|
|      5300|[{430195, 1.99643...|
|      6336|[{174866, 4.99850...|
|      6658|[{868435, 2.99787...|
|      7754|[{578152, 4.99867...|
|      7880|[{660435, 4.99891...|
|      7993|[{351444, 1.99699...|
|      8592|[{708007, 2.02348...|
|      9376|[{52622, 3.969951...|
|     11858|[{901045, 1.99728...|
|     12027|[{267101, 1.99686...|
|     16339|[{161835, 4.99871...|
|     16503|[{763890, 2.99751...|
|     18051|[{50288, 0.993770...|
|     19079|[{190397, 1.99801...|
|     19204|[{336738, 3.01551...|
+----------+--------------------+
only showing top 20 rows



##Referencias

- https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html

- https://towardsdatascience.com/build-recommendation-system-with-pyspark-using-alternating-least-squares-als-matrix-factorisation-ebe1ad2e7679