Archivo de micro consultas API para no sobreescribir la BD

In [1]:
import requests
import os
from dotenv import load_dotenv
import pandas as pd
import pymysql
import time

load_dotenv()

True

In [2]:
# Configuración API
API_BASE_URL = "https://cramer.buk.cl/api/v1/chile/"
API_ENDPOINTS = {
    "licences": f"{API_BASE_URL}employees/absences/licences",
    "absences": f"{API_BASE_URL}employees/absences/absences",
    "permissions": f"{API_BASE_URL}employees/absences/permissions"
}
TOKEN = os.getenv("BUK_AUTH_TOKEN")

# Configuración BD
DB_HOST = os.getenv("IP") #REEMPLAZAR HOST SI ES DISTINTO
DB_USER = "rrhh_master" #REEMPLAZAR USUARIO 
DB_PASSWORD = os.getenv("clave_sql") #REEMPLAZAR CONTRASEÑA 
DB_NAME = "rrhh_app" #REEMPLAZAR NOMBRE DE BASE DE DATOS CREADA

# Imprimir las variables de entorno
print(f"TOKEN: {os.getenv('BUK_AUTH_TOKEN')}")
print(f"SQL: {os.getenv('clave_sql')}")
print(f"IP: {os.getenv('IP')}")

TOKEN: Xegy8dVsa1H8SFfojJcwYtDL
SQL: _Cramer2025_
IP: 10.254.32.110


In [3]:
# --- Configuración de Filtro por Rango de Fechas (Opcional) ---

# Cambia a True para activar el filtro por fechas. Si es False, usará el log incremental normal.
FILTRAR_POR_FECHAS = True

# Define el rango de fechas si FILTRAR_POR_FECHAS es True. Formato: "YYYY-MM-DD"
# La API de BUK filtra por la fecha de inicio de la incidencia.
FECHA_INICIO = "2025-09-01"
FECHA_FIN = "2025-09-08"

if FILTRAR_POR_FECHAS:
    print(f"🗓️ FILTRO POR FECHAS ACTIVADO: Se extraerán datos entre {FECHA_INICIO} y {FECHA_FIN}.")
else:
    print("⚙️ Usando modo de carga incremental normal (basado en el último ID del log).")

🗓️ FILTRO POR FECHAS ACTIVADO: Se extraerán datos entre 2025-09-01 y 2025-09-08.


In [4]:
from db_utils import (obtener_datos_paginados, crear_e_insertar_tabla_actualiza, 
                    conectar_mysql, obtener_ultima_carga, actualizar_log_carga, 
                    construir_url_incremental)

# 1. Conectar a la base de datos
conexion, cursor = conectar_mysql(DB_HOST, DB_USER, DB_PASSWORD, DB_NAME)

# 2. Definir los endpoints que se van a procesar
endpoints_a_procesar = ["absences", "licences", "permissions"]

print("🚀 FASE 1: Actualizando tablas individuales...")

# 3. Iterar sobre cada endpoint para extraer y cargar sus datos
for endpoint in endpoints_a_procesar:
    print(f"\n🔄 --- Procesando Endpoint: '{endpoint}' ---")
    
    # --- PASO CLAVE: Asegurar que la tabla tiene una PRIMARY KEY en 'id' ---
    # Esto es fundamental para prevenir duplicados.
    try:
        print(f"🔑 Verificando PRIMARY KEY en la tabla '{endpoint}'...")
        # Este comando añade la clave primaria solo si no existe. Es seguro ejecutarlo siempre.
        cursor.execute(f"ALTER TABLE {endpoint} ADD PRIMARY KEY (id);")
        conexion.commit()
        print(f"✅ PRIMARY KEY asegurada en la tabla '{endpoint}'.")
    except Exception as e:
        # Si la clave ya existe, MySQL dará un error. Lo capturamos y confirmamos que todo está bien.
        if "Multiple primary key defined" in str(e):
            print(f"✅ La PRIMARY KEY ya existe en la tabla '{endpoint}'.")
        else:
            print(f"⚠️  No se pudo establecer la PRIMARY KEY: {e}")

    # Determinar la URL de la API a consultar
    url_endpoint = ""
    if FILTRAR_POR_FECHAS:
        base_url_endpoint = f"{API_BASE_URL}employees/absences/{endpoint}"
        url_endpoint = f"{base_url_endpoint}?start_date_after={FECHA_INICIO}&start_date_before={FECHA_FIN}"
        print(f"🗓️  Modo: Filtro por fechas. URL: {url_endpoint}")
    else:
        ultima_carga = obtener_ultima_carga(cursor, endpoint)
        url_endpoint = construir_url_incremental(API_BASE_URL, endpoint, ultima_carga)
        print(f"⚙️  Modo: Carga incremental. URL: {url_endpoint}")
    
    # Extraer los datos desde la API
    print("📡 Extrayendo datos...")
    datos_nuevos = obtener_datos_paginados(url_endpoint, TOKEN)
    
    if datos_nuevos:
        print(f"📊 {len(datos_nuevos)} registros encontrados. Actualizando tabla '{endpoint}' en MySQL...")
        crear_e_insertar_tabla_actualiza(cursor, conexion, endpoint, datos_nuevos)
        
        if not FILTRAR_POR_FECHAS:
            actualizar_log_carga(cursor, conexion, endpoint, datos_nuevos)
            print("📝 Log de carga incremental actualizado.")
    else:
        print(f"✅ No se encontraron registros nuevos para '{endpoint}'.")

print("\n🎉 FASE 1 COMPLETADA: Tablas individuales actualizadas.")


🚀 Conectando a MySQL...
✅ Conectado a MySQL y usando la base: rrhh_app
🚀 FASE 1: Actualizando tablas individuales...

🔄 --- Procesando Endpoint: 'absences' ---
🔑 Verificando PRIMARY KEY en la tabla 'absences'...
⚠️  No se pudo establecer la PRIMARY KEY: (1062, "Duplicate entry '85714' for key 'absences.PRIMARY'")
🗓️  Modo: Filtro por fechas. URL: https://cramer.buk.cl/api/v1/chile/employees/absences/absences?start_date_after=2025-09-01&start_date_before=2025-09-08
📡 Extrayendo datos...

🚀 Comenzando la obtención de datos desde: https://cramer.buk.cl/api/v1/chile/employees/absences/absences?start_date_after=2025-09-01&start_date_before=2025-09-08
📄 Obteniendo página 1...
✅ Página 1: 25 datos obtenidos
📊 Total acumulado: 25 datos
📈 Páginas restantes: 24
✅ Página 1: 25 datos obtenidos
📊 Total acumulado: 25 datos
📈 Páginas restantes: 24
📄 Obteniendo página 2...
📄 Obteniendo página 2...
✅ Página 2: 25 datos obtenidos
📊 Total acumulado: 50 datos
📈 Páginas restantes: 23
✅ Página 2: 25 datos o

In [None]:
# --- FASE 2: Actualización Incremental de la Tabla Consolidada ---
print("\n🚀 FASE 2: Sincronizando la tabla 'consolidado_incidencias'...")

# Crear la tabla consolidada si no existe, con su propia clave primaria compuesta.
sql_create_consolidado = """
CREATE TABLE IF NOT EXISTS consolidado_incidencias (
    id INT, start_date VARCHAR(255), end_date VARCHAR(255), days_count VARCHAR(255),
    workday_stage VARCHAR(255), application_date VARCHAR(255), application_end_date VARCHAR(255),
    employee_id VARCHAR(255), status VARCHAR(255), created_at VARCHAR(255),
    tabla_origen VARCHAR(50), PRIMARY KEY (id, tabla_origen)
);
"""
cursor.execute(sql_create_consolidado)

# Sincronizar los datos desde las tablas individuales (ahora sin duplicados)
sql_sync_consolidado = """
INSERT INTO consolidado_incidencias (id, start_date, end_date, days_count, workday_stage, application_date, application_end_date, employee_id, status, created_at, tabla_origen)
(
    SELECT id, start_date, end_date, days_count, workday_stage, application_date, application_end_date, employee_id, status, created_at, 'permissions' AS tabla_origen FROM permissions
    UNION ALL
    SELECT id, start_date, end_date, days_count, workday_stage, application_date, application_end_date, employee_id, status, created_at, 'absences' AS tabla_origen FROM absences
    UNION ALL
    SELECT id, start_date, end_date, days_count, workday_stage, application_date, application_end_date, employee_id, status, created_at, 'licences' AS tabla_origen FROM licences
) AS source
ON DUPLICATE KEY UPDATE
    start_date = source.start_date, end_date = source.end_date, days_count = source.days_count,
    workday_stage = source.workday_stage, application_date = source.application_date,
    application_end_date = source.application_end_date, employee_id = source.employee_id,
    status = source.status, created_at = source.created_at;
"""
cursor.execute(sql_sync_consolidado)
conexion.commit()

registros_afectados = cursor.rowcount
print(f"✅ FASE 2 COMPLETADA: Tabla 'consolidado_incidencias' sincronizada. {registros_afectados} registros insertados/actualizados.")

# --- Finalizar ---
cursor.close()
conexion.close()
print("\n✅ Proceso finalizado. Conexión a la base de datos cerrada.")

OperationalError: (1050, "Table 'consolidado_incidencias' already exists")