In [None]:
# main.ipynb
import os
import glob
import pandas as pd
import numpy as np
import re
import logging
from typing import Dict, List, Optional, Tuple
from scipy.stats import norm, skew
from scipy.optimize import brentq, minimize
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression
from sklearn.impute import KNNImputer
import math
from scipy.optimize import minimize_scalar
from matplotlib.font_manager import FontProperties

import OptionPreprocessor
from feature_calculator import FeatureCalculator
import FeaturePlotter
import strategy_backtest

# Global plotting configuration for the whole notebook.
# The seaborn call stays first so that the matplotlib overrides below are
# applied after it (NOTE(review): sns.set is understood to reset rcParams,
# including the font family - confirm against the installed seaborn).
sns.set(style='whitegrid')
plt.rcParams.update({
    # Presumably chosen so the Chinese titles/labels used below render.
    'font.family': ['SimHei'],
    # Keep minus signs on tick labels from using the Unicode minus glyph.
    'axes.unicode_minus': False,
})

# AdvancedOptionAnalyzer

In [None]:
class AdvancedOptionAnalyzer:
    """Chainable pipeline: load option data, clean it, compute features, plot, export.

    Every pipeline step except ``save`` returns ``self`` so calls can be chained:
    ``load_and_merge() -> preprocess() -> compute_features() -> visualize() -> save()``.

    Attributes:
        option_folder: Directory scanned for ``*.xlsx`` option workbooks.
        underlying_file: Excel workbook with the underlying's price history.
        col: Mapping from logical field names (``'date'``, ``'settle'``, ...)
            to the real column headers found in the data.
        feature_list: Feature names forwarded to ``FeatureCalculator``.
        r: Value forwarded to ``FeatureCalculator`` as ``r``
            (presumably the risk-free rate - confirm in feature_calculator).
        exchange: Lower-cased exchange code.
        merged_data: Option rows joined with the underlying price; ``None``
            until ``load_and_merge()`` has run.
        feature_data: Result of ``FeatureCalculator.compute_features``;
            ``None`` until ``compute_features()`` has run.
    """

    # Defaults are kept at class level and copied per instance, so two
    # analyzers never share (and accidentally mutate) the same dict or list.
    _DEFAULT_COLUMN_MAPPING = {
        'date': '日期',
        'option_code': '期权代码',
        'strike': '执行价格',
        'close': '收盘价',
        'settle': '结算价',
        'volume': '成交量',
        'underlying_price': '标的收盘价'
    }
    _DEFAULT_FEATURES = ['VIX', 'Skew', 'Delta']

    # Logical fields that must be present in the column mapping.
    _REQUIRED_KEYS = ('date', 'option_code', 'settle', 'underlying_price')

    # Logical fields read from each option workbook (when the column exists).
    _OPTION_FIELDS = ('date', 'option_code', 'strike', 'settle', 'volume')

    def __init__(
        self,
        option_folder: str = "期权数据",
        underlying_file: str = "标的资产价格.xlsx",
        column_mapping: Optional[Dict[str, str]] = None,
        features: Optional[List[str]] = None,
        exchange: str = 'sse',
        r: float = 0.02
    ):
        """Store the configuration and build the preprocessing / feature helpers.

        Args:
            option_folder: Directory containing the option ``*.xlsx`` files.
            underlying_file: Path of the underlying-price workbook.
            column_mapping: Logical field name -> column header. ``None``
                selects a fresh copy of the built-in default mapping.
            features: Feature names to compute. ``None`` selects a fresh copy
                of ``['VIX', 'Skew', 'Delta']``.
            exchange: Exchange code; passed unchanged to the preprocessor and
                stored lower-cased on ``self.exchange``.
            r: Forwarded to ``FeatureCalculator``.

        Raises:
            KeyError: If the mapping lacks one of the required logical fields.
        """
        self.option_folder = option_folder
        self.underlying_file = underlying_file
        # A caller-supplied mapping/list is used as-is (no copy); only the
        # built-in defaults are copied, which is what removes the sharing bug.
        self.col = (
            dict(self._DEFAULT_COLUMN_MAPPING)
            if column_mapping is None else column_mapping
        )
        self.feature_list = (
            list(self._DEFAULT_FEATURES) if features is None else features
        )
        self.r = r
        self.merged_data = None
        self.feature_data = None
        self.exchange = exchange.lower()

        # Preprocessing module.
        self.preprocessor = OptionPreprocessor.OptionPreprocessor(exchange, self.col)

        # Logging setup.
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(self.__class__.__name__)
        self._validate_columns()

        # Feature calculator.
        self.feature_calculator = FeatureCalculator(
            column_mapping=self.col,
            r=self.r,
            logger=self.logger
        )

    def _validate_columns(self):
        """Check that every required logical field is present in the mapping.

        Raises:
            KeyError: Listing the missing logical field names.
        """
        missing = [k for k in self._REQUIRED_KEYS if k not in self.col]
        if missing:
            raise KeyError(f"缺少必要列映射: {missing}")

    def load_and_merge(
        self,
        underlying_date_col: str = '日期',
        underlying_price_col: str = '收盘价'
    ) -> 'AdvancedOptionAnalyzer':
        """Read all option workbooks and attach the underlying price to each row.

        Args:
            underlying_date_col: Date column header in the underlying workbook.
            underlying_price_col: Price column header in the underlying workbook.

        Returns:
            ``self``, with ``merged_data`` populated.

        Raises:
            ValueError: If no option workbook is found in ``option_folder``.
        """
        date_col = self.col['date']
        price_col = self.col['underlying_price']

        # Load option data. Files are read in sorted order so the row order
        # (and therefore later de-duplication) does not depend on the
        # filesystem's listing order.
        pattern = os.path.join(self.option_folder, "*.xlsx")
        option_files = sorted(
            path for path in glob.glob(pattern)
            # Excel leaves '~$<name>.xlsx' owner files next to open workbooks;
            # they match the glob but are not readable workbooks.
            if not os.path.basename(path).startswith('~$')
        )
        if not option_files:
            raise ValueError(f"未找到期权数据文件: {pattern}")

        # Only these logical fields are kept; other mapped fields are not
        # carried over from the option workbooks at this stage.
        wanted = [self.col.get(field, field) for field in self._OPTION_FIELDS]
        frames = []
        for path in option_files:
            raw = pd.read_excel(path, parse_dates=[date_col])
            frames.append(raw[[name for name in wanted if name in raw.columns]])
        # A stable sort keeps same-day rows in file order.
        option_df = (
            pd.concat(frames, ignore_index=True)
            .sort_values(date_col, kind='mergesort')
        )

        # Load the underlying price series and align its column names with
        # the option data.
        underlying_df = (
            pd.read_excel(self.underlying_file, parse_dates=[underlying_date_col])
            .rename(columns={
                underlying_date_col: date_col,
                underlying_price_col: price_col
            })
            .sort_values(date_col, kind='mergesort')
        )

        # As-of join: each option row takes the most recent underlying price
        # dated on or before the option's date. Both inputs are already sorted.
        self.merged_data = pd.merge_asof(
            left=option_df,
            right=underlying_df[[date_col, price_col]],
            on=date_col,
            direction='backward'
        )
        self.logger.info(f"合并后数据量: {len(self.merged_data)}")

        return self

    def preprocess(self) -> 'AdvancedOptionAnalyzer':
        """De-duplicate, drop rows without volume or settle price, then preprocess.

        Returns:
            ``self``, with ``merged_data`` replaced by the preprocessor output.

        Raises:
            ValueError: If ``load_and_merge()`` has not been run.
        """
        if self.merged_data is None:
            raise ValueError("请先执行load_and_merge()")

        # Cleaning: one row per (date, option code), positive volume and settle.
        deduped = self.merged_data.drop_duplicates(
            [self.col['date'], self.col['option_code']]
        )
        # A boolean mask is used instead of DataFrame.query so that column
        # headers containing spaces or punctuation work too.
        tradable = (deduped[self.col['volume']] > 0) & (deduped[self.col['settle']] > 0)

        # Hand the cleaned rows to the preprocessing module.
        self.merged_data = self.preprocessor.preprocess(deduped[tradable])
        return self

    def compute_features(self) -> 'AdvancedOptionAnalyzer':
        """Compute the configured features from ``merged_data``.

        Returns:
            ``self``, with ``feature_data`` populated.

        Raises:
            ValueError: If ``load_and_merge()`` has not been run.
        """
        if self.merged_data is None:
            raise ValueError("请先执行load_and_merge()")

        # Delegate to the feature calculator.
        self.feature_data = self.feature_calculator.compute_features(
            feature_list=self.feature_list,
            merged_data=self.merged_data
        )
        return self

    def visualize(self, save_dir: str = "features_plot") -> 'AdvancedOptionAnalyzer':
        """Save one line chart per feature column as ``<save_dir>/<column>.png``.

        Args:
            save_dir: Output directory; created if it does not exist.

        Returns:
            ``self``.

        Raises:
            ValueError: If ``compute_features()`` has not been run.
        """
        if self.feature_data is None:
            raise ValueError("请先执行compute_features()")

        os.makedirs(save_dir, exist_ok=True)
        for col in self.feature_data.columns:
            fig, ax = plt.subplots(figsize=(12, 6))
            try:
                self.feature_data[col].plot(ax=ax, title=col, lw=1)
                fig.savefig(os.path.join(save_dir, f"{col}.png"))
            finally:
                # Always release the figure, even if plotting/saving fails.
                plt.close(fig)
        return self

    def _build_export_frame(self) -> pd.DataFrame:
        """Return the features with a date column and the underlying price attached.

        Works on copies only: ``feature_data`` and ``merged_data`` are left
        untouched, so the result is the same however often this is called.
        """
        date_col = self.col['date']
        price_col = self.col['underlying_price']

        features = self.feature_data
        if date_col in features.columns:
            # Dates are already a regular column; the index carries nothing
            # that needs exporting.
            features = features.reset_index(drop=True)
        else:
            # Dates live in the index: name it after the date column so that
            # reset_index() produces the merge key whatever the index was
            # called before.
            if features.index.nlevels == 1:
                features = features.rename_axis(date_col)
            features = features.reset_index()

        # One underlying price per date (first occurrence wins).
        daily_price = self.merged_data.drop_duplicates(
            subset=[date_col], keep='first'
        )[[date_col, price_col]]

        return pd.merge(features, daily_price, on=date_col, how='left')

    def save(self, path: str = "results.xlsx") -> None:
        """Write the results to an Excel workbook.

        Sheet ``特征指标`` holds the features joined with the underlying price;
        sheet ``合并数据`` holds the first 1000 rows of ``merged_data``.

        Args:
            path: Destination workbook path.

        Raises:
            ValueError: If ``compute_features()`` has not been run.
        """
        if self.feature_data is None or self.merged_data is None:
            raise ValueError("请先执行compute_features()")

        export = self._build_export_frame()
        with pd.ExcelWriter(path) as writer:
            export.to_excel(writer, sheet_name='特征指标')
            self.merged_data.head(1000).to_excel(writer, sheet_name='合并数据')

# 一个例子--上证50ETF期权

In [None]:
# SSE 50ETF options: build the analyzer, run every pipeline stage, then plot
# the exported workbook.
analyzer = AdvancedOptionAnalyzer(
    option_folder="sz50_option",
    underlying_file="sz50/华夏上证50ETF(510050.OF)-每日行情数据.xlsx",
    # Logical field name -> column header.
    column_mapping=dict(
        date='日期',
        option_code='交易代码',
        strike='行权价',
        settle='结算价',
        volume='成交量',
        underlying_price='标的收盘价',
        maturity='到期日',
        type="期权类型",
    ),
    features=[
        'VIX', 'Skew', 'Delta', 'Gamma', 'PCRatio',
        'TermSlope', 'Vega', 'Theta', 'ITG', 'ITL',
    ],
    exchange='sse',
)

# Each stage mutates the analyzer and returns it, so the stages are simply
# run one after another.
analyzer.load_and_merge(underlying_date_col='日期', underlying_price_col='收盘价')
analyzer.preprocess()
analyzer.compute_features()
analyzer.visualize()
analyzer.save()

# Plot the features from the workbook that save() has just written.
plotter = FeaturePlotter.FeaturePlotter(
    results_file="results.xlsx",
    underlying_price_col="标的收盘价",
    date_col="日期",          # name of the date column
    date_format="%Y-%m-%d",   # date format
)
plotter.run()

# 一个例子--豆粕期货期权

In [None]:
# Soybean-meal futures options (DCE): same pipeline as the 50ETF example,
# with this data set's column headers.
analyzer = AdvancedOptionAnalyzer(
    option_folder="soybean_meal_option",
    underlying_file="soybean_meal/K线导出_M0_日线数据.xlsx",
    # Logical field name -> column header.
    column_mapping={
        'date': '日期',
        'option_code': '期权代码', 
        'settle': '结算价',
        'volume': '成交量',
        'underlying_price': '标的资产结算价',
        'strike': '行权价',
        'maturity': '到期日',
        'type': '期权类型'
    },
    exchange='dce',
    features=['VIX','Skew','Delta','Gamma','PCRatio','TermSlope','Vega','Theta']
)

# Results go to dedicated outputs. With the default paths this run would
# overwrite the "results.xlsx" / "features_plot" artifacts of the 50ETF
# example, and the backtest cell further down (which reads "results.xlsx"
# and expects the column '标的收盘价') would then pick up soybean-meal data.
(analyzer.load_and_merge(underlying_date_col='交易时间', underlying_price_col='结算价')
        .preprocess()
        .compute_features()
        .visualize(save_dir="features_plot_soybean_meal")
        .save("results_soybean_meal.xlsx"))

# 一个例子--回测

In [None]:
# Set up the backtester on the exported feature workbook.
backtester = strategy_backtest.OptionBacktester(
    data_path="results.xlsx",
    features=['VIX', 'Skew', 'Delta'],
    price_col='标的收盘价',
    fee_rate=0.0003,
    slippage=0.0001,
    allow_short=False 
)

# Signal rules, one entry per factor.
factor_rules = {
    'VIX': {
        'type': 'quantile',
        'window': 30,
        'threshold': (0.25, 0.75),
        'direction': 1  # the higher the VIX, the more bearish
    },
    'Skew': {
        'type': 'std',
        'window': None,  # statistics over the full history
        'threshold': (-1, 1),  # one standard deviation either side
        'direction': 0
    },
    'Delta': {
        'type': 'std',
        'window': None,  # statistics over the full history
        'threshold': (-1, 1),  # one standard deviation either side
        'direction': 1
    }
}

# Generate the signals.
signals = backtester.generate_signal(factor_rules, combine_method='any')

# Run the backtest.
result = backtester.backtest(signals['combined'], stop_loss=0.05)
# A bare `result.head()` in the middle of a cell is evaluated and thrown
# away (only a cell's last expression is displayed), so print it explicitly.
print(result.head())

# Performance analysis.
perf_metrics, perf_df = backtester.analyze_performance(result)
print(pd.Series(perf_metrics))

# Visualisation.
backtester.plot_distribution()
backtester.plot_results(perf_df)