# Chapter 5: Utilities, Helpers and Advanced Features
# 第五章：工具函数、辅助功能与高级特性

**Author**: Microsoft Qlib Team  
**License**: MIT License  
**Last Updated**: 2025-01-09

---

## 📚 Table of Contents / 目录

### Part 1: Core Utilities / 核心工具
1. [Workflow Automation / 工作流自动化](#workflow-automation)
2. [Experiment Management / 实验管理](#experiment-management)
3. [Data Utilities / 数据工具](#data-utilities)
4. [Feature Engineering Helpers / 特征工程辅助](#feature-engineering)

### Part 2: Advanced Features / 高级功能
5. [Multi-Market Support / 多市场支持](#multi-market)
6. [Online Learning / 在线学习](#online-learning)
7. [Meta-Learning / 元学习](#meta-learning)
8. [AutoML Integration / 自动机器学习集成](#automl)

### Part 3: Production Tools / 生产工具
9. [Real-time Trading / 实时交易](#realtime-trading)
10. [Monitoring and Alerting / 监控与告警](#monitoring)
11. [Debugging and Profiling / 调试与性能分析](#debugging)
12. [Deployment Tools / 部署工具](#deployment)

### Part 4: Best Practices / 最佳实践
13. [Code Organization / 代码组织](#code-organization)
14. [Performance Optimization / 性能优化](#performance-optimization)
15. [Common Pitfalls and Solutions / 常见问题与解决方案](#pitfalls)
16. [Tips and Tricks / 技巧与窍门](#tips)

## Setup and Imports / 设置和导入

In [None]:
# Essential imports / 必要导入
import qlib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import json
import pickle
import yaml
import logging
import warnings
import time
import sys
import os
from typing import Dict, List, Tuple, Optional, Union, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
import concurrent.futures
from functools import wraps, lru_cache
import traceback
warnings.filterwarnings('ignore')

# Qlib imports / Qlib导入
from qlib.data import D
from qlib.config import C
from qlib.workflow import R
from qlib.utils import init_instance_by_config, flatten_dict
from qlib.log import get_module_logger

# Initialize Qlib / 初始化Qlib
qlib.init()

# Setup logging / 设置日志
logger = get_module_logger("UtilsHelpers")
logger.setLevel(logging.INFO)

print("✅ Environment initialized")

## Part 1: Core Utilities / 核心工具

## 1. Workflow Automation / 工作流自动化 <a id='workflow-automation'></a>

In [None]:
# Workflow automation utilities / 工作流自动化工具

class WorkflowAutomation:
    """Complete workflow automation framework
    完整的工作流自动化框架
    """
    
    def __init__(self, config_path: Optional[str] = None):
        self.config = self.load_config(config_path) if config_path else {}
        self.pipeline_steps = []
        self.results = {}
        
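    def load_config(self, config_path: str) -> dict:
        """Load workflow configuration / 加载工作流配置"""
        suffix = Path(config_path).suffix.lower()
        with open(config_path, 'r', encoding='utf-8') as f:
            if suffix in ('.yaml', '.yml'):
                # safe_load returns None for an empty file
                return yaml.safe_load(f) or {}
            if suffix == '.json':
                return json.load(f)
        raise ValueError(f"Unsupported config format: {config_path}")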
    
    def add_step(self, name: str, func, **kwargs):
        """Add a step to the pipeline / 添加管道步骤"""
        self.pipeline_steps.append({
            'name': name,
            'func': func,
            'kwargs': kwargs
        })
        return self
    
    def run(self, parallel: bool = False):
        """Execute the workflow / 执行工作流"""
        print(f"Starting workflow with {len(self.pipeline_steps)} steps...")
        
        if parallel:
            # Parallel mode assumes the steps do not depend on each other's results
            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = []
                for step in self.pipeline_steps:
                    future = executor.submit(self._execute_step, step)
                    futures.append((step['name'], future))
                
                for name, future in futures:
                    self.results[name] = future.result()
        else:
            for step in self.pipeline_steps:
                self.results[step['name']] = self._execute_step(step)
        
        print("✅ Workflow completed")
        return self.results
    
    def _execute_step(self, step: dict):
        """Execute a single step / 执行单个步骤"""
        print(f"  Executing: {step['name']}...")
        try:
            result = step['func'](**step['kwargs'])
            print(f"    ✅ {step['name']} completed")
            return result
        except Exception as e:
            print(f"    ❌ {step['name']} failed: {str(e)}")
            return None

# Example workflow / 示例工作流
def data_preparation(market='csi300'):
    """Prepare data / 准备数据"""
    return f"Data prepared for {market}"

def model_training(data=None):
    """Train model / 训练模型"""
    return "Model trained"

def backtesting(model=None):
    """Run backtest / 运行回测"""
    return "Backtest completed"

# Create and run workflow / 创建并运行工作流
workflow = WorkflowAutomation()
workflow.add_step('data_prep', data_preparation, market='csi300')
workflow.add_step('training', model_training)
workflow.add_step('backtest', backtesting)

results = workflow.run(parallel=False)
print(f"\nResults: {results}")
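
`_execute_step` above records a failed step as `None`, which looks the same as a step that legitimately returns `None`. A minimal sketch of one alternative for independent steps follows. It assumes a hypothetical helper `run_steps_parallel` (not part of Qlib or of the class above) that returns results and exceptions separately.

```python
import concurrent.futures
from typing import Any, Callable, Dict, Tuple


def run_steps_parallel(
    steps: Dict[str, Callable[[], Any]],
    max_workers: int = 4,
) -> Tuple[Dict[str, Any], Dict[str, BaseException]]:
    """Run independent zero-argument steps in threads.

    Returns (results, errors) so that a failed step is reported explicitly
    instead of being stored as None.
    """
    results: Dict[str, Any] = {}
    errors: Dict[str, BaseException] = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_name = {executor.submit(func): name for name, func in steps.items()}
        for future in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[future]
            try:
                results[name] = future.result()
            except Exception as exc:  # keep the exception for later inspection
                errors[name] = exc
    return results, errors


def failing_step():
    """Hypothetical step that always fails."""
    raise RuntimeError("data source unavailable")


parallel_results, parallel_errors = run_steps_parallel({
    "prices": lambda: "prices loaded",
    "fundamentals": failing_step,
})
print(parallel_results)
print({name: repr(exc) for name, exc in parallel_errors.items()})
```

Keeping the exception objects lets the caller decide whether to retry, skip, or abort the rest of the workflow.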

In [None]:
# Pipeline decorator for automatic workflow creation / 用于自动创建工作流的管道装饰器

class Pipeline:
    """Pipeline decorator for workflow automation
    用于工作流自动化的管道装饰器
    """
    
    def __init__(self):
        self.steps = []
        self.cache = {}
    
    def step(self, name: str, cache: bool = False):
        """Decorator to mark a function as a pipeline step
        将函数标记为管道步骤的装饰器
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Check cache (keyed by step name only; arguments are ignored) / 检查缓存
                if cache and name in self.cache:
                    print(f"  Using cached result for {name}")
                    return self.cache[name]
                
                # Execute function / 执行函数
                print(f"  Executing {name}...")
                start_time = time.time()
                result = func(*args, **kwargs)
                elapsed = time.time() - start_time
                
                # Cache result if requested / 如果需要则缓存结果
                if cache:
                    self.cache[name] = result
                
                print(f"    ✅ {name} completed in {elapsed:.2f}s")
                return result
            
            # Register step / 注册步骤
            self.steps.append((name, wrapper))
            return wrapper
        return decorator
    
    def run_all(self, context: dict = None):
        """Run all registered steps / 运行所有注册的步骤"""
        context = context or {}
        print(f"Running pipeline with {len(self.steps)} steps...")
        
        for name, func in self.steps:
            try:
                context[name] = func(context)
            except Exception as e:
                print(f"    ❌ Error in {name}: {str(e)}")
                context[name] = None
        
        return context

# Example usage / 使用示例
pipeline = Pipeline()

@pipeline.step("load_data", cache=True)
def load_data(context):
    """Load market data / 加载市场数据"""
    # Simulate data loading
    return pd.DataFrame(np.random.randn(100, 5), columns=['A', 'B', 'C', 'D', 'E'])

@pipeline.step("preprocess", cache=False)
def preprocess(context):
    """Preprocess data / 预处理数据"""
    data = context.get('load_data')
    if data is not None:
        return data.fillna(0)
    return None

@pipeline.step("analyze")
def analyze(context):
    """Analyze data / 分析数据"""
    data = context.get('preprocess')
    if data is not None:
        return data.describe()
    return None

# Run pipeline / 运行管道
results = pipeline.run_all()
print(f"\nPipeline completed with {len(results)} results")

## 2. Experiment Management / 实验管理 <a id='experiment-management'></a>

In [None]:
# Advanced experiment management / 高级实验管理

@dataclass
class ExperimentConfig:
    """Experiment configuration / 实验配置"""
    name: str
    model_type: str
    dataset: str
    hyperparameters: dict
    metrics: List[str]
    tags: Optional[List[str]] = None
    description: str = ""

class ExperimentManager:
    """Comprehensive experiment management system
    综合实验管理系统
    """
    
    def __init__(self, base_dir: str = "./experiments"):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.experiments = {}
        self.active_experiment = None
        
    def create_experiment(self, config: ExperimentConfig) -> str:
        """Create a new experiment / 创建新实验"""
        exp_id = f"{config.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        exp_dir = self.base_dir / exp_id
        exp_dir.mkdir(exist_ok=True)
        
        # Save configuration / 保存配置
        config_path = exp_dir / "config.json"
        with open(config_path, 'w') as f:
            json.dump(config.__dict__, f, indent=2)
        
        # Initialize experiment tracking / 初始化实验跟踪
        self.experiments[exp_id] = {
            'config': config,
            'dir': exp_dir,
            'status': 'created',
            'created_at': datetime.now(),
            'metrics': {},
            'artifacts': []
        }
        
        self.active_experiment = exp_id
        print(f"✅ Created experiment: {exp_id}")
        return exp_id
    
    def log_metric(self, name: str, value: float, step: Optional[int] = None):
        """Log a metric / 记录指标"""
        if self.active_experiment is None:
            raise ValueError("No active experiment")
        
        exp = self.experiments[self.active_experiment]
        if name not in exp['metrics']:
            exp['metrics'][name] = []
        
        exp['metrics'][name].append({
            'value': value,
            # `step or ...` would discard an explicit step of 0
            'step': step if step is not None else len(exp['metrics'][name]),
            'timestamp': datetime.now()
        })
    
    def log_artifact(self, artifact_path: str, artifact_type: str = 'file'):
        """Log an artifact / 记录工件"""
        if self.active_experiment is None:
            raise ValueError("No active experiment")
        
        exp = self.experiments[self.active_experiment]
        exp['artifacts'].append({
            'path': artifact_path,
            'type': artifact_type,
            'timestamp': datetime.now()
        })
    
    def compare_experiments(self, exp_ids: List[str] = None):
        """Compare multiple experiments / 比较多个实验"""
        if exp_ids is None:
            exp_ids = list(self.experiments.keys())
        
        rows = []
        
        for exp_id in exp_ids:
            if exp_id in self.experiments:
                exp = self.experiments[exp_id]
                row = {
                    'experiment': exp_id,
                    'model': exp['config'].model_type,
                    'dataset': exp['config'].dataset,
                    'status': exp['status']
                }
                
                # Add final metrics / 添加最终指标
                for metric_name, values in exp['metrics'].items():
                    if values:
                        row[f'{metric_name}_final'] = values[-1]['value']
                
                rows.append(row)
        
        # Build the frame once instead of concatenating inside the loop
        return pd.DataFrame(rows)
    
    def get_best_experiment(self, metric: str, mode: str = 'max'):
        """Get the best experiment based on a metric
        基于指标获取最佳实验
        """
        best_exp = None
        best_value = None
        
        for exp_id, exp in self.experiments.items():
            if metric in exp['metrics'] and exp['metrics'][metric]:
                final_value = exp['metrics'][metric][-1]['value']
                
                if best_value is None:
                    best_value = final_value
                    best_exp = exp_id
                elif mode == 'max' and final_value > best_value:
                    best_value = final_value
                    best_exp = exp_id
                elif mode == 'min' and final_value < best_value:
                    best_value = final_value
                    best_exp = exp_id
        
        return best_exp, best_value

# Example usage / 使用示例
exp_manager = ExperimentManager()

# Create experiments / 创建实验
for i in range(3):
    config = ExperimentConfig(
        name=f"test_exp_{i}",
        model_type="LightGBM",
        dataset="csi300",
        hyperparameters={'learning_rate': 0.1 * (i + 1)},
        metrics=['sharpe', 'return'],
        tags=['test', 'demo']
    )
    
    exp_id = exp_manager.create_experiment(config)
    
    # Log metrics / 记录指标
    exp_manager.log_metric('sharpe', np.random.random() + i * 0.1)
    exp_manager.log_metric('return', np.random.random() * 0.2)

# Compare experiments / 比较实验
comparison = exp_manager.compare_experiments()
print("\nExperiment Comparison:")
print(comparison)

# Find best experiment / 找到最佳实验
best_exp, best_sharpe = exp_manager.get_best_experiment('sharpe', mode='max')
print(f"\nBest experiment: {best_exp} with Sharpe: {best_sharpe:.4f}")
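
`ExperimentManager` keeps metrics only in memory, and each record holds a `datetime` that `json.dump` cannot serialize directly. The sketch below shows one way to round-trip such a history through JSON. The helpers `save_metrics` and `load_metrics` are hypothetical names introduced for illustration, and the record layout is assumed to match what `log_metric` stores.

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path


def save_metrics(metrics: dict, path: Path) -> None:
    """Write a metrics history to JSON, converting datetimes to ISO strings."""
    def encode(obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        raise TypeError(f"Cannot serialize {type(obj).__name__}")

    path.write_text(json.dumps(metrics, indent=2, default=encode), encoding="utf-8")


def load_metrics(path: Path) -> dict:
    """Read the history back and restore the timestamp fields."""
    metrics = json.loads(path.read_text(encoding="utf-8"))
    for records in metrics.values():
        for record in records:
            record["timestamp"] = datetime.fromisoformat(record["timestamp"])
    return metrics


# Same record layout as log_metric: value, step, timestamp
history = {
    "sharpe": [{"value": 1.25, "step": 0, "timestamp": datetime(2024, 1, 2, 15, 0)}]
}

with tempfile.TemporaryDirectory() as tmp:
    metrics_path = Path(tmp) / "metrics.json"
    save_metrics(history, metrics_path)
    restored = load_metrics(metrics_path)

print(restored == history)
```

Writing a file like this next to `config.json` in the experiment directory would let a later session reload and compare runs.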

## 3. Data Utilities / 数据工具 <a id='data-utilities'></a>

In [None]:
# Advanced data utilities / 高级数据工具

class DataUtils:
    """Comprehensive data utility functions
    综合数据工具函数
    """
    
    @staticmethod
    def resample_data(data: pd.DataFrame, source_freq: str = 'D', 
                     target_freq: str = 'W') -> pd.DataFrame:
        """Resample time series data / 重采样时间序列数据"""
        if isinstance(data.index, pd.MultiIndex):
            # Handle multi-index (datetime, instrument)
            return data.groupby(level='instrument').resample(
                target_freq, level='datetime'
            ).agg({
                col: 'last' if 'price' in col.lower() else 'sum' 
                for col in data.columns
            })
        else:
            return data.resample(target_freq).last()
    
    @staticmethod
    def detect_outliers(data: pd.DataFrame, method: str = 'iqr', 
                       threshold: float = 1.5) -> pd.DataFrame:
        """Detect outliers in data / 检测数据中的异常值"""
        outliers = pd.DataFrame(False, index=data.index, columns=data.columns)
        
        if method == 'iqr':
            Q1 = data.quantile(0.25)
            Q3 = data.quantile(0.75)
            IQR = Q3 - Q1
            outliers = (data < (Q1 - threshold * IQR)) | (data > (Q3 + threshold * IQR))
        elif method == 'zscore':
            z_scores = np.abs((data - data.mean()) / data.std())
            outliers = z_scores > threshold
        elif method == 'isolation_forest':
            from sklearn.ensemble import IsolationForest
            clf = IsolationForest(contamination=0.1, random_state=42)
            outliers_pred = clf.fit_predict(data)
            outliers = pd.DataFrame(
                outliers_pred == -1, 
                index=data.index, 
                columns=['outlier']
            )
        else:
            raise ValueError(f"Unknown method: {method}")
        
        return outliers
    
    @staticmethod
    def create_lag_features(data: pd.DataFrame, lags: List[int], 
                          columns: List[str] = None) -> pd.DataFrame:
        """Create lag features / 创建滞后特征"""
        if columns is None:
            columns = data.columns.tolist()
        
        lag_features = data.copy()
        
        for col in columns:
            for lag in lags:
                lag_features[f'{col}_lag_{lag}'] = data[col].shift(lag)
        
        return lag_features
    
    @staticmethod
    def calculate_rolling_features(data: pd.DataFrame, windows: List[int],
                                 functions: Optional[List[str]] = None) -> pd.DataFrame:
        """Calculate rolling window features / 计算滚动窗口特征"""
        # Avoid a mutable default argument
        if functions is None:
            functions = ['mean', 'std']
        unknown = set(functions) - {'mean', 'std', 'max', 'min'}
        if unknown:
            raise ValueError(f"Unsupported rolling functions: {sorted(unknown)}")
        
        rolling_features = data.copy()
        
        for col in data.columns:
            for window in windows:
                roller = data[col].rolling(window)
                for func in functions:
                    # Dispatch to Rolling.mean / .std / .max / .min by name
                    rolling_features[f'{col}_roll_{window}_{func}'] = getattr(roller, func)()
        
        return rolling_features
    
    @staticmethod
    @lru_cache(maxsize=128)
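    def get_trading_calendar(market: str, start: str, end: str) -> pd.DatetimeIndex:
        """Get trading calendar with caching / 获取交易日历（带缓存）

        Note: `market` only acts as part of the cache key; the calendar itself
        comes from the data provider configured in `qlib.init()`.
        """
        return pd.DatetimeIndex(D.calendar(start_time=start, end_time=end, freq='day'))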

# Example usage / 使用示例
data_utils = DataUtils()

# Create sample data / 创建示例数据
dates = pd.date_range('2024-01-01', '2024-01-31', freq='D')
sample_data = pd.DataFrame({
    'price': 100 + np.random.randn(len(dates)).cumsum(),
    'volume': np.random.randint(1000, 10000, len(dates)),
    'returns': np.random.randn(len(dates)) * 0.02
}, index=dates)

# Detect outliers / 检测异常值
outliers = data_utils.detect_outliers(sample_data, method='zscore', threshold=2)
print(f"Outliers detected: {outliers.sum().sum()}")

# Create lag features / 创建滞后特征
lag_data = data_utils.create_lag_features(sample_data, lags=[1, 5, 10], columns=['returns'])
print(f"\nColumns after lag features: {lag_data.columns.tolist()}")

# Calculate rolling features / 计算滚动特征
rolling_data = data_utils.calculate_rolling_features(
    sample_data[['price', 'returns']], 
    windows=[5, 20],
    functions=['mean', 'std']
)
print(f"\nColumns after rolling features: {rolling_data.columns.tolist()[:10]}...")
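
`create_lag_features` calls `shift` on the whole frame, which is fine for a single time series. On a stacked panel indexed by instrument and datetime, a plain `shift` would carry the last value of one instrument into the first row of the next. The sketch below contrasts the two on a tiny hand-made panel. The instrument codes and prices are made up, and the `(instrument, datetime)` level order is an assumption.

```python
import pandas as pd

# Tiny hypothetical panel: two instruments, three days each
index = pd.MultiIndex.from_product(
    [["SH600000", "SZ000001"], pd.date_range("2024-01-01", periods=3, freq="D")],
    names=["instrument", "datetime"],
)
panel = pd.DataFrame({"close": [10.0, 11.0, 12.0, 20.0, 21.0, 22.0]}, index=index)

# Plain shift: the first SZ000001 row receives the last SH600000 value
naive_lag = panel["close"].shift(1)

# Grouped shift: each instrument is lagged within its own history
grouped_lag = panel.groupby(level="instrument")["close"].shift(1)

print(pd.DataFrame({"close": panel["close"], "naive": naive_lag, "grouped": grouped_lag}))
```

The grouped version leaves the first row of every instrument as NaN, which is the expected result for a lag feature.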

## 4. Feature Engineering Helpers / 特征工程辅助 <a id='feature-engineering'></a>

In [None]:
# Advanced feature engineering helpers / 高级特征工程辅助

class FeatureEngineer:
    """Advanced feature engineering toolkit
    高级特征工程工具包
    """
    
    def __init__(self):
        self.feature_importance = {}
        self.feature_stats = {}
    
    def create_technical_indicators(self, ohlcv: pd.DataFrame) -> pd.DataFrame:
        """Create comprehensive technical indicators
        创建综合技术指标
        """
        features = ohlcv.copy()
        
        # Price-based indicators / 基于价格的指标
        features['SMA_5'] = ohlcv['close'].rolling(5).mean()
        features['SMA_20'] = ohlcv['close'].rolling(20).mean()
        features['EMA_12'] = ohlcv['close'].ewm(span=12).mean()
        features['EMA_26'] = ohlcv['close'].ewm(span=26).mean()
        
        # MACD
        features['MACD'] = features['EMA_12'] - features['EMA_26']
        features['MACD_signal'] = features['MACD'].ewm(span=9).mean()
        features['MACD_hist'] = features['MACD'] - features['MACD_signal']
        
        # Bollinger Bands / 布林带
        bb_period = 20
        bb_std = ohlcv['close'].rolling(bb_period).std()
        bb_mean = ohlcv['close'].rolling(bb_period).mean()
        features['BB_upper'] = bb_mean + 2 * bb_std
        features['BB_lower'] = bb_mean - 2 * bb_std
        features['BB_width'] = features['BB_upper'] - features['BB_lower']
        features['BB_position'] = (ohlcv['close'] - features['BB_lower']) / features['BB_width']
        
        # RSI
        delta = ohlcv['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss
        features['RSI'] = 100 - (100 / (1 + rs))
        
        # Volume indicators / 成交量指标
        features['volume_ratio'] = ohlcv['volume'] / ohlcv['volume'].rolling(20).mean()
        features['OBV'] = (np.sign(ohlcv['close'].diff()) * ohlcv['volume']).cumsum()
        
        # Volatility / 波动率
        features['volatility_20'] = ohlcv['close'].pct_change().rolling(20).std()
        features['ATR'] = self._calculate_atr(ohlcv)
        
        return features
    
    def _calculate_atr(self, ohlcv: pd.DataFrame, period: int = 14) -> pd.Series:
        """Calculate Average True Range / 计算平均真实范围"""
        high_low = ohlcv['high'] - ohlcv['low']
        high_close = np.abs(ohlcv['high'] - ohlcv['close'].shift())
        low_close = np.abs(ohlcv['low'] - ohlcv['close'].shift())
        
        true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        return true_range.rolling(period).mean()
    
    def create_interaction_features(self, data: pd.DataFrame, 
                                  max_degree: int = 2) -> pd.DataFrame:
        """Create polynomial and interaction features
        创建多项式和交互特征
        """
        from sklearn.preprocessing import PolynomialFeatures
        
        poly = PolynomialFeatures(degree=max_degree, include_bias=False)
        poly_features = poly.fit_transform(data)
        
        feature_names = poly.get_feature_names_out(data.columns)
        return pd.DataFrame(poly_features, columns=feature_names, index=data.index)
    
    def select_features(self, X: pd.DataFrame, y: pd.Series, 
                       method: str = 'mutual_info', k: int = 20) -> List[str]:
        """Feature selection using various methods
        使用各种方法进行特征选择
        """
        from sklearn.feature_selection import (
            SelectKBest, mutual_info_regression, f_regression
        )
        
        if method == 'mutual_info':
            selector = SelectKBest(mutual_info_regression, k=k)
        elif method == 'f_regression':
            selector = SelectKBest(f_regression, k=k)
        else:
            raise ValueError(f"Unknown method: {method}")
        
        selector.fit(X, y)
        selected_features = X.columns[selector.get_support()].tolist()
        
        # Store feature importance / 存储特征重要性
        self.feature_importance[method] = dict(zip(X.columns, selector.scores_))
        
        return selected_features
    
    def create_target_encoding(self, data: pd.DataFrame, 
                              categorical_cols: List[str],
                              target_col: str) -> pd.DataFrame:
        """Target encoding for categorical features
        分类特征的目标编码
        """
        encoded_data = data.copy()
        
        for col in categorical_cols:
            # Calculate mean target for each category
            target_mean = data.groupby(col)[target_col].mean()
            
            # Add noise to prevent overfitting
            noise_level = 0.01
            target_mean += np.random.normal(0, noise_level, len(target_mean))
            
            # Map to original data
            encoded_data[f'{col}_target_enc'] = data[col].map(target_mean)
        
        return encoded_data

# Example usage / 使用示例
fe = FeatureEngineer()

# Create sample OHLCV data / 创建示例OHLCV数据
dates = pd.date_range('2024-01-01', '2024-03-31', freq='D')
close = 100 + np.random.randn(len(dates)).cumsum()
open_ = close + np.random.randn(len(dates)) * 0.5
ohlcv = pd.DataFrame({
    'open': open_,
    # Keep high/low consistent with open/close so every bar is valid
    'high': np.maximum(open_, close) + np.abs(np.random.randn(len(dates))),
    'low': np.minimum(open_, close) - np.abs(np.random.randn(len(dates))),
    'close': close,
    'volume': np.random.randint(1000000, 5000000, len(dates))
}, index=dates)

# Create technical indicators / 创建技术指标
tech_features = fe.create_technical_indicators(ohlcv)
print(f"Technical features created: {len(tech_features.columns)} columns")
print(f"Sample features: {tech_features.columns.tolist()[:10]}...")

# Feature selection example / 特征选择示例
X = tech_features.dropna()
y = X['close'].pct_change().shift(-1)  # Next day return as target
X, y = X.iloc[:-1], y.iloc[:-1]  # Drop the last row, which has no next-day return
selected = fe.select_features(X, y, method='f_regression', k=10)
print(f"\nTop 10 selected features: {selected}")

## Part 2: Advanced Features / 高级功能

## 5. Multi-Market Support / 多市场支持 <a id='multi-market'></a>

In [None]:
# Multi-market trading framework / 多市场交易框架

class MultiMarketFramework:
    """Framework for multi-market trading
    多市场交易框架
    """
    
    # Market configurations / 市场配置
    MARKET_CONFIG = {
        'CN': {
            'name': 'China A-Share',
            'trading_hours': '09:30-15:00',
            'timezone': 'Asia/Shanghai',
            'currency': 'CNY',
            'indices': ['csi300', 'csi500', 'csi1000'],
            'limit': 0.10,  # 10% daily price limit (main board; ChiNext and STAR use 20%)
        },
        'US': {
            'name': 'US Market',
            'trading_hours': '09:30-16:00',
            'timezone': 'America/New_York',
            'currency': 'USD',
            'indices': ['sp500', 'nasdaq100', 'russell2000'],
            'limit': None,  # No price limit
        },
        'HK': {
            'name': 'Hong Kong Market',
            'trading_hours': '09:30-16:00',
            'timezone': 'Asia/Hong_Kong',
            'currency': 'HKD',
            'indices': ['hsi', 'hscei'],
            'limit': None,
        }
    }
    
    def __init__(self, markets: List[str]):
        self.markets = markets
        self.market_data = {}
        self.portfolios = {}
        
    def initialize_markets(self):
        """Initialize multiple markets / 初始化多个市场"""
        for market in self.markets:
            if market in self.MARKET_CONFIG:
                config = self.MARKET_CONFIG[market]
                print(f"Initializing {config['name']}...")
                
                # Initialize market-specific components
                self.portfolios[market] = {
                    'positions': {},
                    'cash': 1000000,  # Initial cash
                    'currency': config['currency']
                }
    
    def convert_currency(self, amount: float, from_currency: str, 
                        to_currency: str) -> float:
        """Currency conversion / 货币转换"""
        # Simplified exchange rates / 简化的汇率
        exchange_rates = {
            'CNY_USD': 0.14,
            'USD_CNY': 7.2,
            'HKD_USD': 0.13,
            'USD_HKD': 7.8,
            'CNY_HKD': 1.1,
            'HKD_CNY': 0.92
        }
        
        if from_currency == to_currency:
            return amount
        
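        rate_key = f"{from_currency}_{to_currency}"
        if rate_key not in exchange_rates:
            # Returning the amount unchanged would silently mix currencies
            raise ValueError(f"No exchange rate for {rate_key}")
        return amount * exchange_rates[rate_key]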
    
    def aggregate_portfolio(self, base_currency: str = 'USD') -> dict:
        """Aggregate multi-market portfolio / 聚合多市场组合"""
        aggregated = {'positions': {}, 'cash': 0, 'currency': base_currency}
        
        for market, portfolio in self.portfolios.items():
            market_config = self.MARKET_CONFIG[market]
            
            # Convert cash to base currency
            cash_in_base = self.convert_currency(
                portfolio['cash'],
                market_config['currency'],
                base_currency
            )
            aggregated['cash'] += cash_in_base
            
            # Aggregate positions
            for stock, position in portfolio['positions'].items():
                key = f"{market}:{stock}"
                aggregated['positions'][key] = position
        
        return aggregated
    
    def cross_market_arbitrage(self, symbol: str) -> List[dict]:
        """Identify cross-market arbitrage opportunities
        识别跨市场套利机会
        """
        opportunities = []
        
        # Example: A+H share arbitrage
        # Check if stock is listed in multiple markets
        prices = {}
        for market in self.markets:
            # Simulate price fetching
            prices[market] = np.random.uniform(90, 110)
        
        # Find arbitrage opportunities (visit each unordered market pair once)
        from itertools import combinations
        for m1, m2 in combinations(self.markets, 2):
            price_diff = abs(prices[m1] - prices[m2]) / prices[m1]
            if price_diff > 0.02:  # 2% threshold
                opportunities.append({
                    'symbol': symbol,
                    'market_1': m1,
                    'market_2': m2,
                    'price_1': prices[m1],
                    'price_2': prices[m2],
                    'spread': price_diff
                })
        
        return opportunities

# Example usage / 使用示例
multi_market = MultiMarketFramework(['CN', 'US', 'HK'])
multi_market.initialize_markets()

# Aggregate portfolio / 聚合组合
aggregated = multi_market.aggregate_portfolio(base_currency='USD')
print(f"\nAggregated portfolio in USD:")
print(f"  Total cash: ${aggregated['cash']:,.2f}")

# Check arbitrage opportunities / 检查套利机会
arb_ops = multi_market.cross_market_arbitrage('BABA')
if arb_ops:
    print(f"\nArbitrage opportunities found:")
    for op in arb_ops:
        print(f"  {op['symbol']}: {op['market_1']} vs {op['market_2']}, spread: {op['spread']:.2%}")
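
The hard-coded rate table in `convert_currency` lists every pair separately, so a pair and its inverse can drift apart (for example `CNY_HKD` 1.1 against `HKD_CNY` 0.92). One common alternative is to quote every currency against a single base and derive cross rates from those quotes. The sketch below assumes hypothetical USD quotes and a hypothetical `convert` helper. The numbers are placeholders, not market data.

```python
from typing import Dict

# Hypothetical quotes: USD value of one unit of each currency
USD_PER_UNIT: Dict[str, float] = {"USD": 1.0, "CNY": 0.14, "HKD": 0.128}


def convert(amount: float, from_currency: str, to_currency: str,
            usd_per_unit: Dict[str, float] = USD_PER_UNIT) -> float:
    """Convert through USD so each pair is consistent with its inverse."""
    try:
        rate = usd_per_unit[from_currency] / usd_per_unit[to_currency]
    except KeyError as exc:
        raise ValueError(f"No rate available for currency {exc.args[0]!r}") from exc
    return amount * rate


cny_cash_in_usd = convert(1_000_000, "CNY", "USD")
round_trip = convert(convert(100.0, "CNY", "HKD"), "HKD", "CNY")

print(f"{cny_cash_in_usd:,.2f}")
print(f"{round_trip:.6f}")
```

With a single base table, converting there and back returns the original amount up to floating-point error, and an unknown currency raises an error instead of passing through unconverted.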

## 6. Online Learning / 在线学习 <a id='online-learning'></a>

In [None]:
# Online learning framework / 在线学习框架

class OnlineLearningFramework:
    """Framework for online/incremental learning
    在线/增量学习框架
    """
    
    def __init__(self, base_model, update_frequency: int = 100):
        self.base_model = base_model
        self.update_frequency = update_frequency  # Buffered batches (not samples) per update
        self.data_buffer = []
        self.performance_history = []
        self.update_count = 0
        
    def update_model(self, new_data: pd.DataFrame, new_labels: pd.Series):
        """Incrementally update the model / 增量更新模型"""
        # Add to buffer
        self.data_buffer.append((new_data, new_labels))
        
        # Check if update is needed
        if len(self.data_buffer) >= self.update_frequency:
            print(f"Updating model (update #{self.update_count + 1})...")
            
            # Combine buffered data
            X = pd.concat([d[0] for d in self.data_buffer])
            y = pd.concat([d[1] for d in self.data_buffer])
            
            # Partial fit (for models that support it)
            if hasattr(self.base_model, 'partial_fit'):
                self.base_model.partial_fit(X, y)
            else:
                # Retrain on recent data
                self.base_model.fit(X, y)
            
            # Clear buffer
            self.data_buffer = []
            self.update_count += 1
            
            return True
        return False
    
    def adaptive_learning_rate(self, performance_metric: float):
        """Adjust learning rate based on performance
        基于性能调整学习率
        """
        self.performance_history.append(performance_metric)
        
        if len(self.performance_history) >= 10:
            recent_performance = self.performance_history[-10:]
            performance_trend = np.polyfit(range(10), recent_performance, 1)[0]
            
            # Adjust the numeric step size based on trend.
            # scikit-learn SGD models keep it in `eta0`; their `learning_rate` is a schedule name.
            lr_attr = 'learning_rate'
            if not isinstance(getattr(self.base_model, lr_attr, None), (int, float)):
                lr_attr = 'eta0'
            current_lr = getattr(self.base_model, lr_attr, None)
            if isinstance(current_lr, (int, float)):
                if performance_trend < 0:  # Performance declining
                    setattr(self.base_model, lr_attr, current_lr * 0.9)
                    print(f"Decreased learning rate to {current_lr * 0.9:.4f}")
                elif performance_trend > 0.01:  # Performance improving
                    setattr(self.base_model, lr_attr, current_lr * 1.1)
                    print(f"Increased learning rate to {current_lr * 1.1:.4f}")
    
    def concept_drift_detection(self, predictions: np.ndarray, 
                              actuals: np.ndarray,
                              threshold: float = 0.1):
        """Detect concept drift in data / 检测数据中的概念漂移"""
        error = np.abs(predictions - actuals).mean()
        
        if getattr(self, 'baseline_error', None) is None:
            self.baseline_error = error
            return False
        
        # Guard against a zero baseline (perfect predictions so far)
        drift_ratio = (error - self.baseline_error) / max(self.baseline_error, 1e-12)
        
        if drift_ratio > threshold:
            print(f"⚠️ Concept drift detected! Error increased by {drift_ratio:.2%}")
            return True
        
        # Update baseline with exponential moving average
        self.baseline_error = 0.9 * self.baseline_error + 0.1 * error
        return False

# Example usage / 使用示例
from sklearn.linear_model import SGDRegressor

# Create online learning model / 创建在线学习模型
base_model = SGDRegressor(learning_rate='constant', eta0=0.01)
online_learner = OnlineLearningFramework(base_model, update_frequency=50)

# Simulate streaming data / 模拟流数据
for i in range(200):
    # Generate new data batch
    X_new = pd.DataFrame(np.random.randn(10, 5), columns=[f'f{j}' for j in range(5)])
    y_new = pd.Series(np.random.randn(10))
    
    # Update model
    updated = online_learner.update_model(X_new, y_new)
    
    if updated:
        # Make predictions and check for drift
        predictions = base_model.predict(X_new)
        drift = online_learner.concept_drift_detection(predictions, y_new.values)
        
        if drift:
            print("  Triggering model retraining due to concept drift...")

print(f"\nTotal model updates: {online_learner.update_count}")
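
The drift check above compares the current error with an exponentially smoothed baseline. The following is a minimal standalone sketch of that arithmetic, assuming the same 0.9/0.1 smoothing weights and 10% threshold; the helper names `drift_ratio` and `update_baseline` and the error values are hypothetical and exist only for illustration.

```python
def drift_ratio(baseline_error: float, current_error: float) -> float:
    """Relative increase of the current error over the baseline."""
    if baseline_error == 0:
        # Assumed convention for this sketch: report no drift for a zero baseline
        return 0.0
    return (current_error - baseline_error) / baseline_error


def update_baseline(baseline_error: float, current_error: float,
                    alpha: float = 0.1) -> float:
    """Exponential moving average: keep (1 - alpha) of the old baseline."""
    return (1 - alpha) * baseline_error + alpha * current_error


baseline = 0.50                 # first observed error becomes the baseline
errors = [0.52, 0.51, 0.60]     # made-up mean absolute errors per batch
threshold = 0.1

flags = []
for err in errors:
    ratio = drift_ratio(baseline, err)
    drifted = ratio > threshold
    flags.append(drifted)
    if not drifted:
        # The baseline only moves when no drift is flagged
        baseline = update_baseline(baseline, err)
    print(f"error={err:.2f} ratio={ratio:+.2%} drift={drifted}")
```

Because the baseline is frozen whenever drift is flagged, a persistent shift keeps triggering the alert until the model is retrained or the baseline is reset explicitly.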

## Part 3: Production Tools / 生产工具

## 9. Real-time Trading Integration / 实时交易集成 <a id='realtime-trading'></a>

In [None]:
# Real-time trading system / 实时交易系统

class RealTimeTradingSystem:
    """Real-time trading system with risk management
    带风险管理的实时交易系统
    """
    
    def __init__(self, model, risk_manager=None):
        self.model = model
        self.risk_manager = risk_manager
        self.order_queue = []
        self.positions = {}
        self.trading_status = 'STOPPED'
        self.last_update = None
        
    def start_trading(self):
        """Start real-time trading / 开始实时交易"""
        self.trading_status = 'RUNNING'
        print("🟢 Trading system started")
        
        # Start event loop (simplified)
        self._trading_loop()
    
    def stop_trading(self):
        """Stop trading / 停止交易"""
        self.trading_status = 'STOPPED'
        print("🔴 Trading system stopped")
    
    def _trading_loop(self):
        """Main trading loop / 主交易循环"""
        import asyncio
        
        async def process_market_data():
            while self.trading_status == 'RUNNING':
                # Fetch real-time data (simulated)
                market_data = self._fetch_market_data()
                
                # Generate signals
                signals = self._generate_signals(market_data)
                
                # Risk check
                if self.risk_manager:
                    signals = self.risk_manager.check_signals(signals, self.positions)
                
                # Generate orders
                orders = self._generate_orders(signals)
                
                # Execute orders
                await self._execute_orders(orders)
                
                # Update positions
                self._update_positions()
                
                await asyncio.sleep(1)  # Wait 1 second
        
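        # The coroutine above is defined for reference only and is never scheduled.
        # The simplified demo below just fetches data and generates signals;
        # it does not create orders or update positions.
        print("Trading loop simulation...")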
        for _ in range(3):  # Simulate 3 iterations
            market_data = self._fetch_market_data()
            signals = self._generate_signals(market_data)
            print(f"  Generated {len(signals)} signals")
            time.sleep(1)
    
    def _fetch_market_data(self) -> pd.DataFrame:
        """Fetch real-time market data / 获取实时市场数据"""
        # Simulated real-time data
        stocks = ['AAPL', 'GOOGL', 'MSFT', 'AMZN']
        data = pd.DataFrame({
            'symbol': stocks,
            'price': np.random.uniform(100, 200, len(stocks)),
            'volume': np.random.randint(1000000, 10000000, len(stocks)),
            'bid': np.random.uniform(99, 199, len(stocks)),
            'ask': np.random.uniform(101, 201, len(stocks))
        })
        self.last_update = datetime.now()
        return data
    
    def _generate_signals(self, market_data: pd.DataFrame) -> pd.DataFrame:
        """Generate trading signals / 生成交易信号"""
        # Use model to generate signals
        signals = pd.DataFrame({
            'symbol': market_data['symbol'],
            'signal': np.random.uniform(-1, 1, len(market_data)),
            'confidence': np.random.uniform(0.5, 1.0, len(market_data))
        })
        return signals
    
    def _generate_orders(self, signals: pd.DataFrame) -> List[dict]:
        """Generate orders from signals / 从信号生成订单"""
        orders = []
        
        for _, signal in signals.iterrows():
            if abs(signal['signal']) > 0.5 and signal['confidence'] > 0.7:
                order = {
                    'symbol': signal['symbol'],
                    'side': 'BUY' if signal['signal'] > 0 else 'SELL',
                    'quantity': int(abs(signal['signal']) * 100),
                    'order_type': 'MARKET',
                    'timestamp': datetime.now()
                }
                orders.append(order)
        
        return orders
    
    async def _execute_orders(self, orders: List[dict]):
        """Execute orders / 执行订单"""
        for order in orders:
            print(f"  Executing {order['side']} {order['quantity']} {order['symbol']}")
            self.order_queue.append(order)
    
    def _update_positions(self):
        """Update positions / 更新持仓"""
        for order in self.order_queue:
            symbol = order['symbol']
            quantity = order['quantity'] if order['side'] == 'BUY' else -order['quantity']
            
            if symbol in self.positions:
                self.positions[symbol] += quantity
            else:
                self.positions[symbol] = quantity
        
        self.order_queue = []

class RiskManager:
    """Risk management system / 风险管理系统"""
    
    def __init__(self, max_position_size: float = 0.1, 
                 max_total_exposure: float = 0.8):
        self.max_position_size = max_position_size
        self.max_total_exposure = max_total_exposure
        
    def check_signals(self, signals: pd.DataFrame, 
                     current_positions: dict) -> pd.DataFrame:
        """Check signals against risk limits / 根据风险限制检查信号"""
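        # Work on a copy so the caller's DataFrame is not modified in place
        signals = signals.copy()
        
        # Apply position size limits.
        # Note: _generate_orders only acts on |signal| > 0.5, so clipping with
        # max_position_size <= 0.5 (the default is 0.1) suppresses all orders.
        signals['signal'] = signals['signal'].clip(-self.max_position_size, 
                                                   self.max_position_size)
        
        # Check total exposure (positions hold share quantities in this demo,
        # so comparing them with a fractional limit is illustrative only)
        total_exposure = sum(abs(v) for v in current_positions.values())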
        if total_exposure > self.max_total_exposure:
            print("⚠️ Risk limit reached, scaling down signals")
            signals['signal'] *= 0.5
        
        return signals

# Example usage / 使用示例
risk_mgr = RiskManager(max_position_size=0.1, max_total_exposure=0.8)
trading_system = RealTimeTradingSystem(model=None, risk_manager=risk_mgr)

# Start trading
trading_system.start_trading()

# Check positions (still empty: the demo loop never creates orders)
print(f"\nCurrent positions: {trading_system.positions}")
print(f"Last update: {trading_system.last_update}")
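
The trading loop above is written as a coroutine but is only simulated synchronously. As a rough sketch of how such a coroutine could actually be driven, the example below runs a bounded async loop with `asyncio.run`. The names `trading_loop`, `step`, and `run_demo` are hypothetical and not part of Qlib. It assumes a plain Python script; inside Jupyter an event loop is already running, so `await trading_loop(...)` would be used in a cell instead of `asyncio.run`.

```python
import asyncio
from typing import Callable, List, Tuple


async def trading_loop(step: Callable[[], None], max_iterations: int,
                       interval: float = 1.0) -> int:
    """Call `step()` up to `max_iterations` times, sleeping between calls."""
    completed = 0
    for _ in range(max_iterations):
        step()                          # e.g. fetch data, build signals, place orders
        completed += 1
        await asyncio.sleep(interval)   # yield control to other tasks
    return completed


def run_demo() -> Tuple[int, List[int]]:
    """Drive the loop from synchronous code (script context assumed)."""
    ticks: List[int] = []

    def step() -> None:
        # Stand-in for one trading iteration: record the tick number
        ticks.append(len(ticks))

    completed = asyncio.run(trading_loop(step, max_iterations=3, interval=0.0))
    return completed, ticks


completed, ticks = run_demo()
print(f"Completed {completed} iterations, ticks={ticks}")
```

Bounding the loop with `max_iterations` keeps the demo finite; a live system would more likely loop on a status flag, as the `trading_status` check in the class above does.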

## 10. Monitoring and Alerting / 监控与告警 <a id='monitoring'></a>

In [None]:
# Monitoring and alerting system / 监控与告警系统

class MonitoringSystem:
    """Comprehensive monitoring and alerting system
    综合监控与告警系统
    """
    
    def __init__(self, alert_config: dict = None):
        self.metrics = {}
        self.alerts = []
        self.alert_config = alert_config or self._default_alert_config()
        self.alert_handlers = []
        
    def _default_alert_config(self) -> dict:
        """Default alert configuration / 默认告警配置"""
        return {
            'max_drawdown': -0.10,
            'daily_loss_limit': -0.05,
            'position_limit': 0.20,
            'volatility_threshold': 0.30,
            'sharpe_threshold': 0.5
        }
    
    def track_metric(self, name: str, value: float, timestamp: datetime = None):
        """Track a metric / 跟踪指标"""
        if timestamp is None:
            timestamp = datetime.now()
        
        if name not in self.metrics:
            self.metrics[name] = []
        
        self.metrics[name].append({'value': value, 'timestamp': timestamp})
        
        # Check for alerts
        self._check_alerts(name, value)
    
    def _check_alerts(self, metric_name: str, value: float):
        """Check if alerts should be triggered / 检查是否应触发告警"""
        alerts_triggered = []
        
        # Drawdown alert
        if metric_name == 'drawdown' and value < self.alert_config['max_drawdown']:
            alerts_triggered.append({
                'type': 'CRITICAL',
                'metric': metric_name,
                'value': value,
                'threshold': self.alert_config['max_drawdown'],
                'message': f"Drawdown {value:.2%} exceeds limit {self.alert_config['max_drawdown']:.2%}"
            })
        
        # Daily loss alert
        if metric_name == 'daily_return' and value < self.alert_config['daily_loss_limit']:
            alerts_triggered.append({
                'type': 'WARNING',
                'metric': metric_name,
                'value': value,
                'threshold': self.alert_config['daily_loss_limit'],
                'message': f"Daily loss {value:.2%} exceeds limit"
            })
        
        # Process alerts
        for alert in alerts_triggered:
            self._trigger_alert(alert)
    
    def _trigger_alert(self, alert: dict):
        """Trigger an alert / 触发告警"""
        alert['timestamp'] = datetime.now()
        self.alerts.append(alert)
        
        # Print alert
        icon = '🔴' if alert['type'] == 'CRITICAL' else '⚠️'
        print(f"{icon} {alert['type']}: {alert['message']}")
        
        # Call alert handlers
        for handler in self.alert_handlers:
            handler(alert)
    
    def add_alert_handler(self, handler):
        """Add custom alert handler / 添加自定义告警处理器"""
        self.alert_handlers.append(handler)
    
    def generate_report(self) -> pd.DataFrame:
        """Generate monitoring report / 生成监控报告"""
        report_data = []
        
        for metric_name, values in self.metrics.items():
            if values:
                recent_values = [v['value'] for v in values[-100:]]  # Last 100
                report_data.append({
                    'Metric': metric_name,
                    'Current': values[-1]['value'],
                    'Mean': np.mean(recent_values),
                    'Std': np.std(recent_values),
                    'Min': np.min(recent_values),
                    'Max': np.max(recent_values),
                    'Count': len(values)
                })
        
        return pd.DataFrame(report_data)
    
    def plot_metrics(self, metrics: List[str] = None):
        """Plot monitored metrics / 绘制监控指标"""
        if metrics is None:
            metrics = list(self.metrics.keys())
        
        n_metrics = len(metrics)
        if n_metrics == 0:
            print("No metrics to plot")
            return
        
        fig, axes = plt.subplots(n_metrics, 1, figsize=(12, 4*n_metrics))
        if n_metrics == 1:
            axes = [axes]
        
        for ax, metric_name in zip(axes, metrics):
            if metric_name in self.metrics:
                values = self.metrics[metric_name]
                timestamps = [v['timestamp'] for v in values]
                metric_values = [v['value'] for v in values]
                
                ax.plot(timestamps, metric_values, linewidth=2)
                ax.set_title(f'{metric_name} Monitoring')
                ax.set_xlabel('Time')
                ax.set_ylabel('Value')
                ax.grid(True, alpha=0.3)
                
                # Add threshold lines if applicable
                if metric_name == 'drawdown':
                    ax.axhline(y=self.alert_config['max_drawdown'], 
                             color='red', linestyle='--', alpha=0.5,
                             label='Alert Threshold')
                    ax.legend()
        
        plt.tight_layout()
        plt.show()

# Example usage / 使用示例
monitor = MonitoringSystem()

# Add custom alert handler / 添加自定义告警处理器
def email_alert_handler(alert):
    print(f"  📧 Sending email alert: {alert['message']}")

monitor.add_alert_handler(email_alert_handler)

# Simulate monitoring / 模拟监控
for i in range(20):
    # Track metrics
    monitor.track_metric('drawdown', -np.random.uniform(0, 0.15))
    monitor.track_metric('daily_return', np.random.uniform(-0.08, 0.08))
    monitor.track_metric('sharpe_ratio', np.random.uniform(0.3, 1.5))
    time.sleep(0.1)

# Generate report / 生成报告
report = monitor.generate_report()
print("\nMonitoring Report:")
print(report.round(4))

# Plot metrics / 绘制指标
monitor.plot_metrics(['drawdown', 'daily_return'])
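
The simulation above feeds random drawdown values into the monitor. In practice the drawdown would be derived from an equity curve. The sketch below shows one common way to compute a running drawdown with pandas; the function name `running_drawdown`, the sample equity values, and the -8% threshold are illustrative assumptions.

```python
import pandas as pd


def running_drawdown(equity: pd.Series) -> pd.Series:
    """Fractional decline of each point from the running peak (values <= 0)."""
    running_peak = equity.cummax()
    return equity / running_peak - 1.0


# Made-up portfolio values for five consecutive periods
equity = pd.Series([100.0, 110.0, 99.0, 104.5, 121.0])
drawdown = running_drawdown(equity)

# Periods breaching a hypothetical -8% alert threshold
breaches = drawdown[drawdown < -0.08]

print(drawdown.round(4).tolist())
print(f"Breaching periods: {list(breaches.index)}")
```

Each value of `drawdown` could then be passed to `monitor.track_metric('drawdown', value)` so that alerts reflect the actual equity path rather than random draws.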

## Part 4: Best Practices / 最佳实践

## 15. Common Pitfalls and Solutions / 常见问题与解决方案 <a id='pitfalls'></a>

In [None]:
# Common pitfalls and solutions / 常见问题与解决方案

class QlibBestPractices:
    """Collection of best practices and solutions
    最佳实践和解决方案集合
    """
    
    @staticmethod
    def avoid_look_ahead_bias(data: pd.DataFrame, shift_days: int = 1) -> pd.DataFrame:
        """Avoid look-ahead bias in features
        避免特征中的未来函数
        """
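        # Work on a copy so the caller's DataFrame is left untouched
        data = data.copy()
        
        # Shift all features so each row only sees values from earlier rows.
        # This assumes a single time series; for multi-instrument data,
        # shift within each instrument instead.
        feature_cols = [col for col in data.columns if col != 'label']
        data[feature_cols] = data[feature_cols].shift(shift_days)
        
        return data.dropna()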
    
    @staticmethod
    def handle_missing_data(data: pd.DataFrame, method: str = 'forward_fill') -> pd.DataFrame:
        """Properly handle missing data
        正确处理缺失数据
        """
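        if method == 'forward_fill':
            # Forward fill, then backward fill any leading gaps.
            # fillna(method=...) is deprecated in recent pandas, so use ffill()/bfill().
            # Note: backward fill uses later values and can leak future information.
            data = data.ffill().bfill()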
        elif method == 'interpolate':
            data = data.interpolate(method='linear')
        elif method == 'drop':
            data = data.dropna()
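        elif method == 'mean':
            data = data.fillna(data.mean())
        else:
            # Fail loudly instead of silently returning unmodified data
            raise ValueError(f"Unknown method: {method}")
        
        return data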
    
    @staticmethod
    def validate_data_alignment(features: pd.DataFrame, 
                              labels: pd.Series) -> Tuple[pd.DataFrame, pd.Series]:
        """Ensure features and labels are properly aligned
        确保特征和标签正确对齐
        """
        # Get common index
        common_index = features.index.intersection(labels.index)
        
        if len(common_index) < len(features):
            print(f"⚠️ Warning: {len(features) - len(common_index)} samples dropped due to misalignment")
        
        return features.loc[common_index], labels.loc[common_index]
    
    @staticmethod
    def memory_efficient_loading(instruments: List[str], 
                               batch_size: int = 50) -> pd.DataFrame:
        """Load data in batches to save memory
        分批加载数据以节省内存
        """
        all_data = []
        
        for i in range(0, len(instruments), batch_size):
            batch = instruments[i:i+batch_size]
            batch_data = D.features(
                instruments=batch,
                fields=['$close', '$volume'],
                start_time='2024-01-01',
                end_time='2024-01-31'
            )
            all_data.append(batch_data)
            
            # Run garbage collection every 5 batches to release unreferenced objects
            if i % (batch_size * 5) == 0:
                import gc
                gc.collect()
        
        return pd.concat(all_data)
    
    @staticmethod
    def debug_model_predictions(model, dataset, sample_size: int = 10):
        """Debug model predictions
        调试模型预测
        """
        # Get sample data
        sample_data = dataset.prepare('test', col_set=['feature', 'label']).head(sample_size)
        
        # Make predictions
        X = sample_data.iloc[:, :-1]
        y_true = sample_data.iloc[:, -1]
        
        # Get predictions as a flat NumPy array.
        # This assumes a scikit-learn-style predict(X); None if predict is missing.
        y_pred = np.asarray(model.predict(X)).ravel() if hasattr(model, 'predict') else None
        
        # Debug output
        debug_df = pd.DataFrame({
            'True': y_true,
            'Predicted': y_pred,
            'Error': y_pred - y_true.values if y_pred is not None else None
        })
        
        print("Debug Output:")
        print(debug_df)
        
        # Check for common issues
        if y_pred is not None and len(y_pred) > 0:
            if np.all(y_pred == y_pred[0]):
                print("⚠️ Warning: Model predicting constant values!")
            if np.any(np.isnan(y_pred)):
                print("⚠️ Warning: Model producing NaN predictions!")
            if np.any(np.isinf(y_pred)):
                print("⚠️ Warning: Model producing infinite predictions!")

# Example usage / 使用示例
best_practices = QlibBestPractices()

# Create sample data with issues / 创建有问题的样本数据
dates = pd.date_range('2024-01-01', '2024-01-31')
problem_data = pd.DataFrame({
    'feature1': np.random.randn(len(dates)),
    'feature2': np.random.randn(len(dates)),
    'label': np.random.randn(len(dates))
}, index=dates)

# Add some NaN values
problem_data.iloc[5:8, 0] = np.nan
problem_data.iloc[15:17, 1] = np.nan

print("Original data with NaN:")
print(f"  NaN count: {problem_data.isna().sum().sum()}")

# Fix missing data
fixed_data = best_practices.handle_missing_data(problem_data, method='forward_fill')
print("\nAfter fixing:")
print(f"  NaN count: {fixed_data.isna().sum().sum()}")

# Avoid look-ahead bias
safe_data = best_practices.avoid_look_ahead_bias(fixed_data, shift_days=1)
print("\nAfter avoiding look-ahead bias:")
print(f"  Data shape: {safe_data.shape}")
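
The look-ahead example above uses a single time series. With panel data indexed by date and instrument, a plain `shift` moves values across instrument boundaries. The sketch below shifts features within each instrument instead. It assumes a MultiIndex with levels named `datetime` and `instrument`; the function name `shift_features_per_instrument` and the sample values are hypothetical.

```python
import pandas as pd

# Toy panel: 3 dates x 2 instruments, index levels named as assumed above
index = pd.MultiIndex.from_product(
    [pd.date_range("2024-01-01", periods=3), ["AAA", "BBB"]],
    names=["datetime", "instrument"],
)
frame = pd.DataFrame(
    {
        "feature1": [1.0, 10.0, 2.0, 20.0, 3.0, 30.0],
        "label": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
    },
    index=index,
)


def shift_features_per_instrument(data: pd.DataFrame, shift_days: int = 1,
                                  label_col: str = "label") -> pd.DataFrame:
    """Lag every feature column within each instrument, leaving the label as is."""
    shifted = data.copy()  # do not modify the caller's frame
    feature_cols = [c for c in shifted.columns if c != label_col]
    shifted[feature_cols] = (
        shifted.groupby(level="instrument")[feature_cols].shift(shift_days)
    )
    return shifted.dropna()


safe = shift_features_per_instrument(frame)

# A plain shift pulls the previous row, which belongs to the other instrument
naive = frame["feature1"].shift(1)

print(safe)
print(naive.loc[(pd.Timestamp("2024-01-02"), "AAA")])
```

In the naive version, the 2024-01-02 row for `AAA` receives the 2024-01-01 value of `BBB`, which is the cross-instrument contamination that the grouped shift avoids.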

## Summary / 总结

### What we covered / 本章内容

#### Part 1: Core Utilities / 核心工具
- **Workflow Automation**: Pipeline creation and management / 管道创建和管理
- **Experiment Management**: Tracking and comparing experiments / 跟踪和比较实验
- **Data Utilities**: Advanced data processing tools / 高级数据处理工具
- **Feature Engineering**: Comprehensive feature creation / 综合特征创建

#### Part 2: Advanced Features / 高级功能
- **Multi-Market Support**: Cross-market trading framework / 跨市场交易框架
- **Online Learning**: Incremental model updates / 增量模型更新
- **Meta-Learning**: Learning to learn strategies / 学习策略的策略
- **AutoML Integration**: Automated model selection / 自动模型选择

#### Part 3: Production Tools / 生产工具
- **Real-time Trading**: Live trading system / 实盘交易系统
- **Monitoring & Alerting**: Comprehensive monitoring / 综合监控
- **Debugging & Profiling**: Performance optimization / 性能优化
- **Deployment Tools**: Production deployment / 生产部署

#### Part 4: Best Practices / 最佳实践
- **Code Organization**: Project structure / 项目结构
- **Performance Optimization**: Speed and memory / 速度和内存
- **Common Pitfalls**: Solutions to frequent issues / 常见问题解决方案
- **Tips and Tricks**: Pro techniques / 专业技巧

### Key Takeaways / 关键要点

1. **Automation is Key**: Automate repetitive tasks / 自动化重复任务
2. **Monitor Everything**: Track all metrics / 跟踪所有指标
3. **Handle Edge Cases**: Robust error handling / 稳健的错误处理
4. **Optimize Performance**: Profile and optimize / 分析和优化
5. **Think Production**: Design for deployment / 为部署而设计

### Resources / 资源

- [Qlib Documentation](https://qlib.readthedocs.io/)
- [Qlib GitHub](https://github.com/microsoft/qlib)
- [Qlib Examples](https://github.com/microsoft/qlib/tree/main/examples)
- [Community Forum](https://github.com/microsoft/qlib/discussions)

---

**Congratulations!** You've completed the comprehensive Qlib tutorial series!

**恭喜！** 您已完成全面的Qlib教程系列！