In [1]:
from dados.raw.utils.postgres_interactions import PostgresETL
import os
from dotenv import load_dotenv
import pandas as pd
import basedosdados as bd
from typing import Dict, Any

### Geração dos dados de produção rural

In [None]:
import json
import os
from typing import Dict, Any
import pandas as pd
from decimal import Decimal

# Função para salvar JSON em arquivo
def decimal_default(obj):
    """
    Função auxiliar para converter objetos Decimal para float
    durante a serialização JSON.
    """
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

def save_json(data: Dict[str, Any], filename: str) -> None:
    """
    Salva um dicionário como arquivo JSON.
    
    Args:
        data (Dict[str, Any]): Dicionário a ser salvo
        filename (str): Nome do arquivo
    """
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4, default=decimal_default)
    print(f"Arquivo {filename} salvo com sucesso!")

# Já temos os vetores para PEVS e censo combinado conforme código fornecido
vetor_valor_pevs = df_to_json(extracao_vegetal_pevs_query, 'valor_producao')
vetor_quantidade_pevs = df_to_json(extracao_vegetal_pevs_query, 'quantidade_produzida')
censo_combinado = pd.concat([extracao_vegetal_censo_2006_query, extracao_vegetal_censo_2017_query], ignore_index=True)
vetor_valor_censo = df_to_json(censo_combinado, 'valor_producao')
vetor_quantidade_censo = df_to_json(censo_combinado, 'quantidade_produzida')

# Criar vetores para lavoura permanente (PAM)
lavoura_permanente_pam_vetores = {
    'valor': df_to_json(lavoura_permanente_pam_query, 'valor_producao'),
    'quantidade': df_to_json(lavoura_permanente_pam_query, 'quantidade_produzida')
}

# Criar vetores para lavoura temporária (PAM)
lavoura_temporaria_pam_vetores = {
    'valor': df_to_json(lavoura_temporaria_pam_query, 'valor_producao'),
    'quantidade': df_to_json(lavoura_temporaria_pam_query, 'quantidade_produzida')
}

# Criar vetores para lavoura permanente (censo combinado)
lavoura_permanente_censo_combinado = pd.concat([
    lavoura_permanente_censo_2006_query, 
    lavoura_permanente_censo_2017_query
], ignore_index=True)

lavoura_permanente_censo_vetores = {
    'valor': df_to_json(lavoura_permanente_censo_combinado, 'valor_producao'),
    'quantidade': df_to_json(lavoura_permanente_censo_combinado, 'quantidade_produzida')
}

# Criar vetores para lavoura temporária (censo combinado)
lavoura_temporaria_censo_combinado = pd.concat([
    lavoura_temporaria_censo_2006_query, 
    lavoura_temporaria_censo_2017_query
], ignore_index=True)

lavoura_temporaria_censo_vetores = {
    'valor': df_to_json(lavoura_temporaria_censo_combinado, 'valor_producao'),
    'quantidade': df_to_json(lavoura_temporaria_censo_combinado, 'quantidade_produzida')
}

# Criar diretório para os arquivos JSON se não existir
output_dir = 'json_output'
os.makedirs(output_dir, exist_ok=True)

# Salvar todos os JSONs
# PEVS
save_json(vetor_valor_pevs, f'{output_dir}/extracao_vegetal_pevs_valor.json')
save_json(vetor_quantidade_pevs, f'{output_dir}/extracao_vegetal_pevs_quantidade.json')

# Censo Extração Vegetal
save_json(vetor_valor_censo, f'{output_dir}/extracao_vegetal_censo_valor.json')
save_json(vetor_quantidade_censo, f'{output_dir}/extracao_vegetal_censo_quantidade.json')

# Lavoura Permanente PAM
save_json(lavoura_permanente_pam_vetores['valor'], f'{output_dir}/lavoura_permanente_pam_valor.json')
save_json(lavoura_permanente_pam_vetores['quantidade'], f'{output_dir}/lavoura_permanente_pam_quantidade.json')

# Lavoura Temporária PAM
save_json(lavoura_temporaria_pam_vetores['valor'], f'{output_dir}/lavoura_temporaria_pam_valor.json')
save_json(lavoura_temporaria_pam_vetores['quantidade'], f'{output_dir}/lavoura_temporaria_pam_quantidade.json')

# Lavoura Permanente Censo
save_json(lavoura_permanente_censo_vetores['valor'], f'{output_dir}/lavoura_permanente_censo_valor.json')
save_json(lavoura_permanente_censo_vetores['quantidade'], f'{output_dir}/lavoura_permanente_censo_quantidade.json')

# Lavoura Temporária Censo
save_json(lavoura_temporaria_censo_vetores['valor'], f'{output_dir}/lavoura_temporaria_censo_valor.json')
save_json(lavoura_temporaria_censo_vetores['quantidade'], f'{output_dir}/lavoura_temporaria_censo_quantidade.json')

print("Todos os arquivos JSON foram gerados com sucesso!")

### autoconsumo

In [2]:
from __future__ import annotations
from typing import Dict, Any
from pydantic import BaseModel, Field
import os
import pandas as pd
from dotenv import load_dotenv
from dados.raw.utils.postgres_interactions import PostgresETL

# --- CONFIGURATION ---
CENSUS_TABLE_NAMES = [
    "extracao_vegetal_censo_2006",
    "lavoura_permanente_censo_2006",
    "lavoura_temporaria_censo_2006_2284",
    "lavoura_permanente_censo_2017",
    "extracao_vegetal_censo_2017",
    "lavoura_temporaria_censo_2017"
]

def get_query(table_name: str) -> str:
    """Generates a dynamic query for a specific table."""
    return f"""
    SELECT 
        ano, 
        nome_regiao_integracao,
        produto,  
        -- Total
        ROUND(SUM(quantidade_produzida), 2) as quantidade_produzida,
        ROUND(SUM(valor_producao), 2) as valor_producao,
        -- Comércio
        ROUND(SUM(comercio_quantidade_produzida), 2) as comercio_quantidade_produzida,
        ROUND(SUM(comercio_valor_producao), 2) as comercio_valor_producao
    FROM pa_indexadores_producao_rural.{table_name}
    GROUP BY 1, 2, 3
    """

# --- PYDANTIC MODELS ---

class VetoresProducaoRuralBase(BaseModel):
    quantidade_produzida: float = Field(..., description="Quantidade total produzida na unidade de medida")
    valor_producao: float = Field(..., description="Valor financeiro da produção")
    comercio_quantidade_produzida: float = Field(..., description="Quantidade produzida para o comércio na unidade de medida")
    comercio_valor_producao: float = Field(..., description="Valor financeiro da produção")

# Hierarchy Definitions
AnoMap = Dict[str, VetoresProducaoRuralBase]
ProdutoMap = Dict[str, AnoMap]
RegiaoMap = Dict[str, ProdutoMap]
TipoPesquisaMap = Dict[str, RegiaoMap]
NomePesquisaMap = Dict[str, TipoPesquisaMap]

class VetoresProducaoRural(BaseModel):
    vetores_producao_rural: NomePesquisaMap

# --- PROCESSING LOGIC ---

load_dotenv()

def update_structure_from_df(accumulator: Dict, df: pd.DataFrame, tipo_pesquisa: str, nome_pesquisa: str) -> None:
    """
    Updates the 'accumulator' dictionary in-place with data from the DataFrame.
    """
    if df.empty:
        print(f"Warning: DataFrame for {nome_pesquisa} is empty.")
        return

    df = df.fillna(0)

    for row in df.itertuples(index=False):
        regiao = getattr(row, 'nome_regiao_integracao')
        produto = getattr(row, 'produto')
        # Convert year to string for JSON compatibility
        ano = str(int(getattr(row, 'ano'))) 

        # Recursive initialization (ensure keys exist)
        # Structure: vetores_producao_rural -> nome_pesquisa -> tipo_pesquisa -> regiao -> produto -> ano
        current_level = accumulator.setdefault(nome_pesquisa, {}) \
                                   .setdefault(tipo_pesquisa, {}) \
                                   .setdefault(regiao, {}) \
                                   .setdefault(produto, {})

        # Create the data object
        dados_ano = VetoresProducaoRuralBase(
            quantidade_produzida=float(getattr(row, 'quantidade_produzida')),
            valor_producao=float(getattr(row, 'valor_producao')),
            comercio_quantidade_produzida=float(getattr(row, 'comercio_quantidade_produzida')),
            comercio_valor_producao=float(getattr(row, 'comercio_valor_producao'))
        ) 
        
        # Assign data to the leaf node
        current_level[ano] = dados_ano

def save_pydantic_json(model: BaseModel, filename: str):
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(model.model_dump_json(indent=4))
    print(f"Arquivo {filename} salvo com sucesso!")

# --- MAIN EXECUTION ---

if __name__ == "__main__":
    # 1. Initialize the master accumulator dictionary
    # This represents the internal structure of "vetores_producao_rural" field
    master_data_accumulator: Dict[str, Any] = {}

    with PostgresETL(
        host='localhost', 
        database=os.getenv("DB_GOLD_ZONE"), 
        user=os.getenv("POSTGRES_USER"), 
        password=os.getenv("POSTGRES_PASSWORD"),
        schema='pa_indexadores_producao_rural'
    ) as db:
        
        # 2. Loop through all tables
        for table_name in CENSUS_TABLE_NAMES:
            print(f"Processando tabela: {table_name}...")
            
            # Generate query for specific table
            query = get_query(table_name)
            
            try:
                # Extract
                df_raw = db.download_data(query)
                
                # Transform (Accumulate into master dict)
                # Note: We use the table name as 'nome_pesquisa'
                update_structure_from_df(
                    accumulator=master_data_accumulator, 
                    df=df_raw, 
                    tipo_pesquisa="censitaria", 
                    nome_pesquisa=table_name
                )
                print(f" -> {len(df_raw)} registros processados.")
                
            except Exception as e:
                print(f"Erro ao processar tabela {table_name}: {e}")

    # 3. Final Validation and Load
    print("\nIniciando validação final Pydantic...")
    try:
        # Wrap the accumulated dictionary in the Root Model
        modelo_consolidado = VetoresProducaoRural(vetores_producao_rural=master_data_accumulator)
        
        output_dir = 'json_output'
        os.makedirs(output_dir, exist_ok=True)
        
        # Save single consolidated file
        save_pydantic_json(modelo_consolidado, f'{output_dir}/censos_consolidado_full.json')

    except Exception as e:
        print(f"FATAL: Erro na validação final dos dados: {e}")

2026-01-23 22:38:09,413 - PostgresETL - INFO - Connected to database gold_zone at localhost
2026-01-23 22:38:09,563 - PostgresETL - INFO - Downloaded 492 rows from database


Processando tabela: extracao_vegetal_censo_2006...
 -> 492 registros processados.
Processando tabela: lavoura_permanente_censo_2006...


2026-01-23 22:38:09,672 - PostgresETL - INFO - Downloaded 792 rows from database
2026-01-23 22:38:09,767 - PostgresETL - INFO - Downloaded 636 rows from database
2026-01-23 22:38:09,885 - PostgresETL - INFO - Downloaded 852 rows from database


 -> 792 registros processados.
Processando tabela: lavoura_temporaria_censo_2006_2284...
 -> 636 registros processados.
Processando tabela: lavoura_permanente_censo_2017...


2026-01-23 22:38:09,986 - PostgresETL - INFO - Downloaded 636 rows from database
2026-01-23 22:38:10,064 - PostgresETL - INFO - Downloaded 660 rows from database
2026-01-23 22:38:10,075 - PostgresETL - INFO - Disconnected from database


 -> 852 registros processados.
Processando tabela: extracao_vegetal_censo_2017...
 -> 636 registros processados.
Processando tabela: lavoura_temporaria_censo_2017...
 -> 660 registros processados.

Iniciando validação final Pydantic...
Arquivo json_output/censos_consolidado_full.json salvo com sucesso!


### Explorando problemas de compatibilização de produtos

In [25]:
import pandas as pd

path = os.path.join(os.getcwd(), 'temp')


path_tbs_extensa = [file for file in os.listdir(path) if file.startswith('Tb')]

tabela_produto_dict = {}

for tabela in path_tbs_extensa:
    tabela_path = os.path.join(path, tabela)
    try:
        df = pd.read_excel(tabela_path)
        if 'Produto' in df.columns:
            produtos_unicos = df['Produto'].dropna().unique().tolist()
            tabela_produto_dict[tabela] = produtos_unicos
        else:
            print(f"A tabela {tabela} não possui a coluna 'Produto'.")
    except Exception as e:
        print(f"Erro ao processar a tabela {tabela}: {e}")


produtos = list(set(item for sublist in tabela_produto_dict.values() for item in sublist))
produtos




['AndirobaOleo',
 'CestinhaCarocinho',
 'CajuAçuFruto',
 'TaperebaFruto',
 'Peneira',
 'Alface',
 'BacuriFruto',
 'Portal',
 'CestinhaHamburger',
 'Cama',
 'CascaMarapuana',
 'Almofada',
 'CastanhaDoPara',
 'CascaAndiroba',
 'Amendoim',
 'SementeDeCumaru',
 'Estorac',
 'OleoPracaxi',
 'Frango',
 'SementeCumaru',
 'VassouraArtesanal',
 'PaneiroPequeno',
 'LeiteAnani',
 'Piso',
 'Comodas',
 'BacabaCaroco',
 'CascaCarapanauba',
 'FrangoCaipira',
 'TucumaFruto',
 'Paneiro',
 'CascaBarbatimao',
 'Pupunha',
 'Mel',
 'Fava',
 'Abóbora',
 'CestinhaDedal',
 'LeiteJatoba',
 'CarneBovina',
 'Compensado',
 'PimentaDoReino',
 'PimeitaDoReino',
 'Quiabo',
 'PeneiraCorujinha',
 'FavaDeJuca',
 'CascaPariri',
 'CheiroVerde',
 'Degrau',
 'Breu',
 'Farinha',
 'Peixe',
 'Abacaxi',
 'VassouraRegional',
 'Paxiuba',
 'LeiteSucuuba',
 'Guarumã',
 'AcaiCaroco',
 'VassouraInaja',
 'CaixaPia',
 'Soja',
 'Bau3',
 'CurauaFibra',
 'MadeiraVermelha',
 'Caimbe',
 'EscadaDeJabuti',
 'Lenha',
 'PaleteMadeira',
 'Pripri

In [19]:
import os
import pandas as pd

path = os.path.join(os.getcwd(), 'temp')
path_tbs_extensa = [file for file in os.listdir(path) if file.startswith('Tb')]

tabela_produto_dict = {}

soma_global = {}

for tabela in path_tbs_extensa:
    tabela_path = os.path.join(path, tabela)
    try:
        df = pd.read_excel(tabela_path)

        if all(col in df.columns for col in ['Produto', 'Unidade', 'Quantidade']):
            df_filtrado = df[['Produto', 'Unidade', 'Quantidade']].dropna()
            df_filtrado['Unidade'] = df_filtrado['Unidade'].str.lower()
            df_filtrado['Quantidade'] = pd.to_numeric(df_filtrado['Quantidade'], errors='coerce').fillna(0)

            agrupado = (
                df_filtrado
                .groupby(['Produto', 'Unidade'], dropna=True)['Quantidade']
                .sum()
                .reset_index()
            )

            tabela_dict = {}

            for _, row in agrupado.iterrows():
                produto = row['Produto']
                unidade = row['Unidade']
                quantidade = row['Quantidade']

                if produto not in tabela_dict:
                    tabela_dict[produto] = {}
                tabela_dict[produto][unidade] = quantidade

                if produto not in soma_global:
                    soma_global[produto] = {}
                if unidade not in soma_global[produto]:
                    soma_global[produto][unidade] = 0
                soma_global[produto][unidade] += quantidade

            tabela_produto_dict[tabela] = tabela_dict

        else:
            print(f"A tabela {tabela} não possui todas as colunas necessárias.")

    except Exception as e:
        print(f"Erro ao processar a tabela {tabela}: {e}")

linhas = []

for produto, unidades in soma_global.items():
    for unidade, quantidade in unidades.items():
        linhas.append({
            'produto': produto,
            'unidade': unidade,
            'soma_quantidade': quantidade
        })

df_resultado = pd.DataFrame(linhas)





### Algoritmo de indexação do trabalho

In [7]:
lavoura_permanente_pam_query 
lavoura_permanente_censo_2017_query 
lavoura_permanente_censo_2006_query


Unnamed: 0,ano,nome_regiao_integracao,produto,quantidade_produzida,valor_producao
0,2006,Tapajós,pupunha-cacho,1.00,1.80
1,2006,Lago de Tucuruí,pitanga,0.00,0.00
2,2006,Carajás,lichia,0.00,0.00
3,2006,Carajás,jabuticaba,0.00,0.00
4,2006,Xingu,guaraná,0.00,0.11
...,...,...,...,...,...
853,2006,Carajás,nectarina,0.00,0.00
854,2006,Lago de Tucuruí,pera,0.00,0.00
855,2006,,romã,0.00,0.00
856,2006,Rio Capim,nêspera,0.00,0.00
