## 0. Notebook 结构与今天的目标

先简单规划一下这个 Notebook 的结构，避免后面越写越乱：

1. **导入依赖 + 全局配置**：把常用的包一次性导进来，顺便设置好显示参数和数据路径。
2. **回测相关的数据类（dataclasses）**：用几个 dataclass 把配置、交易记录、统计结果这些结构固定下来，这样后面的函数输入输出会比较清晰。
3. **加载和检查价格数据**：从 parquet 里读出标准化后的 K 线，先确认数据没有问题。
4. **构造/加载情绪时间序列**：目前先用随机情绪模拟，后面再接 LangChain + LLM 的真实情绪接口。
5. **情绪数据标准化 + 价格-情绪对齐**：统一情绪的 DataFrame 格式，然后用 merge_asof 按时间对齐到价格上，避免未来信息泄露。
6. **根据情绪生成多空交易信号**：比如情绪>阈值就做多，<负阈值就做空，中间就空仓观望。
7. **回测执行引擎**：在时间序列上“走一遍”，根据价格和信号更新仓位和资金曲线，并记录每一笔交易。
8. **计算风险收益指标**：比如总收益、年化收益、波动率、夏普率、最大回撤、胜率和交易次数。
9. **基准策略（Buy & Hold）**：用同样的数据跑一条简单的长持策略，作为参照。
10. **统一输出结构**：把策略和基准的结果封装成一个结构，方便前端画图以及和后续模型做对比。
11. **端到端小实验**：用一个真实的 parquet + 模拟情绪，从头到尾跑一次，看最后的曲线和指标是否合理。

今天的目标是先把 1~5 步写好并跑通，后面再逐步填完整个回测逻辑。

# 情绪驱动量化回测管线（Draft）

这里是我这门课的一个小项目，主要是想验证：用新闻情绪这种“软信息”，能不能在历史行情上做出一个还算说得过去的交易策略。

这个 Notebook 的目标是把整个思路先跑通一遍：

1. 读入之前用 `klineAcquision.py` 抓下来的标准化 K 线数据（parquet 格式）。
2. 先用“模拟情绪”替代真实 LLM 情绪分数，把整体回测框架搭起来。
3. 把价格和情绪按时间对齐，过程中注意避免用到未来信息（Look-ahead Bias）。
4. 根据情绪因子，生成一个简单的多空信号（比如情绪很正就做多，很负就做空，中性就观望）。
5. 用这个信号在历史 K 线上做回测，看资金曲线和一些常见指标（收益率、夏普率、最大回撤等）。
6. 同时跑一条“什么都不干，只是长持”的基准策略，对比一下谁更好。
7. 最后把这些结果整理成一个比较干净的输出结构，方便后续接到前端可视化或者报告里。

说明：
- Notebook 里的自然语言说明尽量用中文，方便我自己以后回看；
- 函数的 docstring 会用英文写，方便后续在报告或代码里复用。

In [4]:
# 1. Import libraries and global config

import os
from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Tuple

import numpy as np
import pandas as pd

# 全局显示设置（可根据需要调整）
pd.set_option("display.max_rows", 10)
pd.set_option("display.max_columns", 20)
pd.set_option("display.width", 120)

# 数据路径配置：根据你实际的 parquet / ndjson 保存路径调整
DATA_DIR = "./"  # 比如就用当前目录，如果有子目录可以改成 "./data"

## 2. 用 dataclass 固定回测相关的数据结构


这里先把整个回测里会反复用到的一些结构整理一下，用 `dataclass` 写出来：


- `BacktestConfig`：策略本身的配置，比如手续费、是否允许做空、情绪阈值等；
- `BenchmarkConfig`：基准策略（长持）的配置，通常会简单很多；
- `TradeRecord`：记录每一笔交易的关键信息，方便后面分析；
- `BacktestStats`：把回测结果压成一组常见指标，类似一个“小报告”；
- `BacktestResult`：包含资金曲线 + 交易明细 + 指标；
- `BacktestCurve`：给前端画图时用的一个小包装结构；
- `StrategyVsBenchmark` 和 `BacktestOutput`：把策略和基准的表现放在一起对比。


先把这些壳子搭好，后面实现函数的时候可以直接用这些类型作为输入输出。

In [None]:
# 2. Dataclasses for backtest configuration and results


@dataclass
class BacktestConfig:
    """
    Configuration for sentiment-driven trading backtest.
    """
    symbol: str
    fee_rate: float = 0.001  # 交易手续费比例，比如 0.1%
    allow_short: bool = True  # 是否允许做空
    max_position_holding_bars: Optional[int] = None  # 持仓最长 bar 数，None 表示不限制
    sentiment_long_threshold: float = 0.5  # 情绪大于这个值就考虑做多
    sentiment_short_threshold: float = -0.5  # 情绪小于这个值就考虑做空
    initial_capital: float = 100_000.0  # 初始资金，先随便定一个数
    price_column: str = "close"  # 用哪个价格列来做回测（默认用收盘价）




@dataclass
class BenchmarkConfig:
    """
    Configuration for benchmark (buy & hold) backtest.
    """
    symbol: str
    price_column: str = "close"




@dataclass
class TradeRecord:
    """
    Single trade record.
    """
    entry_time: pd.Timestamp
    exit_time: pd.Timestamp
    direction: int  # 1 for long, -1 for short
    entry_price: float
    exit_price: float
    pnl: float
    return_pct: float




@dataclass
class BacktestStats:
    """
    Summary statistics for a backtest.
    """
    total_return: float
    annualized_return: float
    volatility: float
    sharpe_ratio: float
    max_drawdown: float
    win_rate: float
    num_trades: int




@dataclass
class BacktestResult:
    """
    Detailed backtest result including equity curve and trades.
    """
    equity_curve: pd.Series
    trades: List[TradeRecord]
    stats: BacktestStats




@dataclass
class BacktestCurve:
    """
    Wrapper for equity curve with metadata for plotting.
    """
    symbol: str
    curve: pd.Series
    label: str




@dataclass
class StrategyVsBenchmark:
    """
    Comparison between strategy and benchmark results.
    """
    strategy: BacktestResult
    benchmark: BacktestResult
    strategy_curve: BacktestCurve
    benchmark_curve: BacktestCurve




@dataclass
class BacktestOutput:
    """
    High-level backtest output for front-end visualization.
    """
    symbol: str
    timeframe: str
    comparison: StrategyVsBenchmark
    extra_info: Optional[Dict[str, Any]] = None

## 3. 从 parquet 里把价格数据读出来（先用现成的数据）


在正式跑回测前，先把之前写好的 `klineAcquision.py` 生成的 parquet 文件读进来，简单检查一下：


- 预期字段：`timestamp`, `open`, `high`, `low`, `close`, `volume`, `symbol`, `timeframe`, `datetime`, `date`；
- 这里假设 `timestamp` 最终会作为 DatetimeIndex 使用；
- 先写一个通用的加载函数，以后如果改了文件名或路径，只要改参数就行。


暂时我会留一个 `example_parquet_path` 变量，等真正有文件的时候再填进去。

In [None]:
# 3. 从 parquet 文件加载价格数据


def load_price_data_from_parquet(
    filepath: str,
    price_column: str = "close",
) -> pd.DataFrame:
    """
    Load standardized OHLCV price data from a parquet file.


    Parameters
    ----------
    filepath : str
        Path to the parquet file.
    price_column : str
        Name of the price column used for backtesting.


    Returns
    -------
    pd.DataFrame
        Price DataFrame with DatetimeIndex.
    """
    df = pd.read_parquet(filepath)


    # 如果还没有把 timestamp 设成索引，这里顺便处理一下
    if "timestamp" in df.columns:
        df = df.sort_values("timestamp")
        df = df.set_index("timestamp")


    # 确保 index 是 DatetimeIndex（且是 UTC），方便后面和情绪数据 merge
    if not isinstance(df.index, pd.DatetimeIndex):
        df.index = pd.to_datetime(df.index, utc=True)


    if price_column not in df.columns:
        raise ValueError(f"Price column '{price_column}' not found in DataFrame.")


    return df




# 这里先预留一个示例路径：真正跑的时候记得把路径改成你自己的 parquet 文件名
example_parquet_path = os.path.join(DATA_DIR, "kline_BTCUSDT_1m_2025-12-01.parquet")  # e.g. os.path.join(DATA_DIR, "kline_ETHUSDT_1h_2025-11-26.parquet")


if example_parquet_path is not None and os.path.exists(example_parquet_path):
    price_df_example = load_price_data_from_parquet(example_parquet_path)
    print("价格数据示例：")
    print(price_df_example.head())
else:
    print("暂时还没有设置 example_parquet_path，等有具体文件名再填进去重新运行这个 Cell。")

价格数据示例：
                               open      high       low     close   volume    symbol timeframe  \
timestamp                                                                                        
2025-11-30 16:36:00+00:00  91689.94  91697.87  91665.16  91689.96  5.13384  BTC/USDT        1m   
2025-11-30 16:37:00+00:00  91689.96  91689.97  91667.22  91667.23  1.45994  BTC/USDT        1m   
2025-11-30 16:38:00+00:00  91667.23  91681.42  91667.22  91680.01  2.21934  BTC/USDT        1m   
2025-11-30 16:39:00+00:00  91680.01  91730.39  91680.00  91730.39  3.92809  BTC/USDT        1m   
2025-11-30 16:40:00+00:00  91730.39  91750.00  91730.38  91749.43  1.73131  BTC/USDT        1m   

                                           datetime        date  
timestamp                                                        
2025-11-30 16:36:00+00:00  2025-11-30T16:36:00+0000  2025-11-30  
2025-11-30 16:37:00+00:00  2025-11-30T16:37:00+0000  2025-11-30  
2025-11-30 16:38:00+00:00  2025-11-30T16:

## 4. 先用随机数模拟一条“情绪时间序列”


真实情况里，情绪分数应该来自 LLM 对新闻的打分。但在接口还没完全接上的阶段，先用随机情绪把后面的回测流程跑通：


- 输入：价格 DataFrame（索引为时间戳）、symbol 字符串；
- 输出：情绪 DataFrame，索引同样是时间戳，列里至少要有 `symbol` 和 `sentiment`；
- 随机分布：这里先用正态分布 N(0, 0.5)，后续可以根据实际情况调整；
- 为了可复现，加一个 `seed` 参数，这样每次运行结果一致。

In [None]:
# 4. 用随机数模拟情绪时间序列


def simulate_random_sentiment(
    price_df: pd.DataFrame,
    symbol: str,
    mu: float = 0.0,
    sigma: float = 0.5,
    seed: Optional[int] = 42,
) -> pd.DataFrame:
    """
    Simulate random sentiment time series aligned with price index.


    Parameters
    ----------
    price_df : pd.DataFrame
        Price data with DatetimeIndex.
    symbol : str
        Symbol name (e.g. 'ETH/USDT').
    mu : float
        Mean of the normal distribution.
    sigma : float
        Standard deviation of the normal distribution.
    seed : Optional[int]
        Random seed for reproducibility.


    Returns
    -------
    pd.DataFrame
        DataFrame with columns: ['symbol', 'sentiment'] and DatetimeIndex.
    """
    if seed is not None:
        np.random.seed(seed)


    n = len(price_df)
    sentiments = np.random.normal(loc=mu, scale=sigma, size=n)


    sentiment_df = pd.DataFrame(
        {
            "symbol": symbol,
            "sentiment": sentiments,
        },
        index=price_df.index,
    )
    sentiment_df.index.name = "timestamp"
    return sentiment_df




# 如果上面已经成功加载了 price_df_example，这里可以简单测一下模拟情绪的效果
if "price_df_example" in globals():
    simulated_sentiment = simulate_random_sentiment(price_df_example, symbol="ETH/USDT")
    print("模拟情绪示例：")
    print(simulated_sentiment.head())
else:
    print("还没有可用的价格数据示例，等设置好 example_parquet_path 再来测试这个函数。")

模拟情绪示例：
                             symbol  sentiment
timestamp                                     
2025-11-30 16:36:00+00:00  ETH/USDT   0.248357
2025-11-30 16:37:00+00:00  ETH/USDT  -0.069132
2025-11-30 16:38:00+00:00  ETH/USDT   0.323844
2025-11-30 16:39:00+00:00  ETH/USDT   0.761515
2025-11-30 16:40:00+00:00  ETH/USDT  -0.117077


## 5. 把情绪数据整理成统一格式，并和价格按时间对齐


为了后面少踩坑，先做两件事：


1. **标准化情绪 DataFrame**：不管后面情绪是随机生成的，还是 LLM 模型算出来的，我都希望最后拿到的 DataFrame 至少满足：
   - 索引是时间戳（`DatetimeIndex`），名字叫 `timestamp`；
   - 至少包含列：`symbol` 和 `sentiment`；
2. **和价格数据时间对齐**：
   - 不能用未来情绪信息，所以要把情绪向后对齐到价格 bar；
   - 这里用 `pd.merge_asof`，以价格为左表、情绪为右表，`direction='backward'`，意思是每根 K 线只看到当前或之前最新的一条情绪。

In [15]:
# 5. 情绪 DataFrame 标准化 + 时间对齐函数


def normalize_sentiment_df(
    sentiment_df: pd.DataFrame,
    default_symbol: Optional[str] = None,
) -> pd.DataFrame:
    """
    Normalize sentiment DataFrame schema.


    Expected output schema:
        index: DatetimeIndex named 'timestamp'
        columns: at least ['symbol', 'sentiment']
    """
    df = sentiment_df.copy()


    # 1) 处理索引为时间戳，如果还在列里，就先 set_index 一下
    if not isinstance(df.index, pd.DatetimeIndex):
        if "timestamp" in df.columns:
            df = df.set_index("timestamp")
        else:
            raise ValueError("Sentiment DataFrame must have DatetimeIndex or 'timestamp' column.")


    df.index = pd.to_datetime(df.index, utc=True)
    df.index.name = "timestamp"


    # 2) 确保有 symbol 列，如果没有就用默认 symbol 补一个
    if "symbol" not in df.columns:
        if default_symbol is None:
            raise ValueError(
                "Sentiment DataFrame has no 'symbol' column and no default_symbol provided.",
            )
        df["symbol"] = default_symbol


    # 3) 确保有 sentiment 列
    if "sentiment" not in df.columns:
        raise ValueError("Sentiment DataFrame must have a 'sentiment' column.")


    return df.sort_index()




def align_price_and_sentiment(
    price_df: pd.DataFrame,
    sentiment_df: pd.DataFrame,
    symbol: Optional[str] = None,
) -> pd.DataFrame:
    """
    Align price and sentiment by timestamp using backward merge (no look-ahead).


    Parameters
    ----------
    price_df : pd.DataFrame
        Price data with DatetimeIndex.
    sentiment_df : pd.DataFrame
        Sentiment data with DatetimeIndex and columns ['symbol', 'sentiment', ...].
    symbol : Optional[str]
        If provided, filter both price and sentiment by this symbol.


    Returns
    -------
    pd.DataFrame
        DataFrame with price columns and an extra 'sentiment' column aligned in time.
    """
    price = price_df.copy()
    sentiment = normalize_sentiment_df(sentiment_df, default_symbol=symbol)


    if symbol is not None:
        if "symbol" in price.columns:
            price = price[price["symbol"] == symbol]
        sentiment = sentiment[sentiment["symbol"] == symbol]


    price = price.sort_index()
    sentiment = sentiment.sort_index()


    merged = pd.merge_asof(
        left=price.reset_index().rename(columns={"index": "timestamp"}),
        right=sentiment.reset_index().rename(columns={"index": "timestamp"}),
        on="timestamp",
        by=None,  # 目前只考虑单一 symbol 的情况
        direction="backward",
    )


    merged = merged.set_index("timestamp")
    merged.index = pd.to_datetime(merged.index, utc=True)
    return merged




# 如果已经有 price_df_example 和 simulated_sentiment，可以简单看一下对齐后的结果长什么样
if "price_df_example" in globals() and "simulated_sentiment" in globals():
    merged_example = align_price_and_sentiment(
        price_df_example, simulated_sentiment, symbol=None
    )
    print(merged_example[["close", "sentiment"]].head())
else:
    print("当前还没有 price_df_example 或 simulated_sentiment，等两边都准备好以后再跑这个 Cell。")

                              close  sentiment
timestamp                                     
2025-11-30 16:36:00+00:00  91689.96   0.248357
2025-11-30 16:37:00+00:00  91667.23  -0.069132
2025-11-30 16:38:00+00:00  91680.01   0.323844
2025-11-30 16:39:00+00:00  91730.39   0.761515
2025-11-30 16:40:00+00:00  91749.43  -0.117077


## 6. 从情绪量化结果 CSV 读入真实情绪


上面用的是“随机情绪”来先把流程跑通，接下来接入真正的情绪量化结果。


按照作业要求，情绪 CSV 的结构大概是：


- `timestamp`：时间戳（字符串形式，需要转成时间）；
- `headline`：新闻标题/摘要；
- `score`：情绪分数，范围在 [-1.0, 1.0]；
- `reason`：模型给出的简短英文解释（最多一句）。


为了和后面的回测模块对接，这里写一个小函数：


- 从 CSV 读出这些字段；
- 把 `timestamp` 变成 DatetimeIndex；
- 把 `score` 重命名/复制成我们统一使用的 `sentiment` 列；
- 补上一个 `symbol` 列，方便和价格数据匹配；
- 保留 `headline` 和 `reason`，以后做可视化或诊断时会很有用。

In [16]:
# 6. 从 CSV 文件加载 LLM 情绪结果


def load_sentiment_from_csv(
    csv_path: str,
    symbol: str,
    timestamp_col: str = "timestamp",
) -> pd.DataFrame:
    """
    Load sentiment results from a CSV file produced by the LLM pipeline.


    Expected columns in CSV:
        - timestamp: str, time of the news
        - headline: str, news title or summary
        - score: float in [-1.0, 1.0]
        - reason: str, short explanation


    Output DataFrame schema:
        index: DatetimeIndex named 'timestamp'
        columns: ['symbol', 'sentiment', 'headline', 'reason']
    """
    df = pd.read_csv(csv_path)


    if timestamp_col not in df.columns:
        raise ValueError(f"Timestamp column '{timestamp_col}' not found in sentiment CSV.")


    # 把时间列变成 DatetimeIndex（默认按本地时区解析，再统一到 UTC）
    df[timestamp_col] = pd.to_datetime(df[timestamp_col], utc=True, errors="coerce")
    df = df.dropna(subset=[timestamp_col])
    df = df.set_index(timestamp_col)
    df.index.name = "timestamp"


    # 统一情绪列名：从 score 映射到 sentiment
    if "score" not in df.columns:
        raise ValueError("Sentiment CSV must contain a 'score' column.")


    df["sentiment"] = df["score"].astype(float)


    # 补上 symbol 列
    df["symbol"] = symbol


    # 如果没有 headline 或 reason，也不强制报错（方便调试）
    for col in ["headline", "reason"]:
        if col not in df.columns:
            df[col] = None


    return df.sort_index()




# 示例（等你真正有 CSV 时再取消注释运行）：
# sentiment_csv_path = os.path.join(DATA_DIR, "data/sentiment_results.csv")
# if os.path.exists(sentiment_csv_path):
#     sentiment_df_llm = load_sentiment_from_csv(sentiment_csv_path, symbol="BTC/USDT")
#     print("LLM 情绪数据示例：")
#     print(sentiment_df_llm.head())
# else:
#     print("还没有找到 data/sentiment_results.csv，后面接好 LLM 流水线后再来加载。")

## 7. 根据情绪生成多空交易信号


上面已经有了价格 + 情绪对齐后的 DataFrame，接下来要做的事情比较直接：


- 约定一个简单的规则：
  - 情绪 > `sentiment_long_threshold`：做多，信号记为 +1；
  - 情绪 < `sentiment_short_threshold`：做空，信号记为 -1（如果不允许做空，这部分可以强制改为 0）；
  - 介于两个阈值之间：观望，信号记为 0；
- 这里先不考虑太复杂的过滤（比如信号持仓时间、止损等），这些留到真正调参的时候再补充。


先写一个小函数，把对齐好的 DataFrame 和 `BacktestConfig` 作为输入，吐出一列 `signal`。

In [17]:
# 7. 根据信号规则生成多空交易信号（占位：实际实现见后续）


def normalize_sentiment_df(
    sentiment_df: pd.DataFrame,
    default_symbol: Optional[str] = None,
) -> pd.DataFrame:
    """
    Normalize sentiment DataFrame schema.


    Expected output schema:
        index: DatetimeIndex named 'timestamp'
        columns: at least ['symbol', 'sentiment']


    Notes
    -----
    - If 'sentiment' column is missing but 'score' exists,
      this function will create 'sentiment' from 'score'.
    - Extra columns like 'headline' or 'reason' are preserved.
    """
    df = sentiment_df.copy()


    # 1) 处理索引为时间戳，如果还在列里，就先 set_index 一下
    if not isinstance(df.index, pd.DatetimeIndex):
        if "timestamp" in df.columns:
            df = df.set_index("timestamp")
        else:
            raise ValueError("Sentiment DataFrame must have DatetimeIndex or 'timestamp' column.")


    df.index = pd.to_datetime(df.index, utc=True)
    df.index.name = "timestamp"


    # 2) 确保有 symbol 列，如果没有就用默认 symbol 补一个
    if "symbol" not in df.columns:
        if default_symbol is None:
            raise ValueError(
                "Sentiment DataFrame has no 'symbol' column and no default_symbol provided.",
            )
        df["symbol"] = default_symbol


    # 3) 确保有 sentiment 列：没有的话尝试从 score 映射
    if "sentiment" not in df.columns:
        if "score" in df.columns:
            df["sentiment"] = df["score"].astype(float)
        else:
            raise ValueError("Sentiment DataFrame must have a 'sentiment' or 'score' column.")


    return df.sort_index()




def align_price_and_sentiment(
    price_df: pd.DataFrame,
    sentiment_df: pd.DataFrame,
    symbol: Optional[str] = None,
) -> pd.DataFrame:
    """
    Align price and sentiment by timestamp using backward merge (no look-ahead).


    Parameters
    ----------
    price_df : pd.DataFrame
        Price data with DatetimeIndex.
    sentiment_df : pd.DataFrame
        Sentiment data with DatetimeIndex and columns ['symbol', 'sentiment', ...].
    symbol : Optional[str]
        If provided, filter both price and sentiment by this symbol.


    Returns
    -------
    pd.DataFrame
        DataFrame with price columns and an extra 'sentiment' column aligned in time.
    """
    price = price_df.copy()
    sentiment = normalize_sentiment_df(sentiment_df, default_symbol=symbol)


    if symbol is not None:
        if "symbol" in price.columns:
            price = price[price["symbol"] == symbol]
        sentiment = sentiment[sentiment["symbol"] == symbol]


    price = price.sort_index()
    sentiment = sentiment.sort_index()


    merged = pd.merge_asof(
        left=price.reset_index().rename(columns={"index": "timestamp"}),
        right=sentiment.reset_index().rename(columns={"index": "timestamp"}),
        on="timestamp",
        by=None,  # 目前只考虑单一 symbol 的情况
        direction="backward",
    )


    merged = merged.set_index("timestamp")
    merged.index = pd.to_datetime(merged.index, utc=True)
    return merged
def generate_trading_signals(
    merged_df: pd.DataFrame,
    config: BacktestConfig,
) -> pd.Series:
    """
    Generate long/short/flat trading signals based on sentiment.

    Rules
    -----
    - sentiment > config.sentiment_long_threshold  -> +1 (long)
    - sentiment < config.sentiment_short_threshold -> -1 (short, if allow_short else 0)
    - otherwise                                    -> 0 (flat)

    Parameters
    ----------
    merged_df : pd.DataFrame
        DataFrame after aligning price and sentiment. Must contain a 'sentiment' column
        and DatetimeIndex.
    config : BacktestConfig
        Backtest configuration with sentiment thresholds and allow_short flag.

    Returns
    -------
    pd.Series
        Signal series indexed by time, values in {-1, 0, 1}.
    """
    if "sentiment" not in merged_df.columns:
        raise ValueError("merged_df must contain a 'sentiment' column to generate signals.")

    sentiment = merged_df["sentiment"].astype(float)

    long_th = config.sentiment_long_threshold
    short_th = config.sentiment_short_threshold

    signal = pd.Series(0, index=merged_df.index, dtype=int)

    signal[sentiment > long_th] = 1
    if config.allow_short:
        signal[sentiment < short_th] = -1
    else:
        signal[sentiment < short_th] = 0

    return signal




# 如果已经有 price_df_example 和 simulated_sentiment，可以简单看一下对齐后的结果长什么样
if "price_df_example" in globals() and "simulated_sentiment" in globals():
    merged_example = align_price_and_sentiment(
        price_df_example, simulated_sentiment, symbol=None
    )
    print(merged_example[["close", "sentiment"]].head())
else:
    print("当前还没有 price_df_example 或 simulated_sentiment，等两边都准备好以后再跑这个 Cell。")

                              close  sentiment
timestamp                                     
2025-11-30 16:36:00+00:00  91689.96   0.248357
2025-11-30 16:37:00+00:00  91667.23  -0.069132
2025-11-30 16:38:00+00:00  91680.01   0.323844
2025-11-30 16:39:00+00:00  91730.39   0.761515
2025-11-30 16:40:00+00:00  91749.43  -0.117077


## 8. 写一个最小可用的回测引擎（bar by bar）


接下来是整条链路里最“体力活”的部分：


- 输入：
  - 价格 DataFrame（通常就是前面合并好的 `merged_df`，里面至少要有价格列，比如 `close`）；
  - 信号 Series（索引是时间戳，取值在 {-1, 0, 1} 之间）；
  - `BacktestConfig`（里面有初始资金、手续费、是否允许做空等参数）；
- 输出：
  - 一个资金曲线（每个时间点对应一个账户总权益）；
  - 一组交易记录（什么时候开仓，什么时候平仓，赚赔多少）。


为了简单起见，这里先做一些“学生版本”的假设：


- 每次信号变化，就在下一根 bar 的开盘价执行交易（实际中会更复杂，这里先不深究）；
- 仓位只有三种状态：满仓做多、满仓做空、空仓（不做任何事）；
- 不考虑滑点，只在下单时收一次手续费（按名义成交金额乘以一个费率）。


等这个最小版本能正常跑通，再考虑扩展更多细节（比如杠杆、止损、持仓上限等）。

In [18]:
# 8. 最小版本的回测执行函数


def execute_backtest(
    price_df: pd.DataFrame,
    signal: pd.Series,
    config: BacktestConfig,
) -> BacktestResult:
    """
    Execute a simple bar-by-bar backtest using sentiment-based signals.


    Notes
    -----
    - Position can be -1, 0, or +1 (short / flat / long).
    - Trades are executed at the next bar's close price for simplicity.
    - Fee is charged on notional value when position changes.
    """
    # 对齐索引，确保价格和信号是一一对应的
    signal = signal.reindex(price_df.index).fillna(0).astype(int)


    price_col = config.price_column
    if price_col not in price_df.columns:
        raise ValueError(f"Price column '{price_col}' not found in price DataFrame.")


    prices = price_df[price_col].astype(float)


    equity = []  # 资金曲线
    times = []   # 时间索引
    trades: list[TradeRecord] = []


    cash = config.initial_capital
    position = 0  # 当前仓位方向：-1, 0, 1
    entry_price = None
    entry_time = None


    prev_signal = 0


    for t, price in prices.items():
        current_signal = signal.loc[t]


        # 如果信号变化了，说明需要在当前 bar 收盘价平掉旧仓 + 建新仓
        if current_signal != prev_signal:
            # 1) 先平掉旧仓
            if position != 0 and entry_price is not None and entry_time is not None:
                # 名义仓位大小：这里简单假设满仓，即用全部资金买/卖
                notional = cash
                # 多头盈利： (price / entry_price - 1) * notional
                # 空头盈利： (entry_price / price - 1) * notional
                if position == 1:
                    ret = price / entry_price - 1.0
                else:  # position == -1
                    ret = entry_price / price - 1.0


                pnl = notional * ret


                # 收一次手续费：按两边成交金额的和简单估一下
                fee = abs(notional) * config.fee_rate
                cash = cash + pnl - fee


                trades.append(
                    TradeRecord(
                        entry_time=entry_time,
                        exit_time=t,
                        direction=position,
                        entry_price=float(entry_price),
                        exit_price=float(price),
                        pnl=float(pnl - fee),
                        return_pct=float(ret),
                    )
                )


                position = 0
                entry_price = None
                entry_time = None


            # 2) 再按新信号开仓
            if current_signal != 0:
                position = int(current_signal)
                entry_price = price
                entry_time = t


                # 开仓也要收一次手续费
                fee_open = cash * config.fee_rate
                cash -= fee_open


        # 持仓期间的“浮动权益”：
        if position == 0 or entry_price is None:
            equity_value = cash
        else:
            notional = cash
            if position == 1:
                ret = price / entry_price - 1.0
            else:
                ret = entry_price / price - 1.0
            equity_value = cash + notional * ret


        times.append(t)
        equity.append(equity_value)
        prev_signal = current_signal


    equity_series = pd.Series(equity, index=pd.DatetimeIndex(times, tz="UTC"))


    # 简单计算统计指标
    stats = compute_backtest_stats(equity_series, trades)


    return BacktestResult(
        equity_curve=equity_series,
        trades=trades,
        stats=stats,
    )

## 8. 给回测结果算几项常见指标

上面的引擎能算出一条资金曲线 + 若干笔交易记录，接下来要把它们压成几项常见的数字指标，方便和基准策略对比：

- 总收益率（从初始资金到最后一天的整体收益）；
- 年化收益率（假设一年大概有多少个 bar，然后折算）；
- 年化波动率（资金曲线的收益率标准差 * sqrt(年度 bar 数)）；
- 夏普率（简单用 `年化收益 / 年化波动`，风险自由利率先默认 0）；
- 最大回撤（从历史最高点到之后低点的最大跌幅）；
- 胜率和总交易次数（从 `TradeRecord` 里算）。

这些指标以后可以细调，这里先写一个“能用”的版本。

In [19]:
# 8. 回测结果的统计指标计算


def _estimate_bars_per_year(index: pd.DatetimeIndex) -> float:
    """Estimate how many bars per year based on the index frequency."""
    if len(index) < 2:
        return 252.0  # 随便给个默认值


    total_days = (index[-1] - index[0]).total_seconds() / (3600 * 24)
    if total_days <= 0:
        return 252.0


    bars_per_day = len(index) / total_days
    return bars_per_day * 252.0



def compute_backtest_stats(
    equity_curve: pd.Series,
    trades: list[TradeRecord],
) -> BacktestStats:
    """Compute basic performance statistics from equity curve and trades."""
    equity_curve = equity_curve.sort_index()
    returns = equity_curve.pct_change().dropna()


    if len(equity_curve) == 0:
        return BacktestStats(
            total_return=0.0,
            annualized_return=0.0,
            volatility=0.0,
            sharpe_ratio=0.0,
            max_drawdown=0.0,
            win_rate=0.0,
            num_trades=0,
        )


    total_return = float(equity_curve.iloc[-1] / equity_curve.iloc[0] - 1.0)


    bars_per_year = _estimate_bars_per_year(equity_curve.index)


    avg_ret = returns.mean()
    vol = returns.std()


    if vol > 0:
        sharpe = float((avg_ret * bars_per_year**0.5) / vol)
    else:
        sharpe = 0.0


    # 年化收益（简单版本）：(1 + 日均收益)^(bars_per_year) - 1
    annualized_return = float((1.0 + avg_ret) ** bars_per_year - 1.0)
    volatility = float(vol * (bars_per_year**0.5))


    # 最大回撤
    running_max = equity_curve.cummax()
    drawdown = equity_curve / running_max - 1.0
    max_drawdown = float(drawdown.min())


    # 胜率和交易次数
    num_trades = len(trades)
    if num_trades > 0:
        wins = [1 for tr in trades if tr.pnl > 0]
        win_rate = float(len(wins) / num_trades)
    else:
        win_rate = 0.0


    return BacktestStats(
        total_return=total_return,
        annualized_return=annualized_return,
        volatility=volatility,
        sharpe_ratio=sharpe,
        max_drawdown=max_drawdown,
        win_rate=win_rate,
        num_trades=num_trades,
    )

## 9. 给自己准备一条“长持基准”策略


为了判断情绪策略到底有没有价值，光看它自己好不好是不够的，还需要一个对比对象。


最常见的做法之一就是：


- 基准策略：从第一个 bar 开始买入并一直持有（中间不换手、不择时），观测到最后一个 bar。


在实现上，其实可以直接用同一条价格曲线，模拟一个“始终持有 1 单位标的”的资金曲线，然后用刚才的 `compute_backtest_stats` 函数算指标。


这里先写一个简化版本的基准回测函数。

In [20]:
# 9. 基准策略（Buy & Hold）回测


def run_benchmark_buy_and_hold(
    price_df: pd.DataFrame,
    config: BenchmarkConfig,
    initial_capital: float,
) -> BacktestResult:
    """Run a simple buy-and-hold benchmark strategy."""
    price_col = config.price_column
    if price_col not in price_df.columns:
        raise ValueError(f"Price column '{price_col}' not found in DataFrame.")


    prices = price_df[price_col].astype(float).sort_index()


    if len(prices) == 0:
        empty_equity = pd.Series([], dtype=float)
        empty_trades: list[TradeRecord] = []
        stats = compute_backtest_stats(empty_equity, empty_trades)
        return BacktestResult(empty_equity, empty_trades, stats)


    # 简单假设：一开始用全部资金买入，之后一直持有
    entry_time = prices.index[0]
    entry_price = prices.iloc[0]


    # 持有的份额
    qty = initial_capital / entry_price


    equity = qty * prices


    # 生成一笔“从头持有到尾”的交易记录
    exit_time = prices.index[-1]
    exit_price = prices.iloc[-1]


    pnl = qty * (exit_price - entry_price)
    ret_pct = float(exit_price / entry_price - 1.0)


    trade = TradeRecord(
        entry_time=entry_time,
        exit_time=exit_time,
        direction=1,
        entry_price=float(entry_price),
        exit_price=float(exit_price),
        pnl=float(pnl),
        return_pct=ret_pct,
    )


    stats = compute_backtest_stats(equity, [trade])


    return BacktestResult(
        equity_curve=equity,
        trades=[trade],
        stats=stats,
    )

## 10. 从回测结果构造前端需要的收益对比数据

这里按照前端给定的接口，把情绪策略 vs 长持基准这两条收益曲线，整理成前端可以直接使用的 `dataPoints` 列表。

约定说明：

- 对于每个时间点 `t`：
  - `realPrice`：长持基准的累计收益（从初始资金开始算的收益率）；
  - `predictionPrice`：情绪策略的累计收益；
  - `percentDifference`：两者的“超额收益”，定义为 `predictionPrice - realPrice`；
- `time` 字段会根据点的数量自动选择粒度：
  - 若点数 \(\le 24\)：使用小时级，格式为 `"HH:00"`；
  - 若点数 \(\le 30\)：使用日级，格式为 `"YYYY-MM-DD"`；
  - 否则使用周级，格式为 `"Week of YYYY-MM-DD"`（按周起始日聚合）。

下面的函数不会修改任何已有逻辑，只是基于 `BacktestOutput` 生成一个供前端消费的 Python 列表。

In [21]:
# 10.1 工具函数：从 BacktestOutput 构造前端 ChartDataPoint 列表

from typing import Literal


def _choose_time_granularity(index: pd.DatetimeIndex) -> Literal["hour", "day", "week"]:
    """Choose time granularity based on number of data points.

    This follows the front-end rule:
        - <= 24 points: hourly
        - <= 30 points: daily
        - otherwise: weekly
    """
    n = len(index)
    if n <= 24:
        return "hour"
    if n <= 30:
        return "day"
    return "week"


def build_chart_datapoints_from_output(
    output: BacktestOutput,
) -> list[dict]:
    """Build chart datapoints for front-end from BacktestOutput.

    For each time point t:
        - realPrice: benchmark cumulative return since start
        - predictionPrice: strategy cumulative return since start
        - percentDifference: predictionPrice - realPrice

    The `time` field will be formatted according to the
    chosen granularity (hour/day/week).
    """
    strategy_curve = output.comparison.strategy.equity_curve.sort_index()
    bench_curve = output.comparison.benchmark.equity_curve.sort_index()

    # 对齐时间轴
    joined = pd.concat([strategy_curve, bench_curve], axis=1, join="inner")
    joined.columns = ["strategy_equity", "benchmark_equity"]

    # 累计收益率曲线（从 0 开始）
    s0 = joined["strategy_equity"].iloc[0]
    b0 = joined["benchmark_equity"].iloc[0]

    strategy_ret = joined["strategy_equity"] / s0 - 1.0
    benchmark_ret = joined["benchmark_equity"] / b0 - 1.0

    # 按时间粒度重采样
    granularity = _choose_time_granularity(joined.index)

    if granularity == "hour":
        rule = "1H"
    elif granularity == "day":
        rule = "1D"
    else:  # "week"
        # 以周一为起始的一周
        rule = "W-MON"

    strategy_ret_rs = strategy_ret.resample(rule).last().dropna()
    benchmark_ret_rs = benchmark_ret.resample(rule).last().dropna()

    # 再次对齐，保证索引完全一致
    ret_df = pd.concat([strategy_ret_rs, benchmark_ret_rs], axis=1, join="inner")
    ret_df.columns = ["strategy_ret", "benchmark_ret"]

    datapoints: list[dict] = []

    for ts, row in ret_df.iterrows():
        s_ret = float(row["strategy_ret"])
        b_ret = float(row["benchmark_ret"])
        diff = s_ret - b_ret

        # 根据粒度格式化 time 字段
        if granularity == "hour":
            time_str = ts.strftime("%H:00")
        elif granularity == "day":
            time_str = ts.strftime("%Y-%m-%d")
        else:  # week
            time_str = "Week of " + ts.strftime("%Y-%m-%d")

        datapoints.append(
            {
                "time": time_str,
                "realPrice": b_ret,
                "predictionPrice": s_ret,
                "percentDifference": diff,
            }
        )

    return datapoints


# 示例：在已经跑完 demo 回测、拿到 output 的情况下，可以调用：
# if "output" in globals():
#     chart_data = build_chart_datapoints_from_output(output)
#     print(chart_data[:5])  # 看看前 5 个点长什么样
# else:
#     print("当前还没有 BacktestOutput 对象，先跑一遍回测 demo 再来构造 chart 数据。")

In [24]:
# 10. 顶层封装：一次性跑完情绪策略 vs 基准

def run_backtest_with_benchmark(
    price_df: pd.DataFrame,
    sentiment_df: pd.DataFrame,
    symbol: str,
    timeframe: str,
    strategy_config: BacktestConfig,
) -> BacktestOutput:
    """Run sentiment-driven strategy and benchmark, and pack results together."""
    # 1) 对齐价格和情绪
    merged = align_price_and_sentiment(price_df, sentiment_df, symbol=None)

    # 2) 生成交易信号
    signal = generate_trading_signals(merged, strategy_config)

    # 3) 跑情绪策略回测
    strategy_result = execute_backtest(merged, signal, strategy_config)

    # 4) 跑基准策略回测
    benchmark_cfg = BenchmarkConfig(symbol=symbol, price_column=strategy_config.price_column)
    benchmark_result = run_benchmark_buy_and_hold(
        price_df=price_df,
        config=benchmark_cfg,
        initial_capital=strategy_config.initial_capital,
    )

    # 5) 包装成曲线对象
    strategy_curve = BacktestCurve(
        symbol=symbol,
        curve=strategy_result.equity_curve,
        label=f"Sentiment strategy ({timeframe})",
    )

    benchmark_curve = BacktestCurve(
        symbol=symbol,
        curve=benchmark_result.equity_curve,
        label=f"Buy & Hold ({timeframe})",
    )

    comparison = StrategyVsBenchmark(
        strategy=strategy_result,
        benchmark=benchmark_result,
        strategy_curve=strategy_curve,
        benchmark_curve=benchmark_curve,
    )

    return BacktestOutput(
        symbol=symbol,
        timeframe=timeframe,
        comparison=comparison,
        extra_info=None,
    )


# 11. 一个端到端的小 demo（在有真实数据时再运行）

if "price_df_example" in globals():
    # 1) 用随机情绪生成一条情绪时间序列
    demo_sentiment = simulate_random_sentiment(price_df_example, symbol="BTC/USDT")

    # 2) 配一个相对保守一点的策略参数
    demo_config = BacktestConfig(
        symbol="BTC/USDT",
        fee_rate=0.001,
        allow_short=True,
        sentiment_long_threshold=0.5,
        sentiment_short_threshold=-0.5,
        initial_capital=100_000.0,
        price_column="close",
    )

    # 3) 跑情绪策略 vs 长持基准
    output = run_backtest_with_benchmark(
        price_df=price_df_example,
        sentiment_df=demo_sentiment,
        symbol="BTC/USDT",
        timeframe="1m",
        strategy_config=demo_config,
    )

    print("情绪策略统计指标：")
    print(output.comparison.strategy.stats)
    print("\n长持基准统计指标：")
    print(output.comparison.benchmark.stats)
    print("回测时间范围：", price_df_example.index[0], "到", price_df_example.index[-1])
    print("情绪策略交易次数：", len(output.comparison.strategy.trades))
    for i, tr in enumerate(output.comparison.strategy.trades, 1):
        print(f"第 {i} 笔：开仓 {tr.entry_time}，平仓 {tr.exit_time}，方向 {tr.direction}，收益 {tr.pnl:.2f}")
else:
    print("当前 Notebook 还没有可用的 price_df_example，等加载好实际 K 线数据再跑这个 demo。")


情绪策略统计指标：
BacktestStats(total_return=-0.4091585203218199, annualized_return=-1.0, volatility=0.41834493212642143, sharpe_ratio=-457.02662097434046, max_drawdown=-0.4091585203218199, win_rate=0.06177606177606178, num_trades=259)

长持基准统计指标：
BacktestStats(total_return=-0.054499532991398425, annualized_return=-0.9999999984695063, volatility=0.39806523317380743, sharpe_ratio=-50.989402460101886, max_drawdown=-0.06731605879150782, win_rate=0.0, num_trades=1)
回测时间范围： 2025-11-30 16:36:00+00:00 到 2025-12-01 09:15:00+00:00
情绪策略交易次数： 259
第 1 笔：开仓 2025-11-30 16:39:00+00:00，平仓 2025-11-30 16:40:00+00:00，方向 1，收益 -79.16
第 2 笔：开仓 2025-11-30 16:42:00+00:00，平仓 2025-11-30 16:43:00+00:00，方向 1，收益 -122.53
第 3 笔：开仓 2025-11-30 16:49:00+00:00，平仓 2025-11-30 16:51:00+00:00，方向 -1，收益 -121.16
第 4 笔：开仓 2025-11-30 16:52:00+00:00，平仓 2025-11-30 16:53:00+00:00，方向 -1，收益 -52.74
第 5 笔：开仓 2025-11-30 16:55:00+00:00，平仓 2025-11-30 16:56:00+00:00，方向 -1，收益 -59.12
第 6 笔：开仓 2025-11-30 16:56:00+00:00，平仓 2025-11-30 16:57:00+00:00，方向 