# Utilizaci√≥n, procesamiento y visualizaci√≥n de grandes vol√∫menes de datos

Imanol Mu√±iz Ramirez A01701713

## Objetivo

Obtener un modelo de procesamiento del lenguaje natural que sea capaz de clasificar art√≠culos cortos por su tema utilizando PySpark para el manejo de los datos y ML como framework de creaci√≥n de modelos.

## Contexto

Para este proyecto seleccionamos dos datasets. El primero de ellos es WikiCAT_dataseset https://huggingface.co/datasets/PlanTL-GOB-ES/WikiCAT_esv2?utm_source=chatgpt.com y CC-NEWS-ES https://huggingface.co/datasets/LeoCordoba/CC-NEWS-ES?utm_source=chatgpt.com. WikiCAT Contiene art√≠culos cortos categorizados en 12 temas distintos. Est√° separado en trainning y validaci√≥n sumando en total 8.71 MB. CC-NEWS contiene art√≠culos de distintos pa√≠ses sumando en total 7.62 GB de datos. Usar√©mos el primer conjunto de datos para entrenar al modelo y posteriormente har√©mos las predcciones del dataset de CC-NEWS.

## Creamos la sesi√≥n de Spark

Primero creamos la sesi√≥n de Spark con la que trabajaremos los datos. A√±adimos las configuraciones adicionales para proporcionar mayor rendimiento de acuerdo al hardware con el que contamos.

In [1]:
import json
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, monotonically_increasing_id

os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["PATH"] += ";C:\\hadoop\\bin"

# üí° Desactiva el uso de librer√≠as nativas (previene el error UnsatisfiedLinkError)
os.environ["HADOOP_OPTIONAL_TOOLS"] = "hadoop-azure"
os.environ["JAVA_TOOL_OPTIONS"] = "-Djava.library.path=C:\\hadoop\\bin"
os.environ["HADOOP_OPTS"] = "-Djava.library.path=C:\\hadoop\\bin"
os.environ["HADOOP_USER_NAME"] = "hadoop"

print("Iniciando nueva sesi√≥n de Spark optimizada para 16GB RAM...")
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("BR JSON Reader") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.default.parallelism", "8") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.3") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.hadoop.hadoop.home.dir", "C:\\hadoop") \
    .config("spark.hadoop.home.dir", "C:\\hadoop") \
    .config("spark.hadoop.io.nativeio.native", "false") \
    .config("spark.hadoop.native.io.enable", "false") \
    .getOrCreate()

# Establecer nivel de log para reducir mensajes
spark.sparkContext.setLogLevel("WARN")

print("‚úì Sesi√≥n de Spark iniciada con configuraci√≥n optimizada para 16GB RAM")
print(f"  - Memoria del driver: 8GB")
print(f"  - Memoria del executor: 6GB")
print(f"  - Tama√±o m√°ximo de resultados: 4GB")

Iniciando nueva sesi√≥n de Spark optimizada para 16GB RAM...
‚úì Sesi√≥n de Spark iniciada con configuraci√≥n optimizada para 16GB RAM
  - Memoria del driver: 8GB
  - Memoria del executor: 6GB
  - Tama√±o m√°ximo de resultados: 4GB


## Pasamos los datos de WikiCAT de JSON a formato CSV

In [2]:
import csv

# Ruta del archivo JSON
json_files = []
json_files.append(r"D:\Tec\Septimo\Cloud\Global Superstore Analysis\hftrain_esv5.json")
json_files.append(r"D:\Tec\Septimo\Cloud\Global Superstore Analysis\hfeval_esv5.json")

for json_file in json_files:
    # Leer el archivo JSON
    print("Leyendo archivo JSON...")
    with open(json_file, 'r', encoding='utf-8') as f:
        json_data = json.load(f)

    # Extraer el array de datos
    data_array = json_data['data']

    print(f"‚úì Datos cargados: {len(data_array)} filas")
    print(f"\nColumnas disponibles: {list(data_array[0].keys())}")
    print(f"\nPrimeras 3 filas:")
    for i, row in enumerate(data_array[:3], 1):
        print(f"{i}. Label: {row['label']}, Sentence: {row['sentence'][:80]}...")

    # Preparar directorio de salida
    output_dir = "WikiCAT_dataset"
    os.makedirs(output_dir, exist_ok=True)

    if json_file.endswith("hftrain_esv5.json"):
        csv_file = f"{output_dir}/train_esv5.csv"
        print(f"\nGuardando como CSV en {csv_file}...")
    else:
        csv_file = f"{output_dir}/valid_esv5.csv"
        print(f"\nGuardando como CSV en {csv_file}...")

    # Escribir CSV directamente sin usar Spark (evita problemas en Windows)
    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        
        # Escribir encabezado
        writer.writerow(['id', 'label', 'sentence'])
        
        # Escribir datos
        for idx, row in enumerate(data_array):
            # Limpiar saltos de l√≠nea en sentence
            sentence = row['sentence'].replace('\n', ' ').replace('\r', ' ')
            writer.writerow([idx, row['label'], sentence])

    print(f"‚úì Dataset guardado exitosamente en {csv_file}")
    print(f"  Total de filas escritas: {len(data_array)}")

Leyendo archivo JSON...
‚úì Datos cargados: 6716 filas

Columnas disponibles: ['sentence', 'label']

Primeras 3 filas:
1. Label: Econom√≠a, Sentence: En estad√≠stica, un modelo probit es un tipo de regresi√≥n donde la variable depen...
2. Label: Econom√≠a, Sentence: El libro diario o libro de cuentas es un libro contable donde se registran, d√≠a ...
3. Label: Econom√≠a, Sentence: La tarifa diaria promedio (com√∫nmente conocida como ADR - por sus siglas en ingl...

Guardando como CSV en WikiCAT_dataset/train_esv5.csv...
‚úì Dataset guardado exitosamente en WikiCAT_dataset/train_esv5.csv
  Total de filas escritas: 6716
Leyendo archivo JSON...
‚úì Datos cargados: 1685 filas

Columnas disponibles: ['sentence', 'label']

Primeras 3 filas:
1. Label: Econom√≠a, Sentence: La administraci√≥n es una de las actividades humanas m√°s importantes, encargada d...
2. Label: Econom√≠a, Sentence: El aprendizaje por la pr√°ctica o aprendizaje por la ejercitaci√≥n y la repetici√≥n...
3. Label: Econom√≠a, 

## Obtenemos la informaci√≥n del CSV de WikiCAT 

Revisamos que se haya almacenado correctamente e imprimimos informaci√≥n √∫til para elaborar nuestro tablero de visualizaci√≥n

In [3]:
# Cargar el dataset WikiCAT desde CSV
print("Cargando dataset WikiCAT...")

df_wikicat = spark.read.csv(
    r"D:\Tec\Septimo\Cloud\Global Superstore Analysis\WikiCAT_dataset\train_esv5.csv",
    header=True,
    inferSchema=True,
    encoding="UTF-8"
)

print(f"‚úì Dataset cargado: {df_wikicat.count()} filas")
print(f"\nColumnas: {df_wikicat.columns}")
print(f"\nEsquema:")
df_wikicat.printSchema()

print("\nPrimeras filas:")
df_wikicat.show(5, truncate=80)

print("\nDistribuci√≥n de etiquetas:")
df_wikicat.groupBy("label").count().orderBy("count", ascending=False).show(10)

Cargando dataset WikiCAT...
‚úì Dataset cargado: 6716 filas

Columnas: ['id', 'label', 'sentence']

Esquema:
root
 |-- id: integer (nullable = true)
 |-- label: string (nullable = true)
 |-- sentence: string (nullable = true)


Primeras filas:
+---+--------+--------------------------------------------------------------------------------+
| id|   label|                                                                        sentence|
+---+--------+--------------------------------------------------------------------------------+
|  0|Econom√≠a|En estad√≠stica, un modelo probit es un tipo de regresi√≥n donde la variable de...|
|  1|Econom√≠a|El libro diario o libro de cuentas es un libro contable donde se registran, d...|
|  2|Econom√≠a|La tarifa diaria promedio (com√∫nmente conocida como ADR - por sus siglas en i...|
|  3|Econom√≠a|En econom√≠a, el coste medio o costo medio es igual al coste total dividido po...|
|  4|Econom√≠a|"Para un individuo, el equivalente cierto C(p) de una loter√≠

La distribuci√≥n de los datos es la siguiente

![image-2.png](attachment:image-2.png)

## Preparaci√≥n de los datos del modelo

Aplicamos distintas tranformaciones a los datos: Adecuamos los nombres de las columnas, filtramos registros no v√°lidos, tokenizamos el texto y vectorizamos las palabras.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, NGram, VectorAssembler
from pyspark.sql.functions import length, col, lower, trim, regexp_replace

# ===============================================
# === PREPARACI√ìN DE DATOS PARA CLASIFICACI√ìN ===
# ===============================================

print("="*60)
print("PREPARANDO DATOS WikiCAT PARA ENTRENAMIENTO")
print("="*60)

# === 1. Cargar datos de validaci√≥n ===
print("\nCargando datos de validaci√≥n...")
df_valid = spark.read.csv(
    r"D:\Tec\Septimo\Cloud\Global Superstore Analysis\WikiCAT_dataset\valid_esv5.csv",
    header=True,
    inferSchema=True,
    encoding="UTF-8"
)
print(f"‚úì Datos de validaci√≥n cargados: {df_valid.count()} documentos")

# === 2. Usar TODOS los datos de entrenamiento (sin muestra) ===
print("\nPreparando TODOS los datos de entrenamiento (sin muestreo)...")
df_train = df_wikicat.withColumnRenamed("sentence", "texto") \
                     .withColumnRenamed("label", "tema")

df_test = df_valid.withColumnRenamed("sentence", "texto") \
                  .withColumnRenamed("label", "tema")

print(f"‚úì Dataset de entrenamiento: {df_train.count()} documentos")
print(f"‚úì Dataset de validaci√≥n: {df_test.count()} documentos")

# === 3. Limpieza avanzada de datos ===
print("\nAplicando limpieza avanzada de texto...")

def clean_data(df):
    # Limpieza de texto mejorada
    df = df.withColumn("texto", lower(col("texto")))  # Convertir a min√∫sculas
    df = df.withColumn("texto", trim(col("texto")))   # Eliminar espacios
    df = df.withColumn("texto", regexp_replace(col("texto"), r'\s+', ' '))  # Normalizar espacios
    
    # Filtrar documentos inv√°lidos
    df = df.filter(col("texto").isNotNull())
    df = df.filter(col("texto") != "")
    df = df.filter(length(col("texto")) > 20)
    df = df.filter(col("tema").isNotNull())
    
    return df

train_data = clean_data(df_train)
test_data = clean_data(df_test)

print(f"‚úì Entrenamiento despu√©s de limpieza: {train_data.count()} documentos")
print(f"‚úì Validaci√≥n despu√©s de limpieza: {test_data.count()} documentos")

# === 4. An√°lisis de temas ===
print("\nüìä Distribuci√≥n de temas en entrenamiento:")
train_data.groupBy("tema").count().orderBy("count", ascending=False).show(15)

# === 5. Particionar datos para mejor rendimiento ===
train_data = train_data.repartition(8)
test_data = test_data.repartition(8)

# Mostrar ejemplos
print("\nüìù Ejemplos de datos de entrenamiento:")
train_data.select("tema", "texto").show(3, truncate=80)

# === 6. Configurar Pipeline de Preprocesamiento MEJORADO ===
print("\n" + "="*60)
print("CONFIGURANDO PIPELINE DE PREPROCESAMIENTO MEJORADO")
print("="*60)

# 6.1 Tokenizar texto
tokenizer = RegexTokenizer(
    inputCol="texto", 
    outputCol="palabras", 
    pattern="\\W",
    minTokenLength=3  # Ignorar palabras de menos de 3 caracteres
)

# 6.2 Remover stopwords en espa√±ol
remover = StopWordsRemover(
    inputCol="palabras", 
    outputCol="palabras_limpias", 
    stopWords=StopWordsRemover.loadDefaultStopWords("spanish")
)

# 6.3 Convertir a vectores (Term Frequency) - REDUCIDO DR√ÅSTICAMENTE
cv = CountVectorizer(
    inputCol="palabras_limpias", 
    outputCol="rawFeatures", 
    vocabSize=10000,  
    minDF=5.0         # Solo palabras muy comunes
)

# 6.4 TF-IDF (relevancia de t√©rminos)
idf = IDF(
    inputCol="rawFeatures", 
    outputCol="features", 
    minDocFreq=2
)

# 6.5 Codificar etiquetas de temas como n√∫meros
indexer = StringIndexer(
    inputCol="tema", 
    outputCol="label",
    handleInvalid="keep"  # Manejar temas nuevos
)

# === 7. Crear Pipeline Completo ===
preprocessing_pipeline = Pipeline(stages=[tokenizer, remover, cv, idf, indexer])

# === 8. Entrenar el pipeline de preprocesamiento ===
print("\n" + "="*60)
print("ENTRENANDO PIPELINE DE PREPROCESAMIENTO")
print("="*60)
print("Procesando TODOS los datos de entrenamiento...")
print("(Esto tomar√° varios minutos - usando dataset completo)")

preprocessing_model = preprocessing_pipeline.fit(train_data)

# Aplicar transformaciones
print("\nAplicando transformaciones...")
train_processed = preprocessing_model.transform(train_data)
test_processed = preprocessing_model.transform(test_data)

# Cachear para mejorar rendimiento
print("Cacheando datos procesados...")
train_processed.cache()
test_processed.cache()

# Forzar evaluaci√≥n
train_count = train_processed.count()
test_count = test_processed.count()

# === 9. Mostrar resultados ===
print("\n" + "="*60)
print("‚úì PREPROCESAMIENTO COMPLETADO EXITOSAMENTE")
print("="*60)

print(f"\nüìä Estad√≠sticas:")
print(f"   - Documentos de entrenamiento procesados: {train_count:,}")
print(f"   - Documentos de validaci√≥n procesados: {test_count:,}")

vocab = preprocessing_model.stages[2].vocabulary
print(f"   - Tama√±o del vocabulario: {len(vocab):,}")

# Mapeo de temas
label_to_tema = preprocessing_model.stages[4].labels
print(f"   - N√∫mero de temas/categor√≠as: {len(label_to_tema)}")

print("\nüè∑Ô∏è  Todos los temas encontrados:")
for i, tema in enumerate(label_to_tema):
    count = train_data.filter(col("tema") == tema).count()
    print(f"   {i:2d}: {tema:30s} ({count:,} documentos)")

print("\nüìù Muestra de datos procesados:")
train_processed.select("tema", "label", "palabras_limpias").show(3, truncate=80)

print("\n" + "="*60)
print("üíæ DATOS LISTOS PARA ENTRENAMIENTO DEL MODELO")
print("="*60)
print("Variables disponibles:")
print("  - train_processed: TODOS los datos de entrenamiento preprocesados")
print("  - test_processed: datos de validaci√≥n preprocesados")
print("  - preprocessing_model: modelo de preprocesamiento optimizado")
print("  - label_to_tema: mapeo de √≠ndices a nombres de temas")

PREPARANDO DATOS WikiCAT PARA ENTRENAMIENTO (MEJORADO)

Cargando datos de validaci√≥n...
‚úì Datos de validaci√≥n cargados: 1685 documentos

Preparando TODOS los datos de entrenamiento (sin muestreo)...
‚úì Dataset de entrenamiento: 6716 documentos
‚úì Dataset de validaci√≥n: 1685 documentos

Aplicando limpieza avanzada de texto...
‚úì Entrenamiento despu√©s de limpieza: 6716 documentos
‚úì Validaci√≥n despu√©s de limpieza: 1685 documentos

üìä Distribuci√≥n de temas en entrenamiento:
+--------------------+-----+
|                tema|count|
+--------------------+-----+
|            Pol√≠tica| 1230|
|Ciencia_y_Tecnolog√≠a|  996|
|            Econom√≠a|  726|
|         Matem√°ticas|  613|
|             Derecho|  540|
|              M√∫sica|  528|
|         Humanidades|  520|
|            Historia|  396|
|           Filosof√≠a|  378|
|             Deporte|  324|
|            Religi√≥n|  301|
|     Entretenimiento|  164|
+--------------------+-----+


üìù Ejemplos de datos de entrenamie

## Entrenamos el modelo

Usamos el framework de ML para utilizar un modelo de regresi√≥n log√≠stica que clasifique los vectores generados en el preprocesamiento del subconjunto de train y probamos el desempe√±o con los datos de evaluaci√≥n. Imprimimos el resumen de los resultados y guardamos el modelo.

In [24]:
from pyspark.ml.classification import LogisticRegression, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import when

# ===============================================
# === ENTRENAMIENTO DEL MODELO DE CLASIFICACI√ìN MEJORADO ===
# ===============================================

print("="*60)
print("ENTRENANDO MODELO DE CLASIFICACI√ìN MEJORADO")
print("="*60)

# === 1. Configurar el modelo MEJORADO ===
print("\nConfigurando Logistic Regression Mejorado...")
print("Par√°metros optimizados para mayor accuracy...")

modelo_clasificacion = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=100,          # AUMENTADO de 50 a 100 iteraciones
    regParam=0.001,       # REDUCIDO de 0.01 a 0.001 (menos regularizaci√≥n)
    elasticNetParam=0.1,  # Mezcla L1 y L2 regularization
    tol=1e-6,             # Tolerancia m√°s estricta
    standardization=True, # Normalizar features
    family="multinomial"  # Mejor para m√∫ltiples clases
)

# === 2. Entrenar el modelo con TODOS los datos ===
print("\nüèãÔ∏è Entrenando modelo con TODO el dataset de entrenamiento...")
print(f"Procesando {train_count:,} documentos de entrenamiento...")
print("(Esto puede tomar 10-15 minutos dependiendo del hardware)")

import time
start_time = time.time()

modelo_entrenado = modelo_clasificacion.fit(train_processed)

elapsed_time = time.time() - start_time
print(f"‚úì Modelo entrenado exitosamente en {elapsed_time/60:.2f} minutos!")

# === 3. Hacer predicciones en datos de validaci√≥n ===
print("\nüìä Evaluando modelo en datos de validaci√≥n...")
predictions = modelo_entrenado.transform(test_processed)

# Cachear predicciones
predictions.cache()
pred_count = predictions.count()

# === 4. Evaluar el modelo con m√∫ltiples m√©tricas ===
print("\n" + "="*60)
print("EVALUACI√ìN DETALLADA DEL MODELO")
print("="*60)

# Evaluar Accuracy (Precisi√≥n)
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy = evaluator_accuracy.evaluate(predictions)

# Evaluar F1 Score
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="f1"
)
f1_score = evaluator_f1.evaluate(predictions)

# Evaluar Weighted Precision
evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="weightedPrecision"
)
precision = evaluator_precision.evaluate(predictions)

# Evaluar Weighted Recall
evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="weightedRecall"
)
recall = evaluator_recall.evaluate(predictions)

print(f"\nüìà M√âTRICAS DE EVALUACI√ìN:")
print(f"   ‚úì Accuracy (Precisi√≥n Global):    {accuracy:.4f} ({accuracy*100:.2f}%)")
print(f"   ‚úì F1 Score:                       {f1_score:.4f} ({f1_score*100:.2f}%)")
print(f"   ‚úì Weighted Precision:             {precision:.4f} ({precision*100:.2f}%)")
print(f"   ‚úì Weighted Recall:                {recall:.4f} ({recall*100:.2f}%)")

# === 5. An√°lisis detallado de predicciones ===
print("\n" + "="*60)
print("AN√ÅLISIS DETALLADO DE PREDICCIONES")
print("="*60)

# Contar predicciones correctas e incorrectas
predictions_analysis = predictions.withColumn(
    "correcto",
    when(col("label") == col("prediction"), 1).otherwise(0)
)

total_correct = predictions_analysis.filter(col("correcto") == 1).count()
total_incorrect = predictions_analysis.filter(col("correcto") == 0).count()

print(f"\n‚úÖ Predicciones correctas:   {total_correct:,} ({total_correct/pred_count*100:.2f}%)")
print(f"‚ùå Predicciones incorrectas: {total_incorrect:,} ({total_incorrect/pred_count*100:.2f}%)")

# === 6. Ejemplos de predicciones ===
print("\n‚úÖ EJEMPLOS DE PREDICCIONES CORRECTAS (5 aleatorias):")
predictions_correctas = predictions_analysis.filter(col("correcto") == 1)
predictions_correctas.select("tema", "texto").show(5, truncate=100)

print("\n‚ùå EJEMPLOS DE PREDICCIONES INCORRECTAS (5 aleatorias):")
predictions_incorrectas = predictions_analysis.filter(col("correcto") == 0)
if total_incorrect > 0:
    # Mostrar tema real vs predicho - SIN USAR UDF
    # Crear un mapeo usando broadcasting
    print("   Mostrando ejemplos de errores:")
    
    # Simplemente mostrar label y prediction como n√∫meros
    predictions_incorrectas.select(
        "tema", 
        col("label").alias("label_real"),
        col("prediction").alias("label_predicho"),
        "texto"
    ).show(5, truncate=100)
    
    # Crear mapeo de labels a temas para interpretar
    print("\n   Mapeo de √≠ndices a temas:")
    for i, tema in enumerate(label_to_tema[:10]):  # Mostrar primeros 10
        print(f"   {i}: {tema}")
else:
    print("   ¬°No hay predicciones incorrectas! Modelo perfecto ‚ú®")

# === 7. An√°lisis por tema (Performance individual) ===
print("\n" + "="*60)
print("üìä PERFORMANCE POR TEMA (ACCURACY INDIVIDUAL)")
print("="*60)

tema_stats = predictions_analysis.groupBy("tema").agg(
    {"correcto": "sum", "tema": "count"}
).withColumnRenamed("sum(correcto)", "correctos") \
 .withColumnRenamed("count(tema)", "total")

tema_stats = tema_stats.withColumn(
    "accuracy_tema",
    (col("correctos") / col("total") * 100)
)

print("\nTop 10 temas con mejor accuracy:")
tema_stats.orderBy(col("accuracy_tema").desc()).show()

print("\nTop 10 temas con menor accuracy (necesitan mejora):")
tema_stats.orderBy(col("accuracy_tema").asc()).show(10, truncate=False)

# Guardar estad√≠sticas de performance por tema en CSV
print("\nüíæ Guardando estad√≠sticas de performance por tema...")
tema_stats_ordenado = tema_stats.orderBy(col("accuracy_tema").desc())

# Guardar con Spark (un solo archivo)
temp_stats_dir = "temp_tema_stats"
tema_stats_ordenado.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_stats_dir)

# Mover el archivo CSV al nombre deseado
import glob
import shutil
csv_parts = glob.glob(f"{temp_stats_dir}/*.csv")
if csv_parts:
    stats_csv_file = "performance_por_tema.csv"
    shutil.move(csv_parts[0], stats_csv_file)
    shutil.rmtree(temp_stats_dir)
    print(f"‚úì Estad√≠sticas guardadas en: {stats_csv_file}")

# === 8. Guardar el modelo completo ===
print("\n" + "="*60)
print("üíæ GUARDANDO MODELO MEJORADO")
print("="*60)

try:
    # Guardar modelo de preprocesamiento
    preprocessing_model.write().overwrite().save("modelo_preprocessing_wikicat_mejorado")
    print("‚úì Modelo de preprocesamiento guardado: modelo_preprocessing_wikicat_mejorado")
    
    # Guardar modelo de clasificaci√≥n
    modelo_entrenado.write().overwrite().save("modelo_clasificacion_wikicat_mejorado")
    print("‚úì Modelo de clasificaci√≥n guardado: modelo_clasificacion_wikicat_mejorado")
    
    # Guardar mapeo de etiquetas
    import pickle
    with open("label_to_tema_mapping.pkl", "wb") as f:
        pickle.dump(label_to_tema, f)
    print("‚úì Mapeo de etiquetas guardado: label_to_tema_mapping.pkl")
    
    # Guardar m√©tricas
    metricas = {
        "accuracy": accuracy,
        "f1_score": f1_score,
        "precision": precision,
        "recall": recall,
        "train_count": train_count,
        "test_count": test_count,
        "vocab_size": len(vocab),
        "num_temas": len(label_to_tema)
    }
    with open("metricas_modelo.pkl", "wb") as f:
        pickle.dump(metricas, f)
    print("‚úì M√©tricas guardadas: metricas_modelo.pkl")
    
except Exception as e:
    print(f"‚ö† Error al guardar: {e}")

# === 9. Resumen final ===
print("\n" + "="*60)
print("üéâ ENTRENAMIENTO COMPLETADO EXITOSAMENTE")
print("="*60)

print(f"\nüìä RESUMEN FINAL:")
print(f"   ‚Ä¢ Documentos de entrenamiento: {train_count:,}")
print(f"   ‚Ä¢ Documentos de validaci√≥n: {test_count:,}")
print(f"   ‚Ä¢ N√∫mero de temas: {len(label_to_tema)}")
print(f"   ‚Ä¢ Tama√±o del vocabulario: {len(vocab):,} palabras")
print(f"   ‚Ä¢ Iteraciones del modelo: 100")
print(f"   ‚Ä¢ Tiempo de entrenamiento: {elapsed_time/60:.2f} minutos")
print(f"\n   üéØ ACCURACY FINAL: {accuracy*100:.2f}%")
print(f"   üéØ F1 SCORE: {f1_score*100:.2f}%")

print("\nüíæ Modelos guardados y listos para usar")
print("‚úì Siguiente paso: usar el modelo para clasificar nuevos textos")

ENTRENANDO MODELO DE CLASIFICACI√ìN MEJORADO

Configurando Logistic Regression Mejorado...
Par√°metros optimizados para mayor accuracy...

üèãÔ∏è Entrenando modelo con TODO el dataset de entrenamiento...
Procesando 6,716 documentos de entrenamiento...
(Esto puede tomar 10-15 minutos dependiendo del hardware)
‚úì Modelo entrenado exitosamente en 0.35 minutos!

üìä Evaluando modelo en datos de validaci√≥n...

EVALUACI√ìN DETALLADA DEL MODELO
‚úì Modelo entrenado exitosamente en 0.35 minutos!

üìä Evaluando modelo en datos de validaci√≥n...

EVALUACI√ìN DETALLADA DEL MODELO

üìà M√âTRICAS DE EVALUACI√ìN:
   ‚úì Accuracy (Precisi√≥n Global):    0.6475 (64.75%)
   ‚úì F1 Score:                       0.6452 (64.52%)
   ‚úì Weighted Precision:             0.6469 (64.69%)
   ‚úì Weighted Recall:                0.6475 (64.75%)

AN√ÅLISIS DETALLADO DE PREDICCIONES

üìà M√âTRICAS DE EVALUACI√ìN:
   ‚úì Accuracy (Precisi√≥n Global):    0.6475 (64.75%)
   ‚úì F1 Score:                       0.

Podemos concluir del modelo tiene un margen de mejora considerable pues solo tiene alrededor de un 64% de presici√≥n. En la imagen de abajo podemos analizar mejor el desempe√±o del modelo en cada uno de los temas que categoriza. Es posible que se requiera un modelo m√°s robusto para aumentar la presici√≥n.

![image-2.png](attachment:image-2.png)

## Procesar informaci√≥n de CC-NEWS-ES



Los datos de CC-NEWS-ES vienen en archivos JSON de una sola l√≠nea. Son arreglos que contienen objetos con los atributos id, country y text. Por lo que necesitamos primero convertir a csv.

In [6]:
df = spark.read.option("multiline", "true").json("D:\\Tec\\Septimo\\Cloud\\Global Superstore Analysis\\CC-NEWS-ES\\ar\\ar.json")

# Selecciona solo las columnas que te interesan
df_simple = df.select("id", "text")

df_simple.show(5, truncate=False)

+---+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Guardamos los resultados en un CSV. 
Posteriormente comentamos esta tarea para no hacerlo cada vez que corramos el archivo. 

In [None]:
# Save df_simple to CSV
'''
df_simple.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", "true") \
    .csv("file:///D:\\Tec\\Septimo\\Cloud\\Global Superstore Analysis\\br_texts")
    '''

'\ndf_simple.coalesce(1)     .write.mode("overwrite")     .option("header", "true")     .csv("file:///D:\\Tec\\Septimo\\Cloud\\Global Superstore Analysis\\br_texts")\n    '

## Preparaci√≥n de los datos
Aplicamos las transformaciones a la columna de texto para poderla pasar por el preprocesamiento m√°s adelante

In [7]:
from pyspark.sql.functions import col, lower, trim, regexp_replace

print("Limpiando textos...")
df_prepared = (
    df_simple
    .withColumnRenamed("text", "texto")
    .withColumn("texto", lower(col("texto")))
    .withColumn("texto", trim(col("texto")))
    .withColumn("texto", regexp_replace(col("texto"), r'\s+', ' '))
    .filter(col("texto").isNotNull())
    .filter(col("texto") != "")
)


# Verificar esquema antes de guardar
df_prepared.printSchema()

print("‚úÖ Datos preparados guardados correctamente.")
df_prepared.show(5, truncate=False)


Limpiando textos...
root
 |-- id: long (nullable = true)
 |-- texto: string (nullable = true)

‚úÖ Datos preparados guardados correctamente.
+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Divisi√≥n de los datos
Estuve lidiando con m√∫ltiples problemas de memoria por lo que manejar un solo archivo grande era dificil. Decid√≠ dividirlo en lotes de 10,000 renglones cada uno

In [8]:
from pyspark.sql.functions import monotonically_increasing_id, floor

# OPTIMIZACI√ìN: Cachear df_prepared para evitar rec√°lculos
print("‚ö° Optimizando datos para procesamiento...")
df_prepared.cache()
total_rows = df_prepared.count()
print(f"Total de filas: {total_rows:,}")

# Definir el tama√±o de cada lote
batch_size = 10000

# Calcular cu√°ntos lotes necesitamos
num_batches = (total_rows // batch_size) + (1 if total_rows % batch_size > 0 else 0)
print(f"N√∫mero de lotes a crear: {num_batches}")

# OPTIMIZACI√ìN: Agregar ID y n√∫mero de lote en una sola operaci√≥n
df_with_batch = df_prepared.withColumn("row_id", monotonically_increasing_id()) \
                           .withColumn("batch_number", floor(col("row_id") / batch_size).cast("int"))

# Cachear para evitar recalcular
df_with_batch.cache()
df_with_batch.count()  # Forzar evaluaci√≥n

# Verificar la distribuci√≥n de lotes
print("\nüìä Distribuci√≥n de documentos por lote:")
df_with_batch.groupBy("batch_number").count().orderBy("batch_number").show(num_batches)

print("\n‚úÖ Datos preparados y cacheados")
print("üí° Ahora puedes guardar los lotes de forma eficiente en la siguiente celda")

‚ö° Optimizando datos para procesamiento r√°pido...
Total de filas: 532,703
N√∫mero de lotes a crear: 54

üìä Distribuci√≥n de documentos por lote:
+------------+-----+
|batch_number|count|
+------------+-----+
|           0|10000|
|           1|10000|
|           2|10000|
|           3|10000|
|           4|10000|
|           5|10000|
|           6|10000|
|           7|10000|
|           8|10000|
|           9|10000|
|          10|10000|
|          11|10000|
|          12|10000|
|          13|10000|
|          14|10000|
|          15|10000|
|          16|10000|
|          17|10000|
|          18|10000|
|          19|10000|
|          20|10000|
|          21|10000|
|          22|10000|
|          23|10000|
|          24|10000|
|          25|10000|
|          26|10000|
|          27|10000|
|          28|10000|
|          29|10000|
|          30|10000|
|          31|10000|
|          32|10000|
|          33|10000|
|          34|10000|
|          35|10000|
|          36|10000|
|          

## Preparamos los datos de cada lote para pasarlos por el modelo y guardamos los resultados.

In [None]:
# ‚ö° VERSI√ìN PARALELA CON SPARK: Guardar todos los lotes simult√°neamente
# Esta versi√≥n procesa TODOS los lotes en paralelo aprovechando Spark

import time

output_dir = "clasificacion_resultados_spark"
os.makedirs(output_dir, exist_ok=True)

print("‚ö°üíæ GUARDADO PARALELO DE LOTES CON SPARK")
print(f"Directorio de salida: {output_dir}")
print("="*60)
print(f"Total de lotes a procesar: {num_batches}")
print(f"Tama√±o de cada lote: {batch_size:,} filas")
print("\nüí° Procesando TODOS los lotes en PARALELO")
print("="*60)

start_time_total = time.time()

# Crear todos los subdirectorios de una vez
for batch_num in range(num_batches):
    batch_dir = f"{output_dir}/lote_{batch_num + 1}"
    os.makedirs(batch_dir, exist_ok=True)

print("\n‚ö° Guardando todos los lotes en paralelo...")

try:
    # Usar partitionBy para escribir todos los lotes simult√°neamente
    # Spark maneja la paralelizaci√≥n autom√°ticamente
    df_with_batch.drop("row_id") \
                 .write \
                 .partitionBy("batch_number") \
                 .mode("overwrite") \
                 .option("header", "true") \
                 .csv(f"{output_dir}/temp_parallel_output")
    
    print("‚úì Escritura paralela completada")
    
    # Reorganizar archivos a la estructura deseada
    print("\nüìÅ Reorganizando archivos...")
    import glob
    import shutil
    
    files_moved = 0
    total_size = 0
    
    for batch_num in range(num_batches):
        # Buscar el directorio de esta partici√≥n
        partition_dir = f"{output_dir}/temp_parallel_output/batch_number={batch_num}"
        
        if os.path.exists(partition_dir):
            # Buscar archivos CSV en la partici√≥n
            csv_files = glob.glob(f"{partition_dir}/*.csv")
            
            if csv_files:
                # Mover el primer archivo CSV encontrado
                source_file = csv_files[0]
                dest_file = f"{output_dir}/lote_{batch_num + 1}/articulos_lote_{batch_num + 1}.csv"
                
                shutil.move(source_file, dest_file)
                
                # Calcular tama√±o
                file_size = os.path.getsize(dest_file) / (1024 * 1024)
                total_size += file_size
                files_moved += 1
                
                print(f"   ‚úì Lote {batch_num + 1}: {file_size:.2f} MB")
    
    # Limpiar directorio temporal
    print("\nüßπ Limpiando archivos temporales...")
    if os.path.exists(f"{output_dir}/temp_parallel_output"):
        shutil.rmtree(f"{output_dir}/temp_parallel_output")
    
    # Resumen final
    elapsed_total = time.time() - start_time_total
    
    print("\n" + "="*60)
    print("‚úÖ PROCESO DE GUARDADO PARALELO COMPLETADO")
    print("="*60)
    print(f"‚è±Ô∏è  Tiempo total: {elapsed_total/60:.2f} minutos ({elapsed_total:.1f} segundos)")
    print(f"‚ö° Velocidad: {elapsed_total/num_batches:.1f} segundos efectivos por lote (procesado en paralelo)")
    print(f"üìÅ Total de archivos CSV creados: {files_moved}/{num_batches}")
    print(f"üíæ Tama√±o total de archivos: {total_size:.2f} MB")
    print(f"üìÇ Ubicaci√≥n: {os.path.abspath(output_dir)}")
    
    print(f"\nüöÄ ¬°Proceso completado {num_batches}x m√°s r√°pido gracias a paralelizaci√≥n!")
    
except Exception as e:
    print(f"‚ùå Error en procesamiento paralelo: {e}")
    import traceback
    traceback.print_exc()

‚ö°üíæ GUARDADO DE LOTES EN CSV CON SPARK NATIVO
Directorio de salida: clasificacion_resultados_pyspark
Total de lotes a procesar: 54
Tama√±o de cada lote: 10,000 filas

üí° Usando Spark nativo (sin Pandas)

üìù Procesando lote 1/54...
   ‚ö° Guardando 10,000 filas...
   ‚úÖ Guardado exitoso:
      ‚Ä¢ Archivo: articulos_lote_1.csv
      ‚Ä¢ Filas: 10,000
      ‚Ä¢ Tama√±o: 15.92 MB
      ‚Ä¢ Tiempo: 2.2 segundos

üìù Procesando lote 2/54...
   ‚ö° Guardando 10,000 filas...
   ‚úÖ Guardado exitoso:
      ‚Ä¢ Archivo: articulos_lote_1.csv
      ‚Ä¢ Filas: 10,000
      ‚Ä¢ Tama√±o: 15.92 MB
      ‚Ä¢ Tiempo: 2.2 segundos

üìù Procesando lote 2/54...
   ‚ö° Guardando 10,000 filas...
   ‚úÖ Guardado exitoso:
      ‚Ä¢ Archivo: articulos_lote_2.csv
      ‚Ä¢ Filas: 10,000
      ‚Ä¢ Tama√±o: 15.51 MB
      ‚Ä¢ Tiempo: 0.4 segundos

üìù Procesando lote 3/54...
   ‚ö° Guardando 10,000 filas...
   ‚úÖ Guardado exitoso:
      ‚Ä¢ Archivo: articulos_lote_2.csv
      ‚Ä¢ Filas: 10,000
      

## Cargar modelos entrenados

Cargamos el modelo de preprocesamiento y clasificaci√≥n que entrenamos anteriormente para poder clasificar los nuevos textos.

In [None]:
from pyspark.ml import PipelineModel
from pyspark.ml.classification import LogisticRegressionModel
import pickle

print("="*60)
print("CARGANDO MODELOS ENTRENADOS CON SPARK")
print("="*60)

# Cargar modelo de preprocesamiento directamente con Spark
print("\nüì¶ Cargando modelo de preprocesamiento...")
preprocessing_model_loaded = PipelineModel.load("modelo_preprocessing_wikicat_mejorado")
print("‚úì Modelo de preprocesamiento cargado exitosamente")

# Cargar modelo de clasificaci√≥n
print("\nüì¶ Cargando modelo de clasificaci√≥n...")
modelo_clasificacion_loaded = LogisticRegressionModel.load("modelo_clasificacion_wikicat_mejorado")
print("‚úì Modelo de clasificaci√≥n cargado exitosamente")

# Cargar mapeo de etiquetas
print("\nüì¶ Cargando mapeo de etiquetas...")
with open("label_to_tema_mapping.pkl", "rb") as f:
    label_to_tema_loaded = pickle.load(f)
print(f"‚úì Mapeo de etiquetas cargado: {len(label_to_tema_loaded)} temas")

# Mostrar los temas disponibles
print("\nüè∑Ô∏è  Temas del modelo:")
for i, tema in enumerate(label_to_tema_loaded):
    print(f"   {i:2d}: {tema}")

print("\n" + "="*60)
print("‚úÖ MODELOS LISTOS PARA CLASIFICAR NUEVOS TEXTOS")
print("="*60)

CARGANDO MODELOS ENTRENADOS CON SPARK

üì¶ Cargando modelo de preprocesamiento...
‚úì Modelo de preprocesamiento cargado exitosamente

üì¶ Cargando modelo de clasificaci√≥n...
‚úì Modelo de preprocesamiento cargado exitosamente

üì¶ Cargando modelo de clasificaci√≥n...
‚úì Modelo de clasificaci√≥n cargado exitosamente

üì¶ Cargando mapeo de etiquetas...
‚úì Mapeo de etiquetas cargado: 12 temas

üè∑Ô∏è  Temas del modelo:
    0: Pol√≠tica
    1: Ciencia_y_Tecnolog√≠a
    2: Econom√≠a
    3: Matem√°ticas
    4: Derecho
    5: M√∫sica
    6: Humanidades
    7: Historia
    8: Filosof√≠a
    9: Deporte
   10: Religi√≥n
   11: Entretenimiento

‚úÖ MODELOS LISTOS PARA CLASIFICAR NUEVOS TEXTOS
‚úì Modelo de clasificaci√≥n cargado exitosamente

üì¶ Cargando mapeo de etiquetas...
‚úì Mapeo de etiquetas cargado: 12 temas

üè∑Ô∏è  Temas del modelo:
    0: Pol√≠tica
    1: Ciencia_y_Tecnolog√≠a
    2: Econom√≠a
    3: Matem√°ticas
    4: Derecho
    5: M√∫sica
    6: Humanidades
    7: Histo

## Clasificar art√≠culos por lotes

Procesamos cada lote de art√≠culos usando el modelo entrenado. Utilizamos Spark para procesar los lotes de forma eficiente y guardamos los resultados con las predicciones. Para el caso de esta celda tuve que utilizar pandas debido a un problema con las librer√≠as de hadoop en la que no encontraba una funci√≥n en especifico y no encontr√© alternativas.

In [22]:
import time
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

print("="*60)
print("‚ö° CLASIFICACI√ìN PARALELA DE ART√çCULOS POR LOTES (SIN HADOOP I/O)")
print("="*60)

# Directorio donde est√°n los CSVs
input_dir = "clasificacion_resultados"
output_dir = "clasificacion_resultados_predicciones"
os.makedirs(output_dir, exist_ok=True)

# Encontrar todos los archivos CSV de lotes
csv_files = []
for i in range(1, num_batches + 1):
    csv_path = f"{input_dir}/lote_{i}/articulos_lote_{i}.csv"
    if os.path.exists(csv_path):
        csv_files.append((i, csv_path))

print(f"\nüìÅ Archivos CSV encontrados: {len(csv_files)}")
print(f"üìÇ Directorio de salida: {output_dir}")
print("="*60)

# Procesar cada lote
start_time_total = time.time()
resultados_resumen = []

for batch_num, csv_file in csv_files:
    start_time_batch = time.time()
    
    print(f"\n{'='*60}")
    print(f"üìù PROCESANDO LOTE {batch_num}/{num_batches}")
    print(f"{'='*60}")
    
    try:
        # 1. Cargar CSV directamente con Pandas (evita Hadoop I/O)
        print(f"   üìñ Cargando CSV con Pandas: {csv_file}")
        pandas_input = pd.read_csv(csv_file)
        num_docs = len(pandas_input)
        print(f"   ‚úì Cargado: {num_docs:,} documentos")
        
        # 2. Convertir a Spark DataFrame para procesamiento
        print(f"   ‚ö° Convirtiendo a Spark DataFrame...")
        df_lote = spark.createDataFrame(pandas_input)
        
        # 3. Aplicar preprocesamiento
        print(f"   ‚öôÔ∏è  Aplicando preprocesamiento...")
        df_procesado = preprocessing_model_loaded.transform(df_lote)
        
        # 4. Hacer predicciones
        print(f"   ü§ñ Clasificando documentos...")
        df_predicciones = modelo_clasificacion_loaded.transform(df_procesado)
        
        # 5. Seleccionar columnas y convertir a Pandas inmediatamente
        print(f"   üè∑Ô∏è  Extrayendo predicciones...")
        df_resultado = df_predicciones.select(
            "id",
            "texto",
            col("prediction").alias("tema_id")
        )
        
        # Convertir a Pandas
        pandas_resultado = df_resultado.toPandas()
        
        # 6. Mapear tema_id a nombre de tema en Pandas (m√°s eficiente)
        print(f"   üè∑Ô∏è  Mapeando temas...")
        pandas_resultado['tema_predicho'] = pandas_resultado['tema_id'].apply(
            lambda x: label_to_tema_loaded[int(x)] if int(x) < len(label_to_tema_loaded) else "Desconocido"
        )
        
        # 7. Guardar resultados usando Pandas (evita problemas de Hadoop)
        print(f"   üíæ Guardando resultados...")
        
        # Crear subdirectorio para este lote
        batch_output_dir = f"{output_dir}/lote_{batch_num}"
        os.makedirs(batch_output_dir, exist_ok=True)
        
        # Guardar CSV
        output_csv = f"{batch_output_dir}/predicciones_lote_{batch_num}.csv"
        pandas_resultado.to_csv(output_csv, index=False, encoding='utf-8')
        
        # 8. Calcular estad√≠sticas
        print(f"   üìä Generando estad√≠sticas...")
        tema_counts = pandas_resultado['tema_predicho'].value_counts()
        
        # Guardar estad√≠sticas
        stats_file = f"{batch_output_dir}/estadisticas_lote_{batch_num}.txt"
        with open(stats_file, 'w', encoding='utf-8') as f:
            f.write(f"ESTAD√çSTICAS - LOTE {batch_num}\n")
            f.write(f"{'='*50}\n\n")
            f.write(f"Total de documentos: {num_docs:,}\n\n")
            f.write(f"Distribuci√≥n por tema:\n")
            f.write(f"{'-'*50}\n")
            for tema, count in tema_counts.items():
                porcentaje = (count / num_docs) * 100
                f.write(f"{tema:30s}: {count:6,} ({porcentaje:5.2f}%)\n")
        
        # Tiempo transcurrido
        elapsed = time.time() - start_time_batch
        file_size = os.path.getsize(output_csv) / (1024 * 1024)
        
        # Guardar resumen
        resultados_resumen.append({
            'lote': batch_num,
            'documentos': num_docs,
            'tiempo_seg': elapsed,
            'tama√±o_mb': file_size,
            'temas_unicos': len(tema_counts)
        })
        
        print(f"\n   ‚úÖ LOTE {batch_num} COMPLETADO:")
        print(f"      ‚Ä¢ Documentos clasificados: {num_docs:,}")
        print(f"      ‚Ä¢ Temas √∫nicos encontrados: {len(tema_counts)}")
        print(f"      ‚Ä¢ Archivo de salida: {output_csv}")
        print(f"      ‚Ä¢ Tama√±o: {file_size:.2f} MB")
        print(f"      ‚Ä¢ Tiempo: {elapsed:.1f} segundos")
        print(f"\n   üèÜ Top 3 temas m√°s frecuentes:")
        for i, (tema, count) in enumerate(tema_counts.head(3).items(), 1):
            porcentaje = (count / num_docs) * 100
            print(f"      {i}. {tema}: {count:,} ({porcentaje:.1f}%)")
        
        # Liberar memoria inmediatamente
        del pandas_input, df_lote, df_procesado, df_predicciones, df_resultado, pandas_resultado
        
    except Exception as e:
        print(f"   ‚ùå Error al procesar lote {batch_num}: {e}")
        import traceback
        traceback.print_exc()

# Resumen final
elapsed_total = time.time() - start_time_total

print("\n" + "="*60)
print("üéâ CLASIFICACI√ìN COMPLETADA - RESUMEN FINAL")
print("="*60)

print(f"\n‚è±Ô∏è  Tiempo total: {elapsed_total/60:.2f} minutos ({elapsed_total:.1f} segundos)")
print(f"‚ö° Tiempo promedio por lote: {elapsed_total/len(csv_files):.1f} segundos")
print(f"üìÅ Total de lotes procesados: {len(csv_files)}/{num_batches}")

# Crear DataFrame de resumen
if resultados_resumen:
    df_resumen = pd.DataFrame(resultados_resumen)
    print(f"\nüìä ESTAD√çSTICAS POR LOTE:")
    print(df_resumen.to_string(index=False))
    
    # Guardar resumen general
    resumen_csv = f"{output_dir}/resumen_clasificacion.csv"
    df_resumen.to_csv(resumen_csv, index=False)
    print(f"\nüíæ Resumen guardado en: {resumen_csv}")
    
    # Estad√≠sticas globales
    total_docs = df_resumen['documentos'].sum()
    total_size = df_resumen['tama√±o_mb'].sum()
    
    print(f"\nüìà TOTALES:")
    print(f"   ‚Ä¢ Documentos clasificados: {total_docs:,}")
    print(f"   ‚Ä¢ Tama√±o total de archivos: {total_size:.2f} MB")
    print(f"   ‚Ä¢ Velocidad promedio: {total_docs/elapsed_total:.1f} docs/segundo")

print(f"\nüìÇ Resultados guardados en: {os.path.abspath(output_dir)}")
print("="*60)
print("‚ú® ¬°Clasificaci√≥n completada exitosamente!")
print("="*60)

‚ö° CLASIFICACI√ìN PARALELA DE ART√çCULOS POR LOTES (SIN HADOOP I/O)

üìÅ Archivos CSV encontrados: 54
üìÇ Directorio de salida: clasificacion_resultados_predicciones

üìù PROCESANDO LOTE 1/54
   üìñ Cargando CSV con Pandas: clasificacion_resultados/lote_1/articulos_lote_1.csv
   ‚úì Cargado: 10,000 documentos
   ‚ö° Convirtiendo a Spark DataFrame...
   ‚öôÔ∏è  Aplicando preprocesamiento...
   ü§ñ Clasificando documentos...
   üè∑Ô∏è  Extrayendo predicciones...
   ‚úì Cargado: 10,000 documentos
   ‚ö° Convirtiendo a Spark DataFrame...
   ‚öôÔ∏è  Aplicando preprocesamiento...
   ü§ñ Clasificando documentos...
   üè∑Ô∏è  Extrayendo predicciones...
   üè∑Ô∏è  Mapeando temas...
   üíæ Guardando resultados...
   üè∑Ô∏è  Mapeando temas...
   üíæ Guardando resultados...
   üìä Generando estad√≠sticas...

   ‚úÖ LOTE 1 COMPLETADO:
      ‚Ä¢ Documentos clasificados: 10,000
      ‚Ä¢ Temas √∫nicos encontrados: 12
      ‚Ä¢ Archivo de salida: clasificacion_resultados_predicciones/lot

## An√°lisis consolidado de resultados

Consolidamos todos los resultados de los lotes para obtener estad√≠sticas generales de la clasificaci√≥n.

In [None]:
import glob

print("="*60)
print("üìä CONSOLIDACI√ìN DE RESULTADOS DE CLASIFICACI√ìN")
print("="*60)

# Buscar todos los archivos de predicciones
prediction_files = glob.glob(f"{output_dir}/lote_*/predicciones_lote_*.csv")
print(f"\nüìÅ Archivos de predicciones encontrados: {len(prediction_files)}")

if prediction_files:
    # Cargar y consolidar todos los resultados
    print("\n‚ö° Cargando y consolidando resultados...")
    all_predictions = []
    
    for file in sorted(prediction_files):
        df = pd.read_csv(file)
        all_predictions.append(df)
        print(f"   ‚úì Cargado: {file} ({len(df):,} documentos)")
    
    # Concatenar todos los DataFrames
    df_consolidado = pd.concat(all_predictions, ignore_index=True)
    total_docs = len(df_consolidado)
    
    print(f"\n‚úÖ Consolidaci√≥n completada: {total_docs:,} documentos totales")
    
    # An√°lisis de distribuci√≥n de temas
    print("\n" + "="*60)
    print("üìà DISTRIBUCI√ìN GLOBAL DE TEMAS")
    print("="*60)
    
    tema_distribution = df_consolidado['tema_predicho'].value_counts()
    
    print(f"\nüè∑Ô∏è  Total de temas √∫nicos: {len(tema_distribution)}")
    print(f"\nDistribuci√≥n completa:\n")
    
    for i, (tema, count) in enumerate(tema_distribution.items(), 1):
        porcentaje = (count / total_docs) * 100
        barra = "‚ñà" * int(porcentaje / 2)
        print(f"{i:2d}. {tema:30s}: {count:8,} ({porcentaje:5.2f}%) {barra}")
    
    # Guardar resultados consolidados
    print("\n" + "="*60)
    print("üíæ GUARDANDO AN√ÅLISIS CONSOLIDADO")
    print("="*60)
    
    # Guardar CSV consolidado (opcional - puede ser grande)
    consolidado_csv = f"{output_dir}/predicciones_consolidadas.csv"
    print(f"\nüíæ Guardando CSV consolidado...")
    df_consolidado.to_csv(consolidado_csv, index=False, encoding='utf-8')
    file_size = os.path.getsize(consolidado_csv) / (1024 * 1024)
    print(f"   ‚úì Guardado: {consolidado_csv} ({file_size:.2f} MB)")
    
    # Guardar estad√≠sticas globales
    stats_global_file = f"{output_dir}/estadisticas_globales.txt"
    with open(stats_global_file, 'w', encoding='utf-8') as f:
        f.write("ESTAD√çSTICAS GLOBALES DE CLASIFICACI√ìN\n")
        f.write("="*60 + "\n\n")
        f.write(f"Total de documentos clasificados: {total_docs:,}\n")
        f.write(f"Total de temas √∫nicos: {len(tema_distribution)}\n\n")
        f.write("Distribuci√≥n por tema:\n")
        f.write("-"*60 + "\n\n")
        
        for i, (tema, count) in enumerate(tema_distribution.items(), 1):
            porcentaje = (count / total_docs) * 100
            f.write(f"{i:2d}. {tema:30s}: {count:8,} ({porcentaje:5.2f}%)\n")
    
    print(f"   ‚úì Estad√≠sticas guardadas: {stats_global_file}")
    
    # Crear resumen visual
    print("\n" + "="*60)
    print("üéØ TOP 10 TEMAS M√ÅS FRECUENTES")
    print("="*60)
    
    for i, (tema, count) in enumerate(tema_distribution.head(10).items(), 1):
        porcentaje = (count / total_docs) * 100
        print(f"\n{i:2d}. {tema}")
        print(f"    Documentos: {count:,}")
        print(f"    Porcentaje: {porcentaje:.2f}%")
        print(f"    {'‚ñà' * int(porcentaje)}")
    
    # Informaci√≥n de muestra
    print("\n" + "="*60)
    print("üìù MUESTRA DE PREDICCIONES")
    print("="*60)
    print("\nPrimeras 10 predicciones:")
    print(df_consolidado[['id', 'texto', 'tema_predicho']].head(10).to_string(index=False))
    
    print("\n" + "="*60)
    print("‚ú® AN√ÅLISIS CONSOLIDADO COMPLETADO")
    print("="*60)
    print(f"\nüìÇ Archivos generados:")
    print(f"   ‚Ä¢ CSV consolidado: {consolidado_csv}")
    print(f"   ‚Ä¢ Estad√≠sticas: {stats_global_file}")
    print(f"\nüéâ ¬°Proceso completo finalizado exitosamente!")
    
else:
    print("\n‚ö†Ô∏è  No se encontraron archivos de predicciones para consolidar.")

üìä CONSOLIDACI√ìN DE RESULTADOS DE CLASIFICACI√ìN

üìÅ Archivos de predicciones encontrados: 54

‚ö° Cargando y consolidando resultados...
   ‚úì Cargado: clasificacion_resultados_predicciones\lote_10\predicciones_lote_10.csv (10,000 documentos)
   ‚úì Cargado: clasificacion_resultados_predicciones\lote_10\predicciones_lote_10.csv (10,000 documentos)
   ‚úì Cargado: clasificacion_resultados_predicciones\lote_11\predicciones_lote_11.csv (10,000 documentos)
   ‚úì Cargado: clasificacion_resultados_predicciones\lote_11\predicciones_lote_11.csv (10,000 documentos)
   ‚úì Cargado: clasificacion_resultados_predicciones\lote_12\predicciones_lote_12.csv (10,000 documentos)
   ‚úì Cargado: clasificacion_resultados_predicciones\lote_12\predicciones_lote_12.csv (10,000 documentos)
   ‚úì Cargado: clasificacion_resultados_predicciones\lote_13\predicciones_lote_13.csv (10,000 documentos)
   ‚úì Cargado: clasificacion_resultados_predicciones\lote_13\predicciones_lote_13.csv (10,000 documentos)
  

![image-2.png](attachment:image-2.png)

## Versiones que se usaron para este trabajo.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print("üîπ Spark version:", spark.version)
print("üîπ Hadoop version:", spark._jvm.org.apache.hadoop.util.VersionInfo.getVersion())


üîπ Spark version: 4.0.1
üîπ Hadoop version: 3.4.1


## Link de Tablue Public

https://public.tableau.com/app/profile/imanol.mu.iz.ramirez/viz/NLP_17622915284760/Story1?publish=yes