In [0]:
# Importações de ambiente
import requests
import pandas as pd
from pyspark.sql.functions import current_timestamp, lit
from pyspark.sql.utils import AnalysisException

In [0]:
# 1. Definição da URL da API (PIB real, % anual)
url = "http://api.worldbank.org/v2/country/all/indicator/NY.GDP.MKTP.KD.ZG?format=json&per_page=20000"

In [0]:
# 2. Requisição HTTP
response = requests.get(url)
if response.status_code != 200:
    raise Exception(f"Erro ao acessar a API: {response.status_code}")

In [0]:
# 3. Conversão do JSON
data_json = response.json()

In [0]:
# 4. Validação da estrutura esperada
if len(data_json) < 2 or not isinstance(data_json[1], list):
    raise ValueError("Estrutura insesperada na resposta da API.")

In [0]:
# 5. Conversão para DataFrame Pandas
pdf = pd.DataFrame(data_json[1])

In [0]:
# 6. Seleção e transformação de colunas
if not {"country", "countryiso3code", "date", "value"}.issubset(pdf.columns):
    raise ValueError("Colunas esperadas não encontradas no JSON")

pdf = pdf[["country", "countryiso3code", "date", "value"]]
pdf["country"] = pdf["country"].apply(lambda x: x.get("value") if isinstance(x, dict) else x)

In [0]:
# 7. Conversão para Spark DataFrame + colunas extras
df = (spark.createDataFrame(pdf)
      .withColumn("ingest_timestamp", current_timestamp())
      .withColumn("ingest_file_name", lit("api_worldbank")))

In [0]:
# 8. Escrita na camada Bronze (Delta + particionado)
output_path = "/Volumes/workspace/world_bank/bronze"

try:
    df.write.format("delta").mode("overwrite").partitionBy("date").save(output_path)

    print("✅ Dados salvos com sucesso na camada Bronze.")

    # Criar tabela no catalogo
    spark.sql("DROP TABLE IF EXISTS workspace.world_bank.gdp_anual")  # Evita erros se já existir
    spark.sql(f"CREATE TABLE workspace.world_bank.gdp_anual USING DELTA LOCATION '{output_path}'")
    print("✅ Tabela bronze.gdp_anual criada com sucesso.")

except AnalysisException as e:
    print(f"❌ Erro ao salvar dados na camada Bronze: {e}")

In [0]:
# Pipeline:
# - Fonte: API Banco Mundial - PIB anual (%)
# - Ingestão: camada Bronze (formato Delta, particionado por ano)
# - Tabela: bronze.pib_anual
# - Campos extras: data_ingestao, fonte