In [1]:
import os
import random
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import shutil
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torchvision.models import resnet18, ResNet18_Weights
from tqdm.notebook import tqdm
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score


In [2]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print("使用设备:", device)


使用设备: cuda


In [3]:
RAW_DIR = Path("D:/OneDriveFiles/OneDrive/人工智能基础期末/dataset2")
OUT_DIR = Path("D:/OneDriveFiles/OneDrive/人工智能基础期末/data_split")    # 拆分后的 train/val 会放在这里

classes = os.listdir(RAW_DIR)
classes

['.DS_Store',
 'baseline_Conv_transformer.ipynb',
 'baseline_resnet.ipynb',
 'BiomedCLIP_baseline.ipynb',
 'class 0',
 'class 1',
 'class 2',
 'class 3',
 'CLIP ViT-L14.ipynb',
 'medsig.ipynb',
 'MedSigLIP.ipynb',
 'medsiglip448_cls_best_acc0.8762.pth',
 'medsig_lora.ipynb',
 'trainlog_19.log',
 'trainlog_24.log',
 'trainlog_linear.log',
 'trainlog_resnet.log']

In [None]:
train_tfm = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),  # 灰度 → 1 通道
    transforms.Resize((448, 448)),               # 统一到 224×224
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

val_tfm = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),
    transforms.Resize((448, 448)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

train_set = ImageFolder(str(OUT_DIR / "train"), transform=train_tfm)
val_set   = ImageFolder(str(OUT_DIR / "val"),   transform=val_tfm)

train_loader = DataLoader(train_set, batch_size=8, shuffle=True,  num_workers=0)
val_loader   = DataLoader(val_set,   batch_size=8, shuffle=False, num_workers=0)

print("类别映射：", train_set.class_to_idx)
print("训练集大小：", len(train_set))
print("验证集大小：", len(val_set))

类别映射： {'class 0': 0, 'class 1': 1, 'class 2': 2, 'class 3': 3}
训练集大小： 5841
验证集大小： 1462


In [5]:
# ...existing code...
class ConvBranch(nn.Module):
    def __init__(
        self,
        in_channels: int,
        channels=(16, 16, 32, 32, 64),
        kernel_sizes=(35, 17, 15, 9, 3, 3, 3, 3, 3),
        strides=(1, 1, 1, 1, 1, 1, 1, 1, 1),
    ):
        super().__init__()

        assert len(channels) == 9, "现在的设计就是 5 层 CNN，请给 5 个通道数"
        assert len(kernel_sizes) == 9, "kernel_sizes 也需要是 5 个，分别对应每个卷积的 kernel_size"
        assert len(strides) == 9, "strides 也需要是 5 个，分别对应每层的 stride"

        layers = []
        c_in = in_channels

        for c_out, k, s in zip(channels, kernel_sizes, strides):
            padding = k // 2   # 保持空间尺寸不变（在 stride=1 时）
            layers.append(
                nn.Conv2d(
                    in_channels=c_in,
                    out_channels=c_out,
                    kernel_size=k,
                    stride=s,
                    padding=padding,
                    bias=False,
                )
            )
            layers.append(nn.BatchNorm2d(c_out))
            layers.append(nn.ReLU(inplace=True))
            # 去掉固定的 MaxPool2d，避免空间尺寸被过度缩小
            # layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            c_in = c_out

        self.conv = nn.Sequential(*layers)
        self.gap = nn.AdaptiveAvgPool2d(1)
        self.out_dim = c_in
# ...existing code...

    def forward(self, x):
        feat = self.conv(x)                  # [B, C_last, H', W']
        feat = self.gap(feat)                # [B, C_last, 1, 1]
        feat = feat.view(feat.size(0), -1)   # [B, C_last]
        return feat

class ParallelCNNTransformer(nn.Module):
    def __init__(
        self,
        num_classes: int,
        in_channels: int = 3,  # 如果是灰度图像，这里改成 1 通道
        branch_channels=(
            (16,),              # 第 1 个分支，通道数为 16
            (16,),              # 第 2 个分支
            (16,),              # 第 3 个分支
            (16,),              # 第 4 个分支
            (16,),              # 第 5 个分支
        ),
        d_model: int = 128,    # Transformer 输入的 token 维度
        nhead: int = 8,        # 多头注意力头数
        num_layers: int = 4,   # Transformer 层数
    ):
        super().__init__()

        assert len(branch_channels) == 9, "目前就做 5 个分支，branch_channels 需要是长度为 5 的 tuple/list"

        # 1. 五个并列 CNN 分支，每个分支输出的通道数为 16
        self.branch1 = ConvBranch(in_channels, channels=branch_channels[0])
        self.branch2 = ConvBranch(in_channels, channels=branch_channels[1])
        self.branch3 = ConvBranch(in_channels, channels=branch_channels[2])
        self.branch4 = ConvBranch(in_channels, channels=branch_channels[3])
        self.branch5 = ConvBranch(in_channels, channels=branch_channels[4])

        # 确保五个分支最后的 out_dim 一样，方便后面统一投影
        out_dims = {self.branch1.out_dim, self.branch2.out_dim, self.branch3.out_dim, self.branch4.out_dim, self.branch5.out_dim}
        if len(out_dims) != 1:
            raise ValueError(f"所有分支最后输出维度不一致: {out_dims}，请把 branch_channels 配成相同最后通道数。")
        branch_out_dim = self.branch1.out_dim  # 比如 16

        # 2. 把分支输出向量映射到 d_model 维度（token embedding）
        self.proj = nn.Linear(branch_out_dim, d_model)

        # 3. 五个 token 的可学习位置编码 (5, d_model)
        self.pos_embed = nn.Parameter(torch.zeros(1, 5, d_model))

        # 4. Transformer Encoder：处理 [B, 5, d_model]
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=d_model * 4,
            dropout=0.1,
            batch_first=True,  # 输入 [B, N, d_model]
        )
        self.transformer = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers
        )

        # 5. 分类头：对 5 个 token 做平均池化，再 MLP 分类
        self.classifier = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(d_model, num_classes),
        )

    def forward(self, x):
        # x: [B, C_in, H, W]
        # 五个分支并行处理同一张图
        f1 = self.branch1(x)   # [B, D_branch]
        f2 = self.branch2(x)
        f3 = self.branch3(x)
        f4 = self.branch4(x)
        f5 = self.branch5(x)

        # 堆成 5 个 token
        tokens = torch.stack([f1, f2, f3, f4, f5], dim=1)  # [B, 5, D_branch]

        # 线性映射到 d_model
        x_tok = self.proj(tokens)                  # [B, 5, d_model]

        # 加位置编码（区分五个分支）
        x_tok = x_tok + self.pos_embed             # [B, 5, d_model]

        # Transformer 编码
        x_tok = self.transformer(x_tok)            # [B, 5, d_model]

        # 对 5 个 token 做平均池化，得到整张图 + 五分支的综合表示
        x_pool = x_tok.mean(dim=1)                 # [B, d_model]

        # 分类
        logits = self.classifier(x_pool)           # [B, num_classes]
        return logits


In [6]:
class CNNTransformer(nn.Module):
    def __init__(
        self,
        num_classes: int,
        in_channels: int = 3,  # 如果是灰度图像，这里改成 1 通道
        branch_channels=(
            (16,16,32,32,64,64,64,64,64),              # 第 1 个分支，通道数为 16
        ),
        d_model: int = 128,    # Transformer 输入的 token 维度
        nhead: int = 8,        # 多头注意力头数
        num_layers: int = 4,   # Transformer 层数
    ):
        super().__init__()

        assert len(branch_channels) == 1, "目前就做 5 个分支，branch_channels 需要是长度为 5 的 tuple/list"

        # 1. 五个并列 CNN 分支，每个分支输出的通道数为 16
        self.branch1 = ConvBranch(in_channels, channels=branch_channels[0])

        # 确保五个分支最后的 out_dim 一样，方便后面统一投影
        branch_out_dim = self.branch1.out_dim  # 比如 16

        # 2. 把分支输出向量映射到 d_model 维度（token embedding）
        self.proj = nn.Linear(branch_out_dim, d_model)

        # 3. 五个 token 的可学习位置编码 (5, d_model)
        self.pos_embed = nn.Parameter(torch.zeros(1, 5, d_model))

        # 4. Transformer Encoder：处理 [B, 5, d_model]
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=d_model * 4,
            dropout=0.1,
            batch_first=True,  # 输入 [B, N, d_model]
        )
        self.transformer = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers
        )

        # 5. 分类头：对 5 个 token 做平均池化，再 MLP 分类
        self.classifier = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(d_model, num_classes),
        )

    def forward(self, x):
        # x: [B, C_in, H, W]
        # 五个分支并行处理同一张图
        f1 = self.branch1(x)   # [B, D_branch]

        # 堆成 5 个 token
        tokens = torch.stack([f1], dim=1)  # [B, 5, D_branch]

        # 线性映射到 d_model
        x_tok = self.proj(tokens)                  # [B, 5, d_model]

        # 加位置编码（区分五个分支）
        x_tok = x_tok + self.pos_embed             # [B, 5, d_model]

        # Transformer 编码
        x_tok = self.transformer(x_tok)            # [B, 5, d_model]

        # 对 5 个 token 做平均池化，得到整张图 + 五分支的综合表示
        x_pool = x_tok.mean(dim=1)                 # [B, d_model]

        # 分类
        logits = self.classifier(x_pool)           # [B, num_classes]
        return logits


In [7]:
model = CNNTransformer(
    num_classes=4,   # 动态传递类别数
    in_channels=3,             # 假设是 3 通道图像
    branch_channels=(
        (16,16,32,32,64,64,64,64,64,),  # 每个分支 1 层通道数
    ),
    d_model=128,               # Transformer 输入维度
    nhead=8,                   # 注意力头数
    num_layers=4,              # Transformer 层数
).to(device)

# 损失函数，交叉熵损失
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# 优化器，Adam 优化器
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-4,
    weight_decay=1e-4,
)


In [8]:
print(model)

CNNTransformer(
  (branch1): ConvBranch(
    (conv): Sequential(
      (0): Conv2d(3, 16, kernel_size=(35, 35), stride=(1, 1), padding=(17, 17), bias=False)
      (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
      (3): Conv2d(16, 16, kernel_size=(17, 17), stride=(1, 1), padding=(8, 8), bias=False)
      (4): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU(inplace=True)
      (6): Conv2d(16, 32, kernel_size=(15, 15), stride=(1, 1), padding=(7, 7), bias=False)
      (7): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (8): ReLU(inplace=True)
      (9): Conv2d(32, 32, kernel_size=(9, 9), stride=(1, 1), padding=(4, 4), bias=False)
      (10): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (11): ReLU(inplace=True)
      (12): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=Fal

In [9]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    model.train()
    total_loss = 0.0
    all_preds, all_labels = [], []
    progress = tqdm(loader, desc="Training", leave=False)

    for x, y in progress:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * x.size(0)
        preds = logits.argmax(dim=1)
        all_preds.extend(preds.detach().cpu().tolist())
        all_labels.extend(y.detach().cpu().tolist())

    avg_loss = total_loss / len(loader.dataset)
    acc = accuracy_score(all_labels, all_preds)
    f1  = f1_score(all_labels, all_preds, average="macro")
    return avg_loss, acc, f1


@torch.no_grad()
def eval_one_epoch(model, loader, criterion, device):
    model.eval()
    total_loss = 0.0
    all_preds, all_labels = [], []
    progress = tqdm(loader, desc="Validating", leave=False)
    for x, y in progress:
        x, y = x.to(device), y.to(device)

        logits = model(x)
        loss = criterion(logits, y)

        total_loss += loss.item() * x.size(0)
        preds = logits.argmax(dim=1)
        all_preds.extend(preds.detach().cpu().tolist())
        all_labels.extend(y.detach().cpu().tolist())

    avg_loss = total_loss / len(loader.dataset)
    acc = accuracy_score(all_labels, all_preds)
    f1  = f1_score(all_labels, all_preds, average="macro")
    return avg_loss, acc, f1, all_labels, all_preds


In [None]:
num_epochs = 50
best_val_acc = 0.0
best_state = None
torch.cuda.empty_cache()
history_hybrid = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

for epoch in range(1, num_epochs + 1):
    print(f"\nEpoch {epoch}/{num_epochs}")

    tr_loss, tr_acc, tr_f1 = train_one_epoch(
        model, train_loader, optimizer, criterion, device
    )
    val_loss, val_acc, val_f1, _, _ = eval_one_epoch(
        model, val_loader, criterion, device
    )

    history_hybrid["train_loss"].append(tr_loss)
    history_hybrid["val_loss"].append(val_loss)
    history_hybrid["train_acc"].append(tr_acc)
    history_hybrid["val_acc"].append(val_acc)

    print(
        f"[Epoch {epoch:02d}] "
        f"Train loss={tr_loss:.4f}, acc={tr_acc:.4f} | "
        f"Val loss={val_loss:.4f}, acc={val_acc:.4f}"
    )

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_state = model.state_dict()
        print("  -> 验证集提升，更新 best 模型权重")

# 训练结束后加载 best 权重
if best_state is not None:
    model.load_state_dict(best_state)



Epoch 1/50


Training:   0%|          | 0/366 [00:00<?, ?it/s]