In [25]:
import getpass
import os

if "LANGCHAIN_API_KEY" not in os.environ:
    os.environ["LANGCHAIN_TRACING_V2"] = "true"
    os.environ["LANGCHAIN_API_KEY"] = getpass.getpass()

In [26]:
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass()

#### A. Datos de ejemplo

In [27]:
from langchain_community.utilities import SQLDatabase

db = SQLDatabase.from_uri("sqlite:///Chinook.db")
print(db.dialect)
print(db.get_usable_table_names())
db.run("SELECT * FROM Artist LIMIT 10;")

sqlite
['Album', 'Artist', 'Customer', 'Employee', 'Genre', 'Invoice', 'InvoiceLine', 'MediaType', 'Playlist', 'PlaylistTrack', 'Track']


"[(1, 'AC/DC'), (2, 'Accept'), (3, 'Aerosmith'), (4, 'Alanis Morissette'), (5, 'Alice In Chains'), (6, 'Antônio Carlos Jobim'), (7, 'Apocalyptica'), (8, 'Audioslave'), (9, 'BackBeat'), (10, 'Billy Cobham')]"

### B. Replica con TypedDict

In [28]:
from typing_extensions import TypedDict, Annotated

class State(TypedDict):
    question: str
    query: str
    result: str
    answer: str

class QueryOutput(TypedDict):
    query: Annotated[str, ..., "Syntactically valid SQL query"]

In [51]:
from langchain import hub
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model='gpt-4o-mini')

query_prompt_template = hub.pull("langchain-ai/sql-query-system-prompt")
assert len(query_prompt_template.messages) == 1
query_prompt_template[0].pretty_print()


Given an input question, create a syntactically correct [33;1m[1;3m{dialect}[0m query to run to help find the answer. Unless the user specifies in his question a specific number of examples they wish to obtain, always limit your query to at most [33;1m[1;3m{top_k}[0m results. You can order the results by a relevant column to return the most interesting examples in the database.

Never query for all the columns from a specific table, only ask for a the few relevant columns given the question.

Pay attention to use only the column names that you can see in the schema description. Be careful to not query for columns that do not exist. Also, pay attention to which column is in which table.

Only use the following tables:
[33;1m[1;3m{table_info}[0m

Question: [33;1m[1;3m{input}[0m


In [63]:
# %%
from typing import Any
import json
from pydantic import ValidationError, BaseModel

class QueryOutput(BaseModel):
    query: str

def write_query_streaming(state: State):
    """
    Genera un SQL query en streaming basado en el estado proporcionado
    y valida el resultado con un esquema estructurado.
    """
    # Crear el prompt inicial
    prompt = query_prompt_template.invoke({
        "dialect": db.dialect,
        "top_k": 10,
        "table_info": db.get_table_info(),
        "input": state["question"],
    })

    # Configuración del modelo en streaming
    streaming_llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)

    # Acumular resultado de streaming
    raw_output = ""
    print("Generando query en streaming:")
    for token in streaming_llm.stream(prompt):
        raw_output += token.content  # Concatenar el contenido del token
        print(token.content, end="")  # Mostrar token en tiempo real

In [64]:
write_query_streaming({"question": "Cuantos empleados hay?"})

Generando query en streaming:
```sql
SELECT COUNT(*) AS TotalEmployees FROM Employee;
```

In [67]:
def write_query_streaming(state: State):
    """
    Genera un SQL query en streaming basado en el estado proporcionado
    y valida el resultado con un esquema estructurado.
    """
    prompt = query_prompt_template.invoke({
        "dialect": db.dialect,
        "top_k": 10,
        "table_info": db.get_table_info(),
        "input": state["question"],
    })

    # Configuración del modelo con streaming
    streaming_llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)

    # Acumular el resultado generado
    raw_output = ""
    print("Generando respuesta en streaming:")
    for token in streaming_llm.stream(prompt):
        raw_output += token.content
        print(token.content, end="")  # Mostrar en tiempo real


In [77]:
write_query_streaming({"question": "Cuantos empleados hay?"})

Generando respuesta en streaming:
```sql
SELECT COUNT(*) AS TotalEmployees FROM Employee;
```

In [78]:
def write_query_structured(state: State):
    """
    Genera un SQL query usando with_structured_output para validación automática.
    """
    prompt = query_prompt_template.invoke({
        "dialect": db.dialect,
        "top_k": 10,
        "table_info": db.get_table_info(),
        "input": state["question"],
    })

    # Configuración de LLM con esquema estructurado
    structured_llm = llm.with_structured_output(QueryOutput)

    # Invocación del modelo
    result = structured_llm.invoke(prompt)

    # Acceso al atributo query del objeto
    return result.query


In [79]:
write_query_structured({"question": "Cuantos empleados hay?"})

'SELECT COUNT(*) AS TotalEmployees FROM Employee;'

#### Limpiar streaming

In [81]:
import re

def clean_streamed_output(raw_output: str) -> str:
    """
    Limpia los decoradores de bloques de código (como ```sql) de un texto.
    """
    # Elimina bloques de código como ```sql y ```
    cleaned_output = re.sub(r"```[\w]*\n|```", "", raw_output).strip()
    return cleaned_output

def write_query_streaming(state: State):
    """
    Genera un SQL query en streaming basado en el estado proporcionado.
    """
    prompt = query_prompt_template.invoke({
        "dialect": db.dialect,
        "top_k": 10,
        "table_info": db.get_table_info(),
        "input": state["question"],
    })

    # Configuración del modelo con streaming
    streaming_llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)

    # Acumular el resultado generado
    raw_output = ""
    print("Generando respuesta en streaming:")
    for token in streaming_llm.stream(prompt):
        raw_output += token.content
        print(token.content, end="")  # Mostrar en tiempo real

    print("\n\nLimpieza de la salida...")
    cleaned_output = clean_streamed_output(raw_output)

    print("\nQuery limpio:")
    print(cleaned_output)
    return cleaned_output


In [84]:
write_query_streaming({"question": "Cuantos que su nombre inicia con A existen?"})

Generando respuesta en streaming:
```sql
SELECT COUNT(*) AS TotalArtists 
FROM Artist 
WHERE Name LIKE 'A%';
```

Limpieza de la salida...

Query limpio:
SELECT COUNT(*) AS TotalArtists 
FROM Artist 
WHERE Name LIKE 'A%';


"SELECT COUNT(*) AS TotalArtists \nFROM Artist \nWHERE Name LIKE 'A%';"