## Controlas Fechas de Actualización de las Tablas

- monitorear_actualizacion_tablas.py
- Objetivo: Verificar que las tablas críticas del esquema `src` estén actualizadas con datos recientes.
- Autor: [Zeetrex]
- Fecha: [Auto-generado]

In [None]:
import psycopg2 as pg2
from datetime import date
import time
import logging
import os
import sys
from dotenv import dotenv_values

# Configurar logging
logging.basicConfig(
    filename='./logs/monitoreo_tablas.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

ENV_PATH = os.environ.get("ETL_ENV_PATH", "C:/ETL/ETL_DIARCO/.env")  # Toma Producción si está definido, o la ruta por defecto E:\ETL\ETL_DIARCO\.env
# Verificar si el archivo .env existe
if not os.path.exists(ENV_PATH):
    print(f"El archivo .env no existe en la ruta: {ENV_PATH}")
    print(f"Directorio actual: {os.getcwd()}")
    sys.exit(1)
    
secrets = dotenv_values(ENV_PATH)

def Open_Diarco_Data(): 
    conn_str = f"dbname={secrets['PG_DB']} user={secrets['PG_USER']} password={secrets['PG_PASSWORD']} host={secrets['PG_HOST']} port={secrets['PG_PORT']}"
    #print (conn_str)
    for i in range(5):
        try:    
            conn = pg2.connect(conn_str)
            return conn
        except Exception as e:
            print(f'Error en la conexión: {e}')
            time.sleep(5)
    return None  # Retorna None si todos los intentos fallan


In [None]:
import pandas as pd
from typing import List, Dict
from psycopg2 import sql
from psycopg2.extras import DictCursor

def obtener_control_interfaces(tablas_por_fecha: Dict[str, str],
                               max_retries: int = 3,
                               retry_delay: float = 2.0) -> pd.DataFrame:
    """
    Realiza control de interfaces sobre múltiples tablas con diferentes campos de fecha.
    Retorna un DataFrame con columnas: tabla, campo_fecha, ultima_fecha_extraccion, cantidad_registros.
    """
    registros = []
    conn = Open_Diarco_Data()
    conn.autocommit = True

    with conn.cursor(cursor_factory=DictCursor) as cur:
        cur.execute("SET statement_timeout = 0;")

        for tabla, campo_fecha in tablas_por_fecha.items():
            logging.info(f"Procesando tabla: {tabla} (campo: {campo_fecha})")

            query = sql.SQL("""
                SELECT
                    MAX({campo_fecha}) AS ultima_fecha,
                    COUNT(*) AS cantidad
                FROM src.{tabla}
                WHERE {campo_fecha}::date = (
                    SELECT MAX({campo_fecha}::date)
                    FROM src.{tabla}
                )
            """).format(
                tabla=sql.Identifier(tabla),
                campo_fecha=sql.Identifier(campo_fecha)
            )

            for intento in range(1, max_retries + 1):
                try:
                    cur.execute(query)
                    fila = cur.fetchone()
                    registros.append({
                        "tabla": tabla,
                        "campo_fecha": campo_fecha,
                        "ultima_fecha_extraccion": fila["ultima_fecha"],
                        "cantidad_registros": fila["cantidad"]
                    })
                    break
                except Exception as e:
                    logging.error(f"Error en tabla {tabla}, intento {intento}: {e}", exc_info=True)
                    if intento < max_retries:
                        time.sleep(retry_delay)
                    else:
                        registros.append({
                            "tabla": tabla,
                            "campo_fecha": campo_fecha,
                            "ultima_fecha_extraccion": None,
                            "cantidad_registros": None
                        })

    conn.close()
    return pd.DataFrame(registros,
                        columns=["tabla", "campo_fecha", "ultima_fecha_extraccion", "cantidad_registros"])


from datetime import datetime

def grabar_estado_tablas(df: pd.DataFrame, conn: pg2.extensions.connection) -> None:
    """
    Inserta o actualiza registros en src.monitoreo_estado_tablas desde un DataFrame.
    """
    if df.empty:
        logging.warning("⚠️ El DataFrame está vacío. No se insertará nada.")
        return

    # Filtrar columnas necesarias
    df = df[["tabla", "ultima_fecha_extraccion", "cantidad_registros"]].copy()

    # Limpieza: asegurarse de tipos correctos y sin float en registros
    df["tabla"] = df["tabla"].astype(str)
    df["ultima_fecha_extraccion"] = pd.to_datetime(df["ultima_fecha_extraccion"], errors="coerce")

    # Convertir a entero seguro solo si no es nulo
    df["cantidad_registros"] = pd.to_numeric(df["cantidad_registros"], errors='coerce').fillna(0).astype("Int64")

    insert_sql = """
        INSERT INTO src.monitoreo_estado_tablas (tabla, ultima_fecha_extraccion, cantidad_registros)
        VALUES (%s, %s, %s)
        ON CONFLICT (tabla) DO UPDATE SET
            ultima_fecha_extraccion = EXCLUDED.ultima_fecha_extraccion,
            cantidad_registros = EXCLUDED.cantidad_registros,
            fecha_control = NOW()
    """

    with conn.cursor() as cur:
        for _, row in df.iterrows():
            try:
                tabla = row["tabla"]
                fecha = row["ultima_fecha_extraccion"]
                cantidad = int(row["cantidad_registros"]) if pd.notnull(row["cantidad_registros"]) else None

                cur.execute(insert_sql, (tabla, fecha, cantidad))

            except Exception as e:
                logging.error(f"❌ Error al insertar {row.to_dict()}: {e}", exc_info=True)
                continue

        conn.commit()
        logging.info(f"✅ Se insertaron/actualizaron {len(df)} registros en src.monitoreo_estado_tablas.")


In [8]:
# 1. Tablas con campo 'fecha_extraccion'
tablas_fechas_estandar = [
    "base_productos_vigentes", "base_stock_sucursal", "t020_proveedor",
    "m_3_articulos", "t050_articulos", "t051_articulos_sucursal", 
    "t060_stock", "t080_oc_cabe", "t081_oc_deta", "t100_empresa_suc", "t052_articulos_proveedor",
    "t710_estadis_oferta_folder", "t710_estadis_reposicion","t117_compradores","t114_rubros"
]
tablas_dict_1 = {tabla: "fecha_extraccion" for tabla in tablas_fechas_estandar}

# 2. Tablas con campo personalizado
tablas_dict_2 = {
    "m_91_sucursales": "f_proc",
    "m_92_depositos": "f_proc",
    "m_93_sustitutos": "f_proc",
    "m_94_alternativos": "f_proc",
    "m_95_sensibles": "f_proc",
    "m_96_stock_seguridad": "f_proc",
    "t702_est_vtas_por_articulo" : "f_venta", 
    "t702_est_vtas_por_articulo_dbarrio": "f_venta",
    "base_forecast_oc_demoradas" : "f_alta_sist"
}


# OTRAS TABLAS SIN FECHA DE EXTRACCION
# Estas tablas no tienen un campo de fecha de extracción estándar, pero se pueden agregar si es
# "t020_proveedor_gestion_compra",  "t710_estadis_precios",

# Unificar en un solo diccionario
tablas_total = {**tablas_dict_1, **tablas_dict_2}

# Ejecutar control
df_control = obtener_control_interfaces(tablas_total)

# Adecuar Formatos
df_control['tabla'] = df_control['tabla'].astype(str)
df_control['campo_fecha'] = df_control['campo_fecha'].astype(str)
df_control['cantidad_registros'] = df_control['cantidad_registros'].astype('Int64')
df_control['ultima_fecha_extraccion'] = pd.to_datetime(df_control['ultima_fecha_extraccion'], errors='coerce')

print(df_control.dtypes)
print(df_control[["tabla", "ultima_fecha_extraccion", "cantidad_registros"]].head())


tabla                              object
campo_fecha                        object
ultima_fecha_extraccion    datetime64[ns]
cantidad_registros                  Int64
dtype: object
                     tabla    ultima_fecha_extraccion  cantidad_registros
0  base_productos_vigentes 2025-08-20 10:17:49.462523              538199
1      base_stock_sucursal 2025-08-20 14:40:48.962451             1265184
2           t020_proveedor 2025-08-21 06:05:11.683000               16031
3            m_3_articulos 2025-08-21 06:10:22.610000               14313
4           t050_articulos 2025-08-21 06:05:11.560000               59262


In [9]:
import ace_tools_open as tools

tools.display_dataframe_to_user(name="Actualización de Tablas Maestras", dataframe=df_control)

Actualización de Tablas Maestras


0
Loading ITables v2.4.0 from the internet...  (need help?)


In [None]:
# Grabar resultados
conn_pg = Open_Diarco_Data()
grabar_estado_tablas(df_control, conn_pg)
conn_pg.close()