In [1]:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@File    :   BC-kaggle.ipynb
@Time    :   2025/03/25 10:33:12
@Author  :   Neutrin 
'''

# here put the import lib
import numpy as np
import pandas as pd
import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt
import seaborn as sns
import os 
import cv2 
from torchvision import transforms
import lightning
import random
import warnings

# Set random seeds for reproducibility
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

# Ignore warnings
warnings.filterwarnings('ignore')

# Fix matplotlib for displaying Chinese characters
plt.rcParams['font.sans-serif'] = ['SimHei']  # Use SimHei font for Chinese characters
plt.rcParams['axes.unicode_minus'] = False    # Fix minus sign display


In [2]:
root_path = 'E:/Dataset/breastcancer2'
# Define subdirectories
subsets = ['test', 'train', 'valid']
classes = ['0', '1']  # 0: benign, 1: malignant
class BreastCancerDataset(Dataset):
    def __init__(self, root_dir, subset, transform=None):
        self.root_dir = root_dir
        self.subset = subset        # 'train', 'valid', 'test'
        self.transform = transform # 图像预处理
        self.samples = []
        
        # 加载数据为(samples, label)的形式
        for class_idx, class_name in enumerate(classes):   # 0: benign, 1: malignant
            class_dir = os.path.join(root_dir, subset, class_name)  
            if os.path.exists(class_dir):               # 检查路径是否存在
                for img_name in os.listdir(class_dir):      # 遍历文件夹
                    if img_name.endswith(('.jpg', '.png', '.jpeg')):  # 检查文件格式
                        img_path = os.path.join(class_dir, img_name)    # 图片路径
                        self.samples.append((img_path, class_idx))     # (图片路径, label)
        
    def __len__(self):
        return len(self.samples)  # 返回数据集大小
    
    def __getitem__(self, idx):   # 返回图片和标签
        img_path, label = self.samples[idx]   # 获取图片路径和标签
        image = cv2.imread(img_path)            
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # OpenCV默认BGR，转换为RGB
        
        if self.transform:   # 转换图片
            image = self.transform(image) 
        else:
            # 进行归一化
            image = image / 255.0
            image = image.transpose((2, 0, 1))  # 将图片维度转换为(C, H, W)
            image = torch.from_numpy(image).float() # 转换为tensor
            
        return image, label
    
# 定义10个数据增强方法
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
val_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
# 创建数据集
train_dataset = BreastCancerDataset(root_path, 'train', transform=train_transform)
val_dataset = BreastCancerDataset(root_path, 'valid', transform=val_transform)
test_dataset = BreastCancerDataset(root_path, 'test', transform=val_transform)

if __name__ == '__main__':
    # Print dataset information
    print(f"Training dataset size: {len(train_dataset)}")
    print(f"Validation dataset size: {len(val_dataset)}")
    print(f"Test dataset size: {len(test_dataset)}")

    # 检查数据集的类分布
    def count_class_distribution(dataset):
        class_counts = [0, 0]
        for _, label in dataset:
            class_counts[label] += 1
        return class_counts

    train_dist = count_class_distribution(train_dataset)
    val_dist = count_class_distribution(val_dataset)
    test_dist = count_class_distribution(test_dataset)

    print(f"Training class distribution: Benign={train_dist[0]}, Malignant={train_dist[1]}")
    print(f"Validation class distribution: Benign={val_dist[0]}, Malignant={val_dist[1]}")
    print(f"Test class distribution: Benign={test_dist[0]}, Malignant={test_dist[1]}")

Training dataset size: 2372
Validation dataset size: 675
Test dataset size: 336
Training class distribution: Benign=1569, Malignant=803
Validation class distribution: Benign=448, Malignant=227
Test class distribution: Benign=208, Malignant=128


In [3]:
# Define a simple CNN model for binary classification
class SimpleCNN(nn.Module):
    def __init__(self, in_channels=3, num_classes=2):
        super().__init__()
        self.features = nn.Sequential(
            # First conv block
            nn.Conv2d(in_channels, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            # Second conv block
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            # Third conv block
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            # Fourth conv block
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        
        # Calculate input size for the classifier
        # For 224x224 input images, after 4 max pooling layers (stride=2), 
        # spatial dimensions are reduced to 14x14
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((7, 7)),
            nn.Flatten(),
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(1024, num_classes),
        )
        
        # Initialize the weights
        self._initialize_weights()
        
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)
                
    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

# Instantiate the CNN model
model = SimpleCNN(
    in_channels=3,           # Input channels (RGB)
    num_classes=len(classes)
)

print(model)
# Calculate number of parameters
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of parameters: {total_params:,}")


SimpleCNN(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU(inplace=True)
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU(inplace=True)
    (11): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (13): ReLU(inplace=True)
    (14): MaxPoo

In [None]:
from tqdm.notebook import tqdm
import torch.nn as nn
import matplotlib.pyplot as plt

# 定义训练参数
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 定义批量大小参数
batch_size = 16  # 可在此调整批量大小

# 创建数据加载器，使用可调节的batch_size
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# 将模型移动到指定设备（GPU或CPU）
model = model.to(device)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
# 移除verbose参数或使用其他学习率调度器
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)

# 训练函数
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    best_acc = 0.0
    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': []
    }
    
    # 使用tqdm显示epoch进度
    for epoch in tqdm(range(num_epochs), desc="Epochs"):
        print(f'Epoch {epoch+1}/{num_epochs}')
        
        for phase in ['train', 'val']:  #这是一个训练和验证循环
            if phase == 'train':
                model.train()
                dataloader = train_loader
            else:
                model.eval()
                dataloader = val_loader
                
            running_loss = 0.0
            running_corrects = 0
            
            # 使用tqdm显示批处理进度
            for inputs, labels in tqdm(dataloader, desc=f"{phase}", leave=False):
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()
                
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            
            epoch_loss = running_loss / len(dataloader.dataset)
            epoch_acc = running_corrects.double() / len(dataloader.dataset)
            
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            
            # 保存训练指标 - 只保存一次
            if phase == 'train':
                history['train_loss'].append(epoch_loss)
                history['train_acc'].append(epoch_acc.item())
            else:
                history['val_loss'].append(epoch_loss)
                history['val_acc'].append(epoch_acc.item())
                # 使用学习率调度器
                scheduler.step(epoch_loss)
                
                # 保存最佳模型
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(model.state_dict(), 'best_alexnet_model.pth')
                    print(f"Saved model with accuracy: {epoch_acc:.4f}")
    
    # 加载最佳模型权重
    model.load_state_dict(torch.load('best_alexnet_model.pth'))
    return model, history

# 训练模型
num_epochs = 100
model, history = train_model(model, criterion, optimizer, scheduler, num_epochs=num_epochs)

# 绘制训练历史图表
plt.figure(figsize=(12, 4))

# 绘制损失曲线
plt.subplot(1, 2, 1)
plt.plot(history['train_loss'], label='Train Loss')
plt.plot(history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('训练和验证损失')
plt.legend()

# 绘制准确率曲线
plt.subplot(1, 2, 2)
plt.plot(history['train_acc'], label='Train Accuracy')
plt.plot(history['val_acc'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('训练和验证准确率')
plt.legend()

plt.tight_layout()
plt.show()

Using device: cuda


Epochs:   0%|          | 0/100 [00:00<?, ?it/s]

Epoch 1/100


train:   0%|          | 0/149 [00:00<?, ?it/s]

train Loss: 21121.1621 Acc: 0.6003


val:   0%|          | 0/43 [00:00<?, ?it/s]

val Loss: 0.6399 Acc: 0.6637
Saved model with accuracy: 0.6637
Epoch 2/100


train:   0%|          | 0/149 [00:00<?, ?it/s]

train Loss: 0.6488 Acc: 0.6594


val:   0%|          | 0/43 [00:00<?, ?it/s]

val Loss: 0.6393 Acc: 0.6637
Epoch 3/100


train:   0%|          | 0/149 [00:00<?, ?it/s]

train Loss: 0.6473 Acc: 0.6615


val:   0%|          | 0/43 [00:00<?, ?it/s]

val Loss: 0.6444 Acc: 0.6637
Epoch 4/100


train:   0%|          | 0/149 [00:00<?, ?it/s]

train Loss: 0.6457 Acc: 0.6615


val:   0%|          | 0/43 [00:00<?, ?it/s]

val Loss: 0.6408 Acc: 0.6637
Epoch 5/100


train:   0%|          | 0/149 [00:00<?, ?it/s]

train Loss: 0.6444 Acc: 0.6615


val:   0%|          | 0/43 [00:00<?, ?it/s]

val Loss: 0.6415 Acc: 0.6637
Epoch 6/100


train:   0%|          | 0/149 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
import torch
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, roc_curve, auc
import numpy as np
import seaborn as sns

import matplotlib.pyplot as plt

# Create test dataloader
test_loader = torch.utils.data.DataLoader(BreastCancerDataset.test_dataset, batch_size=batch_size, shuffle=False)

# Set model to evaluation mode
model.eval()

# Lists to store predictions and true labels
all_preds = []
all_labels = []
all_probs = []

# Evaluate on test set
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(inputs)
        probs = F.softmax(outputs, dim=1)
        _, preds = torch.max(outputs, 1)
        
        # Store predictions and labels
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
        all_probs.extend(probs[:,1].cpu().numpy())  # Probability of class 1 (malignant)

# Calculate accuracy
test_acc = accuracy_score(all_labels, all_preds)
print(f"Test accuracy: {test_acc:.4f}")

# Create confusion matrix
cm = confusion_matrix(all_labels, all_preds)

# Calculate sensitivity and specificity
sensitivity = cm[1,1] / (cm[1,0] + cm[1,1])  # True positive rate (TPR)
specificity = cm[0,0] / (cm[0,0] + cm[0,1])  # True negative rate (TNR)

print(f"Sensitivity (TPR): {sensitivity:.4f}")
print(f"Specificity (TNR): {specificity:.4f}")

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=['Benign', 'Malignant'],
            yticklabels=['Benign', 'Malignant'])
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()

# Print classification report
print("\nClassification Report:")
print(classification_report(all_labels, all_preds, target_names=['Benign', 'Malignant']))

# ROC curve and AUC
fpr, tpr, _ = roc_curve(all_labels, all_probs)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='gray', lw=1, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()