# 网球比赛动量预测系统

本项目基于 ATP 比赛的逐分数据（PBP），完成以下任务：

- ✅ 高精度解析含 `;`, `.`, `/` 的 PBP 字符串
- ✅ 构建动量指标并融入特征
- ✅ 实现 LSTM / GRU / Transformer 模型
- ✅ 支持**多步预测**（未来第1/2/3分）
- ✅ 提供**交互式仪表盘**：可切换比赛、模型、窗口大小


## 步骤 1：数据清洗（Data Cleaning）

加载原始 PBP 数据，实现高精度解析器，支持：
- `;`：局结束（非抢七换发）
- `.`：盘结束（换发）
- `/`：抢七中每2分换发

In [4]:
import pandas as pd
import os
import numpy as np

DATA_DIR = "data/raw"
os.makedirs("data/processed", exist_ok=True)

# 使用pbp_matches_atp_main_archive.csv 和 pbp_matches_atp_main_current.csv 

files = [
    os.path.join(DATA_DIR, "pbp_matches_atp_main_archive.csv"),
    os.path.join(DATA_DIR, "pbp_matches_atp_main_current.csv")
]

df_list = []
use_real = False
for f in files:
    if os.path.exists(f):
        df_temp = pd.read_csv(f)
        if 'tour' in df_temp.columns:
            df_temp = df_temp[df_temp['tour'] == 'ATP']
            df_list.append(df_temp)
            use_real = True

df_raw = pd.concat(df_list, ignore_index=True)

print(f" 原始数据：{len(df_raw)} 场 ATP 比赛")

 原始数据：13050 场 ATP 比赛


In [5]:
def parse_pbp_detailed(pbp_str, server1_first=True):
    points = []
    set_idx = 0
    game_idx = 0
    server_id = 0 if server1_first else 1
    is_tiebreak = False
    game_score = [0, 0]
    
    for char in str(pbp_str):
        if char == '.':
            set_idx += 1
            game_idx = 0
            game_score = [0, 0]
            server_id = 1 - server_id
            is_tiebreak = False
            continue
            
        elif char == ';':
            game_idx += 1
            game_score = [0, 0]
            if not is_tiebreak:
                server_id = 1 - server_id
            continue
            
        elif char == '/':
            server_id = 1 - server_id
            continue
            
        elif char in ['S', 'R', 'A', 'D']:
            points.append({
                'point_char': char,
                'server_id': server_id,
                'set_idx': set_idx,
                'game_idx': game_idx,
                'is_tiebreak': is_tiebreak,
                'score_in_game': tuple(game_score)
            })
            
            if char in ['S', 'A']:
                game_score[0] += 1
            else:
                game_score[1] += 1
                
            if not is_tiebreak and game_idx >= 12 and game_score[0] == game_score[1]:
                is_tiebreak = True
    return points

df_raw['parsed_points'] = df_raw['pbp'].apply(lambda x: parse_pbp_detailed(str(x)) if pd.notna(x) else [])
df_raw['n_points'] = df_raw['parsed_points'].apply(len)

df_clean = df_raw[df_raw['n_points'] >= 20].copy()
print(f" 清洗后：{len(df_clean)} 场有效比赛")

 清洗后：13050 场有效比赛


## 步骤 2：特征工程（Feature Engineering）

构建序列特征 + 动量标量特征，支持单步（horizon=1）和多步（horizon=3）预测。

In [6]:
def build_features(points, window_size=10, horizon=1):
    if len(points) <= window_size + horizon - 1:
        return None, None, None
    
    labels = [1 if p['point_char'] in ['S','A'] else 0 for p in points]
    serves = [p['server_id'] for p in points]
    is_ace_df = [1 if p['point_char'] in ['A','D'] else 0 for p in points]
    
    seq_X, scalar_X, y = [], [], []
    
    for i in range(window_size, len(labels) - horizon + 1):
        past_labels = labels[i-window_size:i]
        past_serves = serves[i-window_size:i]
        past_adf = is_ace_df[i-window_size:i]
        seq_feat = np.column_stack([past_labels, past_serves, past_adf])
        
        recent_wins = sum(labels[max(0, i-3):i])
        momentum = recent_wins / min(3, i)
        scalar_feat = np.array([momentum])
        
        target = labels[i:i+horizon] if horizon > 1 else labels[i]
        
        seq_X.append(seq_feat)
        scalar_X.append(scalar_feat)
        y.append(target)
    
    return np.array(seq_X), np.array(scalar_X), np.array(y)

## 步骤 3：模型构建（Model Construction）

实现四种模型：LSTM / GRU / Transformer / MultiStep-LSTM

In [7]:
import torch
import torch.nn as nn

class TennisLSTM(nn.Module):
    def __init__(self, seq_dim=3, scalar_dim=1, hidden=64, layers=2):
        super().__init__()
        self.lstm = nn.LSTM(seq_dim, hidden, layers, batch_first=True)
        self.scalar_fc = nn.Linear(scalar_dim, hidden)
        self.fc = nn.Linear(hidden * 2, 1)
        self.sigmoid = nn.Sigmoid()
    def forward(self, seq_x, scal_x):
        lstm_out, _ = self.lstm(seq_x)
        feat_seq = lstm_out[:, -1, :]
        feat_scal = self.scalar_fc(scal_x)
        out = self.fc(torch.cat([feat_seq, feat_scal], dim=1))
        return self.sigmoid(out)

class TennisGRU(nn.Module):
    def __init__(self, seq_dim=3, scalar_dim=1, hidden=64, layers=2):
        super().__init__()
        self.gru = nn.GRU(seq_dim, hidden, layers, batch_first=True)
        self.scalar_fc = nn.Linear(scalar_dim, hidden)
        self.fc = nn.Linear(hidden * 2, 1)
        self.sigmoid = nn.Sigmoid()
    def forward(self, seq_x, scal_x):
        gru_out, _ = self.gru(seq_x)
        feat_seq = gru_out[:, -1, :]
        feat_scal = self.scalar_fc(scal_x)
        out = self.fc(torch.cat([feat_seq, feat_scal], dim=1))
        return self.sigmoid(out)

class TennisTransformer(nn.Module):
    def __init__(self, seq_dim=3, scalar_dim=1, d_model=32, nhead=2, layers=2):
        super().__init__()
        self.embed = nn.Linear(seq_dim, d_model)
        encoder = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, batch_first=True)
        self.trans = nn.TransformerEncoder(encoder, num_layers=layers)
        self.scalar_fc = nn.Linear(scalar_dim, d_model)
        self.fc = nn.Linear(d_model * 2, 1)
        self.sigmoid = nn.Sigmoid()
    def forward(self, seq_x, scal_x):
        x = self.embed(seq_x)
        trans_out = self.trans(x)
        feat_seq = trans_out[:, -1, :]
        feat_scal = self.scalar_fc(scal_x)
        out = self.fc(torch.cat([feat_seq, feat_scal], dim=1))
        return self.sigmoid(out)

class MultiStepLSTM(nn.Module):
    def __init__(self, seq_dim=3, scalar_dim=1, hidden=64, layers=2, horizon=3):
        super().__init__()
        self.lstm = nn.LSTM(seq_dim, hidden, layers, batch_first=True)
        self.scalar_fc = nn.Linear(scalar_dim, hidden)
        self.fc = nn.Linear(hidden * 2, horizon)
        self.sigmoid = nn.Sigmoid()
    def forward(self, seq_x, scal_x):
        lstm_out, _ = self.lstm(seq_x)
        feat_seq = lstm_out[:, -1, :]
        feat_scal = self.scalar_fc(scal_x)
        out = self.fc(torch.cat([feat_seq, feat_scal], dim=1))
        return self.sigmoid(out)

## 步骤 4：性能评估（Performance Evaluation）

In [9]:
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from tqdm import tqdm

def train_and_evaluate(model_class, seq_X, scalar_X, y, name, epochs=25, batch_size=128):
    X_seq_tr, X_seq_val, X_scal_tr, X_scal_val, y_tr, y_val = train_test_split(
        seq_X, scalar_X, y, test_size=0.2, random_state=42
    )
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model_class().to(device)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    train_loader = DataLoader(
        TensorDataset(
            torch.tensor(X_seq_tr, dtype=torch.float32),
            torch.tensor(X_scal_tr, dtype=torch.float32),
            torch.tensor(y_tr, dtype=torch.float32).unsqueeze(1)
        ), batch_size=batch_size, shuffle=True
    )
    val_loader = DataLoader(
        TensorDataset(
            torch.tensor(X_seq_val, dtype=torch.float32),
            torch.tensor(X_scal_val, dtype=torch.float32),
            torch.tensor(y_val, dtype=torch.float32).unsqueeze(1)
        ), batch_size=batch_size
    )
    
    for epoch in range(epochs):
        model.train()
        for seq_x, scal_x, y_batch in train_loader:
            seq_x, scal_x, y_batch = seq_x.to(device), scal_x.to(device), y_batch.to(device)
            optimizer.zero_grad()
            pred = model(seq_x, scal_x)
            loss = criterion(pred, y_batch)
            loss.backward()
            optimizer.step()
    
    model.eval()
    preds, truths = [], []
    with torch.no_grad():
        for seq_x, scal_x, y_batch in val_loader:
            seq_x, scal_x, y_batch = seq_x.to(device), scal_x.to(device), y_batch.to(device)
            pred = model(seq_x, scal_x).cpu().numpy().flatten()
            preds.extend(pred)
            truths.extend(y_batch.cpu().numpy().flatten())
    
    acc = accuracy_score(truths, np.array(preds) > 0.5)
    f1 = f1_score(truths, np.array(preds) > 0.5)
    auc = roc_auc_score(truths, preds)
    
    print(f" {name} | Acc: {acc:.4f} | F1: {f1:.4f} | AUC: {auc:.4f}")
    return model

# 构建单步数据
all_seq_X, all_scalar_X, all_y = [], [], []
for _, row in tqdm(df_clean.head(300).iterrows(), total=300, desc="构建单步数据"):
    X_seq, X_scal, y = build_features(row['parsed_points'], window_size=10, horizon=1)
    if X_seq is not None:
        all_seq_X.append(X_seq)
        all_scalar_X.append(X_scal)
        all_y.append(y)

seq_X = np.concatenate(all_seq_X, axis=0)
scalar_X = np.concatenate(all_scalar_X, axis=0)
y = np.concatenate(all_y, axis=0)

# 训练单步模型
models = {}
models['LSTM'] = train_and_evaluate(TennisLSTM, seq_X, scalar_X, y, "LSTM")
models['GRU'] = train_and_evaluate(TennisGRU, seq_X, scalar_X, y, "GRU")
models['Transformer'] = train_and_evaluate(TennisTransformer, seq_X, scalar_X, y, "Transformer")

构建单步数据: 100%|██████████| 300/300 [00:00<00:00, 913.36it/s]


 LSTM | Acc: 0.6190 | F1: 0.7646 | AUC: 0.5244
 GRU | Acc: 0.6196 | F1: 0.7646 | AUC: 0.5151
 Transformer | Acc: 0.6188 | F1: 0.7646 | AUC: 0.5082


In [10]:
# 训练多步模型
WINDOW_SIZE = 10
HORIZON = 3

all_seq_X, all_scalar_X, all_y_multi = [], [], []
for _, row in tqdm(df_clean.head(200).iterrows(), total=200, desc="构建多步数据"):
    X_seq, X_scal, y_multi = build_features(row['parsed_points'], window_size=WINDOW_SIZE, horizon=HORIZON)
    if X_seq is not None and y_multi.ndim == 2 and y_multi.shape[1] == HORIZON:
        all_seq_X.append(X_seq)
        all_scalar_X.append(X_scal)
        all_y_multi.append(y_multi)

seq_X_multi = np.concatenate(all_seq_X, axis=0)
scalar_X_multi = np.concatenate(all_scalar_X, axis=0)
y_multi = np.concatenate(all_y_multi, axis=0)

def train_multistep_model():
    X_seq_tr, X_seq_val, X_scal_tr, X_scal_val, y_tr, y_val = train_test_split(
        seq_X_multi, scalar_X_multi, y_multi, test_size=0.2, random_state=42
    )
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = MultiStepLSTM(horizon=HORIZON).to(device)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    train_loader = DataLoader(
        TensorDataset(
            torch.tensor(X_seq_tr, dtype=torch.float32),
            torch.tensor(X_scal_tr, dtype=torch.float32),
            torch.tensor(y_tr, dtype=torch.float32)
        ), batch_size=128, shuffle=True
    )
    
    for epoch in range(20):
        model.train()
        for seq_x, scal_x, y_batch in train_loader:
            seq_x, scal_x, y_batch = seq_x.to(device), scal_x.to(device), y_batch.to(device)
            optimizer.zero_grad()
            pred = model(seq_x, scal_x)
            loss = criterion(pred, y_batch)
            loss.backward()
            optimizer.step()
    
    model.eval()
    with torch.no_grad():
        val_pred = model(
            torch.tensor(X_seq_val, dtype=torch.float32).to(device),
            torch.tensor(X_scal_val, dtype=torch.float32).to(device)
        ).cpu().numpy()
        val_true = y_val
    
    for step in range(HORIZON):
        acc = accuracy_score(val_true[:, step], val_pred[:, step] > 0.5)
        print(f"  Step+{step+1} Acc: {acc:.4f}")
    
    return model

models['MultiStep (LSTM)'] = train_multistep_model()

构建多步数据: 100%|██████████| 200/200 [00:00<00:00, 891.60it/s]


  Step+1 Acc: 0.6267
  Step+2 Acc: 0.6211
  Step+3 Acc: 0.6265


## 步骤 5：可视化与交互式仪表盘

使用 `ipywidgets` 创建交互式面板，支持：
- 选择比赛
- 切换模型
- 调整窗口大小
- 实时绘制预测 vs 动量曲线

In [12]:
import ipywidgets as widgets
from IPython.display import display, clear_output
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']  
plt.rcParams['axes.unicode_minus'] = False 

def compute_sliding_momentum(points, window=5):
    labels = [1 if p['point_char'] in ['S','A'] else 0 for p in points]
    momentum = []
    for i in range(len(labels)):
        start = max(0, i - window + 1)
        rate = sum(labels[start:i+1]) / (i - start + 1)
        momentum.append(rate)
    return momentum

# 准备长比赛用于展示
long_matches = df_clean[df_clean['n_points'] > 80].head(20).reset_index(drop=True)
match_options = [
    f"{row['server1']} vs {row['server2']} (ID:{idx})" 
    for idx, row in long_matches.iterrows()
]

# 控件
match_dropdown = widgets.Dropdown(options=match_options, description='比赛:')
model_dropdown = widgets.Dropdown(options=list(models.keys()), description='模型:')
window_slider = widgets.IntSlider(value=10, min=5, max=20, step=1, description='窗口大小:')
output = widgets.Output()

def on_change(change):
    with output:
        clear_output(wait=True)
        try:
            selected_text = match_dropdown.value
            idx = int(selected_text.split("ID:")[-1].rstrip(')'))
            match_row = long_matches.iloc[idx]
            
            model_name = model_dropdown.value
            window = window_slider.value
            model = models[model_name]
            device = next(model.parameters()).device
            
            points = match_row['parsed_points']
            
            if 'MultiStep' in model_name:
                X_seq, X_scal, y_true = build_features(points, window_size=window, horizon=3)
                if X_seq is None:
                    print("比赛太短")
                    return
                preds = []
                for i in range(len(X_seq)):
                    seq_t = torch.tensor(X_seq[i], dtype=torch.float32).unsqueeze(0).to(device)
                    scal_t = torch.tensor(X_scal[i], dtype=torch.float32).unsqueeze(0).to(device)
                    with torch.no_grad():
                        p = model(seq_t, scal_t)[0, 0].item()
                    preds.append(p)
            else:
                X_seq, X_scal, y_true = build_features(points, window_size=window, horizon=1)
                if X_seq is None:
                    print("比赛太短")
                    return
                preds = []
                for i in range(len(X_seq)):
                    seq_t = torch.tensor(X_seq[i], dtype=torch.float32).unsqueeze(0).to(device)
                    scal_t = torch.tensor(X_scal[i], dtype=torch.float32).unsqueeze(0).to(device)
                    with torch.no_grad():
                        p = model(seq_t, scal_t).item()
                    preds.append(p)
            
            momentum = compute_sliding_momentum(points)
            
            plt.figure(figsize=(14, 5))
            plt.plot(preds, label=f'{model_name} 预测（发球方赢）', color='blue')
            plt.plot(momentum[window:], label='动量（滑动胜率）', color='red', alpha=0.7)
            plt.axhline(0.5, color='gray', linestyle='--', linewidth=0.8)
            plt.title(f" {match_row['server1']} vs {match_row['server2']}")
            plt.xlabel("逐分序号")
            plt.ylabel("概率")
            plt.legend()
            plt.grid(alpha=0.3)
            plt.tight_layout()
            plt.show()
            
        except Exception as e:
            print(" 错误:", str(e))

match_dropdown.observe(on_change, names='value')
model_dropdown.observe(on_change, names='value')
window_slider.observe(on_change, names='value')

ui = widgets.VBox([
    widgets.HBox([match_dropdown, model_dropdown]),
    window_slider,
    output
])
display(ui)

# 触发首次绘图
on_change(None)

VBox(children=(HBox(children=(Dropdown(description='比赛:', options=('Olivier Rochus vs Fabio Fognini (ID:0)', '…

##  项目完成！

你已实现：
- 高精度 PBP 解析
- 动量量化与特征融合
- 多模型对比（LSTM/GRU/Transformer）
- 多步预测能力
- **交互式仪表盘**（核心拓展任务）