## Descrição e Definição do Problema

O mercado financeiro é influenciado por fatores **locais** e **globais**. Para traders e investidores, identificar relações estatísticas entre ativos (ex.: índice local versus S&P500 e DXY) pode apoiar decisões mais consistentes.

Apesar da disponibilidade de dados históricos (B3 e fontes públicas como Yahoo Finance), transformar relações como **correlação**, **retornos** e **inclinação de tendência** em **sinais operacionais acionáveis** (compra/venda) ainda é um desafio — especialmente para participantes que não dispõem de infraestrutura de análise quantitativa e modelos estatísticos/ML.

## Objetivo

Avaliar se a dinâmica de correlações e retornos entre ativos **locais** (proxy de IBOV/WIN e proxy do dólar) e ativos **globais** (ex.: S&P500, DXY, juros EUA, petróleo, Nasdaq) fornece **vantagem preditiva** para a geração de sinais operacionais, com foco em **Mini Índice (WINFUT)** e **Mini Dólar (DOLFUT)**.

## Tipo de problema

Problema de **Análise Exploratória Quantitativa**, com potencial aplicação em **geração de sinais** e suporte a decisões, utilizando:
- correlação dinâmica,
- retornos e volatilidade,
- inclinação/tendência,
- construção de índices compostos (local e global).

## Hipóteses

1. Existe relação estatisticamente relevante entre **variações na correlação** dos ativos e movimentos futuros de preço.
2. Mudanças específicas nos níveis de correlação podem **anteceder** movimentos direcionais (alta/baixa) em proxies de WINFUT e DOLFUT.
3. Um modelo baseado em **correlação dinâmica + inclinações + retornos** pode produzir sinais mais robustos do que o uso isolado de indicadores técnicos tradicionais.
4. O comportamento passado das correlações e inclinações pode ser preditivo dentro de uma janela temporal adequada (ex.: 21 dias).
5. A combinação de um **índice local** e um **índice global** (ponderados) melhora a consistência/precisão dos sinais.

## Seleção e Restrição dos Dados

## Fonte
- Séries históricas obtidas via **Yahoo Finance** (OHLCV).

## Período e frequência
- Período-alvo: **01/01/2023 a 01/01/2025** (2 anos)
- Frequência: **diária** (limitação para backtests intradiários)

## Restrições e limitações
- Diferenças de horário de negociação e liquidez entre mercados local e global podem introduzir defasagens.
- O Yahoo Finance não disponibiliza diretamente contratos futuros B3 (WINFUT/DOLFUT). Assim, são utilizadas **proxies** (ex.: `^BVSP` e `USDBRL=X` quando aplicável).

## Seleção e Composição dos Ativos

## Ativos locais (exemplos)
- Ações de alta liquidez: VALE3, PETR4, ITUB4, B3SA3, BBDC4, BBAS3, WEGE3, RENT3, ABEV3, PETR3
- Índice `^BVSP` → proxy do **IBOV/WIN**
- `USDBRL=X` → proxy do **Dólar** (quando usado no estudo)

## Ativos globais (exemplos)
- `^GSPC` (S&P500)
- `^IXIC` (Nasdaq)
- `^DJI` (Dow Jones)
- `DX-Y.NYB` (DXY)
- `^TNX` (Treasury 10Y)
- `CL=F` (Petróleo WTI)
- `BTC-USD` (Bitcoin)
- `^MXX` (México — emergente correlacionado)

Racional: capturar influência externa relevante no mercado brasileiro e testar se relações estatísticas oferecem sinalização antecipada.

## Definição dos Atributos

- Preço (OHLC):
  - `Open`, `High`, `Low`, `Close`
- Volume:
  - `Volume` (quando disponível)
- Observação:
  - `Adj Close` pode existir, mas o Bronze padroniza o dataset para colunas essenciais e rastreáveis.

## Features e Dados Derivados (visão)

- Construção de dois indicadores compostos:
  - **Índice Local**: combinação ponderada de ativos brasileiros
  - **Índice Global**: combinação ponderada de ativos internacionais
- Suavização por médias móveis para reduzir ruído.
- Tratamentos:
  - remoção/imputação conservadora de faltantes,
  - checagem de integridade temporal,
  - normalização/padronização quando aplicável.


In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze v5 — Ingestão de dados do Yahoo Finance
# MAGIC
# MAGIC ## Objetivo
# MAGIC Implementar a camada **Bronze** do pipeline, ingerindo séries históricas via **Yahoo Finance** e persistindo os dados brutos em formato **Delta**.
# MAGIC
# MAGIC ## Saída esperada
# MAGIC Tabela Delta: `mvp_finance.bronze_prices_raw`
# MAGIC
# MAGIC ## Parâmetros
# MAGIC - Período: `2y`
# MAGIC - Frequência: `1d`
# COMMAND ----------
# MAGIC %md
# MAGIC ## 1) Dependências
# MAGIC Instalamos o pacote `yfinance`, utilizado para baixar OHLCV do Yahoo Finance.
# COMMAND ----------
%pip install yfinance




[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 2) Imports e funções Spark auxiliares
# MAGIC Importamos bibliotecas de manipulação (pandas), download (yfinance) e funções Spark para construir o dataset final.
# COMMAND ----------
# ================================================
# Importando as bibliotecas
# ================================================

import pandas as pd
import yfinance as yf
import numpy as np
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
import plotly.express as px
import matplotlib.ticker as mtick
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit, current_timestamp

# Define fonte padrão
mpl.rcParams['font.family'] = 'DejaVu Sans'

# Pandas: Formatação para exibir valores com separador de milhar
pd.options.display.float_format = '{:,.2f}'.format

def formatar_eixo_y_duas_casas():
    """Formata o eixo Y com duas casas decimais."""
    plt.gca().yaxis.set_major_formatter(mtick.StrMethodFormatter('{x:,.2f}'))


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 3) Preparação do ambiente (database)
# MAGIC Garantimos que o database `mvp_finance` exista e o definimos como contexto de execução.
# COMMAND ----------
spark.sql("CREATE DATABASE IF NOT EXISTS mvp_finance")
spark.sql("USE mvp_finance")



DataFrame[]

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 4) Configuração dos ativos e janelas de extração
# MAGIC Definimos os tickers do Yahoo e seus nomes lógicos (para facilitar leituras futuras) e os parâmetros de período/frequência.
# COMMAND ----------
tickers = {
        '^GSPC': 'SP500',
        '^IXIC': 'NASDAQ',
        '^DJI': 'DOWJONES',
        'DX-Y.NYB': 'DXY',
        '^TNX': 'TREASURY10Y',
        'CL=F': 'PETROLEO',
        'BTC-USD': 'BITCOIN',
        '^MXX': 'MEXICO',
}
period = "2y"
interval = "1d"

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 5) Função utilitária: normalização de colunas (pandas)
# MAGIC O `yfinance` pode retornar colunas em formato **MultiIndex** em alguns cenários.
# MAGIC Esta função achata o nome das colunas para evitar quebra de schema e facilitar o mapeamento.
# COMMAND ----------
def flatten_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Achata MultiIndex se vier."""
    if isinstance(df.columns, pd.MultiIndex):
        new_cols = []
        for col in df.columns:
            parts = [str(x) for x in col if x not in ("", None)]
            new_cols.append("_".join(parts))
        df.columns = new_cols
    return df

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 6) Função principal: baixar um ticker e padronizar schema
# MAGIC Esta função:
# MAGIC - baixa o ativo no Yahoo Finance
# MAGIC - garante que o dataset tenha colunas fixas (`date_raw`, `open`, `high`, `low`, `close`, `volume`)
# MAGIC - adiciona metadados (`symbol`, `source`, `ingestion_ts`)
# MAGIC - retorna um **Spark DataFrame** pronto para union
# COMMAND ----------
def baixar_ticker_simples(ticker: str, nome_logico: str) -> DataFrame | None:
    print(f" Baixando {ticker} ({nome_logico}) ...")
    data = yf.download(ticker, period=period, interval=interval)

    if data is None or data.empty:
        print(f"⚠ Sem dados para {ticker}, ignorando.")
        return None

    # Ajuste de index/colunas no pandas
    data = data.reset_index()
    data = flatten_columns(data)
    print(" Colunas pandas:", list(data.columns))

    # mapeia colunas básicas
    col_map = {}

    # identifica coluna de data (varia entre Date/Datetime dependendo do ativo/intervalo)
    for cand in ["Date", "date", "Datetime", "datetime"]:
        if cand in data.columns:
            col_map["date_raw"] = cand
            break

    def achar_col(prefixos):
        for c in data.columns:
            for p in prefixos:
                if str(c).lower().startswith(p.lower()):
                    return c
        return None

    col_map["open"]   = achar_col(["Open", "open"])
    col_map["high"]   = achar_col(["High", "high"])
    col_map["low"]    = achar_col(["Low", "low"])
    col_map["close"]  = achar_col(["Close", "close"])
    col_map["volume"] = achar_col(["Volume", "volume"])

    print(" col_map:", col_map)

    # dataset final (pandas) com schema fixo
    pdf = pd.DataFrame()
    pdf["date_raw"] = data[col_map["date_raw"]]
    pdf["open"]     = data[col_map["open"]]
    pdf["high"]     = data[col_map["high"]]
    pdf["low"]      = data[col_map["low"]]
    pdf["close"]    = data[col_map["close"]]
    pdf["volume"]   = data[col_map["volume"]]

    # converte para Spark
    sdf = spark.createDataFrame(pdf)

    # metadados de linhagem
    sdf = (
        sdf.withColumn("symbol", lit(nome_logico))
           .withColumn("source", lit("yahoo_finance_api"))
           .withColumn("ingestion_ts", current_timestamp())
    )

    return sdf


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 7) Execução da ingestão (loop por ticker)
# MAGIC Percorremos o dicionário `tickers`, chamamos a função de download e acumulamos DataFrames válidos.
# MAGIC Ao final, fazemos o `unionByName` para consolidar tudo em um único DataFrame Bronze.
# COMMAND ----------
bronze_dfs = []

for ticker, nome in tickers.items():
    sdf = baixar_ticker_simples(ticker, nome)
    if sdf is not None:
        bronze_dfs.append(sdf)

if not bronze_dfs:
    raise RuntimeError("Nenhum ticker retornou dados.")

bronze_df = reduce(DataFrame.unionByName, bronze_dfs)



 Baixando ^GSPC (SP500) ...


  data = yf.download(ticker, period=period, interval=interval)
[*********************100%***********************]  1 of 1 completed


 Colunas pandas: ['Date', 'Close_^GSPC', 'High_^GSPC', 'Low_^GSPC', 'Open_^GSPC', 'Volume_^GSPC']
 col_map: {'date_raw': 'Date', 'open': 'Open_^GSPC', 'high': 'High_^GSPC', 'low': 'Low_^GSPC', 'close': 'Close_^GSPC', 'volume': 'Volume_^GSPC'}


  data = yf.download(ticker, period=period, interval=interval)
[*********************100%***********************]  1 of 1 completed

 Baixando ^IXIC (NASDAQ) ...
 Colunas pandas: ['Date', 'Close_^IXIC', 'High_^IXIC', 'Low_^IXIC', 'Open_^IXIC', 'Volume_^IXIC']
 col_map: {'date_raw': 'Date', 'open': 'Open_^IXIC', 'high': 'High_^IXIC', 'low': 'Low_^IXIC', 'close': 'Close_^IXIC', 'volume': 'Volume_^IXIC'}



  data = yf.download(ticker, period=period, interval=interval)
[*********************100%***********************]  1 of 1 completed

 Baixando ^DJI (DOWJONES) ...
 Colunas pandas: ['Date', 'Close_^DJI', 'High_^DJI', 'Low_^DJI', 'Open_^DJI', 'Volume_^DJI']
 col_map: {'date_raw': 'Date', 'open': 'Open_^DJI', 'high': 'High_^DJI', 'low': 'Low_^DJI', 'close': 'Close_^DJI', 'volume': 'Volume_^DJI'}



  data = yf.download(ticker, period=period, interval=interval)


 Baixando DX-Y.NYB (DXY) ...


[*********************100%***********************]  1 of 1 completed
  data = yf.download(ticker, period=period, interval=interval)


 Colunas pandas: ['Date', 'Close_DX-Y.NYB', 'High_DX-Y.NYB', 'Low_DX-Y.NYB', 'Open_DX-Y.NYB', 'Volume_DX-Y.NYB']
 col_map: {'date_raw': 'Date', 'open': 'Open_DX-Y.NYB', 'high': 'High_DX-Y.NYB', 'low': 'Low_DX-Y.NYB', 'close': 'Close_DX-Y.NYB', 'volume': 'Volume_DX-Y.NYB'}
 Baixando ^TNX (TREASURY10Y) ...


[*********************100%***********************]  1 of 1 completed
  data = yf.download(ticker, period=period, interval=interval)


 Colunas pandas: ['Date', 'Close_^TNX', 'High_^TNX', 'Low_^TNX', 'Open_^TNX', 'Volume_^TNX']
 col_map: {'date_raw': 'Date', 'open': 'Open_^TNX', 'high': 'High_^TNX', 'low': 'Low_^TNX', 'close': 'Close_^TNX', 'volume': 'Volume_^TNX'}
 Baixando CL=F (PETROLEO) ...


[*********************100%***********************]  1 of 1 completed
  data = yf.download(ticker, period=period, interval=interval)


 Colunas pandas: ['Date', 'Close_CL=F', 'High_CL=F', 'Low_CL=F', 'Open_CL=F', 'Volume_CL=F']
 col_map: {'date_raw': 'Date', 'open': 'Open_CL=F', 'high': 'High_CL=F', 'low': 'Low_CL=F', 'close': 'Close_CL=F', 'volume': 'Volume_CL=F'}
 Baixando BTC-USD (BITCOIN) ...


[*********************100%***********************]  1 of 1 completed

 Colunas pandas: ['Date', 'Close_BTC-USD', 'High_BTC-USD', 'Low_BTC-USD', 'Open_BTC-USD', 'Volume_BTC-USD']
 col_map: {'date_raw': 'Date', 'open': 'Open_BTC-USD', 'high': 'High_BTC-USD', 'low': 'Low_BTC-USD', 'close': 'Close_BTC-USD', 'volume': 'Volume_BTC-USD'}



  data = yf.download(ticker, period=period, interval=interval)


 Baixando ^MXX (MEXICO) ...


[*********************100%***********************]  1 of 1 completed


 Colunas pandas: ['Date', 'Close_^MXX', 'High_^MXX', 'Low_^MXX', 'Open_^MXX', 'Volume_^MXX']
 col_map: {'date_raw': 'Date', 'open': 'Open_^MXX', 'high': 'High_^MXX', 'low': 'Low_^MXX', 'close': 'Close_^MXX', 'volume': 'Volume_^MXX'}


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 8) Validação rápida do schema e amostra
# MAGIC Inspecionamos o schema final e exibimos uma pequena amostra para confirmar:
# MAGIC - colunas esperadas
# MAGIC - tipos de dados coerentes
# MAGIC - presença de metadados (`symbol`, `source`, `ingestion_ts`)
# COMMAND ----------
print(" Schema Bronze v5:")
bronze_df.printSchema()
display(bronze_df.limit(10))

 Schema Bronze v5:
root
 |-- date_raw: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- symbol: string (nullable = false)
 |-- source: string (nullable = false)
 |-- ingestion_ts: timestamp (nullable = false)



date_raw,open,high,low,close,volume,symbol,source,ingestion_ts
2023-12-18T00:00:00.000Z,4725.580078125,4749.52001953125,4725.580078125,4740.56005859375,4060340000,SP500,yahoo_finance_api,2025-12-17T23:55:23.478Z
2023-12-19T00:00:00.000Z,4743.72021484375,4768.68994140625,4743.72021484375,4768.3701171875,4026970000,SP500,yahoo_finance_api,2025-12-17T23:55:23.478Z
2023-12-20T00:00:00.000Z,4764.72998046875,4778.009765625,4697.81982421875,4698.35009765625,4201320000,SP500,yahoo_finance_api,2025-12-17T23:55:23.478Z
2023-12-21T00:00:00.000Z,4724.2900390625,4748.7099609375,4708.35009765625,4746.75,3431180000,SP500,yahoo_finance_api,2025-12-17T23:55:23.478Z
2023-12-22T00:00:00.000Z,4753.919921875,4772.93994140625,4736.77001953125,4754.6298828125,3046770000,SP500,yahoo_finance_api,2025-12-17T23:55:23.478Z
2023-12-26T00:00:00.000Z,4758.85986328125,4784.72021484375,4758.4501953125,4774.75,2513910000,SP500,yahoo_finance_api,2025-12-17T23:55:23.478Z
2023-12-27T00:00:00.000Z,4773.4501953125,4785.39013671875,4768.89990234375,4781.580078125,2748450000,SP500,yahoo_finance_api,2025-12-17T23:55:23.478Z
2023-12-28T00:00:00.000Z,4786.43994140625,4793.2998046875,4780.97998046875,4783.35009765625,2698860000,SP500,yahoo_finance_api,2025-12-17T23:55:23.478Z
2023-12-29T00:00:00.000Z,4782.8798828125,4788.43017578125,4751.990234375,4769.830078125,3126060000,SP500,yahoo_finance_api,2025-12-17T23:55:23.478Z
2024-01-02T00:00:00.000Z,4745.2001953125,4754.330078125,4722.669921875,4742.830078125,3743050000,SP500,yahoo_finance_api,2025-12-17T23:55:23.478Z


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 9) Persistência em Delta (overwrite)
# MAGIC Recriamos a tabela para garantir reprocessamento idempotente nesta fase do projeto.
# MAGIC Em produção, isso pode ser evoluído para modo incremental/merge.
# COMMAND ----------
spark.sql("""
SELECT symbol, COUNT(*) AS qtde
FROM mvp_finance.bronze_prices_raw
GROUP BY symbol
""").show()

spark.sql("""
SELECT *
FROM mvp_finance.bronze_prices_raw
LIMIT 5
""").show()

+-----------+----+
|     symbol|qtde|
+-----------+----+
|    BITCOIN| 732|
|     MEXICO| 502|
|   PETROLEO| 504|
|      SP500| 502|
|     NASDAQ| 502|
|   DOWJONES| 502|
|        DXY| 504|
|TREASURY10Y| 502|
+-----------+----+

+-------------------+------------------+------------------+------------------+------------------+------+------+-----------------+--------------------+
|           date_raw|              open|              high|               low|             close|volume|symbol|           source|        ingestion_ts|
+-------------------+------------------+------------------+------------------+------------------+------+------+-----------------+--------------------+
|2024-09-26 00:00:00|100.93000030517578|100.97000122070312| 100.4800033569336|100.55999755859375|     0|   DXY|yahoo_finance_api|2025-12-17 23:01:...|
|2024-09-27 00:00:00|100.58999633789062|100.87999725341797|100.16000366210938|100.41999816894531|     0|   DXY|yahoo_finance_api|2025-12-17 23:01:...|
|2024-09-30 00:0

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 10) Checagens pós-gravação
# MAGIC Fazemos duas checagens simples:
# MAGIC - contagem por `symbol` (confirma ingestão por ativo)
# MAGIC - amostra da tabela persistida (confirma leitura do Delta)
# COMMAND ----------
spark.sql("""
SELECT symbol, COUNT(*) AS qtde
FROM mvp_finance.bronze_prices_raw
GROUP BY symbol
""").show()

spark.sql("""
SELECT *
FROM mvp_finance.bronze_prices_raw
LIMIT 5
""").show()



+-----------+----+
|     symbol|qtde|
+-----------+----+
|    BITCOIN| 732|
|     MEXICO| 502|
|   PETROLEO| 504|
|      SP500| 502|
|     NASDAQ| 502|
|   DOWJONES| 502|
|        DXY| 504|
|TREASURY10Y| 502|
+-----------+----+

+-------------------+------------------+------------------+------------------+------------------+------+------+-----------------+--------------------+
|           date_raw|              open|              high|               low|             close|volume|symbol|           source|        ingestion_ts|
+-------------------+------------------+------------------+------------------+------------------+------+------+-----------------+--------------------+
|2024-09-26 00:00:00|100.93000030517578|100.97000122070312| 100.4800033569336|100.55999755859375|     0|   DXY|yahoo_finance_api|2025-12-17 23:01:...|
|2024-09-27 00:00:00|100.58999633789062|100.87999725341797|100.16000366210938|100.41999816894531|     0|   DXY|yahoo_finance_api|2025-12-17 23:01:...|
|2024-09-30 00:0

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 11) Critérios de aceite (Quality Gate) — Bronze v5
# MAGIC Antes de persistir a camada Bronze, aplicamos validações mínimas para garantir:
# MAGIC
# MAGIC **(A) Schema esperado**
# MAGIC - Colunas obrigatórias:
# MAGIC   - `date_raw`, `open`, `high`, `low`, `close`, `volume`, `symbol`, `source`, `ingestion_ts`
# MAGIC
# MAGIC **(B) Integridade mínima**
# MAGIC - Pelo menos 1 linha por `symbol`
# MAGIC - Ausência de `NULL` em colunas críticas: `date_raw`, `close`, `symbol`
# MAGIC
# MAGIC **(C) Faixa e consistência de preços**
# MAGIC - `high >= low`
# MAGIC - `open`, `high`, `low`, `close` não-negativos (quando aplicável)
# MAGIC
# MAGIC Se qualquer validação falhar, o notebook interrompe a execução com erro claro.
# COMMAND ----------
from pyspark.sql import functions as F

# 1) Schema obrigatório
required_cols = [
    "date_raw", "open", "high", "low", "close", "volume",
    "symbol", "source", "ingestion_ts"
]
missing = [c for c in required_cols if c not in bronze_df.columns]
if missing:
    raise RuntimeError(f"[QUALITY GATE] Colunas ausentes no Bronze: {missing}")

# 2) Contagem mínima por symbol (>= 1)
counts_df = bronze_df.groupBy("symbol").count()
missing_symbols = (
    counts_df.filter(F.col("count") <= 0)
             .select("symbol")
             .collect()
)
if missing_symbols:
    raise RuntimeError(f"[QUALITY GATE] Symbols sem registros: {[r['symbol'] for r in missing_symbols]}")

# 3) Null checks em colunas críticas
critical_nulls = (
    bronze_df.select(
        F.sum(F.col("date_raw").isNull().cast("int")).alias("null_date_raw"),
        F.sum(F.col("close").isNull().cast("int")).alias("null_close"),
        F.sum(F.col("symbol").isNull().cast("int")).alias("null_symbol"),
    ).collect()[0]
)

if critical_nulls["null_date_raw"] > 0 or critical_nulls["null_close"] > 0 or critical_nulls["null_symbol"] > 0:
    raise RuntimeError(
        "[QUALITY GATE] Nulls em colunas críticas: "
        f"date_raw={critical_nulls['null_date_raw']}, "
        f"close={critical_nulls['null_close']}, "
        f"symbol={critical_nulls['null_symbol']}"
    )

# 4) Regras de consistência (high >= low) e preços não-negativos
bad_hilo = bronze_df.filter(F.col("high") < F.col("low")).count()
if bad_hilo > 0:
    raise RuntimeError(f"[QUALITY GATE] Registros inválidos: high < low (qtde={bad_hilo})")

bad_negative = bronze_df.filter(
    (F.col("open") < 0) | (F.col("high") < 0) | (F.col("low") < 0) | (F.col("close") < 0)
).count()
if bad_negative > 0:
    raise RuntimeError(f"[QUALITY GATE] Preços negativos detectados (qtde={bad_negative})")

print("[QUALITY GATE] OK — Bronze v5 passou nas validações mínimas.")
display(counts_df.orderBy("symbol"))


[QUALITY GATE] OK — Bronze v5 passou nas validações mínimas.


symbol,count
BITCOIN,732
DOWJONES,502
DXY,504
MEXICO,502
NASDAQ,502
PETROLEO,504
SP500,502
TREASURY10Y,502


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 12) Persistência em Delta — somente após o Quality Gate
# MAGIC Se o Quality Gate passou, persistimos a tabela Delta com `overwrite`.
# MAGIC (Em evolução futura: modo incremental + `MERGE` para reprocessamentos parciais.)
# COMMAND ----------
spark.sql("DROP TABLE IF EXISTS mvp_finance.bronze_prices_raw")

(
    bronze_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("mvp_finance.bronze_prices_raw")
)

print(" Bronze v5 criada → mvp_finance.bronze_prices_raw")

spark.sql("SELECT symbol, COUNT(*) AS qtde FROM mvp_finance.bronze_prices_raw GROUP BY symbol").show()
spark.sql("SELECT * FROM mvp_finance.bronze_prices_raw LIMIT 5").show()


 Bronze v5 criada → mvp_finance.bronze_prices_raw
+-----------+----+
|     symbol|qtde|
+-----------+----+
|    BITCOIN| 732|
|     MEXICO| 502|
|   PETROLEO| 504|
|      SP500| 502|
|     NASDAQ| 502|
|   DOWJONES| 502|
|        DXY| 504|
|TREASURY10Y| 502|
+-----------+----+

+-------------------+---------------+---------------+---------------+---------------+-----------+------+-----------------+--------------------+
|           date_raw|           open|           high|            low|          close|     volume|symbol|           source|        ingestion_ts|
+-------------------+---------------+---------------+---------------+---------------+-----------+------+-----------------+--------------------+
|2025-05-09 00:00:00| 18022.55078125|18068.900390625| 17853.83984375|17928.919921875| 8835140000|NASDAQ|yahoo_finance_api|2025-12-17 23:55:...|
|2025-05-12 00:00:00|18674.560546875|18710.220703125|  18472.7109375| 18708.33984375|10717670000|NASDAQ|yahoo_finance_api|2025-12-17 23:55:...|
|

In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 13) Quality Gate (opcional) — Cobertura temporal por ativo
# MAGIC Valida se cada `symbol` cobre um intervalo temporal minimamente aceitável.
# MAGIC
# MAGIC **Por que isso importa?**
# MAGIC - Evita seguir para Silver/Gold com série “capenga” (ex.: ativo voltou só 2 semanas).
# MAGIC - Ajuda a detectar falhas silenciosas no download.
# MAGIC
# MAGIC **Regra aplicada (ajustável)**
# MAGIC - Para cada `symbol`, a diferença entre `min(date_raw)` e `max(date_raw)` deve ser
# MAGIC   pelo menos `min_coverage_days` dias.
# MAGIC
# MAGIC Observação: como o `period="2y"` do Yahoo pode variar um pouco, usamos uma margem conservadora.
# COMMAND ----------
from pyspark.sql import functions as F

# Ajuste aqui se quiser ser mais rígido
min_coverage_days = 300  # ex.: ~1 ano (conservador). Para 2 anos, poderia usar 600+.

coverage_df = (
    bronze_df
    .groupBy("symbol")
    .agg(
        F.min("date_raw").alias("min_date_raw"),
        F.max("date_raw").alias("max_date_raw"),
        F.count("*").alias("rows"),
    )
    .withColumn("coverage_days", F.datediff(F.col("max_date_raw"), F.col("min_date_raw")))
)

display(coverage_df.orderBy("symbol"))

bad_coverage = (
    coverage_df
    .filter(F.col("coverage_days") < F.lit(min_coverage_days))
    .select("symbol", "min_date_raw", "max_date_raw", "coverage_days", "rows")
    .collect()
)

if bad_coverage:
    msg = "\n".join([
        f"- {r['symbol']}: {r['min_date_raw']} → {r['max_date_raw']} "
        f"({r['coverage_days']} dias, rows={r['rows']})"
        for r in bad_coverage
    ])
    raise RuntimeError(
        "[QUALITY GATE - TEMPORAL COVERAGE] Cobertura temporal insuficiente para:\n" + msg
    )

print(f"[QUALITY GATE - TEMPORAL COVERAGE] OK — todos os symbols >= {min_coverage_days} dias de cobertura.")



symbol,min_date_raw,max_date_raw,rows,coverage_days
BITCOIN,2023-12-17T00:00:00.000Z,2025-12-17T00:00:00.000Z,732,731
DOWJONES,2023-12-18T00:00:00.000Z,2025-12-17T00:00:00.000Z,502,730
DXY,2023-12-18T00:00:00.000Z,2025-12-17T00:00:00.000Z,504,730
MEXICO,2023-12-18T00:00:00.000Z,2025-12-17T00:00:00.000Z,502,730
NASDAQ,2023-12-18T00:00:00.000Z,2025-12-17T00:00:00.000Z,502,730
PETROLEO,2023-12-18T00:00:00.000Z,2025-12-17T00:00:00.000Z,504,730
SP500,2023-12-18T00:00:00.000Z,2025-12-17T00:00:00.000Z,502,730
TREASURY10Y,2023-12-18T00:00:00.000Z,2025-12-17T00:00:00.000Z,502,730


[QUALITY GATE - TEMPORAL COVERAGE] OK — todos os symbols >= 300 dias de cobertura.


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 14) Quality Gate (opcional) — Duplicidade por `symbol` + `date_raw`
# MAGIC Valida se existem registros duplicados por chave natural (ativo + data).
# MAGIC
# MAGIC **Por que isso importa?**
# MAGIC - Em modo incremental/merge, duplicatas quebram a integridade e distorcem retornos/correlações.
# MAGIC - Mesmo no `overwrite`, duplicatas “sujam” os cálculos em Silver/Gold.
# MAGIC
# MAGIC **Regra aplicada**
# MAGIC - `COUNT(*)` por (`symbol`, `date_raw`) deve ser sempre igual a 1.
# COMMAND ----------
dups_df = (
    bronze_df
    .groupBy("symbol", "date_raw")
    .count()
    .filter(F.col("count") > 1)
)

dups_count = dups_df.count()

if dups_count > 0:
    print("[QUALITY GATE - DUPLICIDADE] Duplicatas encontradas (amostra):")
    display(dups_df.orderBy(F.desc("count")).limit(50))
    raise RuntimeError(f"[QUALITY GATE - DUPLICIDADE] Falhou: {dups_count} chaves (symbol, date_raw) duplicadas.")
else:
    print("[QUALITY GATE - DUPLICIDADE] OK — nenhuma duplicidade por (symbol, date_raw).")


[QUALITY GATE - DUPLICIDADE] OK — nenhuma duplicidade por (symbol, date_raw).


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 15) Quality Gate (opcional) — Sanidade estatística rápida (outliers básicos)
# MAGIC Checagens simples para capturar anomalias gritantes:
# MAGIC - `close` igual a 0
# MAGIC - `volume` negativo
# MAGIC - `high/low` com valores extremos (ex.: `high` muito distante de `low`)
# MAGIC
# MAGIC Observação: não é um detector completo de anomalias; é um “alarme de incêndio”.
# COMMAND ----------
# Regras básicas (ajuste conforme necessidade)
zero_close = bronze_df.filter(F.col("close") == 0).count()
neg_volume = bronze_df.filter(F.col("volume") < 0).count()

# Range anormal (high/low ratio) — evita divisão por zero
extreme_range = bronze_df.filter(
    (F.col("low") > 0) &
    ((F.col("high") / F.col("low")) > 10)   # 10x no mesmo candle diário geralmente é suspeito
).count()

if zero_close > 0 or neg_volume > 0 or extreme_range > 0:
    raise RuntimeError(
        "[QUALITY GATE - SANIDADE] Falhou: "
        f"close==0: {zero_close}, volume<0: {neg_volume}, high/low>10: {extreme_range}"
    )

print("[QUALITY GATE - SANIDADE] OK — checagens rápidas passaram.")


[QUALITY GATE - SANIDADE] OK — checagens rápidas passaram.


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## 16) Persistência final (após todos os Quality Gates)
# MAGIC Se todas as validações passaram, persistimos a camada Bronze em Delta.
# COMMAND ----------
spark.sql("DROP TABLE IF EXISTS mvp_finance.bronze_prices_raw")

(
    bronze_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("mvp_finance.bronze_prices_raw")
)

print(" Bronze v5 criada → mvp_finance.bronze_prices_raw")

spark.sql("""
SELECT symbol, COUNT(*) AS qtde
FROM mvp_finance.bronze_prices_raw
GROUP BY symbol
ORDER BY symbol
""").show()

spark.sql("""
SELECT *
FROM mvp_finance.bronze_prices_raw
ORDER BY symbol, date_raw
LIMIT 10
""").show()


 Bronze v5 criada → mvp_finance.bronze_prices_raw
+-----------+----+
|     symbol|qtde|
+-----------+----+
|    BITCOIN| 732|
|   DOWJONES| 502|
|        DXY| 504|
|     MEXICO| 502|
|     NASDAQ| 502|
|   PETROLEO| 504|
|      SP500| 502|
|TREASURY10Y| 502|
+-----------+----+

+-------------------+--------------+--------------+--------------+--------------+-----------+-------+-----------------+--------------------+
|           date_raw|          open|          high|           low|         close|     volume| symbol|           source|        ingestion_ts|
+-------------------+--------------+--------------+--------------+--------------+-----------+-------+-----------------+--------------------+
|2023-12-17 00:00:00|  42236.109375|42359.49609375|41274.54296875| 41364.6640625|16678702876|BITCOIN|yahoo_finance_api|2025-12-17 23:55:...|
|2023-12-18 00:00:00|  41348.203125|  42720.296875| 40530.2578125| 42623.5390625|25224642008|BITCOIN|yahoo_finance_api|2025-12-17 23:55:...|
|2023-12-19 00:0


## Camada Bronze — Ingestão (Yahoo Finance)
## Propósito

MAGIC A camada **Bronze** realiza a **ingestão bruta** de séries históricas a partir do **Yahoo Finance**, preservando os dados com mínima transformação e adicionando metadados essenciais para **auditabilidade** e **reprocessamento**.

## Fonte e ativos (conforme notebook `01_bronze_ingest.ipynb`)
A ingestão é realizada via **`yfinance`** para os seguintes tickers (ticker → nome lógico):
- `^GSPC` → `SP500`
- `^IXIC` → `NASDAQ`
- `^DJI` → `DOWJONES`
- `DX-Y.NYB` → `DXY`
- `^TNX` → `TREASURY10Y`
- `CL=F` → `PETROLEO`
- `BTC-USD` → `BITCOIN`
- `^MXX` → `MEXICO`

## Parâmetros de ingestão
- Período: `2y`
- Frequência: `1d`

## Padronização aplicada (mínima e intencional)
- Tratamento de possíveis `MultiIndex` do pandas via função **`flatten_columns()`**
- Mapeamento flexível de colunas para garantir schema consistente:
- `date_raw`, `open`, `high`, `low`, `close`, `volume`
- Enriquecimento com metadados:
- `symbol` (nome lógico)
- `source` = `yahoo_finance_api`
- `ingestion_ts` = `current_timestamp()`

## Persistência (Delta Lake)
Tabela criada (overwrite):
- **`mvp_finance.bronze_prices_raw`**

## Contrato de dados (schema esperado)
- `date_raw` (data/hora original do provedor)
- `open`, `high`, `low`, `close` (preços)
- `volume`
- `symbol` (nome lógico do ativo)
- `source`
- `ingestion_ts`

## Observações de governança
- A Bronze é **reprocessável** por definição (não depende de regras de negócio).
- O uso de `ingestion_ts` habilita deduplicação e “last-write-wins” na Silver.
