In [1]:
from typing import Union, Callable

In [2]:
import torch
torch.cuda.empty_cache()

In [3]:
from googletrans import Translator

translator = Translator()

In [4]:
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "Qwen/Qwen3-0.6B"

# load the tokenizer and the model
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="auto"
)

  from .autonotebook import tqdm as notebook_tqdm
`torch_dtype` is deprecated! Use `dtype` instead!


In [5]:
from qdrant_client import models, QdrantClient
from sentence_transformers import SentenceTransformer

encoder = SentenceTransformer("Qwen/Qwen3-Embedding-0.6B", device='cpu')
client = QdrantClient(path="../metadados/VectorStore/") # Carrega vectorstore em disco

In [13]:
translator = Translator()

task = "Você é um motor de busca, devolva dados mais relevantes com base na busca"
query = "Quero dados sobre indice pluviometico"

query_pt = f"Instrução: {task} Query: {query}"
query_en = await translator.translate(query_pt, src='pt', dest='en')

query = query_en.text
#query = query_pt

hits = client.query_points(
    collection_name="Catalogo_metadados",
    query=encoder.encode(query).tolist(),
    limit=5,
).points

catalogs = []
print(type(hits))
print(hits)
for hit in hits:
    catalogs.append(
        {"score:": hit.score,
         "id": hit.payload.get('id'),
         "Titulo: ": hit.payload.get('title'),
         "Nome: ": hit.payload.get('nome'),
         "Descrição: ": hit.payload.get('descricao'),
         "Nome organização": hit.payload.get('nomeOrganizacao'),
         "catalogacao": hit.payload.get('catalogacao'),
         "ultimaAtualizacaoDados'": hit.payload.get('ultimaAtualizacaoDados'), 
         }
    )
    #print("score:", hit.score)
    #print("Titulo: ",hit.payload.get('title'))
    #print("Nome: ", hit.payload.get('nome'))
    #print("Descrição: ", hit.payload.get('descricao'))
    #print('\n')
[print(catalog) for catalog in catalogs]

<class 'list'>
[ScoredPoint(id='4a0a36ac-1797-4d3b-b633-cb590185f5ad', version=0, score=0.5244414432246443, payload={'id': '4a0a36ac-1797-4d3b-b633-cb590185f5ad', 'title': 'HIDRO - Inventário pluviométrico/fluviométrico atualizado.', 'nome': 'hidro-inventario-pluviometrico-fluviometrico-atualizado3', 'descricao': '<div>Estes webservices disponibilizam os dados hidrometeorológicos sob gestão da Agência Nacional de Águas e Saneamento Básico.\xa0</div><div><br /></div><div>São vários webservices cada um disponibiliza um tipo de dado.\xa0</div><div><br /></div><div>Este webservice específico disponibiliza:\xa0</div><div><br /></div><div>Inventário pluviométrico/fluviométrico atualizado.</div><div>codEstDE: Código de 8 dígitos da estação - INICIAL (Ex.: 00047000)</div><div>codEstATE: Código de 8 dígitos da estação - FINAL (Ex.: 90300000)</div><div>tpEst: Tipo da estação (1-Flu ou 2-Plu)</div><div>nmEst: Nome da Estação (Ex.: Barra Mansa)</div><div>nmRio: Nome do Rio (Ex.: Rio Javari)</div><

[None, None, None, None, None]

## tool class

### OpenDataSearch

In [37]:
class OpenDataSearch:
    def __init__(self, **kwargs):
        self.client = kwargs.get('client')
        self.encoder = kwargs.get('encoder')

        self.catalogs = {} # Armazena todos os catalogos pesquisados durante a sessão
    
    async def open_data_search(self, query: str) -> list:
        
        '''
            tool_name = "open_data_search"
        '''


        translator = Translator()

        task = "Você é um motor de busca, devolva dados mais relevantes com base na busca"

        query_pt = f"Instrução: {task} Query: {query}"
        query_en = await translator.translate(query_pt, src='pt', dest='en')

        query = query_en.text

        hits = self.client.query_points(
            collection_name="Catalogo_metadados",
            query=self.encoder.encode(query).tolist(),
            limit=5,
        ).points

        catalogs = {} # catalogo temporario retornado, respectivo a cada query individual

        for hit in hits:
            id = hit.payload.get('id')
            if id not in catalogs.keys(): # garante que não há catalogos repitidos
                catalogs.update({ id :
                    {"score:": hit.score,
                    "id": id,
                    "Titulo: ": hit.payload.get('title'),
                    "Nome: ": hit.payload.get('nome'),
                    "Descrição: ": hit.payload.get('descricao'),
                    "Nome organização": hit.payload.get('nomeOrganizacao'),
                    "catalogacao": hit.payload.get('catalogacao'),
                    "ultimaAtualizacaoDados'": hit.payload.get('ultimaAtualizacaoDados'), 
                    }
                })
        self.catalogs.update(catalogs) # Atualiza catalogos

        return catalogs
    
    def consult_catalogs(id: str) -> dict:
        '''
        tool_name = "consult_catalogs"
        
        '''
        return catalogs.get(id)


    def download_data(self):
        pass

    def clear_catalogs(self):
        self.catalogs.clear()

'''        description = "
        consult_catalogs deve ser invocada sempre que um usuário requisitar por mais detalhes de um catalogo.
        EXEMPLO:
            Gostaria de mais informações sobre o catalogo de id
        "
        
        input = {'id': 
            {'type': 'String',
            'description': 'id associado ao um catalogo'}}
        output_type = "dict"'''

'''            description = "
            
            Busca por dados governamentais abertos no site do governo federal.
            Se a saída de dados não for None ENTÃO a próxima tool a ser utilizada é final_answer

            input = {'query': 
                    {'type': 'String',
                    'description': 'Pergunta ou requisição do usuário'}}
            output_type = "dict"'''

'            description = "\n\n            Busca por dados governamentais abertos no site do governo federal.\n            Se a saída de dados não for None ENTÃO a próxima tool a ser utilizada é final_answer\n\n            input = {\'query\': \n                    {\'type\': \'String\',\n                    \'description\': \'Pergunta ou requisição do usuário\'}}\n            output_type = "dict"'

In [28]:
search = OpenDataSearch(client=client, encoder=encoder)

In [197]:
await search.open_data_search('acidentes')

{'61ce120f-6442-4088-a23d-916fd8200ba5': {'score:': 0.5906851025558029,
  'id': '61ce120f-6442-4088-a23d-916fd8200ba5',
  'Titulo: ': 'Acidentes de trânsito nas vias urbanas do Distrito Federal nos últimos 10 anos com vítimas fatais',
  'Nome: ': 'acidentes-de-transito-nas-vias-urbanas-do-distrito-federal-nos-ultimos-10-anos-com-vitimas-fata',
  'Descrição: ': 'Apresentação do número de acidentes de trânsito com vítimas fatais nas vias urbanas do Distrito federal poderá contribuir para uma análise de maior profundidade nesta questão e definir ações de mitigam esses tipos de acidentes',
  'Nome organização': 'distrito-federal',
  'catalogacao': '11/03/2019 00:30:10',
  "ultimaAtualizacaoDados'": '13/08/2025 21:33:03'},
 '029ed484-3f39-46fb-8189-0fd8e2385968': {'score:': 0.5646834779511622,
  'id': '029ed484-3f39-46fb-8189-0fd8e2385968',
  'Titulo: ': 'LEVANTAMENTO DE ACIDENTES',
  'Nome: ': 'levantamento-de-acidentes',
  'Descrição: ': 'O levantamento de acidentes realizado em 2024 pelo

### OUTRA TOOL

In [38]:
class FinalAnswer:
    
    def __init__(self):
        pass    
    
    def final_answer(result: str) -> str:
        '''
        tool_name = "final_answer"

        '''
        return result

### OUTRA TOOL

## Chatbot class

In [59]:
tools = f"{OpenDataSearch.__doc__}, {FinalAnswer.__doc__}"
tools

'None, None'

In [193]:
import json
import asyncio
import gc
import torch
from googletrans import Translator


def show_message(message: dict|list) -> None:
    if isinstance(message, dict):
        print('='*200)
        print(f'role: {message.get('role')} \n')
        print(f'content: {message.get('content')} \n')
        #print('='*200)
    else:
        [show_message(mess) for mess in message]

class Chatbot:

    def __init__(self, max_steps=5, **kwargs):

        self.tokenizer = kwargs.get('tokenizer')
        self.model = kwargs.get('model')

        self.open_data_search = OpenDataSearch(client=client, encoder=encoder)
        self.final_answer = FinalAnswer()
        
        self.max_steps = max_steps
        self.messages = [] # Mémoria de curto prazo, reiniciada a cada query
        self.history = [] # Mémoria que perdura ao longo da sessão
    
    def _build_system_prompt(self):
        # Docstrings das ferramentas para o modelo entender
        tools_doc = f'''{OpenDataSearch.open_data_search.__doc__}
                        {OpenDataSearch.consult_catalogs.__doc__}
                        {FinalAnswer.final_answer.__doc__}'''
        
        return """
# IDENTITY
You are a helpful assistant specialized in retrieving open government data for Brazil.

# AVAILABLE TOOLS
1. open_data_search(query: str): Search for datasets based on keywords.
2. consult_catalogs(dataset_id: str): Retrieve specific details using a dataset ID.
3. final_answer(response: str): Return the final answer to the user after getting data.

# RESPONSE FORMAT
You must respond ONLY with a valid JSON object. Do not add any text outside the JSON.
The JSON must follow this structure:
{
    "thought": "Brief reasoning about what to do next",
    "tool_name": "name_of_the_tool",
    "parameters": {
        "key": "value"
    }
}

# EXAMPLES

User: I need information about hospital infections in Sao Paulo.
Assistant:
{
    "thought": "The user is asking for data about hospital infections. I should search for this topic.",
    "tool_name": "open_data_search",
    "parameters": {
        "query": "hospital infection Sao Paulo"
    }
}

User: Here is the ID: br-sp-hospital-data. Please get the details.
Assistant:
{
    "thought": "The user provided a specific ID. I need to consult the catalog using this ID.",
    "tool_name": "consult_catalogs",
    "parameters": {
        "dataset_id": "br-sp-hospital-data"
    }
}

User: (System output showing search results)
Assistant:
{
    "thought": "I have received the search results. I should now present them to the user.",
    "tool_name": "final_answer",
    "parameters": {
        "response": "Here is the data I found about hospital infections..."
    }
}
        
        """
        
    def call_llm(self, messages: list) -> str:

        text = self.tokenizer.apply_chat_template( # Formata 
            messages,
            tokenize=False,
            add_generation_prompt=True,
            enable_thinking=True, # Switches between thinking and non-thinking modes. Default is True.
            Topk=20,
            TopP=0.95,
            MinP=0
        )
        #print(text)
        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)

        # conduct text completion
        with torch.no_grad():
            generated_ids = self.model.generate(
                **model_inputs,
                max_new_tokens=2048, # Cuidado com 32k em GPU de 6GB
                temperature=0.3,
                do_sample=True,
                use_cache=True, # Otimiza velocidade, mas gasta VRAM. Se der erro, mude para False
            )
        output_ids = generated_ids[0][len(model_inputs.input_ids[0]):].tolist() 
        
        # parsing thinking content
        try:
            # rindex finding 151668 (</think>)
            index = len(output_ids) - output_ids[::-1].index(151668)
        except ValueError:
            index = 0

        thinking_content = self.tokenizer.decode(output_ids[:index], skip_special_tokens=True).strip("\n")
        content = self.tokenizer.decode(output_ids[index:], skip_special_tokens=True).strip("\n")
        
        #print(f"thinking content: \n{thinking_content}\n")
        #print(f"ACTION: \n{content}\n")

        gc.collect()
        torch.cuda.empty_cache()

        return content, thinking_content
    
    
    async def chat(self, query: str) -> str:
        tool = None
        step = 0

        self.messages = [
            {'role': 'system', 'content': self._build_system_prompt()},
            {'role': 'user', 'content': query}
        ]
        
        #show_message(self.messages)

        while step < self.max_steps:
            step +=1

            # Reasoning/Observation
            print(f"STEP{step}\n")

            show_message(self.messages)
            tool_data, thinking_content = self.call_llm(self.messages) # Ação e planejamento retornado pela llm
            
            self.short_memory(role="assistant", observation=thinking_content)
            print(tool_data, type(tool_data))
            tool_data = json.loads(tool_data.strip()) # Converte resultado da llm para json
            
            tool_name = tool_data.get('tool_name')
            
            # Act
            match tool_name:
                case 'open_data_search':
                    result = await self.open_data_search.open_data_search(query)
                    observation = f'Resultados da tool open_data_search: {result}'
                    self.short_memory(role='system', tool_name=tool_name, observation=observation)
                    #show_message(self.messages[-1])
                    #self.history.append(self.messages)
                
                case 'final_answer':
                    observation = self.final_answer(tool_data.get('parameters').get('result'))
                    self.short_memory(role='system', tool_name=tool_name, observation=observation)
                    #show_message(self.messages[-1])
                    self.clear_messages()
                    #self.history.append(self.messages)
                    return None
                
            #show_message(self.messages[-1])
            
        self.clear_messages()
        return 'Numero máximo de passos alcançado'

    def short_memory(self, role: str, observation: any, tool_name:str|None=None) -> None:
        if role == 'system':
            self.messages.append(
                {'role': role,
                'content': f"SYSTEM OBSERVATION from tool {tool_name}: {observation}"})
        elif role == 'assistant':
            self.messages.append(
                {'role': role,
                'content': f"content: {observation}"})
        else:
            self.messages.append(
                {'role': role,
                'content': f'{tool_name}: {observation}'})
        
    def clear_history(self):
        self.history.clear()

    def clear_messages(self):
        self.messages.clear()

    def clear_catalogs(self):
        self.open_data_search.clear_catalogs()

In [194]:
chatbot = Chatbot(model=model, max_steps=3, tokenizer=tokenizer, client=client, encoder=encoder)

In [198]:
query = "Quero dados sobre infecção hospitalar"
#query = "Poderia me falar sobre o dataset com id '0364f1c3-6198-4a17-982e-1d325ca8d8fd'"
#query = "Explique o dataset 04 infecção hospitalar"
#query = "Quem é você?"

tool = await chatbot.chat(query)
#print("*"*100)
#print('SAÍDA: ',{tool})

STEP1

role: system 

content: 
# IDENTITY
You are a helpful assistant specialized in retrieving open government data for Brazil.

# AVAILABLE TOOLS
1. open_data_search(query: str): Search for datasets based on keywords.
2. consult_catalogs(dataset_id: str): Retrieve specific details using a dataset ID.
3. final_answer(response: str): Return the final answer to the user after getting data.

# RESPONSE FORMAT
You must respond ONLY with a valid JSON object. Do not add any text outside the JSON.
The JSON must follow this structure:
{
    "thought": "Brief reasoning about what to do next",
    "tool_name": "name_of_the_tool",
    "parameters": {
        "key": "value"
    }
}

# EXAMPLES

User: I need information about hospital infections in Sao Paulo.
Assistant:
{
    "thought": "The user is asking for data about hospital infections. I should search for this topic.",
    "tool_name": "open_data_search",
    "parameters": {
        "query": "hospital infection Sao Paulo"
    }
}

User: Her

TypeError: 'FinalAnswer' object is not callable

In [140]:
temp = "{'tool_name': 'open_data_search', 'parameters': {'query': 'infecção hospitalar'}}"
json.loads(temp.strip())

JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)