# Etapas para establecer el espacio de trabajo para importar los históricos de las criptos.

## Paso 1 — Preparar proyecto y configs

1) Comandos (Terminal en tu Mac)

Copia y pega esto tal cual:

In [None]:
# 1) Project folder and virtual environment
mkdir -p ~/Trading/algo-trading && cd ~/Trading/algo-trading
python3 -m venv .venv
source .venv/bin/activate

# 2) Minimal layout
mkdir -p configs services/{ingestion,processing} tools/{scripts,duckdb_queries} tests

# 2b) pytest config: put the project root on sys.path so tests can do
#     `from services... import ...`. With a plain `pytest` run, the default
#     import mode only adds the test file's own folder (tests/), not the root.
#     The `pythonpath` ini option requires pytest >= 7.
cat > pytest.ini <<'EOF'
[pytest]
pythonpath = .
EOF

# 3) Dependencies
cat > requirements.txt <<'EOF'
pandas
pyarrow
duckdb
httpx
tenacity
pydantic
python-dateutil
PyYAML
pytest
typer
EOF
pip install -r requirements.txt

# 4) .gitignore
cat > .gitignore <<'EOF'
.venv/
__pycache__/
*.pyc
data/
.env
EOF

# 5) (optional) Create the subfolders inside YOUR real data lake (in case they don't exist)
mkdir -p /Users/sultan/Trading/data/{checkpoints,trusted,curated}

In [None]:
# Write the three YAML config files into <project>/configs.
# Heredocs are quoted ('YAML'), so their contents are written verbatim.
cd /Users/sultan/Trading/algo-trading
mkdir -p configs

# data_paths.yml — absolute locations of the data lake and its sub-roots
cat > configs/data_paths.yml <<'YAML'
data_root: "/Users/sultan/Trading/data"
checkpoints_dir: "/Users/sultan/Trading/data/checkpoints"
trusted_root: "/Users/sultan/Trading/data/trusted"
curated_root: "/Users/sultan/Trading/data/curated"
YAML

# symbols.yml — market scope plus symbol_map: listing name (some carry a ".P"
# suffix) -> symbol sent to the Binance API
cat > configs/symbols.yml <<'YAML'
market: futures
segment: usdtm
contract: perpetual
intervals: ["1d"]

symbol_map:
  "BTCUSDT": "BTCUSDT"
  "ETHUSDT": "ETHUSDT"
  "BNBUSDT": "BNBUSDT"
  "XRPUSDT": "XRPUSDT"
  "ADAUSDT": "ADAUSDT"
  "DOGEUSDT": "DOGEUSDT"
  "SOLUSDT": "SOLUSDT"
  "TRXUSDT": "TRXUSDT"
  "DOTUSDT": "DOTUSDT"
  "LTCUSDT": "LTCUSDT"
  "1000SHIBUSDT.P": "1000SHIBUSDT"
  "BCHUSDT": "BCHUSDT"
  "LINKUSDT": "LINKUSDT"
  "AVAXUSDT": "AVAXUSDT"
  "XMRUSDT.P": "XMRUSDT"
  "XLMUSDT": "XLMUSDT"
  "UNIUSDT": "UNIUSDT"
  "ETCUSDT": "ETCUSDT"
  "HBARUSDT": "HBARUSDT"
YAML

# orchestration.yml — job schedule and retry settings.
# NOTE(review): binance_client.py hard-codes its own retry policy
# (stop_after_attempt(7), max wait 60 s); these values are not wired in yet.
cat > configs/orchestration.yml <<'YAML'
jobs:
  ingest_daily:
    schedule_utc: "00:05"
    streams: ["klines_1d", "funding", "open_interest", "mark_index"]
retries:
  max_attempts: 5
  base_sleep_seconds: 1.0
  max_sleep_seconds: 60.0
YAML

In [None]:
Paso 1b — Test mínimo (comprueba que todo está bien)

Guarda el siguiente código como `tests/test_setup.py`:


import yaml, pathlib

def test_configs_exist_and_symbols_loaded():
    """All three config files exist and symbols.yml maps 19 symbols, stripping '.P' suffixes."""
    project = pathlib.Path(__file__).resolve().parents[1]
    required = ("configs/symbols.yml", "configs/data_paths.yml", "configs/orchestration.yml")
    for relative in required:
        assert (project / relative).exists(), f"Missing {relative}"

    with open(project / "configs/symbols.yml", "r") as handle:
        symbols_cfg = yaml.safe_load(handle)
    mapping = symbols_cfg.get("symbol_map", {})
    count = len(mapping)
    assert count == 19, f"Expected 19 mapped symbols, got {count}"

    # Listing names with a ".P" suffix must map to the plain API symbol.
    for listed, api_symbol in (("1000SHIBUSDT.P", "1000SHIBUSDT"), ("XMRUSDT.P", "XMRUSDT")):
        assert mapping[listed] == api_symbol

def test_paths_point_to_user_data():
    """data_paths.yml points at the user's data lake and every sub-root string starts with it."""
    config_file = pathlib.Path(__file__).resolve().parents[1] / "configs" / "data_paths.yml"
    paths_cfg = yaml.safe_load(config_file.read_text())
    lake_root = paths_cfg["data_root"]
    assert lake_root == "/Users/sultan/Trading/data"
    for key in ("checkpoints_dir", "trusted_root", "curated_root"):
        assert paths_cfg[key].startswith(lake_root)

In [None]:
# Run the test suite quietly from the project root (pytest is installed in the venv).
pytest -q

In [None]:
Esperado: 2 passed

Si pasa, ya quedaste con:
• Proyecto base ✔️
• Data Lake apuntando a tu carpeta real ✔️
• Lista de símbolos de futuros (PERP, USDT-M, 1d) ✔️

## Paso 2 — Cliente de exchange + pipeline (solo OHLCV 1d)

1) Crea archivos y carpetas

Copia y pega tal cual en tu terminal (dentro de ~/Trading/algo-trading y con el venv activo):

# Target folder (the project root)
cd /Users/sultan/Trading/algo-trading

# Source folders; `-p` makes this a no-op if they already exist
mkdir -p services/ingestion services/processing

# 1) Binance client (1d klines + pagination + backoff)
cat > services/ingestion/binance_client.py <<'PY'
from __future__ import annotations
import math, time
from typing import Iterable, List, Dict, Any, Optional
import httpx
from tenacity import retry, wait_exponential_jitter, stop_after_attempt, retry_if_exception_type

# Base URL of the Binance USDT-M Futures REST API.
BINANCE_UF_BASE = "https://fapi.binance.com"


class BinanceRateLimit(Exception):
    """Signals that Binance rate-limited the request; _get retries on it."""


class BinanceHTTPError(Exception):
    """Signals an HTTP error status returned by Binance."""

def _to_ms(ts: float) -> int:
    return int(ts * 1000)

def _should_retry(retry_state) -> bool:
    """Tenacity retry predicate: retry only transient failures of ``_get``.

    Transient means rate limiting, 5xx or 408 answers, timeouts, and network
    or protocol errors. Other 4xx answers (unknown symbol, bad parameters)
    can never succeed on retry. They are re-raised immediately instead of
    burning about a minute of exponential back-off.
    """
    outcome = retry_state.outcome
    if outcome is None or not outcome.failed:
        return False
    exc = outcome.exception()
    if isinstance(exc, BinanceHTTPError):
        status = getattr(exc, "status_code", 500)
        return status >= 500 or status == 408
    return isinstance(
        exc,
        (BinanceRateLimit, httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError),
    )


@retry(
    reraise=True,
    wait=wait_exponential_jitter(initial=1, max=60),
    stop=stop_after_attempt(7),
    retry=_should_retry,
)
def _get(path: str, params: Dict[str, Any]) -> Any:
    """GET ``BINANCE_UF_BASE + path`` with ``params`` and return the decoded JSON body.

    Raises BinanceRateLimit on HTTP 418/429. Raises BinanceHTTPError on any
    other status >= 400; its ``status_code`` attribute holds the HTTP status.
    Transient failures (see _should_retry) are retried for up to 7 attempts
    with jittered exponential back-off. The last error is re-raised.
    """
    url = BINANCE_UF_BASE + path
    with httpx.Client(timeout=30) as client:
        r = client.get(url, params=params)
    # 418 is Binance's auto-ban for clients that keep calling after 429s: treat both as rate limiting.
    if r.status_code in (418, 429):
        raise BinanceRateLimit(f"HTTP {r.status_code} rate limit")
    if r.status_code >= 400:
        err = BinanceHTTPError(f"http {r.status_code}: {r.text[:200]}")
        err.status_code = r.status_code  # lets _should_retry tell 4xx from 5xx
        raise err
    return r.json()

def fetch_klines_1d(symbol: str, start_ms: Optional[int]=None, end_ms: Optional[int]=None, limit: int=1500) -> List[List[Any]]:
    """
    Download one page of daily (1d) klines for a USDT-M Futures symbol.

    startTime/endTime are only sent when given. Returns the raw Binance rows
    (lists), each laid out as:
    [0] openTime, [1] open, [2] high, [3] low, [4] close, [5] volume,
    [6] closeTime, [7] quoteAssetVolume, [8] numberOfTrades,
    [9] takerBuyBase, [10] takerBuyQuote, [11] ignore
    """
    window = {"startTime": start_ms, "endTime": end_ms}
    query: Dict[str, Any] = {"symbol": symbol, "interval": "1d", "limit": limit}
    query.update({name: value for name, value in window.items() if value is not None})
    return _get("/fapi/v1/klines", query)

def paginate_klines_1d(symbol: str, start_ms: int, end_ms: int, limit: int=1500) -> Iterable[List[List[Any]]]:
    """
    Yield successive kline pages until [start_ms, end_ms] is covered.

    Each request starts 1 ms after the previous page's last closeTime. The
    cursor always advances by at least 1 ms. Iteration stops when the cursor
    passes end_ms or Binance returns an empty page.
    """
    cursor = start_ms
    while cursor <= end_ms:
        page = fetch_klines_1d(symbol, start_ms=cursor, end_ms=end_ms, limit=limit)
        if not page:
            return
        yield page
        # Next start = last closeTime + 1 ms, but never move backwards (safety net).
        cursor = max(page[-1][6] + 1, cursor + 1)
        # Short pause to stay polite with the API.
        time.sleep(0.2)
PY

# 2) Validación y normalización de OHLCV
cat > services/processing/validate.py <<'PY'
from __future__ import annotations
import pandas as pd

def normalize_klines_to_df(klines: list[list]) -> pd.DataFrame:
    """Turn raw Binance kline rows into a typed DataFrame.

    - Price and volume columns become numeric; unparseable values become NaN.
    - ``trades`` becomes nullable Int64.
    - ``date`` is ``close_time`` (ms since epoch) truncated to its UTC day.

    Returned columns: date, open, high, low, close, volume, trades,
    quote_volume, close_time.
    """
    raw_columns = [
        "open_time", "open", "high", "low", "close", "volume",
        "close_time", "quote_volume", "trades",
        "taker_buy_base", "taker_buy_quote", "ignore",
    ]
    numeric_columns = [
        "open", "high", "low", "close", "volume",
        "quote_volume", "taker_buy_base", "taker_buy_quote",
    ]
    frame = pd.DataFrame(klines, columns=raw_columns)
    for name in numeric_columns:
        frame[name] = pd.to_numeric(frame[name], errors="coerce")
    frame["trades"] = pd.to_numeric(frame["trades"], errors="coerce").astype("Int64")
    # Reference date = UTC calendar day on which the candle closes.
    closed_at = pd.to_datetime(frame["close_time"], unit="ms", utc=True)
    frame["date"] = closed_at.dt.normalize()
    output_columns = ["date", "open", "high", "low", "close", "volume", "trades", "quote_volume", "close_time"]
    return frame[output_columns]

def validate_ohlcv_rules(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only rows that pass basic OHLCV sanity rules, one row per date.

    A row is kept when:
      * open, high, low and close are all present (not NaN);
      * low <= min(open, close, high) and high >= max(open, close, low);
      * volume >= 0 and trades >= 0 (missing trades count as 0).
    When several rows share a ``date``, the last one in input order wins.
    The result is sorted by ``date`` with a fresh RangeIndex. The input frame
    is not modified.
    """
    df = df.copy()
    # DataFrame.min/max skip NaN by default. Without this check, a row with a
    # missing open/close (e.g. coerced from a bad string) would pass the range rules.
    has_prices = df[["open", "high", "low", "close"]].notna().all(axis=1)
    low_ok = df["low"] <= df[["open", "close", "high"]].min(axis=1)
    high_ok = df["high"] >= df[["open", "close", "low"]].max(axis=1)
    counts_ok = (df["volume"] >= 0) & (df["trades"].fillna(0) >= 0)
    df = df[has_prices & low_ok & high_ok & counts_ok]
    # De-duplicate BEFORE sorting: sort_values' default quicksort is not stable,
    # so sorting first could reorder equal dates and make keep="last" arbitrary.
    df = df.drop_duplicates(subset=["date"], keep="last")
    return df.sort_values("date").reset_index(drop=True)
PY

# 3) Escritura Parquet (upsert idempotente por partición year/month)
cat > services/ingestion/writer.py <<'PY'
from __future__ import annotations
import json, os
from pathlib import Path
import pandas as pd

def save_checkpoint(path: str, last_close_time_ms: int) -> None:
    """Persist the last processed kline close time as JSON at ``path``.

    Writes ``{"last_close_time": <int>}`` to a temporary sibling file, then
    renames it over ``path`` with os.replace. A run interrupted mid-write
    therefore never leaves a truncated checkpoint behind. Missing parent
    directories are created.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp = target.with_name(target.name + ".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump({"last_close_time": int(last_close_time_ms)}, f)
    # os.replace is atomic on POSIX when source and target share a filesystem.
    os.replace(tmp, target)

def load_checkpoint(path: str) -> int | None:
    p = Path(path)
    if not p.exists(): return None
    try:
        obj = json.loads(p.read_text())
        return int(obj.get("last_close_time"))
    except Exception:
        return None

def upsert_parquet_curated(df: pd.DataFrame, curated_root: str, market:str, segment:str, contract:str, symbol:str, interval:str="1d") -> int:
    """Merge ``df`` into the curated Parquet lake, one file per year/month partition.

    Layout: <curated_root>/market=../segment=../contract=../symbol=../interval=../
    year=YYYY/month=MM/data.parquet. Rows are keyed by ``date``. Incoming rows
    replace stored rows with the same date, so re-running with the same data
    is idempotent. Each partition file is written to a temp file and then
    renamed into place. A crash mid-write therefore cannot corrupt an existing
    partition.

    Returns the number of incoming rows written, counted after de-duplicating
    ``df`` by date within each month.
    """
    if df.empty:
        return 0
    root = (Path(curated_root) / f"market={market}" / f"segment={segment}"
            / f"contract={contract}" / f"symbol={symbol}" / f"interval={interval}")
    written = 0
    for y in sorted(df["date"].dt.year.unique()):
        df_y = df[df["date"].dt.year == y]
        for m in sorted(df_y["date"].dt.month.unique()):
            # De-duplicate before sorting: the default sort is not stable, so
            # sorting first could make keep="last" pick an arbitrary duplicate.
            part = df_y[df_y["date"].dt.month == m].drop_duplicates(subset=["date"], keep="last")
            part_dir = root / f"year={y}" / f"month={m:02d}"
            part_dir.mkdir(parents=True, exist_ok=True)
            path = part_dir / "data.parquet"
            if path.exists():
                old = pd.read_parquet(path)
                # Incoming rows come after stored ones, so keep="last" lets new data win.
                merged = pd.concat([old, part], ignore_index=True)
                merged = merged.drop_duplicates(subset=["date"], keep="last")
            else:
                merged = part
            merged = merged.sort_values("date")
            tmp = path.with_name(path.name + ".tmp")
            merged.to_parquet(tmp, index=False)
            os.replace(tmp, path)  # atomic swap on POSIX (same directory)
            written += len(part)
    return written
PY

# 4) Pipeline (backfill + incremental) con Typer CLI
cat > services/ingestion/pipeline.py <<'PY'
from __future__ import annotations
import json
from pathlib import Path
from typing import Dict, Any
import yaml, pandas as pd, typer
from services.ingestion.binance_client import paginate_klines_1d
from services.processing.validate import normalize_klines_to_df, validate_ohlcv_rules
from services.ingestion.writer import save_checkpoint, load_checkpoint, upsert_parquet_curated

# Typer CLI application; the functions decorated with @app.command() below become its subcommands.
app = typer.Typer(help="Ingesta OHLCV 1d Futuros USDT-M (PERP) → Parquet")

def _load_cfg() -> tuple[dict, dict]:
    """Load and return (data_paths.yml, symbols.yml) from the project's configs/ folder.

    The project root is taken as ``parents[2]`` of this file's resolved path
    (services/ingestion/pipeline.py -> project root).
    """
    config_dir = Path(__file__).resolve().parents[2] / "configs"

    def read_yaml(name: str):
        return yaml.safe_load((config_dir / name).read_text())

    return read_yaml("data_paths.yml"), read_yaml("symbols.yml")

def _ckpt_path(paths:dict, symbol:str) -> str:
    return str(Path(paths["checkpoints_dir"]) / f"{symbol}_1d_klines.json")

@app.command()
def backfill(symbol: str, start: str="2017-08-17", end: str=None):
    """
    Backfill 1d klines for one symbol into the curated Parquet lake.

    start/end are YYYY-MM-DD dates interpreted in UTC; end defaults to now.
    If a checkpoint later than start exists, the download resumes right after
    it. Prints a JSON summary (symbol, rows_written, last_close_time).

    Only candles whose close_time is already in the past advance the
    checkpoint. Binance also returns the still-open daily candle.
    Checkpointing its future close_time would make the next run skip it,
    freezing its partial values in the curated data.
    """
    paths, symbols = _load_cfg()
    api_symbol = symbols["symbol_map"].get(symbol, symbol)
    market = symbols.get("market", "futures")
    segment = symbols.get("segment", "usdtm")
    contract = symbols.get("contract", "perpetual")

    # pd.Timestamp.utcnow() is deprecated in recent pandas; use a tz-aware now().
    now_ms = int(pd.Timestamp.now(tz="UTC").timestamp() * 1000)
    start_ms = int(pd.Timestamp(start, tz="UTC").timestamp() * 1000)
    end_ms = int(pd.Timestamp(end, tz="UTC").timestamp() * 1000) if end else now_ms

    ckpt_file = _ckpt_path(paths, symbol)
    last = load_checkpoint(ckpt_file)
    if last is not None and last > start_ms:
        start_ms = last + 1  # resume right after the last checkpointed candle

    total_rows = 0
    last_close_ms = None
    for batch in paginate_klines_1d(api_symbol, start_ms, end_ms):
        df = validate_ohlcv_rules(normalize_klines_to_df(batch))
        if df.empty:
            continue
        # A still-open candle is written too; the next run re-fetches and upserts it by date.
        upsert_parquet_curated(df, paths["curated_root"], market, segment, contract, symbol, "1d")
        total_rows += len(df)
        closed = df.loc[df["close_time"] < now_ms, "close_time"]
        if not closed.empty:
            last_close_ms = int(closed.max())
            save_checkpoint(ckpt_file, last_close_ms)

    typer.echo(json.dumps({"symbol": symbol, "rows_written": total_rows, "last_close_time": last_close_ms}, indent=2))

@app.command()
def delta(symbol: str):
    """
    Download only what is new since the checkpoint, up to now.

    Delegates to backfill() with an epoch start date. backfill() resumes
    from the checkpoint when one exists; without a checkpoint this downloads
    the full history.
    """
    epoch = "1970-01-01"
    result = backfill(symbol=symbol, start=epoch, end=None)
    return result

# Script entry point: Typer parses argv and dispatches to the backfill/delta subcommands.
# NOTE(review): the absolute `services.` imports at the top need the project root
# on sys.path. Presumably run from the root as
# `python -m services.ingestion.pipeline backfill BTCUSDT` — confirm.
if __name__ == "__main__":
    app()
PY

# 5) Tests básicos (sin acceso a internet): reglas de validación OHLCV
cat > tests/test_ingest_klines_local.py <<'PY'
import pandas as pd
from services.processing.validate import validate_ohlcv_rules

def test_validate_basic_rules():
    """Valid candles survive; invalid or duplicate-date candles are removed."""
    df = pd.DataFrame({
        "date": pd.to_datetime(["2024-01-01","2024-01-02"], utc=True),
        "open":[100,110],"high":[120,130],"low":[90,100],"close":[115,120],
        "volume":[10,20],"trades":[100,200],"quote_volume":[1000,2000],"close_time":[1704067200000,1704153600000]
    })
    out = validate_ohlcv_rules(df)
    assert len(out)==2
    assert list(out["date"]) == list(df["date"])

    # high (105) below open/close -> the second candle must be rejected
    bad = df.copy()
    bad.loc[1, "high"] = 105
    assert len(validate_ohlcv_rules(bad)) == 1

    # a repeated date collapses to a single row
    dup = pd.concat([df, df.iloc[[0]]], ignore_index=True)
    assert len(validate_ohlcv_rules(dup)) == 2
PY