In [2]:
import pandas as pd
import numpy as np
import os
import lightgbm as lgb
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler
from lightgbm import log_evaluation, early_stopping
import matplotlib.pyplot as plt
from datetime import datetime
import warnings

# Silence all warnings so the console output stays readable.
# NOTE(review): this also hides pandas chained-assignment warnings.
warnings.filterwarnings('ignore')

# Strategy parameters
LOOKAHEAD_DAYS = 15     # label horizon: is the adjusted close higher N rows ahead?
TRAIN_TEST_SPLIT = '2022-01-01'  # rows dated before this form the training set
FEATURE_WINDOW = 20    # rolling window used for volatility / volume / range features
SIGNAL_THRESHOLD_HIGH = 0.65   # smoothed probability above this -> raw long signal
SIGNAL_THRESHOLD_LOW = 0.35    # smoothed probability below this -> raw short signal
MIN_HOLDING_DAYS = 3   # minimum days a position is held before it may be reversed
TREND_FILTER_DAYS = 10  # window of the moving-average trend filter
MAX_LOSS = -0.10       # stop-loss: close the position at a 10% loss
MAX_PROFIT = 0.15      # take-profit: close the position at a 15% gain
DATE_RANGE = ('2010-01-01', '2022-12-31')  # inclusive date range of the data that is used

def create_features(df, feature_window=None, lookahead_days=None):
    """Build the feature columns and the forward-looking label for one symbol.

    The input frame is not modified; all work happens on a copy.

    Args:
        df: DataFrame with at least ``CLOSE``, ``HIGH``, ``LOW`` and ``VOLUME``
            columns, one row per bar in chronological order. An optional
            ``ADJ`` column is multiplied into ``CLOSE`` to obtain the
            adjusted close; when absent it is treated as 1.0.
        feature_window: Rolling window for the volatility, volume and
            high/low range features. Defaults to the module-level
            ``FEATURE_WINDOW``.
        lookahead_days: Number of rows to look ahead when building
            ``target``. Defaults to the module-level ``LOOKAHEAD_DAYS``.

    Returns:
        A new DataFrame containing the original columns plus the features
        and an integer ``target`` column (1 if the adjusted close
        ``lookahead_days`` rows later is higher than the current one).
        Every row containing a NaN is dropped, which removes the warm-up
        rows of the rolling windows and the trailing ``lookahead_days``
        rows whose label cannot be known yet.

    Raises:
        ValueError: If one of the required columns is missing.
    """
    if feature_window is None:
        feature_window = FEATURE_WINDOW
    if lookahead_days is None:
        lookahead_days = LOOKAHEAD_DAYS

    # Work on a copy so the caller's frame is left untouched.
    df = df.copy()

    required_cols = ['CLOSE', 'HIGH', 'LOW', 'VOLUME']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"数据缺少必要的列: {missing_cols}")

    # Without an adjustment factor the raw close is used as-is.
    if 'ADJ' not in df.columns:
        df['ADJ'] = 1.0

    df['CLOSE_ADJ'] = df['CLOSE'] * df['ADJ']
    close = df['CLOSE_ADJ']

    # Basic return / volatility features.
    df['returns'] = close.pct_change()
    df['volatility'] = df['returns'].rolling(feature_window).std()

    # Short-term technical indicators.
    df['ma5'] = close.rolling(5).mean()
    df['ma20'] = close.rolling(20).mean()

    # RSI from simple 14-bar rolling means of gains and losses.
    delta = close.diff()
    avg_gain = delta.clip(lower=0).rolling(14).mean()
    avg_loss = delta.clip(upper=0).rolling(14).mean().abs()
    # A window without losses gives rs = inf and therefore RSI = 100.
    # Previously such rows became NaN and were silently dropped, punching
    # holes into the time series exactly during strong up-trends.
    rs = avg_gain / avg_loss
    rsi = 100 - 100 / (1 + rs)
    # A completely flat window (0 / 0) carries no momentum: use neutral 50.
    df['rsi'] = rsi.mask((avg_gain == 0) & (avg_loss == 0), 50.0)

    # Medium / long-term trend features.
    df['ma50'] = close.rolling(50).mean()
    df['ma120'] = close.rolling(120).mean()
    df['trend_10d'] = close.pct_change(10)
    df['trend_30d'] = close.pct_change(30)

    # Moving-average crossover flags.
    df['ma_cross_5_20'] = (df['ma5'] > df['ma20']).astype(int)
    df['ma_cross_20_50'] = (df['ma20'] > df['ma50']).astype(int)

    # Price position relative to its moving averages.
    df['price_rel_ma20'] = close / df['ma20'] - 1
    df['price_rel_ma50'] = close / df['ma50'] - 1

    # Volume features.
    df['volume_ma'] = df['VOLUME'].rolling(feature_window).mean()
    df['volume_change'] = df['VOLUME'] / df['volume_ma']
    df['volume_trend'] = df['VOLUME'].rolling(10).mean() / df['VOLUME'].rolling(30).mean()

    # Range features. 'atr' is the high/low channel width over the window,
    # not a classic average true range.
    df['range'] = (df['HIGH'] - df['LOW']) / close
    df['atr'] = df['HIGH'].rolling(feature_window).max() - df['LOW'].rolling(feature_window).min()

    # Label: does the adjusted close rise over the next `lookahead_days` rows?
    # Rows whose future close does not exist yet are set to NaN so that
    # dropna() removes them; comparing against NaN would otherwise label
    # them 0 ("did not rise") and feed wrong labels to the model.
    future_close = close.shift(-lookahead_days)
    df['target'] = (future_close > close).astype(float).where(future_close.notna())

    # Lagged daily returns.
    for lag in [1, 3, 5, 10]:
        df[f'return_lag{lag}'] = df['returns'].shift(lag)

    return df.dropna().astype({'target': int})

def _manage_positions(prices, signals, min_holding_days, max_loss, max_profit):
    """Turn filtered signals into positions with stop-loss, take-profit and a minimum hold.

    Args:
        prices: 1-D array of adjusted closes, one per bar.
        signals: 1-D array of filtered signals in {-1, 0, 1}, aligned with
            ``prices``.
        min_holding_days: Bars a position must have been held before an
            opposite signal may reverse it.
        max_loss: Directional return (negative number) at or below which
            the position is closed.
        max_profit: Directional return at or above which the position is
            closed.

    Returns:
        Tuple ``(signal, position, days_in_position, cumulative_price_change,
        stop_triggered)`` of numpy arrays with the same length as ``prices``.
    """
    n = len(prices)
    signal = np.zeros(n, dtype=int)
    position = np.zeros(n, dtype=int)
    days_in_position = np.zeros(n, dtype=int)
    cumulative_change = np.zeros(n, dtype=float)
    stop_triggered = np.zeros(n, dtype=bool)

    for i in range(1, n):
        prev_pos = position[i - 1]
        curr_signal = signals[i]
        days_held = days_in_position[i - 1]

        if prev_pos == 0:
            # Flat: open a position as soon as a signal appears.
            if curr_signal != 0:
                position[i] = curr_signal
                signal[i] = curr_signal
                days_in_position[i] = 1
            continue

        # By default carry yesterday's position forward.
        position[i] = prev_pos
        days_in_position[i] = days_held + 1

        # days_in_position is 1 on the entry bar, so the entry bar sits
        # `days_held` rows back from the current one.
        entry_price = prices[i - days_held] if days_held > 0 else prices[i]
        price_change = (prices[i] - entry_price) / entry_price * prev_pos
        cumulative_change[i] = price_change

        hit_stop = price_change <= max_loss
        if hit_stop or price_change >= max_profit:
            position[i] = 0
            signal[i] = -prev_pos
            days_in_position[i] = 0
            stop_triggered[i] = hit_stop
            continue

        # Reverse only on an opposite signal after the minimum holding period.
        if days_held >= min_holding_days and curr_signal == -prev_pos:
            position[i] = curr_signal
            signal[i] = curr_signal
            # The reversal opens a new trade: restart the day counter so
            # that stop-loss / take-profit are measured from this bar and
            # not from the entry of the position that was just closed.
            days_in_position[i] = 1

    return signal, position, days_in_position, cumulative_change, stop_triggered


def run_strategy(df, symbol_name="Unknown"):
    """Train the model for one symbol, derive positions and backtest them.

    Steps: index the data by date, restrict it to ``DATE_RANGE``, build
    features with ``create_features``, train one LightGBM model per
    ``TimeSeriesSplit`` fold on the rows before ``TRAIN_TEST_SPLIT``,
    average the fold predictions, convert the smoothed probabilities into
    trend-filtered signals, apply position management and hand the result
    to ``backtest_strategy``.

    Args:
        df: Raw price data. Must contain ``CLOSE``; dates come from a
            ``CLOCK`` column when present, otherwise from the index.
            The caller's frame is not modified.
        symbol_name: Label used in log messages and in the result.

    Returns:
        The performance dict produced by ``backtest_strategy``, or ``None``
        when the symbol is skipped (missing column, too little data) or an
        error occurs while processing it.
    """
    print(f"处理品种: {symbol_name}...")

    try:
        if 'CLOSE' not in df.columns:
            print(f"跳过 {symbol_name}: 缺少必要的列")
            return None

        # Index by date without touching the caller's DataFrame.
        if 'CLOCK' in df.columns:
            df = df.assign(CLOCK=pd.to_datetime(df['CLOCK'])).set_index('CLOCK')
        elif not isinstance(df.index, pd.DatetimeIndex):
            df = df.copy()
            df.index = pd.to_datetime(df.index)

        # Restrict to the configured date range. Rolling features, the
        # time-series split and the position loop all assume chronological
        # order, so enforce it here.
        start_date, end_date = pd.to_datetime(DATE_RANGE[0]), pd.to_datetime(DATE_RANGE[1])
        df = df[(df.index >= start_date) & (df.index <= end_date)].sort_index(kind='mergesort')

        if len(df) < 252:  # require roughly one year of daily bars
            print(f"跳过 {symbol_name}: 数据点不足 ({len(df)})")
            return None

        feat_df = create_features(df)

        features = [
            'returns', 'volatility', 'ma5', 'ma20', 'ma50', 'ma120', 'rsi',
            'trend_10d', 'trend_30d', 'ma_cross_5_20', 'ma_cross_20_50',
            'price_rel_ma20', 'price_rel_ma50', 'volume_ma', 'volume_change',
            'volume_trend', 'range', 'atr'
        ] + [f'return_lag{lag}' for lag in [1, 3, 5, 10]]

        X = feat_df[features]
        y = feat_df['target']

        # Chronological split: everything before the cut-off is training data.
        train_mask = feat_df.index < TRAIN_TEST_SPLIT
        X_train, y_train = X[train_mask], y[train_mask]
        n_test = int((~train_mask).sum())

        # At least one year of training rows and about three months afterwards.
        if len(X_train) < 252 or n_test < 63:
            print(f"跳过 {symbol_name}: 训练或测试数据不足")
            return None

        # Fit the scaler on the training rows only.
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)

        params = {
            'objective': 'binary',
            'metric': 'auc',
            'learning_rate': 0.03,
            'max_depth': 4,
            'num_leaves': 16,
            'feature_fraction': 0.7,
            'bagging_fraction': 0.8,
            'bagging_freq': 5,
            'min_data_in_leaf': 20,
            'lambda_l1': 0.1,
            'lambda_l2': 0.1,
            'verbosity': -1
        }

        # One model per time-series fold; their predictions are averaged.
        tscv = TimeSeriesSplit(n_splits=3)
        models = []
        for train_idx, val_idx in tscv.split(X_train_scaled):
            train_data = lgb.Dataset(X_train_scaled[train_idx], label=y_train.iloc[train_idx])
            val_data = lgb.Dataset(X_train_scaled[val_idx], label=y_train.iloc[val_idx])

            # Fresh callbacks per fold so no state is shared between runs.
            callbacks = [log_evaluation(period=0), early_stopping(stopping_rounds=50)]
            model = lgb.train(params, train_data, valid_sets=[val_data],
                              num_boost_round=1000, callbacks=callbacks)
            models.append(model)

        # NOTE(review): probabilities are produced for the full sample, so
        # the backtest below also covers the rows the models were trained
        # on; the reported metrics are therefore partly in-sample.
        X_full_scaled = scaler.transform(X)
        full_probs = np.mean([model.predict(X_full_scaled) for model in models], axis=0)

        feat_df['predict_proba'] = full_probs
        # 5-bar smoothing; the first rows fall back to the raw probability.
        feat_df['predict_proba_smooth'] = (
            feat_df['predict_proba'].rolling(5).mean().fillna(feat_df['predict_proba'])
        )

        # Trend filter: change of the moving average over the filter window.
        feat_df['price_trend'] = (
            feat_df['CLOSE_ADJ'].rolling(TREND_FILTER_DAYS).mean().pct_change(TREND_FILTER_DAYS)
        )

        # Raw signal from the probability thresholds.
        feat_df['raw_signal'] = 0
        feat_df.loc[feat_df['predict_proba_smooth'] > SIGNAL_THRESHOLD_HIGH, 'raw_signal'] = 1
        feat_df.loc[feat_df['predict_proba_smooth'] < SIGNAL_THRESHOLD_LOW, 'raw_signal'] = -1

        # Keep a signal only if the trend agrees or the probability is extreme.
        feat_df['filtered_signal'] = 0
        feat_df.loc[(feat_df['raw_signal'] == 1) &
                    ((feat_df['price_trend'] > 0) | (feat_df['predict_proba_smooth'] > 0.75)),
                    'filtered_signal'] = 1
        feat_df.loc[(feat_df['raw_signal'] == -1) &
                    ((feat_df['price_trend'] < 0) | (feat_df['predict_proba_smooth'] < 0.25)),
                    'filtered_signal'] = -1

        # Position management runs on plain arrays and is written back in
        # one step. Element-wise chained assignment
        # (feat_df['col'].iloc[i] = ...) is not guaranteed to reach the
        # frame and is a no-op under pandas Copy-on-Write.
        signal, position, days_in_position, cumulative_change, stop_triggered = _manage_positions(
            feat_df['CLOSE_ADJ'].to_numpy(),
            feat_df['filtered_signal'].to_numpy(),
            MIN_HOLDING_DAYS,
            MAX_LOSS,
            MAX_PROFIT,
        )
        feat_df['signal'] = signal
        feat_df['position'] = position
        feat_df['days_in_position'] = days_in_position
        feat_df['cumulative_price_change'] = cumulative_change
        feat_df['stop_triggered'] = stop_triggered

        return backtest_strategy(feat_df, symbol_name)

    except Exception as e:
        # Per-symbol boundary: report the failure and let the batch continue.
        print(f"处理 {symbol_name} 时出错: {str(e)}")
        return None

def backtest_strategy(df, symbol_name):
    """Backtest the positions in ``df`` and return the performance summary.

    Args:
        df: Frame with ``position`` and ``returns`` columns and an index
            whose values support ``strftime``. It is copied, not modified.
        symbol_name: Label stored in the result.

    Returns:
        The dict from ``calculate_performance_metrics`` extended with the
        symbol name, the number of position changes, the row count and the
        first/last index date formatted as ``YYYY-MM-DD``.
    """
    bt = df.copy()

    # The position held at the previous bar earns the current bar's return.
    held = bt['position'].shift(1)
    # A flat 0.0002 cost is charged on every bar where the position changes.
    position_changed = bt['position'].diff().abs() > 0
    bt['strategy_return'] = held * bt['returns'] - position_changed * 0.0002

    # Equity curves of the strategy and of the buy-and-hold benchmark.
    bt['cumulative_return'] = bt['strategy_return'].add(1).cumprod()
    bt['buy_hold_return'] = bt['returns'].add(1).cumprod()

    performance = calculate_performance_metrics(bt)
    performance.update({
        '品种': symbol_name,
        '交易次数': int(position_changed.sum()),
        '样本数量': len(bt),
        '回测开始日期': bt.index.min().strftime('%Y-%m-%d'),
        '回测结束日期': bt.index.max().strftime('%Y-%m-%d'),
    })
    return performance

def calculate_performance_metrics(df):
    """Compute performance statistics from a backtest frame.

    Args:
        df: Frame with ``strategy_return`` (per-bar strategy returns) and
            ``cumulative_return`` (equity curve that starts from 1.0).
            ``buy_hold_return`` (benchmark equity curve) is optional; when
            it is missing the benchmark return is taken as 0.

    Returns:
        Dict with annualised return, total return, Sharpe ratio, Calmar
        ratio, annualised volatility, maximum drawdown, win rate,
        profit/loss ratio, buy-and-hold return and the annualised excess
        over buy-and-hold. Annualisation assumes 252 bars per year. Win
        rate and profit/loss ratio are computed over per-bar returns, not
        over round-trip trades. All values are NaN when there are no
        returns or the final equity is not positive.
    """
    metric_names = [
        '年化收益率', '累计收益率', '夏普比率', '卡玛比率', '波动率',
        '最大回撤', '胜率', '盈亏比', '买入持有收益率', '相对买入持有超额收益',
    ]

    returns = df['strategy_return'].dropna()
    cumulative = df['cumulative_return']

    if len(returns) == 0 or cumulative.iloc[-1] <= 0:
        return {name: np.nan for name in metric_names}

    # Total and annualised (geometric) return.
    total_return = cumulative.iloc[-1] - 1.0
    annual_return = (1 + total_return) ** (252 / len(returns)) - 1

    volatility = returns.std() * np.sqrt(252)
    sharpe_ratio = annual_return / volatility if volatility != 0 else 0

    # Maximum drawdown. The equity curve implicitly starts at 1.0, so the
    # running peak is floored at 1.0; otherwise a loss on the very first
    # bars would be measured against an already reduced "peak" and the
    # drawdown would be understated.
    peak = cumulative.cummax().clip(lower=1.0)
    drawdown = (cumulative - peak) / peak
    max_drawdown = drawdown.min()

    calmar_ratio = annual_return / abs(max_drawdown) if max_drawdown != 0 else 0

    # Win rate over bars with a non-zero return.
    wins = returns[returns > 0]
    losses = returns[returns < 0]
    decided = len(wins) + len(losses)
    win_rate = len(wins) / decided if decided > 0 else 0

    # Profit/loss ratio, capped at 10 and guarded against a near-zero
    # average loss.
    avg_win = wins.mean() if len(wins) > 0 else 0
    avg_loss = abs(losses.mean()) if len(losses) > 0 else 0
    if avg_loss > 0.0001:
        profit_loss_ratio = min(avg_win / avg_loss, 10.0)
    else:
        profit_loss_ratio = 10.0 if avg_win > 0 else 0.0

    # Comparison with buy-and-hold over the same number of bars.
    bh_return = df['buy_hold_return'].iloc[-1] - 1.0 if 'buy_hold_return' in df.columns else 0
    bh_annual = (1 + bh_return) ** (252 / len(returns)) - 1 if bh_return > -1 else -1
    strategy_vs_bh = annual_return - bh_annual

    return {
        '年化收益率': annual_return,
        '累计收益率': total_return,
        '夏普比率': sharpe_ratio,
        '卡玛比率': calmar_ratio,
        '波动率': volatility,
        '最大回撤': max_drawdown,
        '胜率': win_rate,
        '盈亏比': profit_loss_ratio,
        '买入持有收益率': bh_return,
        '相对买入持有超额收益': strategy_vs_bh
    }

def batch_process_csv_files(directory_path):
    """Run the strategy on every ``.csv`` file in a directory and collect the results.

    Each file is read with ``pd.read_csv`` and passed to ``run_strategy``
    with the file name (without extension) as symbol name. Failures are
    printed and skipped. The per-symbol results plus an ``ALL_AVERAGE``
    row are sorted by annualised return and written to
    ``strategy_performance_<timestamp>.csv`` in the working directory.

    Args:
        directory_path: Directory that is scanned (non-recursively).

    Returns:
        The combined results DataFrame, or ``None`` when the directory
        holds no CSV file or no file could be processed.
    """
    csv_files = [name for name in os.listdir(directory_path) if name.endswith('.csv')]
    if not csv_files:
        print(f"在 {directory_path} 目录下没有找到CSV文件。")
        return None

    print(f"找到 {len(csv_files)} 个CSV文件，开始处理...")

    # One performance dict per successfully processed symbol.
    results = []
    for file in csv_files:
        symbol_name = os.path.splitext(file)[0]
        try:
            raw = pd.read_csv(os.path.join(directory_path, file))
            performance = run_strategy(raw, symbol_name)
            if performance is not None:
                results.append(performance)
        except Exception as e:
            print(f"处理文件 {file} 时出错: {str(e)}")

    if not results:
        print("没有成功处理任何文件。")
        return None

    per_symbol = pd.DataFrame(results)

    # Average row: mean of every metric column, plus the mean trade count
    # and sample size; the date columns are left empty for this row.
    descriptive_cols = ['品种', '交易次数', '样本数量', '回测开始日期', '回测结束日期']
    avg_row = pd.DataFrame([per_symbol.drop(columns=descriptive_cols).mean()])
    avg_row['品种'] = 'ALL_AVERAGE'
    avg_row['交易次数'] = per_symbol['交易次数'].mean()
    avg_row['样本数量'] = per_symbol['样本数量'].mean()

    all_results = pd.concat([per_symbol, avg_row], ignore_index=True)
    all_results = all_results.sort_values(by='年化收益率', ascending=False)

    # Timestamped file name so earlier runs are not overwritten.
    output_file = f"strategy_performance_{datetime.now():%Y%m%d_%H%M%S}.csv"
    all_results.to_csv(output_file, index=False, float_format='%.4f')
    print(f"结果已保存到 {output_file}")

    return all_results

if __name__ == "__main__":
    # Directory holding one CSV file per symbol; replace with your own
    # data location (for example "./data").
    directory = "D:/Pythonfiles/Week1/day_20220611"
    results = batch_process_csv_files(directory)

    if results is not None:
        # Top ten rows of the ranking (already sorted by annualised return).
        ranking_columns = ['品种', '年化收益率', '最大回撤', '夏普比率', '交易次数']
        print("\n性能排名 (按年化收益率排序):")
        print(results[ranking_columns].head(10))

        # Cross-symbol averages taken from the ALL_AVERAGE row.
        avg_row = results.loc[results['品种'] == 'ALL_AVERAGE']
        print("\n平均性能指标:")
        for col in ('年化收益率', '夏普比率', '最大回撤', '胜率', '盈亏比'):
            print(f"{col}: {avg_row[col].iloc[0]:.4f}")

找到 41 个CSV文件，开始处理...
处理品种: R.CN.CFE.IC.0004...
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[2]	valid_0's auc: 0.532183
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[2]	valid_0's auc: 0.455562
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[75]	valid_0's auc: 0.602404
处理品种: R.CN.CFE.IF.0004...
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[1]	valid_0's auc: 0.520496
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[1]	valid_0's auc: 0.556512
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[20]	valid_0's auc: 0.548442
处理品种: R.CN.CFE.IH.0004...
Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[37]	valid_0's auc: 0.55156
Training until validation score