# Importar jars y definir spark

In [None]:
import os
import sys
import subprocess
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# --- 1. Configuraci√≥n ---
os.environ['SPARK_HOME'] = "/home/hadoop/spark"
sys.path.insert(0, "/home/hadoop/spark/python")
sys.path.insert(0, "/home/hadoop/spark/python/lib/py4j-0.10.9.7-src.zip") 
sys.path.insert(0, "/home/hadoop/spark/python/lib/pyspark.zip")

spark = SparkSession.builder \
    .appName("Ingesta_Limpia_Multiclase") \
    .master("yarn") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.yarn.executor.memoryOverhead", "1024m") \
    .getOrCreate()

print("‚úÖ Spark iniciado correctamente usando los JARs del sistema.")


25/11/25 03:47:36 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
25/11/25 03:47:36 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/25 03:47:36 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
25/11/25 03:47:36 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'sp

# Procesar datos en formato csv y guardar en nueva carpeta
## Se procesan 9 archivos

In [3]:
# Rutas
CARPETA_ORIGEN = "/trafico"
CARPETA_DESTINO = "/trafico_clean"


# Columnas a eliminar
cols_drop_base = [
    "Bwd PSH Flags", "Bwd URG Flags", "Fwd Pkts/b Avg", "Bwd Pkts/b Avg",
    "Fwd Byts/b Avg", "Bwd Byts/b Avg", "Fwd Blk Rate Avg", "Bwd Blk Rate Avg",
    "Fwd URG Flags", "CWE Flag Count", "FIN Flag Cnt", "Timestamp",
    "Flow ID", "Src IP", "Dst IP", "Src Port", "Protocol"
]

# Obtener lista de archivos
try:
    cmd = f"hdfs dfs -ls {CARPETA_ORIGEN} | grep .csv | awk '{{print $8}}'"
    lista_archivos = subprocess.check_output(cmd, shell=True).decode("utf-8").strip().split("\n")
    lista_archivos = [x for x in lista_archivos if x]
except:
    lista_archivos = []

print(f"Procesando {len(lista_archivos)} archivos...")

for archivo in lista_archivos:
    filename = archivo.split("/")[-1]
    print(f"\n--> Procesando: {filename}")
    
    try:
        # 1. Leer CSV
        df_temp = spark.read.option("header", "true").option("inferSchema", "false").csv(archivo)
        
        # --- METRICAS INICIALES ---
        # Contamos filas iniciales (esto toma unos segundos extra)
        filas_iniciales = df_temp.count()
        
        # 2. Limpiar filas (Eliminar headers repetidos 'Label')
        df_temp = df_temp.filter(col("Label") != "Label")
        
        # --- METRICAS DE FILAS ---
        filas_finales = df_temp.count()
        filas_borradas = filas_iniciales - filas_finales
        
        # 3. Limpiar Columnas
        cols_existentes = df_temp.columns
        # Identificamos la intersecci√≥n (cu√°les de la lista est√°n realmente en el archivo)
        cols_a_borrar = [c for c in cols_drop_base if c in cols_existentes]
        df_temp = df_temp.drop(*cols_a_borrar)
        
        # --- IMPRIMIR REPORTE ---
        print(f"    üìâ Columnas eliminadas: {len(cols_a_borrar)} (De {len(cols_existentes)} iniciales)")
        print(f"    ‚úÇÔ∏è  Registros eliminados: {filas_borradas} (Headers repetidos o basura)")
        
        # 4. Castear a Double (MANTENIENDO ETIQUETA STRING)
        for c in df_temp.columns:
            if c != "Label":
                df_temp = df_temp.withColumn(c, col(c).cast(DoubleType()))
        
        # (La l√≠nea binaria est√° comentada/omitida como pediste)
        # df_temp = df_temp.withColumn("Label_Binary", ...)
        
        # 5. Guardar
        df_temp.write.mode("append").parquet(CARPETA_DESTINO)
        
        # 6. Borrar original
        subprocess.check_call(f"hdfs dfs -rm -skipTrash {archivo}", shell=True)
        print(f"    ‚úÖ Guardado y origen eliminado.")
        
    except Exception as e:
        print(f"    ‚ùå Error: {str(e)}")
        spark.stop()
        break

print("\n‚úÖ Proceso completado.")

Deleted /trafico_clean
Procesando 9 archivos...

--> Procesando: Friday-02-03-2018_TrafficForML_CICFlowMeter.csv


                                                                                

    üìâ Columnas eliminadas: 13 (De 80 iniciales)
    ‚úÇÔ∏è  Registros eliminados: 0 (Headers repetidos o basura)


25/11/21 21:05:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Deleted /trafico/Friday-02-03-2018_TrafficForML_CICFlowMeter.csv
    ‚úÖ Guardado y origen eliminado.

--> Procesando: Friday-16-02-2018_TrafficForML_CICFlowMeter.csv


                                                                                

    üìâ Columnas eliminadas: 13 (De 80 iniciales)
    ‚úÇÔ∏è  Registros eliminados: 1 (Headers repetidos o basura)


                                                                                

Deleted /trafico/Friday-16-02-2018_TrafficForML_CICFlowMeter.csv
    ‚úÖ Guardado y origen eliminado.

--> Procesando: Friday-23-02-2018_TrafficForML_CICFlowMeter.csv


                                                                                

    üìâ Columnas eliminadas: 13 (De 80 iniciales)
    ‚úÇÔ∏è  Registros eliminados: 0 (Headers repetidos o basura)


                                                                                

Deleted /trafico/Friday-23-02-2018_TrafficForML_CICFlowMeter.csv
    ‚úÖ Guardado y origen eliminado.

--> Procesando: Thursday-01-03-2018_TrafficForML_CICFlowMeter.csv


                                                                                

    üìâ Columnas eliminadas: 13 (De 80 iniciales)
    ‚úÇÔ∏è  Registros eliminados: 25 (Headers repetidos o basura)


                                                                                

Deleted /trafico/Thursday-01-03-2018_TrafficForML_CICFlowMeter.csv
    ‚úÖ Guardado y origen eliminado.

--> Procesando: Thursday-15-02-2018_TrafficForML_CICFlowMeter.csv


                                                                                

    üìâ Columnas eliminadas: 13 (De 80 iniciales)
    ‚úÇÔ∏è  Registros eliminados: 0 (Headers repetidos o basura)


                                                                                

Deleted /trafico/Thursday-15-02-2018_TrafficForML_CICFlowMeter.csv
    ‚úÖ Guardado y origen eliminado.

--> Procesando: Thursday-22-02-2018_TrafficForML_CICFlowMeter.csv


                                                                                

    üìâ Columnas eliminadas: 13 (De 80 iniciales)
    ‚úÇÔ∏è  Registros eliminados: 0 (Headers repetidos o basura)


                                                                                

Deleted /trafico/Thursday-22-02-2018_TrafficForML_CICFlowMeter.csv
    ‚úÖ Guardado y origen eliminado.

--> Procesando: Wednesday-14-02-2018_TrafficForML_CICFlowMeter.csv


                                                                                

    üìâ Columnas eliminadas: 13 (De 80 iniciales)
    ‚úÇÔ∏è  Registros eliminados: 0 (Headers repetidos o basura)


                                                                                

Deleted /trafico/Wednesday-14-02-2018_TrafficForML_CICFlowMeter.csv
    ‚úÖ Guardado y origen eliminado.

--> Procesando: Wednesday-21-02-2018_TrafficForML_CICFlowMeter.csv


                                                                                

    üìâ Columnas eliminadas: 13 (De 80 iniciales)
    ‚úÇÔ∏è  Registros eliminados: 0 (Headers repetidos o basura)


                                                                                

Deleted /trafico/Wednesday-21-02-2018_TrafficForML_CICFlowMeter.csv
    ‚úÖ Guardado y origen eliminado.

--> Procesando: Wednesday-28-02-2018_TrafficForML_CICFlowMeter.csv


                                                                                

    üìâ Columnas eliminadas: 13 (De 80 iniciales)
    ‚úÇÔ∏è  Registros eliminados: 33 (Headers repetidos o basura)


                                                                                

Deleted /trafico/Wednesday-28-02-2018_TrafficForML_CICFlowMeter.csv
    ‚úÖ Guardado y origen eliminado.

‚úÖ Proceso completado.


# Se procesa el archivo de 3.7 GB

In [None]:
#rutas
ARCHIVO_PESADO = "/trafico/Thuesday-20-02-2018_TrafficForML_CICFlowMeter.csv"
CARPETA_DESTINO = "/trafico_clean"

# Columnas a eliminar (Misma lista que los otros)
cols_drop_base = [
    "Bwd PSH Flags", "Bwd URG Flags", "Fwd Pkts/b Avg", "Bwd Pkts/b Avg",
    "Fwd Byts/b Avg", "Bwd Byts/b Avg", "Fwd Blk Rate Avg", "Bwd Blk Rate Avg",
    "Fwd URG Flags", "CWE Flag Count", "FIN Flag Cnt", "Timestamp",
    "Flow ID", "Src IP", "Dst IP", "Src Port", "Protocol"
]

print(f"üöÄ Procesando archivo pesado para recuperar datos...")

try:
    # 1. Leer
    # Nota: Sin inferSchema es vital para este archivo de 3.8GB
    df_temp = spark.read.option("header", "true").option("inferSchema", "false").csv(ARCHIVO_PESADO)
    
    print(f"    Le√≠do. Filas: {df_temp.count():,}")

    # 2. Limpiar
    df_temp = df_temp.filter(col("Label") != "Label")
    
    cols_existentes = df_temp.columns
    cols_a_borrar = [c for c in cols_drop_base if c in cols_existentes]
    df_temp = df_temp.drop(*cols_a_borrar)

    # 3. Castear
    for c in df_temp.columns:
        if c != "Label":
            df_temp = df_temp.withColumn(c, col(c).cast(DoubleType()))
    
    # 4. APPEND (Crucial: modo append para sumar a los otros archivos)
    df_temp.write.mode("append").parquet(CARPETA_DESTINO)
    
    # 5. Borrar CSV origen
    subprocess.check_call(f"hdfs dfs -rm -skipTrash {ARCHIVO_PESADO}", shell=True)
    
    print(f"‚úÖ RECUPERACI√ìN COMPLETADA. El archivo pesado se uni√≥ al dataset.")

except Exception as e:
    print(f"‚ùå Error: {str(e)}")

# Verificaci√≥n final
total = spark.read.parquet(CARPETA_DESTINO).count()
print(f"üìä Total registros en el dataset completo: {total:,}")

# Verificar datos procesados

In [2]:
from pyspark.sql.functions import col, count, when, isnan, countDistinct
from pyspark.ml.feature import StringIndexer

PATH_DATA = "/trafico_clean"
print(f"üîç Inspeccionando dataset en: {PATH_DATA}")

#definir df y cargar datos
df = spark.read.parquet(PATH_DATA)

#dimenciones
total_filas = df.count()
total_cols = len(df.columns)
print(f"\nüìè Dimensiones: {total_filas:,} filas x {total_cols} columnas")

#verificar tipo de dato Double y label se descarta
tipos = dict(df.dtypes)
no_numericas = [c for c, t in tipos.items() if t == 'string' and c != 'Label']
if no_numericas:
    print(f"‚ö†Ô∏è ALERTA: Hay columnas string que no deber√≠an estar: {no_numericas}")
else:
    print("‚úÖ Estructura Correcta: Todas las features son num√©ricas.")

#definir nueva columna xon tipo int
print("\nüè∑Ô∏è Generando √çndices de Clase (Por frecuencia)...")
#asignar indices de forma ascendente de mayor a menor cantidad de clase 
indexer = StringIndexer(inputCol="Label", outputCol="Label_Index")
indexer_model = indexer.fit(df)
df_indexed = indexer_model.transform(df)

print("\nüìä DISTRIBUCI√ìN DE CLASES (Mapeo e Histograma):")
#tabla de cols e indices
df_stats = df_indexed.groupBy("Label_Index", "Label").count().orderBy("Label_Index")
df_stats.show(30, truncate=False)

#Verificar nulos 
print("\nse Buscando valores Nulos o NaN en todas las columnas...")
print("(Esto puede tardar un poco dependiendo del tama√±o...)")

expresiones_nulos = []
for c in df.columns:
    if c == "Label": continue
    #es null o nan
    expresiones_nulos.append(count(when(col(c).isNull() | isnan(col(c)), c)).alias(c))

df_nulos = df.select(expresiones_nulos)

#verificar errores 
nulos_dict = df_nulos.first().asDict()
columnas_con_nulos = {k: v for k, v in nulos_dict.items() if v > 0}

if not columnas_con_nulos:
    print("‚úÖ EXCELENTE: No se encontraron valores nulos ni NaN en el dataset.")
else:
    print("‚ùå SE ENCONTRARON ERRORES:")
    for col_name, cant in columnas_con_nulos.items():
        print(f"   - {col_name}: {cant} nulos")

#mayormente cero
print("\nüïµÔ∏è  Buscando columnas constantes (in√∫tiles para IA)...")
# desvest
from pyspark.sql.functions import stddev

features_check = [c for c in df.columns if c not in ["Label", "Label_Index"]]
#analizar el 10
df_sample = df.sample(fraction=0.1, seed=42)
resumen_stats = df_sample.select([stddev(c).alias(c) for c in features_check]).first().asDict()

constantes = [c for c, val in resumen_stats.items() if val == 0 or val is None]

if constantes:
    print(f"‚ö†Ô∏è  ADVERTENCIA: Las siguientes {len(constantes)} columnas tienen varianza 0 (son constantes):")
    print(constantes)
    print("   -> Sugerencia: Elim√≠nalas antes de hacer la matriz de correlaci√≥n.")
else:
    print("‚úÖ Todas las columnas parecen tener variaci√≥n de datos.")

üîç Inspeccionando dataset en: /trafico_clean


                                                                                


üìè Dimensiones: 16,232,943 filas x 67 columnas
‚úÖ Estructura Correcta: Todas las features son num√©ricas.

üè∑Ô∏è Generando √çndices de Clase (Por frecuencia)...


25/11/21 22:36:07 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
                                                                                


üìä DISTRIBUCI√ìN DE CLASES (Mapeo e Histograma):


                                                                                

+-----------+------------------------+--------+
|Label_Index|Label                   |count   |
+-----------+------------------------+--------+
|0.0        |Benign                  |13484708|
|1.0        |DDOS attack-HOIC        |686012  |
|2.0        |DDoS attacks-LOIC-HTTP  |576191  |
|3.0        |DoS attacks-Hulk        |461912  |
|4.0        |Bot                     |286191  |
|5.0        |FTP-BruteForce          |193360  |
|6.0        |SSH-Bruteforce          |187589  |
|7.0        |Infilteration           |161934  |
|8.0        |DoS attacks-SlowHTTPTest|139890  |
|9.0        |DoS attacks-GoldenEye   |41508   |
|10.0       |DoS attacks-Slowloris   |10990   |
|11.0       |DDOS attack-LOIC-UDP    |1730    |
|12.0       |Brute Force -Web        |611     |
|13.0       |Brute Force -XSS        |230     |
|14.0       |SQL Injection           |87      |
+-----------+------------------------+--------+


se Buscando valores Nulos o NaN en todas las columnas...
(Esto puede tardar un poco de

25/11/21 22:36:47 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

‚ùå SE ENCONTRARON ERRORES:
   - Flow Byts/s: 59721 nulos

üïµÔ∏è  Buscando columnas constantes (in√∫tiles para IA)...


[Stage 15:>                                                         (0 + 1) / 1]

‚úÖ Todas las columnas parecen tener variaci√≥n de datos.


                                                                                

In [4]:
df = spark.read.parquet("/trafico_clean")
num_particiones = df.rdd.getNumPartitions()
print(f"El dataset est√°: {num_particiones} particiones")

El dataset est√°: 19 particiones


# Eliminar registors nulos encontrados 

In [2]:
from pyspark.sql.functions import col, isnan

# Rutas
RUTA_ORIGINAL = "/trafico_clean"
RUTA_TEMP = "/trafico_clean_temp"

print(f"üîÑ Leyendo datos desde: {RUTA_ORIGINAL}")
df = spark.read.parquet(RUTA_ORIGINAL)
total_inicial = df.count()

#eliminar nans
col_problema = "Flow Byts/s"

print(f"üîç Filtrando registros corruptos en '{col_problema}'...")

# no borrar lo que interesa
df_limpio = df.filter(
    col(col_problema).isNotNull() & (~isnan(col(col_problema)))
)

total_final = df_limpio.count()
borrados = total_inicial - total_final

if borrados == 0:
    print("‚ö†Ô∏è No se encontraron registros para borrar. Verifica el nombre de la columna.")
else:
    print(f"‚úÖ Se detectaron {borrados} registros sucios (NaN/Null).")
    
    #guardar en temporarl 
    print("üíæ Guardando dataset corregido en carpeta temporal...")
    df_limpio.write.mode("overwrite").parquet(RUTA_TEMP)
    
    #reemplazar
    print("üîÑ Reemplazando carpeta original...")
    # borrar viejos nulos 
    subprocess.check_call(f"hdfs dfs -rm -r -skipTrash {RUTA_ORIGINAL}", shell=True)
    #renombrar
    subprocess.check_call(f"hdfs dfs -mv {RUTA_TEMP} {RUTA_ORIGINAL}", shell=True)
    
    print(f"üéâ ¬°Listo! Tu dataset en {RUTA_ORIGINAL} ahora tiene {total_final:,} filas limpias.")

üîÑ Leyendo datos desde: /trafico_clean


                                                                                

üîç Filtrando registros corruptos en 'Flow Byts/s'...


                                                                                

‚úÖ Se detectaron 59721 registros sucios (NaN/Null).
üíæ Guardando dataset corregido en carpeta temporal...


25/11/21 23:02:03 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

üîÑ Reemplazando carpeta original...
Deleted /trafico_clean
üéâ ¬°Listo! Tu dataset en /trafico_clean ahora tiene 16,173,222 filas limpias.


In [1]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan

# --- 1. Configuraci√≥n ---
os.environ['SPARK_HOME'] = "/home/hadoop/spark"
sys.path.insert(0, "/home/hadoop/spark/python")
sys.path.insert(0, "/home/hadoop/spark/python/lib/py4j-0.10.9.7-src.zip") 
sys.path.insert(0, "/home/hadoop/spark/python/lib/pyspark.zip")

spark = SparkSession.builder \
    .appName("Verificacion_Final_Nulos") \
    .master("yarn") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.yarn.executor.memoryOverhead", "1024m") \
    .getOrCreate()

RUTA_DATOS = "/trafico_clean"
print(f"üîç Verificando dataset en: {RUTA_DATOS}")

# Cargar datos
df = spark.read.parquet(RUTA_DATOS)

# ============================================================
# 1. CONTEO TOTAL DE REGISTROS
# ============================================================
total = df.count()
print(f"\n‚úÖ TOTAL DE REGISTROS LIMPIOS: {total:,}")

# ============================================================
# 2. B√öSQUEDA INTENSIVA DE NULOS
# ============================================================
print("Verificando columnas")

exprs = []
for c in df.columns:
    # La columna Label es String, solo verificamos isNull
    if c == "Label":
        condicion = col(c).isNull()
    # Las dem√°s son Double, verificamos isNull O isNaN
    else:
        condicion = col(c).isNull() | isnan(col(c))
    
    exprs.append(count(when(condicion, c)).alias(c))

# Ejecutamos la b√∫squeda
resultados = df.select(exprs).first().asDict()

# Filtramos solo las que tengan errores > 0
errores = {k: v for k, v in resultados.items() if v > 0}

print("-" * 50)
if not errores:
    print("No hay nulos ni NaN en ninguna columna.")
else:
    print("A√∫n quedan valores sucios:")
    for col_name, qty in errores.items():
        print(f"   - {col_name}: {qty} registros malos")
print("-" * 50)

25/11/21 23:13:00 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
25/11/21 23:13:00 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/21 23:13:00 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
25/11/21 23:13:00 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'sp

üîç Verificando dataset en: /trafico_clean


                                                                                


‚úÖ TOTAL DE REGISTROS LIMPIOS: 16,173,222
Verificando columnas


25/11/21 23:14:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

--------------------------------------------------
No hay nulos ni NaN en ninguna columna.
--------------------------------------------------


                                                                                

In [None]:
df = spark.read.parquet("/trafico_clean")
df_sample_cache = df.sample(fraction=0.01, seed=42).limit(100000).cache()
print("‚úÖ Datos cargados y muestra en cach√©.")

In [None]:
import pandas as pd
from pyspark.sql.functions import col

print("üîç Iniciando detecci√≥n de columnas duplicadas...")

# 1. OBTENER MUESTRA (Fase de Sospecha)
# Bajamos 50k filas a Pandas. Es r√°pido y suficiente para detectar patrones.
print("    ‚¨áÔ∏è  Bajando muestra para an√°lisis r√°pido...")
df_sample = df.sample(fraction=0.1, seed=42).limit(50000).toPandas()

# 2. IDENTIFICAR CANDIDATOS (L√≥gica en Python/Pandas)
# Transponemos la muestra (ahora las columnas son filas) y buscamos duplicados
print("    üïµÔ∏è  Buscando columnas id√©nticas en la muestra...")
df_T = df_sample.T
duplicated_mask = df_T.duplicated()
columnas_duplicadas_names = df_T[duplicated_mask].index.tolist()

# Ahora necesitamos saber "qui√©n es copia de qui√©n"
# Creamos un mapa: {Columna_Borrable: Columna_Original}
mapa_duplicados = {}
# Iteramos sobre las columnas marcadas como duplicadas
for col_dup in columnas_duplicadas_names:
    # Buscamos la primera columna que tenga los mismos valores que esta
    # (Pandas .duplicated() marca True a partir de la segunda aparici√≥n)
    # Esto es un poco fuerza bruta en local, pero con 50k filas es instant√°neo.
    valores_dup = df_sample[col_dup]
    for col_orig in df_sample.columns:
        if col_orig == col_dup: break # No compararse consigo misma
        if df_sample[col_orig].equals(valores_dup):
            mapa_duplicados[col_dup] = col_orig
            break

print(f"    ‚ö†Ô∏è  Candidatos encontrados en la muestra: {len(mapa_duplicados)}")
for copia, original in mapa_duplicados.items():
    print(f"       Posible copia: '{copia}'  <-- Id√©ntica a --> '{original}'")

# 3. VERIFICACI√ìN BLINDADA (En Spark completo)
# Ahora verificamos si son id√©nticas en los 16 MILLONES de filas, no solo en la muestra.
print("\nüî¨ Verificando exactitud en todo el Big Data (Spark)...")

columnas_a_eliminar_final = []

for copia, original in mapa_duplicados.items():
    # La l√≥gica: Contamos cu√°ntas filas son DIFERENTES. 
    # Si el conteo es 0, son gemelas perfectas.
    # Manejamos nulos con 'eqNullSafe' (igualdad segura ante nulos)
    diferencias = df.filter(col(copia).eqNullSafe(col(original)) == False).count()
    
    if diferencias == 0:
        print(f"    ‚úÖ CONFIRMADO: '{copia}' es duplicada exacta de '{original}'. Se eliminar√°.")
        columnas_a_eliminar_final.append(copia)
    else:
        print(f"    ‚ùå FALSO POSITIVO: '{copia}' y '{original}' difieren en {diferencias} filas. SE CONSERVA.")

# 4. ELIMINACI√ìN
if columnas_a_eliminar_final:
    print(f"\nüóëÔ∏è Eliminando {len(columnas_a_eliminar_final)} columnas redundantes...")
    df_optimizado = df.drop(*columnas_a_eliminar_final)
    
    print(f"    Columnas antes: {len(df.columns)}")
    print(f"    Columnas ahora: {len(df_optimizado.columns)}")
    
    # Opcional: Sobreescribir variable df si quieres seguir us√°ndola
    # df = df_optimizado
    
    # Guardar (Recomendado)
    # df_optimizado.write.mode("overwrite").parquet("/trafico_sin_duplicados")
    # print("    üíæ Dataset optimizado guardado.")
else:
    print("\n‚ú® ¬°Felicidades! No tienes columnas duplicadas exactas.")