## Carga de Archivos

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .master('local[*]')\
        .appName('prueba')\
        .enableHiveSupport()\
        .getOrCreate()

Generamos la ingesta del archivo con datos de entrenamiento

In [4]:
yelp_review = spark.read.json('data/review_train_data.json')

Evaluamos una entrada ejemplo

In [None]:
yelp_review.take(1)

Veamos cuantos registros existen en el conjunto

In [None]:
yelp_review.count()

Evaluamos la cantidad de columnas

In [None]:
yelp_review.columns

Para este ejemplo vamos a trabajar sólo con las columnas `user_id`, `business_id` y `stars`. Con el comando `select` las separaremos en un nuevo objeto.

In [None]:
yelp_review = yelp_review.select('user_id', 'business_id', 'stars')

## Implementación del pipeline

Para poder entrenar un modelo con ALS, lo que debemos generar es una matriz con datos numéricos. Si revisamos el schema de datos con `yelp_review.printSchema()`, observaremos que tanto `user_id` como `business_id` son strings. Con el método `StringIndexer` podremos pasarlas a numérico.



In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [None]:
user_id_indexer = StringIndexer(inputCol="user_id",
                               outputCol="user_id_indexed")

business_id_indexer = StringIndexer(inputCol='business_id',
                                   outputCol='business_id_indexed')

pipeline_proc = Pipeline(stages=[user_id_indexer, business_id_indexer])

yelp_review_proc = pipeline_proc\
                    .fit(yelp_review)\
                    .transform(yelp_review)

In [11]:
yelp_review_proc.take(3)

[Row(user_id='paoW23WEkX17E-lWPn-sSw', business_id='wo41OdU_iUwFofsBnVf6uw', stars=3.0, user_id_indexed=31838.0, business_id_indexed=7277.0),
 Row(user_id='ilgrAfnldXykcpoBwSMgnQ', business_id='4GJ9B9IUeOYQtqfG_BYwlw', stars=5.0, user_id_indexed=19001.0, business_id_indexed=821.0),
 Row(user_id='Isf8G6HPbNqEisKDjmUlbw', business_id='L-EF1NxGWjL5LGONbk4hxg', stars=5.0, user_id_indexed=323.0, business_id_indexed=1823.0)]

## Implementación del modelo

Para implementar un modelo Alternating Least Squares, es necesario definir la cantidad de factores latentes a identificar, el nombre de las columnas para usuarios e item, así como el rating asociado. También hay que considerar el comportamiento de la estrategia Cold Start.

In [13]:
from pyspark.ml.recommendation import ALS

train_als = ALS(rank=5,
                userCol='user_id_indexed',
                itemCol='business_id_indexed',
                ratingCol='stars',coldStartStrategy='drop')

Entrenamos el modelo de la manera usual con el método  `fit`.

In [14]:
train_als_model = train_als.fit(yelp_review_proc)

En el archivo de helpers tenemos definida una función que nos permite evaluar las cargas de eigenvalues para cada una de las entradas a nivel de usuario e item.

In [85]:
import helpers

In [86]:
helpers.get_als_factors_information(train_als_model)

Número de usuarios en entrenamiento: 43914
Producto punto para los primeros 3 usuarios:
Usuario: 0 -> Producto punto: [1.556, -0.517, 0.379, 0.597, -0.195]
Usuario: 10 -> Producto punto: [-1.059, -1.306, -1.082, 1.122, 1.601]
Usuario: 20 -> Producto punto: [0.809, -0.976, -0.328, 1.099, -0.925]


Número de items en entrenamiento: 10607
Producto punto para los primeros 10 items:
Item: 0 -> Producto punto: [-0.048, 0.138, -0.023, 0.401, -1.507]
Item: 10 -> Producto punto: [-0.064, -0.604, -1.125, 0.899, -0.243]
Item: 20 -> Producto punto: [-0.725, 0.615, 0.519, -1.514, 0.131]
Item: 30 -> Producto punto: [-0.138, 0.274, -1.382, -0.889, -0.423]
Item: 40 -> Producto punto: [-0.156, 1.571, -1.246, 0.142, -0.1]
Item: 50 -> Producto punto: [1.002, -1.037, -0.061, 0.686, -0.605]
Item: 60 -> Producto punto: [-0.216, -0.158, -0.889, 0.003, 0.602]
Item: 70 -> Producto punto: [-1.643, 1.141, 0.049, -0.891, -0.079]
Item: 80 -> Producto punto: [0.924, -0.891, -0.894, -0.251, 0.033]
Item: 90 -> Produc

### Generación de predicciones

Para generar predicciones, hacemos uso de la función `transform` __en el modelo ya entrenado__. A diferencia de `sklearn`  donde las predicciones retornan un array, en `pyspark` las predicciones son concatenadas como una nueva columna en el `DataFrame`.

In [18]:
get_predictions = train_als_model.transform(yelp_review_proc)

In [19]:
get_predictions.show(5)

+--------------------+--------------------+-----+---------------+-------------------+----------+
|             user_id|         business_id|stars|user_id_indexed|business_id_indexed|prediction|
+--------------------+--------------------+-----+---------------+-------------------+----------+
|zwCen9VgJspf6yJgD...|yLMSxHjK56Az-KtMQ...|  4.0|        10817.0|              148.0| 3.9144683|
|_cIOI0pD1wbEtieG1...|yLMSxHjK56Az-KtMQ...|  3.0|         6376.0|              148.0| 2.9358513|
|IkvaB4ij28xrBgh5Q...|yLMSxHjK56Az-KtMQ...|  4.0|        15966.0|              148.0| 3.9144683|
|XWZH-5T5pK5nFI4fl...|yLMSxHjK56Az-KtMQ...|  5.0|        19940.0|              148.0| 4.8930855|
|dTW_kKKAKyX2fgaYh...|yLMSxHjK56Az-KtMQ...|  5.0|        33761.0|              148.0| 4.8930855|
+--------------------+--------------------+-----+---------------+-------------------+----------+
only showing top 5 rows



### Evaluación del modelo

El modelo puede ser evaluado como un problema de regresión o de clasificación. Dado que los ratings  son inherentemente ordinales, existe una jerarquía clara entre las observaciones.

Partamos por implementar el evaluador de un problema de regresión basado en los dataframes.

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluate_als = RegressionEvaluator(predictionCol='prediction',labelCol='stars', metricName='rmse')
get_rmse = evaluate_als.evaluate(get_predictions)
print(f"El RMSE promedio en el conjunto de entrenamiento es de: {get_rmse}")

El RMSE promedio en el conjunto de entrenamiento es de: 0.1206859139936097


Ahora también podemos implementar un evaluador basado en los RDD de spark

In [22]:
from pyspark.mllib.evaluation import RegressionMetrics

user_level_prediction = get_predictions\
                        .select('prediction','stars')\
                        .rdd\
                        .map(lambda x: (x[0], x[1]))
regression_metrics_output = RegressionMetrics(user_level_prediction)

In [87]:
helpers.report_reg_metrics(regression_metrics_output)

Varianza Explicada: 2.038
Error cuadrático promedio: 0.015
Error absoluto promedio: 0.091
Raíz del error cuadrático promedio: 0.121


## Generación de recomendaciones

Las recomendaciones van a estar disponibles en la función `recommendForAllUsers`.

In [25]:
get_recommendations_for_users = train_als_model.recommendForAllUsers(10)

In [26]:
get_recommendations_for_users.select('recommendations').take(1)

[Row(recommendations=[Row(business_id_indexed=4562, rating=7.489191055297852), Row(business_id_indexed=6078, rating=7.462268829345703), Row(business_id_indexed=4129, rating=7.460726737976074), Row(business_id_indexed=4351, rating=7.439226150512695), Row(business_id_indexed=2649, rating=7.438721179962158), Row(business_id_indexed=8560, rating=7.433102607727051), Row(business_id_indexed=3185, rating=7.428170204162598), Row(business_id_indexed=6471, rating=7.416937351226807), Row(business_id_indexed=3939, rating=7.413895606994629), Row(business_id_indexed=10278, rating=7.406289577484131)])]

In [27]:
get_recommendations_for_users.show(10)

+---------------+--------------------+
|user_id_indexed|     recommendations|
+---------------+--------------------+
|            148|[[4562, 7.489191]...|
|            463|[[2488, 6.726735]...|
|            471|[[2488, 10.373366...|
|            496|[[4675, 4.7120085...|
|            833|[[3159, 9.682419]...|
|           1088|[[4649, 2.6362147...|
|           1238|[[5356, 7.034578]...|
|           1342|[[5780, 11.276456...|
|           1580|[[4483, 7.444725]...|
|           1591|[[4272, 5.2209077...|
+---------------+--------------------+
only showing top 10 rows



## Evaluación de Ranking

In [28]:
from pyspark.sql.functions import expr, col
from pyspark.mllib.evaluation import RankingMetrics

users_predictions = get_predictions\
  .orderBy(col('user_id_indexed'), expr("prediction DESC"))\
  .groupBy('user_id_indexed')\
  .agg(expr('collect_set(business_id_indexed)').alias('user_preds'))

get_positive_recs = get_predictions\
  .where('stars > 2.5')\
  .groupBy('user_id_indexed')\
  .agg(expr('collect_set(business_id_indexed)').alias('likely_preds'))
  
user_divergence = get_positive_recs.join(users_predictions,['user_id_indexed'])

In [31]:
user_divergence.rdd.map(lambda x: (x[1], x[2])).take(2)

[([3616.0, 729.0, 4713.0, 1252.0], [3616.0, 729.0, 4713.0, 1252.0]),
 ([3630.0, 10464.0, 608.0, 2931.0], [3630.0, 10464.0, 608.0, 2931.0])]

In [32]:
ranking_metrics = RankingMetrics(user_divergence.rdd.map(lambda x: (x[1], x[2])))

In [33]:
ranking_metrics.meanAveragePrecision

0.9844389850334991

## Evaluación en Testing data

In [34]:
yelp_test = spark.read\
                .json('data/review_test_data.json')\
                .select('user_id', 'business_id', 'stars')
yelp_test_proc = pipeline_proc\
                    .fit(yelp_test)\
                    .transform(yelp_test)

In [35]:
get_predictions_on_test = train_als_model.transform(yelp_test_proc)

In [36]:
get_predictions_on_test.show(5)

+--------------------+--------------------+-----+---------------+-------------------+-----------+
|             user_id|         business_id|stars|user_id_indexed|business_id_indexed| prediction|
+--------------------+--------------------+-----+---------------+-------------------+-----------+
|5ELA1Xm-xeE8oj_In...|gA9hCYY7MYl9oZ3ay...|  5.0|         1829.0|              148.0|-0.06538749|
|mWHhRg6twaUGlqjYg...|gA9hCYY7MYl9oZ3ay...|  5.0|        14752.0|              148.0|  3.4248352|
|GXK2LOBctKVg7jpld...|gA9hCYY7MYl9oZ3ay...|  5.0|        28057.0|              148.0|  2.3234031|
|VFJwV-4OxcxtTM7ME...|gA9hCYY7MYl9oZ3ay...|  1.0|        11081.0|              148.0| -2.0340717|
|w3G8B2S7-AUxKCLIw...|gA9hCYY7MYl9oZ3ay...|  4.0|        28253.0|              148.0|  0.4594403|
+--------------------+--------------------+-----+---------------+-------------------+-----------+
only showing top 5 rows



In [37]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluate_als = RegressionEvaluator(predictionCol='prediction',labelCol='stars', metricName='rmse')
get_rmse = evaluate_als.evaluate(get_predictions_on_test)
print(f"El RMSE promedio en el conjunto de prueba es de: {get_rmse}")

El RMSE promedio en el conjunto de prueba es de: 4.407713895692625


In [83]:

users_predictions = get_predictions_on_test\
  .orderBy(col('user_id_indexed'), expr("prediction DESC"))\
  .groupBy('user_id_indexed')\
  .agg(expr('collect_set(business_id_indexed)').alias('user_preds'))

get_positive_recs = get_predictions_on_test\
  .where('stars > 2.5')\
  .groupBy('user_id_indexed')\
  .agg(expr('collect_set(business_id_indexed)').alias('likely_preds'))
  
user_divergence = get_positive_recs.join(users_predictions,['user_id_indexed'])

ranking_metrics_on_test = RankingMetrics(user_divergence.rdd.map(lambda x: (x[1], x[2])))
ranking_metrics_on_test.meanAveragePrecision

0.983261906418922

In [84]:
for i in range(1, 6):
    print(f"Accuracy para {i} -> {ranking_metrics_on_test.precisionAt(i)}")

Accuracy para 1 -> 1.0
Accuracy para 2 -> 0.552281226626776
Accuracy para 3 -> 0.37773123909249573
Accuracy para 4 -> 0.28621540762902026
Accuracy para 5 -> 0.23006133133881831


## Búsqueda de Grilla

In [45]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [46]:
als_instance = ALS(userCol='user_id_indexed',
                itemCol='business_id_indexed',
                ratingCol='stars',
                coldStartStrategy='drop')

params = ParamGridBuilder()\
        .addGrid(als_instance.rank,[3, 5, 7, 10])\
        .addGrid(als_instance.regParam, [0.001, 0.1, 1])\
        .build()

eval_rmse = RegressionEvaluator(predictionCol='prediction',
                               labelCol='stars',
                               metricName='rmse')

grid_search_als = CrossValidator(estimator=als_instance,
                                estimatorParamMaps = params,
                                evaluator=eval_rmse,
                                numFolds=5)

In [50]:
grid_search_als_train = grid_search_als.fit(yelp_review_proc)

In [81]:
for values in zip(params, grid_search_als_train.avgMetrics):
    tmp_values = list(values[0].values())
    print(f"regParam: {tmp_values[0]} / rank: {tmp_values[1]} -> {values[1]:.2f}")

regParam: 3 / rank: 0.001 -> 12.41
regParam: 3 / rank: 0.1 -> 4.62
regParam: 3 / rank: 1.0 -> 4.22
regParam: 5 / rank: 0.001 -> 8.66
regParam: 5 / rank: 0.1 -> 4.31
regParam: 5 / rank: 1.0 -> 4.04
regParam: 7 / rank: 0.001 -> 6.56
regParam: 7 / rank: 0.1 -> 4.15
regParam: 7 / rank: 1.0 -> 3.97
regParam: 10 / rank: 0.001 -> 4.93
regParam: 10 / rank: 0.1 -> 4.04
regParam: 10 / rank: 1.0 -> 3.94
