# Sistema de Recomendación con Pyspark

Para este ejercicio, haremos un sistema de recomendación para usuarios del Sistema de Transporte Colectivo Metro basada en patrones de uso. Buscaremos analizar los patrones de ingresos a lo largo del tiempo y detectar tendencias o comportamientos recurrentes. Esto puede ayudar a generar recomendaciones personalizadas para los usuarios en función de su historial de viajes o preferencias.

## Carga de Módulos

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import isnan, when, col, abs

from pyspark.ml.feature import StringIndexer

## Carga de los datos (Pandas)

Los datos que usaremos son datos gubernamentales abiertos sobre los ingresos del Sistema de Transporte Colectivo Metro por línea a partir del año 2012. Los datos están disponibles en: https://datos.cdmx.gob.mx/dataset/ingresos-del-sistema-de-transporte-colectivo-metro


In [3]:
df = pd.read_csv("ingresos_stc_0123.csv")

In [4]:
df.columns

Index(['fecha', 'tipo_ingreso', 'linea', 'ingreso'], dtype='object')

# Análisis exploratorio de los datos

In [5]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 151260 entries, 0 to 151259
Data columns (total 4 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   fecha         151260 non-null  object 
 1   tipo_ingreso  151260 non-null  object 
 2   linea         151260 non-null  object 
 3   ingreso       128823 non-null  float64
dtypes: float64(1), object(3)
memory usage: 4.6+ MB


In [6]:
df.isna().sum()

fecha               0
tipo_ingreso        0
linea               0
ingreso         22437
dtype: int64

In [7]:
df[df['ingreso'].isnull()].head(10)

Unnamed: 0,fecha,tipo_ingreso,linea,ingreso
11,2012-01-01,Boletos,12,
23,2012-01-02,Boletos,12,
35,2012-01-03,Boletos,12,
47,2012-01-04,Boletos,12,
59,2012-01-05,Boletos,12,
71,2012-01-06,Boletos,12,
83,2012-01-07,Boletos,12,
95,2012-01-08,Boletos,12,
107,2012-01-09,Boletos,12,
119,2012-01-10,Boletos,12,


In [8]:
df[df['ingreso'].isnull()].tail(10)

Unnamed: 0,fecha,tipo_ingreso,linea,ingreso
151249,2023-01-31,QR/Validador,2,
151250,2023-01-31,QR/Validador,3,
151251,2023-01-31,QR/Validador,4,
151252,2023-01-31,QR/Validador,5,
151253,2023-01-31,QR/Validador,6,
151254,2023-01-31,QR/Validador,7,
151255,2023-01-31,QR/Validador,8,
151257,2023-01-31,QR/Validador,A,
151258,2023-01-31,QR/Validador,B,
151259,2023-01-31,QR/Validador,12,


Podemos darnos cuenta que en su mayoría de las columnas vacías son de la línea 12. Haciendo una rápida investigación podemos darnos cuenta que esta línea no se inauguró hasta Octubre de 2012, y además volvió a cerrar en 2021.

Podemos darnos cuenta que el tipo de ingreso QR/Validador igual tiene demasiados registros vacíos, esto podría ser por que aún no se ha implementado por completo este tipo de ingreso.


## Creación de la sesion de Spark

In [9]:
spark = SparkSession.builder \
    .appName("Recomendador de Patrones de Uso") \
    .getOrCreate()

## Carga de datos con Spark

Para Tratar los valores nulos que previamente analizamos, sustituiremos este valor nulo por 0, ya que es importante considerar esto para la recomendación

In [10]:
data = spark.read.csv('ingresos_stc_0123.csv', header=True, inferSchema=True)

# Sustituir valores nulos por 0
data = data.withColumn('ingreso', when(isnan(data.ingreso), 0).otherwise(data.ingreso))


In [11]:
  data.head(10)

[Row(fecha=datetime.date(2012, 1, 1), tipo_ingreso='Boletos', linea='1', ingreso=648825.0),
 Row(fecha=datetime.date(2012, 1, 1), tipo_ingreso='Boletos', linea='2', ingreso=657120.0),
 Row(fecha=datetime.date(2012, 1, 1), tipo_ingreso='Boletos', linea='3', ingreso=466803.0),
 Row(fecha=datetime.date(2012, 1, 1), tipo_ingreso='Boletos', linea='4', ingreso=60120.0),
 Row(fecha=datetime.date(2012, 1, 1), tipo_ingreso='Boletos', linea='5', ingreso=242280.0),
 Row(fecha=datetime.date(2012, 1, 1), tipo_ingreso='Boletos', linea='6', ingreso=138525.0),
 Row(fecha=datetime.date(2012, 1, 1), tipo_ingreso='Boletos', linea='7', ingreso=138690.0),
 Row(fecha=datetime.date(2012, 1, 1), tipo_ingreso='Boletos', linea='8', ingreso=328383.0),
 Row(fecha=datetime.date(2012, 1, 1), tipo_ingreso='Boletos', linea='9', ingreso=176.0),
 Row(fecha=datetime.date(2012, 1, 1), tipo_ingreso='Boletos', linea='A', ingreso=275370.0)]

Para el sistema de recomendación utilizaremos el algoritmo ALS (Alternating Least Squares) que es una opción popular y efectiva para construir sistemas de recomendación. Es altamente escalable, capaz de manejar datos dispersos y utiliza la factorización de matrices para capturar características latentes. ALS es fácil de implementar en PySpark y proporciona una métrica de evaluación estándar para optimizar el modelo.

Comenzamos haciendo un tratamiendo a las cadenas de texto para meterlo al modelo como números, ya que es así como la computadora los puede entender

In [12]:
# Crear objetos StringIndexer para las columnas categóricas
tipo_ingreso_indexer = StringIndexer(inputCol="tipo_ingreso", outputCol="tipo_ingreso_index")
linea_indexer = StringIndexer(inputCol="linea", outputCol="linea_index")

# Ajustar los StringIndexer al DataFrame
indexer_model = tipo_ingreso_indexer.fit(data)
data_indexed = indexer_model.transform(data)
indexer_model = linea_indexer.fit(data_indexed)
data_indexed = indexer_model.transform(data_indexed)

Verificamos si las transformaciones se realizaron correctamente

In [13]:
unique_tipo_ingreso_index = data_indexed.select("tipo_ingreso_index").distinct()
unique_tipo_ingreso_index.show()

+------------------+
|tipo_ingreso_index|
+------------------+
|               0.0|
|               1.0|
|               3.0|
|               2.0|
+------------------+



In [14]:
unique_linea_index = data_indexed.select("linea_index").distinct()
unique_linea_index.show()

+-----------+
|linea_index|
+-----------+
|        8.0|
|        0.0|
|        7.0|
|        1.0|
|        4.0|
|       11.0|
|        3.0|
|        2.0|
|       10.0|
|        6.0|
|        5.0|
|        9.0|
+-----------+



# Modelado

En esta parte, ajustaremos el ALS a nuestros datos, y usaremos la métrica MAPE (Mean Absolut Precentage Error) para validar los resultados

In [15]:
# Dividir el conjunto de datos en entrenamiento y prueba (80% para entrenamiento, 20% para prueba)
train_data, test_data = data_indexed.randomSplit([0.8, 0.2], seed=42)

# Ajustar ALS al conjunto de entrenamiento
als = ALS(userCol="tipo_ingreso_index", itemCol="linea_index", ratingCol="ingreso")
model = als.fit(train_data)

# Generar recomendaciones para los usuarios en el conjunto de prueba
predictions = model.transform(test_data)

# Calcular el error absoluto porcentual (MAPE)
mape = predictions.withColumn("error", abs(col("ingreso") - col("prediction")) / col("ingreso")).selectExpr("mean(error) as MAPE")

# Mostrar las recomendaciones y el MAPE
predictions.select("fecha", "tipo_ingreso", "linea", "ingreso", "tipo_ingreso_index", "linea_index", "prediction").show(truncate=False)
mape.show()

+----------+------------+-----+--------+------------------+-----------+----------+
|fecha     |tipo_ingreso|linea|ingreso |tipo_ingreso_index|linea_index|prediction|
+----------+------------+-----+--------+------------------+-----------+----------+
|2012-01-01|Recargas    |12   |0.0     |1.0               |1.0        |904461.06 |
|2012-01-01|Recargas    |7    |60717.5 |1.0               |7.0        |631763.06 |
|2012-01-01|Recargas    |B    |144235.0|1.0               |11.0       |678194.25 |
|2012-01-02|Recargas    |12   |0.0     |1.0               |1.0        |904461.06 |
|2012-01-02|Recargas    |3    |752359.0|1.0               |3.0        |1151421.8 |
|2012-01-02|Recargas    |7    |467594.0|1.0               |7.0        |631763.06 |
|2012-01-01|Tarjetas    |5    |890.0   |2.0               |5.0        |1934.3373 |
|2012-01-01|Tarjetas    |B    |1190.0  |2.0               |11.0       |3582.6804 |
|2012-01-02|Tarjetas    |2    |9920.0  |2.0               |2.0        |7971.082  |
|201

El MAPE es una métrica relativa que se expresa como un porcentaje. En este caso, el MAPE del 253.79 indica que, en promedio, las predicciones tienen un error del 253.79% en relación con los valores reales. Si bien esto puede parecer alto, también es importante considerar la escala y el rango de los valores de ingreso en el conjunto de datos.

La alta variabilidad en los valores de ingreso puede dificultar la precisión de las predicciones. En el conjunto de datos, se pueden observar valores de ingreso muy bajos, como 0.0, y valores muy altos, como 965,385.0. Esta amplia gama de valores puede contribuir a un MAPE más alto, ya que las predicciones pueden tener dificultades para capturar la variabilidad en los datos.

# Recomendación

In [38]:
from pyspark.ml.feature import StringIndexer

new_data = spark.createDataFrame([
    ('2023/06/23', 0.0, 2),
    ('2023/06/23', 1.0, 3)
], ['fecha', 'tipo_ingreso_index', 'linea_index'])


new_data.head(3)
recommendations = model.transform(new_data)
recommendations.select("fecha", "tipo_ingreso_index", "linea_index", "prediction").show(truncate=False)

+----------+------------------+-----------+----------+
|fecha     |tipo_ingreso_index|linea_index|prediction|
+----------+------------------+-----------+----------+
|2023/06/23|0.0               |2          |1038415.94|
|2023/06/23|1.0               |3          |1151421.8 |
+----------+------------------+-----------+----------+



De estos datos de entrada podemos concluir que la línea 2 será mejor opción que la línea 3 ya que habrá menos gente