# Actividad 3 | Aprendizaje supervisado y no supervisado
---
- Alonso Pedrero Martínez   |   A01769076

## Aprendizaje supervisado y no supervisado
---

- ​​Aprendizaje supervisado: es un enfoque de aprendizaje automático que se define por el uso de conjuntos de datos etiquetados. Estos conjuntos de datos están diseñados para entrenar o supervisar algoritmos para clasificar datos o predecir resultados con precisión. Mediante entradas y salidas etiquetadas, el modelo puede medir su precisión y aprender con el tiempo.


    - Clasificación: los problemas utilizan un algoritmo para asignar con precisión los datos de prueba a categorías específicas, como separar manzanas de naranjas. O, en el mundo real, los algoritmos de aprendizaje supervisado pueden utilizarse para clasificar el correo no deseado en una carpeta separada de la bandeja de entrada. Los clasificadores lineales, las máquinas de vectores de soporte, los árboles de decisión y los bosques aleatorios son tipos comunes de algoritmos de clasificación.


    - Regresión: es otro tipo de método de aprendizaje supervisado que utiliza un algoritmo para comprender la relación entre las variables dependientes e independientes. Los modelos de regresión son útiles para predecir valores numéricos basados ​​en diferentes puntos de datos, como las proyecciones de ingresos por ventas para una empresa determinada. Algunos algoritmos de regresión populares son la regresión lineal, la regresión logística y la regresión polinómica.


--- 



- Aprendizaje no supervisado: utiliza algoritmos de aprendizaje automático para analizar y agrupar conjuntos de datos sin etiquetar. Estos algoritmos descubren patrones ocultos en los datos sin necesidad de intervención humana.


    - Agrupamiento: es una técnica de minería de datos que permite agrupar datos sin etiquetar en función de sus similitudes o diferencias. Por ejemplo, los algoritmos de agrupamiento K-medias asignan puntos de datos similares a grupos, donde el valor K representa el tamaño de la agrupación y la granularidad. Esta técnica es útil para la segmentación de mercado, la compresión de imágenes, etc.


    - Asociación: es otro tipo de método de aprendizaje no supervisado que utiliza diferentes reglas para encontrar relaciones entre las variables de un conjunto de datos determinado. Estos métodos se utilizan con frecuencia para el análisis de la cesta de la compra y los motores de recomendación, como las recomendaciones de "Los clientes que compraron este artículo también compraron".


    - Reducción de la dimensionalidad: es una técnica de aprendizaje que se utiliza cuando el número de características (o dimensiones) en un conjunto de datos determinado es demasiado elevado. Reduce la cantidad de datos de entrada a un tamaño manejable, preservando al mismo tiempo su integridad. Esta técnica se utiliza a menudo en la etapa de preprocesamiento de datos, como cuando los autocodificadores eliminan el ruido de los datos visuales para mejorar la calidad de la imagen.

--- 
Referencia bibliográfica:

    Delua, J. (2025, April 17). Supervised vs unsupervised learning. Supervised versus unsupervised learning: What’s the difference? https://www.ibm.com/think/topics/supervised-vs-unsupervised-learning

## Aprendizaje supervisado y no supervisado en pySpark
---
🔹 Modelos de Aprendizaje Supervisado en PySpark

🔸 Clasificación

- LogisticRegression – Regresión logística binaria o multinomial
- DecisionTreeClassifier – Árbol de decisión para clasificación
- RandomForestClassifier – Bosques aleatorios
- GBTClassifier – Gradient-Boosted Trees
- NaiveBayes – Clasificador basado en teorema de Bayes
- MultilayerPerceptronClassifier – Red neuronal multicapa
- LinearSVC – Máquina de vectores de soporte lineal


🔸 Regresión

- LinearRegression – Regresión lineal
- DecisionTreeRegressor – Árbol de decisión para regresión
- RandomForestRegressor – Bosques aleatorios
- GBTRegressor – Gradient-Boosted Trees
- GeneralizedLinearRegression – Regresión generalizada (Poisson, Binomial, etc.)
- AFTSurvivalRegression – Análisis de supervivencia


🔹 Modelos de Aprendizaje No Supervisado en PySpark

🔸 Clustering

- KMeans – Clustering por K-medias
- GaussianMixture – Modelos de mezcla gaussiana (soft clustering)
- BisectingKMeans – Variante jerárquica de K-means
- LDA – Análisis de Dirichlet latente (para temas en texto)

🔸 Reducción de Dimensionalidad

- PCA – Análisis de Componentes Principales
- SVD – Descomposición en valores singulares (indirectamente a través de ALS y PCA)

🔹 Otros Algoritmos Útiles
- ALS – Alternating Least Squares (recomendadores colaborativos)
- IsotonicRegression – Regresión isotónica (no lineal y no paramétrica)
- OneVsRest – Envoltorio para clasificación multiclase
- Pipeline – Para encadenar transformaciones y modelos
- CrossValidator y TrainValidationSplit – Validación cruzada y tuning de hiperparámetros


## Selección de los datos
---

In [103]:
import findspark
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.types import StringType, DoubleType, FloatType
import findspark

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.clustering import GaussianMixture

from os import path

In [3]:
findspark.init()
findspark.find()

'/Users/cesarivp/Documents/GitHub/big_data/myenv/lib/python3.13/site-packages/pyspark'

In [4]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/25 10:48:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
PATH = "../files"
FILE = "amazon_electronics.csv"

In [6]:
class FileManager():
    @staticmethod
    def open_csv_file(input_path : str, file_name : str):
        """
        This method opens a csv file with pyspark
        """
        csv_df = spark.read.csv(
            path.join(input_path, file_name),
            header=True,
            inferSchema=True,
            multiLine=True,
            escape="\"",
            quote="\""
        )

        csv_df.show(truncate=20)

        return csv_df

In [7]:
df_reviews = FileManager.open_csv_file(PATH, FILE)

                                                                                

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+---------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|sentiment|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+---------+
|         US|   22873041|R3ARRMDEGED8RD|B00KJWQIIC|     335625766|Plemo 14-Inch Lap...|              PC|          5|            0|          0|   N|                Y|Pleasantly surprised|I was very surpri...| 2015-08-31|        1|
|         US|   30088427| RQ28TSA020Y6J|B013ALA9LA|     671157305|TP-Link OnHub 

In [8]:
# Previously defined relevant columns for the activity.
RELEVANT_COLUMNS_FOR_CHARACTERIZATION = [
  "star_rating",
  "helpful_votes",
  "total_votes",
  "vine",
  "verified_purchase",
  "review_date",
  "sentiment"
]

In [9]:
df_reviews_filtered = df_reviews.select(*RELEVANT_COLUMNS_FOR_CHARACTERIZATION)

### Generación de las particiones

Se implementó un procedimiento automático en PySpark que:

1. Filtra los registros de la base de datos que cumplen con cada combinación de valores.
2. Almacena cada subconjunto en un diccionario indexado por nombre de combinación (ej. "R5_VPY_VN" para `star_rating`=5, `verified_purchase`=Y, `vine`=N).
3. Imprime la cantidad de registros por partición para control y trazabilidad.

Las particiones con muy pocos registros pueden ser descartadas en etapas posteriores para evitar problemas en el análisis.

In [None]:
class PartitioningManager:

    @staticmethod
    def compute_probabilities(df, cols):
        """
        Computes and returns the probability of each combination of values in the specified columns.
        """
        total_count = df.count()
        return df.groupBy(cols).count() \
                 .withColumn("probability", F.round(F.col("count") / total_count, 6)) \
                 .orderBy("probability", ascending=False)

    @staticmethod
    def filter_partition(df, star_rating, verified_purchase, vine):
        """
        Filters the DataFrame by specific values for rating, verified purchase, and vine.
        """
        return df.filter(
            (F.col("star_rating") == star_rating) &
            (F.col("verified_purchase") == verified_purchase) &
            (F.col("vine") == vine)
        )

    @staticmethod
    def generate_all_partitions(df, min_probability=0.0001):
        """
        Generates partitions only for combinations whose joint probability is above min_probability.
        """

        prob_df = PartitioningManager.compute_probabilities(
            df, ["star_rating", "verified_purchase", "vine"]
        )

        filtered_combinations = prob_df.filter(
            F.col("probability") >= min_probability
        ).select("star_rating", "verified_purchase", "vine").collect()

        partitions = {}
        for row in filtered_combinations:
            rating = row["star_rating"]
            purchase = row["verified_purchase"]
            vine = row["vine"]

            key = f"R{rating}_VP{purchase}_V{vine}"
            filtered = PartitioningManager.filter_partition(df, rating, purchase, vine)
            partitions[key] = filtered
            print(f"Partition {key} created with {filtered.count()} records.")

        return partitions

    @staticmethod
    def stratified_sample_partitioned_data(partitions_dict, label_col="sentiment", fraction=0.3, min_rows=50):
        """
        Applies stratified sampling to each partition based on sentiment.
        """
        sampled_partitions = {}

        for key, df in partitions_dict.items():
            count = df.count()

            if count < min_rows:
                print(f"Skipping partition {key} — only {count} rows (<{min_rows})")
                continue

            sentiments = df.select(label_col).distinct().rdd.flatMap(lambda x: x).collect()
            fractions = {s: fraction for s in sentiments}

            sampled_df = df.sampleBy(label_col, fractions, seed=42)
            sampled_partitions[key] = sampled_df
            print(f"Sampled {sampled_df.count()} rows from partition {key} (original: {count})")

        return sampled_partitions

    @staticmethod
    def build_combined_sample(partitions_sampled_dict):
        """
        Unites all sampled partitions into a single DataFrame (M).
        This helps reduce computational load while maintaining diversity.
        """
        if not partitions_sampled_dict:
            raise ValueError("No partitions provided for sample combination.")

        combined_df = None
        for key, df in partitions_sampled_dict.items():
            if combined_df is None:
                combined_df = df
            else:
                combined_df = combined_df.union(df)
            print(f"Partition {key} added to the combined sample.")

        print(f"Total records in combined sample: {combined_df.count()}")
        return combined_df


In [11]:
partitions = PartitioningManager.generate_all_partitions(df_reviews, min_probability=0.00001)

                                                                                

Partition R5_VPY_VN created with 3679909 records.


                                                                                

Partition R4_VPY_VN created with 1019728 records.


                                                                                

Partition R1_VPY_VN created with 603371 records.


                                                                                

Partition R3_VPY_VN created with 443364 records.


                                                                                

Partition R5_VPN_VN created with 410073 records.


                                                                                

Partition R2_VPY_VN created with 300544 records.


                                                                                

Partition R1_VPN_VN created with 152779 records.


                                                                                

Partition R4_VPN_VN created with 135197 records.


                                                                                

Partition R3_VPN_VN created with 65398 records.


                                                                                

Partition R2_VPN_VN created with 59973 records.


                                                                                

Partition R5_VPN_VY created with 15604 records.


                                                                                

Partition R4_VPN_VY created with 13240 records.


                                                                                

Partition R3_VPN_VY created with 4886 records.


                                                                                

Partition R2_VPN_VY created with 1634 records.


                                                                                

Partition R1_VPN_VY created with 705 records.


[Stage 59:>                                                         (0 + 1) / 1]

Partition R5_VPY_VY created with 101 records.


                                                                                

In [12]:
# Sample of one of the generated partitions.
partitions["R4_VPY_VN"].show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+---------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|sentiment|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+---------+
|         US|   49329488|R1QF6RS1PDLU18|B00TR05L9Y|     778403103|Lenovo TAB2 A10 -...|              PC|          4|            1|          1|   N|                Y|                Good|I am not sure I d...| 2015-08-31|        1|
|         US|   43341796|R2NJ3WFUS4E5G6|B00YGJJQ6U|     986548413|Fintie iPad Ai

## 3.Técnica de muestreo aplicada por partición

Una vez construidas las particiones, se aplicó una técnica de **muestreo estratificado** sobre cada subconjunto, usando la variable `sentiment` como variable de estratificación. Esto asegura que cada muestra mantenga la proporción original de clases de sentimiento en la partición.

Para evitar particiones con tamaños insuficientes, se definió un umbral mínimo (`min_rows`) que descarta automáticamente las particiones con muy pocos registros.

Además, se permitió configurar el porcentaje de muestreo (`fraction`) por clase de sentimiento. Esto ofrece flexibilidad para ajustar el tamaño del conjunto de entrenamiento o validación según necesidades posteriores.

#### Justificación del muestreo estratificado

* **Preservación del equilibrio de clases**: Al muestrear por clase de sentimiento se evitan sesgos por clases desbalanceadas.
* **Relevancia contextual**: Al aplicar el muestreo dentro de cada partición (y no sobre la base completa), se conserva la variabilidad contextual de las reseñas.
* **Evita el submuestreo accidental**: Las particiones muy pequeñas son descartadas de forma controlada, garantizando que el conjunto final tenga representatividad suficiente.

In [13]:
sampled_partitions = PartitioningManager.stratified_sample_partitioned_data(partitions, fraction=0.05, min_rows=100)

                                                                                

Sampled 184082 rows from partition R5_VPY_VN (original: 3679909)


                                                                                

Sampled 51148 rows from partition R4_VPY_VN (original: 1019728)


                                                                                

Sampled 30183 rows from partition R1_VPY_VN (original: 603371)


                                                                                

Sampled 22223 rows from partition R3_VPY_VN (original: 443364)


                                                                                

Sampled 20507 rows from partition R5_VPN_VN (original: 410073)


                                                                                

Sampled 15034 rows from partition R2_VPY_VN (original: 300544)


                                                                                

Sampled 7595 rows from partition R1_VPN_VN (original: 152779)


                                                                                

Sampled 6708 rows from partition R4_VPN_VN (original: 135197)


                                                                                

Sampled 3345 rows from partition R3_VPN_VN (original: 65398)


                                                                                

Sampled 3066 rows from partition R2_VPN_VN (original: 59973)


                                                                                

Sampled 817 rows from partition R5_VPN_VY (original: 15604)


                                                                                

Sampled 724 rows from partition R4_VPN_VY (original: 13240)


                                                                                

Sampled 266 rows from partition R3_VPN_VY (original: 4886)


                                                                                

Sampled 93 rows from partition R2_VPN_VY (original: 1634)


                                                                                

Sampled 35 rows from partition R1_VPN_VY (original: 705)


[Stage 204:>                                                        (0 + 1) / 1]

Sampled 3 rows from partition R5_VPY_VY (original: 101)


                                                                                

In [23]:
df_sample_M = PartitioningManager.build_combined_sample(sampled_partitions)

Partition R5_VPY_VN added to the combined sample.
Partition R4_VPY_VN added to the combined sample.
Partition R1_VPY_VN added to the combined sample.
Partition R3_VPY_VN added to the combined sample.
Partition R5_VPN_VN added to the combined sample.
Partition R2_VPY_VN added to the combined sample.
Partition R1_VPN_VN added to the combined sample.
Partition R4_VPN_VN added to the combined sample.
Partition R3_VPN_VN added to the combined sample.
Partition R2_VPN_VN added to the combined sample.
Partition R5_VPN_VY added to the combined sample.
Partition R4_VPN_VY added to the combined sample.
Partition R3_VPN_VY added to the combined sample.
Partition R2_VPN_VY added to the combined sample.
Partition R1_VPN_VY added to the combined sample.
Partition R5_VPY_VY added to the combined sample.




Total records in combined sample: 345829


                                                                                

## Preparación del conjunto de entrenamiento y prueba
--- 

In [24]:
class StatisticalAnalysisHelper():
    @staticmethod
    def dataset_dimensions(df_input):
        print("columns in the dataset:", len(df_input.columns))
        print("rows in the dataset:", df_input.count())

    @staticmethod
    def schema_information(df_input):
        """
        This method shows the current schema of the data.
        """
        df_input.printSchema()

    @staticmethod
    def descriptive_statistics(df_input):
        """
        This method shows the descriptive statistics of the data.
        """
        df_input.summary().show(truncate=False)

    @staticmethod
    def missing_values_table(df_input):
        """
        Displays a table with the count of missing values per column.
        """
        missing_exprs = []
        
        for c in df_input.schema.fields:
            field_name = c.name
            field_type = c.dataType
            
            if isinstance(field_type, (DoubleType, FloatType)):
                missing_exprs.append(
                    count(when(col(field_name).isNull() | isnan(col(field_name)), field_name)).alias(field_name)
                )
            elif isinstance(field_type, StringType):
                missing_exprs.append(
                    count(when(col(field_name).isNull() | (col(field_name) == ""), field_name)).alias(field_name)
                )
            else:
                missing_exprs.append(
                    count(when(col(field_name).isNull(), field_name)).alias(field_name)
                )

        df_missing_values = df_input.select(missing_exprs)

        return df_missing_values

In [25]:
StatisticalAnalysisHelper.dataset_dimensions(df_sample_M)

columns in the dataset: 16




rows in the dataset: 345829


                                                                                

In [26]:
StatisticalAnalysisHelper.schema_information(df_sample_M)

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- sentiment: integer (nullable = true)



In [27]:
StatisticalAnalysisHelper.descriptive_statistics(df_sample_M)



+-------+-----------+--------------------+--------------+-------------------+--------------------+----------------------------------------------------------------------------------------------+----------------+------------------+------------------+------------------+------+-----------------+----------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|summary|marketplace|customer_id         |review_id     |product_id         |product_parent      |product_title                                  

                                                                                

In [28]:
missing_values = StatisticalAnalysisHelper.missing_values_table(df_sample_M)
missing_values.show(truncate=False)



+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+-----------+---------+
|marketplace|customer_id|review_id|product_id|product_parent|product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline|review_body|review_date|sentiment|
+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+-----------+---------+
|0          |0          |0        |0         |0             |0            |0               |0          |0            |0          |0   |0                |0              |0          |0          |0        |
+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+---

                                                                                

## Preparación del conjunto de entrenamiento y prueba
--- 

In [63]:
class TrainTestManager:
  @staticmethod
  def stratified_train_test_split(df, label_col="sentiment", train_ratio=0.8, seed=42):
    """
    Performs a stratified split of the DataFrame based on the label column.
    Returns (train_df, test_df).
    """
    label_values = df.select(label_col).distinct().rdd.flatMap(lambda x: x).collect()

    train_fractions = {label: train_ratio for label in label_values}

    train_df = df.sampleBy(label_col, train_fractions, seed=seed)

    train_ids = train_df.select(F.monotonically_increasing_id().alias("id"))
    df_with_id = df.withColumn("id", F.monotonically_increasing_id())

    test_df = df_with_id.join(train_ids, on="id", how="left_anti").drop("id")
    train_df = train_df.drop("id") if "id" in train_df.columns else train_df

    print(f"Train set: {train_df.count()} rows")
    print(f"Test set: {test_df.count()} rows")

    return train_df, test_df

In [73]:
train_df, test_df = TrainTestManager.stratified_train_test_split(df_sample_M, train_ratio=0.8)

                                                                                

Train set: 276731 rows




Test set: 69098 rows


                                                                                

## Construcción de modelos de aprendizaje supervisado y no supervisado
--- 

In [106]:
class ModelManager:
    @staticmethod
    def train_logistic_regression(train_df, test_df, label_col="sentiment"):
        """
        Trains a logistic regression model on the training set and evaluates it on the test set.
        Returns the trained model and evaluation metrics.
        """
        categorical_cols = ["verified_purchase", "vine"]
        numeric_cols = ["helpful_votes", "total_votes"]

        indexers = [
            StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="keep")
            for col in categorical_cols
        ]

        feature_cols = [f"{col}_idx" for col in categorical_cols] + numeric_cols
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="assembled_features")

        scaler = StandardScaler(inputCol="assembled_features", outputCol="features")

        lr = LogisticRegression(labelCol=label_col, featuresCol="features", maxIter=20)

        pipeline = Pipeline(stages=indexers + [assembler, scaler, lr])
        model = pipeline.fit(train_df)

        predictions = model.transform(test_df)

        evaluator = MulticlassClassificationEvaluator(
            labelCol=label_col,
            predictionCol="prediction",
            metricName="accuracy"
        )

        accuracy = evaluator.evaluate(predictions)

        return model, accuracy, predictions

    @staticmethod
    def train_random_forest(train_df, test_df, label_col="sentiment", num_trees=50):
        """
        Trains a Random Forest classifier on the training set and evaluates it.
        Returns the trained model, accuracy score, and predictions.
        """
        categorical_cols = ["verified_purchase", "vine"]
        numeric_cols = ["helpful_votes", "total_votes"]

        indexers = [
            StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="keep")
            for col in categorical_cols
        ]

        feature_cols = [f"{col}_idx" for col in categorical_cols] + numeric_cols
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="assembled_features")

        rf = RandomForestClassifier(
            labelCol=label_col,
            featuresCol="assembled_features",
            numTrees=num_trees,
            maxDepth=10,
            seed=42
        )

        pipeline = Pipeline(stages=indexers + [assembler, rf])
        model = pipeline.fit(train_df)

        predictions = model.transform(test_df)

        evaluator = MulticlassClassificationEvaluator(
            labelCol=label_col,
            predictionCol="prediction",
            metricName="accuracy"
        )

        accuracy = evaluator.evaluate(predictions)

        return model, accuracy, predictions
    
    @staticmethod
    def train_kmeans(train_df, test_df, k=2):
        categorical_cols = ["verified_purchase", "vine"]
        numeric_cols = ["helpful_votes", "total_votes"]

        indexers = [
            StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="keep")
            for col in categorical_cols
        ]

        # Text processing
        tokenizer = Tokenizer(inputCol="review_body", outputCol="tokens")
        remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
        hashingTF = HashingTF(inputCol="filtered_tokens", outputCol="raw_features", numFeatures=100)
        idf = IDF(inputCol="raw_features", outputCol="text_features")

        feature_cols = [f"{col}_idx" for col in categorical_cols] + numeric_cols + ["text_features"]
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="assembled_features")
        scaler = StandardScaler(inputCol="assembled_features", outputCol="features")

        kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=k, seed=42)

        pipeline = Pipeline(stages=indexers + [tokenizer, remover, hashingTF, idf, assembler, scaler, kmeans])
        model = pipeline.fit(train_df)

        test_predictions = model.transform(test_df)

        evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="cluster", metricName="silhouette")
        accuracy = evaluator.evaluate(test_predictions)

        return model, accuracy
    
    @staticmethod
    def train_gmm(train_df, test_df, k=2):
        """
        Trains a Gaussian Mixture Model on the training set and evaluates it on the test set.
        Returns the trained model and the silhouette score (as accuracy).
        """
        categorical_cols = ["verified_purchase", "vine"]
        numeric_cols = ["helpful_votes", "total_votes"]

        indexers = [
            StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="keep")
            for col in categorical_cols
        ]

        feature_cols = [f"{col}_idx" for col in categorical_cols] + numeric_cols
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="assembled_features")
        scaler = StandardScaler(inputCol="assembled_features", outputCol="features")

        gmm = GaussianMixture(featuresCol="features", predictionCol="cluster", k=k, seed=42)

        pipeline = Pipeline(stages=indexers + [assembler, scaler, gmm])
        model = pipeline.fit(train_df)

        test_predictions = model.transform(test_df)

        evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="cluster", metricName="silhouette")
        accuracy = evaluator.evaluate(test_predictions)

        return model, accuracy


In [None]:
lr_model, lr_accuracy, lr_predictions = ModelManager.train_logistic_regression(train_df, test_df)

                                                                                

In [None]:
print(f"Logistic Regression accuracy: {round(lr_accuracy, 4)}")

0.786231149960925

In [None]:
lr_model.transform(test_df).select("review_id", "prediction", "probability", "sentiment").show(10)



+--------------+----------+--------------------+---------+
|     review_id|prediction|         probability|sentiment|
+--------------+----------+--------------------+---------+
|R38YS6F8AIPTWS|       1.0|[0.20178237845048...|        1|
| RLKRLZ7VRDTGD|       1.0|[0.25605214422464...|        1|
|R3CMHUMRLBFRGU|       1.0|[0.20178237845048...|        1|
| RL5A9G7ATCTKW|       1.0|[0.20178237845048...|        1|
|R2WRS7LOHTEWC1|       1.0|[0.34326285631962...|        1|
|R30FVKNS7815YT|       1.0|[0.20178237845048...|        1|
| RIG8ZUVB49HVN|       1.0|[0.19742101500703...|        1|
|R2JTOYUS285KME|       1.0|[0.20178237845048...|        1|
| RRFWALHZISB9T|       1.0|[0.26659069851519...|        1|
|R39DPWJB2WS35W|       1.0|[0.20178237845048...|        1|
+--------------+----------+--------------------+---------+
only showing top 10 rows


                                                                                

In [79]:
fr_model, fr_accuracy, fr_predictions = ModelManager.train_random_forest(train_df, test_df)

25/05/25 13:05:02 WARN DAGScheduler: Broadcasting large task binary with size 1005.5 KiB
25/05/25 13:05:03 WARN DAGScheduler: Broadcasting large task binary with size 1165.7 KiB
                                                                                

In [80]:
print(f"Random Forest accuracy: {round(fr_accuracy, 4)}")

Random Forest accuracy: 0.7825


In [81]:
fr_model.transform(test_df).select("review_id", "prediction", "probability", "sentiment").show(10)



+--------------+----------+--------------------+---------+
|     review_id|prediction|         probability|sentiment|
+--------------+----------+--------------------+---------+
|R38YS6F8AIPTWS|       1.0|[0.19324904377050...|        1|
| RLKRLZ7VRDTGD|       1.0|[0.32877604441972...|        1|
|R3CMHUMRLBFRGU|       1.0|[0.19324904377050...|        1|
| RL5A9G7ATCTKW|       1.0|[0.19324904377050...|        1|
|R2WRS7LOHTEWC1|       1.0|[0.42131016696662...|        1|
|R30FVKNS7815YT|       1.0|[0.19324904377050...|        1|
| RIG8ZUVB49HVN|       1.0|[0.20900236205237...|        1|
|R2JTOYUS285KME|       1.0|[0.19324904377050...|        1|
| RRFWALHZISB9T|       1.0|[0.30358284024919...|        1|
|R39DPWJB2WS35W|       1.0|[0.19324904377050...|        1|
+--------------+----------+--------------------+---------+
only showing top 10 rows


                                                                                

In [97]:
kmeans_model, kmeans_accuracy = ModelManager.train_kmeans(train_df, test_df)

                                                                                

In [98]:
print(f"KMeans accuracy: {round(kmeans_accuracy, 4)}")

KMeans accuracy: 0.8874


In [107]:
gmm_model, gmm_accuracy = ModelManager.train_gmm(train_df, test_df)

25/05/25 13:56:02 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [108]:
print(f"Gaussian Mixture accuracy: {round(gmm_accuracy, 4)}")

Gaussian Mixture accuracy: 0.9557
