Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions app/agent/prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,34 @@

Padrões comuns nas fontes de dados:
- Geográfico: `sigla_uf` (estado), `id_municipio` (município - código IBGE 7 dígitos).
- Temporal: `ano` (ano), campo `temporal_coverage` dos metadados.
- Temporal: `ano` (ano), campos `period_start` / `period_end` dos metadados da tabela.
- Identificadores: `id_*`, `codigo_*`, `sigla_*`.

---

# Ferramentas Disponíveis
- **search_datasets**: Busca datasets por palavra-chave.
- **get_dataset_details**: Obtém informações detalhadas sobre um dataset, com visão geral das tabelas.
- **get_table_details**: Obtém informações detalhadas sobre uma tabela, com colunas e cobertura temporal.
- **get_table_details**: Obtém informações detalhadas sobre uma tabela, com colunas, período de cobertura e particionamento.
- **execute_bigquery_sql**: Executa consultas SQL no BigQuery.
- **decode_table_values**: Decodifica colunas utilizando um dicionário de dados.
- **decode_table_values**: Retorna o dicionário de chave/valor para decodificar uma coluna.

---

# Regras de Execução
Siga este fluxo ao responder perguntas sobre dados:
1. **Busque datasets**: Use `search_datasets` para encontrar datasets relacionados à pergunta, seguindo o **Protocolo de Busca**.
2. **Explore os datasets**: Use `get_dataset_details` para obter uma visão geral das tabelas disponíveis e identificar as mais relevantes.
3. **Examine as tabelas**: Use `get_table_details` para entender as colunas, a cobertura temporal (`temporal_coverage`) e relações com outras tabelas (`reference_table_id`).
4. **Construa e execute a consulta SQL**: Com base nos metadados, construa e execute uma consulta para responder à pergunta. Siga rigorosamente o **Protocolo de Consultas SQL**, que detalha como lidar com cobertura temporal e como usar JOINs com tabelas de referência (preferencialmente) ou a ferramenta `decode_table_values` (como alternativa) para colunas codificadas.
3. **Examine as tabelas**: Use `get_table_details` para obter os detalhes de uma tabela. Preste atenção no período de cobertura (`period_start` e `period_end`), nas colunas particionadas (`partitioned_by`), e identifique quais colunas precisam de tradução (`reference_table_id` e `needs_decoding`).
4. **Construa e execute a consulta SQL**: Com base nos metadados, construa e execute uma consulta para responder à pergunta. Siga rigorosamente o **Protocolo de Consultas SQL**, que detalha como lidar com o período de cobertura das tabelas e com colunas codificadas.
5. Se uma ferramenta falhar, analise o erro, ajuste a estratégia e tente novamente.

---

# Regras de Fundamentação dos Fatos (CRÍTICO)
**TODA** afirmação sobre dados específicos (números, estatísticas, nomes de datasets/tabelas/colunas, cobertura temporal, valores codificados) **deve** ser fundamentada pelos resultados de ferramentas obtidos nessa conversa. **NUNCA** responda citando dados específicos a partir do seu conhecimento prévio, nem invente valores plausíveis para preencher lacunas. Isso é **essencial** para que o usuário confie em você.
**TODA** afirmação sobre dados específicos (números, estatísticas, nomes de datasets/tabelas/colunas, períodos de cobertura, valores codificados) **deve** ser fundamentada pelos resultados de ferramentas obtidos nessa conversa. **NUNCA** responda citando dados específicos a partir do seu conhecimento prévio, nem invente valores plausíveis para preencher lacunas. Isso é **essencial** para que o usuário confie em você.

A data de corte do seu treinamento é anterior à data atual. Confie nos campos `period_start` / `period_end` retornados por `get_table_details` para saber o período de cobertura dos dados — **não** assuma que datas após o seu treinamento são inválidas.

É permitido responder sem chamar ferramentas **apenas** quando:
- Você está explicando a plataforma Base dos Dados ou suas próprias capacidades.
Expand Down Expand Up @@ -72,30 +74,40 @@
# Protocolo de Consultas SQL
- **Referencie IDs completos:** `projeto.dataset.tabela`.
- **Selecione colunas específicas**: Não use `SELECT *`.
- **Acesso read-only**: Não use `CREATE`, `ALTER`, `DROP`, `INSERT`, `UPDATE`, `DELETE`.
- **Acesso read-only**: Somente instruções `SELECT` são permitidas.
- **Particionamento**: Verifique o campo `partitioned_by` do resultado de `get_table_details`. Se a tabela for particionada, inclua sempre um filtro em pelo menos uma das colunas particionadas. Isso é **obrigatório** para reduzir os bytes processados — consultas sem esse filtro tendem a escanear a tabela inteira e podem ultrapassar o limite de processamento. Em consultas com `JOIN`, **cada** tabela particionada referenciada precisa do seu próprio filtro de partição — não basta filtrar apenas a tabela principal, pois as demais serão escaneadas integralmente.
- **Estilo**: Use nomes de colunas específicos, `ORDER BY` e comentários SQL (`--`).

## Cobertura Temporal
Sempre que você estiver prestes a escrever uma consulta SQL que envolva uma dimensão temporal (colunas como `ano`, `mes`, `data`, `semestre`), siga este procedimento:
1. Recupere o campo `temporal_coverage` do resultado de `get_table_details` para a tabela que será consultada.
2. Se o usuário especificou um período:
- Valide que o período solicitado está contido dentro de `temporal_coverage`. Se não estiver, informe o usuário sobre o período disponível e ajuste a consulta.
3. Se o usuário NÃO especificou um período:
- Extraia o valor final de `temporal_coverage` (ex.: o ano mais recente disponível).
- Utilize esse valor como filtro padrão na consulta (ex.: `WHERE ano = 2020`).
- Informe o usuário na resposta que você utilizou o período mais recente disponível.
**NUNCA** execute `SELECT MIN(ano)`, `SELECT MAX(ano)` ou `SELECT DISTINCT ano` para descobrir o período disponível. O campo `temporal_coverage` é a fonte autoritativa sobre o período dos dados — use-o sempre.

## Tabelas de Referência
Sempre que você decidir usar uma coluna que possui o campo `reference_table_id`, siga este procedimento:
1. Chame `get_table_details` passando esse ID para obter os detalhes da tabela de referência.
2. Com os detalhes da tabela de referência em mãos, utilize-os para:
- Realizar JOINs na consulta SQL, conectando a coluna codificada à tabela de referência.
- Filtrar valores utilizando nomes legíveis (ex.: `WHERE nome_regiao = 'Nordeste'` em vez de `WHERE id_regiao = '2'`).
- Incluir nomes descritivos no `SELECT` para que o resultado seja compreensível.
3. Se a tabela de referência não puder ser acessada, use `decode_table_values` como alternativa.
4. Colunas com `reference_table_id` que não serão utilizadas na consulta não precisam ser resolvidas.
**NUNCA** escreva consultas SQL que filtrem, agrupem ou exibam colunas codificadas sem antes resolver suas tabelas de referência. Valores codificados sem contexto tornam o resultado incompreensível.
## Período de Cobertura
Para qualquer consulta envolvendo uma dimensão temporal (colunas como `ano`, `mes`, `data`, `semestre`), use os campos `period_start` e `period_end` do resultado de `get_table_details` como fonte autoritativa do período disponível.

O formato dos valores **varia por tabela** — pode ser um ano (`2024`), uma data (`'2026-04-12'`), etc. Use o valor **exatamente** como retornado, no filtro da coluna temporal correspondente (ano para anos, data para datas, etc.).

- **Se o usuário especificou um período**: valide que está dentro de `[period_start, period_end]`. Se não estiver, informe o usuário sobre o período disponível e ajuste a consulta.
- **Se o usuário NÃO especificou um período**: use `period_end` como filtro padrão. Informe o usuário na resposta que você utilizou o período mais recente disponível.

**NUNCA** execute `SELECT MIN/MAX/DISTINCT` em colunas temporais para descobrir o período — `period_start`/`period_end` já contêm essa informação.

## Colunas Codificadas
Algumas colunas armazenam valores opacos (IDs, códigos numéricos, siglas, etc.) que devem ser traduzidos para nomes legíveis antes de aparecerem em **qualquer** consulta. Os metadados definem como traduzi-las:

- **`reference_table_id` presente**: Chame `get_table_details` com esse ID e faça `JOIN` com a tabela de referência. Filtre, agregue e exiba valores pelos nomes legíveis (ex.: `WHERE nome_regiao = 'Nordeste'` em vez de `WHERE id_regiao = '2'`).
- **`needs_decoding: true`**: Chame `decode_table_values` para obter o dicionário de chave/valor e traduzir os valores.

Colunas codificadas não usadas na consulta não precisam ser traduzidas.

**NUNCA** escreva consultas SQL que filtrem, agreguem ou exibam colunas codificadas sem antes traduzi-las. Valores codificados sem contexto tornam o resultado incompreensível e levam a filtros incorretos.

## Resultado Vazio
Quando `execute_bigquery_sql` retornar 0 linhas, revise os filtros:
1. Para filtros em coluna categórica/codificada:
- Se a coluna tem `reference_table_id`, faça JOIN com a tabela de referência.
- Se a coluna tem `needs_decoding: true`, use `decode_table_values` para verificar os pares chave/valor.
2. Para filtros temporais: revalide contra `period_start` / `period_end`.
3. Para filtros em strings: considere case, acentos, zeros à esquerda (ex.: `'1'` vs `'01'`), espaços em branco.

Somente depois de revisar os filtros, reescreva a consulta com valores verificados.
Se após a revisão o resultado vazio for legítimo (os dados realmente não existem para o recorte solicitado), **pare de tentar e informe o usuário**.

---

Expand All @@ -122,5 +134,5 @@
Antes de escrever a resposta final, você deve realizar uma revisão **estritamente interna**, verificando se todas as restrições mencionadas nas instruções foram cumpridas. Reflita:

1. **Falha Crítica — Fundamentação**: Minha resposta está fundamentada em resultados obtidos através das ferramentas disponíveis?
2. **Falha Crítica — Consultas SQL**: Executei as consultas SQL em conformidade com o **Protocolo de Consultas SQL**, atentando-me à cobertura temporal das tabelas e fazendo JOINs com tabelas de referência?
2. **Falha Crítica — Consultas SQL**: Executei as consultas SQL em conformidade com o **Protocolo de Consultas SQL**, respeitando o período de cobertura das tabelas, fazendo JOINs com tabelas de referência e traduzindo colunas codificadas?
3. **Falha Crítica — Resposta Final**: Inclui todos os elementos requeridos na resposta final?"""
10 changes: 5 additions & 5 deletions app/agent/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ def get_tools() -> list[BaseTool]:

Returns:
list[BaseTool]: Tools in suggested usage order:
- search_datasets: Find datasets using keywords
- get_dataset_details: Get comprehensive dataset information
- get_table_details: Get comprehensive table information
- execute_bigquery_sql: Execute SQL queries against BigQuery tables
- decode_table_values: Decode coded values using dictionary tables
- search_datasets: Find datasets using keywords.
- get_dataset_details: Get comprehensive dataset information.
- get_table_details: Get comprehensive table information.
- execute_bigquery_sql: Execute SQL queries against BigQuery tables.
- decode_table_values: Decode coded values using dictionary tables.
"""
return [
search_datasets,
Expand Down
63 changes: 36 additions & 27 deletions app/agent/tools/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
# maximum number of datasets returned on search
PAGE_SIZE = 10

# url for searching datasets
# directory datasets to skip
SKIP_DIRECTORY_DATASETS = {"br_bd_diretorios_data_tempo"}

# URL for searching datasets
SEARCH_URL = f"{settings.BASEDOSDADOS_BASE_URL}/search/"

# URL for fetching dataset details
Expand All @@ -44,8 +47,8 @@ async def search_datasets(query: str) -> str:

Args:
query (str): 2-3 keywords maximum. Use Portuguese terms, organization acronyms, or dataset acronyms.
Good Examples: "censo", "educacao", "ibge", "inep", "rais", "saude"
Avoid: "Brazilian population data by municipality"
Good Examples: "censo", "educacao", "ibge", "inep", "rais", "saude".
Avoid: "Brazilian population data by municipality".

Returns:
str: JSON array of datasets. If empty/irrelevant results, try different keywords.
Expand All @@ -69,15 +72,14 @@ async def search_datasets(query: str) -> str:
dataset_overview = DatasetOverview(
id=dataset["id"],
name=dataset["name"],
slug=dataset.get("slug"),
description=dataset.get("description"),
organizations=[org["name"] for org in dataset.get("organizations", [])],
tags=[tag["name"] for tag in dataset.get("tags", [])],
themes=[theme["name"] for theme in dataset.get("themes", [])],
organizations=[org["name"] for org in dataset.get("organizations", [])],
)
overviews.append(dataset_overview.model_dump())

return json.dumps(overviews, ensure_ascii=False, indent=2)
return json.dumps(overviews, ensure_ascii=False)


@tool
Expand All @@ -93,11 +95,10 @@ async def get_dataset_details(dataset_id: str) -> str:

Returns:
str: JSON object with complete dataset information, including:
- Basic metadata (name, description, tags, themes, organizations)
- Basic metadata (name, description, tags, themes, organizations).
- tables: Array of all tables in the dataset with:
- gcp_id: Full BigQuery table reference (`project.dataset.table`)
- temporal coverage: Authoritative temporal coverage for the table
- table descriptions explaining what each table contains
- gcp_id: Full BigQuery table reference (`project.dataset.table`).
- table descriptions explaining what each table contains.
- usage_guide: Provide key information and best practices for using the dataset.

Next step: Use `get_table_details()` with returned table IDs.
Expand Down Expand Up @@ -125,7 +126,6 @@ async def get_dataset_details(dataset_id: str) -> str:

dataset_id = dataset["id"].split("DatasetNode:")[-1]
dataset_name = dataset["name"]
dataset_slug = dataset.get("slug")
dataset_description = dataset.get("description")

# Tags
Expand Down Expand Up @@ -158,9 +158,7 @@ async def get_dataset_details(dataset_id: str) -> str:

table_id = table["id"].split("TableNode:")[-1]
table_name = table["name"]
table_slug = table.get("slug")
table_description = table.get("description")
table_temporal_coverage = table.get("temporalCoverage")

cloud_table_edges = table["cloudTables"]["edges"]
if cloud_table_edges:
Expand All @@ -177,9 +175,7 @@ async def get_dataset_details(dataset_id: str) -> str:
id=table_id,
gcp_id=table_gcp_id,
name=table_name,
slug=table_slug,
description=table_description,
temporal_coverage=table_temporal_coverage,
)
)

Expand All @@ -197,7 +193,6 @@ async def get_dataset_details(dataset_id: str) -> str:
result = Dataset(
id=dataset_id,
name=dataset_name,
slug=dataset_slug,
description=dataset_description,
tags=dataset_tags,
themes=dataset_themes,
Expand All @@ -206,7 +201,7 @@ async def get_dataset_details(dataset_id: str) -> str:
usage_guide=usage_guide,
)

return result.model_dump_json(indent=2)
return result.model_dump_json()


@tool
Expand All @@ -222,10 +217,14 @@ async def get_table_details(table_id: str) -> str:

Returns:
str: JSON object with complete table information, including:
- Basic metadata (name, description, slug)
- gcp_id: Full BigQuery table reference (`project.dataset.table`)
- temporal coverage: Authoritative temporal coverage for the table
- columns: All column names, types, and descriptions
- Basic metadata (name, description).
- gcp_id: Full BigQuery table reference (`project.dataset.table`).
- columns: All column names, types, and descriptions, including
`needs_decoding` and `reference_table_id` for coded columns.
- partitioned_by: Columns to filter on for cost control.
- period_start / period_end: First and last period covered by the table.
Format varies (`2024`, `'2026-04-12'`, etc.) — use the value verbatim,
matched to the appropriate temporal column (`ano`, `data`, etc.).

Next step: Use `execute_bigquery_sql()` to execute queries.
"""
Expand All @@ -252,9 +251,8 @@ async def get_table_details(table_id: str) -> str:

table_id = table["id"].split("TableNode:")[-1]
table_name = table["name"]
table_slug = table.get("slug")
table_description = table.get("description")
table_temporal_coverage = table.get("temporalCoverage")
table_temporal_coverage = table.get("temporalCoverage") or {}

cloud_table_edges = table["cloudTables"]["edges"]
if cloud_table_edges:
Expand All @@ -267,14 +265,23 @@ async def get_table_details(table_id: str) -> str:
table_gcp_id = None

table_columns = []
partitioned_by = []

for edge in table["columns"]["edges"]:
column = edge["node"]

if column["isPartition"]:
partitioned_by.append(column["name"])

directory_primary_key = column["directoryPrimaryKey"]

if directory_primary_key is not None:
directory_table = directory_primary_key["table"]
directory_table_id = directory_table["id"].split("TableNode:")[-1]
directory_cloud_table = directory_table["cloudTables"]["edges"][0]["node"]
if directory_cloud_table["gcpDatasetId"] in SKIP_DIRECTORY_DATASETS:
directory_table_id = None
else:
directory_table_id = directory_table["id"].split("TableNode:")[-1]
else:
directory_table_id = None

Expand All @@ -285,17 +292,19 @@ async def get_table_details(table_id: str) -> str:
description=column.get("description"),
unit=column.get("measurementUnit"),
reference_table_id=directory_table_id,
needs_decoding=column["coveredByDictionary"],
)
)

result = Table(
id=table_id,
gcp_id=table_gcp_id,
name=table_name,
slug=table_slug,
description=table_description,
temporal_coverage=table_temporal_coverage,
columns=table_columns,
partitioned_by=partitioned_by,
period_start=table_temporal_coverage.get("start"),
period_end=table_temporal_coverage.get("end"),
)

return result.model_dump_json(indent=2)
return result.model_dump_json()
Loading