In [1]:
import pandas as pd
import numpy as np
from datetime import datetime
import requests
import io
import zipfile
import os
from tqdm import tqdm

In [2]:
# ================= Configuration =================
SYMBOL_LIST = ['BTCUSDT', 'ETHUSDT']
INTERVAL = '1m'
START_DATE = "2020-01-01"
END_DATE = datetime.now()

# Standard kline column layout, in file order. Used as the column names
# when a kline CSV has no header row.
KLINE_COLS_DEF = [
    'open_time',
    'open',
    'high',
    'low',
    'close',
    'volume',
    'close_time',
    'quote_volume',
    'count',
    'taker_buy_volume',
    'taker_buy_quote_volume',
    'ignore',
]

# Columns converted to a numeric dtype: every kline column except the two
# timestamp columns and the 'ignore' placeholder, kept in file order.
NUMERIC_COLS = [
    col for col in KLINE_COLS_DEF
    if col not in {'open_time', 'close_time', 'ignore'}
]

In [None]:
# ================= 核心工具函数 =================
def read_csv_auto(file_handle, fallback_columns=None):
    """
    Read a CSV from a binary, seekable file handle, auto-detecting a header row.

    The first line is peeked at. If its first comma-separated field parses as
    a float, the file is treated as headerless and ``fallback_columns`` is
    passed to pandas as the column names; otherwise the first line is used as
    the header.

    Returns:
        A DataFrame, or None when the first line is blank/missing or when any
        exception is raised while reading (the exception is printed).
    """
    try:
        # Older files ship without a header and newer ones with one, so the
        # first line has to be inspected before handing the file to pandas.
        head = file_handle.readline().decode('utf-8').strip()
        if head == '':
            return None  # nothing usable on the first line

        leading_field = head.split(',')[0]
        try:
            float(leading_field)
        except ValueError:
            # Not a number -> the line holds column names.
            header_present = True
        else:
            # Numeric first field -> the line is already data.
            header_present = False

        # readline() consumed the first line; rewind so pandas sees the whole file.
        file_handle.seek(0)

        if header_present:
            read_kwargs = {'header': 0}
        else:
            read_kwargs = {'header': None, 'names': fallback_columns}
        return pd.read_csv(file_handle, **read_kwargs)
    except Exception as e:
        print(f"读取CSV失败: {e}")
        return None

def fetch_and_clean_kline(symbol, date, url, timeout=30):
    """
    Download one zipped kline / mark-price CSV and return it cleaned.

    Args:
        symbol: Trading pair; only used to label error messages.
        date: Date string; only used to label error messages.
        url: Full URL of the ``.zip`` archive to download.
        timeout: Seconds passed to ``requests.get`` so that a stalled
            connection cannot block the whole download loop.

    Returns:
        A DataFrame with ``open_time`` converted from epoch milliseconds to
        datetime plus whichever ``NUMERIC_COLS`` are present in the file,
        or None when the response is not HTTP 200, the archive is empty,
        the CSV is blank/unreadable, or any exception occurs (printed).
    """
    try:
        resp = requests.get(url, timeout=timeout)
        if resp.status_code != 200:
            return None

        with zipfile.ZipFile(io.BytesIO(resp.content)) as z:
            members = z.namelist()
            if not members:
                print(f"\n[K线异常] {symbol} {date}: CSV为空或无法解析")
                return None
            with z.open(members[0]) as f:
                # Headerless files fall back to KLINE_COLS_DEF for column names.
                df = read_csv_auto(f, fallback_columns=KLINE_COLS_DEF)

        # read_csv_auto signals a blank or unparseable file with None; bail
        # out explicitly instead of failing later on a None subscript.
        if df is None:
            print(f"\n[K线异常] {symbol} {date}: CSV为空或无法解析")
            return None

        # Basic cleaning: convert the timestamp and the numeric columns.
        df['open_time'] = pd.to_datetime(df['open_time'], unit='ms')
        cols_to_convert = [c for c in NUMERIC_COLS if c in df.columns]
        df[cols_to_convert] = df[cols_to_convert].apply(pd.to_numeric, errors='coerce')

        # Keep memory down: return only the columns used downstream.
        keep_cols = ['open_time'] + cols_to_convert
        return df[keep_cols]
    except Exception as e:
        print(f"\n[K线异常] {symbol} {date}: {e}")
        return None

def fetch_monthly_funding(symbol, month_str, timeout=30):
    """
    Download one month of funding-rate data and normalise its columns.

    Args:
        symbol: Trading pair used to build the download URL.
        month_str: Month in ``YYYY-MM`` form used to build the download URL.
        timeout: Seconds passed to ``requests.get`` so that a stalled
            connection cannot block the whole download loop.

    Returns:
        A DataFrame with exactly the columns ``open_time`` (datetime),
        ``last_funding_rate`` and ``funding_interval_hours``, or None when
        the response is not HTTP 200, the archive is empty, the CSV is
        blank/unreadable, or any exception occurs (printed).
    """
    url = f"https://data.binance.vision/data/futures/um/monthly/fundingRate/{symbol}/{symbol}-fundingRate-{month_str}.zip"
    try:
        resp = requests.get(url, timeout=timeout)
        if resp.status_code != 200:
            return None

        with zipfile.ZipFile(io.BytesIO(resp.content)) as z:
            members = z.namelist()
            if not members:
                print(f" [费率异常: {symbol} {month_str} CSV为空或无法解析]", end="")
                return None
            with z.open(members[0]) as f:
                # No fallback column names: the header row is relied upon here.
                df = read_csv_auto(f)

        # read_csv_auto signals a blank or unparseable file with None.
        if df is None:
            print(f" [费率异常: {symbol} {month_str} CSV为空或无法解析]", end="")
            return None

        # --- Column-name compatibility ---
        # Time column: funding files use `calc_time`, while some may already
        # use `open_time`; either way the result is stored as `open_time`.
        # If neither exists the lookup raises KeyError, handled below.
        time_col = 'calc_time' if 'calc_time' in df.columns else 'open_time'
        df['open_time'] = pd.to_datetime(df[time_col], unit='ms')

        # Rate column: unify the older `fundingRate` name to `last_funding_rate`.
        if 'last_funding_rate' not in df.columns and 'fundingRate' in df.columns:
            df = df.rename(columns={'fundingRate': 'last_funding_rate'})

        # Interval column: older files lack it, so default to 8 hours.
        if 'funding_interval_hours' not in df.columns:
            df['funding_interval_hours'] = 8

        # Coerce to numeric, mirroring how the kline columns are cleaned.
        value_cols = ['last_funding_rate', 'funding_interval_hours']
        df[value_cols] = df[value_cols].apply(pd.to_numeric, errors='coerce')

        # Return only the three cleaned columns.
        return df[['open_time', 'last_funding_rate', 'funding_interval_hours']]
    except Exception as e:
        print(f" [费率异常: {e}]", end="")
        return None


In [None]:
# ================= Main program =================
dates = pd.date_range(start=START_DATE, end=END_DATE, freq='D')
str_dates = dates.strftime('%Y-%m-%d').tolist()

os.makedirs("data_parquet", exist_ok=True)

# Values written where no funding record applies to a bar (bars before the
# first record, or no funding data at all). The interval uses the same
# 8-hour default as fetch_monthly_funding, so both code paths agree.
FUNDING_FILL = {'last_funding_rate': 0, 'funding_interval_hours': 8}

for symbol in SYMBOL_LIST:
    print(f"\n开始处理: {symbol} | 总天数: {len(str_dates)}")

    all_kline_dfs = []
    all_mark_dfs = []
    all_fund_dfs = []

    # Months whose funding file has already been requested.
    processed_months = set()
    for date in tqdm(str_dates):

        # Funding data is published per month: fetch it once per month.
        current_month = date[:7]
        if current_month not in processed_months:
            fund_data = fetch_monthly_funding(symbol, current_month)
            if fund_data is not None:
                all_fund_dfs.append(fund_data)
            processed_months.add(current_month)

        # Daily kline and mark-price files.
        kline_url = f"https://data.binance.vision/data/futures/um/daily/klines/{symbol}/{INTERVAL}/{symbol}-{INTERVAL}-{date}.zip"
        mark_url = f"https://data.binance.vision/data/futures/um/daily/markPriceKlines/{symbol}/{INTERVAL}/{symbol}-{INTERVAL}-{date}.zip"

        k_data = fetch_and_clean_kline(symbol, date, kline_url)
        m_data = fetch_and_clean_kline(symbol, date, mark_url)

        if k_data is not None:
            all_kline_dfs.append(k_data)
        if m_data is not None:
            all_mark_dfs.append(m_data)

    print(f"\n正在合并 {symbol} 数据...")

    if not all_kline_dfs:
        print(f"{symbol} 无数据，跳过。")
        continue

    # Combine klines. Duplicate timestamps are dropped so that each bar
    # appears once; the stable sort makes keep='last' deterministic.
    full_kline_df = (
        pd.concat(all_kline_dfs)
        .sort_values('open_time', kind='stable')
        .drop_duplicates('open_time', keep='last')
        .reset_index(drop=True)
    )

    # Combine mark-price bars.
    if all_mark_dfs:
        full_mark_df = (
            pd.concat(all_mark_dfs)
            .sort_values('open_time', kind='stable')
            # A duplicated key on the right side of a left merge would
            # multiply the matching kline rows.
            .drop_duplicates('open_time', keep='last')
            .reset_index(drop=True)
        )
        # Keep only OHLC from the mark-price data to avoid column clashes.
        full_mark_df = full_mark_df[['open_time', 'open', 'high', 'low', 'close']]

        final_df = pd.merge(full_kline_df, full_mark_df, on='open_time', how='left', suffixes=('', '_mark'))
    else:
        final_df = full_kline_df

    # Attach funding rates.
    cols_to_fill = list(FUNDING_FILL)
    if all_fund_dfs:
        full_fund_df = (
            pd.concat(all_fund_dfs)
            .sort_values('open_time', kind='stable')
            .drop_duplicates('open_time', keep='last')
            .reset_index(drop=True)
        )
        # merge_asof requires both keys to have the same dtype.
        full_fund_df['open_time'] = full_fund_df['open_time'].astype(final_df['open_time'].dtype)

        # As-of (backward) join: each bar gets the most recent funding record
        # at or before its open_time. An exact-equality merge would silently
        # drop any record whose timestamp is not exactly a bar's open_time
        # and leave the previous rate in force.
        final_df = pd.merge_asof(final_df, full_fund_df, on='open_time', direction='backward')

        # Carry the last known value across records that were present but NaN.
        final_df[cols_to_fill] = final_df[cols_to_fill].ffill()

        # Bars earlier than the first funding record get the neutral defaults.
        final_df = final_df.fillna(FUNDING_FILL)
    else:
        print(f"⚠️ 未获取到 {symbol} 资金费率")
        for col, value in FUNDING_FILL.items():
            final_df[col] = value

    final_df['code'] = symbol

    print(f"✅ 完成 {symbol}！Shape: {final_df.shape}")
    display(final_df.head())

    # Persist the merged frame.
    final_df.to_parquet(f"data_parquet/{symbol}_{INTERVAL}.parquet")


开始处理: BTCUSDT | 总天数: 2186


100%|██████████| 2186/2186 [31:03<00:00,  1.17it/s] 



正在合并 BTCUSDT 数据...
✅ 完成 BTCUSDT！Shape: (3146400, 17)


Unnamed: 0,open_time,open,high,low,close,volume,quote_volume,count,taker_buy_volume,taker_buy_quote_volume,open_mark,high_mark,low_mark,close_mark,last_funding_rate,funding_interval_hours,code
0,2020-01-01 00:00:00,7189.43,7190.52,7177.0,7182.44,246.092,1767430.0,336,46.63,334813.1982,7195.365819,7195.365819,7185.821968,7186.832937,-0.000124,8.0,BTCUSDT
1,2020-01-01 00:01:00,7182.43,7182.44,7178.75,7179.01,70.909,509145.8,140,32.597,234063.27884,7187.355631,7188.351086,7185.515767,7186.446199,-0.000124,8.0,BTCUSDT
2,2020-01-01 00:02:00,7179.01,7179.01,7175.25,7177.93,99.42,713539.6,148,16.311,117066.92118,7186.2508,7186.2508,7182.968351,7183.254896,-0.000124,8.0,BTCUSDT
3,2020-01-01 00:03:00,7177.77,7182.6,7177.0,7181.11,69.33,497793.4,104,43.723,313920.02981,7183.309465,7187.865179,7183.309465,7187.620756,-0.000124,8.0,BTCUSDT
4,2020-01-01 00:04:00,7179.1,7179.1,7172.94,7175.25,97.368,698627.4,193,36.616,262734.68999,7187.246287,7187.246287,7180.29026,7180.422124,-0.000124,8.0,BTCUSDT



开始处理: ETHUSDT | 总天数: 2186


100%|██████████| 2186/2186 [50:08<00:00,  1.38s/it]    



正在合并 ETHUSDT 数据...
✅ 完成 ETHUSDT！Shape: (3146400, 17)


Unnamed: 0,open_time,open,high,low,close,volume,quote_volume,count,taker_buy_volume,taker_buy_quote_volume,open_mark,high_mark,low_mark,close_mark,last_funding_rate,funding_interval_hours,code
0,2020-01-01 00:00:00,129.12,129.12,128.91,128.97,289.04,37296.67223,18,155.107,20005.44348,129.149636,129.179051,129.020013,129.036012,0.0001,8.0,ETHUSDT
1,2020-01-01 00:01:00,128.95,129.04,128.93,128.94,695.566,89717.06489,49,284.536,36703.77497,129.036012,129.101554,129.030463,129.052623,0.0001,8.0,ETHUSDT
2,2020-01-01 00:02:00,128.93,128.97,128.92,128.92,131.902,17008.70674,17,107.865,13908.75485,129.052623,129.052623,128.998682,129.003228,0.0001,8.0,ETHUSDT
3,2020-01-01 00:03:00,128.97,128.98,128.93,128.96,455.572,58748.97547,19,306.033,39464.28873,129.001864,129.037843,128.991404,129.035113,0.0001,8.0,ETHUSDT
4,2020-01-01 00:04:00,128.95,128.95,128.76,128.81,866.117,111594.84023,48,273.595,35249.08931,129.021475,129.021475,128.890346,128.892163,0.0001,8.0,ETHUSDT
