In [0]:
%sql
-- Volume for the raw zone (JSON files)
CREATE VOLUME IF NOT EXISTS lakehouse.raw.raw_yfinance


In [0]:
%python
# NOTE(review): the yfinance version is not pinned, so each run may install a different release — consider pinning.
%pip install yfinance


In [0]:
%python
# Restart the Python process after the %pip install above
# (presumably so the newly installed yfinance package is importable — confirm against Databricks docs).
dbutils.library.restartPython()

In [0]:
%python
import yfinance as yf
import pandas as pd
from datetime import datetime, UTC
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Path inside the Unity Catalog Volume (raw zone) where the quote files are written
RAW_BASE_PATH = "/Volumes/lakehouse/raw/raw_yfinance/commodities/latest_prices"

def get_commodities_df() -> pd.DataFrame:
    """
    Retorna as últimas cotações (1 minuto) de Ouro, Petróleo e Prata via Yahoo Finance.
    """
    symbols = ["GC=F", "CL=F", "SI=F"]  # Ouro, Petróleo, Prata
    dfs = []

    for sym in symbols:
        # Baixa o último preço (1 minuto)
        try:
            ultimo_df = yf.Ticker(sym).history(period="1d", interval="1m")[["Close"]].tail(1)
            if ultimo_df.empty:
                continue

            ultimo_df = ultimo_df.rename(columns={"Close": "preco"})
            ultimo_df["ativo"] = sym
            ultimo_df["moeda"] = "USD"
            ultimo_df["horario_coleta"] = datetime.now(UTC)

            dfs.append(ultimo_df[["ativo", "preco", "moeda", "horario_coleta"]])
        except Exception as e:
            print(f"⚠️ Erro ao buscar {sym}: {e}")

    if not dfs:
        raise ValueError("Nenhuma cotação retornada pelo Yahoo Finance.")

    return pd.concat(dfs, ignore_index=True)


# Fetch the latest quotes as a pandas DataFrame
pdf = get_commodities_df()

# Explicit schema for the collected columns; every field is declared non-nullable
schema = StructType(
    [
        StructField(field_name, field_type, False)
        for field_name, field_type in (
            ("ativo", StringType()),
            ("preco", DoubleType()),
            ("moeda", StringType()),
            ("horario_coleta", TimestampType()),
        )
    ]
)

# Build the Spark DataFrame, then attach the ingestion metadata columns one by one
df = spark.createDataFrame(pdf, schema=schema)
df = df.withColumn("ingestion_ts_utc", F.current_timestamp())
df = df.withColumn("source_system", F.lit("yfinance"))
df = df.withColumn("source_endpoint", F.lit("https://finance.yahoo.com"))
# The partition column is derived from the ingestion timestamp added above
df = df.withColumn("ingestion_date", F.to_date(F.col("ingestion_ts_utc")))

# Append to the Volume path as JSON, partitioned by ingestion_date
df.write.mode("append").partitionBy("ingestion_date").json(RAW_BASE_PATH)

print("✅ JSON salvo em:", RAW_BASE_PATH)
