In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, when, lit, isnan
from pyspark.sql.types import StringType, IntegerType, DoubleType, FloatType, TimestampType, DateType
import re

# Configurar Spark para Dataproc
spark = SparkSession.builder \
    .appName("LimpiezaTradingCSV") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Configurar nivel de log para reducir output
spark.sparkContext.setLogLevel("WARN")

# RUTAS
ruta_input = "/ProyectoPrincipal/cli_cambios.csv" 
ruta_output = "gs://grupo01-project-sin/processed/cli_cambios_limpio.parquet"

print("Iniciando proceso de limpieza de datos...")
print(f"Archivo de entrada: {ruta_input}")
print(f"Archivo de salida: {ruta_output}")

try:
    # -------------------------------------
    # A. LECTURA DE DATOS
    # -------------------------------------
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("multiline", "true") \
        .option("escape", '"') \
        .csv(ruta_input)
    
    print(f"✓ Archivo leído exitosamente")
    print(f"  Dimensiones iniciales: {df.count()} filas x {len(df.columns)} columnas")
    
    # Mostrar esquema inicial
    print("\nEsquema inicial:")
    df.printSchema()
    
    # Estadísticas iniciales
    initial_rows = df.count()
    initial_cols = len(df.columns)
    
    # Contar nulls iniciales de forma simple
    print("\nContando valores nulos iniciales...")
    total_initial_nulls = 0
    null_counts_initial = {}
    
    for column_name, data_type in df.dtypes:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_initial[column_name] = null_count
            total_initial_nulls += null_count
            print(f"  {column_name} ({data_type}): {null_count} nulls")
        except Exception as e:
            print(f"  Warning: Error contando nulls en {column_name}: {e}")
            null_counts_initial[column_name] = 0
    
    print(f"\nTotal de valores nulos iniciales: {total_initial_nulls}")
    
    # -------------------------------------
    # B. PROCESO DE LIMPIEZA 
    # -------------------------------------
    
    print("\n--- Iniciando limpieza ---")
    
    # 1. LIMPIAR NOMBRES DE COLUMNAS
    print("1. Limpiando nombres de columnas...")
    columns_before = df.columns
    
    # Función para limpiar nombres de columnas
    def clean_column_name(col_name):
        clean_name = re.sub(r'[^\w\s]', '', str(col_name).strip().lower())
        clean_name = re.sub(r'\s+', '_', clean_name)
        return clean_name if clean_name else f"col_{hash(col_name) % 1000}"
    
    new_columns = [clean_column_name(c) for c in df.columns]
    
    # Verificar que no haya nombres duplicados
    seen = set()
    final_columns = []
    for i, col_name in enumerate(new_columns):
        original_name = col_name
        counter = 1
        while col_name in seen:
            col_name = f"{original_name}_{counter}"
            counter += 1
        seen.add(col_name)
        final_columns.append(col_name)
    
    df = df.toDF(*final_columns)
    print(f"  ✓ Columnas renombradas: {len(columns_before)} columnas")
    
    # 2. ELIMINAR FILAS COMPLETAMENTE VACÍAS
    print("2. Eliminando filas completamente vacías...")
    try:
        df_before_empty = df.count()
        df = df.na.drop("all")
        df_after_empty = df.count()
        empty_rows_removed = df_before_empty - df_after_empty
        print(f"  ✓ Filas vacías eliminadas: {empty_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando filas vacías: {e}")
        empty_rows_removed = 0
    
    # 3. ELIMINAR DUPLICADOS
    print("3. Eliminando duplicados...")
    try:
        rows_before_dedup = df.count()
        df = df.dropDuplicates()
        rows_after_dedup = df.count()
        duplicate_rows_removed = rows_before_dedup - rows_after_dedup
        print(f"  ✓ Filas duplicadas eliminadas: {duplicate_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando duplicados: {e}")
        duplicate_rows_removed = 0
    
    # 4. LIMPIAR DATOS POR TIPO DE COLUMNA
    print("4. Limpiando datos por tipo de columna...")
    
    # Valores a considerar como nulos para strings
    invalid_vals = ["", " ", "na", "n/a", "null", "none", "nan", "undefined", "nil", "-", "NULL", "None"]
    
    # Obtener tipos actualizados después del renombrado
    current_dtypes = df.dtypes
    
    # Procesar cada columna según su tipo
    for column_name, data_type in current_dtypes:
        try:
            print(f"  Procesando columna '{column_name}' (tipo: {data_type})")
            
            if data_type == "string":
                # Limpiar columnas de texto
                df = df.withColumn(column_name, 
                                 when(col(column_name).isNull(), lit(None))
                                 .otherwise(trim(lower(col(column_name)))))
                
                # Reemplazar valores inválidos por null
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isin(invalid_vals), lit(None))
                    .otherwise(col(column_name))
                )
                
            elif data_type in ["double", "float"]:
                # Limpiar columnas numéricas decimales
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isNull() | isnan(col(column_name)), lit(None))
                    .otherwise(col(column_name))
                )
                
        except Exception as e:
            print(f"    Warning: Error procesando columna {column_name}: {e}")
            continue
    
    print(f"  ✓ Procesamiento de columnas completado")
    
    # 5. ELIMINAR COLUMNAS COMPLETAMENTE VACÍAS (opcional)
    print("5. Verificando columnas completamente vacías...")
    columns_to_drop = []
    
    try:
        current_row_count = df.count()
        for column_name in df.columns:
            null_count = df.filter(col(column_name).isNull()).count()
            if null_count == current_row_count and current_row_count > 0:
                columns_to_drop.append(column_name)
        
        if columns_to_drop:
            df = df.drop(*columns_to_drop)
            print(f"  ✓ Columnas completamente vacías eliminadas: {columns_to_drop}")
        else:
            print(f"  ✓ No hay columnas completamente vacías")
    except Exception as e:
        print(f"  Warning: Error verificando columnas vacías: {e}")
        columns_to_drop = []
    
    # -------------------------------------
    # C. ESTADÍSTICAS FINALES
    # -------------------------------------
    final_rows = df.count()
    final_cols = len(df.columns)
    
    # Contar nulls finales de forma simple
    print("\nContando valores nulos finales...")
    total_final_nulls = 0
    null_counts_final = {}
    
    for column_name in df.columns:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_final[column_name] = null_count
            total_final_nulls += null_count
        except Exception as e:
            print(f"  Warning: Error contando nulls finales en {column_name}: {e}")
            null_counts_final[column_name] = 0
    
    print(f"Total de valores nulos finales: {total_final_nulls}")
    
    # -------------------------------------
    # D. GUARDAR RESULTADO
    # -------------------------------------
    print("\n--- Guardando resultado ---")
    
    try:
        # Intentar guardar con coalesce
        df.coalesce(1).write \
            .mode("overwrite") \
            .option("compression", "snappy") \
            .parquet(ruta_output)
        
        print(f"✓ Archivo guardado exitosamente en: {ruta_output}")
        
    except Exception as e:
        print(f"Warning: Error con coalesce, intentando sin coalesce: {e}")
        try:
            # Intentar guardar sin coalesce
            df.write \
                .mode("overwrite") \
                .option("compression", "snappy") \
                .parquet(ruta_output)
            print(f" Archivo guardado (sin coalesce) en: {ruta_output}")
        except Exception as e2:
            print(f" Error guardando archivo: {e2}")
            raise e2
    
    # -------------------------------------
    # E. REPORTE FINAL
    # -------------------------------------
    print("\n" + "="*60)
    print("           REPORTE DE LIMPIEZA - PYSPARK")
    print("="*60)
    print(f"Archivo original: {ruta_input}")
    print(f"Archivo limpio: {ruta_output}")
    print("-"*60)
    print("ESTADÍSTICAS:")
    print(f"  Filas iniciales:           {initial_rows:,}")
    print(f"  Filas finales:             {final_rows:,}")
    print(f"  Filas eliminadas:          {initial_rows - final_rows:,}")
    print(f"  Filas vacías eliminadas:   {empty_rows_removed:,}")
    print(f"  Filas duplicadas elim.:    {duplicate_rows_removed:,}")
    print()
    print(f"  Columnas iniciales:        {initial_cols}")
    print(f"  Columnas finales:          {final_cols}")
    print(f"  Columnas eliminadas:       {len(columns_to_drop)}")
    print()
    print(f"  Valores nulos iniciales:   {total_initial_nulls:,}")
    print(f"  Valores nulos finales:     {total_final_nulls:,}")
    print(f"  Valores nulos limpiados:   {max(0, total_initial_nulls - total_final_nulls):,}")
    print("-"*60)
    
    # Mostrar detalle de nulls por columna (solo las que cambiaron)
    print("CAMBIOS EN VALORES NULOS POR COLUMNA:")
    for column_name in df.columns:
        initial = null_counts_initial.get(column_name, 0)
        final = null_counts_final.get(column_name, 0)
        if initial != final:
            print(f"  {column_name}: {initial} → {final}")
    
    # Mostrar esquema final
    try:
        print("\nESQUEMA FINAL:")
        df.printSchema()
    except Exception as e:
        print(f"No se pudo mostrar el esquema: {e}")
    
    # Mostrar muestra de datos
    try:
        print("MUESTRA DE DATOS LIMPIOS (primeras 5 filas):")
        df.show(5, truncate=False)
    except Exception as e:
        print(f"No se pudo mostrar muestra de datos: {e}")
    
    print("="*60)
    print(" LIMPIEZA COMPLETADA EXITOSAMENTE")
    print("="*60)

except Exception as e:
    print(f" Error durante el proceso de limpieza: {str(e)}")
    print("Detalles del error:")
    import traceback
    traceback.print_exc()

finally:
    # Cerrar Spark session
    try:
        spark.stop()
        print("\n Sesión de Spark cerrada")
    except:
        print("\n Sesión cerrada")

25/11/17 15:31:26 INFO SparkEnv: Registering MapOutputTracker
25/11/17 15:31:26 INFO SparkEnv: Registering BlockManagerMaster
25/11/17 15:31:26 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/11/17 15:31:26 INFO SparkEnv: Registering OutputCommitCoordinator


Iniciando proceso de limpieza de datos...
Archivo de entrada: /ProyectoPrincipal/cli_cambios.csv
Archivo de salida: gs://grupo01-project-sin/processed/cli_cambios_limpio.parquet


[Stage 1:>                                                          (0 + 1) / 1]

✓ Archivo leído exitosamente


                                                                                

  Dimensiones iniciales: 5163 filas x 7 columnas

Esquema inicial:
root
 |-- fecha: timestamp (nullable = true)
 |-- codigo unico: integer (nullable = true)
 |-- RUC: integer (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- TIPO: string (nullable = true)
 |-- Utilidad: double (nullable = true)
 |-- VOL_USD: double (nullable = true)



                                                                                


Contando valores nulos iniciales...
  fecha (timestamp): 0 nulls
  codigo unico (int): 0 nulls
  RUC (int): 0 nulls
  Nombre (string): 0 nulls
  TIPO (string): 0 nulls
  Utilidad (double): 0 nulls
  VOL_USD (double): 0 nulls

Total de valores nulos iniciales: 0

--- Iniciando limpieza ---
1. Limpiando nombres de columnas...
  ✓ Columnas renombradas: 7 columnas
2. Eliminando filas completamente vacías...
  ✓ Filas vacías eliminadas: 0
3. Eliminando duplicados...


[Stage 38:>                                                         (0 + 1) / 1]

  ✓ Filas duplicadas eliminadas: 0
4. Limpiando datos por tipo de columna...
  Procesando columna 'fecha' (tipo: timestamp)
  Procesando columna 'codigo_unico' (tipo: int)
  Procesando columna 'ruc' (tipo: int)
  Procesando columna 'nombre' (tipo: string)


                                                                                

  Procesando columna 'tipo' (tipo: string)
  Procesando columna 'utilidad' (tipo: double)
  Procesando columna 'vol_usd' (tipo: double)
  ✓ Procesamiento de columnas completado
5. Verificando columnas completamente vacías...


                                                                                

  ✓ No hay columnas completamente vacías

Contando valores nulos finales...
Total de valores nulos finales: 0

--- Guardando resultado ---


                                                                                

✓ Archivo guardado exitosamente en: gs://grupo01-project-sin/processed/cli_cambios_limpio.parquet

           REPORTE DE LIMPIEZA - PYSPARK
Archivo original: /ProyectoPrincipal/cli_cambios.csv
Archivo limpio: gs://grupo01-project-sin/processed/cli_cambios_limpio.parquet
------------------------------------------------------------
ESTADÍSTICAS:
  Filas iniciales:           5,163
  Filas finales:             5,163
  Filas eliminadas:          0
  Filas vacías eliminadas:   0
  Filas duplicadas elim.:    0

  Columnas iniciales:        7
  Columnas finales:          7
  Columnas eliminadas:       0

  Valores nulos iniciales:   0
  Valores nulos finales:     0
  Valores nulos limpiados:   0
------------------------------------------------------------
CAMBIOS EN VALORES NULOS POR COLUMNA:

ESQUEMA FINAL:
root
 |-- fecha: timestamp (nullable = true)
 |-- codigo_unico: integer (nullable = true)
 |-- ruc: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- tipo: string (nulla

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, when, lit, isnan
from pyspark.sql.types import StringType, IntegerType, DoubleType, FloatType, TimestampType, DateType
import re

# Configurar Spark para Dataproc
spark = SparkSession.builder \
    .appName("LimpiezaTradingCSV") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Configurar nivel de log para reducir output
spark.sparkContext.setLogLevel("WARN")

# RUTAS
ruta_input = "/ProyectoPrincipal/registro_comunicaciones_new_v2.csv" 
ruta_output = "gs://grupo01-project-sin/processed/registro_comunicaciones_limpio.parquet"

print("Iniciando proceso de limpieza de datos...")
print(f"Archivo de entrada: {ruta_input}")
print(f"Archivo de salida: {ruta_output}")

try:
    # -------------------------------------
    # A. LECTURA DE DATOS
    # -------------------------------------
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("multiline", "true") \
        .option("escape", '"') \
        .csv(ruta_input)
    
    print(f"✓ Archivo leído exitosamente")
    print(f"  Dimensiones iniciales: {df.count()} filas x {len(df.columns)} columnas")
    
    # Mostrar esquema inicial
    print("\nEsquema inicial:")
    df.printSchema()
    
    # Estadísticas iniciales
    initial_rows = df.count()
    initial_cols = len(df.columns)
    
    # Contar nulls iniciales de forma simple
    print("\nContando valores nulos iniciales...")
    total_initial_nulls = 0
    null_counts_initial = {}
    
    for column_name, data_type in df.dtypes:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_initial[column_name] = null_count
            total_initial_nulls += null_count
            print(f"  {column_name} ({data_type}): {null_count} nulls")
        except Exception as e:
            print(f"  Warning: Error contando nulls en {column_name}: {e}")
            null_counts_initial[column_name] = 0
    
    print(f"\nTotal de valores nulos iniciales: {total_initial_nulls}")
    
    # -------------------------------------
    # B. PROCESO DE LIMPIEZA 
    # -------------------------------------
    
    print("\n--- Iniciando limpieza ---")
    
    # 1. LIMPIAR NOMBRES DE COLUMNAS
    print("1. Limpiando nombres de columnas...")
    columns_before = df.columns
    
    # Función para limpiar nombres de columnas
    def clean_column_name(col_name):
        clean_name = re.sub(r'[^\w\s]', '', str(col_name).strip().lower())
        clean_name = re.sub(r'\s+', '_', clean_name)
        return clean_name if clean_name else f"col_{hash(col_name) % 1000}"
    
    new_columns = [clean_column_name(c) for c in df.columns]
    
    # Verificar que no haya nombres duplicados
    seen = set()
    final_columns = []
    for i, col_name in enumerate(new_columns):
        original_name = col_name
        counter = 1
        while col_name in seen:
            col_name = f"{original_name}_{counter}"
            counter += 1
        seen.add(col_name)
        final_columns.append(col_name)
    
    df = df.toDF(*final_columns)
    print(f"  ✓ Columnas renombradas: {len(columns_before)} columnas")
    
    # 2. ELIMINAR FILAS COMPLETAMENTE VACÍAS
    print("2. Eliminando filas completamente vacías...")
    try:
        df_before_empty = df.count()
        df = df.na.drop("all")
        df_after_empty = df.count()
        empty_rows_removed = df_before_empty - df_after_empty
        print(f"  ✓ Filas vacías eliminadas: {empty_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando filas vacías: {e}")
        empty_rows_removed = 0
    
    # 3. ELIMINAR DUPLICADOS
    print("3. Eliminando duplicados...")
    try:
        rows_before_dedup = df.count()
        df = df.dropDuplicates()
        rows_after_dedup = df.count()
        duplicate_rows_removed = rows_before_dedup - rows_after_dedup
        print(f"  ✓ Filas duplicadas eliminadas: {duplicate_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando duplicados: {e}")
        duplicate_rows_removed = 0
    
    # 4. LIMPIAR DATOS POR TIPO DE COLUMNA
    print("4. Limpiando datos por tipo de columna...")
    
    # Valores a considerar como nulos para strings
    invalid_vals = ["", " ", "na", "n/a", "null", "none", "nan", "undefined", "nil", "-", "NULL", "None"]
    
    # Obtener tipos actualizados después del renombrado
    current_dtypes = df.dtypes
    
    # Procesar cada columna según su tipo
    for column_name, data_type in current_dtypes:
        try:
            print(f"  Procesando columna '{column_name}' (tipo: {data_type})")
            
            if data_type == "string":
                # Limpiar columnas de texto
                df = df.withColumn(column_name, 
                                 when(col(column_name).isNull(), lit(None))
                                 .otherwise(trim(lower(col(column_name)))))
                
                # Reemplazar valores inválidos por null
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isin(invalid_vals), lit(None))
                    .otherwise(col(column_name))
                )
                
            elif data_type in ["double", "float"]:
                # Limpiar columnas numéricas decimales
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isNull() | isnan(col(column_name)), lit(None))
                    .otherwise(col(column_name))
                )
                
        except Exception as e:
            print(f"    Warning: Error procesando columna {column_name}: {e}")
            continue
    
    print(f"  ✓ Procesamiento de columnas completado")
    
    # 5. ELIMINAR COLUMNAS COMPLETAMENTE VACÍAS (opcional)
    print("5. Verificando columnas completamente vacías...")
    columns_to_drop = []
    
    try:
        current_row_count = df.count()
        for column_name in df.columns:
            null_count = df.filter(col(column_name).isNull()).count()
            if null_count == current_row_count and current_row_count > 0:
                columns_to_drop.append(column_name)
        
        if columns_to_drop:
            df = df.drop(*columns_to_drop)
            print(f"  ✓ Columnas completamente vacías eliminadas: {columns_to_drop}")
        else:
            print(f"  ✓ No hay columnas completamente vacías")
    except Exception as e:
        print(f"  Warning: Error verificando columnas vacías: {e}")
        columns_to_drop = []
    
    # -------------------------------------
    # C. ESTADÍSTICAS FINALES
    # -------------------------------------
    final_rows = df.count()
    final_cols = len(df.columns)
    
    # Contar nulls finales de forma simple
    print("\nContando valores nulos finales...")
    total_final_nulls = 0
    null_counts_final = {}
    
    for column_name in df.columns:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_final[column_name] = null_count
            total_final_nulls += null_count
        except Exception as e:
            print(f"  Warning: Error contando nulls finales en {column_name}: {e}")
            null_counts_final[column_name] = 0
    
    print(f"Total de valores nulos finales: {total_final_nulls}")
    
    # -------------------------------------
    # D. GUARDAR RESULTADO
    # -------------------------------------
    print("\n--- Guardando resultado ---")
    
    try:
        # Intentar guardar con coalesce
        df.coalesce(1).write \
            .mode("overwrite") \
            .option("compression", "snappy") \
            .parquet(ruta_output)
        
        print(f"✓ Archivo guardado exitosamente en: {ruta_output}")
        
    except Exception as e:
        print(f"Warning: Error con coalesce, intentando sin coalesce: {e}")
        try:
            # Intentar guardar sin coalesce
            df.write \
                .mode("overwrite") \
                .option("compression", "snappy") \
                .parquet(ruta_output)
            print(f" Archivo guardado (sin coalesce) en: {ruta_output}")
        except Exception as e2:
            print(f" Error guardando archivo: {e2}")
            raise e2
    
    # -------------------------------------
    # E. REPORTE FINAL
    # -------------------------------------
    print("\n" + "="*60)
    print("           REPORTE DE LIMPIEZA - PYSPARK")
    print("="*60)
    print(f"Archivo original: {ruta_input}")
    print(f"Archivo limpio: {ruta_output}")
    print("-"*60)
    print("ESTADÍSTICAS:")
    print(f"  Filas iniciales:           {initial_rows:,}")
    print(f"  Filas finales:             {final_rows:,}")
    print(f"  Filas eliminadas:          {initial_rows - final_rows:,}")
    print(f"  Filas vacías eliminadas:   {empty_rows_removed:,}")
    print(f"  Filas duplicadas elim.:    {duplicate_rows_removed:,}")
    print()
    print(f"  Columnas iniciales:        {initial_cols}")
    print(f"  Columnas finales:          {final_cols}")
    print(f"  Columnas eliminadas:       {len(columns_to_drop)}")
    print()
    print(f"  Valores nulos iniciales:   {total_initial_nulls:,}")
    print(f"  Valores nulos finales:     {total_final_nulls:,}")
    print(f"  Valores nulos limpiados:   {max(0, total_initial_nulls - total_final_nulls):,}")
    print("-"*60)
    
    # Mostrar detalle de nulls por columna (solo las que cambiaron)
    print("CAMBIOS EN VALORES NULOS POR COLUMNA:")
    for column_name in df.columns:
        initial = null_counts_initial.get(column_name, 0)
        final = null_counts_final.get(column_name, 0)
        if initial != final:
            print(f"  {column_name}: {initial} → {final}")
    
    # Mostrar esquema final
    try:
        print("\nESQUEMA FINAL:")
        df.printSchema()
    except Exception as e:
        print(f"No se pudo mostrar el esquema: {e}")
    
    # Mostrar muestra de datos
    try:
        print("MUESTRA DE DATOS LIMPIOS (primeras 5 filas):")
        df.show(5, truncate=False)
    except Exception as e:
        print(f"No se pudo mostrar muestra de datos: {e}")
    
    print("="*60)
    print("✓ LIMPIEZA COMPLETADA EXITOSAMENTE")
    print("="*60)

except Exception as e:
    print(f" Error durante el proceso de limpieza: {str(e)}")
    print("Detalles del error:")
    import traceback
    traceback.print_exc()

finally:
    # Cerrar Spark session
    try:
        spark.stop()
        print("\n Sesión de Spark cerrada")
    except:
        print("\n Sesión cerrada")

25/11/17 15:54:12 INFO SparkEnv: Registering MapOutputTracker
25/11/17 15:54:12 INFO SparkEnv: Registering BlockManagerMaster
25/11/17 15:54:12 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/11/17 15:54:12 INFO SparkEnv: Registering OutputCommitCoordinator


Iniciando proceso de limpieza de datos...
Archivo de entrada: /ProyectoPrincipal/registro_comunicaciones_new_v2.csv
Archivo de salida: gs://grupo01-project-sin/processed/registro_comunicaciones_limpio.parquet


                                                                                

✓ Archivo leído exitosamente


                                                                                

  Dimensiones iniciales: 518 filas x 6 columnas

Esquema inicial:
root
 |-- Tipo Cliente: string (nullable = true)
 |-- Especificacion: string (nullable = true)
 |-- Cantidad: integer (nullable = true)
 |-- Dia: timestamp (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Card: string (nullable = true)



                                                                                


Contando valores nulos iniciales...
  Tipo Cliente (string): 0 nulls
  Especificacion (string): 0 nulls
  Cantidad (int): 0 nulls
  Dia (timestamp): 0 nulls
  Estado (string): 0 nulls
  Card (string): 0 nulls

Total de valores nulos iniciales: 0

--- Iniciando limpieza ---
1. Limpiando nombres de columnas...
  ✓ Columnas renombradas: 6 columnas
2. Eliminando filas completamente vacías...
  ✓ Filas vacías eliminadas: 0
3. Eliminando duplicados...


                                                                                

  ✓ Filas duplicadas eliminadas: 0
4. Limpiando datos por tipo de columna...
  Procesando columna 'tipo_cliente' (tipo: string)
  Procesando columna 'especificacion' (tipo: string)
  Procesando columna 'cantidad' (tipo: int)
  Procesando columna 'dia' (tipo: timestamp)
  Procesando columna 'estado' (tipo: string)
  Procesando columna 'card' (tipo: string)
  ✓ Procesamiento de columnas completado
5. Verificando columnas completamente vacías...


                                                                                

  ✓ No hay columnas completamente vacías

Contando valores nulos finales...
Total de valores nulos finales: 0

--- Guardando resultado ---


                                                                                

✓ Archivo guardado exitosamente en: gs://grupo01-project-sin/processed/registro_comunicaciones_limpio.parquet

           REPORTE DE LIMPIEZA - PYSPARK
Archivo original: /ProyectoPrincipal/registro_comunicaciones_new_v2.csv
Archivo limpio: gs://grupo01-project-sin/processed/registro_comunicaciones_limpio.parquet
------------------------------------------------------------
ESTADÍSTICAS:
  Filas iniciales:           518
  Filas finales:             518
  Filas eliminadas:          0
  Filas vacías eliminadas:   0
  Filas duplicadas elim.:    0

  Columnas iniciales:        6
  Columnas finales:          6
  Columnas eliminadas:       0

  Valores nulos iniciales:   0
  Valores nulos finales:     0
  Valores nulos limpiados:   0
------------------------------------------------------------
CAMBIOS EN VALORES NULOS POR COLUMNA:

ESQUEMA FINAL:
root
 |-- tipo_cliente: string (nullable = true)
 |-- especificacion: string (nullable = true)
 |-- cantidad: integer (nullable = true)
 |-- dia: tim

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, when, lit, isnan
from pyspark.sql.types import StringType, IntegerType, DoubleType, FloatType, TimestampType, DateType
import re

# Configurar Spark para Dataproc
spark = SparkSession.builder \
    .appName("LimpiezaTradingCSV") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Configurar nivel de log para reducir output
spark.sparkContext.setLogLevel("WARN")

# RUTAS
ruta_input = "/ProyectoPrincipal/segmentos_v2.csv" 
ruta_output = "gs://grupo01-project-sin/processed/segmentos_limpio.parquet"

print("Iniciando proceso de limpieza de datos...")
print(f"Archivo de entrada: {ruta_input}")
print(f"Archivo de salida: {ruta_output}")

try:
    # -------------------------------------
    # A. LECTURA DE DATOS
    # -------------------------------------
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("multiline", "true") \
        .option("escape", '"') \
        .csv(ruta_input)
    
    print(f"✓ Archivo leído exitosamente")
    print(f"  Dimensiones iniciales: {df.count()} filas x {len(df.columns)} columnas")
    
    # Mostrar esquema inicial
    print("\nEsquema inicial:")
    df.printSchema()
    
    # Estadísticas iniciales
    initial_rows = df.count()
    initial_cols = len(df.columns)
    
    # Contar nulls iniciales de forma simple
    print("\nContando valores nulos iniciales...")
    total_initial_nulls = 0
    null_counts_initial = {}
    
    for column_name, data_type in df.dtypes:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_initial[column_name] = null_count
            total_initial_nulls += null_count
            print(f"  {column_name} ({data_type}): {null_count} nulls")
        except Exception as e:
            print(f"  Warning: Error contando nulls en {column_name}: {e}")
            null_counts_initial[column_name] = 0
    
    print(f"\nTotal de valores nulos iniciales: {total_initial_nulls}")
    
    # -------------------------------------
    # B. PROCESO DE LIMPIEZA 
    # -------------------------------------
    
    print("\n--- Iniciando limpieza ---")
    
    # 1. LIMPIAR NOMBRES DE COLUMNAS
    print("1. Limpiando nombres de columnas...")
    columns_before = df.columns
    
    # Función para limpiar nombres de columnas
    def clean_column_name(col_name):
        clean_name = re.sub(r'[^\w\s]', '', str(col_name).strip().lower())
        clean_name = re.sub(r'\s+', '_', clean_name)
        return clean_name if clean_name else f"col_{hash(col_name) % 1000}"
    
    new_columns = [clean_column_name(c) for c in df.columns]
    
    # Verificar que no haya nombres duplicados
    seen = set()
    final_columns = []
    for i, col_name in enumerate(new_columns):
        original_name = col_name
        counter = 1
        while col_name in seen:
            col_name = f"{original_name}_{counter}"
            counter += 1
        seen.add(col_name)
        final_columns.append(col_name)
    
    df = df.toDF(*final_columns)
    print(f"  ✓ Columnas renombradas: {len(columns_before)} columnas")
    
    # 2. ELIMINAR FILAS COMPLETAMENTE VACÍAS
    print("2. Eliminando filas completamente vacías...")
    try:
        df_before_empty = df.count()
        df = df.na.drop("all")
        df_after_empty = df.count()
        empty_rows_removed = df_before_empty - df_after_empty
        print(f"  ✓ Filas vacías eliminadas: {empty_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando filas vacías: {e}")
        empty_rows_removed = 0
    
    # 3. ELIMINAR DUPLICADOS
    print("3. Eliminando duplicados...")
    try:
        rows_before_dedup = df.count()
        df = df.dropDuplicates()
        rows_after_dedup = df.count()
        duplicate_rows_removed = rows_before_dedup - rows_after_dedup
        print(f"  ✓ Filas duplicadas eliminadas: {duplicate_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando duplicados: {e}")
        duplicate_rows_removed = 0
    
    # 4. LIMPIAR DATOS POR TIPO DE COLUMNA
    print("4. Limpiando datos por tipo de columna...")
    
    # Valores a considerar como nulos para strings
    invalid_vals = ["", " ", "na", "n/a", "null", "none", "nan", "undefined", "nil", "-", "NULL", "None"]
    
    # Obtener tipos actualizados después del renombrado
    current_dtypes = df.dtypes
    
    # Procesar cada columna según su tipo
    for column_name, data_type in current_dtypes:
        try:
            print(f"  Procesando columna '{column_name}' (tipo: {data_type})")
            
            if data_type == "string":
                # Limpiar columnas de texto
                df = df.withColumn(column_name, 
                                 when(col(column_name).isNull(), lit(None))
                                 .otherwise(trim(lower(col(column_name)))))
                
                # Reemplazar valores inválidos por null
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isin(invalid_vals), lit(None))
                    .otherwise(col(column_name))
                )
                
            elif data_type in ["double", "float"]:
                # Limpiar columnas numéricas decimales
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isNull() | isnan(col(column_name)), lit(None))
                    .otherwise(col(column_name))
                )
                
        except Exception as e:
            print(f"    Warning: Error procesando columna {column_name}: {e}")
            continue
    
    print(f"  ✓ Procesamiento de columnas completado")
    
    # 5. ELIMINAR COLUMNAS COMPLETAMENTE VACÍAS (opcional)
    print("5. Verificando columnas completamente vacías...")
    columns_to_drop = []
    
    try:
        current_row_count = df.count()
        for column_name in df.columns:
            null_count = df.filter(col(column_name).isNull()).count()
            if null_count == current_row_count and current_row_count > 0:
                columns_to_drop.append(column_name)
        
        if columns_to_drop:
            df = df.drop(*columns_to_drop)
            print(f"  ✓ Columnas completamente vacías eliminadas: {columns_to_drop}")
        else:
            print(f"  ✓ No hay columnas completamente vacías")
    except Exception as e:
        print(f"  Warning: Error verificando columnas vacías: {e}")
        columns_to_drop = []
    
    # -------------------------------------
    # C. ESTADÍSTICAS FINALES
    # -------------------------------------
    final_rows = df.count()
    final_cols = len(df.columns)
    
    # Contar nulls finales de forma simple
    print("\nContando valores nulos finales...")
    total_final_nulls = 0
    null_counts_final = {}
    
    for column_name in df.columns:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_final[column_name] = null_count
            total_final_nulls += null_count
        except Exception as e:
            print(f"  Warning: Error contando nulls finales en {column_name}: {e}")
            null_counts_final[column_name] = 0
    
    print(f"Total de valores nulos finales: {total_final_nulls}")
    
    # -------------------------------------
    # D. GUARDAR RESULTADO
    # -------------------------------------
    print("\n--- Guardando resultado ---")
    
    try:
        # Intentar guardar con coalesce
        df.coalesce(1).write \
            .mode("overwrite") \
            .option("compression", "snappy") \
            .parquet(ruta_output)
        
        print(f"✓ Archivo guardado exitosamente en: {ruta_output}")
        
    except Exception as e:
        print(f"Warning: Error con coalesce, intentando sin coalesce: {e}")
        try:
            # Intentar guardar sin coalesce
            df.write \
                .mode("overwrite") \
                .option("compression", "snappy") \
                .parquet(ruta_output)
            print(f" Archivo guardado (sin coalesce) en: {ruta_output}")
        except Exception as e2:
            print(f" Error guardando archivo: {e2}")
            raise e2
    
    # -------------------------------------
    # E. REPORTE FINAL
    # -------------------------------------
    print("\n" + "="*60)
    print("           REPORTE DE LIMPIEZA - PYSPARK")
    print("="*60)
    print(f"Archivo original: {ruta_input}")
    print(f"Archivo limpio: {ruta_output}")
    print("-"*60)
    print("ESTADÍSTICAS:")
    print(f"  Filas iniciales:           {initial_rows:,}")
    print(f"  Filas finales:             {final_rows:,}")
    print(f"  Filas eliminadas:          {initial_rows - final_rows:,}")
    print(f"  Filas vacías eliminadas:   {empty_rows_removed:,}")
    print(f"  Filas duplicadas elim.:    {duplicate_rows_removed:,}")
    print()
    print(f"  Columnas iniciales:        {initial_cols}")
    print(f"  Columnas finales:          {final_cols}")
    print(f"  Columnas eliminadas:       {len(columns_to_drop)}")
    print()
    print(f"  Valores nulos iniciales:   {total_initial_nulls:,}")
    print(f"  Valores nulos finales:     {total_final_nulls:,}")
    print(f"  Valores nulos limpiados:   {max(0, total_initial_nulls - total_final_nulls):,}")
    print("-"*60)
    
    # Mostrar detalle de nulls por columna (solo las que cambiaron)
    print("CAMBIOS EN VALORES NULOS POR COLUMNA:")
    for column_name in df.columns:
        initial = null_counts_initial.get(column_name, 0)
        final = null_counts_final.get(column_name, 0)
        if initial != final:
            print(f"  {column_name}: {initial} → {final}")
    
    # Mostrar esquema final
    try:
        print("\nESQUEMA FINAL:")
        df.printSchema()
    except Exception as e:
        print(f"No se pudo mostrar el esquema: {e}")
    
    # Mostrar muestra de datos
    try:
        print("MUESTRA DE DATOS LIMPIOS (primeras 5 filas):")
        df.show(5, truncate=False)
    except Exception as e:
        print(f"No se pudo mostrar muestra de datos: {e}")
    
    print("="*60)
    print("✓ LIMPIEZA COMPLETADA EXITOSAMENTE")
    print("="*60)

except Exception as e:
    print(f" Error durante el proceso de limpieza: {str(e)}")
    print("Detalles del error:")
    import traceback
    traceback.print_exc()

finally:
    # Cerrar Spark session
    try:
        spark.stop()
        print("\n Sesión de Spark cerrada")
    except:
        print("\n Sesión cerrada")

25/11/17 15:56:28 INFO SparkEnv: Registering MapOutputTracker
25/11/17 15:56:28 INFO SparkEnv: Registering BlockManagerMaster
25/11/17 15:56:28 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/11/17 15:56:28 INFO SparkEnv: Registering OutputCommitCoordinator


Iniciando proceso de limpieza de datos...
Archivo de entrada: /ProyectoPrincipal/segmentos_v2.csv
Archivo de salida: gs://grupo01-project-sin/processed/segmentos_limpio.parquet


                                                                                

✓ Archivo leído exitosamente


                                                                                

  Dimensiones iniciales: 12239 filas x 10 columnas

Esquema inicial:
root
 |-- Periodo: integer (nullable = true)
 |-- Codigo Unico: integer (nullable = true)
 |-- Ruc: integer (nullable = true)
 |-- Jefe: string (nullable = true)
 |-- Ejecutivo: string (nullable = true)
 |-- Segmento FX: string (nullable = true)
 |-- Tipo Cuenta: string (nullable = true)
 |-- Logeo: integer (nullable = true)
 |-- Num Logeos: integer (nullable = true)
 |-- Flg Dig: integer (nullable = true)


Contando valores nulos iniciales...
  Periodo (int): 0 nulls


                                                                                

  Codigo Unico (int): 0 nulls
  Ruc (int): 0 nulls
  Jefe (string): 0 nulls
  Ejecutivo (string): 0 nulls
  Segmento FX (string): 0 nulls
  Tipo Cuenta (string): 0 nulls
  Logeo (int): 0 nulls
  Num Logeos (int): 0 nulls
  Flg Dig (int): 0 nulls

Total de valores nulos iniciales: 0

--- Iniciando limpieza ---
1. Limpiando nombres de columnas...
  ✓ Columnas renombradas: 10 columnas
2. Eliminando filas completamente vacías...
  ✓ Filas vacías eliminadas: 0
3. Eliminando duplicados...


                                                                                

  ✓ Filas duplicadas eliminadas: 0
4. Limpiando datos por tipo de columna...
  Procesando columna 'periodo' (tipo: int)
  Procesando columna 'codigo_unico' (tipo: int)
  Procesando columna 'ruc' (tipo: int)
  Procesando columna 'jefe' (tipo: string)
  Procesando columna 'ejecutivo' (tipo: string)
  Procesando columna 'segmento_fx' (tipo: string)
  Procesando columna 'tipo_cuenta' (tipo: string)
  Procesando columna 'logeo' (tipo: int)
  Procesando columna 'num_logeos' (tipo: int)
  Procesando columna 'flg_dig' (tipo: int)
  ✓ Procesamiento de columnas completado
5. Verificando columnas completamente vacías...


                                                                                

  ✓ No hay columnas completamente vacías

Contando valores nulos finales...
Total de valores nulos finales: 0

--- Guardando resultado ---


                                                                                

✓ Archivo guardado exitosamente en: gs://grupo01-project-sin/processed/segmentos_limpio.parquet

           REPORTE DE LIMPIEZA - PYSPARK
Archivo original: /ProyectoPrincipal/segmentos_v2.csv
Archivo limpio: gs://grupo01-project-sin/processed/segmentos_limpio.parquet
------------------------------------------------------------
ESTADÍSTICAS:
  Filas iniciales:           12,239
  Filas finales:             12,239
  Filas eliminadas:          0
  Filas vacías eliminadas:   0
  Filas duplicadas elim.:    0

  Columnas iniciales:        10
  Columnas finales:          10
  Columnas eliminadas:       0

  Valores nulos iniciales:   0
  Valores nulos finales:     0
  Valores nulos limpiados:   0
------------------------------------------------------------
CAMBIOS EN VALORES NULOS POR COLUMNA:

ESQUEMA FINAL:
root
 |-- periodo: integer (nullable = true)
 |-- codigo_unico: integer (nullable = true)
 |-- ruc: integer (nullable = true)
 |-- jefe: string (nullable = true)
 |-- ejecutivo: string (n

In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, when, lit, isnan
from pyspark.sql.types import StringType, IntegerType, DoubleType, FloatType, TimestampType, DateType
import re

# Configurar Spark para Dataproc
spark = SparkSession.builder \
    .appName("LimpiezaTradingCSV") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Configurar nivel de log para reducir output
spark.sparkContext.setLogLevel("WARN")

# RUTAS
ruta_input = "/ProyectoPrincipal/tiendas_ranking_propio_updated.csv" 
ruta_output = "gs://grupo01-project-sin/processed/tiendas_ranking_limpio.parquet"

print("Iniciando proceso de limpieza de datos...")
print(f"Archivo de entrada: {ruta_input}")
print(f"Archivo de salida: {ruta_output}")

try:
    # -------------------------------------
    # A. LECTURA DE DATOS
    # -------------------------------------
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("multiline", "true") \
        .option("escape", '"') \
        .csv(ruta_input)
    
    print(f"✓ Archivo leído exitosamente")
    print(f"  Dimensiones iniciales: {df.count()} filas x {len(df.columns)} columnas")
    
    # Mostrar esquema inicial
    print("\nEsquema inicial:")
    df.printSchema()
    
    # Estadísticas iniciales
    initial_rows = df.count()
    initial_cols = len(df.columns)
    
    # Contar nulls iniciales de forma simple
    print("\nContando valores nulos iniciales...")
    total_initial_nulls = 0
    null_counts_initial = {}
    
    for column_name, data_type in df.dtypes:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_initial[column_name] = null_count
            total_initial_nulls += null_count
            print(f"  {column_name} ({data_type}): {null_count} nulls")
        except Exception as e:
            print(f"  Warning: Error contando nulls en {column_name}: {e}")
            null_counts_initial[column_name] = 0
    
    print(f"\nTotal de valores nulos iniciales: {total_initial_nulls}")
    
    # -------------------------------------
    # B. PROCESO DE LIMPIEZA 
    # -------------------------------------
    
    print("\n--- Iniciando limpieza ---")
    
    # 1. LIMPIAR NOMBRES DE COLUMNAS
    print("1. Limpiando nombres de columnas...")
    columns_before = df.columns
    
    # Función para limpiar nombres de columnas
    def clean_column_name(col_name):
        clean_name = re.sub(r'[^\w\s]', '', str(col_name).strip().lower())
        clean_name = re.sub(r'\s+', '_', clean_name)
        return clean_name if clean_name else f"col_{hash(col_name) % 1000}"
    
    new_columns = [clean_column_name(c) for c in df.columns]
    
    # Verificar que no haya nombres duplicados
    seen = set()
    final_columns = []
    for i, col_name in enumerate(new_columns):
        original_name = col_name
        counter = 1
        while col_name in seen:
            col_name = f"{original_name}_{counter}"
            counter += 1
        seen.add(col_name)
        final_columns.append(col_name)
    
    df = df.toDF(*final_columns)
    print(f"  ✓ Columnas renombradas: {len(columns_before)} columnas")
    
    # 2. ELIMINAR FILAS COMPLETAMENTE VACÍAS
    print("2. Eliminando filas completamente vacías...")
    try:
        df_before_empty = df.count()
        df = df.na.drop("all")
        df_after_empty = df.count()
        empty_rows_removed = df_before_empty - df_after_empty
        print(f"  ✓ Filas vacías eliminadas: {empty_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando filas vacías: {e}")
        empty_rows_removed = 0
    
    # 3. ELIMINAR DUPLICADOS
    print("3. Eliminando duplicados...")
    try:
        rows_before_dedup = df.count()
        df = df.dropDuplicates()
        rows_after_dedup = df.count()
        duplicate_rows_removed = rows_before_dedup - rows_after_dedup
        print(f"  ✓ Filas duplicadas eliminadas: {duplicate_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando duplicados: {e}")
        duplicate_rows_removed = 0
    
    # 4. LIMPIAR DATOS POR TIPO DE COLUMNA
    print("4. Limpiando datos por tipo de columna...")
    
    # Valores a considerar como nulos para strings
    invalid_vals = ["", " ", "na", "n/a", "null", "none", "nan", "undefined", "nil", "-", "NULL", "None"]
    
    # Obtener tipos actualizados después del renombrado
    current_dtypes = df.dtypes
    
    # Procesar cada columna según su tipo
    for column_name, data_type in current_dtypes:
        try:
            print(f"  Procesando columna '{column_name}' (tipo: {data_type})")
            
            if data_type == "string":
                # Limpiar columnas de texto
                df = df.withColumn(column_name, 
                                 when(col(column_name).isNull(), lit(None))
                                 .otherwise(trim(lower(col(column_name)))))
                
                # Reemplazar valores inválidos por null
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isin(invalid_vals), lit(None))
                    .otherwise(col(column_name))
                )
                
            elif data_type in ["double", "float"]:
                # Limpiar columnas numéricas decimales
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isNull() | isnan(col(column_name)), lit(None))
                    .otherwise(col(column_name))
                )
                
        except Exception as e:
            print(f"    Warning: Error procesando columna {column_name}: {e}")
            continue
    
    print(f"  ✓ Procesamiento de columnas completado")
    
    # 5. ELIMINAR COLUMNAS COMPLETAMENTE VACÍAS (opcional)
    print("5. Verificando columnas completamente vacías...")
    columns_to_drop = []
    
    try:
        current_row_count = df.count()
        for column_name in df.columns:
            null_count = df.filter(col(column_name).isNull()).count()
            if null_count == current_row_count and current_row_count > 0:
                columns_to_drop.append(column_name)
        
        if columns_to_drop:
            df = df.drop(*columns_to_drop)
            print(f"  ✓ Columnas completamente vacías eliminadas: {columns_to_drop}")
        else:
            print(f"  ✓ No hay columnas completamente vacías")
    except Exception as e:
        print(f"  Warning: Error verificando columnas vacías: {e}")
        columns_to_drop = []
    
    # -------------------------------------
    # C. ESTADÍSTICAS FINALES
    # -------------------------------------
    final_rows = df.count()
    final_cols = len(df.columns)
    
    # Contar nulls finales de forma simple
    print("\nContando valores nulos finales...")
    total_final_nulls = 0
    null_counts_final = {}
    
    for column_name in df.columns:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_final[column_name] = null_count
            total_final_nulls += null_count
        except Exception as e:
            print(f"  Warning: Error contando nulls finales en {column_name}: {e}")
            null_counts_final[column_name] = 0
    
    print(f"Total de valores nulos finales: {total_final_nulls}")
    
    # -------------------------------------
    # D. GUARDAR RESULTADO
    # -------------------------------------
    print("\n--- Guardando resultado ---")
    
    try:
        # Intentar guardar con coalesce
        df.coalesce(1).write \
            .mode("overwrite") \
            .option("compression", "snappy") \
            .parquet(ruta_output)
        
        print(f"✓ Archivo guardado exitosamente en: {ruta_output}")
        
    except Exception as e:
        print(f"Warning: Error con coalesce, intentando sin coalesce: {e}")
        try:
            # Intentar guardar sin coalesce
            df.write \
                .mode("overwrite") \
                .option("compression", "snappy") \
                .parquet(ruta_output)
            print(f" Archivo guardado (sin coalesce) en: {ruta_output}")
        except Exception as e2:
            print(f" Error guardando archivo: {e2}")
            raise e2
    
    # -------------------------------------
    # E. REPORTE FINAL
    # -------------------------------------
    print("\n" + "="*60)
    print("           REPORTE DE LIMPIEZA - PYSPARK")
    print("="*60)
    print(f"Archivo original: {ruta_input}")
    print(f"Archivo limpio: {ruta_output}")
    print("-"*60)
    print("ESTADÍSTICAS:")
    print(f"  Filas iniciales:           {initial_rows:,}")
    print(f"  Filas finales:             {final_rows:,}")
    print(f"  Filas eliminadas:          {initial_rows - final_rows:,}")
    print(f"  Filas vacías eliminadas:   {empty_rows_removed:,}")
    print(f"  Filas duplicadas elim.:    {duplicate_rows_removed:,}")
    print()
    print(f"  Columnas iniciales:        {initial_cols}")
    print(f"  Columnas finales:          {final_cols}")
    print(f"  Columnas eliminadas:       {len(columns_to_drop)}")
    print()
    print(f"  Valores nulos iniciales:   {total_initial_nulls:,}")
    print(f"  Valores nulos finales:     {total_final_nulls:,}")
    print(f"  Valores nulos limpiados:   {max(0, total_initial_nulls - total_final_nulls):,}")
    print("-"*60)
    
    # Mostrar detalle de nulls por columna (solo las que cambiaron)
    print("CAMBIOS EN VALORES NULOS POR COLUMNA:")
    for column_name in df.columns:
        initial = null_counts_initial.get(column_name, 0)
        final = null_counts_final.get(column_name, 0)
        if initial != final:
            print(f"  {column_name}: {initial} → {final}")
    
    # Mostrar esquema final
    try:
        print("\nESQUEMA FINAL:")
        df.printSchema()
    except Exception as e:
        print(f"No se pudo mostrar el esquema: {e}")
    
    # Mostrar muestra de datos
    try:
        print("MUESTRA DE DATOS LIMPIOS (primeras 5 filas):")
        df.show(5, truncate=False)
    except Exception as e:
        print(f"No se pudo mostrar muestra de datos: {e}")
    
    print("="*60)
    print("✓ LIMPIEZA COMPLETADA EXITOSAMENTE")
    print("="*60)

except Exception as e:
    print(f" Error durante el proceso de limpieza: {str(e)}")
    print("Detalles del error:")
    import traceback
    traceback.print_exc()

finally:
    # Cerrar Spark session
    try:
        spark.stop()
        print("\n Sesión de Spark cerrada")
    except:
        print("\n Sesión cerrada")

25/11/17 15:59:03 INFO SparkEnv: Registering MapOutputTracker
25/11/17 15:59:03 INFO SparkEnv: Registering BlockManagerMaster
25/11/17 15:59:03 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/11/17 15:59:03 INFO SparkEnv: Registering OutputCommitCoordinator


Iniciando proceso de limpieza de datos...
Archivo de entrada: /ProyectoPrincipal/tiendas_ranking_propio_updated.csv
Archivo de salida: gs://grupo01-project-sin/processed/tiendas_ranking_limpio.parquet


                                                                                

✓ Archivo leído exitosamente


                                                                                

  Dimensiones iniciales: 55 filas x 9 columnas

Esquema inicial:
root
 |-- Canal: string (nullable = true)
 |-- Posicion: integer (nullable = true)
 |-- Jefe: string (nullable = true)
 |-- Ejecutivo: string (nullable = true)
 |-- Monto: double (nullable = true)
 |-- Desembolsado: double (nullable = true)
 |-- Volumen Cambiado: double (nullable = true)
 |-- Utilidad: double (nullable = true)
 |-- id_ejecutivo: integer (nullable = true)


Contando valores nulos iniciales...
  Canal (string): 0 nulls
  Posicion (int): 0 nulls
  Jefe (string): 0 nulls
  Ejecutivo (string): 0 nulls
  Monto (double): 0 nulls
  Desembolsado (double): 0 nulls
  Volumen Cambiado (double): 0 nulls
  Utilidad (double): 0 nulls
  id_ejecutivo (int): 0 nulls

Total de valores nulos iniciales: 0

--- Iniciando limpieza ---
1. Limpiando nombres de columnas...
  ✓ Columnas renombradas: 9 columnas
2. Eliminando filas completamente vacías...
  ✓ Filas vacías eliminadas: 0
3. Eliminando duplicados...


                                                                                

  ✓ Filas duplicadas eliminadas: 0
4. Limpiando datos por tipo de columna...
  Procesando columna 'canal' (tipo: string)
  Procesando columna 'posicion' (tipo: int)
  Procesando columna 'jefe' (tipo: string)
  Procesando columna 'ejecutivo' (tipo: string)
  Procesando columna 'monto' (tipo: double)
  Procesando columna 'desembolsado' (tipo: double)
  Procesando columna 'volumen_cambiado' (tipo: double)
  Procesando columna 'utilidad' (tipo: double)
  Procesando columna 'id_ejecutivo' (tipo: int)
  ✓ Procesamiento de columnas completado
5. Verificando columnas completamente vacías...
  ✓ No hay columnas completamente vacías

Contando valores nulos finales...
Total de valores nulos finales: 0

--- Guardando resultado ---


                                                                                

✓ Archivo guardado exitosamente en: gs://grupo01-project-sin/processed/tiendas_ranking_limpio.parquet

           REPORTE DE LIMPIEZA - PYSPARK
Archivo original: /ProyectoPrincipal/tiendas_ranking_propio_updated.csv
Archivo limpio: gs://grupo01-project-sin/processed/tiendas_ranking_limpio.parquet
------------------------------------------------------------
ESTADÍSTICAS:
  Filas iniciales:           55
  Filas finales:             55
  Filas eliminadas:          0
  Filas vacías eliminadas:   0
  Filas duplicadas elim.:    0

  Columnas iniciales:        9
  Columnas finales:          9
  Columnas eliminadas:       0

  Valores nulos iniciales:   0
  Valores nulos finales:     0
  Valores nulos limpiados:   0
------------------------------------------------------------
CAMBIOS EN VALORES NULOS POR COLUMNA:

ESQUEMA FINAL:
root
 |-- canal: string (nullable = true)
 |-- posicion: integer (nullable = true)
 |-- jefe: string (nullable = true)
 |-- ejecutivo: string (nullable = true)
 |-- mo

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, when, lit, isnan
from pyspark.sql.types import StringType, IntegerType, DoubleType, FloatType, TimestampType, DateType
import re

# Configurar Spark para Dataproc
spark = SparkSession.builder \
    .appName("LimpiezaTradingCSV") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Configurar nivel de log para reducir output
spark.sparkContext.setLogLevel("WARN")

# RUTAS
ruta_input = "/ProyectoPrincipal/tlv_ranking_propio_updated.csv" 
ruta_output = "gs://grupo01-project-sin/processed/tlv_ranking_limpio.parquet"

print("Iniciando proceso de limpieza de datos...")
print(f"Archivo de entrada: {ruta_input}")
print(f"Archivo de salida: {ruta_output}")

try:
    # -------------------------------------
    # A. LECTURA DE DATOS
    # -------------------------------------
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("multiline", "true") \
        .option("escape", '"') \
        .csv(ruta_input)
    
    print(f"✓ Archivo leído exitosamente")
    print(f"  Dimensiones iniciales: {df.count()} filas x {len(df.columns)} columnas")
    
    # Mostrar esquema inicial
    print("\nEsquema inicial:")
    df.printSchema()
    
    # Estadísticas iniciales
    initial_rows = df.count()
    initial_cols = len(df.columns)
    
    # Contar nulls iniciales de forma simple
    print("\nContando valores nulos iniciales...")
    total_initial_nulls = 0
    null_counts_initial = {}
    
    for column_name, data_type in df.dtypes:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_initial[column_name] = null_count
            total_initial_nulls += null_count
            print(f"  {column_name} ({data_type}): {null_count} nulls")
        except Exception as e:
            print(f"  Warning: Error contando nulls en {column_name}: {e}")
            null_counts_initial[column_name] = 0
    
    print(f"\nTotal de valores nulos iniciales: {total_initial_nulls}")
    
    # -------------------------------------
    # B. PROCESO DE LIMPIEZA 
    # -------------------------------------
    
    print("\n--- Iniciando limpieza ---")
    
    # 1. LIMPIAR NOMBRES DE COLUMNAS
    print("1. Limpiando nombres de columnas...")
    columns_before = df.columns
    
    # Función para limpiar nombres de columnas
    def clean_column_name(col_name):
        clean_name = re.sub(r'[^\w\s]', '', str(col_name).strip().lower())
        clean_name = re.sub(r'\s+', '_', clean_name)
        return clean_name if clean_name else f"col_{hash(col_name) % 1000}"
    
    new_columns = [clean_column_name(c) for c in df.columns]
    
    # Verificar que no haya nombres duplicados
    seen = set()
    final_columns = []
    for i, col_name in enumerate(new_columns):
        original_name = col_name
        counter = 1
        while col_name in seen:
            col_name = f"{original_name}_{counter}"
            counter += 1
        seen.add(col_name)
        final_columns.append(col_name)
    
    df = df.toDF(*final_columns)
    print(f"  ✓ Columnas renombradas: {len(columns_before)} columnas")
    
    # 2. ELIMINAR FILAS COMPLETAMENTE VACÍAS
    print("2. Eliminando filas completamente vacías...")
    try:
        df_before_empty = df.count()
        df = df.na.drop("all")
        df_after_empty = df.count()
        empty_rows_removed = df_before_empty - df_after_empty
        print(f"  ✓ Filas vacías eliminadas: {empty_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando filas vacías: {e}")
        empty_rows_removed = 0
    
    # 3. ELIMINAR DUPLICADOS
    print("3. Eliminando duplicados...")
    try:
        rows_before_dedup = df.count()
        df = df.dropDuplicates()
        rows_after_dedup = df.count()
        duplicate_rows_removed = rows_before_dedup - rows_after_dedup
        print(f"  ✓ Filas duplicadas eliminadas: {duplicate_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando duplicados: {e}")
        duplicate_rows_removed = 0
    
    # 4. LIMPIAR DATOS POR TIPO DE COLUMNA
    print("4. Limpiando datos por tipo de columna...")
    
    # Valores a considerar como nulos para strings
    invalid_vals = ["", " ", "na", "n/a", "null", "none", "nan", "undefined", "nil", "-", "NULL", "None"]
    
    # Obtener tipos actualizados después del renombrado
    current_dtypes = df.dtypes
    
    # Procesar cada columna según su tipo
    for column_name, data_type in current_dtypes:
        try:
            print(f"  Procesando columna '{column_name}' (tipo: {data_type})")
            
            if data_type == "string":
                # Limpiar columnas de texto
                df = df.withColumn(column_name, 
                                 when(col(column_name).isNull(), lit(None))
                                 .otherwise(trim(lower(col(column_name)))))
                
                # Reemplazar valores inválidos por null
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isin(invalid_vals), lit(None))
                    .otherwise(col(column_name))
                )
                
            elif data_type in ["double", "float"]:
                # Limpiar columnas numéricas decimales
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isNull() | isnan(col(column_name)), lit(None))
                    .otherwise(col(column_name))
                )
                
        except Exception as e:
            print(f"    Warning: Error procesando columna {column_name}: {e}")
            continue
    
    print(f"  ✓ Procesamiento de columnas completado")
    
    # 5. ELIMINAR COLUMNAS COMPLETAMENTE VACÍAS (opcional)
    print("5. Verificando columnas completamente vacías...")
    columns_to_drop = []
    
    try:
        current_row_count = df.count()
        for column_name in df.columns:
            null_count = df.filter(col(column_name).isNull()).count()
            if null_count == current_row_count and current_row_count > 0:
                columns_to_drop.append(column_name)
        
        if columns_to_drop:
            df = df.drop(*columns_to_drop)
            print(f"  ✓ Columnas completamente vacías eliminadas: {columns_to_drop}")
        else:
            print(f"  ✓ No hay columnas completamente vacías")
    except Exception as e:
        print(f"  Warning: Error verificando columnas vacías: {e}")
        columns_to_drop = []
    
    # -------------------------------------
    # C. ESTADÍSTICAS FINALES
    # -------------------------------------
    final_rows = df.count()
    final_cols = len(df.columns)
    
    # Contar nulls finales de forma simple
    print("\nContando valores nulos finales...")
    total_final_nulls = 0
    null_counts_final = {}
    
    for column_name in df.columns:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_final[column_name] = null_count
            total_final_nulls += null_count
        except Exception as e:
            print(f"  Warning: Error contando nulls finales en {column_name}: {e}")
            null_counts_final[column_name] = 0
    
    print(f"Total de valores nulos finales: {total_final_nulls}")
    
    # -------------------------------------
    # D. GUARDAR RESULTADO
    # -------------------------------------
    print("\n--- Guardando resultado ---")
    
    try:
        # Intentar guardar con coalesce
        df.coalesce(1).write \
            .mode("overwrite") \
            .option("compression", "snappy") \
            .parquet(ruta_output)
        
        print(f"✓ Archivo guardado exitosamente en: {ruta_output}")
        
    except Exception as e:
        print(f"Warning: Error con coalesce, intentando sin coalesce: {e}")
        try:
            # Intentar guardar sin coalesce
            df.write \
                .mode("overwrite") \
                .option("compression", "snappy") \
                .parquet(ruta_output)
            print(f" Archivo guardado (sin coalesce) en: {ruta_output}")
        except Exception as e2:
            print(f" Error guardando archivo: {e2}")
            raise e2
    
    # -------------------------------------
    # E. REPORTE FINAL
    # -------------------------------------
    print("\n" + "="*60)
    print("           REPORTE DE LIMPIEZA - PYSPARK")
    print("="*60)
    print(f"Archivo original: {ruta_input}")
    print(f"Archivo limpio: {ruta_output}")
    print("-"*60)
    print("ESTADÍSTICAS:")
    print(f"  Filas iniciales:           {initial_rows:,}")
    print(f"  Filas finales:             {final_rows:,}")
    print(f"  Filas eliminadas:          {initial_rows - final_rows:,}")
    print(f"  Filas vacías eliminadas:   {empty_rows_removed:,}")
    print(f"  Filas duplicadas elim.:    {duplicate_rows_removed:,}")
    print()
    print(f"  Columnas iniciales:        {initial_cols}")
    print(f"  Columnas finales:          {final_cols}")
    print(f"  Columnas eliminadas:       {len(columns_to_drop)}")
    print()
    print(f"  Valores nulos iniciales:   {total_initial_nulls:,}")
    print(f"  Valores nulos finales:     {total_final_nulls:,}")
    print(f"  Valores nulos limpiados:   {max(0, total_initial_nulls - total_final_nulls):,}")
    print("-"*60)
    
    # Mostrar detalle de nulls por columna (solo las que cambiaron)
    print("CAMBIOS EN VALORES NULOS POR COLUMNA:")
    for column_name in df.columns:
        initial = null_counts_initial.get(column_name, 0)
        final = null_counts_final.get(column_name, 0)
        if initial != final:
            print(f"  {column_name}: {initial} → {final}")
    
    # Mostrar esquema final
    try:
        print("\nESQUEMA FINAL:")
        df.printSchema()
    except Exception as e:
        print(f"No se pudo mostrar el esquema: {e}")
    
    # Mostrar muestra de datos
    try:
        print("MUESTRA DE DATOS LIMPIOS (primeras 5 filas):")
        df.show(5, truncate=False)
    except Exception as e:
        print(f"No se pudo mostrar muestra de datos: {e}")
    
    print("="*60)
    print("✓ LIMPIEZA COMPLETADA EXITOSAMENTE")
    print("="*60)

except Exception as e:
    print(f" Error durante el proceso de limpieza: {str(e)}")
    print("Detalles del error:")
    import traceback
    traceback.print_exc()

finally:
    # Cerrar Spark session
    try:
        spark.stop()
        print("\n Sesión de Spark cerrada")
    except:
        print("\n Sesión cerrada")

25/11/17 16:00:45 INFO SparkEnv: Registering MapOutputTracker
25/11/17 16:00:45 INFO SparkEnv: Registering BlockManagerMaster
25/11/17 16:00:45 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/11/17 16:00:45 INFO SparkEnv: Registering OutputCommitCoordinator


Iniciando proceso de limpieza de datos...
Archivo de entrada: /ProyectoPrincipal/tlv_ranking_propio_updated.csv
Archivo de salida: gs://grupo01-project-sin/processed/tlv_ranking_limpio.parquet


                                                                                

✓ Archivo leído exitosamente


                                                                                

  Dimensiones iniciales: 38 filas x 9 columnas

Esquema inicial:
root
 |-- Canal: string (nullable = true)
 |-- Posicion: integer (nullable = true)
 |-- Jefe: string (nullable = true)
 |-- Ejecutivo: string (nullable = true)
 |-- Monto: double (nullable = true)
 |-- Desembolsado: double (nullable = true)
 |-- Volumen Cambiado: double (nullable = true)
 |-- Utilidad: double (nullable = true)
 |-- id_ejecutivo: integer (nullable = true)


Contando valores nulos iniciales...
  Canal (string): 0 nulls
  Posicion (int): 0 nulls
  Jefe (string): 0 nulls
  Ejecutivo (string): 0 nulls
  Monto (double): 0 nulls
  Desembolsado (double): 0 nulls
  Volumen Cambiado (double): 0 nulls
  Utilidad (double): 0 nulls
  id_ejecutivo (int): 0 nulls

Total de valores nulos iniciales: 0

--- Iniciando limpieza ---
1. Limpiando nombres de columnas...
  ✓ Columnas renombradas: 9 columnas
2. Eliminando filas completamente vacías...
  ✓ Filas vacías eliminadas: 0
3. Eliminando duplicados...


                                                                                

  ✓ Filas duplicadas eliminadas: 0
4. Limpiando datos por tipo de columna...
  Procesando columna 'canal' (tipo: string)
  Procesando columna 'posicion' (tipo: int)
  Procesando columna 'jefe' (tipo: string)
  Procesando columna 'ejecutivo' (tipo: string)
  Procesando columna 'monto' (tipo: double)
  Procesando columna 'desembolsado' (tipo: double)
  Procesando columna 'volumen_cambiado' (tipo: double)
  Procesando columna 'utilidad' (tipo: double)
  Procesando columna 'id_ejecutivo' (tipo: int)
  ✓ Procesamiento de columnas completado
5. Verificando columnas completamente vacías...
  ✓ No hay columnas completamente vacías

Contando valores nulos finales...
Total de valores nulos finales: 0

--- Guardando resultado ---


                                                                                

✓ Archivo guardado exitosamente en: gs://grupo01-project-sin/processed/tlv_ranking_limpio.parquet

           REPORTE DE LIMPIEZA - PYSPARK
Archivo original: /ProyectoPrincipal/tlv_ranking_propio_updated.csv
Archivo limpio: gs://grupo01-project-sin/processed/tlv_ranking_limpio.parquet
------------------------------------------------------------
ESTADÍSTICAS:
  Filas iniciales:           38
  Filas finales:             38
  Filas eliminadas:          0
  Filas vacías eliminadas:   0
  Filas duplicadas elim.:    0

  Columnas iniciales:        9
  Columnas finales:          9
  Columnas eliminadas:       0

  Valores nulos iniciales:   0
  Valores nulos finales:     0
  Valores nulos limpiados:   0
------------------------------------------------------------
CAMBIOS EN VALORES NULOS POR COLUMNA:

ESQUEMA FINAL:
root
 |-- canal: string (nullable = true)
 |-- posicion: integer (nullable = true)
 |-- jefe: string (nullable = true)
 |-- ejecutivo: string (nullable = true)
 |-- monto: double 

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, when, lit, isnan
from pyspark.sql.types import StringType, IntegerType, DoubleType, FloatType, TimestampType, DateType
import re

# Configurar Spark para Dataproc
spark = SparkSession.builder \
    .appName("LimpiezaTradingCSV") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Configurar nivel de log para reducir output
spark.sparkContext.setLogLevel("WARN")

# RUTAS
ruta_input = "/ProyectoPrincipal/virtual_ranking_propio_updated.csv" 
ruta_output = "gs://grupo01-project-sin/processed/virtual_ranking_limpio.parquet"

print("Iniciando proceso de limpieza de datos...")
print(f"Archivo de entrada: {ruta_input}")
print(f"Archivo de salida: {ruta_output}")

try:
    # -------------------------------------
    # A. LECTURA DE DATOS
    # -------------------------------------
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("multiline", "true") \
        .option("escape", '"') \
        .csv(ruta_input)
    
    print(f"✓ Archivo leído exitosamente")
    print(f"  Dimensiones iniciales: {df.count()} filas x {len(df.columns)} columnas")
    
    # Mostrar esquema inicial
    print("\nEsquema inicial:")
    df.printSchema()
    
    # Estadísticas iniciales
    initial_rows = df.count()
    initial_cols = len(df.columns)
    
    # Contar nulls iniciales de forma simple
    print("\nContando valores nulos iniciales...")
    total_initial_nulls = 0
    null_counts_initial = {}
    
    for column_name, data_type in df.dtypes:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_initial[column_name] = null_count
            total_initial_nulls += null_count
            print(f"  {column_name} ({data_type}): {null_count} nulls")
        except Exception as e:
            print(f"  Warning: Error contando nulls en {column_name}: {e}")
            null_counts_initial[column_name] = 0
    
    print(f"\nTotal de valores nulos iniciales: {total_initial_nulls}")
    
    # -------------------------------------
    # B. PROCESO DE LIMPIEZA 
    # -------------------------------------
    
    print("\n--- Iniciando limpieza ---")
    
    # 1. LIMPIAR NOMBRES DE COLUMNAS
    print("1. Limpiando nombres de columnas...")
    columns_before = df.columns
    
    # Función para limpiar nombres de columnas
    def clean_column_name(col_name):
        clean_name = re.sub(r'[^\w\s]', '', str(col_name).strip().lower())
        clean_name = re.sub(r'\s+', '_', clean_name)
        return clean_name if clean_name else f"col_{hash(col_name) % 1000}"
    
    new_columns = [clean_column_name(c) for c in df.columns]
    
    # Verificar que no haya nombres duplicados
    seen = set()
    final_columns = []
    for i, col_name in enumerate(new_columns):
        original_name = col_name
        counter = 1
        while col_name in seen:
            col_name = f"{original_name}_{counter}"
            counter += 1
        seen.add(col_name)
        final_columns.append(col_name)
    
    df = df.toDF(*final_columns)
    print(f"  ✓ Columnas renombradas: {len(columns_before)} columnas")
    
    # 2. ELIMINAR FILAS COMPLETAMENTE VACÍAS
    print("2. Eliminando filas completamente vacías...")
    try:
        df_before_empty = df.count()
        df = df.na.drop("all")
        df_after_empty = df.count()
        empty_rows_removed = df_before_empty - df_after_empty
        print(f"  ✓ Filas vacías eliminadas: {empty_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando filas vacías: {e}")
        empty_rows_removed = 0
    
    # 3. ELIMINAR DUPLICADOS
    print("3. Eliminando duplicados...")
    try:
        rows_before_dedup = df.count()
        df = df.dropDuplicates()
        rows_after_dedup = df.count()
        duplicate_rows_removed = rows_before_dedup - rows_after_dedup
        print(f"  ✓ Filas duplicadas eliminadas: {duplicate_rows_removed}")
    except Exception as e:
        print(f"  Warning: Error eliminando duplicados: {e}")
        duplicate_rows_removed = 0
    
    # 4. LIMPIAR DATOS POR TIPO DE COLUMNA
    print("4. Limpiando datos por tipo de columna...")
    
    # Valores a considerar como nulos para strings
    invalid_vals = ["", " ", "na", "n/a", "null", "none", "nan", "undefined", "nil", "-", "NULL", "None"]
    
    # Obtener tipos actualizados después del renombrado
    current_dtypes = df.dtypes
    
    # Procesar cada columna según su tipo
    for column_name, data_type in current_dtypes:
        try:
            print(f"  Procesando columna '{column_name}' (tipo: {data_type})")
            
            if data_type == "string":
                # Limpiar columnas de texto
                df = df.withColumn(column_name, 
                                 when(col(column_name).isNull(), lit(None))
                                 .otherwise(trim(lower(col(column_name)))))
                
                # Reemplazar valores inválidos por null
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isin(invalid_vals), lit(None))
                    .otherwise(col(column_name))
                )
                
            elif data_type in ["double", "float"]:
                # Limpiar columnas numéricas decimales
                df = df.withColumn(
                    column_name,
                    when(col(column_name).isNull() | isnan(col(column_name)), lit(None))
                    .otherwise(col(column_name))
                )
                
        except Exception as e:
            print(f"    Warning: Error procesando columna {column_name}: {e}")
            continue
    
    print(f"  ✓ Procesamiento de columnas completado")
    
    # 5. ELIMINAR COLUMNAS COMPLETAMENTE VACÍAS (opcional)
    print("5. Verificando columnas completamente vacías...")
    columns_to_drop = []
    
    try:
        current_row_count = df.count()
        for column_name in df.columns:
            null_count = df.filter(col(column_name).isNull()).count()
            if null_count == current_row_count and current_row_count > 0:
                columns_to_drop.append(column_name)
        
        if columns_to_drop:
            df = df.drop(*columns_to_drop)
            print(f"  ✓ Columnas completamente vacías eliminadas: {columns_to_drop}")
        else:
            print(f"  ✓ No hay columnas completamente vacías")
    except Exception as e:
        print(f"  Warning: Error verificando columnas vacías: {e}")
        columns_to_drop = []
    
    # -------------------------------------
    # C. ESTADÍSTICAS FINALES
    # -------------------------------------
    final_rows = df.count()
    final_cols = len(df.columns)
    
    # Contar nulls finales de forma simple
    print("\nContando valores nulos finales...")
    total_final_nulls = 0
    null_counts_final = {}
    
    for column_name in df.columns:
        try:
            null_count = df.filter(col(column_name).isNull()).count()
            null_counts_final[column_name] = null_count
            total_final_nulls += null_count
        except Exception as e:
            print(f"  Warning: Error contando nulls finales en {column_name}: {e}")
            null_counts_final[column_name] = 0
    
    print(f"Total de valores nulos finales: {total_final_nulls}")
    
    # -------------------------------------
    # D. GUARDAR RESULTADO
    # -------------------------------------
    print("\n--- Guardando resultado ---")
    
    try:
        # Intentar guardar con coalesce
        df.coalesce(1).write \
            .mode("overwrite") \
            .option("compression", "snappy") \
            .parquet(ruta_output)
        
        print(f"✓ Archivo guardado exitosamente en: {ruta_output}")
        
    except Exception as e:
        print(f"Warning: Error con coalesce, intentando sin coalesce: {e}")
        try:
            # Intentar guardar sin coalesce
            df.write \
                .mode("overwrite") \
                .option("compression", "snappy") \
                .parquet(ruta_output)
            print(f" Archivo guardado (sin coalesce) en: {ruta_output}")
        except Exception as e2:
            print(f" Error guardando archivo: {e2}")
            raise e2
    
    # -------------------------------------
    # E. REPORTE FINAL
    # -------------------------------------
    print("\n" + "="*60)
    print("           REPORTE DE LIMPIEZA - PYSPARK")
    print("="*60)
    print(f"Archivo original: {ruta_input}")
    print(f"Archivo limpio: {ruta_output}")
    print("-"*60)
    print("ESTADÍSTICAS:")
    print(f"  Filas iniciales:           {initial_rows:,}")
    print(f"  Filas finales:             {final_rows:,}")
    print(f"  Filas eliminadas:          {initial_rows - final_rows:,}")
    print(f"  Filas vacías eliminadas:   {empty_rows_removed:,}")
    print(f"  Filas duplicadas elim.:    {duplicate_rows_removed:,}")
    print()
    print(f"  Columnas iniciales:        {initial_cols}")
    print(f"  Columnas finales:          {final_cols}")
    print(f"  Columnas eliminadas:       {len(columns_to_drop)}")
    print()
    print(f"  Valores nulos iniciales:   {total_initial_nulls:,}")
    print(f"  Valores nulos finales:     {total_final_nulls:,}")
    print(f"  Valores nulos limpiados:   {max(0, total_initial_nulls - total_final_nulls):,}")
    print("-"*60)
    
    # Mostrar detalle de nulls por columna (solo las que cambiaron)
    print("CAMBIOS EN VALORES NULOS POR COLUMNA:")
    for column_name in df.columns:
        initial = null_counts_initial.get(column_name, 0)
        final = null_counts_final.get(column_name, 0)
        if initial != final:
            print(f"  {column_name}: {initial} → {final}")
    
    # Mostrar esquema final
    try:
        print("\nESQUEMA FINAL:")
        df.printSchema()
    except Exception as e:
        print(f"No se pudo mostrar el esquema: {e}")
    
    # Mostrar muestra de datos
    try:
        print("MUESTRA DE DATOS LIMPIOS (primeras 5 filas):")
        df.show(5, truncate=False)
    except Exception as e:
        print(f"No se pudo mostrar muestra de datos: {e}")
    
    print("="*60)
    print("✓ LIMPIEZA COMPLETADA EXITOSAMENTE")
    print("="*60)

except Exception as e:
    print(f" Error durante el proceso de limpieza: {str(e)}")
    print("Detalles del error:")
    import traceback
    traceback.print_exc()

finally:
    # Cerrar Spark session
    try:
        spark.stop()
        print("\n Sesión de Spark cerrada")
    except:
        print("\n Sesión cerrada")

25/11/17 16:50:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Iniciando proceso de limpieza de datos...
Archivo de entrada: /ProyectoPrincipal/virtual_ranking_propio_updated.csv
Archivo de salida: gs://grupo01-project-sin/processed/virtual_ranking_limpio.parquet


                                                                                

✓ Archivo leído exitosamente


                                                                                

  Dimensiones iniciales: 25 filas x 9 columnas

Esquema inicial:
root
 |-- Canal: string (nullable = true)
 |-- Posicion: integer (nullable = true)
 |-- Jefe: string (nullable = true)
 |-- Ejecutivo: string (nullable = true)
 |-- Monto: double (nullable = true)
 |-- Desembolsado: double (nullable = true)
 |-- Volumen Cambiado: double (nullable = true)
 |-- Utilidad: double (nullable = true)
 |-- id_ejecutivo: integer (nullable = true)


Contando valores nulos iniciales...
  Canal (string): 0 nulls


                                                                                

  Posicion (int): 0 nulls
  Jefe (string): 0 nulls
  Ejecutivo (string): 0 nulls
  Monto (double): 0 nulls
  Desembolsado (double): 0 nulls
  Volumen Cambiado (double): 0 nulls
  Utilidad (double): 0 nulls
  id_ejecutivo (int): 0 nulls

Total de valores nulos iniciales: 0

--- Iniciando limpieza ---
1. Limpiando nombres de columnas...
  ✓ Columnas renombradas: 9 columnas
2. Eliminando filas completamente vacías...
  ✓ Filas vacías eliminadas: 0
3. Eliminando duplicados...


                                                                                

  ✓ Filas duplicadas eliminadas: 0
4. Limpiando datos por tipo de columna...
  Procesando columna 'canal' (tipo: string)
  Procesando columna 'posicion' (tipo: int)
  Procesando columna 'jefe' (tipo: string)
  Procesando columna 'ejecutivo' (tipo: string)
  Procesando columna 'monto' (tipo: double)
  Procesando columna 'desembolsado' (tipo: double)
  Procesando columna 'volumen_cambiado' (tipo: double)
  Procesando columna 'utilidad' (tipo: double)
  Procesando columna 'id_ejecutivo' (tipo: int)
  ✓ Procesamiento de columnas completado
5. Verificando columnas completamente vacías...


                                                                                

  ✓ No hay columnas completamente vacías

Contando valores nulos finales...
Total de valores nulos finales: 0

--- Guardando resultado ---


                                                                                

✓ Archivo guardado exitosamente en: gs://grupo01-project-sin/processed/virtual_ranking_limpio.parquet

           REPORTE DE LIMPIEZA - PYSPARK
Archivo original: /ProyectoPrincipal/virtual_ranking_propio_updated.csv
Archivo limpio: gs://grupo01-project-sin/processed/virtual_ranking_limpio.parquet
------------------------------------------------------------
ESTADÍSTICAS:
  Filas iniciales:           25
  Filas finales:             25
  Filas eliminadas:          0
  Filas vacías eliminadas:   0
  Filas duplicadas elim.:    0

  Columnas iniciales:        9
  Columnas finales:          9
  Columnas eliminadas:       0

  Valores nulos iniciales:   0
  Valores nulos finales:     0
  Valores nulos limpiados:   0
------------------------------------------------------------
CAMBIOS EN VALORES NULOS POR COLUMNA:

ESQUEMA FINAL:
root
 |-- canal: string (nullable = true)
 |-- posicion: integer (nullable = true)
 |-- jefe: string (nullable = true)
 |-- ejecutivo: string (nullable = true)
 |-- mo