In [None]:
import os
import random

import albumentations as A
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import timm
import torch
import torch.nn as nn
import torch.optim as optim
from albumentations.pytorch import ToTensorV2
from PIL import Image
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import StratifiedKFold
from torch.cuda.amp import GradScaler, autocast
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from tqdm import tqdm,trange

In [None]:
class Config:
    """Experiment-wide settings, read as plain class attributes (``Config.lr`` etc.)."""

    # --- Input data (absolute, machine-specific locations) ---
    data_dir = "D:\\Leko\\medical_model\\task1\\dataset\\images"
    label_csv = "D:\\Leko\\medical_model\\task1\\dataset\\labels.csv"

    # --- Model ---
    model_name = "convnext_tiny_in22ft1k"  # identifier handed to timm.create_model
    img_size = 224                         # images are resized to img_size x img_size

    # --- Optimisation ---
    epochs = 20
    batch_size = 32
    lr = 5e-5

    # --- Runtime ---
    num_workers = 0
    device = "cuda" if torch.cuda.is_available() else "cpu"
    seed = 42

    # --- Outputs (checkpoint, threshold file, prediction CSV) ---
    save_dir = "./output"

    # n_splits = 5  # disabled: the notebook uses a single stratified hold-out split

# Seed every random source the notebook draws from. The mix collators sample
# with np.random (rand / beta), so seeding torch alone does not make runs
# repeatable. Python's `random` is seeded too because some augmentation
# library versions draw from it (assumption - harmless if unused).
random.seed(Config.seed)
np.random.seed(Config.seed)
torch.manual_seed(Config.seed)

# Checkpoints, the tuned threshold and the prediction CSV are written here.
os.makedirs(Config.save_dir, exist_ok=True)

In [None]:
class TumorDataset(Dataset):
    """Map-style dataset that yields ``(image_tensor, label)`` pairs.

    Args:
        df: Table with a ``filename`` column and a ``label`` column. A copy with
            a fresh 0..N-1 index is stored, so positional lookup is safe.
        img_dir: Directory that each ``filename`` is joined onto.
        train: When true, the training transforms are used (and depend on the
            sample's label); otherwise the evaluation transforms are used.
    """

    def __init__(self, df, img_dir, train=True):
        self.train = train
        self.img_dir = img_dir
        self.df = df.reset_index(drop=True)

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, idx):
        record = self.df.iloc[idx]
        pixels = np.array(
            Image.open(os.path.join(self.img_dir, record['filename'])).convert('RGB')
        )
        label = int(record['label'])

        # The pipeline is rebuilt per sample: in training mode it is chosen by
        # class (label 1 gets the extra augmentations of get_transforms).
        pipeline = (
            get_transforms(train=True, is_tumor=(label == 1))
            if self.train
            else get_transforms(train=False)
        )
        return pipeline(image=pixels)['image'], label


In [None]:
class CutMixCollator:  # remember to include this in the ablation study
    """Collate function that applies CutMix to a random share of the batches.

    CutMix pastes one rectangular patch from a shuffled copy of the batch into
    every image and mixes the labels in proportion to the pasted area. (The
    previous body blended whole images linearly, which is MixUp, so the
    "cutmix" and "mixup" modes were the same method.)

    Args:
        beta: Both shape parameters of the Beta distribution the mixing
            ratio is drawn from.
        prob: Probability that a given batch is mixed; otherwise the batch is
            returned untouched.
    """

    def __init__(self, beta=1.0, prob=0.5):
        self.beta = beta
        self.prob = prob

    @staticmethod
    def _rand_bbox(height, width, lam):
        """Return ``(y1, y2, x1, x2)`` of a random box covering about ``1 - lam`` of the image."""
        cut_ratio = np.sqrt(1.0 - lam)
        cut_h = int(height * cut_ratio)
        cut_w = int(width * cut_ratio)
        center_y = np.random.randint(height)
        center_x = np.random.randint(width)
        # The box is clipped at the borders, so its real area can be smaller.
        y1 = int(np.clip(center_y - cut_h // 2, 0, height))
        y2 = int(np.clip(center_y + cut_h // 2, 0, height))
        x1 = int(np.clip(center_x - cut_w // 2, 0, width))
        x2 = int(np.clip(center_x + cut_w // 2, 0, width))
        return y1, y2, x1, x2

    def __call__(self, batch):
        """Stack ``(image, label)`` samples and optionally CutMix them.

        Returns:
            ``(images, labels)`` where ``images`` is the stacked batch and
            ``labels`` is a float tensor of shape ``(batch, 1)``; labels are
            soft (fractional) when mixing was applied.
        """
        images, labels = zip(*batch)
        images = torch.stack(images)
        labels = torch.tensor(labels).float().view(-1, 1)

        if np.random.rand() > self.prob:
            return images, labels

        lam = np.random.beta(self.beta, self.beta)
        index = torch.randperm(images.size(0))
        height, width = images.shape[-2:]
        y1, y2, x1, x2 = self._rand_bbox(height, width, lam)

        # Clone first so the patches are read from the unmodified batch.
        mixed_images = images.clone()
        mixed_images[:, :, y1:y2, x1:x2] = images[index, :, y1:y2, x1:x2]

        # Recompute lambda from the clipped box so labels match the pixels.
        lam = 1.0 - ((y2 - y1) * (x2 - x1)) / float(height * width)
        mixed_labels = lam * labels + (1 - lam) * labels[index]
        return mixed_images, mixed_labels



class MixUpCollator:
    """Collate function that applies MixUp to a random share of the batches.

    Args:
        alpha: Both shape parameters of the Beta distribution the blend
            ratio is drawn from.
        prob: A batch is left unmixed whenever a uniform draw exceeds this value.
    """

    def __init__(self, alpha=0.4, prob=0.5):
        self.alpha, self.prob = alpha, prob

    def __call__(self, batch):
        """Return ``(images, labels)``; labels are float with shape ``(batch, 1)``."""
        image_list, label_list = zip(*batch)
        images = torch.stack(image_list)
        labels = torch.tensor(label_list).float().view(-1, 1)

        should_mix = not (np.random.rand() > self.prob)
        if should_mix:
            # Blend every sample with a randomly chosen partner from the same
            # batch, using one shared ratio for images and labels.
            lam = np.random.beta(self.alpha, self.alpha)
            partner = torch.randperm(images.size(0))
            images = lam * images + (1 - lam) * images[partner]
            labels = lam * labels + (1 - lam) * labels[partner]

        return images, labels


In [None]:
def get_transforms(train=True, is_tumor=False):
    """Build the albumentations pipeline for one sample.

    Args:
        train: If true, add a random horizontal flip (and, for tumor samples,
            the extra augmentations below); if false, only resize, normalise
            and convert to tensor.
        is_tumor: Only consulted when ``train`` is true. Adds brightness /
            contrast jitter, elastic deformation and coarse dropout.

    Returns:
        An ``A.Compose`` whose result dict carries the tensor under ``'image'``.
    """
    steps = [A.Resize(Config.img_size, Config.img_size)]

    if train:
        steps.append(A.HorizontalFlip(p=0.5))
        if is_tumor:
            steps.extend([
                A.RandomBrightnessContrast(p=0.5),
                A.ElasticTransform(alpha=1, sigma=50, alpha_affine=30, p=0.3),
                A.CoarseDropout(p=0.3),
            ])

    # Shared tail of both pipelines. The mean/std values are presumably the
    # ImageNet channel statistics expected by the pretrained backbone.
    steps.extend([
        A.Normalize(mean=(0.485, 0.456, 0.406),
                    std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ])
    return A.Compose(steps)


In [None]:
class TumorClassifier(nn.Module):
    """Single-logit classifier: a timm backbone followed by a small pooled head.

    ``forward`` returns raw logits with one output unit per sample; apply a
    sigmoid to obtain a probability.
    """

    def __init__(self):
        super().__init__()
        # The backbone is created with num_classes=0 and global_pool="", and
        # pooling plus classification happen in the head defined below.
        self.backbone = timm.create_model(
            Config.model_name,
            pretrained=True,
            num_classes=0,
            global_pool="",
        )
        feat_dim = self.backbone.num_features

        # Layer order (and therefore Sequential indices) must stay fixed so
        # previously saved state_dict keys keep matching.
        head_layers = [
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.BatchNorm1d(feat_dim),
            nn.Dropout(0.6),  # 0.5 overfitted
            nn.Linear(feat_dim, 1),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        return self.classifier(self.backbone.forward_features(x))


In [None]:
class FocalLoss(nn.Module):
    """Binary focal loss on logits with class-wise alpha balancing.

    Args:
        alpha: Weight of the positive class (target 1); the negative class
            receives ``1 - alpha``. Soft targets interpolate between the two.
        gamma: Focusing exponent; larger values down-weight easy examples more.
    """

    def __init__(self, alpha=0.25, gamma=2.0):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, inputs, targets):
        """Return the mean focal loss.

        Args:
            inputs: Raw logits.
            targets: Float targets in ``[0, 1]`` with the same shape as ``inputs``.
        """
        bce_loss = nn.functional.binary_cross_entropy_with_logits(
            inputs, targets, reduction='none'
        )
        # exp(-BCE) is the probability assigned to the true class for hard targets.
        pt = torch.exp(-bce_loss)
        # Apply alpha per class. A single scalar factor would only rescale the
        # loss and would not rebalance positives against negatives.
        alpha_t = self.alpha * targets + (1 - self.alpha) * (1 - targets)
        focal_loss = alpha_t * (1 - pt) ** self.gamma * bce_loss
        return focal_loss.mean()


In [None]:
def train_one_epoch(model, loader, optimizer, criterion, scaler):
    """Run one mixed-precision training pass over ``loader``.

    Args:
        model: Network to train; switched to train mode here.
        loader: Iterable of ``(images, labels)`` batches.
        optimizer: Optimizer stepped once per batch through ``scaler``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        scaler: ``GradScaler`` used to scale the loss, step and update.

    Returns:
        The mean of the per-batch loss values.
    """
    model.train()
    total_loss = 0
    for images, labels in tqdm(loader):
        images = images.to(Config.device)
        # The mix collators already emit float (batch, 1) targets, but a loader
        # without them yields integer (batch,) labels. Normalise both cases to
        # float (batch, 1) so the loss sees the same layout as in evaluate().
        labels = labels.float().reshape(-1, 1).to(Config.device)
        optimizer.zero_grad()
        with autocast():
            outputs = model(images)
            loss = criterion(outputs, labels)

        # Scaled backward / step / update is the GradScaler protocol.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        total_loss += loss.item()
    return total_loss / len(loader)

In [None]:
def evaluate(model, loader, criterion):
    """Score ``model`` on ``loader`` without updating any weights.

    Args:
        model: Network to evaluate; switched to eval mode here.
        loader: Iterable of ``(images, labels)`` batches with 1-D labels.
        criterion: Loss called as ``criterion(logits, targets)``.

    Returns:
        ``(mean_loss, probabilities, targets)``: the mean per-batch loss, the
        sigmoid outputs and the float targets, both arrays with one column.
    """
    model.eval()
    running_loss = 0
    all_probs, all_targets = [], []

    with torch.no_grad():
        for batch_images, batch_labels in loader:
            batch_images = batch_images.to(Config.device)
            # Add a trailing axis so targets line up with the (batch, 1) logits.
            batch_targets = batch_labels.float().unsqueeze(1).to(Config.device)

            with autocast():
                logits = model(batch_images)
                batch_loss = criterion(logits, batch_targets)

            running_loss += batch_loss.item()
            all_probs.extend(torch.sigmoid(logits).cpu().numpy())
            all_targets.extend(batch_targets.cpu().numpy())

    mean_loss = running_loss / len(loader)
    return mean_loss, np.array(all_probs), np.array(all_targets)

In [None]:
class EarlyStopping:
    """Track a "higher is better" validation score and signal when to stop.

    Args:
        patience: Number of consecutive non-improving calls tolerated before
            the stop flag is raised.
        delta: A score must reach at least ``best_score + delta`` to count as
            an improvement.

    Attributes:
        best_score: Best score seen so far (``None`` before the first call).
        counter: Consecutive calls without improvement.
        early_stop: Becomes ``True`` once ``counter`` reaches ``patience``.
    """

    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_score = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_score):
        """Record ``val_score`` and return whether training should stop.

        The flag is returned (not just stored) so that callers can write
        ``if early_stopper(score): break``; previously the call returned
        ``None`` and such a check could never trigger.
        """
        if self.best_score is None:
            self.best_score = val_score
        elif val_score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = val_score
            self.counter = 0
        return self.early_stop

In [None]:
from sklearn.model_selection import train_test_split

# ---- Stratified 80/20 hold-out split of the label table ----
full_df = pd.read_csv(Config.label_csv)
train_df, val_df = train_test_split(
    full_df,
    test_size=0.2,
    stratify=full_df['label'],
    random_state=Config.seed,
)

print(f"训练集类别分布：\n{train_df['label'].value_counts()}")
print(f"验证集类别分布：\n{val_df['label'].value_counts()}")

# ---- Batch-mixing strategy for the training loader ----
use_mix_mode = "cutmix"  # or "mixup", "none"
mix_factories = {
    "cutmix": lambda: CutMixCollator(beta=1.0, prob=0.3),
    "mixup": lambda: MixUpCollator(alpha=0.4, prob=0.5),
}
# Any other mode leaves collate_fn as None, i.e. the DataLoader default.
collate_fn = mix_factories.get(use_mix_mode, lambda: None)()

# ---- Class-balanced sampling: each sample is weighted by 1 / (its class count) ----
class_counts = train_df['label'].value_counts().sort_index().values
class_weights = 1. / torch.tensor(class_counts, dtype=torch.float)
label_to_weight = {0: class_weights[0], 1: class_weights[1]}
sample_weights = train_df['label'].map(label_to_weight).astype('float32').values
sampler = WeightedRandomSampler(sample_weights, len(sample_weights))

# ---- Datasets and loaders ----
train_dataset = TumorDataset(train_df, Config.data_dir, train=True)
val_dataset = TumorDataset(val_df, Config.data_dir, train=False)

loader_kwargs = dict(batch_size=Config.batch_size, num_workers=Config.num_workers)
train_loader = DataLoader(train_dataset, sampler=sampler, collate_fn=collate_fn, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)

# ---- Model, optimiser, loss and training helpers ----
print("初始化模型中...")
model = TumorClassifier().to(Config.device)
print("模型初始化完成")

optimizer = optim.Adam(model.parameters(), lr=Config.lr, weight_decay=1e-4)
criterion = FocalLoss(alpha=0.75, gamma=1.5)
early_stopper = EarlyStopping(patience=5)
scaler = GradScaler()

# One list per tracked metric, appended to once per epoch.
history = {key: [] for key in ('train_loss', 'val_loss', 'val_auc', 'val_f1')}
best_auc = 0

train_labels = train_df['label']
print(f"当前训练集图像分布：正常类 {(train_labels == 0).sum()}，肿瘤类 {(train_labels == 1).sum()}")

for epoch in range(Config.epochs):
    print(f"\n📘 Epoch {epoch+1}/{Config.epochs}")
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion, scaler)
    val_loss, preds, targets = evaluate(model, val_loader, criterion)

    # evaluate() returns single-column arrays; flatten them once so every
    # metric below (and any later DataFrame column) receives plain 1-D input.
    preds = preds.ravel()
    targets = targets.ravel().astype(int)

    val_auc = roc_auc_score(targets, preds)

    # F1 at the default 0.5 cut-off, plus a search for the F1-maximising threshold.
    # NOTE(review): the threshold is tuned on the same split it is reported on,
    # so "Best F1" is an optimistic estimate.
    preds_bin_default = (preds >= 0.5).astype(int)
    val_f1_default = f1_score(targets, preds_bin_default)

    best_f1, best_thresh = 0, 0.5
    for t in np.arange(0.1, 0.9, 0.01):
        f1 = f1_score(targets, (preds >= t).astype(int))
        if f1 > best_f1:
            best_f1 = f1
            best_thresh = t
    preds_bin = (preds >= best_thresh).astype(int)

    print(f"预测为肿瘤类(1)的数量: {(preds_bin == 1).sum()} / {len(preds_bin)}")
    print(f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | AUC: {val_auc:.4f}")
    print(f"F1@0.50: {val_f1_default:.4f} | Best F1@{best_thresh:.2f}: {best_f1:.4f}")
    print(f"Accuracy: {accuracy_score(targets, preds_bin):.4f}")

    # labels=[0, 1] forces a 2x2 matrix even if one class is never predicted
    # or present, so the four-way unpack below cannot fail.
    cm = confusion_matrix(targets, preds_bin, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()
    print(f"Sensitivity (Recall for tumor): {tp / (tp + fn + 1e-8):.4f}")
    print(f"Specificity (Recall for normal): {tn / (tn + fp + 1e-8):.4f}")

    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['val_auc'].append(val_auc)
    history['val_f1'].append(best_f1)

    # Checkpoint on validation AUC and store the matching threshold next to it.
    if val_auc > best_auc:
        best_auc = val_auc
        torch.save(model.state_dict(), os.path.join(Config.save_dir, "best_model.pth"))
        with open(os.path.join(Config.save_dir, "best_thresh.txt"), 'w') as f:
            f.write(f"{best_thresh:.4f}")
        print("Best model saved.")

    # Read the stopper's flag instead of the call's return value: the decision
    # is stored on `early_stop`, so this check works whatever the call returns.
    early_stopper(val_auc)
    if early_stopper.early_stop:
        print("Early stopping triggered.")
        break

# ---- Learning curves: loss, validation AUC, best-threshold F1 ----
fig_curves, (ax_loss, ax_auc, ax_f1) = plt.subplots(1, 3, figsize=(12, 4))

ax_loss.plot(history['train_loss'], label='Train')
ax_loss.plot(history['val_loss'], label='Val')
ax_loss.set_title("Loss")
ax_loss.legend()

ax_auc.plot(history['val_auc'])
ax_auc.set_title("Val AUC")

ax_f1.plot(history['val_f1'])
ax_f1.set_title("Best F1")
plt.show()

# ---- Confusion matrix of the most recently evaluated epoch ----
class_names = ["Normal", "Tumor"]
fig_cm, ax_cm = plt.subplots(figsize=(4, 4))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=class_names, yticklabels=class_names, ax=ax_cm)
ax_cm.set_xlabel("Predicted Label")
ax_cm.set_ylabel("True Label")
ax_cm.set_title("Confusion Matrix")
fig_cm.tight_layout()
plt.show()

# ---- Persist per-image predictions (values come from the last epoch run) ----
val_df = val_df.assign(pred_prob=preds, pred_label=preds_bin)
val_df.to_csv(os.path.join(Config.save_dir, "val_predictions.csv"), index=False)


In [None]:
import torch
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import torchvision.transforms as T
import matplotlib


# Use a font with CJK glyphs so the Chinese plot titles render, and keep the
# minus sign displayable with that font.
matplotlib.rcParams['font.family'] = 'Microsoft YaHei'
plt.rcParams['axes.unicode_minus'] = False

# Rebuild the model architecture, then restore the trained weights.
model = TumorClassifier().to(Config.device)
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written during a CUDA run still loads on a CPU-only machine.
state_dict = torch.load(
    "D:\\Leko\\medical_model\\task1\\output\\best_model.pth",
    map_location=Config.device,
)
model.load_state_dict(state_dict)
model.eval()

# Same preprocessing as the validation pipeline used during training.
infer_transform = get_transforms(train=False)

def predict_image(image_path, threshold=0.5):
    """Classify one image file and show it with the predicted label.

    Args:
        image_path: Path of the image to classify.
        threshold: Probability at or above which the image is labelled as
            tumor. Defaults to 0.5; pass the value the training loop stores in
            ``best_thresh.txt`` to reproduce the tuned operating point.

    Relies on the module-level ``model`` and ``infer_transform``.
    """
    # The context manager closes the file handle as soon as pixels are decoded.
    with Image.open(image_path) as img:
        image_np = np.array(img.convert("RGB"))

    # Preprocess and add the batch axis the model expects.
    transformed = infer_transform(image=image_np)
    input_tensor = transformed['image'].unsqueeze(0).to(Config.device)

    # Predict
    with torch.no_grad():
        output = model(input_tensor)
        prob = torch.sigmoid(output).item()
    label = 1 if prob >= threshold else 0

    # Visualise the original (un-normalised) image with the prediction as title.
    plt.imshow(image_np)
    plt.axis('off')
    plt.title(f"预测标签: {'肿瘤' if label==1 else '正常'}\n预测概率: {prob:.4f}")
    plt.show()

# Run the loaded model on a single local image (hard-coded path; adjust for your machine).
predict_image("D:\\Leko\\test3.jpg")
