# 00 — Setup de Dados (Binance Vision → Postgres)

Este notebook é o **combustível** do laboratório (Artigo 2+):

- Baixa candles (BTCUSDT/ETHUSDT) do **Binance Vision**
- Insere no Postgres (`ohlcv_candles`) para o **Rust** consumir via `KAIROS_DB_URL`
- Gera **features mínimas** (retorno/volatilidade/trend)
- Gera **labels de regime** via **K-Means** (determinístico)

Regras:
- Figuras **apenas inline** (não salvar imagens no disco).
- Datasets baixados/cache ficam em `notebooks/data/` (gitignored).


## Pré-requisitos

Rode dentro do container `dev` com o Postgres disponível (veja `README.md`).

Você precisa do env var:
- `KAIROS_DB_URL` (ex.: `postgres://kairos:...@db:5432/kairos`)


In [None]:
from __future__ import annotations

import csv
import datetime as dt
import io
import os
import subprocess
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Iterator, Optional

import numpy as np
import pandas as pd
import requests
import psycopg
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

# Show the versions of the two core numeric libraries that were imported,
# so a notebook run records which ones it used.
print("pandas", pd.__version__)
print("numpy", np.__version__)


## Configuração (edite aqui)

Dica: comece com um recorte pequeno (ex.: 7 dias) para validar o pipeline.


In [None]:
# Default symbols (Binance spot)
SYMBOLS = ["BTCUSDT", "ETHUSDT"]

# Time window for download/ingest (RFC 3339 strings).
# Both bounds are applied inclusively by the ingest filter and the DB queries
# further down in this notebook.
START = "2024-01-01T00:00:00Z"
END = "2024-02-01T00:00:00Z"

# Binance Vision source (klines)
VISION_TIMEFRAME = "1m"  # timeframe as spelled in Binance Vision URLs/file names
KAIROS_TIMEFRAME = "1min"  # label stored in Postgres (original note ties this to the Rust side — TODO confirm)

# Regime dimensions (K-Means)
N_REGIMES = 3
RANDOM_SEED = 42

# Local cache (gitignored); the directory is created eagerly when this cell runs.
CACHE_DIR = Path("notebooks/data/binance_vision")
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# The database URL comes exclusively from the environment; fail fast when it
# is missing or empty so later cells do not run without a connection target.
DB_URL = os.environ.get("KAIROS_DB_URL")
if not DB_URL:
    raise RuntimeError("Missing KAIROS_DB_URL. Start dev container or export it.")

# Only confirm presence; the URL itself is not printed.
print("DB_URL set")


## Migrar schema do Postgres

Isso cria `ohlcv_candles` se ainda não existir (ver `migrations/`).


In [None]:
def run_migrations(db_url: str) -> None:
    """Apply the database migrations by invoking the ``kairos-ingest`` binary.

    Runs ``cargo run -q -p kairos-ingest -- migrate --db-url <db_url>`` as a
    child process (argument list, no shell).

    Args:
        db_url: Connection URL forwarded verbatim to ``--db-url``.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True``).
    """
    cargo_invocation = ["cargo", "run", "-q", "-p", "kairos-ingest"]
    migrate_args = ["migrate", "--db-url", db_url]
    # "--" separates cargo's own flags from the arguments of the binary.
    subprocess.run([*cargo_invocation, "--", *migrate_args], check=True)


# Apply the migrations against the configured database; the confirmation line
# is only reached when the subprocess succeeded (run_migrations uses check=True).
run_migrations(DB_URL)
print("migrations OK")


## Helpers: meses, URL e download

Usamos o dataset público do Binance Vision (evita rate-limit e API keys).


In [None]:
def parse_rfc3339(ts: str) -> dt.datetime:
    """Parse an RFC 3339 / ISO 8601 timestamp into a UTC datetime.

    Surrounding whitespace is ignored and a trailing ``Z`` is rewritten to the
    explicit ``+00:00`` offset before handing the text to
    ``datetime.fromisoformat``.

    Args:
        ts: Timestamp text, e.g. ``"2024-01-01T00:00:00Z"``.

    Returns:
        The parsed instant converted to UTC via ``astimezone``.

    Raises:
        ValueError: If the text is not accepted by ``datetime.fromisoformat``.
    """
    text = ts.strip()
    if text.endswith("Z"):
        text = f"{text[:-1]}+00:00"
    parsed = dt.datetime.fromisoformat(text)
    return parsed.astimezone(dt.timezone.utc)


def iter_months(start: dt.datetime, end: dt.datetime) -> list[str]:
    """List every calendar month touched by the window, as ``YYYY-MM`` strings.

    Both the month of ``start`` and the month of ``end`` are included; only the
    ``year`` and ``month`` fields of the arguments are used.

    Args:
        start: Beginning of the window.
        end: End of the window.

    Returns:
        Month labels in chronological order; empty when the month of ``end``
        precedes the month of ``start``.
    """
    # Flatten (year, month) into a single month counter so the year rollover
    # is handled by plain integer arithmetic.
    first = start.year * 12 + (start.month - 1)
    last = end.year * 12 + (end.month - 1)
    return [f"{idx // 12:04d}-{idx % 12 + 1:02d}" for idx in range(first, last + 1)]


def vision_monthly_url(symbol: str, timeframe: str, yyyy_mm: str) -> str:
    """Build the Binance Vision URL of a monthly spot kline archive.

    Example result:
    https://data.binance.vision/data/spot/monthly/klines/BTCUSDT/1m/BTCUSDT-1m-2024-01.zip

    Args:
        symbol: Trading pair, e.g. ``"BTCUSDT"``.
        timeframe: Kline interval as used in the URL, e.g. ``"1m"``.
        yyyy_mm: Month label, e.g. ``"2024-01"``.

    Returns:
        The archive URL as a string; no network access is performed.
    """
    root = "https://data.binance.vision/data/spot/monthly/klines/"
    archive_name = f"{symbol}-{timeframe}-{yyyy_mm}.zip"
    return root + f"{symbol}/{timeframe}/" + archive_name


def download_if_missing(url: str, dest: Path, *, timeout_s: int = 60) -> Path:
    """Download ``url`` to ``dest`` unless a non-empty file is already there.

    The body is streamed into a sibling ``<dest>.part`` file and moved over
    ``dest`` only after the whole response has been written, so an interrupted
    download never leaves a truncated file under the final name.

    Args:
        url: HTTP(S) location of the file.
        dest: Target path; missing parent directories are created.
        timeout_s: Timeout in seconds handed to ``requests.get``.

    Returns:
        ``dest``.

    Raises:
        RuntimeError: If the server answers with a status other than 200.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    # A zero-byte file is treated as "not downloaded" and fetched again.
    if dest.exists() and dest.stat().st_size > 0:
        return dest

    tmp = dest.with_suffix(dest.suffix + ".part")
    # With stream=True the connection stays open until the response is closed;
    # the context manager releases it on success and on every error path.
    with requests.get(url, stream=True, timeout=timeout_s) as r:
        if r.status_code != 200:
            raise RuntimeError(f"download failed: {url} status={r.status_code}")

        with tmp.open("wb") as f:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    f.write(chunk)
    tmp.replace(dest)
    return dest


# Resolve the configured window into UTC datetimes and derive the list of
# monthly archives (YYYY-MM labels) that cover it.
START_DT = parse_rfc3339(START)
END_DT = parse_rfc3339(END)
MONTHS = iter_months(START_DT, END_DT)
# Bare expression: as the last statement of a notebook cell it echoes the list.
MONTHS


## Parser: ler um ZIP do Binance Vision em chunks

O CSV do Binance Vision geralmente vem **sem header**. Usamos os campos relevantes:
- `open_time(ms)` como timestamp (UTC)
- `open/high/low/close/volume`
- `quote_asset_volume` como `turnover` (opcional)


In [None]:
# Column names assigned, in order, to the fields of a Binance Vision kline
# CSV when it is read with header=None. Only open_time_ms, the OHLCV fields
# and quote_asset_volume are kept by the parser in this notebook.
BINANCE_KLINE_COLUMNS = [
    "open_time_ms",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "close_time_ms",
    "quote_asset_volume",
    "number_of_trades",
    "taker_buy_base_asset_volume",
    "taker_buy_quote_asset_volume",
    "ignore",
]


@dataclass(frozen=True)
class KlineChunk:
    """One parsed slice of a kline archive together with the symbol it belongs to.

    Instances are frozen: the two fields cannot be rebound after construction
    (the DataFrame object itself remains mutable).
    """

    # Trading pair the rows belong to, e.g. "BTCUSDT".
    symbol: str
    # Parsed rows of the slice.
    df: pd.DataFrame


def iter_vision_zip_chunks(zip_path: Path, symbol: str, *, chunksize: int = 250_000) -> Iterator[KlineChunk]:
    """Stream the single CSV inside a Binance Vision kline ZIP as parsed chunks.

    Each yielded chunk carries the columns ``open``, ``high``, ``low``,
    ``close``, ``volume``, ``turnover`` (the archive's quote asset volume) and
    ``timestamp_utc`` (the candle open time as a UTC timestamp). Rows whose
    open time or OHLCV fields are not numeric — for example a header line —
    are dropped; ``turnover`` may be missing (NaN).

    Open times are accepted in epoch milliseconds and in epoch microseconds:
    Binance Vision spot archives dated 2025-01-01 or later use microseconds,
    which would otherwise be misread as far-future millisecond values.

    Args:
        zip_path: Path of the downloaded archive.
        symbol: Trading pair attached to every yielded chunk.
        chunksize: Maximum number of CSV rows read per chunk.

    Yields:
        ``KlineChunk`` objects in file order.

    Raises:
        RuntimeError: If the archive does not contain exactly one ``.csv`` member.
    """
    kept_columns = [
        "open_time_ms",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "quote_asset_volume",
    ]
    float_columns = kept_columns[1:]
    # turnover (quote_asset_volume) is optional, everything else is required.
    required_columns = kept_columns[:-1]
    # 10**14 ms after the epoch lies beyond the year 5000, so any open time at
    # or above this value can only be expressed in microseconds.
    microsecond_floor = 10**14

    with zipfile.ZipFile(zip_path, "r") as zf:
        members = [m for m in zf.namelist() if m.lower().endswith(".csv")]
        if len(members) != 1:
            raise RuntimeError(f"unexpected zip members in {zip_path}: {members}")
        name = members[0]
        with zf.open(name, "r") as f:
            wrapper = io.TextIOWrapper(f, encoding="utf-8")
            for chunk in pd.read_csv(
                wrapper,
                header=None,
                names=BINANCE_KLINE_COLUMNS,
                chunksize=chunksize,
            ):
                # Normalize types; unparsable cells become missing values.
                chunk = chunk[kept_columns].copy()
                chunk["open_time_ms"] = pd.to_numeric(chunk["open_time_ms"], errors="coerce").astype("Int64")
                for col in float_columns:
                    chunk[col] = pd.to_numeric(chunk[col], errors="coerce")
                chunk = chunk.dropna(subset=required_columns)

                # Convert timestamp, scaling microsecond values down to milliseconds.
                open_time = chunk["open_time_ms"].astype("int64")
                open_time = open_time.where(open_time < microsecond_floor, open_time // 1000)
                chunk["timestamp_utc"] = pd.to_datetime(open_time, unit="ms", utc=True)
                chunk = chunk.drop(columns=["open_time_ms"])
                chunk = chunk.rename(columns={"quote_asset_volume": "turnover"})

                yield KlineChunk(symbol=symbol, df=chunk)


## Ingest: upsert no Postgres (`ohlcv_candles`)

A tabela possui PK `(exchange, market, symbol, timeframe, timestamp_utc)`.
Vamos inserir com `ON CONFLICT DO NOTHING` para ser idempotente.


In [None]:
def ensure_tmp_table(cur) -> None:
    """Create the temporary staging table ``tmp_ohlcv`` if it does not exist.

    The statement uses ``CREATE TEMP TABLE IF NOT EXISTS``, so repeated calls
    on the same cursor are harmless.

    Args:
        cur: Database cursor exposing ``execute``.
    """
    staging_ddl = """
        CREATE TEMP TABLE IF NOT EXISTS tmp_ohlcv (
            exchange TEXT NOT NULL,
            market TEXT NOT NULL,
            symbol TEXT NOT NULL,
            timeframe TEXT NOT NULL,
            timestamp_utc TIMESTAMPTZ NOT NULL,
            open DOUBLE PRECISION NOT NULL,
            high DOUBLE PRECISION NOT NULL,
            low DOUBLE PRECISION NOT NULL,
            close DOUBLE PRECISION NOT NULL,
            volume DOUBLE PRECISION NOT NULL,
            turnover DOUBLE PRECISION,
            source TEXT NOT NULL
        );
        """
    cur.execute(staging_ddl)


def copy_chunk_to_tmp(cur, rows: Iterable[tuple]) -> None:
    """Bulk-load rows into ``tmp_ohlcv`` through a ``COPY ... FROM STDIN``.

    Args:
        cur: Cursor exposing the psycopg 3 ``copy`` context manager.
        rows: Tuples ordered as (exchange, market, symbol, timeframe,
            timestamp_utc, open, high, low, close, volume, turnover, source).
            The iterable is consumed once, row by row.
    """
    copy_sql = "COPY tmp_ohlcv (exchange, market, symbol, timeframe, timestamp_utc, open, high, low, close, volume, turnover, source) FROM STDIN"
    with cur.copy(copy_sql) as writer:
        for record in rows:
            writer.write_row(record)


def flush_tmp_into_main(cur) -> int:
    """Copy every staged row from ``tmp_ohlcv`` into ``ohlcv_candles``.

    Rows that collide with an existing row are skipped
    (``ON CONFLICT DO NOTHING``).

    Args:
        cur: Database cursor exposing ``execute`` and ``rowcount``.

    Returns:
        The cursor's ``rowcount`` after the insert as an ``int``, or ``0`` when
        it is falsy (``None`` or zero).
    """
    insert_sql = """
        INSERT INTO ohlcv_candles (
            exchange, market, symbol, timeframe, timestamp_utc,
            open, high, low, close, volume, turnover, source
        )
        SELECT
            exchange, market, symbol, timeframe, timestamp_utc,
            open, high, low, close, volume, turnover, source
        FROM tmp_ohlcv
        ON CONFLICT DO NOTHING;
        """
    cur.execute(insert_sql)
    affected = cur.rowcount
    if not affected:
        return 0
    return int(affected)


def ingest_kline_chunk(conn, *, symbol: str, df: pd.DataFrame) -> int:
    """Insert the in-window candles of one parsed chunk into ``ohlcv_candles``.

    Rows are restricted to ``START_DT <= timestamp_utc <= END_DT``, ordered by
    timestamp and de-duplicated per timestamp (the last occurrence wins). They
    are then staged in ``tmp_ohlcv`` and flushed into the main table. The
    caller's frame is not modified and nothing is committed here.

    Args:
        conn: Open database connection exposing ``cursor()``.
        symbol: Trading pair written to the ``symbol`` column.
        df: Chunk with ``timestamp_utc``, ``open``, ``high``, ``low``,
            ``close``, ``volume`` and optionally ``turnover``.

    Returns:
        The count reported by ``flush_tmp_into_main``; ``0`` when no row of the
        chunk falls inside the window.
    """
    stamps = df["timestamp_utc"]
    window = df[(stamps >= START_DT) & (stamps <= END_DT)]
    if window.empty:
        return 0

    window = window.sort_values("timestamp_utc", kind="stable").drop_duplicates(
        subset=["timestamp_utc"], keep="last"
    )

    # Values shared by every row of this chunk.
    exchange, market = "binance", "spot"
    timeframe = KAIROS_TIMEFRAME
    source = "binance_vision"

    def _to_row(ts, o, h, l, c, v, t) -> tuple:
        # A missing turnover is sent as NULL rather than NaN.
        turnover = None if pd.isna(t) else float(t)
        return (
            exchange,
            market,
            symbol,
            timeframe,
            ts.to_pydatetime(),
            float(o),
            float(h),
            float(l),
            float(c),
            float(v),
            turnover,
            source,
        )

    columns = zip(
        window["timestamp_utc"],
        window["open"],
        window["high"],
        window["low"],
        window["close"],
        window["volume"],
        window.get("turnover", pd.Series([None] * len(window))),
        strict=False,
    )
    # Lazy generator: rows are built while COPY consumes them.
    rows = (_to_row(*fields) for fields in columns)

    with conn.cursor() as cur:
        ensure_tmp_table(cur)
        # Clear whatever a previous chunk left in the staging table.
        cur.execute("TRUNCATE tmp_ohlcv;")
        copy_chunk_to_tmp(cur, rows)
        return flush_tmp_into_main(cur)


## Baixar + Ingerir (BTC/ETH)

Este passo pode demorar dependendo do intervalo. Use um intervalo curto para smoke test.


In [None]:
def ingest_symbol(conn, symbol: str) -> int:
    """Download and ingest every monthly archive in ``MONTHS`` for one symbol.

    For each month the archive is fetched into ``CACHE_DIR`` (unless already
    cached), parsed in chunks and each chunk is inserted through
    ``ingest_kline_chunk``. Progress is printed per month.

    Args:
        conn: Open database connection handed to ``ingest_kline_chunk``.
        symbol: Trading pair, e.g. ``"BTCUSDT"``.

    Returns:
        Sum of the per-chunk insert counts across all months.
    """
    running_total = 0
    for month in MONTHS:
        archive = CACHE_DIR / f"{symbol}-{VISION_TIMEFRAME}-{month}.zip"
        print("download", symbol, month)
        download_if_missing(vision_monthly_url(symbol, VISION_TIMEFRAME, month), archive)

        month_count = sum(
            ingest_kline_chunk(conn, symbol=symbol, df=part.df)
            for part in iter_vision_zip_chunks(archive, symbol)
        )
        print("ingested", symbol, month, "inserted=", month_count)
        running_total += month_count
    return running_total


# One connection is shared by the whole ingest run.
with psycopg.connect(DB_URL) as conn:
    # Pin the session time zone to UTC before any timestamps are written.
    conn.execute("SET TIME ZONE 'UTC';")
    grand_total = 0
    for sym in SYMBOLS:
        grand_total += ingest_symbol(conn, sym)
    # Single explicit commit, issued only after every symbol was processed.
    conn.commit()

print("total inserted:", grand_total)


## Verificação rápida no DB


In [None]:
def summarize_db(conn, symbol: str) -> pd.DataFrame:
    """Summarise the stored candles of one symbol inside the configured window.

    Queries ``ohlcv_candles`` for Binance spot rows with timeframe
    ``KAIROS_TIMEFRAME`` whose timestamp lies in ``[START_DT, END_DT]``.

    Args:
        conn: Open database connection exposing ``execute``.
        symbol: Trading pair to summarise.

    Returns:
        A frame with columns ``symbol``, ``timeframe``, ``min_ts``, ``max_ts``
        and ``rows``; it has no rows when nothing matches.
    """
    summary_columns = ["symbol", "timeframe", "min_ts", "max_ts", "rows"]
    params = (KAIROS_TIMEFRAME, symbol, START_DT, END_DT)
    sql = """
    SELECT
      symbol,
      timeframe,
      MIN(timestamp_utc) AS min_ts,
      MAX(timestamp_utc) AS max_ts,
      COUNT(*) AS rows
    FROM ohlcv_candles
    WHERE exchange = 'binance'
      AND market = 'spot'
      AND timeframe = %s
      AND symbol = %s
      AND timestamp_utc >= %s
      AND timestamp_utc <= %s
    GROUP BY symbol, timeframe
    ORDER BY symbol;
    """
    records = conn.execute(sql, params).fetchall()
    return pd.DataFrame(records, columns=summary_columns)


# Show one summary frame per configured symbol.
# `display` is not defined or imported in this file; presumably the notebook
# (IPython) environment provides it — confirm when running outside Jupyter.
with psycopg.connect(DB_URL) as conn:
    for sym in SYMBOLS:
        display(summarize_db(conn, sym))


## Features + Regimes (K-Means) para Artigo 2

Para reduzir volume, derivamos regimes em **1h** (a partir do close 1min).

Saída (CSV) em `notebooks/data/`:
- `regimes_<SYMBOL>_<START>_<END>_1hour.csv`

Obs: isso é **para notebooks/treino/validação**, não é consumido pelo Rust ainda.


In [None]:
def load_1min_close(conn, symbol: str) -> pd.DataFrame:
    """Load close price and volume of one symbol for the configured window.

    Reads Binance spot rows with timeframe ``KAIROS_TIMEFRAME`` and a timestamp
    in ``[START_DT, END_DT]`` from ``ohlcv_candles``, ordered by timestamp.

    Args:
        conn: Open database connection exposing ``execute``.
        symbol: Trading pair to load.

    Returns:
        A frame with columns ``timestamp_utc`` (converted with
        ``pd.to_datetime(..., utc=True)``), ``close`` and ``volume``.
    """
    sql = """
    SELECT timestamp_utc, close, volume
    FROM ohlcv_candles
    WHERE exchange = 'binance'
      AND market = 'spot'
      AND timeframe = %s
      AND symbol = %s
      AND timestamp_utc >= %s
      AND timestamp_utc <= %s
    ORDER BY timestamp_utc;
    """
    params = (KAIROS_TIMEFRAME, symbol, START_DT, END_DT)
    records = conn.execute(sql, params).fetchall()
    candles = pd.DataFrame(records, columns=["timestamp_utc", "close", "volume"])
    candles["timestamp_utc"] = pd.to_datetime(candles["timestamp_utc"], utc=True)
    return candles


def compute_hourly_regimes(df_1min: pd.DataFrame, *, n_regimes: int, seed: int) -> pd.DataFrame:
    """Derive hourly volatility/trend features and cluster them into regimes.

    The 1-minute series is resampled to hourly bars (last close, summed
    volume). From the hourly log returns a 24-bar rolling standard deviation
    (``volatility``) and rolling mean (``trend``) are computed, each requiring
    at least 12 observations. The two features are standardised and clustered
    with K-Means; the cluster index becomes ``regime_id``.

    Args:
        df_1min: Frame with a datetime ``timestamp_utc`` column plus ``close``
            and ``volume``. It is not modified.
        n_regimes: Number of K-Means clusters.
        seed: Random state passed to K-Means so repeated runs on the same data
            give the same labels.

    Returns:
        A frame with columns ``timestamp_utc``, ``regime_id``, ``volatility``
        and ``trend``, one row per hourly bar that has complete features.

    Raises:
        ValueError: If fewer hourly feature rows than ``n_regimes`` remain,
            which would make the clustering impossible.
    """
    df = df_1min.set_index("timestamp_utc")
    # A Timedelta is used instead of the "1H" alias, which pandas deprecated
    # in 2.2 in favour of "h"; the Timedelta works on old and new versions.
    one_hour = pd.Timedelta(hours=1)
    hourly = pd.DataFrame(
        {
            "close": df["close"].resample(one_hour).last(),
            # min_count=1 keeps hours without any candle as NaN instead of 0.
            "volume": df["volume"].resample(one_hour).sum(min_count=1),
        }
    ).dropna(subset=["close"])

    hourly["log_return"] = np.log(hourly["close"]).diff()
    hourly["volatility"] = hourly["log_return"].rolling(24, min_periods=12).std()
    hourly["trend"] = hourly["log_return"].rolling(24, min_periods=12).mean()
    hourly = hourly.dropna(subset=["log_return", "volatility", "trend"]).copy()

    n_clusters = int(n_regimes)
    # Fail with an explicit message instead of an opaque error from the
    # scaler/clusterer when the window is empty or too short.
    if len(hourly) < n_clusters:
        raise ValueError(
            f"not enough hourly feature rows for K-Means: have {len(hourly)}, need at least {n_clusters}"
        )

    X = hourly[["volatility", "trend"]].to_numpy(dtype=np.float64)
    X = StandardScaler().fit_transform(X)

    km = KMeans(n_clusters=n_clusters, random_state=int(seed), n_init="auto")
    hourly["regime_id"] = km.fit_predict(X).astype(int)
    out = hourly.reset_index()[["timestamp_utc", "regime_id", "volatility", "trend"]]
    return out


def safe_slug(ts: str) -> str:
    """Strip ``:``, ``+`` and ``-`` from a timestamp so it can go into a file name.

    Args:
        ts: Timestamp text, e.g. ``"2024-01-01T00:00:00Z"``.

    Returns:
        The text with those three characters removed; every other character
        is kept as is.
    """
    # A single translate pass deletes all three characters at once.
    return ts.translate(str.maketrans("", "", ":+-"))


def write_regimes_csv(symbol: str, regimes: pd.DataFrame) -> Path:
    """Write the regime table of one symbol to a CSV under ``notebooks/data``.

    The file is named ``regimes_<symbol>_<START>_<END>_1hour.csv`` with both
    timestamps passed through ``safe_slug``. A ``symbol`` column is added to
    the written data; the caller's frame is left untouched.

    Args:
        symbol: Trading pair, used in the file name and the added column.
        regimes: Regime table to persist.

    Returns:
        Path of the written file.
    """
    target_dir = Path("notebooks/data")
    target_dir.mkdir(parents=True, exist_ok=True)

    file_name = f"regimes_{symbol}_{safe_slug(START)}_{safe_slug(END)}_1hour.csv"
    destination = target_dir / file_name

    labelled = regimes.assign(symbol=symbol)
    labelled.to_csv(destination, index=False)
    return destination


# For every configured symbol: load the stored candles, derive the hourly
# regimes, write them to CSV and preview the first rows.
# `display` is not defined or imported in this file; presumably the notebook
# (IPython) environment provides it — confirm when running outside Jupyter.
with psycopg.connect(DB_URL) as conn:
    for sym in SYMBOLS:
        df_1min = load_1min_close(conn, sym)
        regimes = compute_hourly_regimes(df_1min, n_regimes=N_REGIMES, seed=RANDOM_SEED)
        out_path = write_regimes_csv(sym, regimes)
        print("wrote", out_path, "rows=", len(regimes))
        display(regimes.head())


## (Opcional) Smoke test com Rust

Ajuste uma config para usar `exchange="binance"`, `symbol="BTCUSDT"` e rode validate/backtest.

Exemplo (headless):

```bash
cargo run -q -p kairos-tui -- --headless --mode validate --config configs/sample.toml --strict
cargo run -q -p kairos-tui -- --headless --mode backtest --config configs/sample.toml
```
