In [1]:
# Binance 1h OHLCV backfill (CSV only), starting 2020-01-01
# - Works for BTC first (can expand later)
# - Robust pagination + retries
# - Safe resume if CSV already exists

import ccxt
import pandas as pd
from dateutil import parser
from datetime import datetime, timezone
from time import sleep
from tqdm import tqdm
import os
import math

# ---------------- CONFIG ----------------
# Exchange id; used both to look up the ccxt class and as a filename prefix.
EXCHANGE = "binance"
# Markets to backfill, in ccxt "BASE/QUOTE" notation.
SYMBOLS = ["BTC/USDT"]
# Candle width passed to fetch_ohlcv and embedded in the CSV filename.
TIMEFRAME = "1h"
# Start of the backfill window when no CSV exists yet (ISO-8601, UTC).
SINCE = "2020-01-01T00:00:00Z"
# Output directory for the per-symbol CSV files.
SAVE_DIR = "../data/binance"
# Pause, in seconds, between consecutive page requests.
RATELIMIT_SLEEP = 0.25
# Attempts per page before giving up.
MAX_RETRIES = 5
# Backoff base: the wait before the next attempt is RETRY_WAIT_BASE ** attempt seconds.
RETRY_WAIT_BASE = 2.0
# ---------------------------------------

# Make sure the output directory exists before anything tries to write to it.
os.makedirs(SAVE_DIR, exist_ok=True)

# Instantiate the exchange client by name; no API keys are supplied here.
_exchange_settings = {
    "enableRateLimit": True,
    "options": {"adjustForTimeDifference": True},
}
ex = getattr(ccxt, EXCHANGE)(_exchange_settings)

def iso_to_ms(iso: str) -> int:
    """Convert an ISO-8601 timestamp string to milliseconds since the Unix epoch.

    Args:
        iso: ISO-8601 date/time string, e.g. "2020-01-01T00:00:00Z". A string
            without a UTC offset is interpreted as UTC rather than as the
            machine's local time, so results do not depend on where the
            script runs.

    Returns:
        Whole milliseconds since 1970-01-01T00:00:00Z (sub-millisecond
        precision is floored).

    Raises:
        ValueError: if `iso` is not a valid ISO-8601 string (raised by
            `parser.isoparse`).
    """
    dt = parser.isoparse(iso)
    if dt.tzinfo is None:
        # A naive datetime's .timestamp() would use the local timezone;
        # pin it to UTC to match the "Z"-suffixed timestamps used elsewhere.
        dt = dt.replace(tzinfo=timezone.utc)

    # Integer arithmetic on the timedelta avoids the float rounding that
    # int(dt.timestamp() * 1000) can introduce for fractional seconds.
    delta = dt - datetime(1970, 1, 1, tzinfo=timezone.utc)
    return (delta.days * 86_400 + delta.seconds) * 1000 + delta.microseconds // 1000

def timeframe_ms(tf: str) -> int:
    """Return the duration of timeframe `tf` (e.g. "1h") in milliseconds.

    The duration is obtained from the exchange client's `parse_timeframe`
    and scaled by 1000 before being truncated to an int.
    """
    duration = ex.parse_timeframe(tf)
    scaled = duration * 1000
    return int(scaled)

def read_existing_start(csv_path: str, default_since_iso: str) -> int:
    """Pick the millisecond timestamp from which fetching should (re)start.

    If `csv_path` exists and holds at least one parseable timestamp, fetching
    resumes AT the last stored candle rather than one candle after it. The
    newest candle returned by an exchange is typically still forming when it
    is saved, so re-fetching it lets the de-duplicating upsert replace the
    partial values with the final ones. Otherwise the default start is used.

    Args:
        csv_path: Path of the CSV written by a previous run; must contain a
            "timestamp" column if it exists.
        default_since_iso: ISO-8601 start used when there is nothing to
            resume from.

    Returns:
        Start time in milliseconds since the Unix epoch (UTC).
    """
    if not os.path.exists(csv_path):
        return iso_to_ms(default_since_iso)

    try:
        # Only the timestamp column is needed to find the resume point.
        stamps = pd.read_csv(csv_path, usecols=["timestamp"])["timestamp"]
        # utc=True handles both tz-aware and tz-naive stored values;
        # unparseable cells become NaT and are ignored by max().
        last = pd.to_datetime(stamps, utc=True, errors="coerce").max()
    except (OSError, ValueError, KeyError, TypeError, OverflowError) as exc:
        # Best-effort: an unreadable file must not abort the run, but the
        # fallback to a full re-download should be visible to the user.
        print(f"[warn] could not read {csv_path} ({exc}); "
              f"restarting from {default_since_iso}")
        return iso_to_ms(default_since_iso)

    if pd.isna(last):
        # Empty file, or no valid timestamps at all.
        return iso_to_ms(default_since_iso)

    # Exact integer milliseconds, without going through a float.
    epoch = pd.Timestamp("1970-01-01", tz="UTC")
    return int((last - epoch) // pd.Timedelta(milliseconds=1))

def fetch_ohlcv_all(symbol: str, timeframe: str, since_ms: int) -> pd.DataFrame:
    """Download OHLCV candles for `symbol` from `since_ms` up to the newest available.

    Pages through `ex.fetch_ohlcv`, retrying each page up to MAX_RETRIES times
    with exponential backoff. Paging stops when a page is empty, shorter than
    the page size, fails on every retry, or does not advance the cursor.

    Args:
        symbol: Market symbol passed to the exchange, e.g. "BTC/USDT".
        timeframe: Candle width passed to the exchange, e.g. "1h".
        since_ms: Start of the window, in milliseconds since the Unix epoch.

    Returns:
        DataFrame indexed by a UTC "timestamp" index, sorted ascending, with
        duplicate timestamps removed (last occurrence kept) and columns
        open, high, low, close, volume, symbol, exchange. If retries are
        exhausted mid-way, the rows fetched so far are returned.
    """
    out_columns = ["open", "high", "low", "close", "volume", "symbol", "exchange"]
    # Page size: taken from `ex.fetch_ohlcv_limit` when the client has such
    # an attribute, otherwise (or if it returns a falsy value) 1000.
    limit = getattr(ex, "fetch_ohlcv_limit", lambda tf: 1000)(timeframe) or 1000
    rows = []
    tf_ms = timeframe_ms(timeframe)

    with tqdm(total=0, desc=f"{symbol} {timeframe}") as pbar:
        while True:
            # Fetch one page, retrying on any error.
            for attempt in range(MAX_RETRIES):
                try:
                    data = ex.fetch_ohlcv(symbol, timeframe=timeframe, since=since_ms, limit=limit)
                    break
                except Exception as e:
                    if attempt + 1 >= MAX_RETRIES:
                        # No further attempt follows, so sleeping would only delay the exit.
                        tqdm.write(
                            f"[retry {attempt+1}/{MAX_RETRIES}] {e} — giving up; "
                            f"returning {len(rows)} rows fetched so far"
                        )
                        continue
                    wait = RETRY_WAIT_BASE ** attempt
                    tqdm.write(f"[retry {attempt+1}/{MAX_RETRIES}] {e} — sleeping {wait:.1f}s")
                    sleep(wait)
            else:
                # Every attempt failed: stop paging and keep what we have.
                break

            if not data:
                break

            rows.extend(data)
            last_ts = data[-1][0]
            next_since = last_ts + tf_ms

            # Progress feedback: show the timestamp of the newest candle so far.
            pbar.set_postfix_str(datetime.fromtimestamp(last_ts/1000, tz=timezone.utc).isoformat())
            pbar.update(1)

            # Guard against an exchange that ignores `since` or keeps returning
            # the same page: without forward progress the loop would never end.
            if next_since <= since_ms:
                break
            since_ms = next_since

            # A short page means we have caught up with the newest data.
            if len(data) < limit:
                break

            sleep(RATELIMIT_SLEEP)

    if not rows:
        return pd.DataFrame(columns=["timestamp", *out_columns]).set_index("timestamp")

    # Each row is [ts_ms, open, high, low, close, volume].
    df = pd.DataFrame(rows, columns=["ts", "open", "high", "low", "close", "volume"])
    df["timestamp"] = pd.to_datetime(df["ts"], unit="ms", utc=True)
    df = df.drop(columns=["ts"]).set_index("timestamp").sort_index()
    df["symbol"] = symbol
    df["exchange"] = ex.id
    # Drop repeated timestamps (e.g. from overlapping pages), keeping the last one seen.
    df = df[~df.index.duplicated(keep="last")]
    return df[out_columns]

def upsert_csv(csv_path: str, new_df: pd.DataFrame):
    """Merge `new_df` into the CSV at `csv_path`, creating the file if needed.

    Rows are keyed by the timestamp index. When a timestamp is present in
    both the existing file and `new_df`, the row from `new_df` wins. The
    merged data is written sorted by timestamp, via a temporary file that is
    then moved into place so an interrupted write cannot truncate the CSV.

    Args:
        csv_path: Destination CSV path; if it exists it must have a
            "timestamp" column.
        new_df: Rows to merge, indexed by timestamp.

    Returns:
        Number of timestamps that were not in the file before this call
        (0 when `new_df` is empty and nothing is written).
    """
    if new_df.empty:
        return 0

    if os.path.exists(csv_path):
        old = pd.read_csv(csv_path, index_col="timestamp")
        # Normalise to tz-aware UTC so old and new rows share one index dtype.
        old.index = pd.to_datetime(old.index, utc=True)
        existing = int((~old.index.duplicated()).sum())
        combo = pd.concat([old, new_df])
    else:
        existing = 0
        combo = new_df

    # De-duplicate BEFORE sorting: concat order guarantees new rows come
    # last, whereas a (non-stable) sort could reorder equal timestamps.
    combo = combo[~combo.index.duplicated(keep="last")].sort_index()
    # The reader expects this column name when the file is loaded again.
    combo.index.name = "timestamp"

    tmp_path = f"{csv_path}.tmp"
    combo.to_csv(tmp_path, index=True)
    os.replace(tmp_path, csv_path)
    return len(combo) - existing

# -------- run for BTC/USDT from 2020 → now --------
for sym in SYMBOLS:
    # Per-symbol output file, with "/" in the symbol replaced by "_".
    base = "_".join(sym.split("/"))
    csv_path = os.path.join(SAVE_DIR, f"{EXCHANGE}_{base}_{TIMEFRAME}.csv")

    # Resume from an existing CSV when there is one, otherwise start at SINCE.
    start_ms = read_existing_start(csv_path, SINCE)
    start_iso = datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc).isoformat()
    print(f"\nFetching {sym} {TIMEFRAME} from {start_iso} → now")

    # Download the missing candles and merge them into the CSV.
    df_new = fetch_ohlcv_all(sym, TIMEFRAME, start_ms)
    added = upsert_csv(csv_path, df_new)

    # Report the size of the file on disk (0 if nothing has been written yet).
    total = len(pd.read_csv(csv_path)) if os.path.exists(csv_path) else 0
    print(f"Saved: {csv_path}  (+{added} new rows, total {total})")


Fetching BTC/USDT 1h from 2020-01-01T00:00:00+00:00 → now


BTC/USDT 1h: 50it [00:28,  1.77it/s, 2025-08-18T18:00:00+00:00]


Saved: ../data/binance/binance_BTC_USDT_1h.csv  (+49331 new rows, total 49331)
