In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

In [0]:
# Ruta donde se ha subido el archivo en DBFS (ajusta según la ubicación exacta)
file_path = "/FileStore/tables/ecommerce_data_cleaned.csv"

# Cargar el archivo CSV en un DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Mostrar algunas filas para verificar que se cargó correctamente
df.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+-----+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|           Revenue|Month|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+-----+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|15.299999999999999|   12|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|             20.34|   12|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|              22.0|   12|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|             20.34|   12|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:0

In [0]:
# Convertir la columna Month a tipo entero explícitamente
df_ml = df.withColumn("Month", col("Month").cast("int"))

# Definir las características y la etiqueta
df_ml = df_ml.withColumn("label", df["Revenue"])

assembler = VectorAssembler(inputCols=["Quantity", "UnitPrice", "Month"], outputCol="features")
pipeline = Pipeline(stages=[assembler])
df_prepared = pipeline.fit(df_ml).transform(df_ml)

# Dividir los datos en conjunto de entrenamiento y prueba
train_data, test_data = df_prepared.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Definir el modelo de Random Forest
rf = RandomForestRegressor(featuresCol="features", labelCol="label")

# Entrenar el modelo
rf_model = rf.fit(train_data)

In [0]:
# Hacer predicciones en los datos de prueba
test_results = rf_model.transform(test_data)

# Evaluar el modelo
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(test_results)
r2 = evaluator_r2.evaluate(test_results)

print(f"RMSE: {rmse}")
print(f"R²: {r2}")


RMSE: 535.5743221872216
R²: 0.008319863384161352


Análisis de los Resultados del Random Forest
RMSE (Error Cuadrático Medio):
El RMSE de 535.57 sigue siendo un error alto, lo que indica que las predicciones están bastante lejos de los valores reales del Revenue. Random Forest tiende a funcionar mejor que la regresión lineal en problemas no lineales, pero en este caso, el modelo tampoco parece estar capturando correctamente las relaciones en los datos.

R² (Coeficiente de Determinación):
El R² de 0.0083 sugiere que el modelo no está logrando explicar la variabilidad del Revenue. Un valor tan bajo de R² significa que casi no hay correlación entre las predicciones y los valores reales del Revenue.

¿Qué indica esto?
Posibles causas:
Sobreajuste (overfitting): Si has entrenado el modelo en un dataset muy pequeño o con variables que no tienen mucha correlación con el target (Revenue), el modelo puede haber memorizado los datos de entrenamiento pero no generaliza bien en el dataset de prueba.
Datos insuficientes: Random Forest puede no funcionar bien si los datos que le has dado no tienen suficientes patrones claros, o si las variables que estás utilizando no están bien correlacionadas con el objetivo.
Relaciones no capturadas: Puede que las variables Quantity, UnitPrice y Month no sean suficientes para predecir el Revenue de manera precisa. Podría faltar alguna característica más importante.
Próximos pasos sugeridos:
Validación cruzada: Utiliza técnicas de validación cruzada para obtener una mejor evaluación del rendimiento del modelo.

Más características: Considera incluir más variables o características en tu modelo, como podría ser el comportamiento del cliente, el historial de compras, la segmentación geográfica, etc.

Paso 1: Implementar Validación Cruzada
Primero, abordemos la validación cruzada en el contexto de un Random Forest. Este enfoque divide el conjunto de datos en múltiples subconjuntos o "folds". Entrenamos el modelo en algunos subconjuntos y lo probamos en otros para obtener una evaluación más robusta de su rendimiento.

En PySpark, podemos usar la clase CrossValidator para llevar a cabo este proceso. Aquí te dejo un ejemplo:

¿Qué es la validación cruzada?
La validación cruzada es una técnica que se utiliza para evaluar la eficacia de un modelo de Machine Learning. En vez de entrenar y evaluar el modelo una sola vez, se realizan múltiples particiones en los datos. El objetivo es evitar que los resultados obtenidos dependan de una única división del conjunto de datos (entrenamiento/prueba), lo que podría generar sobreajuste (overfitting) o subajuste (underfitting).

Explicación práctica:
División en "folds": En lugar de entrenar y probar el modelo una vez, dividimos los datos en, por ejemplo, 5 grupos llamados folds.
Entrenamiento y validación repetida: Usamos 4 de esos 5 folds para entrenar el modelo y el fold restante para validar. Este proceso se repite cambiando el fold de validación en cada ciclo.
Promediamos los resultados: Los errores obtenidos en cada iteración se promedian para dar una estimación más robusta del rendimiento del modelo.
La idea detrás de la validación cruzada es obtener un modelo que generalice mejor y que no dependa de una única partición de los datos.

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Definir el modelo Random Forest
rf = RandomForestRegressor(featuresCol='features', labelCol='label')

# Configurar el grid de parámetros (si quieres hacer un tuning de hiperparámetros)
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [20, 50]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Configurar el evaluador
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Configurar el CrossValidator
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  # 5-fold cross-validation

# Entrenar con validación cruzada
cvModel = crossval.fit(train_data)

# Evaluar en los datos de prueba
test_results = cvModel.transform(test_data)
rmse = evaluator.evaluate(test_results)
r2 = evaluator.evaluate(test_results, {evaluator.metricName: "r2"})

print(f"RMSE: {rmse}")
print(f"R²: {r2}")


RMSE: 535.3894753895572
R²: 0.00900427731731046


Paso 2: Añadir Más Características (Features)
Ya hemos trabajado con Quantity, UnitPrice, y Month. Sin embargo, podemos explorar más variables que puedan capturar mejor la variabilidad del Revenue.

Algunas nuevas características a considerar podrían ser:

Country: Para ver si el país influye en las ventas.
CustomerID: Identificar si algunos clientes generan más ventas que otros.
Day of Week o Day of Month: Podría ser relevante ver qué días del mes o de la semana son más productivos.
Historias de Ventas: Información del historial del cliente o del producto, como el número de compras anteriores, podría ser útil.
Aquí te dejo cómo podrías incluir nuevas columnas y continuar con el proceso:

Explicación del código propuesto:
Este código agrega nuevas características a tu conjunto de datos para mejorar el modelo de predicción de ingresos (Revenue). Aquí está el desglose:

Nuevas características añadidas:

DayOfMonth: El día del mes en que se realizó la compra (del 1 al 31).
DayOfWeek: El día de la semana en que se realizó la compra (del 1 al 7).
CountryIndex: Un índice numérico para cada país (por ahora convertido directamente, aunque esto podría requerir un tratamiento especial).
VectorAssembler: Se actualiza con las nuevas columnas (DayOfMonth, DayOfWeek, CountryIndex), además de las ya existentes (Quantity, UnitPrice, Month).

In [0]:

from pyspark.sql.functions import dayofmonth, dayofweek

# Añadir características adicionales
df_ml = df_ml.withColumn("DayOfMonth", dayofmonth(col("InvoiceDate")))
df_ml = df_ml.withColumn("DayOfWeek", dayofweek(col("InvoiceDate")))
df_ml = df_ml.withColumn("CountryIndex", col("Country").cast("int"))  # Si se puede codificar Country

# Actualizamos el assembler con las nuevas columnas
assembler = VectorAssembler(inputCols=["Quantity", "UnitPrice", "Month", "DayOfMonth", "DayOfWeek", "CountryIndex"], outputCol="features")

# Continuamos con el pipeline como antes
pipeline = Pipeline(stages=[assembler])
df_prepared = pipeline.fit(df_ml).transform(df_ml)

# Podemos usar el mismo código para entrenar y evaluar el modelo


1. One-Hot Encoding (OHE)
El OHE es una técnica que se utiliza para convertir variables categóricas en un formato que los modelos de Machine Learning puedan entender. Cada categoría se representa como una columna binaria (0 o 1), lo que permite que el modelo procese variables categóricas sin introducir un orden implícito. En este caso, aplicaremos OHE a la columna Country para que el modelo pueda aprovechar esta información de manera más efectiva.

2. Implementación de OHE
El código que agregué previamente te proporciona una estructura básica. Ahora, aquí te dejo un código que incluye OHE para la columna Country:

In [0]:
# Verificar si la columna 'CountryIndex' ya existe y eliminarla
if 'CountryIndex' in df_ml.columns:
    df_ml = df_ml.drop('CountryIndex')

# Convertir 'Country' a índices numéricos
indexer = StringIndexer(inputCol="Country", outputCol="CountryIndex")

# Aplicar One-Hot Encoding a 'CountryIndex'
encoder = OneHotEncoder(inputCol="CountryIndex", outputCol="CountryVec")

# Actualizar assembler con todas las columnas necesarias
assembler = VectorAssembler(inputCols=["Quantity", "UnitPrice", "Month", "DayOfMonth", "DayOfWeek", "CountryVec"], outputCol="features")

# Crear el pipeline con indexer, encoder y assembler
pipeline = Pipeline(stages=[indexer, encoder, assembler])

# Transformar los datos
df_prepared = pipeline.fit(df_ml).transform(df_ml)

# Mostrar las columnas resultantes
df_prepared.select("features", "label").show(10)



+--------------------+------------------+
|            features|             label|
+--------------------+------------------+
|(42,[0,1,2,3,4,5]...|15.299999999999999|
|(42,[0,1,2,3,4,5]...|             20.34|
|(42,[0,1,2,3,4,5]...|              22.0|
|(42,[0,1,2,3,4,5]...|             20.34|
|(42,[0,1,2,3,4,5]...|             20.34|
|(42,[0,1,2,3,4,5]...|              15.3|
|(42,[0,1,2,3,4,5]...|              25.5|
|(42,[0,1,2,3,4,5]...|11.100000000000001|
|(42,[0,1,2,3,4,5]...|11.100000000000001|
|(42,[0,1,2,3,4,5]...|             54.08|
+--------------------+------------------+
only showing top 10 rows



4. Explicación del OHE
One-Hot Encoding (OHE) convierte una variable categórica en múltiples columnas binarias. Por ejemplo, si tienes 3 países, United Kingdom, France, y Germany, OHE crea 3 columnas, cada una con 0 o 1, indicando si el valor pertenece a ese país.

Ventaja: Evita que el modelo interprete incorrectamente una relación numérica entre las categorías (por ejemplo, si UK=1, France=2, Germany=3, el modelo podría suponer que France > UK, lo cual no tiene sentido).

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# 1. Realizar las predicciones en el conjunto de test
predictions = rf_model.transform(test_data)

# 2. Crear un evaluador para RMSE
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Calcular el RMSE
rmse = evaluator_rmse.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# 3. Crear un evaluador para R²
evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

# Calcular el R²
r2 = evaluator_r2.evaluate(predictions)
print(f"R Squared (R²): {r2}")


Root Mean Squared Error (RMSE): 535.6690229779704
R Squared (R²): 0.0079691326186907
