### S40 - ACTUALIZAR DATOS CONNEXA - SUBIR RESULTADOS FORECAST

Parte de los forecast excecutión que están en estado 40 (Ya Graficados), Genera los datos acumulados del registro excec y sube los archivos a connexa

### PROCESAR xxx_Ponostico_Extendido

1) Leer ejecuciones con Status 40.
2) Actualizar los Datos y el Minigráfico de la cabecera de ejecución.
3) Cargar datos en la tabla execuciton_excecute_result.
4) Actulizar Estado en connexa a 50 DISPONIBLE

In [None]:
# RUTINA TRANSFERENCIA DE ARCHIVOS.
import base64
from io import BytesIO
import pandas as pd
import matplotlib.pyplot as plt
# LIBRERIAS NECESARIAS 
from datetime import datetime, timedelta
import numpy as np
from dotenv import dotenv_values
import psycopg2 as pg2    # Conectores para Postgres
import pyodbc  # Conector para SQL Server
import time  # Para medir el tiempo de ejecución
import getpass  # Para obtener el usuario del sistema operativo
import uuid  # Importar la librería uuid
# Mostrar el DataFrame resultante
import ace_tools_open as tools

# Evitar Mensajes Molestos
import warnings
warnings.simplefilter(action='ignore', category=UserWarning)
warnings.simplefilter(action='ignore', category= FutureWarning)

secrets = dotenv_values(".env")   # Connection String from .env
folder = secrets["FOLDER_DATOS"]



In [None]:
# -----------------------------------------------------------
# Funciones de conexión a la base de datos
# -----------------------------------------------------------
def Open_Conn_Postgres():
    secrets = dotenv_values(".env")   # Cargar credenciales desde .env    
    conn_str = f"dbname={secrets['BASE4']} user={secrets['USUARIO4']} password={secrets['CONTRASENA4']} host={secrets['SERVIDOR4']} port={secrets['PUERTO4']}"
    try:    
        conn = pg2.connect(conn_str)
        return conn
    except Exception as e:
        print(f'Error en la conexión: {e}')
        return None
    

def Open_Postgres_retry(max_retries=5, wait_seconds=10):
    secrets = dotenv_values(".env")   # Cargar credenciales desde .env    
    conn_str = f"dbname={secrets['BASE4']} user={secrets['USUARIO4']} password={secrets['CONTRASENA4']} host={secrets['SERVIDOR4']} port={secrets['PUERTO4']}"
    for i in range(max_retries):
        try:
            conn = pg2.connect(conn_str)
            return conn 
        except Exception as e:
            print(f"Error en la conexión, intento {i+1}/{max_retries}: {e}")
            time.sleep(wait_seconds)
    return None  # Retorna None si todos los intentos fallan

def Open_Connection():
    secrets = dotenv_values(".env")   # Connection String from .env
    conn_str = f'DRIVER={secrets["DRIVER2"]};SERVER={secrets["SERVIDOR2"]};PORT={secrets["PUERTO2"]};DATABASE={secrets["BASE2"]};UID={secrets["USUARIO2"]};PWD={secrets["CONTRASENA2"]}'
    # print (conn_str) 
    try:    
        conn = pyodbc.connect(conn_str)
        return conn
    except:
        print('Error en la Conexión')
        return None

def Close_Connection(conn): 
    conn.close()
    return True

# Helper para generar identificadores únicos
def id_aleatorio():
    return str(uuid.uuid4())

# -----------------------------------------------------------
# 4. Operaciones CRUD para spl_supply_forecast_execution_parameter
# -----------------------------------------------------------
def create_execution_parameter(supply_forecast_execution_id, supply_forecast_model_parameter_id, value):
    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        id_exec_param = id_aleatorio()
        timestamp = datetime.utcnow()
        query = """
            INSERT INTO public.spl_supply_forecast_execution_parameter(
                id, "timestamp", supply_forecast_execution_id, supply_forecast_model_parameter_id, value
            )
            VALUES (%s, %s, %s, %s, %s)
        """
        cur.execute(query, (id_exec_param, timestamp, supply_forecast_execution_id, supply_forecast_model_parameter_id, value))
        conn.commit()
        cur.close()
        return id_exec_param
    except Exception as e:
        print(f"Error en create_execution_parameter: {e}")
        conn.rollback()
        return None
    finally:
        Close_Connection(conn)

def get_execution_parameter(exec_param_id):
    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        query = """
            SELECT id, "timestamp", supply_forecast_execution_id, supply_forecast_model_parameter_id, value
            FROM public.spl_supply_forecast_execution_parameter
            WHERE id = %s
        """
        cur.execute(query, (exec_param_id,))
        row = cur.fetchone()
        cur.close()
        if row:
            return {
                "id": row[0],
                "timestamp": row[1],
                "supply_forecast_execution_id": row[2],
                "supply_forecast_model_parameter_id": row[3],
                "value": row[4]
            }
        return None
    except Exception as e:
        print(f"Error en get_execution_parameter: {e}")
        return None
    finally:
        Close_Connection(conn)

def update_execution_parameter(exec_param_id, **kwargs):
    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        set_clause =  ", ".join([f"{key} = %s" for key in kwargs.keys()])
        values = list(kwargs.values())
        values.append(exec_param_id)
        query = f"""
            UPDATE public.spl_supply_forecast_execution_parameter
            SET {set_clause}
            WHERE id = %s
        """
        cur.execute(query, tuple(values))
        conn.commit()
        cur.close()
        return get_execution_parameter(exec_param_id)
    except Exception as e:
        print(f"Error en update_execution_parameter: {e}")
        conn.rollback()
        return None
    finally:
        Close_Connection(conn)

def delete_execution_parameter(exec_param_id):
    conn = Open_Conn_Postgres()
    if conn is None:
        return False
    try:
        cur = conn.cursor()
        query = """
            DELETE FROM public.spl_supply_forecast_execution_parameter
            WHERE id = %s
        """
        cur.execute(query, (exec_param_id,))
        conn.commit()
        cur.close()
        return True
    except Exception as e:
        print(f"Error en delete_execution_parameter: {e}")
        conn.rollback()
        return False
    finally:
        Close_Connection(conn)

# -----------------------------------------------------------
# 5. Operaciones CRUD para spl_supply_forecast_execution_execute
# -----------------------------------------------------------
def create_execution_execute(end_execution, last_execution, start_execution, supply_forecast_execution_id, supply_forecast_execution_schedule_id):
    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        id_exec = id_aleatorio()
        timestamp = datetime.utcnow()
        query = """
            INSERT INTO public.spl_supply_forecast_execution_execute(
                id, end_execution, last_execution, start_execution, "timestamp", supply_forecast_execution_id, supply_forecast_execution_schedule_id
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
        cur.execute(query, (id_exec, end_execution, last_execution, start_execution, timestamp, supply_forecast_execution_id, supply_forecast_execution_schedule_id))
        conn.commit()
        cur.close()
        return id_exec
    except Exception as e:
        print(f"Error en create_execution_execute: {e}")
        conn.rollback()
        return None
    finally:
        Close_Connection(conn)

def get_execution_execute(exec_id):
    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        query = """
            SELECT id, end_execution, last_execution, start_execution, "timestamp", supply_forecast_execution_id, supply_forecast_execution_schedule_id
            FROM public.spl_supply_forecast_execution_execute
            WHERE id = %s
        """
        cur.execute(query, (exec_id,))
        row = cur.fetchone()
        cur.close()
        if row:
            return {
                "id": row[0],
                "end_execution": row[1],
                "last_execution": row[2],
                "start_execution": row[3],
                "timestamp": row[4],
                "supply_forecast_execution_id": row[5],
                "supply_forecast_execution_schedule_id": row[6]
            }
        return None
    except Exception as e:
        print(f"Error en get_execution_execute: {e}")
        return None
    finally:
        Close_Connection(conn)

def update_execution_execute(exec_id, **kwargs):
    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        set_clause = ", ".join([f"{key} = %s" for key in kwargs.keys()])
        values = list(kwargs.values())
        values.append(exec_id)
        query = f"""
            UPDATE public.spl_supply_forecast_execution_execute
            SET {set_clause}
            WHERE id = %s
        """
        cur.execute(query, tuple(values))
        conn.commit()
        cur.close()
        return get_execution_execute(exec_id)
    except Exception as e:
        print(f"Error en update_execution_execute: {e}")
        conn.rollback()
        return None
    finally:
        Close_Connection(conn)

def delete_execution_execute(exec_id):
    conn = Open_Conn_Postgres()
    if conn is None:
        return False
    try:
        cur = conn.cursor()
        query = """
            DELETE FROM public.spl_supply_forecast_execution_execute
            WHERE id = %s
        """
        cur.execute(query, (exec_id,))
        conn.commit()
        cur.close()
        return True
    except Exception as e:
        print(f"Error en delete_execution_execute: {e}")
        conn.rollback()
        return False
    finally:
        Close_Connection(conn)

# -----------------------------------------------------------
# 6. Operaciones CRUD para spl_supply_forecast_execution_execute_result
# -----------------------------------------------------------
def create_execution_execute_result(confidence_level, error_margin, expected_demand, average_daily_demand, lower_bound, upper_bound,
                                    product_id, site_id, supply_forecast_execution_execute_id, algorithm, average, ext_product_code, ext_site_code, ext_supplier_code,
                                    forcast, graphic, quantity_stock, sales_last, sales_previous, sales_same_year, supplier_id, windows, deliveries_pending):
    conn = Open_Postgres_retry()
    if conn is None:
        print("❌ No se pudo conectar después de varios intentos")
        return None
    try:
        cur = conn.cursor()
        id_result = id_aleatorio()
        timestamp = datetime.utcnow()
        query = """
            INSERT INTO public.spl_supply_forecast_execution_execute_result (
                id, confidence_level, error_margin, expected_demand, average_daily_demand, lower_bound, "timestamp", upper_bound, 
                product_id, site_id, supply_forecast_execution_execute_id, algorithm, average, ext_product_code, ext_site_code, ext_supplier_code, 
                forcast, graphic, quantity_stock, sales_last, sales_previous, sales_same_year, supplier_id, windows, 
                deliveries_pending
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        cur.execute(query, (id_result, confidence_level, error_margin, expected_demand, average_daily_demand, lower_bound, timestamp, upper_bound, 
                            product_id, site_id, supply_forecast_execution_execute_id, algorithm, average, ext_product_code, ext_site_code, ext_supplier_code,
                            forcast, graphic, quantity_stock, sales_last, sales_previous, sales_same_year, supplier_id, windows, deliveries_pending))
        conn.commit()
        cur.close()
        return id_result
    except Exception as e:
        print(f"Error en create_execution_execute_result: {e}")
        conn.rollback()
        return None
    finally:
        Close_Connection(conn)

def get_execution_execute_result(result_id):
    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        query = """
            SELECT * FROM public.spl_supply_forecast_execution_execute_result
            WHERE id = %s
        """
        cur.execute(query, (result_id,))
        row = cur.fetchone()
        cur.close()
        if row:
            columns = [desc[0] for desc in cur.description]
            return dict(zip(columns, row))
        return None
    except Exception as e:
        print(f"Error en get_execution_execute_result: {e}")
        return None
    finally:
        Close_Connection(conn)

def update_execution_execute_result(result_id, **kwargs):
    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        set_clause = ", ".join([f"{key} = %s" for key in kwargs.keys()])
        values = list(kwargs.values())
        values.append(result_id)
        query = f"""
            UPDATE public.spl_supply_forecast_execution_execute_result
            SET {set_clause}
            WHERE id = %s
        """
        cur.execute(query, tuple(values))
        conn.commit()
        cur.close()
        return get_execution_execute_result(result_id)
    except Exception as e:
        print(f"Error en update_execution_execute_result: {e}")
        conn.rollback()
        return None
    finally:
        Close_Connection(conn)

def delete_execution_execute_result(result_id):
    conn = Open_Conn_Postgres()
    if conn is None:
        return False
    try:
        cur = conn.cursor()
        query = """
            DELETE FROM public.spl_supply_forecast_execution_execute_result
            WHERE id = %s
        """
        cur.execute(query, (result_id,))
        conn.commit()
        cur.close()
        return True
    except Exception as e:
        print(f"Error en delete_execution_execute_result: {e}")
        conn.rollback()
        return False
    finally:
        Close_Connection(conn)

# -----------------------------------------------------------
# 3. Operaciones CRUD para spl_supply_forecast_execution
# -----------------------------------------------------------
def get_execution(execution_id):
    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        query = """
            SELECT id, description, name, "timestamp", supply_forecast_model_id, 
                ext_supplier_code, supplier_id, supply_forecast_execution_status_id
            FROM public.spl_supply_forecast_execution
            WHERE id = %s
        """
        cur.execute(query, (execution_id,))
        row = cur.fetchone()
        cur.close()
        if row:
            return {
                "id": row[0],
                "description": row[1],
                "name": row[2],
                "timestamp": row[3],
                "supply_forecast_model_id": row[4],
                "ext_supplier_code": row[5],
                "supplier_id": row[6],
                "supply_forecast_execution_status_id": row[7]
            }
        return None
    except Exception as e:
        print(f"Error en get_execution: {e}")
        return None
    finally:
        Close_Connection(conn)

def update_execution(execution_id, **kwargs):
    if not kwargs:
        print("No hay valores para actualizar")
        return None

    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        cur = conn.cursor()
        set_clause = ", ".join([f"{key} = %s" for key in kwargs.keys()])
        values = list(kwargs.values())
        values.append(execution_id)

        query = f"""
            UPDATE public.spl_supply_forecast_execution
            SET {set_clause}
            WHERE id = %s
        """
        cur.execute(query, tuple(values))
        conn.commit()
        cur.close()
        return get_execution(execution_id)  # Retorna la ejecución actualizada
    
    except Exception as e:
        print(f"Error en update_execution: {e}")
        conn.rollback()
        return None
    finally:
        Close_Connection(conn)

def get_excecution_excecute_by_status(status):
    if not status:
        print("No hay estados para filtrar")
        return None
    
    conn = Open_Conn_Postgres()
    if conn is None:
        return None
    try:
        query = f"""
            SELECT 
                a.id, 
                a.description, 
                a.name, 
                a."timestamp", 
                a.supply_forecast_model_id, 
                a.ext_supplier_code, 
                a.graphic, 
                a.monthly_net_margin_in_millions, 
                a.monthly_purchases_in_millions, 
                a.monthly_sales_in_millions, 
                a.sotck_days AS stock_days,  -- Posible corrección
                a.sotck_days_colors AS stock_days_colors, -- Posible corrección
                a.supplier_id, 
                a.supply_forecast_execution_status_id,
                b.supply_forecast_execution_schedule_id AS forecast_execution_schedule_id, 
                b.id AS forecast_execution_execute_id
            FROM public.spl_supply_forecast_execution a
            LEFT JOIN public.spl_supply_forecast_execution_execute b
                ON b.supply_forecast_execution_id = a.id
            WHERE a.supply_forecast_execution_status_id = {status};
        """
        # Ejecutar la consulta SQL
        fexsts = pd.read_sql(query, conn)
        return fexsts
    except Exception as e:
        print(f"Error en get_excecution_status: {e}")
        return None
    finally:
        Close_Connection(conn) 

# -----------------------------------------------------------
# 0. Rutinas Locales para la generación de gráficos
# -----------------------------------------------------------
def generar_mini_grafico( folder, name):
    # Recuperar Historial de Ventas
    df_ventas = pd.read_csv(f'{folder}/{name}_Ventas.csv')
    df_ventas['Codigo_Articulo']= df_ventas['Codigo_Articulo'].astype(int)
    df_ventas['Sucursal']= df_ventas['Sucursal'].astype(int)
    df_ventas['Fecha']= pd.to_datetime(df_ventas['Fecha'])

    fecha_maxima = df_ventas["Fecha"].max()
    df_filtrado = df_ventas[df_ventas["Fecha"] >= (fecha_maxima - pd.Timedelta(days=150))].copy()

    # Ventas Semanales
    df_filtrado["Mes"] = df_filtrado["Fecha"].dt.to_period("M").astype(str)
    df_mes = df_filtrado.groupby("Mes")["Unidades"].sum().reset_index()

    # Crear el gráfico compacto
    fig, ax = plt.subplots(figsize=(3, 1))  # Tamaño pequeño para una visualización compacta
    # ax.bar(df_mes["Mes"], df_mes["Unidades"], color=["red", "blue", "green", "orange", "purple", "brown"], alpha=0.7)
    # Usar el índice como secuencia de registros
    ax.bar(range(1, len(df_mes) + 1), df_mes["Unidades"], color=["red", "blue", "green", "orange", "purple", "brown"], alpha=0.7)

    # Eliminar ejes y etiquetas para que sea más compacto
    ax.set_xticks([])
    ax.set_yticks([])
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.spines['left'].set_visible(False)
    ax.spines['bottom'].set_visible(False)
    
    # Mostrar el gráfico
    # plt.tight_layout(rect=[0, 0.03, 1, 0.95])  # Ajustar para no solapar con el título

    # Guardar gráfico en base64
    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    plt.close()
    
    return base64.b64encode(buffer.getvalue()).decode("utf-8")    
    

In [None]:
def publish_excecution_results(df_forecast_ext, forecast_execution_excecute_id, supplier_id):
    print ('Comenzando a grabar el dataframe')
    for _, row in df_forecast_ext.iterrows():
        create_execution_execute_result(
            confidence_level=0.92,  # Valor por defecto ya que no está en df_meged
            error_margin=0.07,  # Valor por defecto
            expected_demand=row['Forecast'],
            average_daily_demand=row['Average'],
            lower_bound=row['Q_DIAS_STOCK'],  # Valor por defecto
            upper_bound=row['Q_VENTA_DIARIA_NORMAL'],  # Valor por defecto
            product_id=row['product_id'],
            site_id=row['site_id'],
            supply_forecast_execution_execute_id=forecast_execution_excecute_id,
            algorithm=row['algoritmo'],
            average=row['Average'],
            ext_product_code=row['Codigo_Articulo'],
            ext_site_code=row['Sucursal'],
            ext_supplier_code=row['id_proveedor'],
            forcast=row['Q_REPONER_INCLUIDO_SOBRE_STOCK'],
            graphic=row['GRAFICO'],
            quantity_stock=row['Q_TRANSF_PEND'],  # Valor por defecto
            sales_last=row['ventas_last'],
            sales_previous=row['ventas_previous'],
            sales_same_year=row['ventas_same_year'],
            supplier_id=supplier_id,
            windows=row['ventana'],
            deliveries_pending=1  # Valor por defecto
        )
    print ('--------------------------------')


def get_precios(id_proveedor):
    conn = Open_Connection()
    query = f"""
        SELECT 
        A.[C_PROVEEDOR_PRIMARIO],
        S.[C_ARTICULO]
        ,S.[C_SUCU_EMPR]
        ,S.[I_PRECIO_VTA]
        ,S.[I_COSTO_ESTADISTICO]
        --,S.[M_HABILITADO_SUCU]
        --,A.M_BAJA                   
        FROM [DIARCOP001].[DiarcoP].[dbo].[T051_ARTICULOS_SUCURSAL] S
        LEFT JOIN [DIARCOP001].[DiarcoP].[dbo].[T050_ARTICULOS] A
            ON A.[C_ARTICULO] = S.[C_ARTICULO]
        
        WHERE S.[M_HABILITADO_SUCU] = 'S' -- Permitido Reponer
            AND A.M_BAJA = 'N'  -- Activo en Maestro Artículos
            AND A.[C_PROVEEDOR_PRIMARIO] = {id_proveedor} -- Solo del Proveedor        
        ORDER BY S.[C_ARTICULO],S.[C_SUCU_EMPR];
    """
    # Ejecutar la consulta SQL
    precios = pd.read_sql(query, conn)
    precios['C_PROVEEDOR_PRIMARIO']= precios['C_PROVEEDOR_PRIMARIO'].astype(int)
    precios['C_ARTICULO']= precios['C_ARTICULO'].astype(int)
    precios['C_SUCU_EMPR']= precios['C_SUCU_EMPR'].astype(int)
    return precios


In [None]:
def actualizar_site_ids(df_forecast_ext, conn, name):
    """Reemplaza site_id en df_forecast_ext con datos válidos desde fnd_site"""
    query = """
    SELECT code, name, id FROM public.fnd_site
    WHERE company_id = 'e7498b2e-2669-473f-ab73-e2c8b4dcc585'
    ORDER BY code 
    """
    stores = pd.read_sql(query, conn)
    stores = stores[pd.to_numeric(stores['code'], errors='coerce').notna()].copy()
    stores['code'] = stores['code'].astype(int)

    # Eliminar site_id anterior si ya existía
    df_forecast_ext = df_forecast_ext.drop(columns=['site_id'], errors='ignore')

    # Merge con los stores para obtener site_id
    df_forecast_ext = df_forecast_ext.merge(
        stores[['code', 'id']],
        left_on='Sucursal',
        right_on='code',
        how='left'
    ).rename(columns={'id': 'site_id'})

    # Validar valores faltantes
    missing = df_forecast_ext[df_forecast_ext['site_id'].isna()]
    if not missing.empty:
        print(f"⚠️ Faltan site_id en {len(missing)} registros")
        missing.to_csv(f"{folder}/{name}_Missing_Site_IDs.csv", index=False)
    else:
        print("✅ Todos los registros tienen site_id válido")

    return df_forecast_ext


# -------------------- RUTINA PRINCIPAL --------------------

# Leer Dataframe de FORECAST EXECUTION LISTOS PARA IMPORTAR A CONNEXA (DE 40 A 50)
fes = get_excecution_excecute_by_status(40)

for index, row in fes[fes["supply_forecast_execution_status_id"] == 40].iterrows():
    algoritmo = row["name"]
    name = algoritmo.split('_ALGO')[0]
    execution_id = row["id"]
    id_proveedor = row["ext_supplier_code"]
    forecast_execution_execute_id = row["forecast_execution_execute_id"]
    supplier_id = row["supplier_id"]
    print(f"Algoritmo: {algoritmo}  - Name: {name} exce_id: {forecast_execution_execute_id} id: Proveedor {id_proveedor}")
    print(f"supplier-id: {supplier_id} ----------------------------------------------------")

    try:
        # Leer forecast extendido
        df_forecast_ext = pd.read_csv(f'{folder}/{algoritmo}_Pronostico_Extendido.csv')
        df_forecast_ext['Codigo_Articulo'] = df_forecast_ext['Codigo_Articulo'].astype(int)
        df_forecast_ext['Sucursal'] = df_forecast_ext['Sucursal'].astype(int)
        df_forecast_ext.fillna(0, inplace=True)
        print(f"-> Datos Recuperados del CACHE: {id_proveedor}, Label: {name}")

        # Agregar site_id desde fnd_site
        conn = Open_Conn_Postgres()
        df_forecast_ext = actualizar_site_ids(df_forecast_ext, conn, name)
        print(f"-> Se actualizaron los site_ids: {id_proveedor}, Label: {name}")

        # Publicar en tabla de resultados
        publish_excecution_results(df_forecast_ext, forecast_execution_execute_id, supplier_id)
        print(f"-> Detalle Forecast Publicado CONNEXA: {id_proveedor}, Label: {name}")

        # Obtener precios y costos
        precio = get_precios(id_proveedor)
        precio['C_ARTICULO'] = precio['C_ARTICULO'].astype(int)
        precio['C_SUCU_EMPR'] = precio['C_SUCU_EMPR'].astype(int)

        # Merge con precios
        df_forecast_ext = df_forecast_ext.merge(
            precio,
            left_on=['Codigo_Articulo', 'Sucursal'],
            right_on=['C_ARTICULO', 'C_SUCU_EMPR'],
            how='left'
        )

        # Cálculo de métricas
        df_forecast_ext['Forecast_VENTA'] = (df_forecast_ext['Forecast'] * df_forecast_ext['I_PRECIO_VTA'] / 1000).round(2)
        df_forecast_ext['Forecast_COSTO'] = (df_forecast_ext['Forecast'] * df_forecast_ext['I_COSTO_ESTADISTICO'] / 1000).round(2)
        df_forecast_ext['MARGEN'] = (df_forecast_ext['Forecast_VENTA'] - df_forecast_ext['Forecast_COSTO']).round(2)

        # Guardar CSV actualizado
        file_path = f"{folder}/{algoritmo}_Pronostico_Extendido.csv"
        df_forecast_ext.to_csv(file_path, index=False)
        print(f"Archivo guardado: {file_path}")

        # Totales
        total_venta = df_forecast_ext['Forecast_VENTA'].sum()
        total_costo = df_forecast_ext['Forecast_COSTO'].sum()
        total_margen = df_forecast_ext['MARGEN'].sum()

        # Mini gráfico
        mini_grafico = generar_mini_grafico(folder, name)

        # Marcar como procesado
        fes.at[index, "supply_forecast_execution_status_id"] = 50

        # Actualizar en base de datos
        update_execution(
            execution_id,
            supply_forecast_execution_status_id=50,
            monthly_sales_in_millions=total_venta,
            monthly_purchases_in_millions=total_costo,
            monthly_net_margin_in_millions=total_margen,
            graphic=mini_grafico
        )
        print(f"✅ Estado actualizado a 50 para {execution_id}")

    except Exception as e:
        import traceback
        traceback.print_exc()
        print(f"❌ Error procesando {name}: {e}")


In [None]:
# RUTINA PRINCIPAL RECORRE FORECAST EXCEC con STATUS 40 y pasa a 50

# Filtrar registros con supply_forecast_execution_status_id = 40 Graficos Generados
for index, row in fes[fes["supply_forecast_execution_status_id"] == 40].iterrows():
    algoritmo = row["name"]
    name = algoritmo.split('_ALGO')[0]
    execution_id = row["id"]
    id_proveedor = row["ext_supplier_code"]
    forecast_execution_excecute_id = row['forecast_execution_execute_id']
    supplier_id = row['supplier_id']
    print("Algoritmo: " + algoritmo + "  - Name: " + name + " exce_id:" + str(forecast_execution_excecute_id) + " id: Proveedor "+id_proveedor)
    print("supplier-id: " + str(supplier_id) + "----------------------------------------------------")
    
    try:        
        # Recuperar Solicitudes de Compra Extended
        df_forecast_ext = pd.read_csv(f'{folder}/{algoritmo}_Pronostico_Extendido.csv')
        df_forecast_ext['Codigo_Articulo']= df_forecast_ext['Codigo_Articulo'].astype(int)
        df_forecast_ext['Sucursal']= df_forecast_ext['Sucursal'].astype(int)
        df_forecast_ext.fillna(0)   # Por si se filtró algún missing value
        print(f"-> Datos Recuperados del CACHE: {id_proveedor}, Label: {name}")
        
        # Agregar site_id desde fnd_site
        conn = Open_Conn_Postgres()
        df_forecast_ext = actualizar_site_ids(df_forecast_ext, conn, name)
        print(f"-> Se actualizaron los site_ids: {id_proveedor}, Label: {name}")
        
        print("❗Filas con site_id inválido:", df_forecast_ext['site_id'].isna().sum())
        print("❗Filas con product_id inválido:", df_forecast_ext['product_id'].isna().sum())
        
        # Publicar Resultados en execut_result
        publish_excecution_results(df_forecast_ext, forecast_execution_excecute_id, supplier_id)
        print(f"-> Detalle Forecast Publicado CONNEXA: {id_proveedor}, Label: {name}")
        
        # Recuperar PRECIOS y COSTOS
        precio = get_precios(id_proveedor)    
        precio['C_ARTICULO']= precio['C_ARTICULO'].astype(int)
        precio['C_SUCU_EMPR']= precio['C_SUCU_EMPR'].astype(int)
        
        # UNIR para CALCULAR Datos Adicionales
        df_forecast_ext = pd.merge(
            df_forecast_ext,  # DataFrame de artículos
            precio,    # DataFrame de precios
            left_on =['Codigo_Articulo', 'Sucursal'],  # Claves en 'forecast'
            right_on=['C_ARTICULO', 'C_SUCU_EMPR'],  # Claves en 'precios'
            how='left'  # Solo traer los productos que están en 'forecast'
        )
        df_forecast_ext['Forecast_VENTA'] = (df_forecast_ext['Forecast'] * df_forecast_ext['I_PRECIO_VTA'] / 1000).round(2)
        df_forecast_ext['Forecast_COSTO'] = (df_forecast_ext['Forecast'] * df_forecast_ext['I_COSTO_ESTADISTICO'] / 1000).round(2)
        df_forecast_ext['MARGEN'] = (df_forecast_ext['Forecast_VENTA'] - df_forecast_ext['Forecast_COSTO'] / 1000).round(2)
        
        # Guardar el archivo CSV
        file_path = f"{folder}/{algoritmo}_Pronostico_Extendido.csv"
        df_forecast_ext.to_csv(file_path, index=False)
        print(f"Archivo guardado: {file_path}")
        
        # Calcular las sumatorias totales de Forecast_VENTA, Forecast_COSTO y MARGEN
        total_venta = df_forecast_ext['Forecast_VENTA'].sum()
        total_costo = df_forecast_ext['Forecast_COSTO'].sum()
        total_margen = df_forecast_ext['MARGEN'].sum()
        
        mini_grafico = generar_mini_grafico(folder, name)
        
        # Actualizar el status_id a 40 en el DataFrame original
        fes.at[index, "supply_forecast_execution_status_id"] = 50
        # ✅ Actualizar directamente en la base de datos el estado a 50
        
        # Llamar a la función update_execution con los valores calculados
        update_execution(
            execution_id, 
            supply_forecast_execution_status_id=50, 
            monthly_sales_in_millions=total_venta, 
            monthly_purchases_in_millions=total_costo,
            monthly_net_margin_in_millions = total_margen,
            graphic = mini_grafico
        )        
        print(f"Estado actualizado a 50 para {execution_id}")

    except Exception as e:
        print(f"Error procesando {name}: {e}")

#### SANDBOX para Pruebas Manuales

In [None]:
print("❗Filas con site_id inválido:", df_forecast_ext['site_id'].isna().sum())
print("❗Filas con product_id inválido:", df_forecast_ext['product_id'].isna().sum())

In [None]:
conn = Open_Conn_Postgres()
query = """
SELECT code, name, id FROM public.fnd_site
WHERE company_id = 'e7498b2e-2669-473f-ab73-e2c8b4dcc585'
ORDER BY code 
"""

stores = pd.read_sql(query, conn)
stores = stores[pd.to_numeric(stores['code'], errors='coerce').notna()].copy()
stores['code'] = stores['code'].astype(int)

In [None]:
# Eliminar site_id anterior si ya existía
df_forecast_ext = df_forecast_ext.drop(columns=['site_id'], errors='ignore')

In [None]:


# Merge con los stores para obtener site_id
df_forecast_ext = df_forecast_ext.merge(
    stores[['code', 'id']],
    left_on='Sucursal',
    right_on='code',
    how='left'
).rename(columns={'id': 'site_id'})


In [None]:

df_forecast_ext = actualizar_site_ids(df_forecast_ext, conn, name)

In [None]:
id_proveedor = '140'
name= '140_UNILEVER'
forecast_execution_id ='a85ef5dd-b12a-4a27-a4b5-80b78d10cdf7'
forecast_execution_excecute_id ='a97250ed-3d31-4338-b493-659ebcb96691'
supplier_id = 'f603814e-c2d8-49b2-8738-24f9cc0a7e89'
algoritmo = '140_UNILEVER_ALGO_05'


df_forecast_ext = pd.read_csv(f'{folder}/{algoritmo}_Pronostico_Extendido.csv')
df_forecast_ext['Codigo_Articulo']= df_forecast_ext['Codigo_Articulo'].astype(int)
df_forecast_ext['Sucursal']= df_forecast_ext['Sucursal'].astype(int)
df_forecast_ext.fillna(0)   # Por si se filtró algún missing value


# Mostrar la tabla con los gráficos en base64

tools.display_dataframe_to_user(name="Forecast Extendido", dataframe=df_forecast_ext)

In [None]:
df_forecast_ext.info()

In [None]:

# Publicar Resultados en execut_result
publish_excecution_results(df_forecast_ext, forecast_execution_excecute_id, supplier_id)
print(f"-> Detalle Forecast Publicado CONNEXA: {id_proveedor}, Label: {name}")

In [None]:
conn = Open_Connection()

In [None]:
 # Recuperar PRECIOS y COSTOS
precio = get_precios(id_proveedor)    
precio['C_ARTICULO']= precio['C_ARTICULO'].astype(int)
precio['C_SUCU_EMPR']= precio['C_SUCU_EMPR'].astype(int)

In [None]:
df_forecast_ext = pd.merge(
    df_forecast_ext,  # DataFrame de artículos
    precio,    # DataFrame de precios
    left_on =['Codigo_Articulo', 'Sucursal'],  # Claves en 'forecast'
    right_on=['C_ARTICULO', 'C_SUCU_EMPR'],  # Claves en 'precios'
    how='left'  # Solo traer los productos que están en 'forecast'
)
df_forecast_ext['TOT_VENTA'] = (df_forecast_ext['Forecast'] * df_forecast_ext['I_PRECIO_VTA'] / 1000).round(2)
df_forecast_ext['TOT_COSTO'] = (df_forecast_ext['Forecast'] * df_forecast_ext['I_COSTO_ESTADISTICO'] / 1000).round(2)
df_forecast_ext['MARGEN'] = (df_forecast_ext['TOT_VENTA'] - df_forecast_ext['TOT_COSTO'] / 1000).round(2)

In [None]:
# Recuperar Solicitudes de Compra Extended
df_suc_faltantes = pd.read_csv(f'{folder}/Sucursales_Faltantes.csv', delimiter=";", dtype=str)
df_suc_faltantes.columns = df_suc_faltantes.columns.str.strip()  # Elimina espacios adicionales




In [None]:
# Generar el INSERT dinámicamente
insert_queries = []
for _, row in df_suc_faltantes.iterrows():
    query = f"""
        INSERT INTO public.fnd_site(
            id, address, latitude, longitude, name, "timestamp", type, code, company_id
        ) VALUES (
            gen_random_uuid(), 
            '{row['address']}', 
            {row['latitude'] if row['latitude'] else 'NULL'}, 
            {row['longitude'] if row['longitude'] else 'NULL'}, 
            '{row['name']}', 
            '{row['fecha_hora']}', 
            '{row['type']}', 
            {row['code']}, 
            '{row['company_id']}'
        );
    """
    insert_queries.append(query)

# Guardar los inserts en un archivo SQL para ejecutarlo luego
sql_file_path = f"{folder}/insert_sucursales.sql"
with open(sql_file_path, "w", encoding="utf-8") as file:
    file.writelines("\n".join(insert_queries))

print(f"Archivo SQL generado en: {sql_file_path}")


In [None]:
import psycopg2

def ejecutar_inserts(sql_file):
    conn = None
    try:
        # Conectar a la base de datos
        conn = Open_Conn_Postgres()
        cur = conn.cursor()

        # Leer el archivo SQL y ejecutarlo
        with open(sql_file, "r", encoding="utf-8") as f:
            sql_script = f.read()
        
        cur.execute(sql_script)
        conn.commit()
        cur.close()
        print("Sucursales insertadas correctamente en la base de datos.")

    except Exception as e:
        print(f"Error ejecutando el insert: {e}")
        if conn:
            conn.rollback()
    finally:
        if conn:
            conn.close()

# Ejecutar los inserts
ejecutar_inserts(sql_file_path)


In [None]:
df_suc_faltantes.info()

In [None]:
df_ventas = pd.read_csv(f'{folder}/{name}_Ventas.csv')
df_ventas['Codigo_Articulo']= df_ventas['Codigo_Articulo'].astype(int)
df_ventas['Sucursal']= df_ventas['Sucursal'].astype(int)
df_ventas['Fecha']= pd.to_datetime(df_ventas['Fecha'])

fecha_maxima = df_ventas["Fecha"].max()
df_filtrado = df_ventas[df_ventas["Fecha"] >= (fecha_maxima - pd.Timedelta(days=150))]

# Ventas Semanales
df_filtrado["Mes"] = df_filtrado["Fecha"].dt.to_period("M").astype(str)
df_mes = df_filtrado.groupby("Mes")["Unidades"].sum().reset_index()

# Crear el gráfico compacto
fig, ax = plt.subplots(figsize=(3, 1))  # Tamaño pequeño para una visualización compacta
# ax.bar(df_mes["Mes"], df_mes["Unidades"], color=["red", "blue", "green", "orange", "purple", "brown"], alpha=0.7)
# Usar el índice como secuencia de registros
ax.bar(range(1, len(df_mes) + 1), df_mes["Unidades"], color=["red", "blue", "green", "orange", "purple", "brown"], alpha=0.7)

# Eliminar ejes y etiquetas para que sea más compacto
ax.set_xticks([])
ax.set_yticks([])
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.spines['bottom'].set_visible(False)

plt.show()

In [None]:
precios = get_precios(id_proveedor)

# Mostrar la tabla con los gráficos en base64

tools.display_dataframe_to_user(name="Precios Proveedor", dataframe=precios)

In [None]:
from fpdf import FPDF
import os

# Lista de archivos .md a consolidar en un único PDF
md_files = [
    "./00_SISTEMA_FORECAST_OVERVIEW.md",
    "./S10_GENERAR_FORECAST_Planificado.md",
    "./S20_GENERA_Forecast_Extendido.md",
    "./S30_GENERA_Grafico_Detalle.md",
    "./S40_SUBIR_Forecast_Connexa.md",
    "./funciones_forecast.md"
]

# Crear el PDF
pdf = FPDF()
pdf.set_auto_page_break(auto=True, margin=15)
pdf.set_font("Arial", size=11)

# Agregar el contenido de cada archivo .md
for file_path in md_files:
    if os.path.exists(file_path):
        pdf.add_page()
        with open(file_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
            for line in lines:
                line = line.strip("\n").replace("\t", "    ")
                pdf.multi_cell(0, 8, line)

# Guardar el documento PDF resultante
pdf_output_path = "./Documentacion_Forecast_Zeetrex.pdf"
pdf.output(pdf_output_path)

pdf_output_path


In [None]:
# Verificar el contenido de cada archivo .md antes de consolidar en PDF
import os
import pandas as pd

contenido_md = {}
for file_path in [
    "./00_SISTEMA_FORECAST_OVERVIEW.md",
    "./S10_GENERAR_FORECAST_Planificado.md",
    "./S20_GENERA_Forecast_Extendido.md",
    "./S30_GENERA_Grafico_Detalle.md",
    "./S40_SUBIR_Forecast_Connexa.md",
    "./funciones_forecast.md"
]:
    if os.path.exists(file_path):
        with open(file_path, "r", encoding="utf-8") as f:
            contenido_md[file_path] = f.read()

import ace_tools_open as tools

tools.display_dataframe_to_user(name="Contenido de Archivos Markdown", dataframe=pd.DataFrame.from_dict(contenido_md, orient="index", columns=["Contenido"]))

In [None]:
fes = get_excecution_by_status(40)
tools.display_dataframe_to_user(name="Contenido de Archivos Markdown", dataframe=fes)

In [15]:
import os
import shutil
def mover_archivos_procesados(algoritmo, folder):    # Movel a procesado los archivos.
    destino = os.path.join(folder, "procesado")
    os.makedirs(destino, exist_ok=True)  # Crea la carpeta si no existe

    for archivo in os.listdir(folder):
        if archivo.startswith(algoritmo):
            origen = os.path.join(folder, archivo)
            destino_final = os.path.join(destino, archivo)
            shutil.move(origen, destino_final)
            print(f"📁 Archivo movido: {archivo} → {destino_final}")

In [14]:
mover_archivos_procesados(algoritmo, folder)


📁 Archivo movido: 140_UNILEVER_ALGO_05_Pronostico_Extendido.csv → data\procesado\140_UNILEVER_ALGO_05_Pronostico_Extendido.csv
📁 Archivo movido: 140_UNILEVER_ALGO_05_Solicitudes_Compra.csv → data\procesado\140_UNILEVER_ALGO_05_Solicitudes_Compra.csv


# EJECUTABLE PYTHON

In [16]:

# Solo importar lo necesario desde el módulo de funciones
from funciones_forecast import (
    Open_Conn_Postgres,
    Close_Connection,
    get_excecution_by_status,
    Open_Postgres_retry,
    mover_archivos_procesados,
    actualizar_site_ids,
    get_precios,
    get_excecution_excecute_by_status,
    update_execution,
    create_execution_execute_result,
    generar_mini_grafico,
    generar_grafico_base64
)

import pandas as pd # uso localmente la lectura de archivos.
import ace_tools_open as tools

from dotenv import dotenv_values
secrets = dotenv_values(".env")
folder = secrets["FOLDER_DATOS"]

# También podés importar funciones adicionales si tu módulo las necesita

def publish_excecution_results(df_forecast_ext, forecast_execution_excecute_id, supplier_id):
    print ('Comenzando a grabar el dataframe')
    for _, row in df_forecast_ext.iterrows():
        create_execution_execute_result(
            confidence_level=0.92,  # Valor por defecto ya que no está en df_meged
            error_margin=0.07,  # Valor por defecto
            expected_demand=row['Forecast'],
            average_daily_demand=row['Average'],
            lower_bound=row['Q_DIAS_STOCK'],  # Valor por defecto
            upper_bound=row['Q_VENTA_DIARIA_NORMAL'],  # Valor por defecto
            product_id=row['product_id'],
            site_id=row['site_id'],
            supply_forecast_execution_execute_id=forecast_execution_excecute_id,
            algorithm=row['algoritmo'],
            average=row['Average'],
            ext_product_code=row['Codigo_Articulo'],
            ext_site_code=row['Sucursal'],
            ext_supplier_code=row['id_proveedor'],
            forcast=row['Q_REPONER_INCLUIDO_SOBRE_STOCK'],
            graphic=row['GRAFICO'],
            quantity_stock=row['Q_TRANSF_PEND'],  # Valor por defecto
            sales_last=row['ventas_last'],
            sales_previous=row['ventas_previous'],
            sales_same_year=row['ventas_same_year'],
            supplier_id=supplier_id,
            windows=row['ventana'],
            deliveries_pending=1  # Valor por defecto
        )
    print ('--------------------------------')

def actualizar_site_ids(df_forecast_ext, conn, name):
    """Reemplaza site_id en df_forecast_ext con datos válidos desde fnd_site"""
    query = """
    SELECT code, name, id FROM public.fnd_site
    WHERE company_id = 'e7498b2e-2669-473f-ab73-e2c8b4dcc585'
    ORDER BY code 
    """
    stores = pd.read_sql(query, conn)
    stores = stores[pd.to_numeric(stores['code'], errors='coerce').notna()].copy()
    stores['code'] = stores['code'].astype(int)

    # Eliminar site_id anterior si ya existía
    df_forecast_ext = df_forecast_ext.drop(columns=['site_id'], errors='ignore')

    # Merge con los stores para obtener site_id
    df_forecast_ext = df_forecast_ext.merge(
        stores[['code', 'id']],
        left_on='Sucursal',
        right_on='code',
        how='left'
    ).rename(columns={'id': 'site_id'})

    # Validar valores faltantes
    missing = df_forecast_ext[df_forecast_ext['site_id'].isna()]
    if not missing.empty:
        print(f"⚠️ Faltan site_id en {len(missing)} registros")
        missing.to_csv(f"{folder}/{name}_Missing_Site_IDs.csv", index=False)
    else:
        print("✅ Todos los registros tienen site_id válido")

    return df_forecast_ext

def insertar_graficos_forecast(algoritmo, name, id_proveedor):
        
    # Recuperar Historial de Ventas
    df_ventas = pd.read_csv(f'{folder}/{name}_Ventas.csv')
    df_ventas['Codigo_Articulo']= df_ventas['Codigo_Articulo'].astype(int)
    df_ventas['Sucursal']= df_ventas['Sucursal'].astype(int)
    df_ventas['Fecha']= pd.to_datetime(df_ventas['Fecha'])

    # Recuperando Forecast Calculado
    df_forecast = pd.read_csv(f'{folder}/{algoritmo}_Solicitudes_Compra.csv')
    df_forecast.fillna(0)   # Por si se filtró algún missing value
    print(f"-> Datos Recuperados del CACHE: {id_proveedor}, Label: {name}")
    
    # Agregar la nueva columna de gráficos en df_forecast Iterando sobre todo el DATAFRAME
    df_forecast["GRAFICO"] = df_forecast.apply(
        lambda row: generar_grafico_base64(df_ventas, row["Codigo_Articulo"], row["Sucursal"], row["Forecast"], row["Average"], row["ventas_last"], row["ventas_previous"], row["ventas_same_year"]) if not pd.isna(row["Codigo_Articulo"]) and not pd.isna(row["Sucursal"]) else None,
        axis=1
    )
    
    return df_forecast

ImportError: cannot import name 'mover_archivos_procesados' from 'funciones_forecast' (e:\PY\DEMANDA\funciones_forecast.py)

In [None]:

# Punto de entrada
if __name__ == "__main__":

    # Leer Dataframe de FORECAST EXECUTION LISTOS PARA IMPORTAR A CONNEXA (DE 40 A 50)
    fes = get_excecution_excecute_by_status(40)
    
    for index, row in fes[fes["supply_forecast_execution_status_id"] == 40].iterrows():
        algoritmo = row["name"]
        name = algoritmo.split('_ALGO')[0]
        execution_id = row["id"]
        id_proveedor = row["ext_supplier_code"]
        forecast_execution_execute_id = row["forecast_execution_execute_id"]
        supplier_id = row["supplier_id"]

        print(f"Algoritmo: {algoritmo}  - Name: {name} exce_id: {forecast_execution_execute_id} id: Proveedor {id_proveedor}")
        print(f"supplier-id: {supplier_id} ----------------------------------------------------")

        try:
            # Leer forecast extendido
            df_forecast_ext = pd.read_csv(f'{folder}/{algoritmo}_Pronostico_Extendido.csv')
            df_forecast_ext['Codigo_Articulo'] = df_forecast_ext['Codigo_Articulo'].astype(int)
            df_forecast_ext['Sucursal'] = df_forecast_ext['Sucursal'].astype(int)
            df_forecast_ext.fillna(0, inplace=True)
            print(f"-> Datos Recuperados del CACHE: {id_proveedor}, Label: {name}")            
            print("❗Filas con site_id inválido:", df_forecast_ext['site_id'].isna().sum())
            print("❗Filas con product_id inválido:", df_forecast_ext['product_id'].isna().sum())

            # Agregar site_id desde fnd_site
            conn = Open_Conn_Postgres()
            df_forecast_ext = actualizar_site_ids(df_forecast_ext, conn, name)
            print(f"-> Se actualizaron los site_ids: {id_proveedor}, Label: {name}")
            
            # Hacer merge solo si no existen las columnas de precios y costos
            if 'I_PRECIO_VTA' not in df_forecast_ext.columns or 'I_COSTO_ESTADISTICO' not in df_forecast_ext.columns:
                print(f"❌ ERROR: Falta la columna requerida '{col}' procedemos a actualizar {id_proveedor}")
                precio = get_precios(id_proveedor)
                precio['C_ARTICULO'] = precio['C_ARTICULO'].astype(int)
                precio['C_SUCU_EMPR'] = precio['C_SUCU_EMPR'].astype(int)

                df_forecast_ext = df_forecast_ext.merge(
                    precio,
                    left_on=['Codigo_Articulo', 'Sucursal'],
                    right_on=['C_ARTICULO', 'C_SUCU_EMPR'],
                    how='left'
                )
            else:
                print(f"⚠️ El DataFrame ya contiene precios y costos. Merge evitado para {id_proveedor}")
            
            # Verificar columnas necesarias después del merge
            columnas_requeridas = ['I_PRECIO_VTA', 'I_COSTO_ESTADISTICO']
            for col in columnas_requeridas:
                if col not in df_forecast_ext.columns:
                    print(f"❌ ERROR: Falta la columna requerida '{col}' en df_forecast_ext para el proveedor {id_proveedor}")
                    df_forecast_ext.to_csv(f"{folder}/{algoritmo}_ERROR_MERGE.csv", index=False)
                    raise ValueError(f"Column '{col}' missing in df_forecast_ext. No se puede continuar.")

            # Cálculo de métricas x Línea en miles
            df_forecast_ext['Forecast_VENTA'] = (df_forecast_ext['Forecast'] * df_forecast_ext['I_PRECIO_VTA'] / 1000).round(2)
            df_forecast_ext['Forecast_COSTO'] = (df_forecast_ext['Forecast'] * df_forecast_ext['I_COSTO_ESTADISTICO'] / 1000).round(2)
            df_forecast_ext['MARGEN'] = (df_forecast_ext['Forecast_VENTA'] - df_forecast_ext['Forecast_COSTO'])

            # Guardar CSV actualizado
            file_path = f"{folder}/{algoritmo}_Pronostico_Extendido.csv"
            df_forecast_ext.to_csv(file_path, index=False)
            print(f"Archivo guardado: {file_path}")
            
            # Asegurar que los valores son del tipo float (nativo de Python)
            total_venta = float(round(df_forecast_ext['Forecast_VENTA'].sum() / 1000, 2))
            total_costo = float(round(df_forecast_ext['Forecast_COSTO'].sum() / 1000, 2))
            total_margen = float(round(df_forecast_ext['MARGEN'].sum() / 1000, 2))

            # Mini gráfico
            mini_grafico = generar_mini_grafico(folder, name)

            # Actualizar en base de datos
            update_execution(
                execution_id,
                supply_forecast_execution_status_id=45,
                monthly_sales_in_millions=total_venta,
                monthly_purchases_in_millions=total_costo,
                monthly_net_margin_in_millions=total_margen,
                graphic=mini_grafico
            )
            
            # Publicar en tabla de resultados
            publish_excecution_results(df_forecast_ext, forecast_execution_execute_id, supplier_id)
            print(f"-> Detalle Forecast Publicado CONNEXA: {id_proveedor}, Label: {name}")
                        
            # ✅ Actualizar Estado intermedio de Procesamiento....
            update_execution(execution_id, supply_forecast_execution_status_id=50)
            print(f"✅ Estado actualizado a 50 para {execution_id}")
            
            # Mover archivos a Procesado
            mover_archivos_procesados(algoritmo, folder)


        except Exception as e:
            import traceback
            traceback.print_exc()
            print(f"❌ Error procesando {name}: {e}")
