# 02 - 特征工程模块

此 notebook 负责将原始的 K 线数据转换为机器学习模型所需的特征。
包括技术指标计算、价格特征提取和训练数据集构建。

In [None]:
import sys
from pathlib import Path
import logging
import pandas as pd
import numpy as np
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)

print("=== 路径设置测试 ===")
print(f"当前工作目录: {Path.cwd()}")
print(f"当前 Python 路径: {sys.path[:3]}...")  # 只显示前3个避免太长

# 正确的路径设置
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

print(f"\n项目根目录: {project_root}")

print("特征工程环境设置完成")

=== 路径设置测试 ===
当前工作目录: /Users/anthony/Documents/github/technial_analysis_helper/notebooks
当前 Python 路径: ['/Users/anthony/Documents/github/technial_analysis_helper/src', '/Users/anthony/Documents/github/technial_analysis_helper/src', '/usr/local/Cellar/python@3.13/3.13.7/Frameworks/Python.framework/Versions/3.13/lib/python313.zip']...

项目根目录: /Users/anthony/Documents/github/technial_analysis_helper
特征工程环境设置完成


In [8]:
# 从数据库加载数据用于特征工程
from src.data.mongodb_handler import mongo_handler
from src.config.settings import config

print("正在加载数据用于特征工程...")

try:
    if mongo_handler.connect():
        # 加载足够多的数据用于特征工程
        # 需要比特征窗口更大的数据量
        # required_records = config.FEATURE_WINDOW_SIZE + 100  # 额外缓冲
        db_data = mongo_handler.get_candlestick_data(limit=100000)
        
        if db_data:
            print(f"✓ 成功加载 {len(db_data)} 条记录")
            
            # 转换为 DataFrame
            df_raw = pd.DataFrame(db_data)
            df_raw = df_raw.sort_values('timestamp').reset_index(drop=True)
            
            print(f"数据时间范围: {datetime.fromtimestamp(df_raw['timestamp'].min()/1000)} "
                  f"到 {datetime.fromtimestamp(df_raw['timestamp'].max()/1000)}")
            print(f"数据形状: {df_raw.shape}")
            
        else:
            print("✗ 数据库中没有足够的数据")
            
        mongo_handler.close()
    else:
        print("✗ 无法连接到数据库")
        
except Exception as e:
    print(f"✗ 加载数据时出错: {e}")

2026-01-13 20:12:00,898 - src.data.mongodb_handler - INFO - Connected to MongoDB at mongodb://localhost:27017
2026-01-13 20:12:00,899 - src.data.mongodb_handler - INFO - Database: technical_analysis
2026-01-13 20:12:00,899 - src.data.mongodb_handler - INFO - Collection: candlesticks
2026-01-13 20:12:00,899 - src.data.mongodb_handler - INFO - Database: technical_analysis
2026-01-13 20:12:00,899 - src.data.mongodb_handler - INFO - Collection: candlesticks


正在加载数据用于特征工程...


2026-01-13 20:12:01,494 - src.data.mongodb_handler - INFO - MongoDB connection closed


✓ 成功加载 53073 条记录
数据时间范围: 2019-12-25 08:00:00 到 2026-01-13 16:00:00
数据形状: (53073, 10)


In [9]:
# 计算技术指标
from src.utils.technical_indicators import tech_calculator

print("正在计算技术指标...")

if 'df_raw' in locals() and not df_raw.empty:
    try:
        # 计算技术指标
        indicators = tech_calculator.calculate_indicators(df_raw)
        
        print(f"✓ 计算完成，共得到 {len(indicators)} 个技术指标")
        
        # 分类显示指标
        rsi_indicators = [k for k in indicators.keys() if 'rsi' in k.lower()]
        bb_indicators = [k for k in indicators.keys() if 'bb_' in k.lower()]
        price_indicators = [k for k in indicators.keys() if k.startswith(('current_', 'price_', 'volume_'))]
        
        print(f"\nRSI 指标 ({len(rsi_indicators)} 个):")
        for ind in rsi_indicators:
            print(f"  {ind}: {indicators[ind]:.4f}")
            
        print(f"\n布林带指标 ({len(bb_indicators)} 个):")
        for ind in bb_indicators[:6]:  # 只显示部分
            print(f"  {ind}: {indicators[ind]:.4f}")
            
        print(f"\n价格相关指标 ({len(price_indicators)} 个):")
        for ind in price_indicators:
            if isinstance(indicators[ind], float):
                print(f"  {ind}: {indicators[ind]:.4f}")
            else:
                print(f"  {ind}: {indicators[ind]}")
                
    except Exception as e:
        print(f"✗ 计算技术指标时出错: {e}")
else:
    print("没有可用的数据，请先运行数据获取 notebook")

正在计算技术指标...
✓ 计算完成，共得到 18 个技术指标

RSI 指标 (3 个):
  short_rsi: 60.1009
  medium_rsi: 57.8208
  long_rsi: 57.7885

布林带指标 (12 个):
  short_bb_upper: 3139.8278
  short_bb_lower: 3083.9942
  short_bb_middle: 3111.9110
  short_bb_position: 0.9105
  medium_bb_upper: 3139.8278
  medium_bb_lower: 3083.9942

价格相关指标 (0 个):


In [10]:
# 计算价格特征（包括未来收益）
print("正在计算价格特征和未来收益...")

PREDICTION_HORIZON = 24  # 预测周期（小时）

if 'df_raw' in locals() and not df_raw.empty:
    try:
        # 计算价格特征
        price_features = tech_calculator.calculate_price_features(df_raw, PREDICTION_HORIZON)
        
        print("✓ 价格特征计算完成")
        
        # 显示关键特征
        print(f"\n关键价格特征:")
        print(f"  当前价格: ${price_features.get('current_price', 'N/A'):.2f}")
        print(f"  24小时波动率: {price_features.get('price_volatility', 'N/A'):.6f}")
        print(f"  24小时价格趋势: {price_features.get('price_trend_24h', 'N/A'):.2f}%")
        print(f"  平均成交量: {price_features.get('volume_avg', 'N/A'):,.0f}")
        
        # 显示未来收益（如果有的话）
        future_return = price_features.get('future_return')
        if future_return is not None and not np.isnan(future_return):
            print(f"  未来 {PREDICTION_HORIZON} 小时收益: {future_return:.4f}%")
            
            # 显示分类标签
            classification_label = price_features.get('classification_label')
            if classification_label is not None:
                print(f"  分类标签: {classification_label}")
                
                # 查找对应的分类区间
                if classification_label in config.CLASSIFICATION_THRESHOLDS:
                    min_val, max_val = config.CLASSIFICATION_THRESHOLDS[classification_label]
                    print(f"  对应区间: {min_val}% 到 {max_val}%")
        else:
            print("  未来收益: 无法计算（数据不足）")
            
    except Exception as e:
        print(f"✗ 计算价格特征时出错: {e}")
else:
    print("没有可用的数据")

正在计算价格特征和未来收益...
✓ 价格特征计算完成

关键价格特征:
  当前价格: $3134.83
  24小时波动率: 0.004220
  24小时价格趋势: 0.66%
  平均成交量: 1,059,939
  未来 24 小时收益: -0.6600%
  分类标签: -1
  对应区间: -2.5% 到 -0.5%


In [11]:
# 提取原始价格序列特征
from src.utils.feature_engineering import feature_engineer

print("正在提取原始价格序列特征...")

if 'df_raw' in locals() and not df_raw.empty:
    try:
        # 提取原始价格序列特征
        raw_features = feature_engineer._extract_raw_price_series(df_raw)
        
        print(f"✓ 提取完成，共得到 {len(raw_features)} 个原始价格特征")
        
        # 显示部分特征
        close_features = [k for k in raw_features.keys() if k.startswith('raw_close_')]
        stat_features = [k for k in raw_features.keys() if k.startswith('raw_price_') and 'close' not in k]
        volume_features = [k for k in raw_features.keys() if 'volume' in k.lower()]
        
        print(f"\n最近收盘价特征 ({len(close_features)} 个):")
        for feature in close_features[:5]:  # 显示最近5小时
            print(f"  {feature}: ${raw_features[feature]:.2f}")
            
        print(f"\n价格统计特征:")
        for feature in stat_features:
            if isinstance(raw_features[feature], float):
                print(f"  {feature}: {raw_features[feature]:.4f}")
            else:
                print(f"  {feature}: {raw_features[feature]}")
                
        print(f"\n成交量特征:")
        for feature in volume_features[:3]:
            print(f"  {feature}: {raw_features[feature]:,.0f}")
            
    except Exception as e:
        print(f"✗ 提取原始价格特征时出错: {e}")
else:
    print("没有可用的数据")

正在提取原始价格序列特征...
✓ 提取完成，共得到 131 个原始价格特征

最近收盘价特征 (24 个):
  raw_close_1h: $3134.83
  raw_close_2h: $3120.86
  raw_close_3h: $3132.96
  raw_close_4h: $3130.18
  raw_close_5h: $3120.70

价格统计特征:
  raw_price_mean: 3120.5991
  raw_price_std: 68.7248
  raw_price_min: 2970.2300
  raw_price_max: 3295.4200
  raw_price_change_mean: 0.5148
  raw_price_change_std: 12.6035
  raw_price_change_sum: 153.9400
  raw_price_range_mean: 20.6310
  raw_price_range_max: 91.8100

成交量特征:
  raw_volume_1h: 460,915
  raw_volume_2h: 453,045
  raw_volume_3h: 728,601


In [12]:
# 创建单个样本的完整特征集
print("正在创建单个样本的完整特征集...")

if 'df_raw' in locals() and not df_raw.empty:
    try:
        # 确保有足够数据
        required_length = config.FEATURE_WINDOW_SIZE + PREDICTION_HORIZON
        if len(df_raw) >= required_length:
            # 准备数据
            data_list = df_raw.to_dict('records')
            
            # 创建特征
            features_dict = feature_engineer.create_features_from_data(data_list, PREDICTION_HORIZON)
            
            if features_dict:
                print(f"✓ 成功创建特征集，共 {len(features_dict)} 个特征")
                
                # 显示特征结构
                print(f"\n特征类型分布:")
                rsi_count = len([k for k in features_dict.keys() if 'rsi' in k.lower()])
                bb_count = len([k for k in features_dict.keys() if 'bb_' in k.lower()])
                price_count = len([k for k in features_dict.keys() if k.startswith(('current_', 'price_', 'volume_'))])
                raw_count = len([k for k in features_dict.keys() if k.startswith('raw_')])
                
                print(f"  技术指标 (RSI等): {rsi_count + bb_count} 个")
                print(f"  价格特征: {price_count} 个")
                print(f"  原始价格序列: {raw_count} 个")
                print(f"  其他特征: {len(features_dict) - rsi_count - bb_count - price_count - raw_count} 个")
                
                # 显示目标变量
                target_label = features_dict.get('classification_label')
                if target_label is not None:
                    print(f"\n目标分类标签: {target_label}")
                    min_val, max_val = config.CLASSIFICATION_THRESHOLDS[target_label]
                    print(f"对应价格变动区间: {min_val}% 到 {max_val}%")
                    
                # 显示时间戳
                timestamp = features_dict.get('timestamp')
                if timestamp:
                    readable_time = datetime.fromtimestamp(timestamp / 1000)
                    print(f"特征对应时间: {readable_time}")
                    
            else:
                print("✗ 无法创建特征集（可能数据不足）")
        else:
            print(f"✗ 数据不足，需要至少 {required_length} 条记录，当前只有 {len(df_raw)} 条")
            
    except Exception as e:
        print(f"✗ 创建特征集时出错: {e}")
else:
    print("没有可用的数据")

正在创建单个样本的完整特征集...
✓ 成功创建特征集，共 156 个特征

特征类型分布:
  技术指标 (RSI等): 15 个
  价格特征: 4 个
  原始价格序列: 131 个
  其他特征: 6 个

目标分类标签: -1
对应价格变动区间: -2.5% 到 -0.5%
特征对应时间: 2026-01-12 16:00:00
✓ 成功创建特征集，共 156 个特征

特征类型分布:
  技术指标 (RSI等): 15 个
  价格特征: 4 个
  原始价格序列: 131 个
  其他特征: 6 个

目标分类标签: -1
对应价格变动区间: -2.5% 到 -0.5%
特征对应时间: 2026-01-12 16:00:00


In [13]:
# 创建完整的训练数据集
print("正在创建完整的训练数据集...")

# 参数设置
STRIDE = 10  # 样本间隔
MAX_SAMPLES = 1000  # 最大样本数（控制计算时间）

if 'df_raw' in locals() and not df_raw.empty:
    try:
        # 准备数据列表
        data_list = df_raw.to_dict('records')
        
        print(f"使用参数: stride={STRIDE}, prediction_horizon={PREDICTION_HORIZON}")
        print(f"原始数据量: {len(data_list)} 条记录")
        
        # 创建训练数据集
        features_df, targets_series = feature_engineer.create_training_dataset(
            data_list, 
            stride=STRIDE,
            prediction_horizon=PREDICTION_HORIZON
        )
        
        if not features_df.empty:
            print(f"\n✓ 训练数据集创建成功")
            print(f"  样本数量: {len(features_df)}")
            print(f"  特征数量: {len(features_df.columns)}")
            print(f"  目标变量维度: {len(targets_series)}")
            
            # 显示数据集基本信息
            print(f"\n数据集概览:")
            print(f"  数据形状: {features_df.shape}")
            print(f"  缺失值数量: {features_df.isnull().sum().sum()}")
            print(f"  内存使用: {features_df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
            
            # 显示目标分布
            print(f"\n目标变量分布:")
            target_counts = targets_series.value_counts().sort_index()
            for label, count in target_counts.items():
                percentage = (count / len(targets_series)) * 100
                min_val, max_val = config.CLASSIFICATION_THRESHOLDS.get(label, ('N/A', 'N/A'))
                print(f"  类别 {label} ({min_val}% 到 {max_val}%): {count} 样本 ({percentage:.1f}%)")
            
            # 显示特征类型
            print(f"\n特征类型统计:")
            feature_types = {
                'RSI指标': [col for col in features_df.columns if 'rsi' in col.lower()],
                '布林带指标': [col for col in features_df.columns if 'bb_' in col.lower()],
                '价格特征': [col for col in features_df.columns if any(prefix in col for prefix in ['current_', 'price_', 'volume_'])],
                '原始价格序列': [col for col in features_df.columns if col.startswith('raw_')],
                '其他特征': [col for col in features_df.columns if col == 'timestamp']
            }
            
            for category, cols in feature_types.items():
                if cols:
                    print(f"  {category}: {len(cols)} 个")
            
            # 保存数据集供后续使用
            print(f"\n保存数据集...")
            features_df.to_csv('training_features.csv', index=False)
            targets_series.to_csv('training_targets.csv', index=False, header=True)
            print("✓ 数据集已保存为 CSV 文件")
            
            # 显示前几个样本
            print(f"\n前 3 个样本预览:")
            display_cols = ['current_price', 'price_volatility', 'short_rsi', 'medium_rsi', 'long_rsi', 'timestamp']
            available_cols = [col for col in display_cols if col in features_df.columns]
            
            preview_df = features_df[available_cols].copy()
            preview_df['target'] = targets_series
            preview_df['timestamp_readable'] = preview_df['timestamp'].apply(
                lambda x: datetime.fromtimestamp(x / 1000).strftime('%Y-%m-%d %H:%M')
            )
            
            print(preview_df[['timestamp_readable', 'current_price', 'short_rsi', 'target']].head(3).to_string(index=False))
            
        else:
            print("✗ 无法创建训练数据集")
            
    except Exception as e:
        print(f"✗ 创建训练数据集时出错: {e}")
else:
    print("没有可用的数据")

2026-01-13 20:13:24,550 - src.utils.feature_engineering - INFO - Creating training dataset from 53073 records
2026-01-13 20:13:24,550 - src.utils.feature_engineering - INFO - Parameters: stride=10, prediction_horizon=24
2026-01-13 20:13:24,550 - src.utils.feature_engineering - INFO - Parameters: stride=10, prediction_horizon=24


正在创建完整的训练数据集...
使用参数: stride=10, prediction_horizon=24
原始数据量: 53073 条记录


2026-01-13 20:14:16,483 - src.utils.feature_engineering - INFO - Created training dataset with 5275 samples
2026-01-13 20:14:16,484 - src.utils.feature_engineering - INFO - Target distribution: {-1: 1118, 1: 1051, 0: 882, -2: 805, 2: 640, -3: 390, 3: 389}
2026-01-13 20:14:16,484 - src.utils.feature_engineering - INFO - Target distribution: {-1: 1118, 1: 1051, 0: 882, -2: 805, 2: 640, -3: 390, 3: 389}



✓ 训练数据集创建成功
  样本数量: 5275
  特征数量: 155
  目标变量维度: 5275

数据集概览:
  数据形状: (5275, 155)
  缺失值数量: 0
  内存使用: 6.24 MB

目标变量分布:
  类别 -3 (-100% 到 -5.5%): 390 样本 (7.4%)
  类别 -2 (-5.5% 到 -2.5%): 805 样本 (15.3%)
  类别 -1 (-2.5% 到 -0.5%): 1118 样本 (21.2%)
  类别 0 (-0.5% 到 0.5%): 882 样本 (16.7%)
  类别 1 (0.5% 到 2.5%): 1051 样本 (19.9%)
  类别 2 (2.5% 到 5.5%): 640 样本 (12.1%)
  类别 3 (5.5% 到 100%): 389 样本 (7.4%)

特征类型统计:
  RSI指标: 3 个
  布林带指标: 12 个
  价格特征: 42 个
  原始价格序列: 131 个
  其他特征: 1 个

保存数据集...
✓ 数据集已保存为 CSV 文件

前 3 个样本预览:
timestamp_readable  current_price  short_rsi  target
  2020-01-06 19:00         142.70  72.463600      -1
  2020-01-07 05:00         142.49  62.252942      -1
  2020-01-07 15:00         143.61  54.410944      -1
✓ 数据集已保存为 CSV 文件

前 3 个样本预览:
timestamp_readable  current_price  short_rsi  target
  2020-01-06 19:00         142.70  72.463600      -1
  2020-01-07 05:00         142.49  62.252942      -1
  2020-01-07 15:00         143.61  54.410944      -1


In [14]:
# 准备预测用的特征
print("正在准备预测用的特征集...")

if 'df_raw' in locals() and not df_raw.empty:
    try:
        # 确保有足够的数据用于预测
        if len(df_raw) >= config.FEATURE_WINDOW_SIZE:
            # 使用最近的数据
            recent_data = df_raw.tail(config.FEATURE_WINDOW_SIZE).to_dict('records')
            
            # 准备预测特征
            pred_features_df = feature_engineer.prepare_prediction_features(recent_data)
            
            if pred_features_df is not None and not pred_features_df.empty:
                print(f"✓ 预测特征准备完成")
                print(f"  特征数量: {len(pred_features_df.columns)}")
                print(f"  样本数量: {len(pred_features_df)}")
                
                # 显示关键特征
                print(f"\n关键预测特征:")
                key_features = ['current_price', 'short_rsi', 'medium_rsi', 'long_rsi', 
                               'price_volatility', 'volume_avg']
                
                for feature in key_features:
                    if feature in pred_features_df.columns:
                        value = pred_features_df[feature].iloc[0]
                        if isinstance(value, float):
                            if 'price' in feature:
                                print(f"  {feature}: ${value:.2f}")
                            else:
                                print(f"  {feature}: {value:.4f}")
                        else:
                            print(f"  {feature}: {value}")
                
                # 保存预测特征
                pred_features_df.to_csv('prediction_features.csv', index=False)
                print("\n✓ 预测特征已保存为 CSV 文件")
                
            else:
                print("✗ 无法准备预测特征")
        else:
            print(f"✗ 数据不足，需要至少 {config.FEATURE_WINDOW_SIZE} 条记录用于预测")
            
    except Exception as e:
        print(f"✗ 准备预测特征时出错: {e}")
else:
    print("没有可用的数据")

正在准备预测用的特征集...
✓ 预测特征准备完成
  特征数量: 22
  样本数量: 1

关键预测特征:
  current_price: $3134.83
  short_rsi: 60.1009
  medium_rsi: 57.8208
  long_rsi: 57.7885
  price_volatility: $0.00
  volume_avg: 1059939.3288

✓ 预测特征已保存为 CSV 文件


## 总结

本 notebook 完成了以下特征工程任务：

1. ✅ **数据加载** - 从 MongoDB 加载原始 K 线数据
2. ✅ **技术指标计算** - 计算 RSI、布林带等技术指标
3. ✅ **价格特征提取** - 计算波动率、趋势、未来收益等
4. ✅ **原始价格序列提取** - 保留原始 OHLCV 数据作为特征
5. ✅ **单样本特征创建** - 创建单个时间点的完整特征集
6. ✅ **训练数据集构建** - 生成用于模型训练的完整数据集
7. ✅ **预测特征准备** - 准备用于实时预测的特征

### 输出文件
- `training_features.csv` - 训练特征数据
- `training_targets.csv` - 训练目标变量
- `prediction_features.csv` - 预测用特征数据

### 下一步建议

运行完此 notebook 后，可以继续执行：
- `03_model_training.ipynb` - 使用生成的特征训练模型
- `04_prediction.ipynb` - 进行实时价格预测

### 关键发现

- 总共生成了约 150+ 个特征
- 包括技术指标、价格统计、原始序列等多个维度
- 目标变量分布相对均衡
- 特征质量良好，缺失值较少