# 1.4.01 "Relación entre nivel educativo y nivel jerárquico"

In [7]:
import traceback
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure

# Importamos desde tu config
from config import MONGO_URI, DB_NAME, SOURCE_COLLECTION_NAME, METRICS_COLLECTION_NAME

# --- 1. CONFIGURACIÓN BÁSICA ---
METRIC_ID = "1_4_01_CONGRUENCIA_INGRESO_ANTERIOR"

# --- 2. RUTAS DE LOS CAMPOS (para claridad) ---
PATH_FUE_PSP = "declaracion.situacionPatrimonial.actividadAnualAnterior.servidorPublicoAnioAnterior"
PATH_INGRESO = "declaracion.situacionPatrimonial.actividadAnualAnterior.remuneracionNetaCargoPublico.valor"

# --- 3. EL FLUJO DEL WORKER (ENFOQUE AGREGACIÓN) ---
def procesar_metrica_con_agregacion():
    """
    Procesa TODAS las declaraciones usando un pipeline de agregación nativo
    y guarda los resultados en la colección 'metricas' usando $merge.
    Esta es la forma más rápida.
    """
    client = None
    
    try:
        # --- A. CONECTAR A LA BASE DE DATOS ---
        print(f"Conectando a MongoDB en {DB_NAME}...")
        client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=10000)
        client.admin.command('ping') 
        db = client[DB_NAME]
        
        source_collection = db[SOURCE_COLLECTION_NAME]
        
        print("¡Conexión exitosa!")
        print(f"Calculando Métrica {METRIC_ID} para CADA documento...")
        print(f"Fuente: {SOURCE_COLLECTION_NAME} -> Destino: {METRICS_COLLECTION_NAME}")
        
        # --- B. DEFINIR EL PIPELINE DE AGREGACIÓN ---
        
        # Este pipeline calcula la métrica y la escribe en la otra colección.
        pipeline = [
    {
        '$project': {
            '_id': '$_id',
            'resultado_metrica': {
                '$switch': {
                    'branches': [
                        {
                            'case': { '$eq': [ f'${PATH_FUE_PSP}', False ] },
                            'then': 'N/A'
                        },
                        {
                            'case': {
                                '$ne': [ { '$type': f'${PATH_FUE_PSP}' }, 'bool' ]
                            },
                            'then': 'SIN_DATO'
                        },
                        {
                            'case': {
                                '$and': [
                                    { '$isNumber': f'${PATH_INGRESO}' },
                                    { '$gt': [ f'${PATH_INGRESO}', 0 ] }
                                ]
                            },
                            'then': 'CUMPLE'
                        },
                        {
                            'case': {
                                '$and': [
                                    { '$isNumber': f'${PATH_INGRESO}' },
                                    { '$lte': [ f'${PATH_INGRESO}', 0 ] }
                                ]
                            },
                            'then': 'NO_CUMPLE'
                        }
                    ],
                    'default': 'SIN_DATO'
                }
            }
        }
    },
    {
        '$set': {
            METRIC_ID: '$resultado_metrica'
        }
    },
    {
        '$unset': 'resultado_metrica'
    },
    {
        '$merge': {
            'into': METRICS_COLLECTION_NAME,
            'on': '_id',
            'whenMatched': 'merge',
            'whenNotMatched': 'insert'
        }
    }
]

        
        # --- C. EJECUTAR LA AGREGACIÓN ---
        print("Ejecutando agregación (esto puede tardar varios minutos, pero es más rápido)...")
        
        # allowDiskUse=True es vital si la colección es muy grande
        source_collection.aggregate(pipeline, allowDiskUse=True)

        print(f"\n--- PROCESAMIENTO NATIVO FINALIZADO ---")
        print(f"La Métrica {METRIC_ID} ha sido calculada y guardada en '{METRICS_COLLECTION_NAME}'.")

    except ConnectionFailure:
        print("Error: No se pudo conectar a la base de datos.")
    except OperationFailure as e:
        print(f"Error en la operación de la base de datos: {e.details}")
        traceback.print_exc()
    except Exception as e:
        print(f"Ocurrió un error inesperado: {e}")
        traceback.print_exc()
    finally:
        if client:
            client.close()
            print("Conexión cerrada.")

# --- 4. EJECUTAR EL SCRIPT ---
if __name__ == "__main__":
    procesar_metrica_con_agregacion()

Conectando a MongoDB en sistema1...
¡Conexión exitosa!
Calculando Métrica 1_4_01_CONGRUENCIA_INGRESO_ANTERIOR para CADA documento...
Fuente: all_data_20251021 -> Destino: metricas
Ejecutando agregación (esto puede tardar varios minutos, pero es más rápido)...

--- PROCESAMIENTO NATIVO FINALIZADO ---
La Métrica 1_4_01_CONGRUENCIA_INGRESO_ANTERIOR ha sido calculada y guardada en 'metricas'.
Conexión cerrada.
