In [11]:
from typing import Type, cast
import ccxt
import ccxt.binance
from tenacity import (
    retry,
    wait_exponential,
    stop_after_attempt,
    retry_if_exception_type,
)


# ccxt identifier of the exchange the notebook talks to.
EXCHANGE_NAME = "binance"

# Keyword arguments forwarded to fetch_historical_data(): which market to
# download, the candle size, and how many days of history to request.
FETCH_KWARGS = dict(
    symbol="BTC/USDT",
    timeframe="15m",
    days_back=7,
)


# Initialise the exchange client
@retry(
    wait=wait_exponential(multiplier=1, min=3, max=30),
    stop=stop_after_attempt(5),
    retry=retry_if_exception_type((ccxt.NetworkError, ccxt.ExchangeNotAvailable)),
    reraise=True,
)
def init_exchange(exchange_id: str = "binance", **kwargs) -> ccxt.Exchange:
    """Create a ccxt exchange client and load its markets, retrying on network errors.

    The call is retried (up to 5 attempts, exponential back-off) when it raises
    ``ccxt.NetworkError`` or ``ccxt.ExchangeNotAvailable``; the last such error
    is re-raised once the attempts are exhausted.

    Args:
        exchange_id: Name of the exchange class on the ``ccxt`` module.
        **kwargs: Extra entries for the exchange configuration dict. They
            override the defaults set here; a caller-supplied ``options`` dict
            is merged on top of the default options instead of replacing them.

    Returns:
        The exchange instance after ``load_markets()`` has completed.

    Raises:
        ValueError: If ``exchange_id`` is not listed in ``ccxt.exchanges``, or
            if the exchange reports an authentication error.
        RuntimeError: If the exchange raises any other ``ccxt.ExchangeError``,
            or does not advertise ``fetchOHLCV`` support.
    """
    if exchange_id not in ccxt.exchanges:
        raise ValueError(f"交易所 {exchange_id} 不存在")

    # Build the config from the caller's kwargs, then layer the defaults in so
    # that passing `options=...` no longer discards adjustForTimeDifference.
    config = dict(kwargs)
    config.setdefault("enableRateLimit", True)
    config["options"] = {
        "adjustForTimeDifference": True,
        **(config.get("options") or {}),
    }

    try:
        # cast() only informs the type checker; getattr does the real lookup.
        ExchangeClass: Type[ccxt.Exchange] = cast(
            Type[ccxt.Exchange], getattr(ccxt, exchange_id)
        )
        exchange = ExchangeClass(config)
        exchange.load_markets()
    except ccxt.AuthenticationError as e:
        raise ValueError("API密钥错误或权限不足") from e
    except ccxt.ExchangeError as e:
        raise RuntimeError(f"交易所初始化失败: {str(e)}") from e

    # .get() so a missing capability key is reported as "unsupported" rather
    # than surfacing as a KeyError.
    if not exchange.has.get("fetchOHLCV"):
        raise RuntimeError(f"{exchange_id} 不支持OHLCV数据获取")

    return exchange


# Run the initialisation: build the client for EXCHANGE_NAME.
# init_exchange() calls load_markets(), so this cell needs network access.
crypto_ex = init_exchange(EXCHANGE_NAME)
print(f"交易所 {crypto_ex.name} 已初始化")

交易所 Binance 已初始化


In [12]:
import math
import pandas as pd


def fetch_historical_data(
    exchange: ccxt.Exchange,
    symbol: str = "BTC/USDT",
    timeframe: str = "15m",
    days_back: int = 7,
    max_retries: int = 5,
) -> pd.DataFrame:
    """Download OHLCV candles covering the last ``days_back`` days.

    Candles are requested in pages of up to 1000, advancing ``since`` to one
    timeframe past the last candle received, until the expected number of
    candles has been collected or the exchange returns an empty page.

    Args:
        exchange: Initialised ccxt exchange client.
        symbol: Market symbol passed to ``fetch_ohlcv``.
        timeframe: Candle size understood by ``exchange.parse_timeframe``.
        days_back: Length of the history window, in days, ending now.
        max_retries: Number of consecutive network failures tolerated before
            the error is re-raised.

    Returns:
        DataFrame with columns ``timestamp, open, high, low, close, volume``
        and at most one row per timestamp.

    Raises:
        ccxt.NetworkError: After more than ``max_retries`` consecutive
            network failures.
    """
    columns = ["timestamp", "open", "high", "low", "close", "volume"]

    # Duration of one candle and of the whole window, both in milliseconds.
    timeframe_ms = exchange.parse_timeframe(timeframe) * 1000
    window_ms = days_back * 24 * 60 * 60 * 1000

    # Number of candles needed to cover the window.
    total_candles = math.ceil(window_ms / timeframe_ms)

    all_data = []
    since = exchange.milliseconds() - window_ms
    consecutive_failures = 0

    while len(all_data) < total_candles:
        try:
            data = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=1000)
        except ccxt.NetworkError as e:
            # Bounded retry: a persistent outage must not spin forever.
            consecutive_failures += 1
            if consecutive_failures > max_retries:
                raise
            print(f"\n网络异常: {str(e)}")
            exchange.sleep(5000)
            continue

        consecutive_failures = 0
        if not data:
            break

        # Next page starts exactly one candle after the last one received.
        since = data[-1][0] + timeframe_ms
        all_data.extend(data)

        # Progress display, rewritten in place on one line.
        progress = min(len(all_data) / total_candles * 100, 100)
        print(f"\r获取进度: {progress:.1f}%", end="", flush=True)

        # Respect the rate limit. sleep() is given milliseconds (as with the
        # 5000 ms back-off above), so rateLimit is passed through unscaled;
        # integer-dividing it by 1000 made this pause zero.
        exchange.sleep(exchange.rateLimit)

    # De-duplicate on the candle timestamp so a candle delivered twice with
    # different values cannot survive as two rows; the latest copy wins.
    return pd.DataFrame(all_data, columns=columns).drop_duplicates(
        subset="timestamp", keep="last"
    )


# Download the candles, then index them by their UTC timestamp in
# chronological order.
df = fetch_historical_data(crypto_ex, **FETCH_KWARGS)
df = (
    df.assign(datetime=pd.to_datetime(df["timestamp"], unit="ms", utc=True))
    .set_index("datetime")
    .sort_index()
)
df

获取进度: 100.0%

Unnamed: 0_level_0,timestamp,open,high,low,close,volume
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2025-01-23 17:30:00+00:00,1737653400000,105286.94,105680.00,105224.13,105591.68,450.04338
2025-01-23 17:45:00+00:00,1737654300000,105591.68,106230.42,105555.20,105927.78,464.24396
2025-01-23 18:00:00+00:00,1737655200000,105927.78,106142.46,105170.46,105217.31,344.79177
2025-01-23 18:15:00+00:00,1737656100000,105217.32,105878.93,105205.66,105755.08,308.71357
2025-01-23 18:30:00+00:00,1737657000000,105755.08,105927.92,105100.00,105189.48,275.69431
...,...,...,...,...,...,...
2025-01-30 16:15:00+00:00,1738253700000,105638.29,106004.48,105544.00,105948.73,348.59595
2025-01-30 16:30:00+00:00,1738254600000,105948.73,106013.01,105687.01,105795.68,256.51760
2025-01-30 16:45:00+00:00,1738255500000,105795.68,105843.68,105624.53,105696.96,138.34753
2025-01-30 17:00:00+00:00,1738256400000,105696.96,105698.27,105272.72,105335.41,253.25501


In [13]:
import plotly.graph_objects as go
import numpy as np
from plotly.subplots import make_subplots


def adaptive_plot(df: pd.DataFrame) -> None:
    """Show a candlestick chart with a volume bar panel underneath.

    Every row of ``df`` is plotted; no downsampling is applied.

    Args:
        df: Frame indexed by a DatetimeIndex with ``open``, ``high``, ``low``,
            ``close`` and ``volume`` columns.

    Returns:
        None. The figure is displayed with ``fig.show()``.
    """
    # Pick the tick format from the data: include the time of day only when
    # some timestamp is not at midnight. The previous check looked for "T" in
    # an ISO string, which is always present, so intraday data was labelled
    # with dates only (and an empty frame raised IndexError).
    is_intraday = bool((df.index != df.index.normalize()).any())
    time_format = "%Y-%m-%d %H:%M" if is_intraday else "%Y-%m-%d"

    # Two stacked panels sharing the x axis: price (70%) above volume (30%).
    fig = make_subplots(
        rows=2,
        cols=1,
        shared_xaxes=True,
        row_heights=[0.7, 0.3],
        vertical_spacing=0.05,
        specs=[[{"type": "scattergl"}], [{"type": "bar"}]],
    )

    # Price panel: candlesticks.
    fig.add_trace(
        go.Candlestick(
            x=df.index,
            open=df.open,
            high=df.high,
            low=df.low,
            close=df.close,
            name="价格",
            increasing_line_color="#2ECC71",  # green when rising
            decreasing_line_color="#E74C3C",  # red when falling
        ),
        row=1,
        col=1,
    )

    # Volume panel: one bar per candle, coloured by candle direction
    # (green when close > open, red otherwise).
    colors = np.where(df.close > df.open, "#2ECC71", "#E74C3C")
    fig.add_trace(
        go.Bar(
            x=df.index,
            y=df.volume,
            marker_color=colors,
            name="成交量",
            marker_line_width=0,
        ),
        row=2,
        col=1,
    )

    # Layout: fixed height, unified hover, range slider hidden.
    # NOTE(review): "webgl" does not look like a documented modebar button
    # name — confirm that modebar_add has any effect here.
    fig.update_layout(
        height=800,
        template="plotly_white",
        hovermode="x unified",
        xaxis_rangeslider_visible=False,
        xaxis=dict(type="date", tickformat=time_format, rangeslider_thickness=0.02),
        yaxis=dict(title_text="价格 (USDT)"),
        yaxis2=dict(title_text="成交量 (BTC)"),
        plot_bgcolor="rgba(255,255,255,0.9)",
        modebar_add=["webgl"],
    )

    # Candlestick styling: fully opaque, thin outlines.
    fig.update_traces(selector=dict(type="candlestick"), opacity=1, line_width=1)

    fig.show()


def validate_dataset(df: pd.DataFrame, timeframe: "str | None" = None) -> None:
    """Warn when consecutive index timestamps are not one timeframe apart.

    Args:
        df: Frame indexed by a DatetimeIndex, sorted in time order.
        timeframe: Expected spacing as a string accepted by ``pd.Timedelta``
            (e.g. ``"15m"``). Defaults to ``FETCH_KWARGS["timeframe"]``.

    Returns:
        None. A warning with the number of irregular gaps is printed when at
        least one gap differs from the expected spacing.
    """
    if timeframe is None:
        timeframe = FETCH_KWARGS["timeframe"]
    expected = pd.Timedelta(timeframe).total_seconds()

    # diff() yields NaN for the first row, which has no predecessor; drop it
    # so it is not counted as an irregular gap.
    gaps = df.index.to_series().diff().dt.total_seconds().iloc[1:]
    irregular = int((gaps != expected).sum())
    if irregular:
        print(f"警告：发现 {irregular} 处时间间隔异常")


# Check the candle spacing of the downloaded data before plotting it.
validate_dataset(df)

# Smart visualisation
adaptive_plot(df)

In [14]:
from datetime import datetime, timezone
import json
from pathlib import Path


def save_dataset(df: pd.DataFrame, params: dict, data_dir: str = "../data/raw") -> str:
    """Write the frame as CSV, plus a JSON metadata sidecar, into a timestamped folder.

    A sub-directory named after the current UTC time (``YYYYMMDD_HHMMSS``) is
    created under ``data_dir``. The CSV is named from the exchange name, the
    symbol (``/`` replaced by ``_``, lower-cased) and the timeframe; the
    sidecar is the CSV file name with ``.meta.json`` appended.

    Args:
        df: Data to store; the index is written as well.
        params: Must contain ``symbol``, ``exchange_name`` and ``timeframe``.
            The whole dict is copied into the metadata file.
        data_dir: Root directory for the timestamped folders.

    Returns:
        Path of the written CSV file, as a string.
    """
    # Timestamped target directory
    run_stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    target_dir = Path(data_dir).joinpath(run_stamp)
    target_dir.mkdir(parents=True, exist_ok=True)

    # File name built from the request parameters
    pair_slug = params["symbol"].replace("/", "_").lower()
    filename = "{}_{}_{}.csv".format(
        params["exchange_name"], pair_slug, params["timeframe"]
    )
    csv_path = target_dir / filename

    # Data file
    df.to_csv(csv_path, index=True)

    # Metadata sidecar: creation time, column names, request parameters and
    # a content hash of the frame.
    meta = dict(
        generated_at=datetime.now(timezone.utc).isoformat(),
        columns=[column for column in df.columns],
        params=params,
        data_hash=int(pd.util.hash_pandas_object(df).sum()),
    )
    with (target_dir / f"{filename}.meta.json").open("w") as handle:
        json.dump(meta, handle, indent=2)

    print(f"数据已保存至：{csv_path}")
    return str(csv_path)


# Persist the downloaded candles together with the parameters that produced
# them; the exchange name is added because FETCH_KWARGS does not carry it.
# NOTE(review): crypto_ex.name looks like a display name — confirm it is safe
# to use in a file name for exchanges other than Binance.
save_dataset(
    df,
    params={
        "exchange_name": crypto_ex.name.lower(),
        **FETCH_KWARGS,
    },
    data_dir="../../data/raw",
)

数据已保存至：../../data/raw/20250130_171956/binance_btc_usdt_15m.csv


'../../data/raw/20250130_171956/binance_btc_usdt_15m.csv'