## 1 环境与数据准备

### 1.1 安装和导入常用库

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, f1_score
import os

# 查看当前PyTorch是否可用GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

### 1.2 读取csv数据

In [None]:
df = pd.read_csv(r'C:\Users\Lamarck\Desktop\CDRs.csv')
print(df.head()) # 查看前5行
print(df.shape)  # 查看行列数

print(df['type'].value_counts())  # 查看每个亚型的样本数

## 2 数据探索与预处理

### 2.1 检查缺失值与重复值

In [None]:
print("缺失值统计:")
print(df.isnull().sum())

print("重复行数量:", df.duplicated().sum())

print("各亚型分布:")
print(df['type'].value_counts())


### 2.2 编码类别标签
#### 对hpv抗体的亚型类别进行标签编码(将类别型数据转换为数值型)

In [None]:
le = LabelEncoder()
df['label'] = le.fit_transform(df['type'])
num_classes = len(le.classes_)  # 这里应为9
print("标签对应关系:", dict(zip(le.classes_, range(num_classes))))

### 2.3 查看序列长度分布

In [None]:
cdrs = ['HCDR1', 'HCDR2', 'HCDR3', 'LCDR1', 'LCDR2', 'LCDR3']
for c in cdrs:
    lengths = df[c].apply(len)
    print(f"{c} 平均长度: {lengths.mean():.2f}, 最大长度: {lengths.max()}, 最小长度: {lengths.min()}")

## 3 数据划分与交叉验证
#### 训练集60% 验证集20% 测试集20%

In [None]:
train_df, test_df = train_test_split(df, test_size=0.2, 
                                     stratify=df['label'], 
                                     random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.25, 
                                    stratify=train_df['label'], 
                                    random_state=42)

print("Train size:", len(train_df), "Val size:", len(val_df), "Test size:", len(test_df))


## 4 序列编码与特征表示

### 4.1 构建氨基酸字典

In [None]:
amino_acids = list("ACDEFGHIKLMNPQRSTVWY")  # 常见20种
PAD_TOKEN = "<PAD>" # 用于填充短序列, 确保所有氨基酸序列长度相同
UNK_TOKEN = "<UNK>" # 标记未知字符, 如果遇到非标准氨基酸，直接映射到这个字符
vocab = [PAD_TOKEN, UNK_TOKEN] + amino_acids # 将 <PAD> 和 <UNK> 添加到氨基酸列表的前面
vocab2idx = {v: i for i, v in enumerate(vocab)} # 氨基酸的索引字典
vocab_size = len(vocab)

print("vocab_size:", vocab_size)
print("vocab2idx:", vocab2idx)


### 4.2 将氨基酸序列转换为索引并填充

In [None]:
MAX_LEN = 35 # 短于35的需要填充，长于35的需要截断

def seq2indices(seq, vocab2idx, max_len=35):
    indices = []
    for char in seq:
        if char in vocab2idx:
            indices.append(vocab2idx[char]) # 直接转换
        else:
            indices.append(vocab2idx[UNK_TOKEN]) # 遇到未知字符，用<UNK>代替
    # 填充或截断
    if len(indices) < max_len:
        indices += [vocab2idx[PAD_TOKEN]] * (max_len - len(indices)) # 填充
    else:
        indices = indices[:max_len] # 截断
    return indices


### 4.3 将每行样本(6个CDR)转换为数值向量

In [None]:
def process_dataframe(df, vocab2idx, max_len=35):
    cdr_cols = ['HCDR1','HCDR2','HCDR3','LCDR1','LCDR2','LCDR3']
    
    X_data = [] # X_data 用于存储转换后的 CDR 序列索引（数值化的氨基酸）
    y_data = df['label'].values # y_data 直接获取 df['label']（HPV 亚型标签）

    # 遍历DataFrame的每一行
    for _, row in df.iterrows():
        cdr_indices_list = []
        for col in cdr_cols:
            seq = row[col]
            idx_seq = seq2indices(seq, vocab2idx, max_len)
            cdr_indices_list.append(idx_seq)
        # 得到 shape: [6, max_len]
        X_data.append(cdr_indices_list)
    
    X_data = np.array(X_data)  # => [N, 6, max_len]
    return X_data, y_data

X_train, y_train = process_dataframe(train_df, vocab2idx, MAX_LEN)
X_val, y_val = process_dataframe(val_df, vocab2idx, MAX_LEN)
X_test, y_test = process_dataframe(test_df, vocab2idx, MAX_LEN)

print("X_train shape:", X_train.shape, "y_train shape:", y_train.shape)
print("X_val shape:", X_val.shape, "y_val shape:", y_val.shape)
print("X_test shape:", X_test.shape, "y_test shape:", y_test.shape)


### 4.4 构建 PyTorch Dataset 和 DataLoader
#### 将 CDR 序列数据包装成 PyTorch Dataset 和 DataLoader，以便在神经网络训练时高效加载小批量数据（batch）

In [None]:
# PyTorch提供的一个抽象类用于定义数据集的结构
class CDRDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y
    def __len__(self):
        return len(self.X)
    def __getitem__(self, idx):
        x_item = torch.LongTensor(self.X[idx])  # shape: [6, max_len]
        y_item = torch.LongTensor([self.y[idx]])  # shape: [1], 也可返回int
        return x_item, y_item

batch_size = 16

# 创建 Dataset
train_dataset = CDRDataset(X_train, y_train)
val_dataset   = CDRDataset(X_val, y_val)
test_dataset  = CDRDataset(X_test, y_test)

# 使用 DataLoader 加载数据
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


## 5 搭建模型
#### 用一个共享的 Embedding + 1D 卷积 + 池化来为每条 CDR 提取特征，然后拼接 6 路结果再分类

In [None]:
class CDRClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim=32, hidden_dim=64, num_classes=9):
        super(CDRClassifier, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.conv = nn.Conv1d(in_channels=embed_dim, out_channels=hidden_dim, 
                              kernel_size=3, padding=1)
        self.pool = nn.AdaptiveMaxPool1d(1)
        
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 6, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, num_classes)
        )
        
    def forward(self, x):
        """
        x shape: [batch_size, 6, max_len]
        """
        batch_size = x.size(0)
        cdr_features = []
        
        for i in range(6):
            seq_i = x[:, i, :]   # [batch_size, max_len]
            emb_i = self.embedding(seq_i)  # [batch_size, max_len, embed_dim]
            emb_i = emb_i.permute(0, 2, 1) # => [batch_size, embed_dim, max_len]
            
            conv_out = self.conv(emb_i)    # => [batch_size, hidden_dim, max_len]
            conv_out = torch.relu(conv_out)
            pooled = self.pool(conv_out)   # => [batch_size, hidden_dim, 1]
            pooled = pooled.squeeze(-1)     # => [batch_size, hidden_dim]
            
            cdr_features.append(pooled)
        
        # 拼接6个CDR的特征 => [batch_size, hidden_dim*6]
        concat_feats = torch.cat(cdr_features, dim=1)
        out = self.fc(concat_feats)
        return out

model = CDRClassifier(vocab_size=vocab_size, embed_dim=32, hidden_dim=64, 
                      num_classes=num_classes).to(device)
print(model)


## 6 训练与验证

### 6.1 定义损失函数和优化器

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)


### 6.2 训练循环

In [None]:
num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    train_losses = []
    
    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)      # [batch_size, 6, max_len]
        y_batch = y_batch.squeeze(1).to(device)  # [batch_size]
        
        optimizer.zero_grad()
        outputs = model(X_batch)         # [batch_size, num_classes]
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        
        train_losses.append(loss.item())
    
    # 训练集平均损失
    avg_train_loss = np.mean(train_losses)
    
    # 验证集评估
    model.eval()
    val_losses = []
    val_preds = []
    val_labels = []
    with torch.no_grad():
        for X_val, y_val_ in val_loader:
            X_val = X_val.to(device)
            y_val_ = y_val_.squeeze(1).to(device)
            
            val_out = model(X_val)
            val_loss = criterion(val_out, y_val_)
            val_losses.append(val_loss.item())
            
            preds = torch.argmax(val_out, dim=1).cpu().numpy()
            val_preds.extend(preds)
            val_labels.extend(y_val_.cpu().numpy())
    
    avg_val_loss = np.mean(val_losses)
    val_acc = accuracy_score(val_labels, val_preds)
    val_f1 = f1_score(val_labels, val_preds, average='macro')
    
    print(f"Epoch [{epoch+1}/{num_epochs}] "
          f"Train Loss: {avg_train_loss:.4f} | "
          f"Val Loss: {avg_val_loss:.4f} | "
          f"Val Acc: {val_acc:.4f} | "
          f"Val F1: {val_f1:.4f}")


## 7 在测试集上做最终评估

In [None]:
model.eval()
test_preds = []
test_labels = []

almost_correct_count = 0  # 计数器
total_samples = 0

with torch.no_grad():
    for X_test_batch, y_test_batch in test_loader:
        X_test_batch = X_test_batch.to(device)
        y_test_batch = y_test_batch.squeeze(1).to(device)

        # 前向计算
        test_out = model(X_test_batch)        # [batch_size, num_classes]
        preds = torch.argmax(test_out, dim=1) # [batch_size]

        # ------ 计算Top-3 ------
        # 返回 top3 的分数 (values) 和 索引 (indices)
        _, top3_indices = torch.topk(test_out, k=3, dim=1)  # [batch_size, 3]

        # 转到CPU上便于操作
        top3_indices = top3_indices.cpu().numpy()
        y_true_cpu = y_test_batch.cpu().numpy()
        total_samples += len(y_true_cpu)

        # 统计Almost正确数
        for i, label_true in enumerate(y_true_cpu):
            # top3_indices[i] 是该样本top-3预测类别, 形如 [2, 5, 1]
            if label_true in top3_indices[i]:
                almost_correct_count += 1
        # ------ 计算Top-3结束 ------

        test_preds.extend(preds.cpu().numpy())
        test_labels.extend(y_true_cpu)

# 1. 计算Top-1 Accuracy
test_acc = accuracy_score(test_labels, test_preds)

# 2. 计算Almost Accuracy = (Top-3 内有正确标签) / (总数)
almost_accuracy = almost_correct_count / total_samples

print(f"Test Accuracy (Top-1): {test_acc:.4f}")
print(f"Test Almost Accuracy (Top-3): {almost_accuracy:.4f}")


## 8 新样本预测

In [None]:
def predict_hpv_type(model, cdr_list, vocab2idx, max_len=25):
    """
    cdr_list: [HCDR1, HCDR2, HCDR3, LCDR1, LCDR2, LCDR3] (6条序列)
    返回: (prob_array, predicted_label_str)
    """
    model.eval()
    
    # 将6条CDR转为索引 [6, max_len]
    input_list = []
    for seq in cdr_list:
        idx_seq = seq2indices(seq, vocab2idx, max_len)
        input_list.append(idx_seq)
    
    X = torch.LongTensor([input_list]).to(device)  # [1, 6, max_len]
    
    with torch.no_grad():
        logits = model(X)  # [1, 9]
        probs = nn.functional.softmax(logits, dim=1).cpu().numpy().squeeze()  # [9,]
        pred_idx = np.argmax(probs)
    
    # 映射回真实亚型
    pred_label = le.inverse_transform([pred_idx])[0]
    return probs, pred_label

# 示例调用
example_cdrs = [
    "GFTFSSYW",  # HCDR1 
    "INSDGSST",   # HCDR2
    "ARSLQLWPSSDYMDV", # HCDR3
    "QGISDS",    # LCDR1
    "VAS",    # LCDR2
    "QQINSYPPA"   # LCDR3
]
prob_array, pred_type = predict_hpv_type(model, example_cdrs, vocab2idx, MAX_LEN)
print("预测概率分布:", prob_array)
print("最终预测亚型:", pred_type)
