# Montar ou identificar os diretórios locais, Persistir em um Postgres os dados em Silver.

In [2]:
# Montar Google Drive (Colab) ou detectar Google Drive Desktop (local)
# - Em Colab: monta em /content/drive
# - Local (Windows): usa Google Drive for Desktop (ex.: G:\Drives compartilhados)

import os
from pathlib import Path

IN_COLAB = False
try:
    from google.colab import drive as _gdrive  # type: ignore
    IN_COLAB = True
except Exception:
    IN_COLAB = False

DRIVE_ROOT = None  # caminho raiz do Drive

if IN_COLAB:
    # Monta o Google Drive no Colab
    _gdrive.mount('/content/drive', force_remount=False)
    DRIVE_ROOT = '/content/drive'
    print(f"[OK] Google Drive montado no Colab em: {DRIVE_ROOT}")
else:
    # Detecta Google Drive for Desktop no Windows
    candidates = [
        r'G:\\Drives compartilhados',   # PT-BR: Shared drives
        r'G:\\Shared drives',           # EN: Shared drives
        r'G:\\My Drive',                # EN: Meu Drive (conta pessoal)
        os.path.expandvars(r'%USERPROFILE%\\Google Drive'),  # legado
        os.path.expandvars(r'%USERPROFILE%\\Google Drive (Shared drives)'),
    ]
    DRIVE_ROOT = next((p for p in candidates if os.path.exists(p)), None)
    if DRIVE_ROOT:
        print(f"[OK] Google Drive detectado localmente em: {DRIVE_ROOT}")
    else:
        print('[AVISO] Google Drive não encontrado automaticamente.\n'
              '       Verifique se o Google Drive for Desktop está instalado e o caminho correto (ex.: G:\\Drives compartilhados).')

# Variáveis úteis para uso posterior
SHARED_DRIVES = str(Path(DRIVE_ROOT) if DRIVE_ROOT else '')
print('IN_COLAB =', IN_COLAB)
print('DRIVE_ROOT =', DRIVE_ROOT)
print('SHARED_DRIVES =', SHARED_DRIVES)

[OK] Google Drive detectado localmente em: G:\\Drives compartilhados
IN_COLAB = False
DRIVE_ROOT = G:\\Drives compartilhados
SHARED_DRIVES = G:\Drives compartilhados


In [3]:
# Verificar parquet em 00_data/01_raw e resumir esquema
import os
from pathlib import Path
import json

RAW_DIR = Path('G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini/00_data/01_raw')
assert RAW_DIR.exists(), f"Diretório não encontrado: {RAW_DIR}"

# Garantir pyarrow para leitura robusta
try:
    import pyarrow.parquet as pq
    import pyarrow as pa
except Exception:
    import sys, subprocess
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'pyarrow'])
    import pyarrow.parquet as pq
    import pyarrow as pa

from collections import Counter, defaultdict

files = sorted([p for p in RAW_DIR.glob('*.parquet')])
print(f"Encontrados {len(files)} arquivos parquet em {RAW_DIR}")

schemas = {}
errors = {}
row_counts = {}
example_values = defaultdict(dict)

for p in files:
    try:
        meta = pq.ParquetFile(p)
        schema = meta.schema_arrow
        schemas[p.name] = schema
        row_counts[p.name] = meta.metadata.num_rows
        # coletar alguns valores de exemplo das primeiras 5 linhas
        table = meta.read_row_groups([0], columns=[f.name for f in schema]) if meta.metadata.num_row_groups>0 else meta.read()
        head = table.slice(0, min(5, table.num_rows))
        for f in schema:
            col = head.column(f.name) if f.name in head.column_names else None
            if col is not None:
                vals = col.to_pylist()
                example_values[p.name][f.name] = vals
    except Exception as e:
        errors[p.name] = str(e)

# Determinar colunas comuns e tipos
all_columns = [set(s.names) for s in schemas.values()]
common_cols = set.intersection(*all_columns) if all_columns else set()

# mapa de tipos por coluna (contagem)
col_type_counter = defaultdict(Counter)
for fname, schema in schemas.items():
    for f in schema:
        col_type_counter[f.name][str(f.type)] += 1

print("\nErros de leitura:")
if errors:
    for k,v in errors.items():
        print(f"- {k}: {v}")
else:
    print("Nenhum erro")

print("\nColunas comuns (presentes em todos):")
print(sorted(common_cols))

print("\nTipos por coluna (top-1):")
for col, counter in sorted(col_type_counter.items()):
    top = counter.most_common(1)[0]
    print(f"- {col}: {top[0]} (freq {top[1]}/{len(schemas)})")

print("\nAmostras de valores (primeiras 5 linhas) para 2 arquivos:")
for fname in list(schemas.keys())[:2]:
    print(f"\n== {fname} (linhas={row_counts.get(fname)}) ==")
    samples = example_values.get(fname, {})
    for col, vals in list(samples.items())[:10]:
        print(f"  {col}: {vals}")

Encontrados 31 arquivos parquet em G:\Drives compartilhados\BOLSA_2026\a_bolsa2026_gemini\00_data\01_raw

Erros de leitura:
Nenhum erro

Colunas comuns (presentes em todos):
['Adj Close', 'Close', 'Date', 'Dividends', 'High', 'Low', 'Open', 'Stock Splits', 'Volume']

Tipos por coluna (top-1):
- Adj Close: double (freq 31/31)
- Capital Gains: double (freq 1/31)
- Close: double (freq 31/31)
- Date: timestamp[ns, tz=America/Sao_Paulo] (freq 25/31)
- Dividends: double (freq 31/31)
- High: double (freq 31/31)
- Low: double (freq 31/31)
- Open: double (freq 31/31)
- Stock Splits: double (freq 31/31)
- Volume: int64 (freq 31/31)

Amostras de valores (primeiras 5 linhas) para 2 arquivos:

== _bvsp_ohlcv_actions_20120101_20250922.parquet (linhas=3400) ==
  Open: [57836.0, 59263.0, 59354.0, 58565.0, 58601.0]
  High: [59288.0, 59519.0, 59354.0, 59261.0, 59220.0]
  Low: [57836.0, 58558.0, 57963.0, 58355.0, 58599.0]
  Close: [59265.0, 59365.0, 58546.0, 58600.0, 59083.0]
  Adj Close: [59265.0, 59365

In [4]:
# Resumo compacto (JSON) das estruturas parquet
import json, re

def top_types_for(cols):
    out = {}
    for c in cols:
        cnt = col_type_counter.get(c, None)
        if cnt:
            t, n = cnt.most_common(1)[0]
            out[c] = {"type": t, "freq": n, "total": len(schemas)}
    return out

summary = {}
summary["file_count"] = len(files)
summary["error_count"] = len(errors)
summary["error_files"] = sorted(list(errors.keys()))[:5]
summary["has_ohlcv_actions_pattern"] = sum(1 for p in files if "_ohlcv_actions_" in p.name)

names = [p.name for p in files]
summary["sample_files"] = names[:5]

# Colunas comuns e tipos
cc = sorted(list(common_cols))
summary["common_columns"] = cc
summary["common_columns_types"] = top_types_for(cc)

# Amostra de colunas de 2 arquivos
sample_cols = {}
for fname in names[:2]:
    sch = schemas.get(fname)
    if sch:
        sample_cols[fname] = sch.names
summary["sample_columns_by_file"] = sample_cols

# Padrões por nome: conta tags tipo '_sa_', '_nyb_', etc
patterns = {
    "_sa_": sum(1 for n in names if "_sa_" in n),
    "_nyb_": sum(1 for n in names if "_nyb_" in n),
    "_metadata_": sum(1 for n in names if "_metadata_" in n),
    "_ohlcv_actions_": sum(1 for n in names if "_ohlcv_actions_" in n),
}
summary["name_patterns_counts"] = patterns

print(json.dumps(summary, ensure_ascii=False, indent=2))

{
  "file_count": 31,
  "error_count": 0,
  "error_files": [],
  "has_ohlcv_actions_pattern": 31,
  "sample_files": [
    "_bvsp_ohlcv_actions_20120101_20250922.parquet",
    "_gspc_ohlcv_actions_20120101_20250922.parquet",
    "_tnx_ohlcv_actions_20120101_20250922.parquet",
    "_vix_ohlcv_actions_20120101_20250922.parquet",
    "abev3_sa_ohlcv_actions_20120101_20250922.parquet"
  ],
  "common_columns": [
    "Adj Close",
    "Close",
    "Date",
    "Dividends",
    "High",
    "Low",
    "Open",
    "Stock Splits",
    "Volume"
  ],
  "common_columns_types": {
    "Adj Close": {
      "type": "double",
      "freq": 31,
      "total": 31
    },
    "Close": {
      "type": "double",
      "freq": 31,
      "total": 31
    },
    "Date": {
      "type": "timestamp[ns, tz=America/Sao_Paulo]",
      "freq": 25,
      "total": 31
    },
    "Dividends": {
      "type": "double",
      "freq": 31,
      "total": 31
    },
    "High": {
      "type": "double",
      "freq": 31,
      "tot

In [6]:
# Ajustar timezone para America/Sao_Paulo nos 6 arquivos divergentes e salvar em 00_data/02_processed
import os
from pathlib import Path
import pandas as pd

RAW_DIR = Path('G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini/00_data/01_raw')
PROC_DIR = Path('G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini/00_data/02_processed')
PROC_DIR.mkdir(parents=True, exist_ok=True)

# Utilizar resultados computados previamente (schemas, files) se existirem no kernel; caso contrário, recomputar leve
try:
    files
    schemas
except NameError:
    from collections import Counter, defaultdict
    import pyarrow.parquet as pq
    files = sorted([p for p in RAW_DIR.glob('*.parquet')])
    schemas = {}
    for p in files:
        meta = pq.ParquetFile(p)
        schemas[p.name] = meta.schema_arrow

# Identificar quais não têm tz America/Sao_Paulo
not_sp = []
for p in files:
    sch = schemas.get(p.name)
    if not sch:
        continue
    if 'Date' in sch.names:
        f = next((f for f in sch if f.name == 'Date'), None)
        t = str(f.type) if f else ''
        if 'timestamp' in t and 'tz=America/Sao_Paulo' in t:
            continue
        if 'timestamp' in t:
            not_sp.append(p)
    else:
        # Pode estar como index no parquet (sem coluna explícita)
        not_sp.append(p)

print(f"Arquivos a ajustar (esperados 6): {len(not_sp)}")
for p in not_sp:
    print('-', p.name)

# Função de ajuste: garantir Date com tz America/Sao_Paulo
import pytz
sp_tz = pytz.timezone('America/Sao_Paulo')

fixed = []
for p in not_sp:
    df = pd.read_parquet(p)  # pandas com pyarrow engine

    # Descobrir a coluna/índice de data
    date_col = None
    for cand in ['Date', 'date', 'DATE', 'Datetime', 'datetime', 'Timestamp', 'timestamp']:
        if cand in df.columns:
            date_col = cand
            break
    if date_col is None:
        # Tentar índice
        if isinstance(df.index, pd.DatetimeIndex):
            df = df.reset_index().rename(columns={'index':'Date'})
            date_col = 'Date'
        else:
            print(f"[SKIP] Sem coluna/índice de data reconhecida: {p.name}")
            continue

    d = pd.to_datetime(df[date_col], errors='coerce', utc=False)

    if pd.api.types.is_datetime64tz_dtype(d):
        # já tem tz, converter
        df[date_col] = d.dt.tz_convert('America/Sao_Paulo')
    else:
        # naive -> assumir horário local SP
        df[date_col] = d.dt.tz_localize('America/Sao_Paulo')

    # Garantir nome padronizado 'Date'
    if date_col != 'Date':
        df = df.rename(columns={date_col: 'Date'})

    out_path = PROC_DIR / p.name
    df.to_parquet(out_path, engine='pyarrow', index=False)
    fixed.append(out_path)

print(f"Ajustados e salvos: {len(fixed)} em {PROC_DIR}")
for q in fixed:
    print('  ->', q.name)

# Verificar tipos após escrita
import pyarrow.parquet as pq
ok = 0
for q in fixed:
    meta = pq.ParquetFile(q)
    sch = meta.schema_arrow
    f = next((f for f in sch if f.name == 'Date'), None)
    if f and 'tz=America/Sao_Paulo' in str(f.type):
        ok += 1
print(f"Verificação: {ok}/{len(fixed)} com Date em tz=America/Sao_Paulo")

Arquivos a ajustar (esperados 6): 6
- _gspc_ohlcv_actions_20120101_20250922.parquet
- _tnx_ohlcv_actions_20120101_20250922.parquet
- _vix_ohlcv_actions_20120101_20250922.parquet
- bz=f_ohlcv_actions_20120101_20250922.parquet
- dx-y.nyb_ohlcv_actions_20120101_20250922.parquet
- ewz_ohlcv_actions_20120101_20250922.parquet
Ajustados e salvos: 6 em G:\Drives compartilhados\BOLSA_2026\a_bolsa2026_gemini\00_data\02_processed
  -> _gspc_ohlcv_actions_20120101_20250922.parquet
  -> _tnx_ohlcv_actions_20120101_20250922.parquet
  -> _vix_ohlcv_actions_20120101_20250922.parquet
  -> bz=f_ohlcv_actions_20120101_20250922.parquet
  -> dx-y.nyb_ohlcv_actions_20120101_20250922.parquet
  -> ewz_ohlcv_actions_20120101_20250922.parquet


  if pd.api.types.is_datetime64tz_dtype(d):
  if pd.api.types.is_datetime64tz_dtype(d):
  if pd.api.types.is_datetime64tz_dtype(d):
  if pd.api.types.is_datetime64tz_dtype(d):
  if pd.api.types.is_datetime64tz_dtype(d):
  if pd.api.types.is_datetime64tz_dtype(d):


Verificação: 6/6 com Date em tz=America/Sao_Paulo


In [7]:
# Padronizar camada SILVER: schema consistente, tz America/Sao_Paulo, coluna Symbol, salvar em 00_data/02_processed
from pathlib import Path
import re
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

RAW_DIR = Path('G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini/00_data/01_raw')
SILVER_DIR = Path('G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini/00_data/02_processed')
SILVER_DIR.mkdir(parents=True, exist_ok=True)

# Lista alvo de colunas na ordem final
TARGET_COLS = ['Symbol','Date','Open','High','Low','Close','Adj Close','Volume','Dividends','Stock Splits']

# Extrair símbolo a partir do nome do arquivo
# Exemplos: "abev3_sa_ohlcv_actions_...", "_gspc_ohlcv_actions_..."
# Regra: pegar token inicial até o primeiro "_ohlcv_actions_"; remover prefixo '_' se houver; manter sufixo de mercado (ex.: _sa_)
SYMBOL_RE = re.compile(r"^(.*?)_ohlcv_actions_", re.IGNORECASE)


def infer_symbol(fname: str) -> str:
    m = SYMBOL_RE.match(fname)
    raw = m.group(1) if m else fname.split('.')[0]
    # normalizar: tirar duplicidade de underscores, remover prefixo '_'
    raw = raw.lstrip('_')
    return raw.lower()


def ensure_tz_sp(s: pd.Series) -> pd.Series:
    s = pd.to_datetime(s, errors='coerce', utc=False)
    if isinstance(s.dtype, pd.DatetimeTZDtype):
        return s.dt.tz_convert('America/Sao_Paulo')
    else:
        return s.dt.tz_localize('America/Sao_Paulo')


files = sorted(RAW_DIR.glob('*.parquet'))
print(f"Total arquivos a processar: {len(files)}")

written = []
for p in files:
    df = pd.read_parquet(p)

    # Descobrir/Padronizar Date
    date_col = None
    for cand in ['Date','date','DATE','Datetime','datetime','Timestamp','timestamp']:
        if cand in df.columns:
            date_col = cand
            break
    if date_col is None and isinstance(df.index, pd.DatetimeIndex):
        df = df.reset_index().rename(columns={'index':'Date'})
        date_col = 'Date'
    if date_col is None:
        print(f"[SKIP] {p.name}: sem coluna/índice de data.")
        continue

    # Tipos numéricos / faltantes
    for col in ['Open','High','Low','Close','Adj Close','Volume','Dividends','Stock Splits']:
        if col in df.columns:
            if col == 'Volume':
                df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')  # permitir NA, salva como int64 no parquet
            else:
                df[col] = pd.to_numeric(df[col], errors='coerce').astype('float64')
        else:
            # coluna ausente vira NA
            df[col] = pd.Series([pd.NA]*len(df)) if col=='Volume' else pd.Series([float('nan')]*len(df))

    # Garantir timezone America/Sao_Paulo
    df[date_col] = ensure_tz_sp(df[date_col])
    if date_col != 'Date':
        df = df.rename(columns={date_col: 'Date'})

    # Adicionar Symbol
    symbol = infer_symbol(p.name)
    df.insert(0, 'Symbol', symbol)

    # Reordenar colunas
    missing = [c for c in TARGET_COLS if c not in df.columns]
    for c in missing:
        if c == 'Volume' and c not in df.columns:
            df[c] = pd.Series([pd.NA]*len(df))
        elif c not in df.columns:
            df[c] = float('nan')
    df = df[TARGET_COLS]

    # Escrever com pyarrow, mantendo tz na Date
    out = SILVER_DIR / p.name
    df.to_parquet(out, engine='pyarrow', index=False)
    written.append(out)

print(f"Gravados {len(written)} arquivos em {SILVER_DIR}")

# Verificação: todos com schema idêntico e tz correta
schemas = []
for q in written:
    meta = pq.ParquetFile(q)
    sch = meta.schema_arrow
    schemas.append((q.name, sch))

# verificar nomes/ordem das colunas
ok_cols = all(s.names == TARGET_COLS for _, s in schemas)
# verificar tz da Date
ok_tz = all('tz=America/Sao_Paulo' in str(next(f for f in s if f.name=='Date').type) for _, s in schemas)

print('OK_COLS =', ok_cols)
print('OK_TZ_SP =', ok_tz)
if not ok_cols:
    bad = [n for n,s in schemas if s.names != TARGET_COLS]
    print('Arquivos com colunas fora do padrão:', bad[:5])
if not ok_tz:
    bad = []
    for n, s in schemas:
        f = next((f for f in s if f.name=='Date'), None)
        if 'tz=America/Sao_Paulo' not in str(f.type):
            bad.append(n)
    print('Arquivos com timezone fora do padrão:', bad[:5])

Total arquivos a processar: 31
Gravados 31 arquivos em G:\Drives compartilhados\BOLSA_2026\a_bolsa2026_gemini\00_data\02_processed
OK_COLS = True
OK_TZ_SP = True


In [8]:
# Mostrar dados de PETR4 em setembro/2022 (camada silver)
from pathlib import Path
import pandas as pd

# Montar caminho para 02_processed usando SHARED_DRIVES se disponível
try:
    base = Path(SHARED_DRIVES) / 'BOLSA_2026' / 'a_bolsa2026_gemini'
    proc_dir = base / '00_data' / '02_processed'
    assert proc_dir.exists()
except Exception:
    proc_dir = Path(r'G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini/00_data/02_processed')

# Localizar arquivo silver de PETR4
petr4_file = next(proc_dir.glob('petr4_sa_ohlcv_actions_*.parquet'))

# Ler dados
df = pd.read_parquet(petr4_file)

# Garantir Date com tz America/Sao_Paulo (deveria já estar, mas deixamos robusto)
if not isinstance(df['Date'].dtype, pd.DatetimeTZDtype):
    df['Date'] = pd.to_datetime(df['Date'], errors='coerce').dt.tz_localize('America/Sao_Paulo')

# Filtro de setembro de 2022 (intervalo fechado-aberto)
start = pd.Timestamp('2022-09-01', tz='America/Sao_Paulo')
end = pd.Timestamp('2022-10-01', tz='America/Sao_Paulo')
mask = (df['Date'] >= start) & (df['Date'] < end)
sep22 = df.loc[mask].sort_values('Date').reset_index(drop=True)

print('Arquivo:', petr4_file.name)
print('Período:', start.date(), 'a', (end - pd.Timedelta(seconds=1)).date())
print('Linhas:', len(sep22))
sep22

Arquivo: petr4_sa_ohlcv_actions_20120101_20250922.parquet
Período: 2022-09-01 a 2022-09-30
Linhas: 21


Unnamed: 0,Symbol,Date,Open,High,Low,Close,Adj Close,Volume,Dividends,Stock Splits
0,petr4_sa,2022-09-01 00:00:00-03:00,33.360001,33.950001,32.759998,33.849998,18.471289,69184700,0.0,0.0
1,petr4_sa,2022-09-02 00:00:00-03:00,34.439999,34.57,33.259998,33.419998,18.236647,81792300,0.0,0.0
2,petr4_sa,2022-09-05 00:00:00-03:00,33.849998,34.110001,33.150002,33.34,18.192995,76337600,0.0,0.0
3,petr4_sa,2022-09-06 00:00:00-03:00,32.27,32.59,31.51,32.099998,17.516346,111813500,0.0,0.0
4,petr4_sa,2022-09-08 00:00:00-03:00,32.310001,32.779999,31.4,31.799999,17.352642,63159800,0.0,0.0
5,petr4_sa,2022-09-09 00:00:00-03:00,32.490002,32.669998,31.700001,31.790001,17.347187,49871900,0.0,0.0
6,petr4_sa,2022-09-12 00:00:00-03:00,32.150002,32.73,31.459999,31.48,17.178028,71374500,0.0,0.0
7,petr4_sa,2022-09-13 00:00:00-03:00,30.940001,31.440001,30.52,30.65,16.725109,88327800,0.0,0.0
8,petr4_sa,2022-09-14 00:00:00-03:00,30.75,31.43,30.629999,31.120001,16.981581,49039700,0.0,0.0
9,petr4_sa,2022-09-15 00:00:00-03:00,30.92,31.23,30.799999,31.059999,16.948839,45553200,0.0,0.0


## Banco de Dados (Postgres/SQLite) – Camada Silver → SQL

Vamos criar um schema SQL para persistir os dados deduplicados e com timezone America/Sao_Paulo, e carregar todos os 31 Parquets da pasta 02_processed.

In [9]:
# Configuração de conexão (usa Postgres via env DATABASE_URL; se não setado, cai para SQLite local)
import os
from pathlib import Path

# Ex.: DATABASE_URL=postgresql+psycopg2://user:pass@host:5432/dbname
DB_URL = os.getenv('DATABASE_URL')
if not DB_URL:
    # fallback local para validação
    DB_URL = 'sqlite:///G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini/00_data/03_final/ohlcv.db'
    Path('G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini/00_data/03_final').mkdir(parents=True, exist_ok=True)

print('DB_URL =', DB_URL)

DB_URL = sqlite:///G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini/00_data/03_final/ohlcv.db


In [10]:
# Criar engine SQLAlchemy e schema da tabela
import sqlalchemy as sa
from sqlalchemy import text

engine = sa.create_engine(DB_URL, future=True)

with engine.begin() as conn:
    # Postgres: timestamptz; SQLite: armazena texto ISO8601 com tz ou timestamp naive (vamos manter texto ISO para portabilidade)
    # Chave primária: (symbol, date)
    conn.execute(text("""
    CREATE TABLE IF NOT EXISTS ohlcv_daily (
        symbol TEXT NOT NULL,
        date   TEXT NOT NULL,
        open   DOUBLE PRECISION,
        high   DOUBLE PRECISION,
        low    DOUBLE PRECISION,
        close  DOUBLE PRECISION,
        adj_close DOUBLE PRECISION,
        volume BIGINT,
        dividends DOUBLE PRECISION,
        stock_splits DOUBLE PRECISION,
        PRIMARY KEY (symbol, date)
    )
    """))

print('Tabela ohlcv_daily pronta.')

Tabela ohlcv_daily pronta.


In [12]:
# Carregar todos os Parquets de 02_processed para o banco (upsert deduplicado)
from pathlib import Path
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import text

SILVER_DIR = Path('G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini/00_data/02_processed')
files = sorted(SILVER_DIR.glob('*.parquet'))
print('Arquivos a carregar:', len(files))

dialect = sa.create_engine(DB_URL).dialect.name
is_pg = (dialect == 'postgresql')

# Preparar comandos de upsert específicos por SGBD
if is_pg:
    upsert_sql = text("""
        INSERT INTO ohlcv_daily (symbol, date, open, high, low, close, adj_close, volume, dividends, stock_splits)
        VALUES (:symbol, :date, :open, :high, :low, :close, :adj_close, :volume, :dividends, :stock_splits)
        ON CONFLICT (symbol, date) DO UPDATE SET
            open=EXCLUDED.open,
            high=EXCLUDED.high,
            low=EXCLUDED.low,
            close=EXCLUDED.close,
            adj_close=EXCLUDED.adj_close,
            volume=EXCLUDED.volume,
            dividends=EXCLUDED.dividends,
            stock_splits=EXCLUDED.stock_splits
    """)
else:
    # SQLite
    upsert_sql = text("""
        INSERT OR REPLACE INTO ohlcv_daily (symbol, date, open, high, low, close, adj_close, volume, dividends, stock_splits)
        VALUES (:symbol, :date, :open, :high, :low, :close, :adj_close, :volume, :dividends, :stock_splits)
    """)

engine = sa.create_engine(DB_URL, future=True)

def to_rows(df: pd.DataFrame):
    # Garantir colunas e tipos
    cols = ['Symbol','Date','Open','High','Low','Close','Adj Close','Volume','Dividends','Stock Splits']
    for c in cols:
        if c not in df.columns:
            df[c] = pd.NA
    # Timezone: persistir ISO 8601 com offset (-03:00) mantendo America/Sao_Paulo
    dt = df['Date']
    if not isinstance(dt.dtype, pd.DatetimeTZDtype):
        dt = pd.to_datetime(dt, errors='coerce').dt.tz_localize('America/Sao_Paulo')
    else:
        dt = dt.dt.tz_convert('America/Sao_Paulo')
    date_iso = dt.apply(lambda x: x.isoformat() if pd.notnull(x) else None)

    # Deduplicar por (Symbol, Date)
    tmp = df.copy()
    tmp['__date_iso__'] = date_iso
    tmp = tmp.dropna(subset=['__date_iso__'])
    tmp = tmp.sort_values(['Symbol','__date_iso__']).drop_duplicates(['Symbol','__date_iso__'], keep='last')

    # Construir DataFrame final com nomes de colunas do SQL
    out = pd.DataFrame({
        'symbol': tmp['Symbol'].astype(str).str.lower(),
        'date': tmp['__date_iso__'],
        'open': pd.to_numeric(tmp['Open'], errors='coerce'),
        'high': pd.to_numeric(tmp['High'], errors='coerce'),
        'low': pd.to_numeric(tmp['Low'], errors='coerce'),
        'close': pd.to_numeric(tmp['Close'], errors='coerce'),
        'adj_close': pd.to_numeric(tmp['Adj Close'], errors='coerce'),
        'volume': pd.to_numeric(tmp['Volume'], errors='coerce').astype('Int64'),
        'dividends': pd.to_numeric(tmp['Dividends'], errors='coerce'),
        'stock_splits': pd.to_numeric(tmp['Stock Splits'], errors='coerce'),
    })

    # Converter NaN/<NA> para None (para bind SQL)
    out = out.astype(object).where(pd.notnull(out), None)
    return out.to_dict(orient='records')

loaded = 0
with engine.begin() as conn:
    for p in files:
        df = pd.read_parquet(p)
        # Assegurar presença da coluna Symbol no silver
        if 'Symbol' not in df.columns:
            # Inferir do nome, fallback
            sym = p.name.split('_ohlcv_actions_')[0].lstrip('_').lower()
            df.insert(0, 'Symbol', sym)
        rows = to_rows(df)
        if rows:
            conn.execute(upsert_sql, rows)
            loaded += len(rows)
        print(f"{p.name}: upsert {len(rows)} linhas")

print('Total de linhas upsertadas:', loaded)

Arquivos a carregar: 31
_bvsp_ohlcv_actions_20120101_20250922.parquet: upsert 3400 linhas
_gspc_ohlcv_actions_20120101_20250922.parquet: upsert 3449 linhas
_tnx_ohlcv_actions_20120101_20250922.parquet: upsert 3448 linhas
_vix_ohlcv_actions_20120101_20250922.parquet: upsert 3449 linhas
abev3_sa_ohlcv_actions_20120101_20250922.parquet: upsert 3409 linhas
b3sa3_sa_ohlcv_actions_20120101_20250922.parquet: upsert 3409 linhas
bbas3_sa_ohlcv_actions_20120101_20250922.parquet: upsert 3409 linhas
bz=f_ohlcv_actions_20120101_20250922.parquet: upsert 3432 linhas
cple6_sa_ohlcv_actions_20120101_20250922.parquet: upsert 3408 linhas
csna3_sa_ohlcv_actions_20120101_20250922.parquet: upsert 3409 linhas
dx-y.nyb_ohlcv_actions_20120101_20250922.parquet: upsert 3450 linhas
elet3_sa_ohlcv_actions_20120101_20250922.parquet: upsert 3409 linhas
ewz_ohlcv_actions_20120101_20250922.parquet: upsert 3449 linhas
ggbr4_sa_ohlcv_actions_20120101_20250922.parquet: upsert 3409 linhas
hapv3_sa_ohlcv_actions_20120101_2

In [13]:
# Verificações rápidas no banco: contagem total e amostra por símbolo
import sqlalchemy as sa
from sqlalchemy import text

engine = sa.create_engine(DB_URL, future=True)
with engine.begin() as conn:
    total = conn.execute(text('SELECT COUNT(*) FROM ohlcv_daily')).scalar_one()
    print('Total linhas:', total)
    rows = conn.execute(text('SELECT symbol, MIN(date), MAX(date), COUNT(*) FROM ohlcv_daily GROUP BY symbol ORDER BY symbol LIMIT 10')).fetchall()
    for r in rows:
        print(r)

Total linhas: 101298
('abev3_sa', '2012-01-02T00:00:00-02:00', '2025-09-19T00:00:00-03:00', 3409)
('b3sa3_sa', '2012-01-02T00:00:00-02:00', '2025-09-19T00:00:00-03:00', 3409)
('bbas3_sa', '2012-01-02T00:00:00-02:00', '2025-09-19T00:00:00-03:00', 3409)
('bvsp', '2012-01-03T00:00:00-02:00', '2025-09-19T00:00:00-03:00', 3400)
('bz=f', '2012-01-03T03:00:00-02:00', '2025-09-19T01:00:00-03:00', 3432)
('cple6_sa', '2012-01-02T00:00:00-02:00', '2025-09-19T00:00:00-03:00', 3408)
('csna3_sa', '2012-01-02T00:00:00-02:00', '2025-09-19T00:00:00-03:00', 3409)
('dx-y.nyb', '2012-01-03T03:00:00-02:00', '2025-09-19T01:00:00-03:00', 3450)
('elet3_sa', '2012-01-02T00:00:00-02:00', '2025-09-19T00:00:00-03:00', 3409)
('ewz', '2012-01-03T03:00:00-02:00', '2025-09-19T01:00:00-03:00', 3449)


# Atualização diária e persistencia do Postgres

# Rotina diária — Delta de datas a buscar

> Objetivo deste bloco: descobrir, para cada símbolo, o intervalo de datas que falta entre a última data persistida no banco (`ohlcv_daily`) e a última data disponível via fonte externa (Yahoo Finance), sem ainda realizar a ingestão. O resultado é um relatório CSV em `04_outputs/reports/` com janelas de busca por símbolo.

In [6]:
# Imports e setup de paths/engine para rotina diária (delta de datas)
import os
from pathlib import Path
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import text
from datetime import datetime, timedelta, timezone

# Pastas
ROOT = Path(r"G:/Drives compartilhados/BOLSA_2026/a_bolsa2026_gemini")
RAW_DIR = ROOT / '00_data' / '01_raw'
REPORTS_DIR = ROOT / '04_outputs' / 'reports'
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

# Banco (usa DATABASE_URL se existir; senão, SQLite local)
DB_URL = os.getenv('DATABASE_URL') or f"sqlite:///{ROOT / '00_data' / '03_final' / 'ohlcv.db'}"
engine = sa.create_engine(DB_URL, future=True)
print('DB_URL =', DB_URL)

DB_URL = sqlite:///G:\Drives compartilhados\BOLSA_2026\a_bolsa2026_gemini\00_data\03_final\ohlcv.db


In [None]:
# Configuração Twelve Data (API)
import os
import time
import requests

TD_API_KEY = os.getenv('TWELVE_DATA_API_KEY') or 'd636f3fa4f2042a9ac3bf9ead03d2dc0'
TD_BASE_URL = 'https://api.twelvedata.com'
TD_DEFAULT_TZ = 'America/Sao_Paulo'
TD_SESSION = requests.Session()
# Intervalo entre chamadas para aliviar rate-limit (ajuste se necessário)
TD_SLEEP_SECONDS = float(os.getenv('TWELVE_DATA_SLEEP', '0.2'))

print('TD_BASE_URL =', TD_BASE_URL)
print('TD_DEFAULT_TZ =', TD_DEFAULT_TZ)

In [7]:
# Lista de 31 símbolos a monitorar (derivada do manifesto raw + índices/macros)
SYMBOLS = [
    'itub4_sa','bbas3_sa','b3sa3_sa','pssa3_sa','vale3_sa','ggbr4_sa','csna3_sa','suzb3_sa',
    'petr4_sa','prio3_sa','ugpa3_sa','elet3_sa','taee11_sa','cple6_sa','sbsp3_sa','vivt3_sa',
    'tims3_sa','rdor3_sa','hapv3_sa','abev3_sa','wege3_sa','tots3_sa','lren3_sa','rail3_sa',
    '_bvsp','ewz','_gspc','_vix','dx-y.nyb','_tnx','bz=f'
 ]

# Mapa para Twelve Data
# Regras:
# - Ações B3: TICKER.SA (ex.: PETR4.SA)
# - ETFs/índices/forex/commodities seguem convenção Twelve Data
TD_MAP = {
    'itub4_sa': 'ITUB4.SA','bbas3_sa': 'BBAS3.SA','b3sa3_sa': 'B3SA3.SA','pssa3_sa': 'PSSA3.SA','vale3_sa': 'VALE3.SA',
    'ggbr4_sa': 'GGBR4.SA','csna3_sa': 'CSNA3.SA','suzb3_sa': 'SUZB3.SA','petr4_sa': 'PETR4.SA','prio3_sa': 'PRIO3.SA',
    'ugpa3_sa': 'UGPA3.SA','elet3_sa': 'ELET3.SA','taee11_sa': 'TAEE11.SA','cple6_sa': 'CPLE6.SA','sbsp3_sa': 'SBSP3.SA',
    'vivt3_sa': 'VIVT3.SA','tims3_sa': 'TIMS3.SA','rdor3_sa': 'RDOR3.SA','hapv3_sa': 'HAPV3.SA','abev3_sa': 'ABEV3.SA',
    'wege3_sa': 'WEGE3.SA','tots3_sa': 'TOTS3.SA','lren3_sa': 'LREN3.SA','rail3_sa': 'RAIL3.SA',
    # Índices/ETFs/Macros (verificamos convenções comuns do Twelve Data):
    '_bvsp': 'BVSP',        # Ibovespa index
    'ewz': 'EWZ',           # iShares MSCI Brazil ETF (NYSE)
    '_gspc': 'SPX',         # S&P 500 index ticker common alias in TD
    '_vix': 'VIX',          # CBOE Volatility Index
    'dx-y.nyb': 'DXY',      # US Dollar Index
    '_tnx': 'TNX',          # 10-Year Treasury Note Yield Index
    'bz=f': 'BZ=F'          # Brent (check TD support; if unsupported, fallback via alt symbol)
}
len(SYMBOLS), len(TD_MAP)

(31, 31)

In [None]:
# Funções auxiliares: última data no DB e via Twelve Data
import contextlib
import pandas as pd

def to_db_symbol(sym: str) -> str:
    """Normaliza o símbolo para o padrão persistido no DB.
    Regra aplicada no carregamento Silver->DB: lowercase e remoção de underscores à esquerda.
    Ex.: "_bvsp" -> "bvsp", "itub4_sa" -> "itub4_sa".
    """
    return sym.lstrip('_').lower() if isinstance(sym, str) else sym

def get_last_persisted_date(engine, symbol: str) -> pd.Timestamp | None:
    """Retorna a última data (tz-aware) para o símbolo no banco (ohlcv_daily).
    Considera que a coluna 'date' é ISO 8601 com offset; converte para tz America/Sao_Paulo.
    Usa o símbolo normalizado para o padrão do DB (ex.: _bvsp -> bvsp).
    """
    from sqlalchemy import text
    symbol_db = to_db_symbol(symbol)
    q = text("SELECT MAX(date) FROM ohlcv_daily WHERE symbol = :s")
    with engine.begin() as conn:
        val = conn.execute(q, {'s': symbol_db}).scalar()
    if not val:
        return None
    # Converte string ISO para Timestamp tz-aware
    try:
        ts = pd.to_datetime(val)
        if ts.tz is None:
            ts = ts.tz_localize('America/Sao_Paulo')
        else:
            ts = ts.tz_convert('America/Sao_Paulo')
        return ts
    except Exception:
        return None

# Twelve Data helpers
import pytz
from datetime import datetime

def td_request(endpoint: str, params: dict) -> dict | None:
    url = f"{TD_BASE_URL.rstrip('/')}/{endpoint.lstrip('/')}"
    p = {**params, 'apikey': TD_API_KEY}
    try:
        r = TD_SESSION.get(url, params=p, timeout=30)
        if r.status_code != 200:
            return None
        data = r.json()
        # Twelve Data error payload has 'status':'error'
        if isinstance(data, dict) and data.get('status') == 'error':
            return None
        return data
    except Exception:
        return None
    finally:
        # respeitar pequeno intervalo entre chamadas
        time.sleep(TD_SLEEP_SECONDS)

def get_last_available_date_td(td_symbol: str) -> pd.Timestamp | None:
    """Obtém a última data diária disponível via Twelve Data para o símbolo informado."""
    data = td_request('time_series', {
        'symbol': td_symbol,
        'interval': '1day',
        'outputsize': 1,
        'order': 'desc',
        'timezone': TD_DEFAULT_TZ,
        'format': 'JSON',
    })
    if not data or 'values' not in data or not data['values']:
        return None
    # 'datetime' costuma vir como 'YYYY-MM-DD' (sem hora)
    dt_str = data['values'][0].get('datetime')
    if not dt_str:
        return None
    try:
        d = pd.to_datetime(dt_str).date()
        return pd.Timestamp(d, tz=TD_DEFAULT_TZ)
    except Exception:
        return None

def td_download_range(td_symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Baixa candles diários [start_date, end_date] inclusive via Twelve Data e retorna DataFrame padronizado.
    start_date/end_date: 'YYYY-MM-DD'.
    """
    data = td_request('time_series', {
        'symbol': td_symbol,
        'interval': '1day',
        'start_date': start_date,
        'end_date': end_date,
        'order': 'asc',
        'timezone': TD_DEFAULT_TZ,
        'format': 'JSON',
    })
    if not data or 'values' not in data or not isinstance(data['values'], list) or not data['values']:
        return pd.DataFrame()

    vals = data['values']
    df = pd.DataFrame(vals)
    # Normalizar nomes
    rename_map = {
        'datetime': 'Date',
        'open': 'Open',
        'high': 'High',
        'low': 'Low',
        'close': 'Close',
        'volume': 'Volume',
    }
    df = df.rename(columns=rename_map)

    # Tipos numéricos
    for c in ['Open','High','Low','Close','Volume']:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors='coerce')
        else:
            df[c] = pd.NA

    # Adj Close = Close (sem ajuste corporativo diário via TD aqui). Dividends/Splits ausentes nesta chamada
    df['Adj Close'] = df['Close']
    df['Dividends'] = pd.NA
    df['Stock Splits'] = pd.NA

    # Date como tz America/Sao_Paulo (Twelve Data já devolve no tz pedido, mas sem offset)
    dt = pd.to_datetime(df['Date'], errors='coerce')
    # As strings costumam ser 'YYYY-MM-DD' => timezone-naive; localizamos para SP
    df['Date'] = dt.dt.tz_localize(TD_DEFAULT_TZ, nonexistent='shift_forward', ambiguous='NaT')

    # Ordenar por data e remover duplicatas
    df = df.sort_values('Date').drop_duplicates('Date', keep='last')
    return df

In [35]:
# Calcular janelas de delta (por símbolo) sem ingerir (capando no "ontem SP"):
from datetime import timedelta
import pandas as pd

# Data de corte: ontem no fuso America/Sao_Paulo
sp_now = pd.Timestamp.now(tz='America/Sao_Paulo')
cutoff_date = (sp_now - pd.Timedelta(days=1)).date()
cutoff_ts = pd.Timestamp(cutoff_date, tz='America/Sao_Paulo')
print('Cutoff (ontem SP):', cutoff_date)

rows = []
for sym in SYMBOLS:
    td_symbol = TD_MAP.get(sym)
    if not td_symbol:
        rows.append({'symbol': sym, 'symbol_db': to_db_symbol(sym), 'yf_ticker': None, 'last_db_date': None, 'last_yf_date': None, 'fetch_from': None, 'fetch_to': None, 'days_missing': None, 'status': 'no-mapping'})
        continue

    last_db = get_last_persisted_date(engine, sym)
    last_yf = get_last_available_date_td(td_symbol)

    if last_yf is None:
        rows.append({'symbol': sym, 'symbol_db': to_db_symbol(sym), 'yf_ticker': td_symbol, 'last_db_date': last_db, 'last_yf_date': None, 'fetch_from': None, 'fetch_to': None, 'days_missing': None, 'status': 'no-yf-data'})
        continue

    # Aplicar corte: não permitir buscar além de cutoff_ts (ontem SP)
    effective_to = min(last_yf, cutoff_ts)

    # Regra de janela
    if last_db is None:
        fetch_from = pd.Timestamp('2012-01-01', tz='America/Sao_Paulo')
    else:
        fetch_from = pd.Timestamp(last_db.date() + timedelta(days=1), tz='America/Sao_Paulo')

    fetch_to = effective_to
    if fetch_from.date() > fetch_to.date():
        days_missing = 0
        status = 'up-to-date'
        fetch_from_out = None
        fetch_to_out = None
    else:
        days_missing = (fetch_to.date() - fetch_from.date()).days + 1
        status = 'delta' if days_missing > 0 else 'up-to-date'
        fetch_from_out = fetch_from
        fetch_to_out = fetch_to

    rows.append({
        'symbol': sym,
        'symbol_db': to_db_symbol(sym),
        'yf_ticker': td_symbol,
        'last_db_date': last_db,
        'last_yf_date': last_yf,
        'fetch_from': fetch_from_out,
        'fetch_to': fetch_to_out,
        'days_missing': days_missing,
        'status': status,
    })

delta_df = pd.DataFrame(rows)
# Ordenar com prioridade para quem tem delta
delta_df = delta_df.sort_values(['status','symbol'], ascending=[True, True]).reset_index(drop=True)
print('Símbolos no mapeamento:', len(SYMBOLS))
print('Com delta > 0 dias:', int((delta_df['days_missing'].fillna(0) > 0).sum()))
delta_df.head(10)

Cutoff (ontem SP): 2025-09-30


NameError: name 'TD_MAP' is not defined

In [29]:
# Salvar relatório de delta em CSV e exibir resumo
from datetime import datetime
run_ts = datetime.now().strftime('%Y%m%d_%H%M%S')
out_csv = REPORTS_DIR / f'delta_ranges_{run_ts}.csv'
delta_df_out = delta_df.copy()
# Converter timestamps para string ISO (sem perder tz) para persistência
for col in ['last_db_date','last_td_date','fetch_from','fetch_to']:
    if col in delta_df_out:
        delta_df_out[col] = delta_df_out[col].apply(lambda x: x.isoformat() if pd.notnull(x) else None)
delta_df_out['cutoff_sp_date'] = cutoff_date
delta_df_out.to_csv(out_csv, index=False, encoding='utf-8')
print('Relatório salvo em:', out_csv)
# Visualizar primeiros com problema de mapeamento/td e com delta
cols = ['symbol','symbol_db','td_symbol','last_db_date','last_td_date','fetch_from','fetch_to','days_missing','status']
view = delta_df[cols].copy()
# Ordenar destaques: no-mapping/no-td-data primeiro, depois delta, depois up-to-date
cat = pd.CategoricalDtype(categories=['no-mapping','no-td-data','delta','up-to-date'], ordered=True)
view['status'] = view['status'].astype(cat)
view = view.sort_values(['status','symbol']).reset_index(drop=True)
view.head(30)

Relatório salvo em: G:\Drives compartilhados\BOLSA_2026\a_bolsa2026_gemini\04_outputs\reports\delta_ranges_20251001_083245.csv


Unnamed: 0,symbol,symbol_db,yf_ticker,last_db_date,last_yf_date,fetch_from,fetch_to,days_missing,status
0,abev3_sa,abev3_sa,ABEV3.SA,2025-09-26 00:00:00-03:00,2025-09-29 00:00:00-03:00,2025-09-27 00:00:00-03:00,2025-09-29 00:00:00-03:00,3,delta
1,cple6_sa,cple6_sa,CPLE6.SA,2025-09-26 00:00:00-03:00,2025-09-29 00:00:00-03:00,2025-09-27 00:00:00-03:00,2025-09-29 00:00:00-03:00,3,delta
2,csna3_sa,csna3_sa,CSNA3.SA,2025-09-26 00:00:00-03:00,2025-09-29 00:00:00-03:00,2025-09-27 00:00:00-03:00,2025-09-29 00:00:00-03:00,3,delta
3,elet3_sa,elet3_sa,ELET3.SA,2025-09-26 00:00:00-03:00,2025-09-29 00:00:00-03:00,2025-09-27 00:00:00-03:00,2025-09-29 00:00:00-03:00,3,delta
4,ggbr4_sa,ggbr4_sa,GGBR4.SA,2025-09-26 00:00:00-03:00,2025-09-29 00:00:00-03:00,2025-09-27 00:00:00-03:00,2025-09-29 00:00:00-03:00,3,delta
5,hapv3_sa,hapv3_sa,HAPV3.SA,2025-09-26 00:00:00-03:00,2025-09-29 00:00:00-03:00,2025-09-27 00:00:00-03:00,2025-09-29 00:00:00-03:00,3,delta
6,rail3_sa,rail3_sa,RAIL3.SA,2025-09-26 00:00:00-03:00,2025-09-29 00:00:00-03:00,2025-09-27 00:00:00-03:00,2025-09-29 00:00:00-03:00,3,delta
7,rdor3_sa,rdor3_sa,RDOR3.SA,2025-09-26 00:00:00-03:00,2025-09-29 00:00:00-03:00,2025-09-27 00:00:00-03:00,2025-09-29 00:00:00-03:00,3,delta
8,suzb3_sa,suzb3_sa,SUZB3.SA,2025-09-26 00:00:00-03:00,2025-09-29 00:00:00-03:00,2025-09-27 00:00:00-03:00,2025-09-29 00:00:00-03:00,3,delta
9,taee11_sa,taee11_sa,TAEE11.SA,2025-09-26 00:00:00-03:00,2025-09-29 00:00:00-03:00,2025-09-27 00:00:00-03:00,2025-09-29 00:00:00-03:00,3,delta


In [11]:
# Diagnóstico: comparar símbolos esperados vs símbolos no DB e verificar últimas datas
import pandas as pd
from sqlalchemy import text

expected_syms = SYMBOLS
expected_db_syms = [to_db_symbol(s) for s in expected_syms]
expected_map = pd.DataFrame({
    'expected_symbol': expected_syms,
    'expected_db_symbol': expected_db_syms,
    'td_symbol': [TD_MAP.get(s) for s in expected_syms],
})

with engine.begin() as conn:
    db_syms = pd.read_sql(text("SELECT DISTINCT symbol FROM ohlcv_daily ORDER BY symbol"), conn)
    last_dates = pd.read_sql(text("SELECT symbol, MAX(date) AS last_date FROM ohlcv_daily GROUP BY symbol"), conn)

db_syms['symbol'] = db_syms['symbol'].astype(str)
last_dates['symbol'] = last_dates['symbol'].astype(str)

diag = expected_map.merge(db_syms.rename(columns={'symbol':'in_db_symbol'}),
                            left_on='expected_db_symbol', right_on='in_db_symbol', how='left')
diag = diag.merge(last_dates, left_on='expected_db_symbol', right_on='symbol', how='left', suffixes=('','_dup'))
diag = diag.drop(columns=[c for c in ['symbol'] if c in diag.columns])
diag['in_db'] = diag['in_db_symbol'].notna()
diag['last_date'] = pd.to_datetime(diag['last_date'], errors='coerce')
diag = diag.sort_values(['in_db','expected_symbol'], ascending=[True, True]).reset_index(drop=True)
print('Símbolos esperados:', len(expected_syms))
print('Símbolos distintos no DB:', db_syms.shape[0])
diag[['expected_symbol','expected_db_symbol','td_symbol','in_db','last_date']].head(40)

Símbolos esperados: 31
Símbolos distintos no DB: 31


Unnamed: 0,expected_symbol,expected_db_symbol,yf_ticker,in_db,last_date
0,_bvsp,bvsp,^BVSP,True,2025-09-19 00:00:00-03:00
1,_gspc,gspc,^GSPC,True,2025-09-19 01:00:00-03:00
2,_tnx,tnx,^TNX,True,2025-09-19 02:00:00-03:00
3,_vix,vix,^VIX,True,2025-09-19 02:00:00-03:00
4,abev3_sa,abev3_sa,ABEV3.SA,True,2025-09-19 00:00:00-03:00
5,b3sa3_sa,b3sa3_sa,B3SA3.SA,True,2025-09-19 00:00:00-03:00
6,bbas3_sa,bbas3_sa,BBAS3.SA,True,2025-09-19 00:00:00-03:00
7,bz=f,bz=f,BZ=F,True,2025-09-19 01:00:00-03:00
8,cple6_sa,cple6_sa,CPLE6.SA,True,2025-09-19 00:00:00-03:00
9,csna3_sa,csna3_sa,CSNA3.SA,True,2025-09-19 00:00:00-03:00


## Ingestão incremental — persistir deltas no DB

> Esta seção baixa somente as janelas [fetch_from, fetch_to] de cada símbolo, normaliza `Date` para tz `America/Sao_Paulo`, aplica deduplicação e realiza upsert em `ohlcv_daily`. Em seguida, gera um resumo do estado do banco.

In [23]:
# Helpers de normalização e upsert
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import text

def ensure_tz_sp_series(s: pd.Series) -> pd.Series:
    s = pd.to_datetime(s, errors='coerce', utc=False)
    if isinstance(s.dtype, pd.DatetimeTZDtype):
        return s.dt.tz_convert('America/Sao_Paulo')
    else:
        return s.dt.tz_localize('America/Sao_Paulo')

def df_to_rows_for_db(df: pd.DataFrame) -> list[dict]:
    # Garante colunas obrigatórias
    cols = ['Symbol','Date','Open','High','Low','Close','Adj Close','Volume','Dividends','Stock Splits']
    for c in cols:
        if c not in df.columns:
            df[c] = pd.NA

    # Normaliza Date para tz America/Sao_Paulo
    dt = df['Date']
    if not isinstance(dt.dtype, pd.DatetimeTZDtype):
        dt = pd.to_datetime(dt, errors='coerce').dt.tz_localize('America/Sao_Paulo')
    else:
        dt = dt.dt.tz_convert('America/Sao_Paulo')
    date_iso = dt.apply(lambda x: x.isoformat() if pd.notnull(x) else None)

    tmp = df.copy()
    symbol_db = tmp['Symbol'].astype(str).str.lower().str.lstrip('_')
    mask = date_iso.notna()
    if not mask.any():
        return []

    out = pd.DataFrame({
        'symbol': symbol_db[mask],
        'date': date_iso[mask],
        'open': pd.to_numeric(tmp.loc[mask, 'Open'], errors='coerce'),
        'high': pd.to_numeric(tmp.loc[mask, 'High'], errors='coerce'),
        'low': pd.to_numeric(tmp.loc[mask, 'Low'], errors='coerce'),
        'close': pd.to_numeric(tmp.loc[mask, 'Close'], errors='coerce'),
        'adj_close': pd.to_numeric(tmp.loc[mask, 'Adj Close'], errors='coerce'),
        'volume': pd.to_numeric(tmp.loc[mask, 'Volume'], errors='coerce').astype('Int64'),
        'dividends': pd.to_numeric(tmp.loc[mask, 'Dividends'], errors='coerce'),
        'stock_splits': pd.to_numeric(tmp.loc[mask, 'Stock Splits'], errors='coerce'),
    })
    out = out.astype(object).where(pd.notnull(out), None)
    # Deduplicar por (symbol, date) por segurança
    out = out.sort_values(['symbol','date']).drop_duplicates(['symbol','date'], keep='last')
    return out.to_dict(orient='records')


dialect = sa.create_engine(DB_URL).dialect.name
is_pg = (dialect == 'postgresql')
if is_pg:
    UPSERT_SQL = text("""
        INSERT INTO ohlcv_daily (symbol, date, open, high, low, close, adj_close, volume, dividends, stock_splits)
        VALUES (:symbol, :date, :open, :high, :low, :close, :adj_close, :volume, :dividends, :stock_splits)
        ON CONFLICT (symbol, date) DO UPDATE SET
            open=EXCLUDED.open,
            high=EXCLUDED.high,
            low=EXCLUDED.low,
            close=EXCLUDED.close,
            adj_close=EXCLUDED.adj_close,
            volume=EXCLUDED.volume,
            dividends=EXCLUDED.dividends,
            stock_splits=EXCLUDED.stock_splits
    """)
else:
    UPSERT_SQL = text("""
        INSERT OR REPLACE INTO ohlcv_daily (symbol, date, open, high, low, close, adj_close, volume, dividends, stock_splits)
        VALUES (:symbol, :date, :open, :high, :low, :close, :adj_close, :volume, :dividends, :stock_splits)
    """)

In [31]:
# Ingestão incremental com base em delta_df (via Twelve Data)
from datetime import timedelta
import pandas as pd

ingested = []
failed = []

with engine.begin() as conn:
    for _, r in delta_df.iterrows():
        sym = r['symbol']
        sym_db = r['symbol_db'] if 'symbol_db' in r else to_db_symbol(sym)
        td_sym = r.get('td_symbol')
        ffrom = r['fetch_from']
        fto = r['fetch_to']
        status = r['status']
        if status != 'delta' or pd.isna(ffrom) or pd.isna(fto) or not td_sym:
            continue
        try:
            start_str = pd.Timestamp(ffrom).date().strftime('%Y-%m-%d')
            end_str = pd.Timestamp(fto).date().strftime('%Y-%m-%d')
            df = td_download_range(td_sym, start_str, end_str)
            if df is None or df.empty:
                failed.append((sym, 'empty-download'))
                continue
            # Padronizar colunas esperadas
            for c in ['Open','High','Low','Close','Adj Close','Volume','Dividends','Stock Splits']:
                if c not in df.columns:
                    if c == 'Adj Close' and 'Close' in df.columns:
                        df[c] = df['Close']
                    else:
                        df[c] = pd.NA

            dt = df['Date']
            if not isinstance(dt.dtype, pd.DatetimeTZDtype):
                dt = pd.to_datetime(dt, errors='coerce').dt.tz_localize('America/Sao_Paulo')
            else:
                dt = dt.dt.tz_convert('America/Sao_Paulo')
            date_iso = dt.apply(lambda x: x.isoformat() if pd.notnull(x) else None)

            rows_df = pd.DataFrame({
                'symbol': pd.Series([sym_db]*len(df)),
                'date': date_iso,
                'open': pd.to_numeric(df['Open'], errors='coerce'),
                'high': pd.to_numeric(df['High'], errors='coerce'),
                'low': pd.to_numeric(df['Low'], errors='coerce'),
                'close': pd.to_numeric(df['Close'], errors='coerce'),
                'adj_close': pd.to_numeric(df['Adj Close'], errors='coerce'),
                'volume': pd.to_numeric(df['Volume'], errors='coerce').astype('Int64'),
                'dividends': pd.to_numeric(df['Dividends'], errors='coerce'),
                'stock_splits': pd.to_numeric(df['Stock Splits'], errors='coerce'),
            })
            rows_df = rows_df.astype(object).where(pd.notnull(rows_df), None)
            rows_df = rows_df.dropna(subset=['date'])
            if rows_df.empty:
                failed.append((sym, 'no-valid-dates'))
                continue
            rows_df = rows_df.sort_values(['symbol','date']).drop_duplicates(['symbol','date'], keep='last')
            rows = rows_df.to_dict(orient='records')
            if rows:
                conn.execute(UPSERT_SQL, rows)
                ingested.append((sym, len(rows)))
        except Exception as e:
            failed.append((sym, str(e)))

print('Ingestão concluída (Twelve Data).')
print('Símbolos com dados inseridos:', len(ingested))
print('Falhas:', len(failed))
ingested[:10], failed[:10]


1 Failed download:
['ABEV3.SA']: YFPricesMissingError('possibly delisted; no price data found  (1d 2025-09-27 -> 2025-09-30)')

1 Failed download:
['CPLE6.SA']: YFPricesMissingError('possibly delisted; no price data found  (1d 2025-09-27 -> 2025-09-30)')

1 Failed download:
['CSNA3.SA']: YFPricesMissingError('possibly delisted; no price data found  (1d 2025-09-27 -> 2025-09-30)')

1 Failed download:
['ELET3.SA']: YFPricesMissingError('possibly delisted; no price data found  (1d 2025-09-27 -> 2025-09-30)')

1 Failed download:
['GGBR4.SA']: YFPricesMissingError('possibly delisted; no price data found  (1d 2025-09-27 -> 2025-09-30)')

1 Failed download:
['HAPV3.SA']: YFPricesMissingError('possibly delisted; no price data found  (1d 2025-09-27 -> 2025-09-30)')

1 Failed download:
['RAIL3.SA']: YFPricesMissingError('possibly delisted; no price data found  (1d 2025-09-27 -> 2025-09-30)')

1 Failed download:
['RDOR3.SA']: YFPricesMissingError('possibly delisted; no price data found  (1d 2025

Ingestão concluída.
Símbolos com dados inseridos: 0
Falhas: 14


([],
 [('abev3_sa', 'empty-download'),
  ('cple6_sa', 'empty-download'),
  ('csna3_sa', 'empty-download'),
  ('elet3_sa', 'empty-download'),
  ('ggbr4_sa', 'empty-download'),
  ('hapv3_sa', 'empty-download'),
  ('rail3_sa', 'empty-download'),
  ('rdor3_sa', 'empty-download'),
  ('suzb3_sa', 'empty-download'),
  ('taee11_sa', 'empty-download')])

In [30]:
# Limpeza opcional: remover linhas além do cutoff (datas futuras em relação ao ontem SP)
import pandas as pd
from sqlalchemy import text

confirm = True  # mude para False se quiser apenas ver o que seria removido

cutoff_date_str = cutoff_date.strftime('%Y-%m-%d')
print('Cutoff SP (data):', cutoff_date_str)

with engine.begin() as conn:
    preview = pd.read_sql(text("""
        SELECT symbol, date
        FROM ohlcv_daily
        WHERE substr(date,1,10) > :cut
        ORDER BY symbol, date
        LIMIT 100
    """), conn, params={'cut': cutoff_date_str})

print('Linhas além do cutoff (mostrando até 100):', len(preview))
display(preview.head(20))  # Mostra o DataFrame no notebook

if confirm and not preview.empty:
    with engine.begin() as conn2:
        conn2.execute(text("DELETE FROM ohlcv_daily WHERE substr(date,1,10) > :cut"), {'cut': cutoff_date_str})
    print('Remoção executada para datas acima de', cutoff_date_str)
else:
    print('Remoção não executada (confirm=False) ou nenhuma linha a remover.')

Cutoff SP (data): 2025-09-30
Linhas além do cutoff (mostrando até 100): 0


Unnamed: 0,symbol,date


Remoção não executada (confirm=False) ou nenhuma linha a remover.


In [32]:
# Relato do estado do banco após ingestão
import pandas as pd
from sqlalchemy import text

with engine.begin() as conn:
    total = conn.execute(text('SELECT COUNT(*) FROM ohlcv_daily')).scalar_one()
    by_symbol = pd.read_sql(text("""
        SELECT symbol, COUNT(*) AS n, MIN(date) AS min_date, MAX(date) AS max_date
        FROM ohlcv_daily
        GROUP BY symbol
        ORDER BY symbol
    """), conn)

print('Total de linhas no banco:', total)
print('Símbolos carregados:', len(by_symbol))
by_symbol.head(20)

Total de linhas no banco: 101473
Símbolos carregados: 31


Unnamed: 0,symbol,n,min_date,max_date
0,abev3_sa,3414,2012-01-02T00:00:00-02:00,2025-09-26T00:00:00-03:00
1,b3sa3_sa,3415,2012-01-02T00:00:00-02:00,2025-09-29T00:00:00-03:00
2,bbas3_sa,3415,2012-01-02T00:00:00-02:00,2025-09-29T00:00:00-03:00
3,bvsp,3406,2012-01-03T00:00:00-02:00,2025-09-29T00:00:00-03:00
4,bz=f,3439,2012-01-03T03:00:00-02:00,2025-09-30T00:00:00-03:00
5,cple6_sa,3413,2012-01-02T00:00:00-02:00,2025-09-26T00:00:00-03:00
6,csna3_sa,3414,2012-01-02T00:00:00-02:00,2025-09-26T00:00:00-03:00
7,dx-y.nyb,3457,2012-01-03T03:00:00-02:00,2025-09-30T00:00:00-03:00
8,elet3_sa,3414,2012-01-02T00:00:00-02:00,2025-09-26T00:00:00-03:00
9,ewz,3455,2012-01-03T03:00:00-02:00,2025-09-29T00:00:00-03:00


Summarized conversation historyI'll first add a short diagnostic step to compute max dates per symbol as local dates and verify inconsistencies, then propose and implement a fix to clamp ingestion to the previous local business day and clean any future-dated rows, followed by rerunning the status cell. I'll keep you updated after these reads and edits. 

Added 2 todos

To diagnose and then fix the inconsistent max_date, I'll insert diagnostic and cleanup cells, edit the delta calculation to cap fetch_to at "yesterday in São Paulo," and then run the cleanup and re-check cells to confirm alignment. 

Retrieving Notebook summary.

Adding a diagnostic cell and updating delta logic to cap fetch_to at "yesterday in São Paulo", plus a cleanup cell to remove rows beyond the cutoff, then I'll run the cleanup and re-run ingestion and status to confirm uniform max_date. 

Editing notebook

Editing notebook

Editing notebook

Editing notebook

Starting (2/4) *Clamp fetch_to to yesterday (SP)*

Ran [](vscode-notebook-cell:/g%3A/Drives%20compartilhados/BOLSA_2026/a_bolsa2026_gemini/01_notebooks/estrutura_e_ingesta.ipynb#X23sZmlsZQ%3D%3D)

Ran [](vscode-notebook-cell:/g%3A/Drives%20compartilhados/BOLSA_2026/a_bolsa2026_gemini/01_notebooks/estrutura_e_ingesta.ipynb#X24sZmlsZQ%3D%3D)

Ran [](vscode-notebook-cell:/g%3A/Drives%20compartilhados/BOLSA_2026/a_bolsa2026_gemini/01_notebooks/estrutura_e_ingesta.ipynb#X35sZmlsZQ%3D%3D)

Ran [](vscode-notebook-cell:/g%3A/Drives%20compartilhados/BOLSA_2026/a_bolsa2026_gemini/01_notebooks/estrutura_e_ingesta.ipynb#X31sZmlsZQ%3D%3D)

Ran [](vscode-notebook-cell:/g%3A/Drives%20compartilhados/BOLSA_2026/a_bolsa2026_gemini/01_notebooks/estrutura_e_ingesta.ipynb#X34sZmlsZQ%3D%3D)

Completed (2/4) *Clamp fetch_to to yesterday (SP)*

Estou vendo a saída mais recente e a divergência de max_date entre tickers é real. Eu já rodei um diagnóstico e ajustei a rotina para não mirar além de “ontem SP” e chequei o banco. Eis o que encontramos e por quê os tickers não alinharam em 30/09/2025:

## O que aconteceu

- Distribuição das últimas datas no banco (convertidas para America/Sao_Paulo):
  - 2025-09-26: 14 símbolos
  - 2025-09-29: 14 símbolos
  - 2025-09-30: 3 símbolos

- Reingestão dos deltas até “ontem SP” (2025-09-30) falhou para vários tickers .SA com downloads vazios:
  - Mensagem típica do yfinance: “possibly delisted; no price data found (1d 2025-09-27 -> 2025-09-30)”.
  - Ou seja, a fonte (Yahoo Finance) não devolveu candles para o período, apesar de não ser feriado.

- Conferência de “datas no futuro”:
  - Adicionei uma limpeza opcional para remover linhas > 2025-09-30; o preview mostrou 0 linhas, então não foi problema de timezone empurrando para o dia seguinte.

Resumo: a falha não foi na normalização de timezone nem em end-date inclusivo. É latência/indisponibilidade de dados no Yahoo para vários tickers da B3 no momento da execução. Por isso alguns ficaram em 26/09 (sexta), outros em 29/09 (segunda) e poucos chegaram a 30/09. Essa diferença de “janela de atualização” entre bolsas/ativos no Yahoo é comum.

## Correções já aplicadas

- Cap do fetch_to: agora limitamos a busca a “ontem em São Paulo”, para evitar assimetrias por datas futuras.
- Verificação e limpeza de linhas acima do cutoff (nenhuma remoção foi necessária).
- Reexecução de ingestão com o novo cutoff: os .SA seguiram sem dados no Yahoo para 29–30/09 no momento da execução (empty-download).

## Como garantir todos em 30/09 de forma robusta

Se o requisito é “todos os símbolos devem fechar na mesma data (ontem) sempre que não houver feriado local”, você precisa neutralizar a latência de atualização da fonte:

- Calendário por bolsa + hora de execução:
  - Determinar o “último dia útil esperado” por bolsa (ex.: B3 vs NYSE) e rodar a ingestão após o horário em que o Yahoo costuma publicar os fechamentos da B3 (ex.: 21h–23h BRT), com 1–2 tentativas automáticas de reprocessamento.
- Tentar novamente (retry/backfill):
  - Se o download veio vazio, reprogramar tentativas (ex.: a cada 30 min) até alcançar a data-alvo do calendário, e só então fechar o dia.
- Fonte alternativa (opcional):
  - Se o SLA precisa ser D+0 rígido, considere uma fonte com menor latência para B3 (ex.: feed oficial B3, serviços pagos, ou outra API que atualize mais cedo que o Yahoo). Também dá para usar um “mix”: yfinance como base e fallback alternativo em D+0.
- Alertas:
  - Emitir relatório quando algum ticker esperado para a data-alvo vier vazio, para acompanhamento.

Posso implementar já:
- Um calendário simples (B3 x EUA) para estimar a data-alvo por bolsa.
- Uma janela de execução/retentativa para garantir que todos atinjam a mesma max_date.
- Um alerta/relatório destacando quais tickers ainda não atualizaram e quando reprocessar.

Se quiser, sigo com essa implementação agora e deixo a rotina automática para assegurar que todos fechem em 30/09/2025 (e nos próximos dias) de forma consistente.

Made changes.

In [33]:
# Diagnóstico: quais são as max_date por símbolo (data local SP) e quem foge do cutoff?
import pandas as pd
from sqlalchemy import text

with engine.begin() as conn:
    by_symbol_full = pd.read_sql(text("""
        SELECT symbol,
               MAX(date) AS max_date_iso
        FROM ohlcv_daily
        GROUP BY symbol
        ORDER BY symbol
    """), conn)

by_symbol_full['max_date_iso'] = pd.to_datetime(by_symbol_full['max_date_iso'], errors='coerce')
# Extrair data local (America/Sao_Paulo) a partir do ISO com offset
by_symbol_full['max_date_sp'] = by_symbol_full['max_date_iso'].dt.tz_convert('America/Sao_Paulo').dt.date

expected = (pd.Timestamp.now(tz='America/Sao_Paulo') - pd.Timedelta(days=1)).date()
print('Esperado (ontem SP):', expected)
print('Distribuição de max_date_sp:')
print(by_symbol_full['max_date_sp'].value_counts().sort_index())

offenders = by_symbol_full[by_symbol_full['max_date_sp'] != expected]
offenders

Esperado (ontem SP): 2025-09-30
Distribuição de max_date_sp:
max_date_sp
2025-09-26    14
2025-09-29    14
2025-09-30     3
Name: count, dtype: int64


Unnamed: 0,symbol,max_date_iso,max_date_sp
0,abev3_sa,2025-09-26 00:00:00-03:00,2025-09-26
1,b3sa3_sa,2025-09-29 00:00:00-03:00,2025-09-29
2,bbas3_sa,2025-09-29 00:00:00-03:00,2025-09-29
3,bvsp,2025-09-29 00:00:00-03:00,2025-09-29
5,cple6_sa,2025-09-26 00:00:00-03:00,2025-09-26
6,csna3_sa,2025-09-26 00:00:00-03:00,2025-09-26
8,elet3_sa,2025-09-26 00:00:00-03:00,2025-09-26
9,ewz,2025-09-29 00:00:00-03:00,2025-09-29
10,ggbr4_sa,2025-09-26 00:00:00-03:00,2025-09-26
11,gspc,2025-09-29 00:00:00-03:00,2025-09-29
