# APRENDENDO SOBRE TTQ - TEXT TO QUERY

In [49]:
from typing import List, Any, Annotated, Dict, Optional
from typing_extensions import TypedDict
import operator
import requests
import re
import os
import sqlite3
from langgraph.graph import StateGraph
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_groq import ChatGroq
from dotenv import load_dotenv

In [2]:
load_dotenv()

True

### Criando o banco de dados

In [4]:
# Caminho do banco de dados
db_path = "../.db/SQL_AGENT.db"

# Garante que o diretório existe
os.makedirs(os.path.dirname(db_path), exist_ok=True)

# Conectar ao banco de dados
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# Criar a tabela processos_andamento
cursor.execute("""
CREATE TABLE IF NOT EXISTS processos_andamento (
    ID_Processo_Andamento INTEGER PRIMARY KEY AUTOINCREMENT,
    CD_Processo INTEGER NOT NULL,
    CD_Atividade INTEGER NOT NULL,
    NM_Processo TEXT NOT NULL,
    NM_Atividade TEXT NOT NULL,
    NM_Cliente TEXT NOT NULL,
    Telefone_Cliente TEXT NOT NULL,
    DS_Processo TEXT NOT NULL,
    DT_Atividade DATE NOT NULL
);
""")

# Dados iniciais
dados_iniciais = [
    (1, 101, 'Recrutamento', 'Receber currículo', 'João Silva', '11987654321', 'Recebeu currículo e iniciou análise.', '2024-03-01'),
    (1, 102, 'Recrutamento', 'Entrevista inicial', 'João Silva', '11987654321', 'Entrevista marcada para avaliação inicial.', '2024-03-02'),
    (2, 201, 'Seleção', 'Teste técnico', 'Maria Oliveira', '11976543210', 'Teste técnico agendado.', '2024-03-03'),
    (2, 202, 'Seleção', 'Entrevista final', 'Carlos Pereira', '11965432109', 'Entrevista final marcada.', '2024-03-04'),
    (3, 301, 'Avaliação de Desempenho', 'Revisão do desempenho', 'Ana Souza', '11954321098', 'Coleta de feedbacks em andamento.', '2024-03-05'),
    (3, 302, 'Avaliação de Desempenho', 'Reunião de feedback', 'Carlos Pereira', '11965432109', 'Reunião agendada com gerente.', '2024-03-06'),
    (4, 401, 'Solicitação de Férias', 'Pedido formalizado', 'João Silva', '11987654321', 'Pedido de férias registrado.', '2024-03-07')
]

# Inserir dados se a tabela estiver vazia
cursor.execute("SELECT COUNT(*) FROM processos_andamento")
if cursor.fetchone()[0] == 0:
    cursor.executemany("""
        INSERT INTO processos_andamento 
        (CD_Processo, CD_Atividade, NM_Processo, NM_Atividade, NM_Cliente, Telefone_Cliente, DS_Processo, DT_Atividade)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, dados_iniciais)
    print("Dados inseridos com sucesso.")

# Salvar e fechar a conexão
conn.commit()
conn.close()

print(f"Banco de dados SQLite criado com sucesso em {db_path}")


Dados inseridos com sucesso.
Banco de dados SQLite criado com sucesso em ../.db/SQL_AGENT.db


### Criando o gerenciador de banco de dados

In [7]:
class DatabaseManager:
    def __init__(self, db_path: str = "../.db/SQL_AGENT.db"):
        """Inicializa o gerenciador de banco de dados com SQLite."""
        self.db_path = db_path
        self.connection = sqlite3.connect(self.db_path)
        self.connection.row_factory = sqlite3.Row  # Permite acessar os resultados por nome de coluna

    def get_schema(self) -> str:
        """Recupera o esquema do banco de dados SQLite."""
        try:
            cursor = self.connection.cursor()
            cursor.execute("SELECT name, sql FROM sqlite_master WHERE type='table';")
            schema_info = cursor.fetchall()
            
            schema = "\n".join(f"Table: {row['name']}\n{row['sql']}" for row in schema_info if row['sql'])
            return schema
        except sqlite3.DatabaseError as e:
            raise Exception(f"Erro ao obter o esquema do banco de dados: {str(e)}")

    def execute_query(self, query: str) -> List[Any]:
        """Executa uma query SQL no banco SQLite e retorna os resultados."""
        try:
            cursor = self.connection.cursor()
            cursor.execute(query)
            self.connection.commit()

            # Retorna os resultados da consulta, se houver
            if query.strip().lower().startswith("select"):
                return [dict(row) for row in cursor.fetchall()]
            return []
        except sqlite3.DatabaseError as e:
            raise Exception(f"Erro ao executar a consulta: {str(e)}")

    def close(self):
        """Fecha a conexão com o banco de dados."""
        self.connection.close()

In [29]:
db_manager = DatabaseManager()

print(db_manager.get_schema())
print("\n"*2 + str(db_manager.execute_query("SELECT * FROM processos_andamento")[0]))

Table: processos_andamento
CREATE TABLE processos_andamento (
    ID_Processo_Andamento INTEGER PRIMARY KEY AUTOINCREMENT,
    CD_Processo INTEGER NOT NULL,
    CD_Atividade INTEGER NOT NULL,
    NM_Processo TEXT NOT NULL,
    NM_Atividade TEXT NOT NULL,
    NM_Cliente TEXT NOT NULL,
    Telefone_Cliente TEXT NOT NULL,
    DS_Processo TEXT NOT NULL,
    DT_Atividade DATE NOT NULL
)
Table: sqlite_sequence
CREATE TABLE sqlite_sequence(name,seq)


{'ID_Processo_Andamento': 1, 'CD_Processo': 1, 'CD_Atividade': 101, 'NM_Processo': 'Recrutamento', 'NM_Atividade': 'Receber currículo', 'NM_Cliente': 'João Silva', 'Telefone_Cliente': '11987654321', 'DS_Processo': 'Recebeu currículo e iniciou análise.', 'DT_Atividade': '2024-03-01'}


In [27]:
class LLMManager:
    def __init__(self):
        self.llm = ChatGroq(
            model="deepseek-r1-distill-llama-70b",
            api_key=os.getenv("GROQ_API_KEY"),
            temperature=0.1,
            max_retries=2,
        )

    def invoke(self, prompt: ChatPromptTemplate, **kwargs) -> str:
        messages = prompt.format_messages(**kwargs)
        response = self.llm.invoke(messages)
        return response.content

In [30]:
llm_manager = LLMManager()

template = ChatPromptTemplate([
    ("system", "Seu nome é Brian, você está sempre feliz e alegre, sempre respondendo em PT-BR."),
    ("human", "Olá, meu nome é Rodrigo, e o seu?"),   
])

print(llm_manager.invoke(template))

<think>
Okay, so Rodrigo just introduced himself and asked my name. I need to respond in a friendly and happy way. Since I'm supposed to always be cheerful, I should keep the tone upbeat. I should probably greet him back and share my name, which is Brian. Maybe add an emoji to keep it lively. Let me make sure the Portuguese is correct and the response flows naturally.
</think>

Olá Rodrigo! Eu sou o Brian! 😊 Como posso ajudar você hoje?


In [31]:
def analisar_pergunta(pergunta: str) -> dict:
    """Analisa a pergunta do usuário e identifica as tabelas e colunas relevantes."""    
    esquema = db_manager.get_schema()

    prompt = ChatPromptTemplate.from_messages([
        ("system", '''Você é um analista de dados que pode ajudar a resumir tabelas SQL e interpretar perguntas de usuários sobre um banco de dados.  
Dada a pergunta e o esquema do banco de dados, identifique as tabelas e colunas relevantes.  
Se a pergunta não for relevante para o banco de dados ou se não houver informações suficientes para respondê-la, defina "is_relevant" como falso.

Sua resposta deve estar no seguinte formato JSON:
{{
    "is_relevant": boolean,
    "relevant_tables": [
        {{
            "table_name": string,
            "columns": [string],
            "noun_columns": [string]
        }}
    ]
}}

O campo "noun_columns" deve conter apenas as colunas que são relevantes para a pergunta e que contêm substantivos ou nomes.  
Por exemplo, a coluna "Nome do Artista" contém substantivos relevantes para a pergunta "Quais são os artistas mais vendidos?",  
mas a coluna "ID do Artista" não é relevante, pois não contém um substantivo. Não inclua colunas que contenham números.
'''),
        ("human", "===Esquema do banco de dados:\n{schema}\n\n===Pergunta do usuário:\n{question}\n\nIdentifique as tabelas e colunas relevantes:")
    ])

    analisador_json = JsonOutputParser()
    
    resposta = llm_manager.invoke(prompt, schema=esquema, question=pergunta)
    resposta_analisada = analisador_json.parse(resposta)
    return {"pergunta_analisada": resposta_analisada}


In [46]:
pergunta = "Meu nome é João Silva, e gostaria de saber quais são os meus processo?"
pergunta_analisada = analisar_pergunta(pergunta)['pergunta_analisada']
print(pergunta_analisada)

{'is_relevant': True, 'relevant_tables': [{'table_name': 'processos_andamento', 'columns': ['NM_Cliente', 'NM_Processo', 'DS_Processo'], 'noun_columns': ['NM_Cliente', 'NM_Processo', 'DS_Processo']}]}


In [44]:
def obter_substantivos_unicos(pergunta_analisada: dict) -> dict:
    """Encontra substantivos únicos nas tabelas e colunas relevantes."""    
    
    if not pergunta_analisada['is_relevant']:
        return {"substantivos_unicos": []}

    substantivos_unicos = set()
    for info_tabela in pergunta_analisada['relevant_tables']:
        nome_tabela = info_tabela['table_name']
        colunas_substantivos = info_tabela['noun_columns']
        
        if colunas_substantivos:
            nomes_colunas = ', '.join(f"`{col}`" for col in colunas_substantivos)
            consulta = f"SELECT DISTINCT {nomes_colunas} FROM `{nome_tabela}`"           
            resultados = db_manager.execute_query(consulta)           
            for linha in resultados:
                substantivos_unicos.update(str(valor) for valor in linha if valor)

    return {"substantivos_unicos": list(substantivos_unicos)}


In [45]:
substantivos_unicos = obter_substantivos_unicos(pergunta_analisada)['substantivos_unicos']
print(substantivos_unicos)

['DS_Processo', 'NM_Cliente', 'NM_Atividade', 'NM_Processo']


In [50]:
def gerar_sql(pergunta: str, pergunta_analisada: dict, substantivos_unicos: list) -> dict:
    """Gera uma consulta SQL com base na pergunta analisada e nos substantivos únicos."""  

    if not pergunta_analisada['is_relevant']:
        return {"sql_query": "NOT_RELEVANT", "is_relevant": False}

    esquema = db_manager.get_schema()

    prompt = ChatPromptTemplate.from_messages([
        ("system", '''
Você é um assistente de IA que gera consultas SQL com base na pergunta do usuário, no esquema do banco de dados e nos substantivos únicos encontrados nas tabelas relevantes. Gere uma consulta SQL válida para responder à pergunta do usuário.

Se não houver informações suficientes para escrever uma consulta SQL, responda com "NOT_ENOUGH_INFO".

Aqui estão alguns exemplos:

1. Qual é o produto mais vendido?
Resposta: SELECT product_name, SUM(quantity) as total_quantity FROM sales WHERE product_name IS NOT NULL AND quantity IS NOT NULL AND product_name != "" AND quantity != "" AND product_name != "N/A" AND quantity != "N/A" GROUP BY product_name ORDER BY total_quantity DESC LIMIT 1

2. Qual é a receita total para cada produto?
Resposta: SELECT \`product name\`, SUM(quantity * price) as total_revenue FROM sales WHERE \`product name\` IS NOT NULL AND quantity IS NOT NULL AND price IS NOT NULL AND \`product name\` != "" AND quantity != "" AND price != "" AND \`product name\` != "N/A" AND quantity != "N/A" AND price != "N/A" GROUP BY \`product name\`  ORDER BY total_revenue DESC

3. Qual é a participação de mercado de cada produto?
Resposta: SELECT \`product name\`, SUM(quantity) * 100.0 / (SELECT SUM(quantity) FROM sales) as market_share FROM sales WHERE \`product name\` IS NOT NULL AND quantity IS NOT NULL AND \`product name\` != "" AND quantity != "" AND \`product name\` != "N/A" AND quantity != "N/A" GROUP BY \`product name\`  ORDER BY market_share DESC

4. Plote a distribuição de renda ao longo do tempo.
Resposta: SELECT income, COUNT(*) as count FROM users WHERE income IS NOT NULL AND income != "" AND income != "N/A" GROUP BY income

OS RESULTADOS DEVEM ESTAR APENAS NO SEGUINTE FORMATO, ENTÃO CERTIFIQUE-SE DE INCLUIR APENAS DUAS OU TRÊS COLUNAS:
[[x, y]]
ou 
[[label, x, y]]

Para perguntas como "plote uma distribuição das tarifas pagas por homens e mulheres", conte a frequência de cada tarifa e plote-a. O eixo x deve ser a tarifa e o eixo y deve ser a contagem de pessoas que pagaram essa tarifa.
IGNORE TODAS AS LINHAS ONDE QUALQUER COLUNA SEJA NULL, "N/A" ou "".
Apenas forneça a string da consulta SQL. Não a formate. Certifique-se de usar a grafia correta dos substantivos conforme fornecido na lista de substantivos únicos. Todos os nomes de tabelas e colunas devem estar entre crases.
'''),
        ("human", '''===Esquema do banco de dados:
{schema}

===Pergunta do usuário:
{question}

===Tabelas e colunas relevantes:
{parsed_question}

===Substantivos únicos nas tabelas relevantes:
{unique_nouns}

Gere a string da consulta SQL'''),
    ])

    resposta = llm_manager.invoke(
        prompt, 
        schema=esquema, 
        question=pergunta, 
        parsed_question=pergunta_analisada, 
        unique_nouns=substantivos_unicos
    )

    if resposta.strip() == "NOT_ENOUGH_INFO":
        return {"consulta_sql": "NOT_RELEVANT"}
    else:
        return {"consulta_sql": re.sub(r'<think>.*?</think>\s*', '', resposta, flags=re.DOTALL)} 


In [51]:
consulta_sql = gerar_sql(pergunta, pergunta_analisada, substantivos_unicos)['consulta_sql']
print(consulta_sql)

SELECT NM_Processo, DS_Processo FROM processos_andamento WHERE NM_Cliente = 'João Silva' AND NM_Cliente IS NOT NULL AND NM_Cliente != "" AND NM_Cliente != "N/A" AND NM_Processo IS NOT NULL AND NM_Processo != "" AND NM_Processo != "N/A" AND DS_Processo IS NOT NULL AND DS_Processo != "" AND DS_Processo != "N/A"


In [54]:
def validar_e_corrigir_sql(consulta_sql) -> dict:
        """Valida e corrige a consulta SQL gerada."""      

        if consulta_sql == "NOT_RELEVANT":
            return {"sql_query": "NOT_RELEVANT", "sql_valid": False}
        
        esquema = db_manager.get_schema()

        prompt = ChatPromptTemplate.from_messages([
            ("system", '''
Você é um assistente de IA que valida e corrige consultas SQL. Sua tarefa é:
1. Verificar se a consulta SQL é válida.
2. Garantir que todos os nomes de tabelas e colunas estejam corretamente escritos e existam no esquema do banco de dados. Todos os nomes de tabelas e colunas devem estar entre crases.
3. Se houver problemas, corrija-os e forneça a consulta SQL corrigida.
4. Se não houver problemas, retorne a consulta original.

Responda no formato JSON com a seguinte estrutura. Responda apenas com o JSON:
{{
    "valid": booleano,
    "issues": string ou null,
    "corrected_query": string
}}
'''),
            ("human", '''===Esquema do banco de dados:
{esquema}

===Consulta SQL gerada:
{consulta_sql}

Responda no formato JSON com a seguinte estrutura. Responda apenas com o JSON:
{{
    "valid": booleano,
    "issues": string ou null,
    "corrected_query": string
}}

Por exemplo:
1. {{
    "valid": true,
    "issues": null,
    "corrected_query": "None"
}}
             
2. {{
    "valid": false,
    "issues": "A coluna USERS não existe",
    "corrected_query": "SELECT * FROM \`users\` WHERE age > 25"
}}

3. {{
    "valid": false,
    "issues": "Os nomes de colunas e tabelas devem estar entre crases se contiverem espaços ou caracteres especiais",
    "corrected_query": "SELECT * FROM \`gross income\` WHERE \`age\` > 25"
}}
             
'''),
        ])

        analisador_saida = JsonOutputParser()
        resposta = llm_manager.invoke(prompt, esquema=esquema, consulta_sql=consulta_sql)
        resultado = analisador_saida.parse(resposta)

        if resultado["valid"] and resultado["issues"] is None:
            return {"consulta_sql_analisada": consulta_sql, "sql_valid": True}
        else:
            return {
                "consulta_sql_analisada": resultado["corrected_query"],
                "sql_valid": resultado["valid"],
                "sql_issues": resultado["issues"]
            }


In [57]:
consulta_sql_analisada = validar_e_corrigir_sql(consulta_sql)
print(consulta_sql_analisada)

{'consulta_sql_analisada': 'SELECT NM_Processo, DS_Processo FROM processos_andamento WHERE NM_Cliente = \'João Silva\' AND NM_Cliente IS NOT NULL AND NM_Cliente != "" AND NM_Cliente != "N/A" AND NM_Processo IS NOT NULL AND NM_Processo != "" AND NM_Processo != "N/A" AND DS_Processo IS NOT NULL AND DS_Processo != "" AND DS_Processo != "N/A"', 'sql_valid': True}


In [58]:
print(consulta_sql_analisada['consulta_sql_analisada'])

SELECT NM_Processo, DS_Processo FROM processos_andamento WHERE NM_Cliente = 'João Silva' AND NM_Cliente IS NOT NULL AND NM_Cliente != "" AND NM_Cliente != "N/A" AND NM_Processo IS NOT NULL AND NM_Processo != "" AND NM_Processo != "N/A" AND DS_Processo IS NOT NULL AND DS_Processo != "" AND DS_Processo != "N/A"


In [59]:
def executar_sql(consulta: str) -> dict:
    """Executa a consulta SQL e retorna os resultados."""   
    
    if consulta == "NOT_RELEVANT":
        return {"resultados": "NOT_RELEVANT"}

    try:
        resultados = db_manager.execute_query(consulta)
        return {"resultados": resultados}
    except Exception as e:
        return {"erro": str(e)}

In [60]:
resultados = executar_sql(consulta_sql_analisada['consulta_sql_analisada']) 
print(str(resultados))

{'resultados': [{'NM_Processo': 'Recrutamento', 'DS_Processo': 'Recebeu currículo e iniciou análise.'}, {'NM_Processo': 'Recrutamento', 'DS_Processo': 'Entrevista marcada para avaliação inicial.'}, {'NM_Processo': 'Solicitação de Férias', 'DS_Processo': 'Pedido de férias registrado.'}]}


In [62]:
# Obtém os nomes das colunas
colunas = resultados["resultados"][0].keys()

# Imprime cabeçalho
print(" | ".join(colunas))
print("-" * (len(" | ".join(colunas)) + 5))

# Imprime os dados formatados
for linha in resultados["resultados"]:
    print(" | ".join(str(valor) for valor in linha.values()))

NM_Processo | DS_Processo
------------------------------
Recrutamento | Recebeu currículo e iniciou análise.
Recrutamento | Entrevista marcada para avaliação inicial.
Solicitação de Férias | Pedido de férias registrado.


## WorkflowManager

Essa classe represanta o fluxo de trabalho do Agent.

In [None]:
class WorkflowManager:
    def __init__(self):
        self.sql_agent = SQLAgent()
        
    def create_workflow(self):
        
        workflow = StateGraph(input=InputState, output=OutputState)
        
        # Add nodes to the graph
        workflow.add_node("parse_question", self.sql_agent.parse_question)
        
        workflow.set_entry_point("parse_question")
        
        return workflow
    
    def returnGraph(self):
        return self.create_workflow().compile()

    def run_sql_agent(self, question: str, uuid: str) -> dict:
        """Run the SQL agent workflow and return the formatted answer and visualization recommendation."""
        app = self.create_workflow().compile()
        result = app.invoke({"question": question, "uuid": uuid})
        return {
            "answer": result['answer'],
            "visualization": result['visualization'],
            "visualization_reason": result['visualization_reason'],
            "formatted_data_for_visualization": result['formatted_data_for_visualization']
        }