In [8]:
import pandas as pd
from datetime import datetime, timezone
import json, os, time
import numpy as np
from binance.client import Client
from binance.exceptions import BinanceAPIException, BinanceRequestException

In [9]:
# Data calculus
from datetime import datetime, timedelta, timezone
from dateutil.relativedelta import relativedelta
import math

In [10]:
# Data downloading chanks to ram
import json, time, pathlib
from typing import List

In [11]:
# Downloading chanks to file
import pathlib, pandas as pd, sys

In [12]:
%run ./Binance_keys.ipynb

In [13]:
client = Client(
    api_key=api_key,
    api_secret=api_secret,
    tld='com',
    testnet=False,
)
client.API_URL = client.API_URL.replace("api.", "api.")

In [14]:
def test_binance_connection(binance_client: Client) -> None:
    try:
        latency = binance_client.ping()
        acc_info = binance_client.get_account()
        print("✓ Ping OK, server reachable.")
        print(f"✓ Account type: {acc_info['accountType']}, "
              f"makerCommission: {acc_info['makerCommission']}")
    except (BinanceAPIException, BinanceRequestException) as e:
        print("Binance connection failed:", e)
        raise

In [15]:
test_binance_connection(client)

✓ Ping OK, server reachable.
✓ Account type: SPOT, makerCommission: 10


In [16]:
def generate_minute_windows(months: int = 12,
                            interval_minutes: int = 1,
                            limit: int = 1000):

    end_dt   = datetime.now(timezone.utc).replace(second=0, microsecond=0)
    start_dt = end_dt - relativedelta(months=months)

    total_minutes = int((end_dt - start_dt).total_seconds() / 60)
    step          = limit * interval_minutes                 # 1000
    n_windows     = math.ceil(total_minutes / step)

    windows_dt = []
    cur_start  = start_dt
    for _ in range(n_windows):
        cur_end = min(cur_start + timedelta(minutes=step-1), end_dt)
        windows_dt.append((cur_start, cur_end))
        cur_start = cur_end + timedelta(minutes=1)

    to_ms = lambda dt: int(dt.timestamp() * 1000)
    windows_ms = [(to_ms(s), to_ms(e)) for s, e in windows_dt]

    print(f"Generated {len(windows_dt)} windows "
          f"({total_minutes} min total, {step}‑min chunks).")
    print("First w:", windows_dt[0])
    print("Last  w :", windows_dt[-1])
    return windows_dt, windows_ms

windows_dt, windows_ms = generate_minute_windows()


Generated 526 windows (525600 min total, 1000‑min chunks).
First w: (datetime.datetime(2024, 5, 18, 14, 46, tzinfo=datetime.timezone.utc), datetime.datetime(2024, 5, 19, 7, 25, tzinfo=datetime.timezone.utc))
Last  w : (datetime.datetime(2025, 5, 18, 4, 46, tzinfo=datetime.timezone.utc), datetime.datetime(2025, 5, 18, 14, 46, tzinfo=datetime.timezone.utc))


In [17]:
CHECKPOINT_PATH = pathlib.Path("data_final/checkpoint.json")
SYMBOL          = "BTCUSDT"
INTERVAL        = Client.KLINE_INTERVAL_1MINUTE
MAX_RETRIES     = 5          # за окно
SLEEP_BETWEEN   = 0.06       # 16req/сек

In [18]:
def load_checkpoint() -> int:
    if CHECKPOINT_PATH.exists():
        with open(CHECKPOINT_PATH) as f:
            data = json.load(f)
            return data.get("last_idx", -1)
    return -1

In [19]:
def save_checkpoint(idx: int) -> None:
    with open(CHECKPOINT_PATH, "w") as f:
        json.dump({"last_idx": idx}, f)

In [20]:
def fetch_window(symbol: str, start_ms: int, end_ms: int,
                 client: Client, max_retries: int = MAX_RETRIES):
    attempt = 0
    while attempt <= max_retries:
        try:
            raw = client.get_historical_klines(
                symbol=symbol,
                interval=INTERVAL,
                start_str=start_ms,
                end_str=end_ms,
                limit=1000,
            )
            return raw
        except Exception as e:
            wait = 0.5 * (2 ** attempt)
            print(f"[{symbol}] Window fetch fail (attempt {attempt}): {e}. "
                  f"Retry in {wait:.1f}s …")
            time.sleep(wait)
            attempt += 1
    raise RuntimeError(f"Exceeded {max_retries} retries for window")

In [21]:
def convert_to_dataframe(raw_data: List[list]) -> pd.DataFrame:
    cols = ['Open Time', 'Open', 'High', 'Low', 'Close', 'Volume',
            'Close Time', 'Quote Asset Volume', 'Trades',
            'Taker Buy Base', 'Taker Buy Quote', 'Ignore']

    df = pd.DataFrame(raw_data, columns=cols)
    df = df[['Open Time', 'Open', 'High', 'Low', 'Close', 'Volume']]
    df['Open Time'] = pd.to_datetime(df['Open Time'], unit='ms', utc=True)
    df = df.astype({'Open':'float', 'High':'float', 'Low':'float',
                    'Close':'float', 'Volume':'float'})
    df.set_index('Open Time', inplace=True)
    return df

In [22]:
last_done = load_checkpoint()
print(f"Checkpoint: last completed window idx = {last_done}")

chunks = []

for idx, (start_ms, end_ms) in enumerate(windows_ms):
    if idx <= last_done:
        continue
    time.sleep(SLEEP_BETWEEN)

    raw = fetch_window(SYMBOL, start_ms, end_ms, client)
    df_chunk = convert_to_dataframe(raw)
    chunks.append(df_chunk)

    save_checkpoint(idx)
    print(f"✓ Window {idx}/{len(windows_ms)-1} "
          f"({len(df_chunk)} rows) downloaded.")

print("Download loop finished.")

Checkpoint: last completed window idx = 525
Download loop finished.


In [23]:
DATA_DIR = pathlib.Path("data_raw")

def load_all_chunks():
    if "chunks" in globals() and isinstance(chunks, list) and len(chunks):
        print("Usе in‑memory chunk")
        return chunks
    files = sorted(DATA_DIR.glob("btc1m_win*.parquet"))
    if not files:
        sys.exit("No chunks found")
    print(f"Loading {len(files)} parquet chunks from {DATA_DIR}")
    return [pd.read_parquet(f) for f in files]

In [26]:
all_chunks = load_all_chunks()
df = (pd.concat(all_chunks)
        .loc[~pd.concat(all_chunks).index.duplicated(keep='first')]
        .sort_index())

full_idx = pd.date_range(df.index.min(), df.index.max(), freq="1min", tz="UTC")
missing = full_idx.difference(df.index)

mem_mb = df.memory_usage(deep=True).sum() / 1024**2

print("Result: ")
print("─────────────────────────")
print(f"Rows            : {len(df):,}")
print(f"Date range      : {df.index.min()} → {df.index.max()}")
print(f"Missing minutes : {len(missing)}")
print(f"Memory usage    : {mem_mb:.1f} MB")
print("─────────────────────────")

if len(missing):
    print("First 5 missing:", missing[:5].astype(str))


SystemExit: No chunks found

In [27]:
OUTPUT_DIR = pathlib.Path("data_final")
OUTPUT_DIR.mkdir(exist_ok=True)

PARQUET_PATH = OUTPUT_DIR / "btcusdt_1m_20240511‑20250511.parquet"
FEATHER_PATH = OUTPUT_DIR / "btcusdt_1m_20240511‑20250511.feather"

df.to_parquet(PARQUET_PATH, engine="pyarrow", compression="zstd")
print(f"Parquet saved → {PARQUET_PATH}")

df.reset_index().to_feather(FEATHER_PATH)
print(f"Feather saved → {FEATHER_PATH}")

df_chk = pd.read_parquet(PARQUET_PATH)
assert len(df_chk) == len(df), "Row‑count mismatch"
print("OK, rows:", len(df_chk))


NameError: name 'df' is not defined

In [28]:
# STEP 5.1
def rolling_winsorize(series: pd.Series,
                      window: int = 10_080,
                      sigma: float = 3.0) -> pd.Series:
    roll_mean = series.rolling(window, min_periods=1).mean()
    roll_std  = series.rolling(window, min_periods=1).std(ddof=0)
    lower     = roll_mean - sigma * roll_std
    upper     = roll_mean + sigma * roll_std
    return series.clip(lower, upper)

df['Volume_Clipped'] = rolling_winsorize(df['Volume'])
df['LogRet_1m_raw']  = np.log(df['Close'] / df['Close'].shift(1))
df['LogRet_1m']      = rolling_winsorize(df['LogRet_1m_raw'])

df.drop(columns=['LogRet_1m_raw'], inplace=True)



NameError: name 'df' is not defined