In [2]:
import asyncio
from websockets import connect
import aiofiles
import sys
import json
import httpx
import time
from datetime import datetime


In [None]:
import asyncio
import csv
import os
from datetime import datetime, timedelta
import ccxt.async_support as ccxt

# Exchange clients keyed by the short label used in this notebook.
# Each pair is (label, name of the ccxt class); every client is created with
# ccxt's built-in rate limiter switched on.
exchanges = {
    label: getattr(ccxt, class_name)({'enableRateLimit': True})
    for label, class_name in (
        ("binance", "binance"),
        ("okx", "okx"),
        ("gate", "gateio"),
        ("bitget", "bitget"),
        ("bybit", "bybit"),
        ("hyperliquid", "hyperliquid"),
    )
}

# Symbols to download (prefer full trading pairs, e.g. "BTC/USDT").
symbols = [
    "BTC/USDT",
    "SOL/USDT",
    "ETH/USDT",
]

# Directory that receives the CSV files; created up front if it is missing.
OUTPUT_DIR = "./historical_data_1m"
os.makedirs(OUTPUT_DIR, exist_ok=True)

async def fetch_exchange_symbol_ohlcv(exchange, symbol):
    """
    Download the last 30 days of 1m candles for one symbol and append them to a CSV file.

    Candles are fetched page by page with ``exchange.fetch_ohlcv``. A single snapshot
    of the current best bid/ask (price and size) is then read with
    ``exchange.fetch_order_book`` and copied onto every row, so those four columns
    describe the book at download time, not at candle time.

    Output file: OUTPUT_DIR/{exchange.id}_{symbol.replace('/', '-')}_1m_data.csv
    The file is opened in append mode and the header row is written only when the
    file did not exist before, so calling this twice for the same pair appends the
    candles a second time.

    Args:
        exchange: object exposing ``id`` plus awaitable ``fetch_ohlcv`` and
            ``fetch_order_book`` methods (the ccxt async clients in ``exchanges``).
        symbol: trading pair such as "BTC/USDT".

    Returns:
        None. Results are written to disk and a summary line is printed.

    Errors raised while fetching are printed rather than re-raised: a failed candle
    request stops pagination and whatever was collected so far is still written; a
    failed order-book request leaves the four bid/ask columns empty.
    """
    # Local import: the cell's own import line only brings in datetime and timedelta.
    from datetime import timezone

    timeframe = "1m"
    timeframe_ms = 60_000  # length of one 1m candle in milliseconds
    limit = 1000  # page size requested from the exchange

    # Use an aware UTC datetime. datetime.utcnow() returns a naive value, and
    # .timestamp() interprets naive values as local time, which shifts the whole
    # window by the machine's UTC offset.
    now = datetime.now(timezone.utc)
    now_ms = int(now.timestamp() * 1000)
    # Start of the window: 30 days ago.
    since = int((now - timedelta(days=30)).timestamp() * 1000)

    all_candles = []

    # Page through the candles.
    while True:
        try:
            candles = await exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since, limit=limit)

            # Keep only candles at or after the cursor. This drops anything already
            # collected and guarantees the loop ends if an exchange keeps returning
            # the same page.
            fresh = [candle for candle in (candles or []) if candle[0] >= since]
            if not fresh:
                break

            all_candles += fresh
            last_ts = max(candle[0] for candle in fresh)

            # A short page is NOT treated as "no more data": an exchange may cap the
            # page size below the requested limit. Stop only once the newest candle
            # is the one covering the present moment.
            if last_ts + timeframe_ms > now_ms:
                break

            # Next page starts 1 ms after the newest candle received.
            since = last_ts + 1
        except Exception as e:
            print(f"Error fetching OHLCV from {exchange.id} - {symbol}: {e}")
            break

    # Take one snapshot of the current order book's best bid/ask.
    try:
        orderbook = await exchange.fetch_order_book(symbol, limit=1)
        best_bid = orderbook["bids"][0][0] if orderbook["bids"] else None
        best_ask = orderbook["asks"][0][0] if orderbook["asks"] else None
        best_bid_vol = orderbook["bids"][0][1] if orderbook["bids"] else None
        best_ask_vol = orderbook["asks"][0][1] if orderbook["asks"] else None
    except Exception as e:
        print(f"Error fetching order book from {exchange.id} - {symbol}: {e}")
        best_bid, best_ask = None, None
        best_bid_vol, best_ask_vol = None, None

    # Build the CSV rows. Each candle is [timestamp_ms, open, high, low, close, volume].
    rows = []
    for candle in all_candles:
        candle_time_ms, ohlcv_open, ohlcv_high, ohlcv_low, ohlcv_close, ohlcv_volume = candle[:6]

        # Render the candle open time as a UTC string with minute resolution.
        candle_time_str = datetime.fromtimestamp(
            candle_time_ms / 1000, tz=timezone.utc
        ).strftime("%Y-%m-%d %H:%M")

        rows.append([
            candle_time_str,   # 1) candle open time (UTC)
            exchange.id,       # 2) exchange id
            symbol,            # 3) trading pair
            ohlcv_open,        # 4) open
            ohlcv_high,        # 5) high
            ohlcv_low,         # 6) low
            ohlcv_close,       # 7) close
            ohlcv_volume,      # 8) volume
            best_bid,          # 9) best bid price at download time
            best_ask,          # 10) best ask price at download time
            best_bid_vol,      # 11) size at best bid
            best_ask_vol,      # 12) size at best ask
        ])

    # Save to CSV: {exchange.id}_{symbol.replace('/', '-')}_1m_data.csv
    csv_filename = f"{exchange.id}_{symbol.replace('/', '-')}_1m_data.csv"
    csv_path = os.path.join(OUTPUT_DIR, csv_filename)

    file_existed = os.path.isfile(csv_path)
    with open(csv_path, "a", newline='') as f:
        writer = csv.writer(f)
        # Write the header only when the file is being created.
        if not file_existed:
            writer.writerow([
                "timestamp", "exchange", "symbol",
                "open", "high", "low", "close", "volume",
                "best_bid", "best_ask", "best_bid_vol", "best_ask_vol"
            ])
        writer.writerows(rows)

    print(f">>> Finished: {exchange.id} / {symbol}, candles={len(all_candles)}")

async def fetch_all_exchanges():
    """
    Run fetch_exchange_symbol_ohlcv for every (exchange, symbol) pair concurrently.

    One coroutine is created per combination of the module-level ``exchanges`` and
    ``symbols`` and all of them are awaited together. Every exchange client is
    closed afterwards, whether or not the downloads succeeded.

    Returns:
        None.
    """
    # Note: a bare base currency such as "BTC" may be rejected by the exchange;
    # symbols are normally full pairs like "BTC/USDT".
    tasks = [
        fetch_exchange_symbol_ohlcv(exchange, symbol)
        for exchange in exchanges.values()
        for symbol in symbols
    ]
    try:
        await asyncio.gather(*tasks)
    finally:
        # ccxt's async clients must be closed explicitly to release their HTTP
        # sessions. return_exceptions=True keeps one failing close() from
        # preventing the others from running.
        await asyncio.gather(
            *(exchange.close() for exchange in exchanges.values()),
            return_exceptions=True,
        )

if __name__ == "__main__":
    # A Jupyter/IPython kernel already runs an asyncio event loop, and
    # asyncio.run() raises RuntimeError when called from inside a running loop.
    # Detect that case and schedule the download on the existing loop instead.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Plain script: no loop is running yet, so asyncio.run() can own one.
        asyncio.run(fetch_all_exchanges())
    else:
        # Notebook: the download runs in the background on the kernel's loop.
        # The reference is kept so the task is not garbage-collected; use
        # `await fetch_task` in a later cell to wait for it and surface errors.
        fetch_task = asyncio.ensure_future(fetch_all_exchanges())

In [14]:
import json


# Path of the JSON file that maps an exchange name to a list of entries.
# The error messages below refer to this name, so it must be defined.
file_path = 'exchange.json'

# Default to an empty mapping so the flattening step below still runs (and
# yields an empty list) when the file is missing or is not valid JSON.
pairs_config = {}

try:
    with open(file_path, 'r') as file:
        pairs_config = json.load(file)
    print(type(pairs_config))
except FileNotFoundError:
    print(f"Error: The file {file_path} was not found.")
except json.JSONDecodeError:
    print(f"Error: Failed to parse the JSON file {file_path}.")

# Flatten the mapping into (key, value) tuples, one per item of each key's list.
pairs_tuples = [(key, val) for key, values in pairs_config.items() for val in values]
print(pairs_tuples)

        

<class 'dict'>
[('bybit', 'BTCUSDT'), ('bybit', 'channel'), ('bitget', 'BTCUSDT'), ('okx', 'BTCUSDT'), ('binance', 'BTCUSDT')]
