In [17]:
from langchain_google_vertexai import ChatVertexAI

In [18]:
import os
from typing import Dict, List, Tuple, Optional, Any, Annotated
from enum import Enum
import json

from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from google.cloud import bigquery
from google.oauth2 import service_account
from pydantic import BaseModel, Field

model_name = "gemini-2.0-flash"

"""model_name = "gpt-4o"

llm = ChatOpenAI(
    openai_api_key="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", 
    model=model_name, temperature=0, streaming=True
)"""

llm = ChatVertexAI(
    model_name=model_name,
    temperature=0.2,
)


# Configuración del cliente BigQuery
def get_bigquery_client(
    credentials_path: str = "/Users/nmlemus/.config/gcloud/application_default_credentials.json",
):
    """
    Inicializa y retorna un cliente de BigQuery.
    """
    credentials = service_account.Credentials.from_service_account_file(
        credentials_path, scopes=["https://www.googleapis.com/auth/bigquery"]
    )
    return bigquery.Client(credentials=credentials, project=credentials.project_id)


# Modelos de datos para el estado del grafo
class AgentState(BaseModel):
    """Estado del agente LangGraph."""

    messages: List[Dict[str, Any]] = Field(default_factory=list)
    database_schema: Optional[Dict[str, Any]] = None
    user_question: Optional[str] = None
    sql_query: Optional[str] = None
    query_result: Optional[Dict[str, Any]] = None
    needs_clarification: bool = False
    clarification_message: Optional[str] = None


# Nodos del grafo
class Nodes(str, Enum):
    EXTRACT_SCHEMA = "extract_schema"
    VALIDATE_QUESTION = "validate_question"
    GENERATE_QUERY = "generate_query"
    VALIDATE_QUERY = "validate_query"
    EXECUTE_QUERY = "execute_query"
    PRESENT_RESULTS = "present_results"
    REQUEST_CLARIFICATION = "request_clarification"


# Funciones para los nodos del grafo
def extract_schema(state: AgentState) -> AgentState:
    """
    Extrae el esquema de la base de datos de BigQuery.
    """
    client = get_bigquery_client()

    # Aquí necesitamos definir el dataset a consultar
    dataset_id = "your_dataset_id"  # Reemplazar con el ID de tu dataset

    schema = {}

    # Obtener todas las tablas en el dataset
    tables = list(client.list_tables(dataset_id))

    for table in tables:
        table_id = f"{client.project}.{dataset_id}.{table.table_id}"
        table_ref = client.get_table(table_id)

        # Guardar la estructura de la tabla
        schema[table.table_id] = {
            "columns": [
                {
                    "name": field.name,
                    "type": field.field_type,
                    "description": field.description,
                }
                for field in table_ref.schema
            ]
        }

    # Actualizar el estado con el esquema extraído
    state.database_schema = schema

    return state


def validate_question(state: AgentState) -> Dict[str, Any]:
    """
    Valida si la pregunta del usuario tiene sentido según la estructura de la base de datos.
    """
    # llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # Extraer la pregunta del usuario
    for message in reversed(state.messages):
        if message["role"] == "user":
            state.user_question = message["content"]
            break

    # Prompt para validar la pregunta
    prompt = f"""
    Actúa como un experto analista de datos.
    
    Tienes acceso a una base de datos con las siguientes tablas y columnas:
    {json.dumps(state.database_schema, indent=2)}
    
    La pregunta del usuario es: "{state.user_question}"
    
    Evalúa si esta pregunta puede ser respondida con la estructura de base de datos proporcionada.
    Para que una pregunta sea válida:
    1. Debe hacer referencia a datos que estén dentro de las tablas disponibles
    2. Los campos que menciona deben existir en las tablas
    3. La relación entre las tablas debe ser posible (si la pregunta involucra múltiples tablas)
    
    Responde con un JSON con el siguiente formato:
    {{
        "is_valid": true o false,
        "reason": "Explicación de por qué la pregunta es válida o no",
        "clarification_needed": "Si se necesita aclaración, especifica qué información adicional se necesita"
    }}
    """

    response = llm.invoke(prompt)
    validation_result = json.loads(response.content)

    # Actualizar el estado según el resultado de la validación
    state.needs_clarification = not validation_result["is_valid"]
    if state.needs_clarification:
        state.clarification_message = validation_result["clarification_needed"]

    # Determinar el siguiente nodo
    if state.needs_clarification:
        return {"next": Nodes.REQUEST_CLARIFICATION}
    else:
        return {"next": Nodes.GENERATE_QUERY}


def generate_query(state: AgentState) -> AgentState:
    """
    Genera una consulta SQL basada en la pregunta del usuario y el esquema de la base de datos.
    """
    # llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # Prompt para generar la consulta SQL
    prompt = f"""
    Actúa como un experto en SQL para BigQuery.
    
    Tienes acceso a una base de datos con las siguientes tablas y columnas:
    {json.dumps(state.database_schema, indent=2)}
    
    La pregunta del usuario es: "{state.user_question}"
    
    Genera una consulta SQL para BigQuery que responda a esta pregunta.
    La consulta debe ser eficiente y utilizar las mejores prácticas de SQL.
    Incluye comentarios para explicar cada parte relevante de la consulta.
    """

    response = llm.invoke(prompt)

    # Extraer la consulta SQL (asumiendo que está en formato de bloque de código)
    content = response.content

    # Extraer el código SQL de un bloque de código markdown si existe
    import re

    sql_match = re.search(r"```sql\n(.*?)\n```", content, re.DOTALL)

    if sql_match:
        state.sql_query = sql_match.group(1).strip()
    else:
        # Si no está en un bloque de código, intentamos buscar la consulta SQL directamente
        state.sql_query = content.strip()

    return state


def validate_query(state: AgentState) -> Dict[str, Any]:
    """
    Valida si la consulta SQL es correcta y responde a la pregunta del usuario.
    """
    # llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # Prompt para validar la consulta SQL
    prompt = f"""
    Actúa como un experto en SQL para BigQuery y análisis de datos.
    
    La pregunta del usuario es: "{state.user_question}"
    
    La consulta SQL generada es:
    ```sql
    {state.sql_query}
    ```
    
    El esquema de la base de datos es:
    {json.dumps(state.database_schema, indent=2)}
    
    Evalúa si la consulta SQL:
    1. Es sintácticamente correcta para BigQuery
    2. Utiliza las tablas y columnas correctas del esquema proporcionado
    3. Responde adecuadamente a la pregunta del usuario
    
    Responde con un JSON con el siguiente formato:
    {{
        "is_valid": true o false,
        "reason": "Explicación de por qué la consulta es válida o no",
        "needs_clarification": true o false,
        "clarification_message": "Si se necesita aclaración, especifica qué información adicional se necesita"
    }}
    """

    response = llm.invoke(prompt)
    validation_result = json.loads(response.content)

    # Actualizar el estado según el resultado de la validación
    if not validation_result["is_valid"]:
        if validation_result.get("needs_clarification", False):
            state.needs_clarification = True
            state.clarification_message = validation_result["clarification_message"]
            return {"next": Nodes.REQUEST_CLARIFICATION}
        else:
            return {"next": Nodes.GENERATE_QUERY}
    else:
        return {"next": Nodes.EXECUTE_QUERY}


def execute_query(state: AgentState) -> AgentState:
    """
    Ejecuta la consulta SQL en BigQuery y almacena los resultados.
    """
    client = get_bigquery_client()

    try:
        # Ejecutar la consulta
        query_job = client.query(state.sql_query)
        results = query_job.result()

        # Convertir los resultados a un formato serializable
        rows = []
        for row in results:
            rows.append({key: value for key, value in row.items()})

        # Guardar los resultados y metadatos
        state.query_result = {
            "columns": [field.name for field in query_job.result().schema],
            "rows": rows,
            "total_rows": len(rows),
        }

    except Exception as e:
        # En caso de error, marcamos que necesitamos clarificación
        state.needs_clarification = True
        state.clarification_message = f"Error al ejecutar la consulta: {str(e)}"

    return state


def present_results(state: AgentState) -> AgentState:
    """
    Formatea y presenta los resultados de la consulta al usuario.
    """
    #llm = ChatOpenAI(model="gpt-4o", temperature=0)
    
    if state.query_result and state.query_result["total_rows"] > 0:
        # Limitar la cantidad de filas para no sobrecargar el mensaje
        max_rows = min(10, state.query_result["total_rows"])
        display_rows = state.query_result["rows"][:max_rows]
        
        # Crear una tabla Markdown con los resultados
        columns = state.query_result["columns"]
        table_header = "| " + " | ".join(columns) + " |"
        table_separator = "| " + " | ".join(["---" for _ in columns]) + " |"
        
        table_rows = []
        for row in display_rows:
            row_values = [str(row.get(col, "")) for col in columns]
            table_rows.append("| " + " | ".join(row_values) + " |")
        
        table = "\n".join([table_header, table_separator] + table_rows)
        
        # Prompt para generar una explicación de los resultados
        prompt = f"""
        Actúa como un analista de datos.
        
        La pregunta del usuario fue: "{state.user_question}"
        
        Los resultados de la consulta SQL son:
        {table}
        
        Total de filas: {state.query_result["total_rows"]}
        (Mostrando {max_rows} de {state.query_result["total_rows"]} filas)
        
        Proporciona una breve explicación de los resultados que responda directamente a la pregunta del usuario.
        """
        
        response = llm.invoke(prompt)
        
        # Crear un mensaje con los resultados y la explicación
        result_message = f"""
## Resultados de la consulta

{table}

*Mostrando {max_rows} de {state.query_result["total_rows"]} filas*

## Análisis

{response.content}

## Consulta SQL utilizada

```sql
{state.sql_query}
```
"""
        
        # Añadir el mensaje al estado
        state.messages.append({"role": "assistant", "content": result_message})
    else:
        # No hay resultados
        no_results_message = f"""
No se encontraron resultados para la consulta. Esto puede deberse a:
1. No hay datos que coincidan con los criterios de la consulta
2. La consulta puede necesitar ser ajustada

Consulta SQL utilizada:
```sql
{state.sql_query}
```

¿Deseas reformular tu pregunta o ajustar los criterios de búsqueda?
"""
        state.messages.append({"role": "assistant", "content": no_results_message})
    
    return state


def request_clarification(state: AgentState) -> AgentState:
    """
    Solicita aclaración al usuario basado en el mensaje de clarificación.
    """
    clarification_request = f"""
        Necesito un poco más de información para poder responder a tu pregunta correctamente:

        {state.clarification_message}

        ¿Podrías proporcionar esos detalles adicionales para que pueda ayudarte mejor?
    """

    # Añadir el mensaje al estado
    state.messages.append({"role": "assistant", "content": clarification_request})

    return state


# Crear el grafo de estados
def create_graph():
    """
    Crea y retorna el grafo de estados para el agente de consultas BigQuery.
    """
    # Inicializar el grafo con el estado AgentState
    graph = StateGraph(AgentState)

    # Añadir los nodos al grafo
    graph.add_node(Nodes.EXTRACT_SCHEMA, extract_schema)
    graph.add_node(Nodes.VALIDATE_QUESTION, validate_question)
    graph.add_node(Nodes.GENERATE_QUERY, generate_query)
    graph.add_node(Nodes.VALIDATE_QUERY, validate_query)
    graph.add_node(Nodes.EXECUTE_QUERY, execute_query)
    graph.add_node(Nodes.PRESENT_RESULTS, present_results)
    graph.add_node(Nodes.REQUEST_CLARIFICATION, request_clarification)

    # Definir las conexiones entre nodos
    graph.add_edge(Nodes.EXTRACT_SCHEMA, Nodes.VALIDATE_QUESTION)
    graph.add_conditional_edges(
        Nodes.VALIDATE_QUESTION, lambda state: {"next": state["next"]}
    )
    graph.add_edge(Nodes.GENERATE_QUERY, Nodes.VALIDATE_QUERY)
    graph.add_conditional_edges(
        Nodes.VALIDATE_QUERY, lambda state: {"next": state["next"]}
    )
    graph.add_edge(Nodes.EXECUTE_QUERY, Nodes.PRESENT_RESULTS)
    graph.add_edge(Nodes.PRESENT_RESULTS, END)
    graph.add_edge(Nodes.REQUEST_CLARIFICATION, END)

    # Definir el nodo inicial
    graph.set_entry_point(Nodes.EXTRACT_SCHEMA)

    return graph.compile()


# Función principal para ejecutar el agente
def run_bigquery_agent(
    user_question: str, credentials_path: str = "/Users/nmlemus/.config/gcloud/application_default_credentials.json"
):
    """
    Ejecuta el agente de consultas BigQuery con una pregunta del usuario.

    Args:
        user_question: La pregunta del usuario
        credentials_path: Ruta al archivo de credenciales de servicio de GCP

    Returns:
        La respuesta del agente
    """
    # Configurar la variable de entorno para las credenciales
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path

    # Crear el grafo
    agent = create_graph()

    # Inicializar el estado
    initial_state = AgentState(messages=[{"role": "user", "content": user_question}])

    # Ejecutar el grafo
    final_state = agent.invoke(initial_state)

    # Retornar el último mensaje del asistente
    assistant_messages = [
        msg["content"] for msg in final_state.messages if msg["role"] == "assistant"
    ]

    return assistant_messages[-1] if assistant_messages else "No se pudo procesar la consulta."


In [19]:
agent = create_graph()

In [4]:
from IPython.display import Image, display

try:
    display(Image(agent.get_graph().draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass

In [20]:
print(agent.get_graph().draw_mermaid())

---
config:
  flowchart:
    curve: linear
---
graph TD;
	__start__([<p>__start__</p>]):::first
	extract_schema(extract_schema)
	validate_question(validate_question)
	generate_query(generate_query)
	validate_query(validate_query)
	execute_query(execute_query)
	present_results(present_results)
	request_clarification(request_clarification)
	__end__([<p>__end__</p>]):::last
	__start__ --> extract_schema;
	execute_query --> present_results;
	extract_schema --> validate_question;
	generate_query --> validate_query;
	present_results --> __end__;
	request_clarification --> __end__;
	validate_question -.-> extract_schema;
	validate_question -.-> generate_query;
	validate_question -.-> validate_query;
	validate_question -.-> execute_query;
	validate_question -.-> present_results;
	validate_question -.-> request_clarification;
	validate_question -.-> __end__;
	validate_query -.-> extract_schema;
	validate_query -.-> validate_question;
	validate_query -.-> generate_query;
	validate_query -.-> exe

In [21]:
# Ejemplo de uso
if __name__ == "__main__":
    user_question = "¿Cuáles son los 5 productos más vendidos en el último mes?"
    response = run_bigquery_agent(user_question)
    print(response)

MalformedError: Service account info was not in the expected format, missing fields token_uri, client_email.