# Actividad 3 | Aprendizaje supervisado y no supervisado
---

**MAESTRÍA EN INTELIGENCIA ARTIFICIAL APLICADA**

**Curso: TC4034.10 - Análisis de grandes volúmenes de datos**

Tecnológico de Monterrey

* Dr. Iván Olmos Pineda
* Mtra. Verónica Sandra Guzmán de Valle
* Mtro. Alberto Daniel Salinas Montemayor

**Actividad 3** - 
Aprendizaje supervisado y no supervisado

**Fecha de entrega** - 
25 de mayo del 2025

---

**Presenta**

|  NOMBRE COMPLETO                        |     MATRÍCULA     |
| :-------------------------------------: |:-----------------:|
| Alejandro Díaz Villagómez               |  A01276769        |

# 1) Introducción teórica
---

## **Aprendizaje supervisado y no supervisado en PySpark**

El análisis y modelado de datos en entornos de Big Data requiere enfoques escalables que permitan procesar grandes volúmenes de información de forma eficiente. En este contexto, el aprendizaje automático (machine learning) ofrece un conjunto de técnicas y algoritmos que permiten extraer patrones útiles a partir de los datos. Estos algoritmos se agrupan tradicionalmente en dos grandes paradigmas: aprendizaje supervisado y aprendizaje no supervisado.

### **Aprendizaje supervisado**

El aprendizaje supervisado se basa en el uso de un conjunto de datos etiquetado, donde cada instancia posee una variable objetivo conocida (etiqueta o clase), que se desea predecir. El algoritmo aprende una función de mapeo a partir de ejemplos, con el objetivo de predecir la salida correspondiente para nuevas entradas. Esta técnica es ampliamente utilizada en problemas de clasificación y regresión.

Entre los algoritmos más representativos del aprendizaje supervisado se encuentran:

* Árboles de decisión (Decision Trees)
* Bosques aleatorios (Random Forests)
* Gradiente boosting (GBTClassifier)
* Redes neuronales multicapa (Multilayer Perceptron Classifier)

Estos algoritmos están implementados directamente en la librería pyspark.ml.classification, lo que permite su aplicación sobre grandes volúmenes de datos distribuidos con eficiencia.

### **Aprendizaje no supervisado**

En contraste, el aprendizaje no supervisado trabaja sobre datos sin etiquetas predefinidas, con el objetivo de encontrar estructuras ocultas o agrupaciones dentro de los datos. Es particularmente útil en tareas de segmentación de clientes, análisis exploratorio y reducción de dimensionalidad.

Entre los algoritmos más reconocidos se encuentran:

* K-Means Clustering
* Gaussian Mixture Models (GMM)
* Power Iteration Clustering (PIC)

Estos modelos también se encuentran disponibles en PySpark, a través del módulo pyspark.ml.clustering, el cual permite realizar agrupamientos escalables con configuraciones flexibles sobre datasets distribuidos.

### **Disponibilidad en PySpark**

PySpark, como API de Apache Spark para Python, ofrece una amplia gama de algoritmos de ambos enfoques dentro del paquete pyspark.ml. Esto permite diseñar flujos completos de aprendizaje automático que incluyan:

* Preparación de datos (transformaciones, codificación, escalado)
* División en entrenamiento y prueba
* Entrenamiento del modelo
* Evaluación de métricas de desempeño
* Predicción de nuevas instancias

Gracias a esta integración, PySpark se consolida como una herramienta clave para el desarrollo de modelos de machine learning a gran escala, tanto supervisados como no supervisados.

### **Consideraciones de rendimiento**

Una de las principales ventajas de PySpark frente a bibliotecas tradicionales de aprendizaje automático (como scikit-learn) radica en su capacidad de procesamiento distribuido. Al ejecutarse sobre clústeres de Spark, PySpark permite manejar datasets de gran volumen sin necesidad de cargarlos completamente en memoria, lo que lo convierte en una solución óptima para entornos empresariales o científicos con requerimientos de escalabilidad y eficiencia computacional.

### **Referencias**

- [Supervised versus unsupervised learning: What's the difference?](https://www.ibm.com/think/topics/supervised-vs-unsupervised-learning)
- [Supervised and Unsupervised learning](https://www.geeksforgeeks.org/supervised-unsupervised-learning/)
- [Building Machine Learning Models with PySpark's pyspark.ml Library: A Comprehensive Guide.](https://dev.to/grayhat/building-machine-learning-models-with-pysparks-pysparkml-library-a-comprehensive-guide-4g5h)

# 2) Selección de los datos
---

## 2.1) Carga de datos

In [1]:
# !pip install findspark pyspark setuptools

In [2]:
import findspark
from pyspark.sql import SparkSession, functions as F
from itertools import product
from os import path

findspark.init()
findspark.find()

'/Users/alejandrodiazvillagomez/Desktop/Proyecto-Big-Data-PySpark/.venv/lib/python3.12/site-packages/pyspark'

In [3]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

spark

25/05/20 20:48:10 WARN Utils: Your hostname, Alejandros-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.100.220 instead (on interface en0)
25/05/20 20:48:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/20 20:48:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
class FileManager():
    @staticmethod
    def open_csv_file(file_path : str):
        """
        This method opens a csv file with pyspark
        """
        csv_df = spark.read.csv(
            file_path,
            header=True,
            inferSchema=True,
            multiLine=True,
            escape="\"",
            quote="\""
        )

        # csv_df.show(truncate=20)

        return csv_df

In [5]:
import kagglehub

# Download latest version
FILE_PATH = kagglehub.dataset_download("machharavikiran/amazon-reviews")

print("Path to dataset files:", FILE_PATH)

df_reviews = FileManager.open_csv_file(FILE_PATH)
df_reviews.limit(10).toPandas()

  from .autonotebook import tqdm as notebook_tqdm


Path to dataset files: /Users/alejandrodiazvillagomez/.cache/kagglehub/datasets/machharavikiran/amazon-reviews/versions/1


                                                                                

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,sentiment
0,US,22873041,R3ARRMDEGED8RD,B00KJWQIIC,335625766,Plemo 14-Inch Laptop Sleeve Case Waterproof Fa...,PC,5,0,0,N,Y,Pleasantly surprised,I was very surprised at the high quality of th...,2015-08-31,1
1,US,30088427,RQ28TSA020Y6J,B013ALA9LA,671157305,TP-Link OnHub AC1900 Wireless Wi-Fi Router,PC,5,24,31,N,N,OnHub is a pretty no nonsense type router that...,I am a Google employee and had to chance to us...,2015-08-31,1
2,US,20329786,RUXJRZCT6953M,B00PML2GQ8,982036237,AmazonBasics USB 3.0 A Male to A Male Cable - ...,PC,1,2,2,N,N,None of them worked. No functionality at all.,"Bought cables in 3ft, 6ft and 9ft. NONE of th...",2015-08-31,0
3,US,14215710,R7EO0UO6BPB71,B001NS0OZ4,576587596,Transcend P8 15-in-1 USB 2.0 Flash Memory Card...,PC,1,0,0,N,Y,just keep searching.,"nope, cheap and slow",2015-08-31,0
4,US,38264512,R39NJY2YJ1JFSV,B00AQMTND2,964759214,Aleratec SATA Data Cable 2.0 20in Serial ATA S...,PC,5,0,0,N,Y,Five Stars,Excellent! Great value and does the job.,2015-08-31,1
5,US,30548466,R31SR7REWNX7CF,B00KX4TORI,170101802,Kingston Digital MobileLite G4 USB 3.0 Multi-F...,PC,5,0,0,N,Y,"Good quality, works well and compact","Good quality,works well and compact size",2015-08-31,1
6,US,589298,RVBP8I1R0CTZ8,B00P17WEMY,206124740,White 9 Inch Unlocked Dual Sim Card Phone Tabl...,PC,3,1,2,N,Y,in fact this is third China good. Demn s***,This demn tablet is just a Real Chinese produc...,2015-08-31,0
7,US,49329488,R1QF6RS1PDLU18,B00TR05L9Y,778403103,"Lenovo TAB2 A10 - 10.1"" 2-in-1 Tablet (1.5Ghz,...",PC,4,1,1,N,Y,Good,I am not sure I don't know if it is the tablet...,2015-08-31,1
8,US,50728290,R23AICGEDAJQL1,B0098Y77OG,177098042,Acer,PC,1,0,0,N,Y,You get what you pay for,"After exactly 45 days, the screen went dark. P...",2015-08-31,0
9,US,37802374,R2EY3N4K9W19UP,B00IFYEYXC,602496520,AzureWave Broadcom BCM94352HMB 802.11/ac/867Mb...,PC,5,3,4,N,Y,Great for Windows 7 Laptop!,Replaced my Intel Centrino 2230 with the BCM94...,2015-08-31,1


En esta primera etapa, se realiza la carga del conjunto de datos correspondiente a reseñas de productos de Amazon. 

El archivo fuente se obtiene directamente desde Kaggle utilizando la librería `kagglehub`, lo que garantiza la reproducibilidad del experimento mediante una fuente pública y estandarizada.

Posteriormente, se emplea el método `open_csv_file()` de la clase `FileManager`, el cual encapsula la lógica de lectura del archivo utilizando PySpark. Esta función está configurada para:

- Leer archivos en formato CSV con encabezados (`header=True`)
- Inferir automáticamente los tipos de datos de cada columna (`inferSchema=True`)
- Soportar estructuras multilinea en los campos textuales como `review_body` (`multiLine=True`)
- Manejar correctamente comillas escapadas o anidadas (`escape="\""` y `quote="\""`)

El resultado de este proceso es un `DataFrame` distribuido denominado `df_reviews`, que contiene un total de más de 6.9 millones de reseñas en idioma inglés correspondientes a la categoría de productos "PC".

La correcta visualización de las primeras filas del dataset confirma que el archivo ha sido cargado de manera satisfactoria, y que los datos contienen información estructurada y adecuada para las siguientes etapas del proceso analítico.

## 2.2) Construcción de la muestra M

In [6]:
class PartitioningManager:

    @staticmethod
    def compute_probabilities(df, cols):
        """
        Computes and returns the probability of each combination of values in the specified columns.
        """
        total_count = df.count()
        return df.groupBy(cols).count() \
                 .withColumn("probability", F.round(F.col("count") / total_count, 6)) \
                 .orderBy("probability", ascending=False)

    @staticmethod
    def filter_partition(df, star_rating, verified_purchase, vine):
        """
        Filters the DataFrame by specific values for rating, verified purchase, and vine.
        """
        return df.filter(
            (F.col("star_rating") == star_rating) &
            (F.col("verified_purchase") == verified_purchase) &
            (F.col("vine") == vine)
        )

    @staticmethod
    def generate_all_partitions(df, min_probability=0.0001):
        """
        Generates partitions only for combinations whose joint probability is above min_probability.
        """

        prob_df = PartitioningManager.compute_probabilities(
            df, ["star_rating", "verified_purchase", "vine"]
        )

        filtered_combinations = prob_df.filter(
            F.col("probability") >= min_probability
        ).select("star_rating", "verified_purchase", "vine").collect()

        partitions = {}
        for row in filtered_combinations:
            rating = row["star_rating"]
            purchase = row["verified_purchase"]
            vine = row["vine"]

            key = f"R{rating}_VP{purchase}_V{vine}"
            filtered = PartitioningManager.filter_partition(df, rating, purchase, vine)
            # Add a partition key to the DataFrame
            filtered = filtered.withColumn("partition_key", F.lit(key))
            partitions[key] = filtered
            print(f"Partition {key} created with {filtered.count()} records.")

        return partitions

    @staticmethod
    def stratified_sample_partitioned_data(partitions_dict, label_col="sentiment", fraction=0.3, min_rows=50):
        """
        Applies stratified sampling to each partition based on sentiment.
        """
        sampled_partitions = {}

        for key, df in partitions_dict.items():
            count = df.count()

            if count < min_rows:
                print(f"Skipping partition {key} — only {count} rows (<{min_rows})")
                continue

            sentiments = df.select(label_col).distinct().rdd.flatMap(lambda x: x).collect()
            fractions = {s: fraction for s in sentiments}

            sampled_df = df.sampleBy(label_col, fractions, seed=42)
            sampled_partitions[key] = sampled_df
            print(f"Sampled {sampled_df.count()} rows from partition {key} (original: {count})")

        return sampled_partitions

    @staticmethod
    def consolidate_sampled_partitions(sampled_partitions):
        """
        Combines all sampled partitions into a single DataFrame.
        Prints validations about the final dataset.
        """
        from functools import reduce
        from pyspark.sql import DataFrame

        if not sampled_partitions:
            print("No partitions to consolidate.")
            return None

        sampled_dfs = list(sampled_partitions.values())
        df_M = reduce(DataFrame.unionByName, sampled_dfs)

        print("Consolidation sampled partitions completed.")
        print(f"Total number of records in the sample M: {df_M.count()}")

        print("Sentiment distribution in the sample M:")
        df_M.groupBy("sentiment").count().orderBy("count", ascending=False).show()
        
        print("Sampled partitions distribution:")
        df_M.groupBy("partition_key").count().orderBy("count", ascending=False).show()

        return df_M


In [7]:
partitions = PartitioningManager.generate_all_partitions(df_reviews, min_probability=0.00001)

                                                                                

Partition R5_VPY_VN created with 3679909 records.


                                                                                

Partition R4_VPY_VN created with 1019728 records.


                                                                                

Partition R1_VPY_VN created with 603371 records.


                                                                                

Partition R3_VPY_VN created with 443364 records.


                                                                                

Partition R5_VPN_VN created with 410073 records.


                                                                                

Partition R2_VPY_VN created with 300544 records.


                                                                                

Partition R1_VPN_VN created with 152779 records.


                                                                                

Partition R4_VPN_VN created with 135197 records.


                                                                                

Partition R3_VPN_VN created with 65398 records.


                                                                                

Partition R2_VPN_VN created with 59973 records.


                                                                                

Partition R5_VPN_VY created with 15604 records.


                                                                                

Partition R4_VPN_VY created with 13240 records.


                                                                                

Partition R3_VPN_VY created with 4886 records.


                                                                                

Partition R2_VPN_VY created with 1634 records.


                                                                                

Partition R1_VPN_VY created with 705 records.


[Stage 59:>                                                         (0 + 1) / 1]

Partition R5_VPY_VY created with 101 records.


                                                                                

Una vez cargado el conjunto de datos completo, se procede a realizar el **particionamiento** de la base de datos `df_reviews` en subconjuntos homogéneos, basados en combinaciones de valores representativos de las variables de caracterización previamente identificadas.

Para la generación de las particiones, se seleccionaron específicamente tres variables categóricas: `star_rating`, `verified_purchase` y `vine`. Esta elección responde a criterios de relevancia analítica, simplicidad estructural y viabilidad computacional:

* **Relevancia analítica**: Estas variables están directamente asociadas a la percepción del cliente (`star_rating`), la credibilidad de la reseña (`verified_purchase`) y su posible sesgo promocional (`vine`), siendo por tanto altamente explicativas del comportamiento del consumidor.
* **Evitar combinaciones poco frecuentes**: Incluir más variables de segmentación habría generado una *"explosión"* combinatoria en el número de particiones, muchas de las cuales serían extremadamente pequeñas o vacías, complicando tanto el análisis como el modelado.
* **Viabilidad computacional**: Reducir la dimensionalidad del espacio de particionamiento ayuda a mantener el tamaño de muestra bajo control, garantizando que los procesos posteriores de muestreo y modelado sean eficientes y escalables.

Este proceso se implementa a través del método `generate_all_partitions()` de la clase `PartitioningManager`. La lógica de este método se resume en los siguientes pasos:

1. **Cálculo de probabilidades conjuntas**:
   Se calcula la probabilidad de ocurrencia de todas las combinaciones posibles de las variables categóricas:
   - `star_rating` (valoración otorgada)
   - `verified_purchase` (indicador de compra verificada)
   - `vine` (participación en el programa de reseñas Vine)

   El objetivo es identificar combinaciones suficientemente frecuentes en la población como para constituir particiones relevantes.

2. **Filtrado de combinaciones**:
   Se utiliza un umbral de probabilidad mínima (`min_probability=0.00001`) para descartar combinaciones con ocurrencia insignificante, lo que permite evitar particiones demasiado pequeñas que no aporten valor analítico ni representatividad.

3. **Construcción de particiones**:
   Para cada combinación que supera el umbral de probabilidad, se crea una partición específica mediante filtrado del DataFrame original.
   A cada subconjunto se le asigna un identificador único (`partition_key`) con el formato `R{rating}_VP{Y/N}_V{Y/N}`, que facilita su trazabilidad.

   Por ejemplo:
   - `R5_VPY_VN` representa todas las reseñas con 5 estrellas, compra verificada, y sin participación en el programa Vine.
   - `R1_VPN_VY` representa reseñas con 1 estrella, sin compra verificada, pero con participación en Vine.

4. **Resultado**:
   El proceso genera un total de 16 particiones distintas con tamaños muy variados, lo cual es consistente con la distribución sesgada del dataset hacia reseñas positivas y verificadas.
   La partición `R5_VPY_VN` es la más numerosa con más de 3.6 millones de registros, mientras que otras como `R5_VPY_VY` o `R1_VPN_VY` tienen tamaños mucho más reducidos, lo que refleja escenarios menos comunes dentro de la plataforma.

Este paso es crucial, ya que permite segmentar la base de datos original en grupos de usuarios y comportamientos claramente definidos, lo que a su vez facilita:

- La aplicación de estrategias de muestreo diferenciadas,
- El análisis comparativo entre segmentos,
- La generación de modelos más precisos y representativos en etapas posteriores.

Además, garantiza que el análisis posterior no mezcle contextos heterogéneos, conservando la coherencia entre características de los usuarios y sus evaluaciones.

In [8]:
sampled_partitions = PartitioningManager.stratified_sample_partitioned_data(partitions, fraction=0.4, min_rows=100)

                                                                                

Sampled 1473392 rows from partition R5_VPY_VN (original: 3679909)


                                                                                

Sampled 408759 rows from partition R4_VPY_VN (original: 1019728)


                                                                                

Sampled 241961 rows from partition R1_VPY_VN (original: 603371)


                                                                                

Sampled 177507 rows from partition R3_VPY_VN (original: 443364)


                                                                                

Sampled 164237 rows from partition R5_VPN_VN (original: 410073)


                                                                                

Sampled 120365 rows from partition R2_VPY_VN (original: 300544)


                                                                                

Sampled 61298 rows from partition R1_VPN_VN (original: 152779)


                                                                                

Sampled 54241 rows from partition R4_VPN_VN (original: 135197)


                                                                                

Sampled 26223 rows from partition R3_VPN_VN (original: 65398)


                                                                                

Sampled 24060 rows from partition R2_VPN_VN (original: 59973)


                                                                                

Sampled 6330 rows from partition R5_VPN_VY (original: 15604)


                                                                                

Sampled 5385 rows from partition R4_VPN_VY (original: 13240)


                                                                                

Sampled 1979 rows from partition R3_VPN_VY (original: 4886)


                                                                                

Sampled 682 rows from partition R2_VPN_VY (original: 1634)


                                                                                

Sampled 294 rows from partition R1_VPN_VY (original: 705)


[Stage 203:>                                                        (0 + 1) / 1]

Sampled 35 rows from partition R5_VPY_VY (original: 101)


                                                                                

Con el objetivo de construir una muestra de trabajo contenida pero representativa, se procede a aplicar una técnica de **muestreo estratificado** dentro de cada una de las particiones generadas en el paso anterior.

Este proceso se lleva a cabo utilizando el método `stratified_sample_partitioned_data()` de la clase `PartitioningManager`, el cual opera bajo los siguientes principios:

1. **Muestreo por clase de salida (`sentiment`)**:
   Cada partición contiene reseñas que han sido clasificadas como positivas (`sentiment = 1`) o negativas (`sentiment = 0`). 
   Para preservar esta distribución y evitar sesgos en el modelado posterior, se aplica un muestreo **estratificado** con respecto a dicha variable. Esto significa que se extrae la misma proporción de registros para cada clase dentro de cada partición.

2. **Fracción de muestreo definida**:
   En este caso, se especifica una fracción del `40%` (`fraction=0.4`) para cada clase. Esto implica que se extraerá el 40% de los registros positivos y el 40% de los negativos por partición, siempre que la cantidad total de registros sea suficiente.

3. **Control de calidad mínimo por partición**:
   Para garantizar la validez estadística de cada subconjunto extraído, se define un umbral mínimo de registros (`min_rows=100`). 
   Las particiones que no alcanzan esta cantidad son automáticamente descartadas del muestreo.

4. **Resultados del muestreo**:
   Como resultado del proceso, se logra extraer una submuestra significativa desde cada partición. Por ejemplo:
   - De la partición más abundante `R5_VPY_VN` (3.6 millones de registros), se extrajeron más de 1.47 millones.
   - De particiones medianas como `R1_VPY_VN` o `R3_VPN_VN`, se obtuvieron decenas de miles de observaciones.
   - De particiones poco frecuentes como `R5_VPY_VY`, se extrajeron 35 registros, lo que refleja la escasez de ese patrón en la población.

Este tipo de muestreo es altamente recomendable en problemas de clasificación con clases desbalanceadas, ya que:

- Ayuda a conservar la proporción natural de clases dentro de cada segmento del dataset.
- Mejora la estabilidad de los modelos al asegurar exposición balanceada a distintas opiniones.
- Evita que las clases mayoritarias dominen el entrenamiento de algoritmos supervisados.

Además de asegurar representatividad, este enfoque optimiza el rendimiento computacional y la calidad de los modelos de machine learning:

- **Reducción del tamaño total**: Se pasa de ~7 millones de registros a una muestra de ~2.7 millones, mejorando tiempos de procesamiento y consumo de memoria.
- **Eliminación de grupos marginales**: Se omiten combinaciones con baja ocurrencia, que suelen aportar ruido o aumentar la varianza de los modelos.

La salida de esta etapa es un nuevo diccionario `sampled_partitions`, donde cada clave corresponde a una partición válida y cada valor es un subconjunto muestreado de esa partición, listo para ser consolidado en el paso siguiente.

In [9]:
df_M = PartitioningManager.consolidate_sampled_partitions(sampled_partitions)

Consolidation sampled partitions completed.


                                                                                

Total number of records in the sample M: 2766748
Sentiment distribution in the sample M:


                                                                                

+---------+-------+
|sentiment|  count|
+---------+-------+
|        1|2112379|
|        0| 654369|
+---------+-------+

Sampled partitions distribution:




+-------------+-------+
|partition_key|  count|
+-------------+-------+
|    R5_VPY_VN|1473392|
|    R4_VPY_VN| 408759|
|    R1_VPY_VN| 241961|
|    R3_VPY_VN| 177507|
|    R5_VPN_VN| 164237|
|    R2_VPY_VN| 120365|
|    R1_VPN_VN|  61298|
|    R4_VPN_VN|  54241|
|    R3_VPN_VN|  26223|
|    R2_VPN_VN|  24060|
|    R5_VPN_VY|   6330|
|    R4_VPN_VY|   5385|
|    R3_VPN_VY|   1979|
|    R2_VPN_VY|    682|
|    R1_VPN_VY|    294|
|    R5_VPY_VY|     35|
+-------------+-------+



                                                                                

In [10]:
# df_M.show(truncate=20)
df_M.limit(10).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,sentiment,partition_key
0,US,37802374,R2EY3N4K9W19UP,B00IFYEYXC,602496520,AzureWave Broadcom BCM94352HMB 802.11/ac/867Mb...,PC,5,3,4,N,Y,Great for Windows 7 Laptop!,Replaced my Intel Centrino 2230 with the BCM94...,2015-08-31,1,R5_VPY_VN
1,US,13232866,R21PTQNLGCBN0I,B00XMN20Y6,873354048,Fintie iPad 2/3/4 Case - Slim Fit Folio Case w...,PC,5,0,0,N,Y,Five Stars,"Nice color, I love it",2015-08-31,1,R5_VPY_VN
2,US,34685412,R1LTMCKOL72U34,9875961809,982116513,Professional Ultra SanDisk 16GB MicroSDHC Card...,PC,5,0,0,N,Y,Five Stars,Works great to store my pics and videos!,2015-08-31,1,R5_VPY_VN
3,US,12680846,R2AG4YGHD0IB1C,B00CC47VAO,170539622,25-PACK CF Memory Card Plastic Storage Jewel C...,PC,5,0,0,N,Y,Five Stars,Great protector case just sqeeze the sides and...,2015-08-31,1,R5_VPY_VN
4,US,6348379,R1B0GR7BIW6ZFM,B00SB4AHJG,868501171,"Lenovo Z70 (80FG00DBUS) 17.3"" Laptop",PC,5,0,1,N,Y,Great,"I love this computer. The big screen, the spee...",2015-08-31,1,R5_VPY_VN
5,US,2701611,R3SQFM4D091HWE,B004EBG4J2,582958308,Eastvita MK-200 Keyboard and Case for 7-Inch T...,PC,5,0,0,N,Y,get this,case is well made and keyboard comes in handy.,2015-08-31,1,R5_VPY_VN
6,US,47012791,R3ILJ0GC2105AV,B00KT1HL5M,979894134,Fintie Samsung Galaxy Tab 4 10.1 SmartShell,PC,5,0,0,N,Y,Five Stars,Love,2015-08-31,1,R5_VPY_VN
7,US,18382586,RM7WHMA7RL279,B00B0N29G4,243601768,MoKo Slim Fit Cover Case for iPad Air Parent.,PC,5,0,0,N,Y,Just what we wanted!,Fits on my Dad's iPad well. He loves it. It fe...,2015-08-31,1,R5_VPY_VN
8,US,20961846,R1F854X0L51448,B00ZWUEN10,684076400,Botetrade Children School Backpack Bags for Pr...,PC,5,0,0,N,Y,Good deal,Very happy and totally satisfied with this pur...,2015-08-31,1,R5_VPY_VN
9,US,44080268,R2RUAQ0M6MA30T,B005WUUFBW,708772551,ASUS M5A78L-M LX PLUS AM3+ AMD 760G Micro ATX ...,PC,5,0,0,N,Y,Five Stars,Exactly what i needed. Thanks!,2015-08-31,1,R5_VPY_VN


Tras haber aplicado el muestreo estratificado a cada una de las particiones válidas, se procede a consolidar los subconjuntos resultantes en un único DataFrame denominado `df_M`, que representa la **muestra final contenida** a utilizar en los modelos de aprendizaje automático.

Esta operación se ejecuta mediante el método `consolidate_sampled_partitions()` de la clase `PartitioningManager`, el cual realiza lo siguiente:

1. **Unificación de subconjuntos**:
   Se utiliza la función `unionByName()` para concatenar horizontalmente todos los DataFrames pertenecientes al diccionario `sampled_partitions`, preservando el esquema original de columnas y asegurando compatibilidad entre las particiones.

2. **Validación de la muestra consolidada**:
   Una vez construida `df_M`, se imprimen dos distribuciones clave para validar su representatividad:
   - **Distribución por clase de sentimiento (`sentiment`)**: refleja que la muestra contiene aproximadamente un 76% de reseñas positivas (2,112,379 registros) y un 24% negativas (654,369 registros). Esta proporción es coherente con la tendencia observada en la población original, donde predominan opiniones favorables.
   - **Distribución por clave de partición (`partition_key`)**: evidencia que la muestra incluye contribuciones de prácticamente todas las combinaciones significativas, siendo `R5_VPY_VN` la más dominante con más de 1.47 millones de registros. Otras particiones más pequeñas también están presentes, aunque en menor proporción.

3. **Tamaño final de la muestra (`M`)**:
   El conjunto consolidado contiene **2,766,748 registros**, lo que representa una fracción manejable del total original (~6.9 millones). 
   Esta dimensión es adecuada para el entrenamiento de modelos en PySpark, ya que:
   - Mantiene la variabilidad de la población.
   - Permite ejecutar procesos de modelado con tiempos razonables.
   - Conserva la estructura poblacional identificada en el análisis exploratorio.

4. **Evaluación de la calidad de la muestra `M`**:
   La muestra final puede considerarse de **alta calidad** porque:
   - Representa múltiples segmentos de usuarios (por valoración, compra verificada, y participación en Vine).
   - Mantiene la proporción natural de clases en cada partición gracias al muestreo estratificado.
   - Filtra combinaciones poco frecuentes, lo que evita ruido estadístico y reduce la varianza en modelos posteriores.
   - Su tamaño es suficientemente grande como para preservar patrones significativos, pero no excesivo como para dificultar el procesamiento.

En conclusión, El DataFrame `df_M` representa una muestra representativa, limpia y balanceada del conjunto original. Es un insumo óptimo para aplicar técnicas de aprendizaje supervisado y no supervisado con alto rendimiento y robustez estadística.

# 3) Preparación de los datos
---

In [11]:
class PreprocessingManager:

    @staticmethod
    def drop_nulls(df):
        """Drop rows with null values in any column"""
        before = df.count()
        df_clean = df.na.drop()
        after = df_clean.count()
        removed = before - after

        print(f"Removed {removed} rows with null values.")
        if removed > 0:
            print(f"Before: {before} rows, After: {after} rows")
        print(f"Percentage of rows removed: {removed / before:.2%}")
        print(f"Percentage of rows remaining: {after / before:.2%}")
        print(f"Total rows remaining: {after} rows")
        return df.na.drop()

    @staticmethod
    def cast_column_types(df):
        """Cast columns to the required types for PySpark ML"""
        return df \
            .withColumn("star_rating", F.col("star_rating").cast("int")) \
            .withColumn("helpful_votes", F.col("helpful_votes").cast("int")) \
            .withColumn("total_votes", F.col("total_votes").cast("int")) \
            .withColumn("sentiment", F.col("sentiment").cast("int"))

    @staticmethod
    def encode_categorical(df):
        """Encode categorical columns using StringIndexer"""
        from pyspark.ml.feature import StringIndexer

        indexers = [
            StringIndexer(inputCol="verified_purchase", outputCol="verified_purchase_index"),
            StringIndexer(inputCol="vine", outputCol="vine_index")
        ]

        for indexer in indexers:
            df = indexer.fit(df).transform(df)
        
        print("verified_purchase_index distribution:")
        df.groupBy("verified_purchase", "verified_purchase_index") \
            .count().orderBy("verified_purchase_index").show()

        print("vine_index distribution:")
        df.groupBy("vine", "vine_index") \
            .count().orderBy("vine_index").show()

        return df
    
    @staticmethod
    def detect_outliers(df, numeric_columns=None):
        """
        Detect outliers in numeric columns using the IQR method.
        """
        if not numeric_columns:
            from pyspark.sql.types import NumericType
            numeric_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]

        df_out = df
        total_rows = df.count()

        print(f"Total records analyzed: {total_rows}")
        print("="*60)

        for col_name in numeric_columns:
            # Compute Q1 & Q3
            q1, q3 = df.approxQuantile(col_name, [0.25, 0.75], 0.05)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr

            outlier_count = df.filter((F.col(col_name) < lower_bound) | (F.col(col_name) > upper_bound)).count()
            outlier_pct = round(outlier_count / total_rows * 100, 2)

            print(f"Variable: {col_name}")
            print(f" - Q1: {q1}")
            print(f" - Q3: {q3}")
            print(f" - IQR: {iqr}")
            print(f" - Lower bound: {lower_bound}")
            print(f" - Upper bound: {upper_bound}")
            print(f" - Outliers detected: {outlier_count} ({outlier_pct}%)")
            print("-"*50)

        return df_out

    @staticmethod
    def assemble_features(df, feature_cols):
        """Assemble features into a single vector column"""
        from pyspark.ml.feature import VectorAssembler

        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
        return assembler.transform(df)


In [12]:
df_M_clean = PreprocessingManager.drop_nulls(df_M)



Removed 0 rows with null values.
Percentage of rows removed: 0.00%
Percentage of rows remaining: 100.00%
Total rows remaining: 2766748 rows


                                                                                

El primer paso del preprocesamiento consiste en asegurar la integridad estructural de la muestra `M` eliminando aquellos registros que contengan valores nulos en alguna de sus columnas. 

Para ello, se implementa el método `drop_nulls()` dentro de la clase `PreprocessingManager`, el cual aplica la función `na.drop()` de PySpark sobre el DataFrame `df_M`.

Durante este procedimiento, también se calcula y reporta el número total de registros eliminados,  así como el porcentaje que estos representan con respecto al tamaño original de la muestra.

El análisis demuestra que el dataset `M` no contiene registros con valores nulos. Por lo tanto, este paso no genera pérdidas de información, lo que indica que la muestra se encuentra en un estado óptimo para las siguientes etapas de preprocesamiento. 

In [13]:
df_M_clean.printSchema()
# df_M_casted = PreprocessingManager.cast_column_types(df_M_clean)

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- sentiment: integer (nullable = true)
 |-- partition_key: string (nullable = false)



Como segundo paso en el preprocesamiento, se revisó la estructura del DataFrame `df_M_clean` para asegurar que las columnas clave cuenten con tipos de datos compatibles con los algoritmos de aprendizaje automático.

Al aplicar el método `.printSchema()`, se verificó que:

- Las variables numéricas (`star_rating`, `helpful_votes`, `total_votes`, `sentiment`) ya están correctamente tipadas como `integer`.
- Las variables categóricas (`verified_purchase`, `vine`) están representadas como `string`, lo cual es apropiado antes de aplicar técnicas de codificación.
- No existen columnas relevantes con tipos incompatibles.
- La columna `review_date` se encuentra en formato `date`, pero no será utilizada directamente como variable predictiva, por lo que no requiere transformación en esta etapa.

In [14]:
df_M_encoded = PreprocessingManager.encode_categorical(df_M_clean)
df_M_encoded.select("verified_purchase", "verified_purchase_index", "vine", "vine_index").limit(10).toPandas()

                                                                                

verified_purchase_index distribution:


                                                                                

+-----------------+-----------------------+-------+
|verified_purchase|verified_purchase_index|  count|
+-----------------+-----------------------+-------+
|                Y|                    0.0|2422019|
|                N|                    1.0| 344729|
+-----------------+-----------------------+-------+

vine_index distribution:


                                                                                

+----+----------+-------+
|vine|vine_index|  count|
+----+----------+-------+
|   N|       0.0|2752043|
|   Y|       1.0|  14705|
+----+----------+-------+



Unnamed: 0,verified_purchase,verified_purchase_index,vine,vine_index
0,Y,0.0,N,0.0
1,Y,0.0,N,0.0
2,Y,0.0,N,0.0
3,Y,0.0,N,0.0
4,Y,0.0,N,0.0
5,Y,0.0,N,0.0
6,Y,0.0,N,0.0
7,Y,0.0,N,0.0
8,Y,0.0,N,0.0
9,Y,0.0,N,0.0


Como parte del preprocesamiento, se transformaron las variables categóricas `verified_purchase` y `vine` en variables numéricas utilizando la técnica de codificación por índices (`StringIndexer`) provista por PySpark. En otras palabras, se aplicó el método `encode_categorical()` del módulo `PreprocessingManager`, que realiza lo siguiente:
- Codifica `verified_purchase` en una nueva columna `verified_purchase_index`.
- Codifica `vine` en una nueva columna `vine_index`.
- Imprime la distribución de frecuencia de los índices generados, para asegurar que el mapeo es correcto.

Este paso es fundamental para que las variables categóricas puedan ser utilizadas como entradas por los algoritmos de aprendizaje automático, ya que estos requieren datos numéricos en su representación vectorial. Cabe destacar que se eligió esta técnica de codificación por las siguientes razones:
- **Compatibilidad directa con PySpark ML**: `StringIndexer` es la solución oficial y optimizada para Spark, integrándose fácilmente con otros componentes como `VectorAssembler` y clasificadores como `RandomForestClassifier`, `GBTClassifier` o `MultilayerPerceptronClassifier`.
- **Eficiencia computacional**: A diferencia de técnicas como one-hot encoding, no genera columnas adicionales, lo cual es importante cuando se trabaja con grandes volúmenes de datos.
- **Adecuado para variables binarias**: Ambas variables (`vine` y `verified_purchase`) son binarias, por lo que no se justifica la complejidad de otros métodos como binary encoding o one-hot.
- **Inocuidad del orden numérico**: Los algoritmos seleccionados para este proyecto (basados en árboles y redes) no interpretan los índices como valores ordinales, por lo tanto no existe riesgo de introducir sesgos al asignar 0.0 o 1.0.

In [16]:
df_M_outliers = PreprocessingManager.detect_outliers(df_M_encoded)

                                                                                

Total records analyzed: 2766748


ERROR:root:KeyboardInterrupt while sending command.=>             (12 + 4) / 16]
Traceback (most recent call last):
  File "/Users/alejandrodiazvillagomez/Desktop/Proyecto-Big-Data-PySpark/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alejandrodiazvillagomez/Desktop/Proyecto-Big-Data-PySpark/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alejandrodiazvillagomez/.pyenv/versions/3.12.0/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

                                                                                

Como parte del preprocesamiento, se aplicó una estrategia de detección de valores atípicos (outliers) a través del método del rango intercuartílico (IQR), sobre las variables numéricas del conjunto `df_M_encoded`.

El objetivo de esta etapa es identificar registros con valores extremos que puedan afectar el desempeño de los algoritmos de aprendizaje automático, especialmente aquellos sensibles a escalas de magnitud como redes neuronales o algoritmos no supervisados como K-means.

Se analizaron todas las variables numéricas y se documentó el número y proporción de outliers detectados. A continuación, se presentan los hallazgos más relevantes:

**Variables sin valores atípicos:**

- `customer_id`, `product_parent`, `star_rating`, `sentiment`: sin registros fuera del rango [Q1 - 1.5×IQR, Q3 + 1.5×IQR].
  Esto era esperable, dado que:
  - `star_rating` y `sentiment` son categóricas discretas (1–5 y 0–1).
  - `customer_id` y `product_parent` son identificadores, y no se espera normalidad estadística en ellos.

**Variables con valores atípicos detectados:**

| Variable         | Outliers detectados | Porcentaje sobre total |
|------------------|---------------------|-------------------------|
| `helpful_votes`  | 248,715             | 8.99%                   |
| `total_votes`    | 354,695             | 12.82%                  |
| `verified_purchase_index` | 344,729     | 12.46% (explicado abajo) |
| `vine_index`     | 14,705              | 0.53% (por bajo volumen de reseñas Vine) |


En primera instancia, los "outliers" en `verified_purchase_index` y `vine_index` **no deben eliminarse**, ya que reflejan categorías reales (no errores). Esta detección se debe a que ambas variables están desbalanceadas y presentan un rango intercuartílico igual a cero (IQR = 0), lo que genera falsos positivos en la detección automática.

Por otro lado, los outliers detectados en las variables `helpful_votes` y `total_votes` representan casos reales de interacción significativa con las reseñas. Considerando que los algoritmos de aprendizaje a utilizar son robustos frente a este tipo de valores, se decidió **no eliminar los registros extremos**. No obstante, se documentan para un posible análisis futuro y se mantiene su presencia en la muestra M, como reflejo de la variabilidad natural en datos reales de plataformas digitales.

In [17]:
feature_cols = [
    "star_rating",
    "helpful_votes",
    "total_votes",
    "verified_purchase_index",
    "vine_index"
]

df_M_ready = PreprocessingManager.assemble_features(df_M_encoded, feature_cols)
df_M_ready.select("features", "sentiment").limit(10).toPandas()

Unnamed: 0,features,sentiment
0,"[5.0, 3.0, 4.0, 0.0, 0.0]",1
1,"(5.0, 0.0, 0.0, 0.0, 0.0)",1
2,"(5.0, 0.0, 0.0, 0.0, 0.0)",1
3,"(5.0, 0.0, 0.0, 0.0, 0.0)",1
4,"(5.0, 0.0, 1.0, 0.0, 0.0)",1
5,"(5.0, 0.0, 0.0, 0.0, 0.0)",1
6,"(5.0, 0.0, 0.0, 0.0, 0.0)",1
7,"(5.0, 0.0, 0.0, 0.0, 0.0)",1
8,"(5.0, 0.0, 0.0, 0.0, 0.0)",1
9,"(5.0, 0.0, 0.0, 0.0, 0.0)",1


En esta etapa se construye el vector de características requerido por los algoritmos de aprendizaje automático de PySpark. A diferencia de otros frameworks como Scikit-learn, PySpark exige que las variables predictoras estén integradas en una única columna de tipo `Vector` denominada `features`, la cual agrupa de forma estructurada todos los atributos que serán utilizados por los modelos tanto supervisados como no supervisados.

Para lograr esta transformación, se empleó el transformador `VectorAssembler`, que recibe como entrada una lista de columnas numéricas y devuelve una columna vectorial combinada. En este caso, se seleccionaron las siguientes variables:

| Columna                     | Tipo              | Justificación                                                                 |
|----------------------------|-------------------|-------------------------------------------------------------------------------|
| `star_rating`              | Numérica           | Refleja la calificación otorgada por el usuario; se correlaciona directamente con el sentimiento. |
| `helpful_votes`            | Numérica           | Representa cuántos usuarios consideraron útil una reseña, lo que puede ser indicativo de su impacto. |
| `total_votes`              | Numérica           | Mide el alcance de la reseña y complementa a `helpful_votes`.               |
| `verified_purchase_index`  | Codificada (0.0/1.0) | Indica si la reseña proviene de una compra verificada; asociado con la credibilidad del comentario. |
| `vine_index`               | Codificada (0.0/1.0) | Indica si el usuario participó en el programa Vine, lo que puede influir en la naturaleza de la reseña. |

Estas características fueron seleccionadas porque:

- Son numéricas o han sido codificadas correctamente como índices.
- No presentan valores nulos tras la limpieza.
- Representan dimensiones fundamentales del comportamiento del usuario, credibilidad y tipo de participación.
- Son robustas y apropiadas para alimentar modelos de clasificación supervisada (como Decision Trees o Random Forests) y agrupamiento no supervisado (como K-Means).

La aplicación de este ensamblaje se realizó sobre el conjunto `df_M_encoded`, generando un nuevo DataFrame `df_M_ready` que contiene la columna `features`.

# 4) Preparación del conjunto de entrenamiento y prueba
---

In [19]:
total = df_M_ready.count()
df_M_ready.groupBy("sentiment").count() \
    .withColumn("percentage", F.round(F.col("count") / total * 100, 2)) \
    .orderBy("percentage", ascending=False) \
    .show()



+---------+-------+----------+
|sentiment|  count|percentage|
+---------+-------+----------+
|        1|2112379|     76.35|
|        0| 654369|     23.65|
+---------+-------+----------+



                                                                                

Una vez finalizado el preprocesamiento de la muestra `M`, se procedió a evaluar la distribución de la variable objetivo `sentiment`, con el fin de identificar posibles desbalances de clase que pudieran afectar el entrenamiento de los modelos (principalmente al modelo de aprendizaje supervisado, debido a su naturaleza).

La variable `sentiment` presenta un desbalance moderado, donde la clase positiva (`sentiment` = 1) representa aproximadamente el 76% del total de registros, mientras que la clase negativa (`sentiment` = 0) representa solo el 24% restante. Esta proporción es coherente con la naturaleza del dominio, en el que históricamente se ha observado una tendencia hacia comentarios positivos en plataformas de reseñas como Amazon.

Un desbalance de clases puede generar los siguientes riesgos en algoritmos de clasificación:

* **Sobreajuste a la clase mayoritaria**: Los modelos tienden a predecir la clase más frecuente, logrando una aparente alta precisión pero bajo rendimiento real en la clase minoritaria.
* **Falsas métricas de desempeño**: En contextos desbalanceados, la métrica de accuracy puede ser engañosa, ya que un modelo que siempre predice la clase mayoritaria puede tener una precisión elevada pero carecer de utilidad práctica.
* **Pérdida de sensibilidad hacia clases minoritarias**: Se reduce la capacidad del modelo para detectar patrones que expliquen los comentarios negativos (clase minoritaria), que son precisamente los más relevantes para la toma de decisiones correctivas.

Cabe destacar que este desbalance fue mitigado desde la etapa de particionado y muestreo estratificado:
* Al aplicar la función `stratified_sample_partitioned_data()`, se garantizó que cada partición mantuviera proporcionalmente representadas ambas clases (`sentiment` = 0 y `sentiment` = 1), en función de una fracción fija por clase.
* Esto permitió construir una muestra `M` contenida pero estructuralmente coherente, respetando la proporción natural de sentimientos en cada subconjunto y evitando que el desbalance se amplificara en las etapas posteriores.

En otras palabras, aunque la clase positiva sigue siendo mayoritaria en la muestra final, el diseño metodológico aplicado (basado en particionamiento por variables de caracterización y muestreo estratificado) permite conservar la representatividad del conjunto original sin incurrir en sesgos extremos.

Sin embargo, es importante menciona que para compensar este desbalance moderado en fases posteriores, se recomienda evaluar los modelos mediante métricas como F1-score, precision y recall, particularmente en la clase minoritaria. O bien, considerar ajustes adicionales, como ponderación de clases o técnicas de resampling, si el desempeño sobre la clase negativa no resulta satisfactorio.

In [20]:
train_ratio = 0.8  # 80% para entrenamiento, 20% para prueba

# División estratificada para mantener la proporción de clases
train_df, test_df = df_M_ready.randomSplit([train_ratio, 1 - train_ratio], seed=42)

# Verificación de las proporciones en cada conjunto
print("Conjunto de entrenamiento:")
train_total = train_df.count()
train_df.groupBy("sentiment").count() \
    .withColumn("percentage", F.round(F.col("count") / train_total * 100, 2)) \
    .orderBy("sentiment").show()

print("Conjunto de prueba:")
test_total = test_df.count()
test_df.groupBy("sentiment").count() \
    .withColumn("percentage", F.round(F.col("count") / test_total * 100, 2)) \
    .orderBy("sentiment").show()

print(f"Tamaño del conjunto de entrenamiento: {train_total} registros ({round(train_total/total*100, 2)}%)")
print(f"Tamaño del conjunto de prueba: {test_total} registros ({round(test_total/total*100, 2)}%)")

Conjunto de entrenamiento:


                                                                                

+---------+-------+----------+
|sentiment|  count|percentage|
+---------+-------+----------+
|        0| 523456|     23.65|
|        1|1690239|     76.35|
+---------+-------+----------+

Conjunto de prueba:




+---------+------+----------+
|sentiment| count|percentage|
+---------+------+----------+
|        0|130913|     23.67|
|        1|422140|     76.33|
+---------+------+----------+

Tamaño del conjunto de entrenamiento: 2213695 registros (80.01%)
Tamaño del conjunto de prueba: 553053 registros (19.99%)


                                                                                

Para la preparación de los conjuntos de entrenamiento y prueba, se implementó una división siguiendo estos principios:

1. **Proporción de división 80/20**: Se asignó el 80% de los datos al conjunto de entrenamiento y el 20% al conjunto de prueba. Esta proporción es óptima porque:
   - Proporciona suficientes datos para entrenar modelos complejos (2.2M registros aproximadamente)
   - Mantiene un conjunto de prueba robusto (~550K registros) para evaluación confiable
   - Es una proporción estándar recomendada en la literatura para conjuntos grandes

2. **Técnica de muestreo aleatorio estratificado**: Se utilizó el método `randomSplit()` con una semilla aleatoria fija (seed=42) para garantizar:
   - Reproducibilidad de los resultados
   - Mantenimiento de las proporciones de la variable objetivo en ambos conjuntos
   - Minimización del riesgo de sesgo de selección

Esta estrategia de división es coherente con las mejores prácticas en aprendizaje automático y adecuada para el problema de clasificación binaria de sentimientos, donde es crucial mantener la representatividad de ambas clases en los conjuntos de entrenamiento y prueba.

---

⚠️ *Nota técnica sobre estratificación*

Aunque el método `randomSplit()` de PySpark no realiza una estratificación explícita como lo haría `train_test_split(..., stratify=...)` en bibliotecas como `scikit-learn`, en conjuntos de datos de gran tamaño, como el que se maneja en este proyecto (más de 2.7 millones de registros), la distribución de clases tiende a conservarse de forma natural en cada subconjunto.

Esta hipótesis ha sido verificada empíricamente en la presente implementación al comparar las proporciones de la variable `sentiment` antes y después de la partición, observando una correspondencia muy cercana entre ambas.

Por ello, se considera que esta aproximación es válida, eficiente y suficiente para los fines del análisis y modelado, sin necesidad de una estratificación adicional más costosa computacionalmente.

# 5) Construcción de modelos de aprendizaje supervisado y no supervisado
---

## 5.1) Algoritmo de aprendizaje supervisado

### 5.1.1) Modelo con sobreajuste

In [21]:
from pyspark.ml.classification import RandomForestClassifier

# Definición del modelo
rf = RandomForestClassifier(
    labelCol="sentiment",
    featuresCol="features",
    predictionCol="prediction",
    probabilityCol="probability",
    seed=42,
    numTrees=100,
    maxDepth=10
)

# Entrenamiento
rf_model = rf.fit(train_df)

print("Modelo RandomForestClassifier entrenado exitosamente.")

                                                                                

Modelo RandomForestClassifier entrenado exitosamente.


Para abordar la tarea de clasificación binaria del sentimiento (positivo o negativo) en las reseñas de productos, se seleccionó el algoritmo `RandomForestClassifier` del módulo `pyspark.ml.classification` como modelo de aprendizaje supervisado. Esta elección fue sustentada tanto en criterios técnicos como en consideraciones prácticas vinculadas a la naturaleza del problema y las características del conjunto de datos.

El modelo en cuestión es una técnica de ensamblado basada en árboles de decisión que ofrece una excelente relación entre precisión, robustez y eficiencia computacional, lo cual lo hace especialmente adecuado para contextos de Big Data como el presente. Las razones principales de su elección se detallan a continuación:

* **Adecuado para clasificación binaria**: El problema se define como una clasificación binaria (`sentiment` = 0 o 1), una tarea donde Random Forest ha demostrado un desempeño sobresaliente en múltiples estudios y aplicaciones prácticas.
* **Manejo eficiente de datos numéricos y categóricos codificados**: Todas las variables seleccionadas (`star_rating`, `helpful_votes`, `total_votes`, `verified_purchase_index`, `vine_index`) son de tipo numérico o han sido codificadas previamente mediante `StringIndexer`, lo cual es completamente compatible con este algoritmo sin necesidad de transformaciones adicionales.
* **Robustez frente a outliers y datos ruidosos**: Random Forest no se ve significativamente afectado por valores atípicos o ruido en los datos, como aquellos observados en las variables `helpful_votes` y `total_votes`, por lo que evita la necesidad de aplicar estrategias de eliminación o transformación adicional de estos valores.
* **No requiere normalización de variables**: A diferencia de algoritmos como la regresión logística o las redes neuronales, Random Forest no depende de la escala de las variables de entrada, lo que reduce el número de pasos de preprocesamiento requeridos.
* **Capacidad para manejar datos desbalanceados**: Aunque el dataset presenta un leve desbalance de clases (76% positivas, 24% negativas), Random Forest puede mitigar este efecto mediante su mecanismo de muestreo aleatorio (bootstrap sampling) en cada árbol.
* **Interpretabilidad y análisis de importancia de variables**: Una ventaja adicional del modelo es su capacidad para generar métricas de importancia de características, lo cual es valioso para interpretar qué variables contribuyen más a la predicción del sentimiento.
**Costo computacional razonable**: Dado que el entrenamiento se realiza en un entorno distribuido con PySpark, el uso de `RandomForestClassifier` permite aprovechar eficientemente los recursos disponibles, obteniendo buenos resultados sin incurrir en el alto costo computacional que requerirían otros modelos como `GBTClassifier` o `MultilayerPerceptronClassifier`.

#### **Comparación con otros modelos**
| Modelo                 | Motivo por el cual no fue seleccionado                                     |
| ---------------------- | -------------------------------------------------------------------------- |
| `LogisticRegression`   | Requiere normalización y es más sensible a desbalance de clases.           |
| `MultilayerPerceptron` | Demanda tuning de hiperparámetros, normalización y mayor poder de cómputo. |
| `GBTClassifier`        | Más preciso en algunos casos, pero considerablemente más costoso.          |


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Realizamos predicciones sobre el conjunto de prueba
rf_predictions = rf_model.transform(test_df)

# Definimos evaluadores para distintas métricas
rf_evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="sentiment", predictionCol="prediction", metricName="accuracy")
rf_evaluator_precision = MulticlassClassificationEvaluator(labelCol="sentiment", predictionCol="prediction", metricName="weightedPrecision")
rf_evaluator_recall = MulticlassClassificationEvaluator(labelCol="sentiment", predictionCol="prediction", metricName="weightedRecall")
rf_evaluator_f1 = MulticlassClassificationEvaluator(labelCol="sentiment", predictionCol="prediction", metricName="f1")

# Calculamos las métricas
rf_accuracy = rf_evaluator_accuracy.evaluate(rf_predictions)
rf_precision = rf_evaluator_precision.evaluate(rf_predictions)
rf_recall = rf_evaluator_recall.evaluate(rf_predictions)
rf_f1 = rf_evaluator_f1.evaluate(rf_predictions)

# Extraemos las importancias de las características
rf_importances = rf_model.featureImportances.toArray()

# Mostramos los resultados
print("Resultados de la evaluación del modelo:")
print("="*60)
print(f"Accuracy: {rf_accuracy:.4f}")
print(f"Precision (ponderada): {rf_precision:.4f}")
print(f"Recall (ponderado): {rf_recall:.4f}")
print(f"F1 Score: {rf_f1:.4f}")

print("\nImportancia de las características:")
print("="*60)
for feature, importance in zip(feature_cols, rf_importances):
    print(f" - {feature}: {importance:.4f}")



Accuracy: 1.0000
Precision (ponderada): 1.0000
Recall (ponderado): 1.0000
F1 Score: 1.0000


                                                                                

Durante la evaluación del modelo de clasificación supervisada utilizando un Random Forest Classifier, se obtuvieron resultados inusualmente perfectos. Estos resultados, aunque a primera vista parecen excelentes, no son realistas en contextos de datos reales, y constituyen un claro indicio de sobreajuste (**overfitting**). Esto significa que el modelo ha memorizado los patrones exactos de los datos de entrenamiento en lugar de aprender generalizaciones útiles.

Al analizar las importancias de las características (`featureImportances`) reportadas por el modelo, se observó que la variable `star_rating` domina completamente el proceso de decisión del modelo, acumulando el 97.3% del peso total en la predicción. Dado que tanto `star_rating` como la variable objetivo `sentiment` provienen del dataset original y no fueron calculadas por separado, es altamente probable que exista una relación directa o redundante entre ambas variables, lo que explicaría la capacidad del modelo para predecir el sentimiento con precisión perfecta. Esta situación evidencia una fuga de información (**data leakage**), donde el modelo está utilizando indirectamente la etiqueta como una entrada predictiva, anulando su capacidad de generalización.

Como mejora inmediata, se propone reentrenar el modelo excluyendo la variable `star_rating` del conjunto de características, con el objetivo de evaluar su verdadero desempeño al depender de señales más sutiles y no redundantes.

Este ajuste permite:
* Reducir el riesgo de sobreajuste.
* Evaluar si otras variables como `helpful_votes`, `total_votes`, `verified_purchase_index` y `vine_index` aportan valor predictivo genuino.
* Obtener métricas de evaluación más realistas y útiles para el análisis.

### 5.1.2) Modelo sin sobreajuste

In [37]:
feature_cols_v2 = [
    "helpful_votes",
    "total_votes",
    "verified_purchase_index",
    "vine_index"
]

df_M_ready_v2 = PreprocessingManager.assemble_features(df_M_encoded, feature_cols_v2)
df_M_ready_v2.select("features", "sentiment").limit(10).toPandas()

train_ratio = 0.8  # 80% para entrenamiento, 20% para prueba
train_df_v2, test_df_v2 = df_M_ready_v2.randomSplit([train_ratio, 1 - train_ratio], seed=42)

                                                                                

In [None]:
rf_v2 = RandomForestClassifier(
    labelCol="sentiment",
    featuresCol="features",
    predictionCol="prediction",
    probabilityCol="probability",
    seed=42,
    numTrees=100,
    maxDepth=10
)

# Entrenamiento
rf_model_v2 = rf_v2.fit(train_df_v2)

print("Modelo RandomForestClassifier entrenado exitosamente.")

Traceback (most recent call last):
  File "/Users/alejandrodiazvillagomez/Desktop/Proyecto-Big-Data-PySpark/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alejandrodiazvillagomez/Desktop/Proyecto-Big-Data-PySpark/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alejandrodiazvillagomez/.pyenv/versions/3.12.0/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

                                                                                

In [None]:
rf_predictions_v2 = rf_model_v2.transform(test_df_v2)

rf_evaluator_accuracy_v2 = MulticlassClassificationEvaluator(labelCol="sentiment", predictionCol="prediction", metricName="accuracy")
rf_evaluator_precision_v2 = MulticlassClassificationEvaluator(labelCol="sentiment", predictionCol="prediction", metricName="weightedPrecision")
rf_evaluator_recall_v2 = MulticlassClassificationEvaluator(labelCol="sentiment", predictionCol="prediction", metricName="weightedRecall")
rf_evaluator_f1_v2 = MulticlassClassificationEvaluator(labelCol="sentiment", predictionCol="prediction", metricName="f1")

rf_accuracy_v2 = rf_evaluator_accuracy_v2.evaluate(rf_predictions_v2)
rf_precision_v2 = rf_evaluator_precision_v2.evaluate(rf_predictions_v2)
rf_recall_v2 = rf_evaluator_recall_v2.evaluate(rf_predictions_v2)
rf_f1_v2 = rf_evaluator_f1_v2.evaluate(rf_predictions_v2)

rf_importances_v2 = rf_model_v2.featureImportances.toArray()

print("Resultados de la evaluación del modelo:")
print("="*60)
print(f"Accuracy: {rf_accuracy_v2:.4f}")
print(f"Precision (ponderada): {rf_precision_v2:.4f}")
print(f"Recall (ponderado): {rf_recall_v2:.4f}")
print(f"F1 Score: {rf_f1_v2:.4f}")

print("\nImportancia de las características:")
print("="*60)
for feature, importance in zip(feature_cols_v2, rf_importances_v2):
    print(f" - {feature}: {importance:.4f}")

## 5.2) Algoritmo de aprendizaje no supervisado

In [None]:
from pyspark.ml.clustering import KMeans

# Definición del modelo KMeans
kmeans = KMeans(
    featuresCol="features",
    predictionCol="prediction",
    k=5,
    seed=42
)

# Entrenamiento del modelo
kmeans_model = kmeans.fit(train_df)

print("Modelo KMeans entrenado y aplicado exitosamente.")

Para la etapa de aprendizaje no supervisado, se selecciona el algoritmo `K-Means Clustering` como el método más adecuado para llevar a cabo un análisis de agrupamiento sobre el conjunto de entrenamiento `train_df`.

K-Means es un algoritmo ampliamente utilizado en problemas de agrupamiento debido a su eficiencia computacional, simplicidad conceptual y buena escalabilidad para grandes volúmenes de datos, lo cual es especialmente importante en contextos de Big Data como el presente.

Las justificaciones de la elección de este modelo son:

* **Escalabilidad**: K-Means tiene una complejidad computacional baja (O(n·k·i)), lo que lo hace ideal para trabajar con conjuntos de datos de millones de registros como en el caso actual.
* **Compatibilidad con PySpark**: PySpark MLlib ofrece una implementación altamente optimizada de KMeans que se integra fácilmente con pipelines de transformación (`VectorAssembler`) y evaluación (`ClusteringEvaluator`).
* **Simplicidad interpretativa**: Los centroides obtenidos permiten caracterizar los grupos descubiertos con base en las variables seleccionadas, facilitando el análisis exploratorio.

Los hiperparámetros seleccionados son:

| Hiperparámetro  | Valor        | Justificación                                                                                                 |
| --------------- | ------------ | ------------------------------------------------------------------------------------------------------------- |
| `k`             | 5            | Valor inicial común para identificar segmentos diferenciados. Se evaluará con métricas internas para refinar. |
| `seed`          | 42           | Para garantizar reproducibilidad de los resultados.                                                           |
| `featuresCol`   | `features`   | Define las columnas de entrada para el agrupamiento.                                                          |
| `predictionCol` | `prediction` | Almacena la asignación de cluster de cada observación.                                                        |

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Predicción de clústeres sobre el conjunto de entrenamiento
kmeans_predictions = kmeans_model.transform(train_df)

# kmeans_predictions.select("prediction", "features").show(10, truncate=False)

# Evaluador de agrupamiento basado en distancia euclidiana
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette",
    distanceMeasure="squaredEuclidean"
)

# Cálculo del Silhouette Score
silhouette_score = evaluator.evaluate(kmeans_predictions)

print(f"Silhouette Score: {silhouette_score:.4f}")

Para evaluar el modelo KMeans en PySpark, el criterio más comúnmente aceptado es el **Silhouette Score**, ya que mide qué tan bien se agrupan los datos dentro de sus clústeres. PySpark lo proporciona a través del evaluador `ClusteringEvaluator`.

El Silhouette Score varía entre -1 y 1:
* Cercano a 1: buena cohesión intra-clúster y buena separación entre clústeres.
* Cercano a 0: clústeres se superponen, no hay separación clara.
* Negativo: mala asignación, puntos estarían mejor en otros clústeres.

## 5.3) Resultados

Resultados