In [0]:
# -------------------------------------------------------------
# BRONZE: EXTRACT API TRANSFEREGOV
# -------------------------------------------------------------

import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder.getOrCreate()

# Widget para escolher endpoint
dbutils.widgets.text("endpoint", "plano_acao")
endpoint = dbutils.widgets.get("endpoint")

# -------------------------------------------------------------
# 1. Função de extração com paginação
# -------------------------------------------------------------
def extract_api_data(endpoint, limit=1000):
    url = f"https://api.transferegov.gestao.gov.br/fundoafundo/{endpoint}"
    headers = {"Accept": "application/json"}

    dados_totais = []
    offset = 0

    while True:
        try:
            params = {"offset": offset, "limit": limit}
            r = requests.get(url, headers=headers, params=params, timeout=30)
            r.raise_for_status()
            dados = r.json()

        except Exception as e:
            print(f"Erro ao conectar na API: {e}")
            break

        if not dados:
            break

        dados_totais.extend(dados)
        offset += limit

    return dados_totais

# -------------------------------------------------------------
# 2. Função de normalização: todos os valores → STRING
# -------------------------------------------------------------
def normalize(item):
    return {
        k: (
            json.dumps(v) if isinstance(v, (dict, list))
            else str(v) if v is not None
            else None
        )
        for k, v in item.items()
    }

# -------------------------------------------------------------
# 3. Buscar dados da API
# -------------------------------------------------------------
dados = extract_api_data(endpoint)

if len(dados) == 0:
    raise Exception("Nenhum dado retornado da API.")

# -------------------------------------------------------------
# 4. Descobrir TODAS as colunas existentes
# -------------------------------------------------------------
todas_colunas = set()
for item in dados:
    todas_colunas.update(item.keys())

# Normalizar todos os itens para STRING + garantir colunas faltantes
dados_normalizados = []
for item in dados:
    row = {}
    for col in todas_colunas:
        valor = item.get(col)
        if isinstance(valor, (dict, list)):
            row[col] = json.dumps(valor)
        elif valor is None:
            row[col] = None
        else:
            row[col] = str(valor)
    dados_normalizados.append(row)

# -------------------------------------------------------------
# 5. Criar schema totalmente STRING
# -------------------------------------------------------------
schema = StructType([StructField(col, StringType(), True) for col in todas_colunas])

# -------------------------------------------------------------
# 6. Criar DataFrame e adicionar coluna de auditoria(data da atualizacao)
# -------------------------------------------------------------
df = (
    spark.createDataFrame(dados_normalizados, schema)
         .withColumn("data_atualizacao", current_timestamp())
)

# -------------------------------------------------------------
# 7. Salvar no catálogo = bronze/transferegov/endpoint
# -------------------------------------------------------------
df.write.mode("overwrite") \
      .option("overwriteSchema", "true") \
      .format("delta") \
      .saveAsTable(f"bronze.transferegov.{endpoint}")

In [0]:
%sql
select count(*) from bronze.transferegov.plano_acao