In [None]:
from minio.error import S3Error
import fitz  # PyMuPDF
from datetime import datetime
from io import BytesIO
import pandas as pd
import numpy as np
import re
import psycopg2
from psycopg2.extras import execute_values
import import_ipynb
from Utils import * #conectar_minio,transformar_tipo_identificacion, generar_ruta_fecha,extraer_archivo_minio, insert_into_info_lab, agregar_fuente, agregar_observacion, cargar_dfpaciente
import os
from dotenv import load_dotenv
from pathlib import Path

In [None]:
# Cargar variables del archivo .env
env_path = Path.cwd().parent.parent / 'notebooks_etl' / '.env'
load_dotenv(dotenv_path=env_path)

In [None]:
minio_client = conectar_minio(
    endpoint=os.getenv("MINIO_ENDPOINT"),
    access_key=os.getenv("MINIO_ACCESS_KEY"),
    secret_key=os.getenv("MINIO_SECRET_KEY"),
    secure=False  # Usar HTTPS
)

In [None]:
print("Directorio actual:", os.getcwd())
print("Archivos en el directorio:", os.listdir())

In [None]:
ruta_fecha = generar_ruta_fecha()
print("Ruta de fecha generada:", ruta_fecha)
ruta_archivo=f"Difuntos/{ruta_fecha}/",
print("Ruta del archivo:", ruta_archivo)

In [None]:
# Extraer un archivo Excel/CSV/Parquet
df_datos = extraer_archivo_minio(
    minio_client=minio_client,
    bucket_name="yachay-bronze",
    ruta_archivo=f"Difuntos/{ruta_fecha}/difuntos.csv",
    tipo_archivo="csv"
)
print(df_datos.head())

In [None]:
def transformar_df(df_datos):
    """
    Transforma un DataFrame de difuntos a la estructura estandarizada.
    
    Parámetros:
    - df_datos: DataFrame con los campos originales
    
    Retorna:
    - DataFrame transformado con solo las columnas necesarias
    """
    
    # Crear copia para no modificar el original
    #df_transformado = df_datos.copy()
    
    # Concatenar nombres y apellidos de paciente en una sola columna
    df_datos['Primer Nombre Fallecido'] = (
        df_datos['Primer Nombre Fallecido'].fillna('') + ' ' +
        df_datos['Segundo Nombre Fallecido'].fillna('') + ' ' +
        df_datos['Primer Apellido Fallecido'].fillna('') + ' ' +
        df_datos['Segundo Apellido Fallecido'].fillna('')
        ).str.replace(r'\s+', ' ', regex=True).str.strip()    
    df_datos[['Segundo Nombre Fallecido', 'Primer Apellido Fallecido', 'Segundo Apellido Fallecido']] = ''

    # Concatenación, campos compuestos
    concatenar_columnas(df_datos,['Barrio Residencia', 'Centro Poblado Residencia', 'Vereda Residencia'],'comuna_y_barrio_1',' ')
    concatenar_columnas(df_datos,['Otros Estados Patológicos', 'Otros Estados Patológicos 2'],'otros_estados_patologicos_relacionados_con_la_muerte')
    concatenar_columnas(df_datos,['Primer Nombre Certificador', 'Segundo Nombre Certificador', 'Primer Apellido Certificador', 'Segundo Apellido Certificador'],'nombre_de_quien_expide_el_certificado',' ')
    concatenar_columnas(df_datos,['Fecha Cirugía', 'Motivo Cirugía'],'observaciones_defuncion')
    concatenar_columnas(df_datos,['Diagnóstico A','Diagnóstico B','Diagnóstico C','Diagnóstico D'],'observaciones_fuente')


    # Mapeo de campos
    mapeo_campos = {
        'Primer Nombre Fallecido': 'primer_nombre',
        'Segundo Nombre Fallecido': 'segundo_nombre',
        'Primer Apellido Fallecido': 'primer_apellido',
        'Segundo Apellido Fallecido': 'segundo_apellido',
        'Sexo': 'sexo',
        'Fecha Nacimiento Fallecido': 'fecha_nacimiento',
        'Tipo Documento Fallecido': 'tipo_de_identificacion',
        'Número Documento Fallecido': 'numero_de_identificacion_personal',
        'Departamento Residencia': 'departamento_de_residencia',
        'Municipio Residencia': 'municipio_de_residencia',        
        'Dirección Residencia': 'direccion_de_residencia_1',
        'Área Residencia': 'zona',
        'Estado Conyugal Fallecido': 'estado_civil',
        'EPS': 'eps_de_afiliacion',
        'Régimen Seguridad Social': 'regimen_de_afiliacion',
        'Último Año Estudios Fallecido': 'escolaridad',
        'Ocupación': 'ocupacion',
        'Fecha Defunción': 'fecha_de_ultimo_contacto',        
        'Número Certificado': 'n_certificado_de_defuncion',        
        'Diagnóstico A': 'causa_directa_de_defuncion_a_texto',
        'Diagnóstico B': 'causa_directa_de_defuncion_b_texto',
        'Diagnóstico C': 'causa_directa_de_defuncion_c_texto',
        'Diagnóstico D': 'causa_directa_de_defuncion_d_texto',        
        'Certificado Expedido Por': 'profesion_de_quien_expite_el_certificado',
        'Sitio Defunción': 'lugar_de_defuncion',        
        'Entidad Registró': 'direccion_y_telefono_de_quien_expite_el_certificado',        
        'Edad Fallecido':'edad'
    }    
    df_datos.rename(columns=mapeo_campos,inplace=True)


    # campos constantes
    df_datos['estado_vital_en_el_ultimo_contacto'] = '2'
    df_datos['fuente_de_ultimo_contacto'] = 'BASE DE MORTALIDAD SECRETARÍA MUNICIPAL DE SALUD O CERTIFICADO DE DEFUNCIÓN (1)'    
    df_datos['base_de_diagnostico'] = 'Solo certificado de defunción (0)'
    df_datos['fuente'] = 'CERTIFICADO DEFUNCION'

    # detección de cáncer en las causas de defunción
    cols_causas = [
        'causa_directa_de_defuncion_a_texto',
        'causa_directa_de_defuncion_b_texto',
        'causa_directa_de_defuncion_c_texto',
        'causa_directa_de_defuncion_d_texto'
    ]
    df_datos['mencion_de_cancer_en_la_defuncion'] = df_datos[cols_causas].apply(
        lambda row: 'Sí' if any('cancer' in normalizar_01(v) for v in row) else 'No', axis=1
    )       
    
    #Casteo de datos
    df_datos['clasificacion_zona'] = df_datos['zona'].apply(lambda x: 'Urbana' if x == 'CABECERA MUNICIPAL' else ('Rural' if x == 'RURAL DISPERSO' or x == 'CENTRO POBLADO (INSPECCIÓN, CORREGIMIENTO O CASERÍO)' else 'No especificado'))

    mapeo_docs = {
        'CÉDULA DE CIUDADANÍA': 'CC',
        'REGISTRO CIVIL': 'RC',
        'TARJETA DE IDENTIDAD': 'TI',
        'CERTIFICADO DE NACIDO VIVO': 'CNV',
        'CÉDULA DE EXTRANJERÍA': 'CE',
        'SIN INFORMACIÓN': 'SI',
        'PERMISO POR PROTECCIÓN TEMPORAL': 'PPT',
        'DOCUMENTO EXTRANJERO': 'DE',
        'MENOR SIN IDENTIFICACIÓN': 'MSI',
        'ADULTO SIN IDENTIFICACIÓN': 'ASI'
    }

    df_datos['tipo_de_identificacion'] = (
        df_datos['tipo_de_identificacion']
        .str.upper()
        .replace(mapeo_docs)
    )

    df_datos['sexo'] = df_datos['sexo'].apply(
        lambda x: 'M' if str(x).strip().upper() == 'MASCULINO'
        else 'F' if str(x).strip().upper() == 'FEMENINO'
        else pd.NA
    )

    df_datos['clasificacion_estado_civil'] = df_datos['estado_civil'].apply(
        lambda x: (
            'soltero' if 'SOLTERO' in str(x).upper() else
            'viudo' if 'VIUDO' in str(x).upper() else
            'union libre' if 'NO ESTABA CASADO' in str(x).upper() else
            'casado' if 'CASADO' in str(x).upper() else
            'divorciado/separado' if any(pal in str(x).upper() for pal in ['SEPARADO', 'DIVORCIADO']) else
            pd.NA
        )
    )


    columnas_finales = [
        'primer_nombre',
        'segundo_nombre',
        'primer_apellido',
        'segundo_apellido',
        'sexo',
        'fecha_nacimiento',
        'tipo_de_identificacion',
        'numero_de_identificacion_personal',
        'departamento_de_residencia',
        'municipio_de_residencia',
        'direccion_de_residencia_1',
        'comuna_y_barrio_1',
        'zona',
        'clasificacion_zona',
        'estado_civil',
        'clasificacion_estado_civil',
        'eps_de_afiliacion',
        'regimen_de_afiliacion',
        'escolaridad',
        'ocupacion',
        'fecha_de_ultimo_contacto',
        'estado_vital_en_el_ultimo_contacto',
        'fuente_de_ultimo_contacto',
        'n_certificado_de_defuncion',
        'mencion_de_cancer_en_la_defuncion',
        'causa_directa_de_defuncion_a_texto',
        'causa_directa_de_defuncion_b_texto',
        'causa_directa_de_defuncion_c_texto',
        'causa_directa_de_defuncion_d_texto',
        'otros_estados_patologicos_relacionados_con_la_muerte',
        'profesion_de_quien_expite_el_certificado',
        'lugar_de_defuncion',
        'nombre_de_quien_expide_el_certificado',
        'direccion_y_telefono_de_quien_expite_el_certificado',
        'observaciones_defuncion',
        'edad',
        'base_de_diagnostico',
        'fuente',
        'observaciones_fuente'
    ]

    df_transformado = df_datos[columnas_finales]
    return df_transformado

In [None]:
df = transformar_df(df_datos)

In [None]:
connection_params = {
    'dbname': os.getenv("DB_NAME"),
    'user': os.getenv("DB_USER"),
    'password': os.getenv("DB_PASSWORD"),
    'host': os.getenv("DB_HOST"),
    'port':os.getenv("DB_PORT")
}

validaciones de los pacientes

In [None]:
def sincronizar_difuntos(connection_params, tabla_paciente, df_difuntos):
    """
    Sincroniza la información de difuntos entre las tablas.
    Este proceso se ejecuta periódicamente para mantener actualizada
    la marca de difuntos en la tabla de pacientes.
    """
    
    conexion = None
    cursor = None
    
    try:
        conexion = psycopg2.connect(**connection_params)
        cursor = conexion.cursor()
        
        # Paso 1: Cargar solo las columnas necesarias para minimizar memoria        
        #query_difuntos = f"SELECT numero_de_identificacion_personal FROM {tabla_difuntos}"
        query_pacientes = f"SELECT identificacion, difunto FROM {tabla_paciente}"
        
        #df_difuntos = pd.read_sql(query_difuntos, conexion)
        df_pacientes = pd.read_sql(query_pacientes, conexion)
        
        print(f"Registros cargados - Difuntos: {len(df_difuntos)}, Pacientes: {len(df_pacientes)}")
        
        # Paso 2: Normalizar las identificaciones, aunque solo tienen dígitos, pueden tener espacios o valores nulos
        df_difuntos['numero_de_identificacion_personal'] = (
            df_difuntos['numero_de_identificacion_personal']
            .astype(str)
            .str.strip()
        )
        
        df_pacientes['identificacion'] = (
            df_pacientes['identificacion']
            .astype(str)
            .str.strip()
        )
        
        # Paso 3: Crear un set con las identificaciones de difuntos
        difuntos_set = set(df_difuntos['numero_de_identificacion_personal'])
        
        # Paso 4: (UPDATE)Identificar pacientes que necesitan actualización
        # Estos son pacientes que están marcados como NO difuntos pero SI están en la lista
        pacientes_a_actualizar = df_pacientes[
            (df_pacientes['identificacion'].isin(difuntos_set)) & 
            (df_pacientes['difunto'] != 'si')
        ]
        
        print(f"Pacientes a actualizar como difuntos: {len(pacientes_a_actualizar)}")
        
        # Paso 5: (INSERT) Identificar difuntos que NO están en pacientes
        identificaciones_pacientes = set(df_pacientes['identificacion'])
        difuntos_nuevos = difuntos_set - identificaciones_pacientes
        
        print(f"Difuntos nuevos a insertar: {len(difuntos_nuevos)}")
        
        # Paso 6: Realizar los UPDATES en la base de datos
        if len(pacientes_a_actualizar) > 0:
            # Convertimos a lista para el query SQL
            ids_actualizar = pacientes_a_actualizar['identificacion'].tolist()
            
            # Consulta que actualice todos de una vez
            placeholders = ','.join(['%s'] * len(ids_actualizar))
            query_update = f"""
                UPDATE {tabla_paciente} 
                SET difunto = 'si' 
                WHERE identificacion IN ({placeholders})
            """
            
            cursor.execute(query_update, ids_actualizar)
            conexion.commit()
            print(f"✓ {cursor.rowcount} pacientes actualizados como difuntos")
        
        # Paso 7: Realizar los INSERTS para difuntos nuevos
        if len(difuntos_nuevos) > 0:
            difuntos_nuevos_list = list(difuntos_nuevos)
            placeholders = ','.join(['%s'] * len(difuntos_nuevos_list))
            
            '''
            query_difuntos_completos = f"""
                SELECT * FROM {tabla_difuntos} 
                WHERE numero_de_identificacion_personal IN ({placeholders})
            """
            
            df_difuntos_completos = pd.read_sql(
                query_difuntos_completos, 
                conexion,
                params=difuntos_nuevos_list
            )
            '''

            # Filtrar el DataFrame df para obtener solo los registros de difuntos_nuevos
            df_para_insertar = df[df['numero_de_identificacion_personal'].isin(difuntos_nuevos)].copy()
            
            # Mapear columnas de info_lab_difuntos a paciente
            # Ajusta este mapeo según tu estructura real
            mapeo_columnas = {
                'primer_nombre': 'nombre1',
                'segundo_nombre': 'nombre2',
                'primer_apellido': 'apellido1',
                'segundo_apellido': 'apellido2',
                'tipo_de_identificacion': 'tipo_de_identificacion',
                'numero_de_identificacion_personal': 'identificacion',
                'municipio_de_residencia': 'municipio_residencia',
                'comuna_y_barrio_1': 'barrio_residencia',
                'direccion_de_residencia_1': 'direccion',
                'fecha_nacimiento': 'fecha_nacimiento',
                'sexo': 'sexo',
                'edad': 'edad',
                'departamento_de_residencia':'departamento_residencia',

            }
            
            df_para_insertar = df_para_insertar.rename(columns=mapeo_columnas)
            
            df_para_insertar['difunto'] = 'si'
            

            # Crea la columna id_paciente dentro del df
            df_para_insertar = limpiar_y_validar_id_paciente(df_para_insertar)

            # Solo las columnas que necesita la tabla paciente
            columnas_paciente = [
                'id_paciente', 'identificacion', 'nombre1', 'nombre2', 
                'apellido1', 'apellido2', 'fecha_nacimiento','tipo_de_identificacion', 
                'sexo', 'edad', 'municipio_residencia', 'barrio_residencia', 
                'direccion', 'difunto',
                # nuevas columnas en tabla paciente a partir de la información de difuntos
                'departamento_residencia',
                'zona','clasificacion_zona',
                'estado_civil','clasificacion_estado_civil',
                'escolaridad',
                'ocupacion',
            ]

            # Filtrar solo las columnas que existen en df_para_insertar
            columnas_disponibles = [col for col in columnas_paciente if col in df_para_insertar.columns]
            df_para_insertar = df_para_insertar[columnas_disponibles]
            df_para_insertar = df_para_insertar.where(pd.notnull(df_para_insertar), None)
          
            # Insertar usando to_sql con if_exists='append' y manejo de duplicados
            columnas = list(df_para_insertar.columns)
            valores = [tuple(row) for row in df_para_insertar.values]

            query_insert = f"""
                INSERT INTO paciente ({', '.join(columnas)})
                VALUES %s
                ON CONFLICT (id_paciente) DO NOTHING
            """

            execute_values(cursor, query_insert, valores)
            conexion.commit()
            print(f"✓ {len(df_para_insertar)} nuevos pacientes difuntos insertados")
        
        return {
            'actualizados': len(pacientes_a_actualizar),
            'insertados': len(difuntos_nuevos)
        }        
    
    except psycopg2.OperationalError as e:
        print(f"Error de conexión a la base de datos: {e}")
        if conexion:
            conexion.rollback()
        raise
    except psycopg2.IntegrityError as e:
        print(f"Error de integridad de datos: {e}")
        if conexion:
            conexion.rollback()
        raise
    except Exception as e:
        print(f"Error inesperado al sincronizar difuntos: {e}")
        if conexion:
            conexion.rollback()
        raise
    finally:
        if cursor is not None:
            cursor.close()
        if conexion is not None:
            conexion.close()


In [None]:
resultado = sincronizar_difuntos(connection_params,'paciente',df)
print(f"\nProceso completado: {resultado['actualizados']} actualizados, {resultado['insertados']} insertados")        

Ingreso de información de los laboratorios

In [None]:
insert_into_info_lab_difuntos(df, connection_params)

In [None]:
#df[[ 'sexo', 'estado_civil']].head()
#df.to_excel('difuntos_silver.xlsx')