In [1]:
import os
import shutil
import random
from pathlib import Path
import numpy as np
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import matplotlib.pyplot as plt
from skimage.restoration import denoise_nl_means
from skimage.metrics import structural_similarity as ssim
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc, precision_recall_curve
from PIL import Image

# Preprocessing functions as described in the paper
def remove_confusing_objects(image):
    """
    Crop 100 pixels from each side of the image to remove confusing objects
    """
    h, w = image.shape[:2]
    cropped = image[100:h-100, 100:w-100]
    return cropped

def apply_nlm_filter(image):
    """
    Apply Non-Local Means filter for denoising
    """
    # Convert to float for NLM algorithm
    image_float = image.astype(np.float32) / 255.0
    
    # Apply NLM denoising with parameters similar to those in the paper
    denoised = denoise_nl_means(image_float, h=7, sigma=0.1, patch_size=7, patch_distance=13)
    
    # Convert back to uint8
    denoised = np.clip(denoised * 255, 0, 255).astype(np.uint8)
    return denoised

def apply_histogram_equalization(image):
    """
    Apply histogram equalization to enhance contrast
    """
    if len(image.shape) == 3 and image.shape[2] == 3:
        # For RGB images
        image_yuv = cv2.cvtColor(image, cv2.COLOR_RGB2YUV)
        image_yuv[:,:,0] = cv2.equalizeHist(image_yuv[:,:,0])
        result = cv2.cvtColor(image_yuv, cv2.COLOR_YUV2RGB)
    else:
        # For grayscale images
        result = cv2.equalizeHist(image)
    return result

def preprocess_image(image):
    """
    Apply the three-step preprocessing approach
    """
    # Step 1: Remove confusing objects
    cropped = remove_confusing_objects(image)
    
    # Step 2: Apply NLM filter
    denoised = apply_nlm_filter(cropped)
    
    # Step 3: Apply histogram equalization
    enhanced = apply_histogram_equalization(denoised)
    
    return enhanced

In [2]:
def create_validation_set(training_dir, validation_dir, split_ratio=0.2):
    """
    Create a validation set by copying a portion of images from training dir
    
    Args:
        training_dir: Path to training directory with class subfolders
        validation_dir: Path to create validation directory with same class structure
        split_ratio: Portion of training images to use for validation (0.0-1.0)
    """
    # Make sure validation dir exists
    Path(validation_dir).mkdir(exist_ok=True, parents=True)
    
    # Get all class folders
    class_folders = [f for f in os.listdir(training_dir) 
                    if os.path.isdir(os.path.join(training_dir, f))]
    
    for class_folder in class_folders:
        # Create class folder in validation dir
        val_class_path = os.path.join(validation_dir, class_folder)
        Path(val_class_path).mkdir(exist_ok=True)
        
        # Get all images in this class
        train_class_path = os.path.join(training_dir, class_folder)
        images = [f for f in os.listdir(train_class_path) 
                 if f.endswith(('.jpg', '.jpeg', '.png'))]
        
        # Determine how many images to move to validation
        num_val_images = int(len(images) * split_ratio)
        
        # Randomly select images for validation
        random.seed(42)  # For reproducibility
        val_images = random.sample(images, num_val_images)
        
        # Copy selected images to validation folder
        for img in val_images:
            src = os.path.join(train_class_path, img)
            dst = os.path.join(val_class_path, img)
            shutil.copy(src, dst)
            
        print(f"Class {class_folder}: Copied {len(val_images)} images to validation set")

# Create validation set
create_validation_set(
    training_dir='dataset/Training',
    validation_dir='dataset/Validation',
    split_ratio=0.2
)

Class MENINGIOMA: Copied 164 images to validation set
Class PITUITARY: Copied 165 images to validation set
Class NOTUMOR: Copied 79 images to validation set
Class GLIOMA: Copied 165 images to validation set


In [3]:
class BrainMRIDataset(Dataset):
    def __init__(self, root_dir, transform=None, preprocess=True):
        """
        Args:
            root_dir (string): Directory with all the images
            transform (callable, optional): Optional transform to be applied on images
            preprocess (bool): Whether to apply the preprocessing steps
        """
        self.root_dir = root_dir
        self.transform = transform
        self.preprocess = preprocess
        
        # Find all image files
        self.classes = ['GLIOMA', 'MENINGIOMA', 'NOTUMOR', 'PITUITARY']
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}
        
        self.image_paths = []
        self.labels = []
        
        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            if os.path.isdir(class_dir):
                for img_name in os.listdir(class_dir):
                    if img_name.endswith(('.jpg', '.png', '.jpeg')):
                        self.image_paths.append(os.path.join(class_dir, img_name))
                        self.labels.append(self.class_to_idx[class_name])
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = cv2.imread(img_path, cv2.IMREAD_COLOR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        label = self.labels[idx]
        
        if self.preprocess:
            image = preprocess_image(image)
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

In [4]:
class ProposedCNN(nn.Module):
    def __init__(self, num_classes=4):
        super(ProposedCNN, self).__init__()
        
        # Block 1
        self.block1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=7, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # Block 2
        self.block2 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=7, padding=3),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=7, padding=3),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # Block 3
        self.block3 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=7, padding=3),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=7, padding=3),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=7, padding=3),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # Block 4
        self.block4 = nn.Sequential(
            nn.Conv2d(256, 512, kernel_size=7, padding=3),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=7, padding=3),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=7, padding=3),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # Classifier part
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512 * 14 * 14, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(0.7),
            nn.Linear(1024, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
    
    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.classifier(x)
        return x

In [5]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=60, device='cuda'):
    """
    Train the model and return training history
    """
    model = model.to(device)
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            
            # Statistics
            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        
        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = correct / total
        history['train_loss'].append(epoch_loss)
        history['train_acc'].append(epoch_acc)
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()
        
        val_epoch_loss = val_loss / len(val_loader.dataset)
        val_epoch_acc = val_correct / val_total
        history['val_loss'].append(val_epoch_loss)
        history['val_acc'].append(val_epoch_acc)
        
        print(f'Epoch {epoch+1}/{num_epochs}, '
              f'Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}, '
              f'Val Loss: {val_epoch_loss:.4f}, Val Acc: {val_epoch_acc:.4f}')
    
    return history

def evaluate_model(model, test_loader, device='cuda'):
    """
    Evaluate the model on the test set and return performance metrics
    """
    model = model.to(device)
    model.eval()
    
    all_preds = []
    all_labels = []
    all_probs = []
    
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            probabilities = torch.softmax(outputs, dim=1)
            _, preds = torch.max(outputs, 1)
            
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probabilities.cpu().numpy())
    
    # Calculate metrics
    cm = confusion_matrix(all_labels, all_preds)
    report = classification_report(all_labels, all_preds, target_names=['Glioma', 'Meningioma', 'NoTumor', 'Pituitary'])
    
    # Convert lists to numpy arrays
    all_labels = np.array(all_labels)
    all_probs = np.array(all_probs)
    
    # Calculate ROC curves and AUC for each class
    roc_curves = []
    pr_curves = []
    
    for i in range(4):  # 4 classes
        # One-vs-rest approach for ROC curve
        binary_labels = (all_labels == i).astype(int)
        class_probs = all_probs[:, i]
        
        # ROC curve
        fpr, tpr, _ = roc_curve(binary_labels, class_probs)
        roc_auc = auc(fpr, tpr)
        roc_curves.append((fpr, tpr, roc_auc))
        
        # PR curve
        precision, recall, _ = precision_recall_curve(binary_labels, class_probs)
        pr_auc = auc(recall, precision)
        pr_curves.append((precision, recall, pr_auc))
    
    return {
        'confusion_matrix': cm,
        'classification_report': report,
        'roc_curves': roc_curves,
        'pr_curves': pr_curves
    }

In [6]:
def create_vgg16(num_classes=4):
    """
    Create a modified VGG16 for brain tumor classification
    """
    import torchvision.models as models
    
    model = models.vgg16(pretrained=True)
    
    # Modify the classifier part
    model.classifier = nn.Sequential(
        nn.Linear(512 * 7 * 7, 4096),
        nn.ReLU(True),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 4096),
        nn.ReLU(True),
        nn.Dropout(p=0.5),
        nn.Linear(4096, num_classes)
    )
    
    return model

def create_vgg19(num_classes=4):
    """
    Create a modified VGG19 for brain tumor classification
    """
    import torchvision.models as models
    
    model = models.vgg19(pretrained=True)
    
    # Modify the classifier part
    model.classifier = nn.Sequential(
        nn.Linear(512 * 7 * 7, 4096),
        nn.ReLU(True),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 4096),
        nn.ReLU(True),
        nn.Dropout(p=0.5),
        nn.Linear(4096, num_classes)
    )
    
    return model

class CNNSVM(nn.Module):
    """
    CNN-SVM hybrid model as mentioned in the paper
    Note: This is a simplified implementation. 
    The actual CNN-SVM implementation would require integrating SVM with PyTorch
    """
    def __init__(self, num_classes=4):
        super(CNNSVM, self).__init__()
        
        # CNN part (feature extractor)
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        
        # SVM-like classifier (using linear layer with SVM-like loss)
        self.classifier = nn.Linear(512 * 14 * 14, num_classes)
    
    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

In [7]:
# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Data transformations
data_transforms = {
    'train': transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
}

# Create datasets
train_dataset = BrainMRIDataset(
    root_dir='dataset/Training', 
    transform=data_transforms['train'],
    preprocess=True
)

val_dataset = BrainMRIDataset(
    root_dir='dataset/Validation', 
    transform=data_transforms['val'],
    preprocess=True
)

test_dataset = BrainMRIDataset(
    root_dir='dataset/Testing', 
    transform=data_transforms['test'],
    preprocess=True
)

# Create data loaders
train_loader = DataLoader(
    train_dataset, 
    batch_size=16, 
    shuffle=True, 
    num_workers=4
)

val_loader = DataLoader(
    val_dataset, 
    batch_size=16, 
    shuffle=False, 
    num_workers=4
)

test_loader = DataLoader(
    test_dataset, 
    batch_size=16, 
    shuffle=False, 
    num_workers=4
)

# Create models
models = {
    'proposed': ProposedCNN(num_classes=4),
    'vgg16': create_vgg16(num_classes=4),
    'vgg19': create_vgg19(num_classes=4),
    'cnn_svm': CNNSVM(num_classes=4)
}

# Training parameters
num_epochs = 60
criterion = nn.CrossEntropyLoss()

# Train and evaluate each model
for model_name, model in models.items():
    print(f"\nTraining {model_name} model...")
    
    # Define optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    
    # Train the model
    history = train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        criterion=criterion,
        optimizer=optimizer,
        num_epochs=num_epochs,
        device=device
    )
    
    # Plot training history
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history['train_acc'], label='Train Accuracy')
    plt.plot(history['val_acc'], label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title(f'{model_name} - Accuracy')
    
    plt.subplot(1, 2, 2)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title(f'{model_name} - Loss')
    plt.savefig(f'{model_name}_training_history.png')
    plt.close()
    
    # Evaluate model on test set
    print(f"\nEvaluating {model_name} model...")
    eval_results = evaluate_model(model, test_loader, device)
    
    # Print results
    print(f"\nConfusion Matrix for {model_name}:")
    print(eval_results['confusion_matrix'])
    print(f"\nClassification Report for {model_name}:")
    print(eval_results['classification_report'])
    
    # Plot ROC curves
    plt.figure(figsize=(10, 8))
    class_names = ['Glioma', 'Meningioma', 'NoTumor', 'Pituitary']
    for i, (fpr, tpr, roc_auc) in enumerate(eval_results['roc_curves']):
        plt.plot(fpr, tpr, label=f'{class_names[i]} (AUC = {roc_auc:.3f})')
    
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{model_name} - ROC Curves')
    plt.legend(loc='lower right')
    plt.savefig(f'{model_name}_roc_curves.png')
    plt.close()
    
    # Plot PR curves
    plt.figure(figsize=(10, 8))
    for i, (precision, recall, pr_auc) in enumerate(eval_results['pr_curves']):
        plt.plot(recall, precision, label=f'{class_names[i]} (AUC = {pr_auc:.3f})')
    
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title(f'{model_name} - Precision-Recall Curves')
    plt.legend(loc='lower left')
    plt.savefig(f'{model_name}_pr_curves.png')
    plt.close()

    # Save model
    torch.save(model.state_dict(), f'{model_name}_model.pth')

Using device: cuda


Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /home/amit/.cache/torch/hub/checkpoints/vgg16-397923af.pth
100%|████████████████████████████████████████| 528M/528M [01:10<00:00, 7.82MB/s]
Downloading: "https://download.pytorch.org/models/vgg19-dcbb9e9d.pth" to /home/amit/.cache/torch/hub/checkpoints/vgg19-dcbb9e9d.pth
100%|████████████████████████████████████████| 548M/548M [01:12<00:00, 7.92MB/s]



Training proposed model...


KeyboardInterrupt: 