In [1]:
# Celda 1
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Simple_Processing") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print("‚úÖ SparkSession creada")

‚úÖ SparkSession creada


In [2]:
# Celda 2: Leer los datos 
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("encoding", "UTF-8") \
    .csv("Data/*.csv")

print("üìö Datos le√≠dos exitosamente")

üìö Datos le√≠dos exitosamente


In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Schema expl√≠cito basado en el diccionario de datos del MEF
mef_schema = StructType([
    StructField("ANO_EJE", IntegerType(), True),
    StructField("MES_EJE", IntegerType(), True),
    StructField("NIVEL_GOBIERNO", StringType(), True),
    StructField("NIVEL_GOBIERNO_NOMBRE", StringType(), True),
    StructField("SECTOR", StringType(), True),
    StructField("SECTOR_NOMBRE", StringType(), True),
    StructField("PLIEGO", StringType(), True),
    StructField("PLIEGO_NOMBRE", StringType(), True),
    StructField("SEC_EJEC", IntegerType(), True),
    StructField("EJECUTORA", IntegerType(), True),
    StructField("EJECUTORA_NOMBRE", StringType(), True),
    StructField("DEPARTAMENTO_EJECUTORA", IntegerType(), True),
    StructField("DEPARTAMENTO_EJECUTORA_NOMBRE", StringType(), True),
    StructField("PROVINCIA_EJECUTORA", IntegerType(), True),
    StructField("PROVINCIA_EJECUTORA_NOMBRE", StringType(), True),
    StructField("DISTRITO_EJECUTORA", IntegerType(), True),
    StructField("DISTRITO_EJECUTORA_NOMBRE", StringType(), True),
    StructField("SEC_FUNC", IntegerType(), True),
    StructField("PROGRAMA_PPTO", IntegerType(), True),
    StructField("PROGRAMA_PPTO_NOMBRE", StringType(), True),
    StructField("TIPO_ACT_PROY", IntegerType(), True),
    StructField("TIPO_ACT_PROY_NOMBRE", StringType(), True),
    StructField("PRODUCTO_PROYECTO", IntegerType(), True),
    StructField("PRODUCTO_PROYECTO_NOMBRE", StringType(), True),
    StructField("ACTIVIDAD_ACCION_OBRA", IntegerType(), True),
    StructField("ACTIVIDAD_ACCION_OBRA_NOMBRE", StringType(), True),
    StructField("FUNCION", IntegerType(), True),
    StructField("FUNCION_NOMBRE", StringType(), True),
    StructField("DIVISION_FUNCIONAL", IntegerType(), True),
    StructField("DIVISION_FUNCIONAL_NOMBRE", StringType(), True),
    StructField("GRUPO_FUNCIONAL", IntegerType(), True),
    StructField("GRUPO_FUNCIONAL_NOMBRE", StringType(), True),
    StructField("META", IntegerType(), True),
    StructField("FINALIDAD", IntegerType(), True),
    StructField("META_NOMBRE", StringType(), True),
    StructField("DEPARTAMENTO_META", IntegerType(), True),
    StructField("DEPARTAMENTO_META_NOMBRE", StringType(), True),
    StructField("FUENTE_FINANCIAMIENTO", IntegerType(), True),
    StructField("FUENTE_FINANCIAMIENTO_NOMBRE", StringType(), True),
    StructField("RUBRO", IntegerType(), True),
    StructField("RUBRO_NOMBRE", StringType(), True),
    StructField("TIPO_RECURSO", StringType(), True),
    StructField("TIPO_RECURSO_NOMBRE", StringType(), True),
    StructField("CATEGORIA_GASTO", IntegerType(), True),
    StructField("CATEGORIA_GASTO_NOMBRE", StringType(), True),
    StructField("TIPO_TRANSACCION", IntegerType(), True),
    StructField("GENERICA", IntegerType(), True),
    StructField("GENERICA_NOMBRE", StringType(), True),
    StructField("SUBGENERICA", IntegerType(), True),
    StructField("SUBGENERICA_NOMBRE", StringType(), True),
    StructField("SUBGENERICA_DET", IntegerType(), True),
    StructField("SUBGENERICA_DET_NOMBRE", StringType(), True),
    StructField("ESPECIFICA", IntegerType(), True),
    StructField("ESPECIFICA_NOMBRE", StringType(), True),
    StructField("ESPECIFICA_DET", IntegerType(), True),
    StructField("ESPECIFICA_DET_NOMBRE", StringType(), True),
    StructField("MONTO_PIA", DoubleType(), True),
    StructField("MONTO_PIM", DoubleType(), True),
    StructField("MONTO_CERTIFICADO", DoubleType(), True),
    StructField("MONTO_COMPROMETIDO_ANUAL", DoubleType(), True),
    StructField("MONTO_COMPROMETIDO", DoubleType(), True),
    StructField("MONTO_DEVENGADO", DoubleType(), True),
    StructField("MONTO_GIRADO", DoubleType(), True)
])

print("‚úÖ Schema del MEF definido con 63 columnas")

‚úÖ Schema del MEF definido con 63 columnas


In [5]:
# Validaci√≥n contra metadata esperada del MEF
print("="*60)
print("VALIDACI√ìN CONTRA METADATA MEF")
print("="*60)

# 1. Validar n√∫mero de columnas
expected_columns = 63  # Seg√∫n schema MEF
actual_columns = len(df.columns)
print(f"üìã Columnas esperadas: {expected_columns}")
print(f"üìã Columnas encontradas: {actual_columns}")
print(f"‚úÖ Columnas: {'CORRECTO' if actual_columns == expected_columns else 'ERROR'}")

# 2. Validar presencia de columnas clave
key_columns = ["ANO_EJE", "MONTO_PIM", "MONTO_DEVENGADO", "PLIEGO", "SECTOR"]
missing_columns = [col for col in key_columns if col not in df.columns]
print(f"üîë Columnas clave faltantes: {missing_columns if missing_columns else 'NINGUNA'}")

# 3. Validar a√±os presentes
if "ANO_EJE" in df.columns:
    years = df.select("ANO_EJE").distinct().collect()
    year_list = [str(row['ANO_EJE']) for row in years]
    print(f"üìÖ A√±os encontrados: {', '.join(year_list)}")

VALIDACI√ìN CONTRA METADATA MEF
üìã Columnas esperadas: 63
üìã Columnas encontradas: 63
‚úÖ Columnas: CORRECTO
üîë Columnas clave faltantes: NINGUNA
üìÖ A√±os encontrados: 2022, 2023, 2024, 2025


In [6]:
# Detecci√≥n de registros corruptos/incompletos
from pyspark.sql.functions import col, sum as spark_sum

print("="*60)
print("DETECCI√ìN DE REGISTROS CORRUPTOS")
print("="*60)

# 1. Registros con valores nulos en columnas cr√≠ticas
critical_nulls = df.filter(
    col("ANO_EJE").isNull() | 
    col("MONTO_PIM").isNull() |
    col("PLIEGO").isNull()
).count()
print(f"üö´ Registros con nulos en columnas cr√≠ticas: {critical_nulls}")

# 2. Porcentaje de completitud por columna
print("üìä Porcentaje de completitud por columna:")
completeness = df.select([
    (1 - (spark_sum(col(c).isNull().cast("int")) / df.count())).alias(c) 
    for c in df.columns[::10]  # Cada 10 columnas para no saturar
])
completeness.show(vertical=True)

DETECCI√ìN DE REGISTROS CORRUPTOS
üö´ Registros con nulos en columnas cr√≠ticas: 0
üìä Porcentaje de completitud por columna:
-RECORD 0-----------------
 ANO_EJE            | 1.0 
 EJECUTORA_NOMBRE   | 1.0 
 TIPO_ACT_PROY      | 1.0 
 GRUPO_FUNCIONAL    | 1.0 
 RUBRO_NOMBRE       | 1.0 
 SUBGENERICA_DET    | 1.0 
 MONTO_COMPROMETIDO | 1.0 



In [7]:
# Eliminar duplicados exactos
initial_count = df.count()
df_clean = df.dropDuplicates()
final_count = df_clean.count()
duplicates_removed = initial_count - final_count

print("="*60)
print("ELIMINACI√ìN DE DUPLICADOS")
print("="*60)
print(f"üìä Registros iniciales: {initial_count:,}")
print(f"üßπ Duplicados eliminados: {duplicates_removed:,}")
print(f"‚úÖ Registros finales: {final_count:,}")
print(f"üìâ Reducci√≥n: {(duplicates_removed/initial_count*100):.2f}%")

ELIMINACI√ìN DE DUPLICADOS
üìä Registros iniciales: 40,173,905
üßπ Duplicados eliminados: 13
‚úÖ Registros finales: 40,173,892
üìâ Reducci√≥n: 0.00%


In [8]:
# Guardar particionado por a√±o con estructura MEF
df_clean.write \
    .mode("overwrite") \
    .partitionBy("ANO_EJE") \
    .parquet("data/raw/year")

print("="*60)
print("ALMACENAMIENTO COMPLETADO")
print("="*60)
print("üíæ Datos guardados en formato Parquet")
print("üóÇÔ∏è  Estructura: data/raw/year=ANO_EJE/")
print("‚úÖ Formato: Particionado por a√±o (como solicita MEF)")
print("üìã Schema: Definici√≥n expl√≠cita seg√∫n diccionario MEF")

ALMACENAMIENTO COMPLETADO
üíæ Datos guardados en formato Parquet
üóÇÔ∏è  Estructura: data/raw/year=ANO_EJE/
‚úÖ Formato: Particionado por a√±o (como solicita MEF)
üìã Schema: Definici√≥n expl√≠cita seg√∫n diccionario MEF


In [9]:
# Verificaci√≥n final
print("="*60)
print("VERIFICACI√ìN FINAL FASE 1")
print("="*60)

# Leer una partici√≥n para verificar
df_verify = spark.read.parquet("data/raw/year")
print(f"‚úÖ Registros verificados: {df_verify.count():,}")
print(f"‚úÖ Particiones: {len(df_verify.inputFiles())} archivos")
print(f"‚úÖ Columnas: {len(df_verify.columns)}")
print("üéâ FASE 1 COMPLETADA EXITOSAMENTE")

VERIFICACI√ìN FINAL FASE 1
‚úÖ Registros verificados: 40,173,892
‚úÖ Particiones: 800 archivos
‚úÖ Columnas: 63
üéâ FASE 1 COMPLETADA EXITOSAMENTE
