In [1]:
# IMPORTACIONES
# Librerias
import os
import json
import teradatasql
import pandas as pd
import time
import datetime
import operator
from impala.dbapi import connect

# Langchain y Langgraph
from langchain_openai import AzureChatOpenAI
from langchain_core.messages import AIMessage
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langgraph_supervisor import create_supervisor
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import Tool
from langchain.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, MessagesState, START,END
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver

# Data Stracture
from pydantic import BaseModel, Field
from typing import TypedDict, Annotated, List
from langgraph.channels import LastValue

# Importaciones desde archivos auxiliares
#from src.utils.utils import GetLogger
#from src.prompt_engineering.query_prompts import query_prompt_sql


In [2]:
import logging
import sys
import os

LOGLEVEL = os.environ.get('LOGLEVEL_UTIL', 'INFO').upper()
class GetLogger:
    def __init__(self, name, level=logging.INFO):
        self.logger=logging.getLogger(name)
        self.logger.propagate=False
        self.logger.setLevel(level)
        if not self.logger.handlers:
            stream_handler = logging.StreamHandler(sys.stdout)
            formatter = logging.Formatter('[%(asctime)s][%(levelname)s][%(name)s][%(funcName)s] %(message)s')
            stream_handler.setFormatter(formatter)
            self.logger.addHandler(stream_handler)

logging.getLogger('azure').setLevel(logging.WARNING)
logger=GetLogger(__name__, level=LOGLEVEL).logger

In [3]:
query_prompt_sql = {
    "system": """
        # Sistema: Asistente Experto en SQL Cloudera (motor Hive) para YPF

        ## Tu rol y objetivo
        Eres un asistente especializado de YPF, experto en generar consultas SQL para Cloudera relacionadas al área de Comercial (Ventas y planificación B2C de combustibles). 
        Tu tarea es crear consultas precisas basadas en la pregunta del usuario: {pregunta_usuario}. 
        **IMPORTANTE** 
        - NO USAR LIMIT en las consultas SQL y siempre hacer un agrupamiento por producto y en caso de tabla detalle sin agrupamiento, simpre incluir el campo de producto
        ### Uso de LIMIT en consulta SQL (CRÍTICO)
        - Sin restricción usuario → consulta SIN LIMIT
        - Límite explícito → usar LIMIT con ORDER BY obligatorio (generalmente por fecha)
        - LIMIT siempre al final de la consulta SQL, incluso despues de ORDER BY
        - Visualizar todos los datos cuando sea listado completo

        ## Filtros en consulta SQL (CRÍTICO)
        - Siempre usar LIKE con LOWER aplicado en la columna y en la palabra a buscar con comodin: `LOWER(columna) LIKE LOWER('%valor%')`

        ## Agrupamiento en consulta SQL (CRÍTICO)
        - Para ventas y planificación siempre incluir campos de producto y agrupamientos extras según la pregunta del usuario, por ejemplo por localidad, zona, región, etc
        - En caso de solicitar un detalle no hacer agrupamiento pero incluir siempre campos de fecha, producto, estaciones de servicio (APIES) y volumenes

        ## Ordenamiento en consulta SQL
        - Si hay campos fechas ordenar en forma descendente DESC
        - Si No hay campos de fecha, ordenar por volumenes descendernte DESC y por producto

        ## Modelo de datos: Entidades y relaciones
         ### Conceptos clave
          - **Ventas**: Hace referencia a ventas en "lts" (litros). Calcular también el campo en "m3" (metros cubicos) se debe dividir por 1000 el dato de volumen en m3. Si consultan por ventas hasta el dia de hoy, hace referencia las ventas acumuladas durante el mes hata el día de ayer, es decir día cerrado. Si preguntan específicamente por ventas del dia de hoy si buscar data de la fecha actual
          - **Plan mensual**: Son estimaciones de venta en "lts" (litros) que realizan de cuanto se va a vender y se usa para compararlo con las ventas reales. Calcular también el campo en "m3" (metros cubicos) al dividir el campo de volumen en lts por 1000. La tabla relacionada es "volumenes_planificados_ypf" 
          - **Presupuesto Anual**: Es una tabla que contiene estimaciones de venta por producto por estación para todo un año. En general se puede comparar los volúmenes de los productos con los análogos del plan mensual o ventas
          - **Comparativo con el plan**: Es la comparación en volumen (m3 o litros) y en porcentaje respecto al plan -> (Volumen Venta - Volumen Plan)*100/Volumen Plan. Los volumenes tienen que estar en las mismas unidades para ser comparados
          - **Mix**: Es el porcentaje del volumen del producto premium (Grado 3) sobre el total de su familia (Gasoil o Nafta). Ejemplos:
              - Ejemplo 1: Mix Nafta: Vol INFINIA /(Vol INFINIA + Vol NAFTA SUPER)
              - Ejemplo 2: Mix Gasoil: Vol INFINIA DIESEL /(Vol INFINIA DIESEL + Vol D.DIESEL 500)

        ### Tablas principales y sus relaciones
        Tablas:
        1. dt_comercial.COM_FT_DLY_DESPACHO_VOX (Claves: Establecimiento_Id, Producto_Id)
        2. dt_comercial.com_dim_producto_vox (Claves: PRODUCT_ID)
        3. dt_comercial.CV_LOOP (Claves: apies)
        4. dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V (Claves: apies)
        
        Relaciones:
        **IMPORTANTE** -> Utiliza siempre ALIAS para cada tabla y referenciar las columnas, por ejemplo el alias "dv" para dt_comercial.COM_FT_DLY_DESPACHO_VOX
        dt_comercial.COM_FT_DLY_DESPACHO_VOX.Producto_Id = dt_comercial.com_dim_producto_vox.PRODUCT_ID
        dt_comercial.COM_FT_DLY_DESPACHO_VOX.Establecimiento_Id = dt_comercial.CV_LOOP.apies
        dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V.apies = dt_comercial.CV_LOOP.apies

        **IMPORTANTE**: dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V NO se relaciona con dt_comercial.com_dim_producto_vox

        Maestro de Productos: Sirve para conocer las formas en que se pueden llamar los mismos producto en tablas de ventas y planificación. Los productos son Combustibles (Naftas y Gasoil) que pueden ser de Grado 2 o Grado 3 (Productos premium de mayor refinamiento y calidad)
        | Producto_Id                 | Producto_Surtidor_Cd | Producto_Desc         | Articulo_Desc      | Subfamilia_Articulo_Desc | Producto_Plan |
|-----------------------------|----------------------|-----------------------|--------------------|--------------------------|---------------|
| VOX00100000000000000000001  | 1                    | NS XXI               | NAFTA SUPER       | Nafta Grado 2            | N2            |
| VOX00100000000000000000004  | 4                    | INFINIA              | INFINIA           | Nafta Grado 3            | N3            |
| VOX00100000000000000000003  | 3                    | ULTRA DIESEL XXI     | ULTRADIESEL       | Gasoil Grado 2           |               |
| VOX00100000000000000000006  | 6                    | GO-INFINIA DIESEL    | INFINIA DIESEL    | Gasoil Grado 3           | G3            |
| VOX00100000000000000000008  | 8                    | D.DIESEL500          | D.DIESEL 500      | Gasoil Grado 2           | G2            |

        ## Sintaxis específica de Cloudera SQL
        ### Reglas generales
        - Importante: No usar alias con las palabras reservadas en la generación de queries (Ejemplos: 'EQ', 'DO', 'IN', 'AS', 'BY', 'OR', 'ON').
        - Nunca usar 'EQ' ya que es una palabra reservada en Cloudera SQL.
        - No usar punto y coma (;) al final de las consultas.
        - Responder siempre en español.
        - *** MUCHA ATENCIÓN: Uso de LIMIT:
        -- Si el usuario solicita listar datos sin restricciones, genera la consulta sin incluir una cláusula LIMIT.
        -- Si el usuario solicita explícitamente un límite en los resultados, usa la cláusula LIMIT al final de la consulta con el valor solicitado.
        - Asegúrate de que la consulta visualice todos los datos cuando la intención es un listado completo.
        - Usar LIMIT en lugar de TOP.
        - Nunca combinar DISTINCT con LIMIT.
        - Para limitar filas en orden aleatorio, usar RAND() en combinación con LIMIT. Ejemplo: `ORDER BY RAND() LIMIT n`.
        - Usar ONLY en lugar de SAMPLE si se requiere una selección limitada específica.
        - MUY MUY IMPORTANTE: Evitar usar alias con palabras reservadas: 'EQ', 'DO', 'IN', 'AS', 'BY', 'OR', 'ON'.
        - Si se usa la columna 'WELL_ID', incluir también la columna del nombre del pozo.
        - La consulta generada debe usar solo los nombres de columnas y tablas indicadas como relevantes o disponibles.
        - Siempre calificar columnas ambiguas: Cualquier columna que pudiera existir en más de una tabla debe incluir el nombre o alias de la tabla.
        - Usar prefijos claros: Si hay múltiples tablas con estructuras similares, usa alias que indiquen claramente la naturaleza de cada tabla.
        - Revisar las reglas para join.

        ### Formato específico para fechas
        - Anteponer DATE con formato estándar para fechas: `WHERE FECHA >= DATE '2024-01-01'`.
        - Valor predeterminado para fechas es el día completo finalizado, es decir el día de ayer: CURRENT_DATE - INTERVAL 1 DAY.
        
        ### Reglas para JOIN
        - INNER JOIN: cuando la relación es obligatoria.
        - LEFT JOIN: cuando la relación es opcional.
        - Relaciones múltiples: usar operador AND.
        - Siempre usar alias para columnas en consultas con JOIN.

        ## Información adicional
        - **Fecha de hoy**: {fecha_actual}
        - **Tablas relevantes para esta consulta**: {selected_table}
        - **Descripción de las tablas**: {descriptions_short}
        - **Lista de columnas disponibles**: {column_list} 
        - **Ejemplos similares**:{few_shot_queries}


        ## OUTPUT

        Tu única tarea es generar una respuesta JSON válida para el siguiente esquema.

        - **IMPORTANTE**: No incluyas ninguna explicación fuera del JSON.
        - El JSON debe poder ser parseado por `json.loads`.
        - NO agregues texto adicional antes ni después.

        Devuelve un único objeto JSON con el siguiente formato:

        {{
        "planning": "<Describe brevemente qué pasos SQL se deben seguir para responder la pregunta del usuario.>",
        "reasoning": "<Explica por qué se estructura así la consulta SQL (por ejemplo, filtrado por código vs texto, joins necesarios, fechas, etc).>",
        "step": "<Número o nombre de paso dentro de un proceso multi-step. Ejemplo: 'Paso 1/2: Ventas por región'>",
        "sql": "<Consulta SQL en Cloudera sin punto y coma al final. Cumple las reglas de alias, joins y TOP.>",
        "success": true,
        "results": <resultado de ejecutar la query>
        }}       
  """, 
    "human": "Consulta del usuario: {pregunta_usuario}\nCloudera SQL Query: "
}
query_prompt_format_answer = {
      "system":"""Eres un asistente que proporciona respuestas analíticas como resumen de un resultado de consulta a una base de datos del area comercial. 
                Tienes la pregunta del usuario, la consulta SQL y los resultados de la consulta. 

                # Tarea:
                - Responde en español de manera estructurada y con los pasos que se siguieron hasta responder la consults
                - Imprime un resultado en formato de tabla markdown que sea conciso y que de una muestra del resultado

              ### Conceptos clave
          - **Ventas**: Hace referencia a ventas en "lts" (litros). Calcular también el campo en "m3" (metros cubicos) se debe dividir por 1000 el dato de volumen en m3. Si consultan por ventas hasta el dia de hoy, hace referencia las ventas acumuladas durante el mes hata el día de ayer, es decir día cerrado. Si preguntan específicamente por ventas del dia de hoy si buscar data de la fecha actual
          - **Plan mensual**: Son estimaciones de venta en "lts" (litros) que realizan de cuanto se va a vender y se usa para compararlo con las ventas reales. Calcular también el campo en "m3" (metros cubicos) al dividir el campo de volumen en lts por 1000. La tabla relacionada es "volumenes_planificados_ypf" 
          - **Presupuesto Anual**: Es una tabla que contiene estimaciones de venta por producto por estación para todo un año. En general se puede comparar los volúmenes de los productos con los análogos del plan mensual o ventas
          - **Comparativo con el plan**: Es la comparación en volumen (m3 o litros) y en porcentaje respecto al plan -> (Volumen Venta - Volumen Plan)*100/Volumen Plan. Los volumenes tienen que estar en las mismas unidades para ser comparados
          - **Mix**: Es el porcentaje del volumen del producto premium (Grado 3) sobre el total de su familia (Gasoil o Nafta). Ejemplos:
              - Ejemplo 1: Mix Nafta: Vol INFINIA /(Vol INFINIA + Vol NAFTA SUPER)
              - Ejemplo 2: Mix Gasoil: Vol INFINIA DIESEL /(Vol INFINIA DIESEL + Vol D.DIESEL 500)
                  
              # OUTPUT
              - Responde con los pasos que se siguieron para extraer la información y un análisis según la pregunta del usuario
              - Se breve y conciso ya que el análisis lo va a realizar otro Agente  
                """,
    "human":"""#Usuario:
                {question}

                # Query: 
                {query}
                # Resultado:
                {results}            
    """
}

few_shot_queries = """ 
# Ejemplo 1:
Pregunta: Calcula las ventas del mes actual para la estación XXXX

SQL:
SELECT pd.producto_desc, SUM(dv.volumen_qty) AS volumen_lts, SUM(dv.volumen_qty) / 1000 AS volumen_m3 
FROM dt_comercial.COM_FT_DLY_DESPACHO_VOX dv 
INNER JOIN dt_comercial.CV_LOOP_V cl ON dv.establecimiento_id = cl.apies 
INNER JOIN dt_comercial.com_dim_producto_vox pd ON dv.producto_id = pd.producto_id 
WHERE LOWER(cl.operador) LIKE LOWER('%XXXX%') 
AND dv.fecha_despacho_dt >= DATE '2025-07-01' AND dv.fecha_despacho_dt < CURRENT_DATE
GROUP BY pd.producto_desc

Tablas Utilizadas:
- dt_comercial.COM_FT_DLY_DESPACHO_VOX
- dt_comercial.CV_LOOP_V
- dt_comercial.com_dim_producto_vox

Columnas Utilizadas:
- dt_comercial.com_dim_producto_vox.producto_desc
- dt_comercial.COM_FT_DLY_DESPACHO_VOX.volumen_qty

Razonamiento:
La consulta busca determinar el volumen de ventas del mes actual (en este caso Julio 2025) que incluye desde el inicio de mes hasta el día cerrado
 
# Ejemplo 2:
Pregunta: Cual es el volumen planificado para la estación XXXX para el mes de junio

SQL:
SELECT 
    CASE 
        WHEN vl.producto = 'N2' THEN 'NS XXI' 
        WHEN vl.producto = 'N3' THEN 'INFINIA' 
        WHEN vl.producto = 'G2' THEN 'D.DIESEL500' 
        WHEN vl.producto = 'G3' THEN 'GO-INFINIA DIESEL' 
        ELSE vl.producto 
    END AS producto, 
    SUM(vl.volumenplanificado) AS volumen_planificado_lts, 
    SUM(vl.volumenplanificado / 1000) AS volumen_planificado_m3
FROM 
    dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V vl
INNER JOIN 
    dt_comercial.CV_LOOP_V es 
ON 
    vl.apies = es.apies
WHERE 
    LOWER(es.operador) LIKE LOWER('%XXXX%') 
    AND MONTH(vl.fecha) = 6 
    AND YEAR(vl.fecha) = 2025
GROUP BY 
    vl.producto;

Tablas Utilizadas:
- dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V
- dt_comercial.CV_LOOP_V

Columnas Utilizadas:
- dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V.producto
- dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V.volumen_qty

Razonamiento:
La consulta busca determinar el volumen de ventas planificado del mes de Junio 2025 (para este caso particular) que incluye desde el inicio de mes hasta el ultimo dia del mes inclusive
 """

In [4]:
prompt_supervisor = """ 
# Rol y Objetivo 
Eres el "Agente Supervisor" de la aplicación de inteligencia artificial para el área Comercial (Downstream) de una refinería. Tu objetivo principal es orquestar y dirigir la resolución de las consultas del usuario, decidiendo qué herramientas y agentes se deben usar en cada momento para proporcionar la mejor y más precisa respuesta. Debes guiar la interacción de manera autónoma y eficiente hasta que la consulta del usuario esté completamente resuelta. [cite: 19, 30]

# Instrucciones Generales 
* **Persistencia:** Continúa trabajando hasta que la consulta del usuario esté completamente resuelta. Solo termina tu turno cuando estés absolutamente seguro de que el problema ha sido solucionado. [cite: 19, 20, 59, 61, 62, 64]
* **Uso de Herramientas (Tool-calling):** Haz uso completo de tus herramientas. Si no estás seguro sobre algún dato o necesitas análisis, **NO adivines ni inventes una respuesta**. Utiliza tus herramientas para obtener la información relevante. [cite: 21, 23, 28]
* **Planificación (Chain-of-Thought):** SIEMPRE planifica de manera exhaustiva antes de cada llamada a una función y reflexiona profundamente sobre los resultados de las llamadas anteriores. [cite: 26, 72] No realices este proceso solo con llamadas a funciones, ya que esto puede dificultar tu capacidad para resolver el problema y pensar de manera perspicaz. [cite: 27, 41, 73]
* **Manejo de Errores:** Si una herramienta devuelve un error o la información es insuficiente, no te detengas. Piensa en el siguiente paso lógico: ¿Necesitas más información del usuario? ¿Puedes intentar con otra herramienta o reformular la consulta?

## Estrategia de Razonamiento (Workflow) 
Sigue esta estrategia de resolución de problemas paso a paso para cada consulta: [cite: 75, 76]

1.  **Entender la Consulta:** Lee cuidadosamente la consulta del usuario y analiza qué se requiere exactamente. [cite: 76, 85] Identifica la intención principal (ej. "Necesita datos", "Necesita análisis", "Es una pregunta de marketing").
2.  **Planificación Inicial:** Desarrolla un plan claro, paso a paso, sobre cómo abordar la consulta. [cite: 78, 91, 92] Decide qué herramienta o combinación de herramientas es la más adecuada.
3.  **Ejecución de Herramientas:** Invoca la herramienta seleccionada. Antes de cada llamada, detalla explícitamente el razonamiento detrás de la elección de la herramienta y los parámetros que utilizarás. [cite: 72]
4.  **Análisis de Resultados y Reflexión:** Examina la salida de la herramienta. ¿Es lo que esperabas? ¿La información es suficiente para resolver la consulta? [cite: 72]
5.  **Iteración y Refinamiento:** Si la respuesta no es completa o no satisface la consulta:
    * Si faltan datos, usa la herramienta de RAG o Text-to-SQL.
    * Si se necesita interpretar datos o realizar cálculos, pasa la tarea al Agente Analista.
    * Si se detecta un error o falta de información para usar una herramienta, solicita los datos necesarios al usuario de manera clara y concisa.
6.  **Validación Final:** Antes de finalizar, asegúrate de que la respuesta sea precisa y aborde todos los aspectos de la consulta original. [cite: 82]

## Herramientas Disponibles (Tools)
Tienes acceso a las siguientes herramientas para orquestar la resolución de problemas:

* **Agente Text-to-SQL (`query_sql_database`):** Para convertir preguntas en lenguaje natural a consultas SQL y obtener datos de la base de datos comercial.
    * *Descripción:* Permite consultar la base de datos de ventas, clientes, productos y transacciones para obtener información numérica o tabular precisa.
    * *Cuándo usarla:* Cuando el usuario pida datos específicos, cifras, listados o información que claramente provenga de una base de datos estructurada.
* **Agente Analista (`analyst_agent`):** Para realizar análisis complejos, cálculos matemáticos, manipulación de datos y generar insights.
    * *Descripción:* Capaz de procesar datos, realizar estadísticas, identificar tendencias y elaborar conclusiones a partir de la información proporcionada.
    * *Cuándo usarla:* Cuando la consulta requiera interpretación de datos, cálculos avanzados, identificar patrones, proyecciones o comparaciones. También cuando el Agente Text-to-SQL haya extraído datos y necesiten un procesamiento adicional para generar valor.
* **Agente RAG (Retrieval-Augmented Generation) (`marketing_promotions_rag`):** Para recuperar información relevante sobre marketing y promociones de YPF.
    * *Descripción:* Accede a una base de conocimientos documentada sobre campañas de marketing, ofertas actuales, historial de promociones y materiales informativos de YPF.
    * *Cuándo usarla:* Cuando la consulta del usuario esté relacionada con promociones, campañas de marketing, descuentos, beneficios para clientes, o cualquier información no estructurada sobre iniciativas comerciales de YPF.

## Saludo y Formato de Salida

* **Saludo Inicial:** Siempre comienza tu respuesta con un saludo amigable y profesional, por ejemplo: "¡Hola! Soy tu asistente de IA para el área Comercial de la refinería. ¿Cómo puedo ayudarte hoy?"
* **Formato de Respuesta:**
    * Si la respuesta final es texto, sé conciso y claro.
    * Si utilizas una herramienta, primero informa al usuario que vas a usarla y por qué.
    * Si necesitas aclarar algo o pedir más información, hazlo de manera educada y directa. """

prompt_analyst = """ 
# Rol y Objetivo
Eres el "Agente Analista". Tu rol es procesar y analizar datos, realizar cálculos matemáticos, manejar estructuras de datos (como DataFrames de Pandas) y generar respuestas analíticas e insights claros a partir de la información que te es proporcionada. NO interactúas directamente con el usuario final; tu trabajo es una herramienta para el Agente Supervisor.

# Instrucciones Generales
* **Precisión Matemática:** Realiza todos los cálculos con la máxima precisión posible.
* **Manejo de Datos:** Utiliza las capacidades de Pandas para manipular, limpiar y transformar datos cuando sea necesario.
* **Claridad en el Análisis:** Presenta tus hallazgos de manera estructurada y fácil de entender. Si identificas tendencias, anomalías o patrones, descríbelos explícitamente.
* **Enfoque en Insights:** No solo reportes datos, sino que proporciona un análisis de lo que esos datos significan para el área comercial. ¿Hay implicaciones importantes? ¿Qué conclusiones se pueden sacar?
* **Formato de Salida:** Tu respuesta debe ser un texto claro que resuma el análisis y los hallazgos. Si el resultado es una tabla o gráfico, describe verbalmente los puntos clave.

## Estrategia de Razonamiento (Workflow)

1.  **Comprender la Solicitud:** Analiza la solicitud del Agente Supervisor. ¿Qué tipo de análisis se necesita? ¿Qué datos se te han proporcionado?
2.  **Preparación de Datos:** Si los datos no están en el formato ideal, utiliza tus herramientas para transformarlos (ej., crear un DataFrame de Pandas).
3.  **Ejecución del Análisis:**
    * Realiza los cálculos o manipulaciones de datos solicitados.
    * Aplica funciones matemáticas o estadísticas según sea necesario.
    * Si es pertinente, identifica tendencias, correlaciones o anomalías.
4.  **Generación de Insights:** Interpreta los resultados numéricos. ¿Qué significan estos números en el contexto del negocio?
5.  **Formulación de la Respuesta:** Redacta una respuesta concisa y analítica que contenga los hallazgos clave. Si es posible, proporciona recomendaciones o implicaciones basadas en el análisis.

## Herramientas Disponibles (Tools)

* **Herramientas Matemáticas (`math_tool`):** Para operaciones aritméticas básicas, funciones estadísticas (promedio, desviación estándar, etc.) y cálculos numéricos complejos.
    * *Descripción:* Permite realizar cualquier tipo de operación matemática o estadística sobre datos numéricos.
* **Manejo de Pandas (`pandas_data_handler`):** Para crear, manipular y analizar DataFrames de Pandas. Esto incluye filtrado, agrupamiento, unión, pivoteo, etc.
    * *Descripción:* Procesa conjuntos de datos tabulares, permitiendo una manipulación y análisis de datos eficiente.
 """

In [5]:
# VARIABLES DE ENTORNO
# Logging
LOGLEVEL = os.environ.get('LOGLEVEL_SQLAGENT', 'DEBUG').upper()
logger=GetLogger(__name__, level=LOGLEVEL).logger

logger=GetLogger(__name__, level=LOGLEVEL).logger
LOGLEVEL = os.environ.get('LOGLEVEL_ROOT', 'INFO').upper()
logger = GetLogger("", level=LOGLEVEL).logger

# Azure
azure_openai_api_key = os.environ.get('AZURE_OPENAI_API_KEY')
azure_openai_endpoint = os.environ.get('AZURE_OPENAI_ENDPOINT')

# Cloudera
TD_HOST = os.environ.get('TD_HOST')
TD_USER = os.environ.get('TD_USER')
TD_PASS = os.environ.get('TERADATA_PASS')
TD_LOGMECH = os.environ.get('LOGMECH')

# Cloudera
CLOUDERA_HOST = os.environ.get('CLOUDERA_HOST')
CLOUDERA_USER = os.environ.get('CLOUDERA_USER')
CLOUDERA_PASS = os.environ.get('CLOUDERA_PASS')
CLOUDERA_PORT = os.environ.get('CLOUDERA_PORT')
CLOUDERA_AUTH = os.environ.get('CLOUDERA_AUTH')


In [6]:
# MODELOS LLM

llm_gpt4o = AzureChatOpenAI(
        api_key=azure_openai_api_key,
        openai_api_version="2024-10-21",
        azure_deployment="chat4og",
        azure_endpoint=azure_openai_endpoint,
        model_name="gpt-4o",
        seed=42,
        timeout=180)

llm_gpt_4o_mini = AzureChatOpenAI(
        api_key=azure_openai_api_key,
        openai_api_version="2025-01-01-preview",
        azure_deployment="gpt-4o-mini",
        azure_endpoint=azure_openai_endpoint,
        model_name="gpt-4o-mini",
        seed=42,
        timeout=180)

llm_gpt_o3_mini = AzureChatOpenAI(
        api_key=azure_openai_api_key,
        openai_api_version="2025-01-01-preview",
        azure_deployment="o3-mini",
        azure_endpoint=azure_openai_endpoint,
        model_name="o3-mini",
        temperature= 1,
        seed=42,
        timeout=180)


In [7]:
# FUNCIONES

# Conexión a Teradata
def get_connection_to_td(td_host, td_user, td_pass, td_logmech):
    
    print(td_host, td_user, td_pass, td_logmech)
    conn_str = '{"host":"%s","user":"%s","password":"%s","logmech":"%s"}' % (
        td_host, td_user, td_pass, td_logmech
    )
    print('string de conexion:', conn_str)
    conn = teradatasql.connect(conn_str)
    print('conexion satisfactoria')
    return conn

# Conexión a Cloudera
def get_connection_to_cl(host, user, password, port, auth):
    try:
        # Intentamos establecer la conexión
        conn = connect(
            host=host,
            port=port,
            use_ssl=True,
            use_http_transport=True,
            auth_mechanism=auth,
            http_path='cliservice',
            user=user,
            password=password
        )
        print('Conexión satisfactoria')  # Este print ahora está correctamente indentado
        return conn
    except Exception as e:
        # Capturamos y mostramos errores inesperados
        print(f"Ha ocurrido un error inesperado: {e}")
        return None
    
def seleccionar_catalogo():
    with open(json_schema, 'r', encoding='UTF-8') as json_file:
        schema = json.load(json_file)
    return schema


json_schema = r"../data/cloudera_schema.json"
json_schema = os.path.abspath(json_schema)


In [8]:
json_schema

'c:\\Users\\SE45352\\Documents\\repos\\BOTCOM\\src\\data\\cloudera_schema.json'

In [9]:
# STATES Y DATA STRUCTURES
class SqlStepOutput(TypedDict):
    planning: str
    reasoning: str
    step: str
    sql: str
    success: bool
    result: str  # <- agregalo para guardar el resultado de la query

class MessagesState(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]  # permite múltiples mensajes
    
class SqlAgentState(MessagesState):
    question: str
    sql_query: str
    sql_results: str
    accumulated_sql_results: Annotated[List[SqlStepOutput],operator.add]
    answer: str
    sql_results_accum: List[str]
    messages: Annotated[List[AnyMessage], add_messages]
    remaining_steps: str

#class GlobalAgentState(SqlAgentState):
#    planning: 


In [10]:
selected_table = ["dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V", 
                      "dt_comercial.CV_LOOP_V",
                      "dt_comercial.COM_FT_DLY_DESPACHO_VOX",
                      "dt_comercial.com_dim_producto_vox"
                      ] #tablas de interés para el miniPywo 

schema = seleccionar_catalogo()
# Obtencion de Column List
column_list = ""
column_strings = []

for table_name, table_info in schema.items():
    if table_name not in selected_table:
        continue
    columns_dict = table_info.get("columns", {})
    column_lines = [f">> {col_name}: {col_desc}" for col_name, col_desc in columns_dict.items()]
    table_string = f"Tabla: {table_name}\n" + "\n".join(column_lines)
    column_strings.append(table_string)

column_list = "\n\n".join(column_strings)

# Obtencion de Description Short
description = []
table_description = ""
for table_name, table_info in schema.items():
    if table_name not in selected_table:
        continue  # Saltar si no está en selected_table
    description = table_info.get("description_short", "Sin descripción")
    table_description += f"Tabla: {table_name}\n>>Descripción: {description}\n\n"
descriptions_short = table_description

In [11]:
print("Tablas:\n",selected_table)
print("\n\nColumnas:\n",column_list)
print("\n\nDescripcion:\n",descriptions_short)


Tablas:
 ['dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V', 'dt_comercial.CV_LOOP_V', 'dt_comercial.COM_FT_DLY_DESPACHO_VOX', 'dt_comercial.com_dim_producto_vox']


Columnas:
 Tabla: dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V
>> apies: Identificador de la apie.
>> producto: Identificador del producto planificado. Foreign key a la tabla de productos.
>> volumenplanificado: Volumen planificado en litros (lts)
>> fecha: Fecha para la cual se realiza la planificación del volumen.
>> fecha_ingesta: Fecha en la que se hizo la ingesta del dato.

Tabla: dt_comercial.CV_LOOP_V
>> identificador: Identificador general.
>> apies: Identificador de la apie o estacion de servicio. Ejemplo: 2454. No nulo.
>> operador: Nombre del operador o estacion de Servicio. Ejemplo: MAGNETO SRL
>> ultimo_nro_inscripcion: Último número de inscripción registrado.
>> bandera: Bandera o marca asociada.
>> cuit: CUIT (Código Único de Identificación Tributaria).
>> direccion: Dirección física.
>> id_localidad: Identificador d

In [12]:
datetime.datetime.now().strftime('%Y-%m-%d')

'2025-07-11'

In [13]:
def generate_sql_query(state: SqlAgentState, llm_model=llm_gpt_4o_mini) -> SqlAgentState:
    """
    Arma una consulta SQL para Cloudera en función de lo que pide el usuario. 
    La pregunta debe de estar orientada a la información que tienen las tablas.
    """
    
    print('Entro a get_query')
    start = time.perf_counter()
    question = state['question']
    selected_table = ["dt_comercial.VOLUMENES_PLANIFICADOS_YPF_V", 
                      "dt_comercial.CV_LOOP_V",
                      "dt_comercial.COM_FT_DLY_DESPACHO_VOX",
                      "dt_comercial.com_dim_producto_vox"
                      ] #tablas de interés para el miniPywo 
    fecha_actual = datetime.datetime.now().strftime('%Y-%m-%d')
    schema = seleccionar_catalogo()
    # Obtencion de Column List
    column_list = ""
    column_strings = []
    
    for table_name, table_info in schema.items():
        if table_name not in selected_table:
            continue
        columns_dict = table_info.get("columns", {})
        column_lines = [f">> {col_name}: {col_desc}" for col_name, col_desc in columns_dict.items()]
        table_string = f"Tabla: {table_name}\n" + "\n".join(column_lines)
        column_strings.append(table_string)
    
    column_list = "\n\n".join(column_strings)

    # Obtencion de Description Short
    description = []
    table_description = ""
    for table_name, table_info in schema.items():
        if table_name not in selected_table:
            continue  # Saltar si no está en selected_table
        description = table_info.get("description_short", "Sin descripción")
        table_description += f"Tabla: {table_name}\n>>Descripción: {description}\n\n"
    descriptions_short = table_description

    prompt = ChatPromptTemplate.from_messages([
             ("system", query_prompt_sql["system"]),
             ("human", query_prompt_sql["human"]) 
        ])
    
    structured_llm = llm_model.with_structured_output(SqlStepOutput)
    get_query_chain = prompt | structured_llm
    print("Por generar consulta con llm")
    consulta_dict = get_query_chain.invoke(input={
            "fecha_actual": fecha_actual,
            "pregunta_usuario": question,
            "selected_table": selected_table,
            "descriptions_short": descriptions_short,
            "column_list": column_list,
            "few_shot_queries": few_shot_queries
        })

    print("Genero la consulta")
    #print(consulta_dict)
    #print(type(consulta_dict))
    state['sql_query']=consulta_dict['sql']
    end = time.perf_counter()
    #print("Consulta generada: ", consulta_dict)
    print(f"Generar consulta Tiempo transcurrido: {end - start:.2f} segundos")
    return state

# Nodo que, dada una consulta de Cloudera, la ejecuta
def execute_sql_query(state:SqlAgentState):
    """
    Funcion que ayuda a orquestar el flujo del grafo hacia un pedido de una novedad o una consulta en 
    Cloudera. Se basa en la pregunta o consulta del usuario.
    Args:
    question: pregunta del usuario.
    LLM: Modelo de Lenguaje a utilizar.
    Returns:
    tipo_consulta: novedades o get_query. Valor binario para el enrutador.
    """

    start = time.perf_counter()
    sql_query = state["sql_query"]#.strip()
    limit_rows=2000
    conn = get_connection_to_cl(CLOUDERA_HOST,CLOUDERA_USER,CLOUDERA_PASS,CLOUDERA_PORT,CLOUDERA_AUTH)
    #print('Pude hacer bien la conexión')
    
    count = 0
    flag = True

    while flag and count < 3:
        count += 1
        try:
            cursor = conn.cursor()
            #print(f"intento ejecuar la consulta nro {count}")
            cursor.execute(sql_query)
            resultados = cursor.fetchall()
            description = cursor.description
            cursor.close()
            conn.close()
            resultados_df = pd.DataFrame(resultados, columns=[desc[0] for desc in description])           
            cantidad_res = len(resultados_df)
            if cantidad_res > limit_rows:
                resultados_df = resultados_df.head(limit_rows)
            #print(resultados_df)
            respuesta_generada = f"Resultados:\n{resultados_df.to_markdown()}"
            flag = False            
            state['sql_results']= respuesta_generada
            print("SQL query executed successfully.")

        except Exception as e:

            state["sql_results"] = f"Error al ejecutar la SQL query: {str(e)}"
            print('No se pudo ejecutar la consulta en Cloudera')

    end = time.perf_counter()
    #state["dt"] = state["dt"] + end -start
    print(f"Ejecutar consulta Tiempo transcurrido: {end - start:.2f} segundos")
    #print(f"Tiempo acumlado en Ejecutar consulta: {state["dt"]:.2f} segundos")
    return state

# Nodo que procesa la salida de teradta para obtener una respuesta humana.
def format_sql_results(state: SqlAgentState, llm_model = llm_gpt_4o_mini):
    """
    Funcion que ayuda a orquestar el flujo del grafo hacia un pedido de una novedad o una consulta en 
    Cloudera. Se basa en la pregunta o consulta del usuario.
    Args:
    question: pregunta del usuario.
    query_result: Resultado de la query ejecutada en Cloudera.
    LLM: Modelo de Lenguaje a utilizar.
    Returns:
    query_result: Respuesta del LLM  a la pregunta del usuario respecto a la salida de Cloudera.  
    """
    print('Entro a la funcion format_sql_results')
    start = time.perf_counter()
    pregunta = state["question"]
    sql_query = state["sql_query"]
    result = state["sql_results"]
    #dt = state["dt"]
    print("Consulta: ", sql_query)
    print("Resultado: ", result)
    generate_prompt = ChatPromptTemplate.from_messages([
         ("system", query_prompt_format_answer['system']), #, MessagesPlaceholder("messages"),
         ("human",query_prompt_format_answer['human'])
         ])
    human_response = generate_prompt | llm_model | StrOutputParser()
    answer = human_response.invoke({'question':pregunta,
                                    'query': sql_query,
                                    'results':result
                                    })
    #,                                    'dt':dt})
    
    state["answer"] = answer
    
    # 🔁 Acumulamos en 'sql_results_accum'
    state.setdefault("sql_results_accum", []).append(answer)
    end = time.perf_counter()
    #state["dt"] = state["dt"] + end -start
    print(f"Respuesta Tiempo transcurrido: {end - start:.2f} segundos")
    # 1) New messages with UUIDs
    #human_msg = HumanMessage(id=str(uuid.uuid4()), content=pregunta, name='memoria')
    #ai_msg    = AIMessage(id=str(uuid.uuid4()), content=answer, name='memoria')

    # 2) Remove all but the last 4 messages
    #removals = [RemoveMessage(id=m.id) for m in state["messages"][:-4]]
    
    return state

""" {
        #"messages":    [*removals, human_msg, ai_msg],
        "answer": answer,
        #"dt":           state["dt"] + end -start,
    } """

' {\n        #"messages":    [*removals, human_msg, ai_msg],\n        "answer": answer,\n        #"dt":           state["dt"] + end -start,\n    } '

In [14]:
# WORKFLOW
def build_tts_workflow(state) -> StateGraph:
    graph = StateGraph(state)
    graph.set_entry_point("generate_sql")

    graph.add_node("generate_sql", generate_sql_query)
    graph.add_node("execute_sql", execute_sql_query)
    graph.add_node("format_results", format_sql_results)

    graph.add_edge(START, "generate_sql")
    graph.add_edge("generate_sql", "execute_sql")
    graph.add_edge("execute_sql", "format_results")
    graph.add_edge("format_results", END)
    return graph.compile()

In [15]:
# Tools
# Asumimos que ya definiste: build_tts_workflow()
sql_workflow = build_tts_workflow(state=SqlAgentState)

def sql_tool_fn(question: str) -> SqlAgentState:
    """ 
    Funcion que ejecuta las funciones para generar la consulta SQL, la ejecuta y genera una respuesta. Devuelve el stado del agente 
    """
    return sql_workflow.invoke(input={"question":question})


def multiply(a: int,b: int) -> float:
    """ 
    Funcion que multiplica a * b
    args:
    a: primer int
    b: segundo int
     """
    return a*b

tools = [sql_tool_fn,multiply]

In [16]:
# Agents
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
prompt_sql_agent = {
    "agent":{
    
        "system": """ 
        # Rol y Tarea
        Sos un agente experto en bases de datos o en multiplicar numeros. Usá la herramienta para responder preguntas que permitan extraer datos de bases de datos (SQL) o multiplicar numeros. Tu rol es descomponer la pregunta del usuario en varias consultas simples para extraer la data necesaria detallando los pasos y el razonamiento necesario
        
        # Tools:
        - "sql_tool_fn": Encargada de responder preguntas relacionadas a Ventas o planificacion de la base de datos
        - "multiply": Herramienta encargada de multiplicar 2 numeros enteros solo cuando la pregunta del usuario pide multiplicar numeros.

        # Casos de uso y conceptos clave para "sql_tool_fn"
        ## Casos de uso y razonamiento
        Siempre el agrupamiento se debe hacer por producto o campos relacionados. El resto de agrupamientos será de acuerdo a la pregunta, por defecto que incluya el establecimiento (APIES)
        - 1. Responder preguntas de ventas: Ejemplo, ¿Cuales son las ventas para la región Pampeana
        - 2. Responder preguntas de planificación
        - 3. Comparativo ventas versus planificación: Para este caso la pregunta del usuario se debe dividir en 2 pasos para extraer por un lado información de ventas y por otro de planificación. No utilices una tool de sql para hacer consultas de comparativo, solo toma los resultados de venta y plan y comparalos
        ### Ejemplos de preguntas: 
        Ejemplo 1: 
        - Pregunta del usuario: ¿Cómo viene la estación "XXX"?
        - Reasoning:
            - Paso 1 - El caso de uso es comparativo de ventas vs el plan. Verifico periodo de tiempo, si no cuento con información hago referencia al mes actual. Divido en preguntas simples: 
            1)-Cuales son las ventas para la estación "XXX" para el mes actual y 2)- cual es el volumen planificado para la estación "XXX" para el mes actual
            - Paso 2 - Extraer información de ventas con tool "sql_tool_fn"-> Cuales son las ventas para la estación "XXX para el mes actual. Si la información da vacio, reformular la pregunta colocando otro filtro parecido o buscar los velores distintos de estación, provincia o el filtro que se quiera aplicar y tomar el mas parecido
            - Paso 3 - Extraer informacion de planificación con tool "sql_tool_fn"-> Cual es el volumen planificado para la estación "XXX" para el mes actual. Si la información da vacio, reformular la pregunta colocando otro filtro parecido o buscar los velores distintos de estación, provincia o el filtro que se quiera aplicar y tomar el mas parecido
            - Paso 4 - Entregar la respuesta de ventas y plan mensual. Hacer un pequeño comparativo entre ventas y planificación en volumen y porcentaje segun lo definido en conceptos clave. No entrar a la tool de SQL 

         ### Conceptos clave
          - **Ventas**: Hace referencia a ventas en "lts" (litros). Calcular también el campo en "m3" (metros cubicos) se debe dividir por 1000 el dato de volumen en m3. Si consultan por ventas hasta el dia de hoy, hace referencia las ventas acumuladas durante el mes hata el día de ayer, es decir día cerrado. Si preguntan específicamente por ventas del dia de hoy si buscar data de la fecha actual
          - **Plan mensual**: Son estimaciones de venta en "lts" (litros) que realizan de cuanto se va a vender y se usa para compararlo con las ventas reales. Calcular también el campo en "m3" (metros cubicos) al dividir el campo de volumen en lts por 1000. La tabla relacionada es "volumenes_planificados_ypf" 
          - **Presupuesto Anual**: Es una tabla que contiene estimaciones de venta por producto por estación para todo un año. En general se puede comparar los volúmenes de los productos con los análogos del plan mensual o ventas
          - **Comparativo con el plan**: Es la comparación en volumen (m3 o litros) y en porcentaje respecto al plan -> (Volumen Venta - Volumen Plan)*100/Volumen Plan. Los volumenes tienen que estar en las mismas unidades para ser comparados
          - **Mix**: Es el porcentaje del volumen del producto premium (Grado 3) sobre el total de su familia (Gasoil o Nafta). Ejemplos:
              - Ejemplo 1: Mix Nafta: Vol INFINIA /(Vol INFINIA + Vol NAFTA SUPER)
              - Ejemplo 2: Mix Gasoil: Vol INFINIA DIESEL /(Vol INFINIA DIESEL + Vol D.DIESEL 500)
        """
        ,
        "user": """\n{question}"""
        }
}
prompt_sql_agent_template = ChatPromptTemplate.from_messages([
    ("system",prompt_sql_agent["agent"]["system"]),
    ("user",prompt_sql_agent["agent"]["user"]),
    MessagesPlaceholder(variable_name="messages")
    ])
sql_agent = create_react_agent(
    model=llm_gpt_4o_mini,
    tools=[sql_tool_fn,multiply],
    name="text_sql_agent",
    prompt=prompt_sql_agent_template,
    state_schema=SqlAgentState
)
# Analista
""" analyst_agent = create_react_agent(
    model=llm_gpt_o3_mini,
    #tools=[analyst_tool],
    name="analyst_agent",
    prompt="Sos un analista que genera conclusiones a partir de los resultados del agente SQL."
) """

# Supervisor
""" supervisor = create_supervisor(
    agents=[sql_agent, analyst_agent],
    model=llm_gpt4o,
    prompt="Usá text_to_sql_tool para extraer datos, luego analyst_tool para analizarlos."
) """

' supervisor = create_supervisor(\n    agents=[sql_agent, analyst_agent],\n    model=llm_gpt4o,\n    prompt="Usá text_to_sql_tool para extraer datos, luego analyst_tool para analizarlos."\n) '

In [17]:
# Grafico Agente
builder = StateGraph(SqlAgentState)
builder.add_node("sql_agent", sql_agent)
builder.add_edge(START,"sql_agent")
builder.add_edge("sql_agent",END)
react_graph = builder.compile()


In [18]:
user_question_random = "Como estuvimos en Junio para la estacion 818"

output = react_graph.invoke(input={
    "question": user_question_random,
    "messages": [{"role": "user", "content": user_question_random}]
})
for m in output["messages"]:
    m.pretty_print()

[2025-07-11 11:23:29,711][INFO][httpx][_send_single_request] HTTP Request: POST https://ydtzdoaigpt001.openai.azure.com/openai/deployments/gpt-4o-mini/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"
Entro a get_query
Entro a get_query
Por generar consulta con llm
Por generar consulta con llm
[2025-07-11 11:23:33,762][INFO][httpx][_send_single_request] HTTP Request: POST https://ydtzdoaigpt001.openai.azure.com/openai/deployments/gpt-4o-mini/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"
Genero la consulta
Generar consulta Tiempo transcurrido: 4.04 segundos
Conexión satisfactoria
[2025-07-11 11:23:34,169][INFO][httpx][_send_single_request] HTTP Request: POST https://ydtzdoaigpt001.openai.azure.com/openai/deployments/gpt-4o-mini/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"
Genero la consulta
Generar consulta Tiempo transcurrido: 4.44 segundos
Conexión satisfactoria
SQL query executed successfully.
Ejecutar consulta Tiempo transcurr

In [19]:
# Graph
#builder = StateGraph(MessagesState)

# Nodos y Edges
# ...

# Memory
memory = MemorySaver()
react_graph_memory = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id":"1"}}

In [20]:
# Grafico Agente Supervisor
builder = StateGraph(SqlAgentState)
builder.add_node("sql_agent", sql_agent)
builder.add_edge(START,"sql_agent")
builder.add_edge("sql_agent",END)
react_graph = builder.compile()