# 1.数据预处理模块
**DataPreprocess.py**   
目前的数据主要为 JSON 格式。
需要经过多步处理才能作为模型的输入数据。

1. 读取 JSON 文件
2. 转换为 NodePlan
3. 转换为 DataFrame（添加两个字段：sql_id, parent_id）
4. 数据统计：统计各个特征的分布等


In [None]:
# 1.数据处理
import sys, os
sys.path.append(os.path.abspath(".."))  # 确保当前目录加入路径

from models.DataPreprocessor import get_plans_dict, DataPreprocessor, plan_trees_to_graphs, graphs_to_df, df_to_graphs, DFStatisticsInfo

# Glob of CSV files whose ``json`` column holds the raw plans (see the cell
# output); the variable name is kept because later cells may reference it.
json_path = "../data/train_plan_*.csv"
plans_dict, execution_times = get_plans_dict(json_path)
print("plans_dict:\n", plans_dict[0:5])
print("execution_times:\n", execution_times[0:5])

# Parse every raw plan into a plan tree.
preprocessor = DataPreprocessor()
plans_tree = preprocessor.preprocess_all(plans_dict)

# Convert the trees into graph form (edge lists + per-plan matrices).
edges_list, matrix_plans = plan_trees_to_graphs(plans_tree, add_self_loops=True, undirected=False)
print(matrix_plans[0][0])
print(matrix_plans[0][1])
print(edges_list[0])
if len(edges_list) > 99:  # sample print only; avoid IndexError on small datasets
    print(edges_list[99])

# Flatten the graphs into one DataFrame and persist it. DataFrame.to_csv does
# not create missing directories, so make sure the output folder exists.
plans_df = graphs_to_df(matrix_plans)
os.makedirs("../data/process", exist_ok=True)
plans_df.to_csv("../data/process/01_plans_df.csv", index=False)
plans_df.head()

# Optional feature statistics (uncomment to run):
# stats = DFStatisticsInfo(plans_df, sample_threshold=200, sample_k=10, strict_alias_check=True)
# node_types = stats.get_node_type_set()
# global_must = stats.global_must_keys()
# global_all  = stats.global_all_keys()
# per_key     = stats.per_key_values()
# per_type    = stats.per_nodetype_key_stats()
# issues      = stats.report_issues()
# stats.pretty_print_report()


找到的文件: ['../data/train_plan_part17.csv', '../data/train_plan_part8.csv', '../data/train_plan_part6.csv', '../data/train_plan_part3.csv', '../data/train_plan_part19.csv', '../data/train_plan_part9.csv', '../data/train_plan_part11.csv', '../data/train_plan_part1.csv', '../data/train_plan_part0.csv', '../data/train_plan_part18.csv', '../data/train_plan_part10.csv', '../data/train_plan_part12.csv', '../data/train_plan_part16.csv', '../data/train_plan_part15.csv', '../data/train_plan_part2.csv', '../data/train_plan_part14.csv', '../data/train_plan_part5.csv', '../data/train_plan_part7.csv', '../data/train_plan_part13.csv', '../data/train_plan_part4.csv']
总数据行数: 100000
df:
       id                                               json
0  85000  {"Plan": {"Node Type": "Bitmap Heap Scan", "Pa...
1  85001  {"Plan": {"Node Type": "Gather", "Parallel Awa...
2  85002  {"Plan": {"Node Type": "Hash Join", "Parallel ...
3  85003  {"Plan": {"Node Type": "Gather", "Parallel Awa...
4  85004  {"Plan": {"No

Unnamed: 0,plan_id,node_idx,Node Type,Parallel Aware,Relation Name,Alias,Startup Cost,Total Cost,Plan Rows,Plan Width,...,Peak Memory Usage,Filter,Rows Removed by Filter,Join Filter,Rows Removed by Join Filter,Merge Cond,Sort Key,Sort Method,Sort Space Used,Sort Space Type
0,0,0,Bitmap Heap Scan,False,movie_keyword,mk,11788.77,49094.94,1028173,12,...,,,,,,,,,,
1,0,1,Bitmap Index Scan,False,,,0.0,11531.73,1028173,0,...,,,,,,,,,,
2,1,0,Gather,False,,,59038.23,292179.55,958880,136,...,,,,,,,,,,
3,1,1,Hash Join,True,,,58038.23,195291.55,399533,136,...,,,,,,,,,,
4,1,2,Index Scan,True,cast_info,ci,0.44,96872.38,1832141,42,...,,,,,,,,,,


# 2. 模型模块

## NodeEncoder
NodeEncoder.py
主要包括各种向量化（vectorization）编码方式。

1. 转换为矩阵（Matrix）表示（Node, Edge）


## TreeEncoder
TreeEncoder.py
目前包括两种模型：一个是 GAT，一个是传统 GNN。
1. 转换为向量（vector）

## PredictionHead
PredictionHead.py
目前使用最简单的 FNN（前馈神经网络）完成后续的回归任务。

1. 预测






In [11]:
# 模型搭建

import torch
import torch.nn as nn
from torch_geometric.data import Data, Batch

class PlanCostModel(nn.Module):
    """End-to-end plan cost regressor: node encoder -> tree encoder -> prediction head.

    The three stages are injected, so any compatible modules can be combined
    (for example a GAT-based tree encoder).

    Args:
        nodecoder: maps the node feature matrix ``data.x`` to node embeddings.
        treeencoder: called as ``treeencoder(x, edge_index, batch)``; expected
            to return one embedding per graph.
        predict_head: maps the graph embeddings to the final prediction.
    """

    def __init__(self, nodecoder: nn.Module, treeencoder: nn.Module, predict_head: nn.Module):
        super().__init__()
        # Attribute names are kept unchanged because they define the state_dict keys.
        self.nodecoder = nodecoder
        self.treeencoder = treeencoder
        self.predict_head = predict_head

    def forward(self, data: "Data | Batch"):
        """Run the three stages and return the prediction-head output.

        ``data`` must provide:
        - ``x``: node features, passed to ``nodecoder``
        - ``edge_index``: graph connectivity, passed to ``treeencoder``
        - ``batch`` (optional): graph index of every node. When it is absent
          or ``None`` (a single un-batched graph), every node is assigned to
          graph 0.

        Only ``data.x`` is encoded; categorical features such as ``x_cat`` are
        not consumed by this forward pass.
        """
        batch = getattr(data, "batch", None)
        if batch is None:
            # A lone graph carries no batch vector: treat all nodes as graph 0.
            batch = data.x.new_zeros(data.x.size(0), dtype=torch.long)
        x = self.nodecoder(data.x)
        g = self.treeencoder(x, data.edge_index, batch)
        return self.predict_head(g)

from models.NodeEncoder import *
from models.TreeEncoder import *
from models.PredictionHead import *

# Sizes: f_num is the node encoder's input width (presumably the number of
# numerical node features -- confirm against the preprocessed DataFrame),
# d_node / d_graph are the node- and graph-embedding widths, and out_dim is
# the number of regression outputs.
f_num, d_node, d_graph, out_dim = 16, 32, 64, 1
nodecoder = NodeEncoder_Mini(
    in_dim=f_num,
    d_node=d_node
)
# NOTE(review): running this cell raised
#   TypeError: TreeEncoder_GATMini.__init__() got an unexpected keyword argument 'input_dim'
# so these keyword names do not match TreeEncoder_GATMini's constructor
# (defined in models/TreeEncoder.py, not visible here).
# TODO: align the arguments below with that signature.
gatTreeEncoder = TreeEncoder_GATMini(
    input_dim=d_node,
    hidden_dim=64,
    output_dim=d_graph,
    num_layers=3,
    num_heads=4,
    dropout=0.1,
    pooling="mean"
)
predict_head = PredictionHead_FNNMini(d_graph, out_dim)
model = PlanCostModel(nodecoder, gatTreeEncoder, predict_head)

# Quick sanity prints: the container type of the edge list produced by the
# preprocessing cell, and the assembled model architecture.
print(type(edges_list))
print(model)

TypeError: TreeEncoder_GATMini.__init__() got an unexpected keyword argument 'input_dim'

# 3. 训练模块
TrainAndEval.py

## 训练
主要功能为划分训练集、验证集和测试集，
并调用模型进行训练。
## 评估
定义评估方式，目前包括 MSE 以及 Q-error。





In [None]:
import torch
import numpy as np

# 早停机制
class EarlyStopping:
    """Signal the end of training once the validation loss stops improving.

    Args:
        patience: number of consecutive non-improving calls tolerated before
            stopping.
        min_delta: minimum decrease of the loss that counts as an improvement.
        restore_best_weights: when stopping, load the best snapshot back into
            the model.
    """

    def __init__(self, patience=15, min_delta=0, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.best_loss = None
        self.counter = 0
        self.best_weights = None

    def __call__(self, val_loss, model):
        """Record this epoch's ``val_loss``; return True when training should stop."""
        if self.best_loss is None or val_loss < self.best_loss - self.min_delta:
            # First observation or a real improvement: remember it and reset patience.
            self.best_loss = val_loss
            self.counter = 0
            self.save_checkpoint(model)
        else:
            self.counter += 1

        if self.counter >= self.patience:
            if self.restore_best_weights:
                model.load_state_dict(self.best_weights)
            return True
        return False

    def save_checkpoint(self, model):
        """Store an independent copy of the model's current weights."""
        # state_dict() returns tensors sharing storage with the live parameters;
        # a shallow dict copy would be overwritten by later in-place optimizer
        # updates, so each tensor is cloned.
        self.best_weights = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }

# 训练&评估函数
def train_epoch(model, loader, optimizer, criterion, device):
    """Train ``model`` for one pass over ``loader`` and return the mean batch loss.

    Predictions and targets are both flattened before the loss is computed,
    so a ``[B, 1]`` prediction is compared element-wise with a ``[B]`` target
    instead of being broadcast to ``[B, B]``.

    Returns:
        The average of the per-batch losses, or 0.0 for an empty loader
        (instead of raising ZeroDivisionError).
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for batch in loader:
        batch = batch.to(device)
        optimizer.zero_grad()

        # Forward pass; align prediction and target shapes/dtypes.
        pred = model(batch).reshape(-1)
        target = batch.y.reshape(-1).to(pred.dtype)
        loss = criterion(pred, target)

        # Backward pass with gradient clipping for stability.
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    return total_loss / max(1, num_batches)

def validate_epoch(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without gradients.

    Prints the mean batch loss together with the median (Q50) and 95th
    percentile (Q95) of the per-sample Q-error, max(pred/true, true/pred),
    over the whole loader.

    Returns:
        The mean batch loss, or 0.0 for an empty loader (in which case the
        printed Q50/Q95 are NaN).
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    q_errors_all = []  # per-sample Q-errors collected across the validation set
    eps = 1e-8

    with torch.no_grad():
        for batch in loader:
            batch = batch.to(device)
            # Flatten so [B, 1] predictions line up with [B] targets (no broadcasting).
            pred = model(batch).reshape(-1)
            target = batch.y.reshape(-1).to(pred.dtype)
            loss = criterion(pred, target)
            total_loss += loss.item()
            num_batches += 1

            # Clamp away zero/negative values: Q-error is a ratio of positives.
            p = torch.clamp(pred, min=eps)
            t = torch.clamp(target, min=eps)
            q_error = torch.maximum(p / t, t / p)  # [B]
            q_errors_all.append(q_error.cpu().numpy())

    if q_errors_all:
        q_all = np.concatenate(q_errors_all, axis=0)
        Q50 = float(np.quantile(q_all, 0.5))
        Q95 = float(np.quantile(q_all, 0.95))
    else:
        Q50 = float("nan")
        Q95 = float("nan")

    avg_loss = total_loss / max(1, num_batches)
    print(f"val_loss: {avg_loss:.6f} | Q50: {Q50:.6f} | Q95: {Q95:.6f}")

    return avg_loss

def evaluate_model(model, test_loader, device):
    """Score ``model`` on the test set and print a summary.

    Every prediction and target is flattened to 1-D and cast to float before
    the metrics are computed.

    Returns:
        ``(predictions, targets, metrics)``: two NumPy arrays with the flattened
        predictions and targets, and a dict with keys ``'mse'``, ``'Q50'``
        (median Q-error) and ``'Q95'`` (95th-percentile Q-error).
    """
    model.eval()
    collected_preds = []
    collected_targets = []

    with torch.no_grad():
        for graph_batch in test_loader:
            graph_batch = graph_batch.to(device)
            flat_pred = model(graph_batch).view(-1).float()
            flat_true = graph_batch.y.view(-1).float()
            collected_preds.append(flat_pred.cpu())
            collected_targets.append(flat_true.cpu())

    preds = torch.cat(collected_preds)
    targs = torch.cat(collected_targets)

    # Mean squared error over all test samples.
    mse = (preds - targs).pow(2).mean().item()

    # Q-error = max(p/t, t/p); both sides are clamped to stay strictly positive.
    floor = 1e-8
    p = preds.clamp(min=floor)
    t = targs.clamp(min=floor)
    q_err = torch.maximum(p / t, t / p)
    Q50, Q95 = (torch.quantile(q_err, level).item() for level in (0.5, 0.95))

    summary = {'mse': mse, 'Q50': Q50, 'Q95': Q95}

    rule = "=" * 50
    print("\n" + rule)
    print("测试集评估结果:")
    print(rule)
    print(f"MSE:  {mse:.6f}")
    print(f"Q50: {Q50:.6f}, Q95: {Q95:.6f}")
    print(rule)

    return preds.numpy(), targs.numpy(), summary



In [None]:

# 10. 可视化训练过程和结果
import matplotlib.pyplot as plt

def plot_training_history(train_losses, val_losses, predictions=None, targets=None,
                          results_dir="../results"):
    """Plot loss curves and a predicted-vs-true scatter, save the figure, then show it.

    Args:
        train_losses: per-epoch training losses.
        val_losses: per-epoch validation losses (same length as ``train_losses``).
        predictions: 1-D array of test-set predictions. Defaults to the
            module-level ``predictions`` variable, which the original version
            read implicitly.
        targets: 1-D array of test-set ground truth. Defaults to the
            module-level ``targets`` variable.
        results_dir: output directory; it is created if missing. The figure is
            saved as ``<results_dir>/training_results_<MMDD>.png``.
    """
    # Local imports so the function does not depend on globals imported elsewhere.
    import os
    from datetime import datetime

    # Backward compatibility: fall back to the notebook-level variables.
    if predictions is None:
        predictions = globals()["predictions"]
    if targets is None:
        targets = globals()["targets"]

    plt.figure(figsize=(12, 4))

    # Left panel: training / validation loss curves.
    plt.subplot(1, 2, 1)
    epochs = range(1, len(train_losses) + 1)
    plt.plot(epochs, train_losses, 'b-', label='Train Loss')
    plt.plot(epochs, val_losses, 'r-', label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.grid(True)

    # Right panel: predicted vs. true values with the y = x reference line.
    plt.subplot(1, 2, 2)
    plt.scatter(targets, predictions, alpha=0.5, s=20)
    min_val = min(targets.min(), predictions.min())
    max_val = max(targets.max(), predictions.max())
    plt.plot([min_val, max_val], [min_val, max_val], 'r--', lw=2)
    plt.xlabel('True Execution Time')
    plt.ylabel('Predicted Execution Time')
    plt.title('Predicted vs True Execution Time')
    plt.grid(True)

    plt.tight_layout()
    os.makedirs(results_dir, exist_ok=True)
    date = datetime.now().strftime("%m%d")
    plt.savefig(f'{results_dir}/training_results_{date}.png', dpi=300, bbox_inches='tight')
    plt.show()

# Ensure the output folder exists, then draw and save the training summary.
os.makedirs(name='../results', exist_ok=True)
plot_training_history(train_losses=train_losses, val_losses=val_losses)
