In [1]:
import os
import pandas as pd
import cv2
from torch.utils.data import Dataset
import albumentations as A
from albumentations.pytorch import ToTensorV2
import timm
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
from sklearn.metrics import accuracy_score
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

In [2]:
# 构建分类数据集
class APTOSClassifyDataset(Dataset):
    def __init__(self, image_dir, csv_file, image_size=224, mode='train'):
        self.df = pd.read_csv(csv_file)
        self.image_dir = image_dir
        self.mode = mode
        self.transform = A.Compose([
            A.Resize(image_size, image_size),
            A.HorizontalFlip(p=0.5 if mode == 'train' else 0.0),
            A.Normalize(mean=(0.485, 0.456, 0.406),
                        std=(0.229, 0.224, 0.225)),
            ToTensorV2()
        ])

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = os.path.join(self.image_dir, row['id_code'] + '.png')
        image = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
        image = self.transform(image=image)['image']
        label = row['diagnosis']
        return image, label

In [3]:
class EfficientNetClassifier(nn.Module):
    def __init__(self, backbone='efficientnet_b0', num_classes=5, pretrained_encoder_path=None):
        super().__init__()
        self.encoder = timm.create_model(backbone, pretrained=False, num_classes=0)
        
        if pretrained_encoder_path:
            state_dict = torch.load(pretrained_encoder_path, map_location='cpu')
            self.encoder.load_state_dict(state_dict, strict=False)
            print(f"✅ Loaded SimCLR encoder from: {pretrained_encoder_path}")

        self.classifier = nn.Linear(self.encoder.num_features, num_classes)

    def forward(self, x):
        features = self.encoder(x)
        logits = self.classifier(features)
        return logits

In [4]:
device = torch.device('cuda' )

In [5]:
train_dataset = APTOSClassifyDataset('fine_tune/train_split', 'fine_tune/train.csv', mode='train')
test_dataset = APTOSClassifyDataset('fine_tune/test_split', 'fine_tune/test.csv', mode='test')

In [6]:
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [7]:
model = EfficientNetClassifier(pretrained_encoder_path='checkpoints/simclr_epoch_9.pth').to(device)

✅ Loaded SimCLR encoder from: checkpoints/simclr_epoch_9.pth


In [8]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

In [20]:
# num_epochs = 20
# for epoch in range(1, num_epochs + 1):
#     model.train()
#     train_loss = 0
#     for images, labels in train_loader:
#         images, labels = images.to(device), labels.to(device)
#         logits = model(images)
#         loss = criterion(logits, labels)
# 
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()
# 
#         train_loss += loss.item()
# 
#     model.eval()
#     all_preds, all_labels = [], []
#     with torch.no_grad():
#         for images, labels in test_loader:
#             images = images.to(device)
#             outputs = model(images)
#             preds = torch.argmax(outputs, dim=1).cpu().numpy()
#             all_preds.extend(preds)
#             all_labels.extend(labels.numpy())
# 
#     acc = accuracy_score(all_labels, all_preds)
#     kappa = cohen_kappa_score(all_labels, all_preds, weights='quadratic')
#     print(f"Epoch {epoch}/{num_epochs} - Train Loss: {train_loss/len(train_loader):.4f} - Val Acc: {acc:.4f} - Kappa: {kappa:.4f}")

Epoch 1/20 - Train Loss: 1.5337 - Val Acc: 0.3000 - Kappa: 0.0000
Epoch 2/20 - Train Loss: 1.4284 - Val Acc: 0.3000 - Kappa: 0.0000
Epoch 3/20 - Train Loss: 1.3271 - Val Acc: 0.3000 - Kappa: 0.0000
Epoch 4/20 - Train Loss: 1.1888 - Val Acc: 0.3000 - Kappa: 0.0000
Epoch 5/20 - Train Loss: 1.0532 - Val Acc: 0.3000 - Kappa: 0.0000


RuntimeError: CUDA error: an illegal memory access was encountered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [9]:
import pandas as pd

df = pd.read_csv('fine_tune/train.csv')
print(df['diagnosis'].value_counts())
# 样本不平衡

diagnosis
0    346
2    186
1     71
4     58
3     38
Name: count, dtype: int64


In [10]:
train_df = pd.read_csv('fine_tune/train.csv')
class_weights = compute_class_weight(class_weight='balanced',
                                     classes=np.unique(train_df['diagnosis']),
                                     y=train_df['diagnosis'])
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)

In [11]:
num_epochs = 20
for epoch in range(1, num_epochs + 1):
    model.train()
    train_loss = 0
    train_preds, train_labels = [], []

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        logits = model(images)
        loss = criterion(logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        preds = torch.argmax(logits, dim=1).detach().cpu().numpy()
        train_preds.extend(preds)
        train_labels.extend(labels.cpu().numpy())

    train_acc = accuracy_score(train_labels, train_preds)

    # === 验证 ===
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            outputs = model(images)
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels.numpy())

    val_acc = accuracy_score(all_labels, all_preds)

    print(f"Epoch {epoch}/{num_epochs} "
          f"- Train Loss: {train_loss / len(train_loader):.4f} "
          f"- Train Acc: {train_acc:.4f} "
          f"- Val Acc: {val_acc:.4f}")


Epoch 1/20 - Train Loss: 1.5545 - Train Acc: 0.3763 - Val Acc: 0.3000
Epoch 2/20 - Train Loss: 1.4562 - Train Acc: 0.4764 - Val Acc: 0.3000
Epoch 3/20 - Train Loss: 1.3632 - Train Acc: 0.5622 - Val Acc: 0.3000
Epoch 4/20 - Train Loss: 1.2550 - Train Acc: 0.5880 - Val Acc: 0.3000
Epoch 5/20 - Train Loss: 1.1213 - Train Acc: 0.6252 - Val Acc: 0.3000
Epoch 6/20 - Train Loss: 1.0129 - Train Acc: 0.6552 - Val Acc: 0.3300
Epoch 7/20 - Train Loss: 0.8905 - Train Acc: 0.6924 - Val Acc: 0.5767
Epoch 8/20 - Train Loss: 0.7871 - Train Acc: 0.7096 - Val Acc: 0.6933
Epoch 9/20 - Train Loss: 0.7322 - Train Acc: 0.7182 - Val Acc: 0.6933
Epoch 10/20 - Train Loss: 0.7150 - Train Acc: 0.7239 - Val Acc: 0.7100
Epoch 11/20 - Train Loss: 0.6651 - Train Acc: 0.7396 - Val Acc: 0.7000
Epoch 12/20 - Train Loss: 0.6277 - Train Acc: 0.7425 - Val Acc: 0.7000


KeyboardInterrupt: 

In [12]:
os.makedirs('finetune_checkpoints', exist_ok=True)
torch.save(model.state_dict(), 'finetune_checkpoints/finetuned_model.pth')
print("✅ 微调后的模型已保存。")

✅ 微调后的模型已保存。


In [22]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

cm = confusion_matrix(all_labels, all_preds)

plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=[0,1,2,3,4], yticklabels=[0,1,2,3,4])
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix (Val Set)')
plt.savefig('finetune_checkpoints/confusion_matrix.png')
plt.close()

In [23]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

features = []  # shape: N x 128
labels = []

model.eval()
with torch.no_grad():
    for images, lbls in test_loader:
        images = images.to(device)
        feats = model.encoder(images).cpu().numpy()
        features.extend(feats)
        labels.extend(lbls.numpy())

features = np.array(features)
labels = np.array(labels)

tsne = TSNE(n_components=2, perplexity=30, random_state=42)
reduced = tsne.fit_transform(features)

plt.figure(figsize=(8, 6))
scatter = plt.scatter(reduced[:, 0], reduced[:, 1], c=labels, cmap='tab10', alpha=0.7)
plt.legend(*scatter.legend_elements(), title="Class")
plt.title("t-SNE Visualization of Features (Val Set)")
plt.savefig('finetune_checkpoints/tsne_features.png')
plt.close()

In [24]:
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt
import pandas as pd

# classification_report
report = classification_report(all_labels, all_preds, output_dict=True)
df_report = pd.DataFrame(report).transpose().iloc[:5]  # 前5类（0-4）

plt.figure(figsize=(10, 6))
df_report[['precision', 'recall', 'f1-score']].plot(kind='bar')
plt.title("Classification Report per Class")
plt.ylabel("Score")
plt.ylim(0, 1)
plt.grid(True, axis='y')
plt.xticks(rotation=0)
plt.tight_layout()
plt.savefig('finetune_checkpoints/classification_report.png')
plt.close()

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


<Figure size 1000x600 with 0 Axes>

In [25]:
import numpy as np

plt.figure(figsize=(6, 4))
classes = np.unique(all_labels)
pred_counts = [np.sum(np.array(all_preds) == c) for c in classes]
true_counts = [np.sum(np.array(all_labels) == c) for c in classes]

x = np.arange(len(classes))
plt.bar(x - 0.2, true_counts, width=0.4, label='True')
plt.bar(x + 0.2, pred_counts, width=0.4, label='Predicted')
plt.xticks(x, classes)
plt.xlabel("Class")
plt.ylabel("Count")
plt.title("Class Distribution: True vs Predicted")
plt.legend()
plt.grid(True, axis='y')
plt.savefig('finetune_checkpoints/class_distribution.png')
plt.close()


In [26]:
from sklearn.manifold import TSNE

features = []
labels = []
model.eval()
with torch.no_grad():
    for images, lbls in test_loader:
        images = images.to(device)
        feats = model.encoder(images).cpu().numpy()
        features.extend(feats)
        labels.extend(lbls.numpy())

features = np.array(features)
labels = np.array(labels)

tsne = TSNE(n_components=2, perplexity=30, random_state=42)
reduced = tsne.fit_transform(features)

plt.figure(figsize=(8, 6))
scatter = plt.scatter(reduced[:, 0], reduced[:, 1], c=labels, cmap='tab10', alpha=0.7)
plt.legend(*scatter.legend_elements(), title="Class")
plt.title("t-SNE Visualization of Val Set Features")
plt.savefig('finetune_checkpoints/tsne_visualization.png')
plt.close()
