# Generación de Modelos Inteligentes Usando PySpark y MLLib

### Arturo Cristián Díaz López
### Instituto Tecnológico y de Estudios Superiores de Monterrey
### 29-Oct-2024

## Introducción

En este proyecto, se ha empleado el dataset Fashion MNIST con el fin de construir un modelo de clasificación de imágenes de ropa y accesorios. Este dataset ha sido seleccionado debido a su popularidad en tareas de clasificación de imágenes, así como su similitud estructural con el clásico MNIST, pero con un mayor desafío visual al contener diez categorías de productos de moda. Este reporte resume las etapas clave del proyecto, incluyendo la visualización del dataset en Tableau, el desarrollo de un modelo en PySpark y su evaluación final.

## Dataset

El dataset Fashion MNIST contiene imágenes en escala de grises de 28x28 píxeles de 10 clases de artículos de moda. Cada imagen está etiquetada con un número de clase correspondiente a un tipo específico de artículo.

### Estructura

El dataset cuenta con las siguientes 10 clases, pertenecientes a una categoría de artículo de moda:

- **0.** T-shirt
- **1.** Trouser
- **2.** Pullover
- **3.** Dress
- **4.** Coat
- **5.** Sandal
- **6.** Shirt
- **7.** Sneaker
- **8.** Bag
- **9.** Ankle boot

### Visualización

Para mejorar la comprensión de las características visuales del dataset, se realizó una visualización en Tableau, explorando la distribución y las características de las imágenes en cada categoría. En esta visualización se observan diferencias entre clases. El siguiente código en Python muestra cómo se transformó el dataset para realizar una visualización en Tableau:

In [None]:
import pandas as pd

# Cargar el dataset
df = pd.read_csv('data/fashion-mnist_test.csv')

# Convertir de wide a long format
df_long = df.melt(id_vars=["label"], 
                  var_name="pixel", 
                  value_name="intensity")

# Extraer las coordenadas x, y a partir del número del píxel
df_long['x'] = df_long['pixel'].str.extract(r'(\d+)').astype(int) % 28
df_long['y'] = df_long['pixel'].str.extract(r'(\d+)').astype(int) // 28

# Guardar en un nuevo CSV
df_long.to_csv('data/fashion_mnist_long.csv', index=False)

El anterior código permitió obtener un archivo con datos fácilmente cargables en Tableau, aplicación que permitió generar visualizaciones a través del siguiente proceso:

1. **Carga de datos**: El archivo generado `fashion_mnist_long.csv` fue importado a Tableau.
2. **Preparación de datos**: Las coordenadas `x` y `y` se usaron para representar visualmente cada píxel, mientras que la columna `intensity` determinó la intensidad de color.
3. **Exploración por categorías**: Se utilizó la columna `label` para filtrar y analizar cada clase de artículo de moda de forma individual.

A continuación, se presentan las visualizaciones generadas en la aplicación antes mencionada:

![Dashboard0](./assets/Dashboard0.png)

![Dashboard1](./assets/Dashboard1.png)

![Dashboard2](./assets/Dashboard2.png)

![Dashboard3](./assets/Dashboard3.png)

![DashboardAll](./assets//DashboardAll.png)

Adicionalmente, se realizó una visualización para verificar el número de ítems por clase. Esta visualización permitió encontrar que las clases se encuentran distribuidas equitativamente para todas las imágenes.

![DashboardClasses](./assets/ClassesDistribution.png)

## Modelo

El modelo seleccionado fue un modelo de Random Forest Classifier para la clasificación de imágenes en el conjunto de datos de Fashion MNIST. Este modelo es una elección popular para tareas de clasificación debido a su capacidad de manejar grandes conjuntos de datos y su flexibilidad para ajustarse a distintos tipos de problemas.

### Implementación en Apache Spark

El modelo fue implementado usando PySpark, la interfaz Python de Apache Spark. El Random Forest fue elegido por su capacidad de evitar el sobreajuste y por su buen rendimiento en entornos de datos complejos. La elección de PySpark permitió un manejo eficiente del volumen de datos y del entrenamiento en paralelo, logrando un modelo escalable y robusto.


### Carga y preparación de datos

1. Configuración de Spark: Se estableció una sesión de Spark optimizada para el manejo de datos en paralelo, asignando memoria suficiente para la carga y el procesamiento del conjunto de datos.

2. Carga y Preparación de Datos: El conjunto de entrenamiento se almacenó en formato Parquet y se particionó en 100 bloques, mejorando así la eficiencia del entrenamiento distribuido.

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Creo mi sesión de Spark
spark = SparkSession.builder \
    .appName("Fashion MNIST Data Preparation") \
    .getOrCreate()

# Cargo los datos
train_data = spark.read.csv("data/fashion-mnist_train.csv", header=True, inferSchema=True)
test_data = spark.read.csv("data/fashion-mnist_test.csv", header=True, inferSchema=True)

# Preparo las columnas de características
feature_columns = [f"pixel{i}" for i in range(1, 785)]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Transformo los datos
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# Selecciono las columnas relevantes
train_data = train_data.select("label", "features")
test_data = test_data.select("label", "features")

# Guardo los datos preparados en formato Parquet
train_data.write.mode('overwrite').parquet("data/prepared_train_data.parquet")
test_data.write.mode('overwrite').parquet("data/prepared_test_data.parquet")

# Muestro las primeras filas del DataFrame modificado
train_data.select("label", "features").show(5)

# Detengo la sesión de Spark
spark.stop()

### Entrenamiento y guardado

3. El clasificador se entrenó utilizando 100 árboles en el bosque aleatorio, configurado para equilibrar la precisión y la rapidez.

4. El modelo entrenado se guardó para su evaluación y aplicación en futuras predicciones.

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier

# Creo mi sesión de Spark
spark = SparkSession.builder \
    .appName("Fashion MNIST Model Training") \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.memory", "5g") \
    .getOrCreate()

# Cargo los datos preparados
train_data = spark.read.parquet("data/prepared_train_data.parquet").repartition(100)

# Entreno el modelo
rf = RandomForestClassifier(featuresCol='features', labelCol='label', numTrees=100)
model = rf.fit(train_data)

# Guardo el modelo
model.save("models/fashion_mnist_rf_model")

## Evaluación

La evaluación del modelo se llevó a cabo utilizando métricas estándar de clasificación multiclase. El modelo fue probado en el conjunto de datos de prueba, aplicando el modelo entrenado para generar predicciones sobre cada clase de Fashion MNIST.

Se cargaron los datos de prueba y el modelo entrenado. Los datos de prueba fueron transformados para obtener predicciones y probabilidades asociadas a cada clase.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, concat_ws
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vector
from pyspark.sql.types import ArrayType, DoubleType

# Inicializo Spark
spark = SparkSession.builder \
    .appName("Fashion MNIST Model Evaluator") \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.memory", "5g") \
    .getOrCreate()

# Defino un UDF para convertir un Vector a un Array
def vector_to_array(vector):
    return vector.toArray().tolist() if isinstance(vector, Vector) else []

# Cargo datos de prueba
test_data = spark.read.parquet("data/prepared_test_data.parquet")

# Cargo el modelo entrenado
model = RandomForestClassificationModel.load("models/fashion_mnist_rf_model")

# Realizo predicciones
predictions = model.transform(test_data)

# Evalúo el modelo
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

# Imprimo las métricas del modelo
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1_score}")

# Guardo las métricas en un archivo de texto
with open("model_metrics.txt", "w") as metrics_file:
    metrics_file.write(f"Accuracy: {accuracy}\n")
    metrics_file.write(f"Precision: {precision}\n")
    metrics_file.write(f"Recall: {recall}\n")
    metrics_file.write(f"F1-Score: {f1_score}\n")

# Convierte 'features', 'rawPrediction', y 'probability' a arrays

vector_to_array_udf = udf(vector_to_array, ArrayType(DoubleType()))
predictions = predictions.withColumn("features", vector_to_array_udf(col("features"))) \
                         .withColumn("rawPrediction", vector_to_array_udf(col("rawPrediction"))) \
                         .withColumn("probability", vector_to_array_udf(col("probability")))

# Convertir columnas de arrays a cadenas separadas por comas
predictions = predictions.withColumn("features", concat_ws(",", col("features"))) \
                         .withColumn("rawPrediction", concat_ws(",", col("rawPrediction"))) \
                         .withColumn("probability", concat_ws(",", col("probability")))

# Guardo las predicciones en un archivo CSV
predictions.select("label", "prediction", "features", "rawPrediction", "probability").write.mode("overwrite").csv("predictions.csv", header=True)

# Cierro la sesión de Spark
spark.stop()

### Métricas de Evaluación

1. **Accuracy**:  
   Medida del porcentaje de predicciones correctas realizadas por el modelo.  
   **Valor obtenido**: 0.7648  

2. **Weighted Precision**:  
   Evalúa la precisión general del modelo ponderada por el número de muestras en cada clase.  
   **Valor obtenido**: 0.7907  

3. **Weighted Recall**:  
   Calcula el porcentaje de predicciones correctas sobre el total de instancias verdaderas por clase.  
   **Valor obtenido**: 0.7648  

4. **F1-Score**:  
   Una medida combinada que equilibra precisión y recall, ponderada por el soporte de cada clase.  
   **Valor obtenido**: 0.7434  


### Resultados

Se calcularon y almacenaron las métricas de evaluación, que son indicativas de un rendimiento aceptable del modelo sobre el conjunto de datos de prueba. Los resultados pueden encontrarse [aquí](./results/model_metrics.txt).

### Almacenamiento de predicciones

Las predicciones, junto con sus probabilidades y características originales, fueron exportadas a un archivo CSV para un análisis más detallado y posible visualización de resultados. Las predicciones hechas por el modelo pueden encontrarse en este [directorio](./predictions/).

Este enfoque evaluativo permite identificar el rendimiento global del modelo y verificar la eficacia de sus predicciones en un conjunto de datos nunca antes visto, asegurando así su capacidad de generalización.

## Conclusión

En conclusión, el modelo de clasificación desarrollado con el dataset Fashion MNIST ha demostrado una capacidad razonable para identificar las diferentes categorías de prendas y accesorios. La visualización en Tableau contribuyó al análisis exploratorio y facilitó una mejor comprensión de las características distintivas de cada clase. A través del uso de PySpark, se logró procesar grandes volúmenes de datos de manera eficiente, y se ha dejado abierta la posibilidad de optimizaciones futuras en función de los resultados obtenidos en la evaluación. Este proyecto sirve como una base sólida para el desarrollo de sistemas de clasificación más complejos en el campo de la moda y otras aplicaciones de visión por computadora.

## Bibliografía

1. Zalando. *Fashion MNIST Dataset*.  
   [https://github.com/zalandoresearch/fashion-mnist](https://github.com/zalandoresearch/fashion-mnist)

2. Apache Spark. *Apache Spark Documentation*.  
   [https://spark.apache.org/docs/latest/](https://spark.apache.org/docs/latest/)

3. Apache Spark. *PySpark Documentation*.  
   [https://spark.apache.org/docs/latest/api/python/](https://spark.apache.org/docs/latest/api/python/)

4. Apache Spark. *Machine Learning Pipelines*.  
   [https://spark.apache.org/docs/latest/ml-guide.html](https://spark.apache.org/docs/latest/ml-guide.html)

5. Breiman, L. (2001). Random Forests. *Machine Learning*, 45(1), 5-32.  
   [https://link.springer.com/article/10.1023/A:1010933404324](https://link.springer.com/article/10.1023/A:1010933404324)
