In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, when
from typing import Tuple

def split_train_test_three_categories(
    df: DataFrame,
    target_column: str,
    train_ratio: float = 0.8,
    seed: int = 42
) -> Tuple[DataFrame, DataFrame]:
    """
    Divide un DataFrame de Spark en conjuntos de entrenamiento y prueba, 
    manteniendo la proporción de las tres categorías del target.
    
    Args:
        df (DataFrame): DataFrame de Spark preprocesado.
        target_column (str): Nombre de la columna objetivo con tres categorías.
        train_ratio (float): Proporción de datos para el conjunto de entrenamiento. Por defecto es 0.8 (80%).
        seed (int): Semilla para reproducibilidad. Por defecto es 42.
    
    Returns:
        Tuple[DataFrame, DataFrame]: DataFrames de entrenamiento y prueba.
    """
    if train_ratio <= 0 or train_ratio >= 1:
        raise ValueError("train_ratio debe estar entre 0 y 1")
    
    # Verificar que hay exactamente tres categorías
    categories = df.select(target_column).distinct().count()
    if categories != 3:
        raise ValueError(f"El target debe tener exactamente 3 categorías, pero tiene {categories}")
    
    # Contar el número de registros por categoría
    class_counts = df.groupBy(target_column).count().collect()
    
    # Calcular la fracción para cada categoría para mantener la estratificación
    fractions = {row[target_column]: train_ratio for row in class_counts}
    
    # Muestreo estratificado
    train = df.sampleBy(target_column, fractions, seed)
    test = df.subtract(train)
    
    # Función para calcular y mostrar la distribución de clases
    def print_class_distribution(data: DataFrame, name: str):
        total = data.count()
        distribution = data.groupBy(target_column).agg(
            count("*").alias("count"),
            (count("*") / total * 100).alias("percentage")
        ).orderBy(target_column)
        print(f"\nDistribución de clases en {name} (Total: {total}):")
        distribution.show()

    # Mostrar la distribución de clases en los conjuntos completo, de entrenamiento y de prueba
    print_class_distribution(df, "el conjunto completo")
    print_class_distribution(train, "el conjunto de entrenamiento")
    print_class_distribution(test, "el conjunto de prueba")
    
    # Verificar que la división se hizo correctamente
    total_count = df.count()
    train_count = train.count()
    test_count = test.count()
    
    print(f"\nResumen de la división:")
    print(f"Total de registros: {total_count}")
    print(f"Registros de entrenamiento: {train_count} ({train_count/total_count:.2%})")
    print(f"Registros de prueba: {test_count} ({test_count/total_count:.2%})")
    
    if abs((train_count / total_count) - train_ratio) > 0.01:
        print("Advertencia: La proporción de división real difiere de la solicitada en más del 1%")
    
    return train, test

# Ejemplo de uso:
# preprocessed_df = ... # Tu DataFrame de Spark preprocesado
# target_column = "clasificacion"
# train_df, test_df = split_train_test_three_categories(preprocessed_df, target_column, train_ratio=0.8)

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, lit, rand, expr
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import StructType, StructField, DoubleType
import pyspark.sql.functions as F
from typing import List

def optimal_balance_training(
    df: DataFrame,
    target_column: str,
    feature_column: str,
    balance_ratio: float = 0.8,
    k_neighbors: int = 5,
    seed: int = 42
) -> DataFrame:
    """
    Balancea la muestra de entrenamiento usando una combinación de técnicas para minimizar el sobreajuste.
    
    Args:
        df (DataFrame): DataFrame de Spark con la muestra de entrenamiento.
        target_column (str): Nombre de la columna objetivo con tres categorías.
        feature_column (str): Nombre de la columna que contiene el vector de características.
        balance_ratio (float): Proporción objetivo para el balanceo (0.8 significa que las clases minoritarias
                               se sobremuestrearán hasta el 80% de la clase mayoritaria).
        k_neighbors (int): Número de vecinos a considerar para la generación de muestras sintéticas.
        seed (int): Semilla para reproducibilidad.
    
    Returns:
        DataFrame: DataFrame de Spark con la muestra balanceada.
    """
    # Contar las instancias de cada clase
    class_counts = df.groupBy(target_column).count().collect()
    total_counts = {row[target_column]: row['count'] for row in class_counts}
    
    # Encontrar la clase mayoritaria y su conteo
    majority_class = max(total_counts, key=total_counts.get)
    majority_count = total_counts[majority_class]
    
    # Calcular el conteo objetivo para las clases minoritarias
    target_count = int(majority_count * balance_ratio)
    
    # Función para generar instancias sintéticas usando SMOTE
    def generate_smote_samples(df: DataFrame, target_value: any, n_samples: int, k: int) -> DataFrame:
        # Seleccionar instancias de la clase minoritaria
        minority_samples = df.filter(col(target_column) == target_value)
        
        # Función para encontrar los k vecinos más cercanos
        def find_k_neighbors(vector: List[float], k: int) -> List[List[float]]:
            return minority_samples.rdd.map(
                lambda row: (sum((a - b) ** 2 for a, b in zip(vector, row[feature_column])), row)
            ).sortByKey().map(lambda x: x[1][feature_column]).take(k + 1)[1:]
        
        # Función para generar una nueva instancia
        def generate_instance(row):
            features = row[feature_column]
            neighbors = find_k_neighbors(features, k)
            new_features = []
            for i in range(len(features)):
                diff = neighbors[rand.randint(0, k-1)][i] - features[i]
                new_features.append(features[i] + rand.random() * diff)
            return (Vectors.dense(new_features), row[target_column])
        
        # Generar nuevas instancias
        synthetic_samples = minority_samples.rdd.flatMap(
            lambda row: [generate_instance(row) for _ in range(n_samples // minority_samples.count())]
        ).toDF([feature_column, target_column])
        
        return synthetic_samples
    
    # Balancear cada clase
    balanced_dfs = []
    for class_value, count in total_counts.items():
        if count > target_count:
            # Submuestreo para la clase mayoritaria
            balanced_dfs.append(df.filter(col(target_column) == class_value).sample(fraction=target_count/count, seed=seed))
        elif count < target_count:
            # Sobremuestreo para las clases minoritarias
            balanced_dfs.append(df.filter(col(target_column) == class_value))
            synthetic_samples = generate_smote_samples(
                df, class_value, target_count - count, k_neighbors
            )
            balanced_dfs.append(synthetic_samples)
        else:
            # Mantener sin cambios si ya está en el conteo objetivo
            balanced_dfs.append(df.filter(col(target_column) == class_value))
    
    # Unir todos los DataFrames
    balanced_df = balanced_dfs[0].unionAll(*balanced_dfs[1:])
    
    # Añadir ruido gaussiano a las características para aumentar la variabilidad
    @F.udf(returnType=VectorUDT())
    def add_gaussian_noise(vector):
        return Vectors.dense([v + rand.gauss(0, 0.01) for v in vector])
    
    balanced_df = balanced_df.withColumn(feature_column, add_gaussian_noise(col(feature_column)))
    
    # Mostrar la distribución de clases después del balanceo
    print("\nDistribución de clases después del balanceo:")
    balanced_df.groupBy(target_column).count().orderBy(target_column).show()
    
    return balanced_df

# Ejemplo de uso:
# train_df = ... # Tu DataFrame de Spark con la muestra de entrenamiento
# target_column = "clasificacion"
# feature_column = "features"
# balanced_train_df = optimal_balance_training(train_df, target_column, feature_column)