In [0]:
import requests, json, time
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Root of the Bronze container in ADLS Gen2. `acc` (storage account name) is not
# defined in this cell; presumably it is set in an earlier cell of the session.
bronze_base = "abfss://bronze@{}.dfs.core.windows.net".format(acc)
# Delta table location for the raw USD->BRL quotes.
bronze_delta_path = "/".join((bronze_base, "fx_usdbrl", "delta"))

def fetch_usd_brl():
    """Fetch the latest USD->BRL exchange rate, trying two free providers in order.

    Frankfurter is tried first. If that request or the parsing of its payload
    fails for any reason, the failure is reported and open.er-api.com is used.

    Returns:
        dict with keys:
          - "provider": "frankfurter" or "er-api"
          - "base":     base currency (Frankfurter's "base" field, default "USD";
                        always "USD" for ER-API)
          - "rate":     BRL per 1 unit of base, as float
          - "ref_date": reference date. For ER-API the RFC 2822 timestamp is
                        normalized to an ISO "YYYY-MM-DD" date; if it cannot be
                        parsed, the provider's raw value is kept.
          - "raw":      the decoded JSON payload of the provider that answered

    Raises:
        ValueError: if the fallback provider reports a non-"success" result.
        KeyError: if the fallback payload has no BRL rate.
        Any exception raised by the fallback HTTP request (e.g. from
        `raise_for_status`) propagates unchanged.
    """
    from email.utils import parsedate_to_datetime

    # 1) Frankfurter (primary provider)
    try:
        r = requests.get("https://api.frankfurter.app/latest?from=USD&to=BRL", timeout=10)
        r.raise_for_status()
        p = r.json()
        rate = float(p["rates"]["BRL"])
        ref_date = p.get("date")
        base = p.get("base", "USD")
        return {"provider": "frankfurter", "base": base, "rate": rate, "ref_date": ref_date, "raw": p}
    except Exception as exc:
        # Deliberate best-effort: fall back to the second provider, but leave a
        # trace of why instead of swallowing the error silently.
        print(f"[fetch_usd_brl] Frankfurter failed ({exc!r}); falling back to ER-API")

    # 2) Open ER-API (fallback provider)
    r = requests.get("https://open.er-api.com/v6/latest/USD", timeout=10)
    r.raise_for_status()
    p = r.json()
    if p.get("result") != "success":
        raise ValueError(f"Provider ER-API error: {p}")
    rate = float(p["rates"]["BRL"])
    # ER-API reports a human-readable RFC 2822 timestamp
    # (e.g. "Tue, 19 Aug 2025 00:00:01 +0000"). Normalize it to an ISO date so
    # the `ref_date` column does not mix formats across providers.
    ref_date = p.get("time_last_update_utc")
    try:
        ref_date = parsedate_to_datetime(ref_date).date().isoformat()
    except (TypeError, ValueError, AttributeError):
        pass  # keep the provider's raw value when it cannot be parsed
    return {"provider": "er-api", "base": "USD", "rate": rate, "ref_date": ref_date, "raw": p}

# Fetch the current USD->BRL quote (Frankfurter first, ER-API as fallback).
data = fetch_usd_brl()
# Explicit check instead of `assert`, which is stripped when Python runs with -O.
# Written as `not (rate > 0)` so NaN is rejected too.
if not data["rate"] > 0:
    raise ValueError("Taxa inválida")

record = [{
    "provider": data["provider"],
    "base":     data["base"],
    "symbol":   "BRL",
    "rate":     data["rate"],
    "ref_date": data["ref_date"],
    # The column is declared StringType in `schema` below, so convert explicitly
    # instead of relying on Spark's implicit int -> string coercion.
    # NOTE(review): LongType would be more natural, but switching would
    # presumably conflict with rows already appended as strings.
    "ingestion_epoch": str(int(time.time())),
    # Full provider payload kept verbatim as the bronze audit trail.
    "raw_json": json.dumps(data["raw"], ensure_ascii=False)
}]

schema = StructType([
    StructField("provider", StringType(), True),
    StructField("base", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("rate", DoubleType(), True),
    StructField("ref_date", StringType(), True),
    StructField("ingestion_epoch", StringType(), True),
    StructField("raw_json", StringType(), True),
])

df_bronze = (
    spark.createDataFrame(record, schema)
         .withColumn("ingestion_ts", F.current_timestamp())
)

# Append to the Bronze Delta table; mergeSchema lets new columns be added.
(df_bronze
 .write
 .format("delta")
 .mode("append")
 .option('mergeSchema', 'true').save(bronze_delta_path)
)

display(spark.read.format("delta").load(bronze_delta_path).orderBy(F.col("ingestion_ts").desc()))


In [0]:
import requests, json, time
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# NOTE(review): this cell appends to the same Bronze table as the previous cell,
# from a different provider; running both produces two rows per run.

# 1) HTTP request (no token)
# NOTE(review): exchangerate.host may now require an `access_key` for this
# endpoint. An error payload is rejected below instead of silently writing a
# 0.0 rate.
url = "https://api.exchangerate.host/latest?base=USD&symbols=BRL"
resp = requests.get(url, timeout=10)
resp.raise_for_status()
payload = resp.json()

# 2) Extract the main fields
# Example payload:
# {"base":"USD","date":"2025-08-19","rates":{"BRL":5.27}, "success":true}
if payload.get("success") is False:
    raise ValueError(f"Provider exchangerate.host error: {payload}")
rates = payload.get("rates") or {}
if "BRL" not in rates:
    # Previously defaulted to 0.0, which wrote a bogus quote into Bronze.
    raise ValueError(f"Provider exchangerate.host returned no BRL rate: {payload}")
base = payload.get("base")
date = payload.get("date")
rate = float(rates["BRL"])
if not rate > 0:  # also rejects NaN
    raise ValueError("Taxa inválida")
ingestion_ts = int(time.time())

record = [{
    # Same lineage column the previous cell writes, so rows stay attributable.
    "provider": "exchangerate.host",
    "base": base,
    "symbol": "BRL",
    "rate": rate,
    "ref_date": date,          # reference date reported by the API
    # Collection timestamp; stored as a string to match the StringType column.
    "ingestion_epoch": str(ingestion_ts),
    "raw_json": json.dumps(payload, ensure_ascii=False)  # verbatim "raw" trail
}]

schema = StructType([
    StructField("provider", StringType(), True),
    StructField("base", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("rate", DoubleType(), True),
    StructField("ref_date", StringType(), True),
    StructField("ingestion_epoch",  StringType(), True),
    StructField("raw_json", StringType(), True),
])

df_bronze = (
    spark.createDataFrame(record, schema)
         .withColumn("ingestion_ts", F.current_timestamp())
)

# 3) Write/append to Delta (Bronze). mergeSchema matches the previous cell so the
# `provider` column can be added if the table was created without it.
(
    df_bronze
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save(bronze_delta_path)
)

print("Bronze ✅ gravado em:", bronze_delta_path)
display(spark.read.format("delta").load(bronze_delta_path).orderBy(F.col("ingestion_ts").desc()))
