In [None]:
import os

import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import classification_report
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm import tqdm

# Seed the NumPy and PyTorch random number generators so runs are repeatable
np.random.seed(42)
torch.manual_seed(42)

# Prefer the GPU for training; fall back to the CPU when CUDA is unavailable
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Read the image metadata table
file_path = ''
data = pd.read_csv(file_path)

# Binary label taken from the path text: 0 when 'benign' occurs in
# 'filename', otherwise 1
data['label'] = data['filename'].map(lambda name: int('benign' not in name))
# Keep only the rows whose magnification (mag) equals 40
data = data.loc[data['mag'] == 40]

# Parameters
target_size = (224, 224)  # Size every image is resized to before flattening

# Dataset class
class BreastCancerDataset(Dataset):
    """Map-style dataset that loads the images listed in a DataFrame.

    Every row of ``dataframe`` must provide a ``'filename'`` column (image
    path, resolved against ``root_dir``) and a ``'label'`` column (integer
    class index).

    Args:
        dataframe: Table with one row per sample.
        target_size: Size tuple handed to ``cv2.resize`` for every image.
        root_dir: Directory prepended to each ``'filename'``. The default
            ``''`` uses the stored paths exactly as they are.
    """

    def __init__(self, dataframe, target_size, root_dir=''):
        self.dataframe = dataframe
        self.target_size = target_size
        self.root_dir = root_dir

    def __len__(self):
        """Return the number of rows (samples) in the table."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load the sample at positional index ``idx``.

        Returns:
            An ``(image, label)`` pair. ``image`` is a 1-D float32 tensor
            holding the resized RGB image scaled to [0, 1] and flattened from
            channel-first order; ``label`` is a 0-d int64 tensor.

        Raises:
            FileNotFoundError: If the image could not be read.
        """
        row = self.dataframe.iloc[idx]
        # os.path.join('', name) == name, so the default root_dir leaves the
        # stored path untouched.
        img_path = os.path.join(self.root_dir, row['filename'])
        label = row['label']

        # cv2.imread reports failure by returning None rather than raising
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(f"Image not found: {img_path}")

        img = cv2.resize(img, self.target_size)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img / 255.0  # scale pixel values to [0, 1]
        img = np.transpose(img, (2, 0, 1))  # HWC -> CHW

        img_tensor = torch.tensor(img, dtype=torch.float32)
        label_tensor = torch.tensor(label, dtype=torch.long)

        # The model is fully connected, so the image is returned flattened
        return img_tensor.reshape(-1), label_tensor

# Split data into train and test using the predefined 'grp' column
train_df = data[data['grp'] == 'train']
test_df = data[data['grp'] == 'test']

# Shuffle the training rows (fixed seed, so the split is reproducible) before
# carving out the validation set. Slicing the rows in file order would make
# the validation set the tail of the CSV, which is not a random sample when
# the rows are ordered by class or by source.
shuffled_train_df = train_df.sample(frac=1.0, random_state=42)

# Validation size (about 20% of the training data)
train_size = int(0.8 * len(shuffled_train_df))
val_size = len(shuffled_train_df) - train_size

# Create datasets
train_dataset = BreastCancerDataset(shuffled_train_df.iloc[:train_size], target_size)
val_dataset = BreastCancerDataset(shuffled_train_df.iloc[train_size:], target_size)
test_dataset = BreastCancerDataset(test_df, target_size)

# Create DataLoaders
batch_size = 32
# The model uses BatchNorm1d, which raises in training mode when a batch holds
# a single sample. Drop the final training batch only in that exact case.
drop_last_train = len(train_dataset) % batch_size == 1
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    drop_last=drop_last_train,
)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# DNN model
class DNNModel(nn.Module):
    """Fully connected classifier with four hidden stages and a linear head.

    The hidden widths are 2048, 1024, 512 and 256. Each hidden stage applies
    Linear -> BatchNorm1d -> LeakyReLU(0.1) -> Dropout(0.5); the last layer
    returns the raw class scores.

    Args:
        input_size: Number of features in each flattened input vector.
        num_classes: Number of output scores.
    """

    def __init__(self, input_size, num_classes):
        super().__init__()
        widths = (input_size, 2048, 1024, 512, 256)
        # Register fc1/bn1 ... fc4/bn4 in this order so the attribute names
        # (and therefore the state_dict keys) are fc1, bn1, ..., fc4, bn4.
        for stage, (n_in, n_out) in enumerate(zip(widths[:-1], widths[1:]), start=1):
            setattr(self, f'fc{stage}', nn.Linear(n_in, n_out))
            setattr(self, f'bn{stage}', nn.BatchNorm1d(n_out))
        self.fc5 = nn.Linear(widths[-1], num_classes)

        self.relu = nn.LeakyReLU(0.1)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class scores for a batch of flattened inputs."""
        for stage in range(1, 5):
            linear = getattr(self, f'fc{stage}')
            norm = getattr(self, f'bn{stage}')
            x = self.dropout(self.relu(norm(linear(x))))
        return self.fc5(x)

# Build the model: each input is a flattened 3-channel image of target_size
input_size = 3 * target_size[0] * target_size[1]
num_classes = data['label'].nunique()
model = DNNModel(input_size, num_classes).to(device)

# Class weights, inversely proportional to how often each class occurs in the
# training rows: weight_c = total / (2 * count_c)
total_samples = len(train_df)
# int(...) keeps the counts as Python ints so the weights stay Python floats
# and the weight tensor gets the default float dtype
num_benign = int((train_df['label'] == 0).sum())
num_malignant = int((train_df['label'] == 1).sum())

weight_benign = total_samples / (2 * num_benign)
weight_malignant = total_samples / (2 * num_malignant)
class_weights = torch.tensor([weight_benign, weight_malignant], device=device)

# Weighted cross-entropy loss, Adam optimizer, and a scheduler that cuts the
# learning rate tenfold after 3 epochs without a lower monitored value
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=5e-4, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=3,
    min_lr=1e-6,
)

# Training loop with learning-rate scheduling and early stopping
def _run_epoch(model, batches, criterion, device, optimizer=None):
    """Run one pass over ``batches`` and return ``(mean_batch_loss, accuracy_pct)``.

    Weights are updated only when ``optimizer`` is given; otherwise the pass
    runs with gradients disabled. The caller sets ``model.train()`` or
    ``model.eval()`` beforehand.
    """
    training = optimizer is not None
    loss_sum = 0.0
    correct = 0
    seen = 0
    num_batches = 0

    with torch.set_grad_enabled(training):
        for images, labels in batches:
            images, labels = images.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            seen += labels.size(0)
            num_batches += 1

    if num_batches == 0 or seen == 0:
        raise ValueError('data loader produced no samples')

    # The loss is the mean of the per-batch losses (not a per-sample mean)
    return loss_sum / num_batches, 100 * correct / seen


def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs,
                *, patience=5, checkpoint_path='best_model.pth', device=None,
                show_progress=True):
    """Train ``model`` with validation, LR scheduling and early stopping.

    After every epoch the validation loss is passed to ``scheduler.step``.
    Whenever it is strictly lower than the best value seen so far, the model's
    ``state_dict`` is saved to ``checkpoint_path``. Training stops early once
    the validation loss has failed to improve for ``patience`` epochs in a row.

    Args:
        model: Network to train; updated in place.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        scheduler: Scheduler whose ``step`` accepts the validation loss.
        epochs: Maximum number of epochs.
        patience: Epochs without improvement tolerated before stopping.
        checkpoint_path: File the best ``state_dict`` is written to.
        device: Device batches are moved to. Defaults to the device of the
            model's first parameter (CPU if the model has no parameters).
        show_progress: Wrap the training batches in a ``tqdm`` progress bar.

    Returns:
        A dict with the per-epoch lists ``'train_loss'``, ``'train_acc'``,
        ``'val_loss'`` and ``'val_acc'`` (accuracies in percent).

    Raises:
        ValueError: If either loader yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    best_val_loss = float('inf')
    patience_counter = 0

    for epoch in range(epochs):
        # Training phase
        model.train()
        batches = train_loader
        if show_progress:
            batches = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{epochs}")
        train_loss, train_acc = _run_epoch(model, batches, criterion, device, optimizer)

        # Validation phase
        model.eval()
        val_loss, val_acc = _run_epoch(model, val_loader, criterion, device)

        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        # Print epoch results
        print(f'Epoch [{epoch+1}/{epochs}]')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

        # Learning rate scheduling
        scheduler.step(val_loss)

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            # Save best model
            torch.save(model.state_dict(), checkpoint_path)
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f'Early stopping triggered after epoch {epoch+1}')
                break

    return history

# Evaluation function
def evaluate_model(model, test_loader, loss_fn=None, device=None):
    """Evaluate ``model`` on ``test_loader`` and print the loss and accuracy.

    Args:
        model: Network to evaluate; switched to eval mode.
        test_loader: Iterable of ``(images, labels)`` batches.
        loss_fn: Loss callable taking ``(outputs, labels)``. Defaults to the
            module-level ``criterion``.
        device: Device batches are moved to. Defaults to the device of the
            model's first parameter (CPU if the model has no parameters).

    Returns:
        ``(all_labels, all_preds)``: two lists of ints holding the true and
        the predicted class index of every sample, in loader order.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    if loss_fn is None:
        # Backward-compatible default: the loss defined at module level
        loss_fn = criterion
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    all_preds = []
    all_labels = []
    test_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            test_loss += loss_fn(outputs, labels).item()

            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            num_batches += 1

            all_preds.extend(predicted.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())

    if num_batches == 0 or total == 0:
        raise ValueError('test_loader produced no samples')

    # The reported loss is the mean of the per-batch losses
    test_loss = test_loss / num_batches
    accuracy = 100 * correct / total
    print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {accuracy:.2f}%')
    return all_labels, all_preds

# Train the model
epochs = 10
train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs)

# Load best model and evaluate. map_location places the checkpoint tensors on
# the active device, so the load also works when the file was written on a
# different device than the one in use now.
model.load_state_dict(torch.load('best_model.pth', map_location=device))
y_test, y_pred = evaluate_model(model, test_loader)

# Print classification report
target_names = ['Benign', 'Malignant']
print("\nClassification Report:")
# Explicit labels keep the rows aligned with target_names even when one class
# is missing from both the true and the predicted labels.
print(classification_report(y_test, y_pred, labels=[0, 1], target_names=target_names))
