In [16]:
import torch
import numpy as np
import cv2
import os
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from torchvision.models import resnet18
from sklearn.model_selection import train_test_split
from tqdm import tqdm  # For showing progress bar
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image

In [17]:
class Dataset(object):
    """Minimal abstract map-style dataset.

    Subclasses must implement ``__getitem__`` and ``__len__``. Two datasets can
    be chained with ``+``, which returns a ``torch.utils.data.ConcatDataset``.

    Note: this definition shadows the ``Dataset`` name imported from
    ``torch.utils.data`` at the top of the notebook.
    """

    def __getitem__(self, index):
        """Return the sample at ``index``; must be overridden by subclasses."""
        raise NotImplementedError

    def __len__(self):
        """Return the number of samples; must be overridden by subclasses."""
        raise NotImplementedError

    def __add__(self, other):
        """Concatenate this dataset with ``other``."""
        # ConcatDataset is not imported at module level, so import it here;
        # without this the `+` operator fails with a NameError.
        from torch.utils.data import ConcatDataset

        return ConcatDataset([self, other])

In [18]:
class MRI_Dataset(Dataset):
    """In-memory binary MRI dataset (tumor = 1, healthy = 0).

    Every file in ``tumor_dir`` and ``healthy_dir`` that ``cv2.imread`` can
    decode is loaded once at construction time, converted from BGR to RGB,
    resized to ``image_size``, turned into a tensor and normalized with the
    ImageNet mean/std. Files that cannot be decoded are skipped.

    Args:
        tumor_dir: Directory whose images receive label 1.
        healthy_dir: Directory whose images receive label 0.
        image_size: Target size passed to ``transforms.Resize``.

    Attributes:
        images: Tensor stacking all preprocessed images, tumor images first and
            healthy images after them (an empty tensor if nothing was loaded).
        labels: NumPy array of 0/1 labels aligned with ``images``.
    """

    def __init__(self, tumor_dir, healthy_dir, image_size=(224, 224)):
        self.image_size = image_size

        # ImageNet normalization
        self.transform = transforms.Compose([
            transforms.Resize(self.image_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

        images = []
        labels = []
        # Tumor images first (label 1), then healthy images (label 0).
        for directory, label in ((tumor_dir, 1), (healthy_dir, 0)):
            for tensor in self._load_dir(directory):
                images.append(tensor)
                labels.append(label)

        # Stack with torch rather than np.array(): the samples are already
        # tensors, so this avoids a round-trip through NumPy and keeps
        # __getitem__ returning real tensors.
        self.images = torch.stack(images) if images else torch.empty(0)
        self.labels = np.array(labels)

    def _load_dir(self, directory):
        """Yield the preprocessed tensor for each decodable image in ``directory``."""
        # Sort so the sample order does not depend on the order os.listdir returns.
        for filename in sorted(os.listdir(directory)):
            img = cv2.imread(os.path.join(directory, filename))
            if img is None:
                # cv2.imread returns None for files it cannot decode; skip them.
                continue
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
            yield self.transform(Image.fromarray(img))

    def __len__(self):
        """Return the number of loaded images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``{'image': tensor, 'label': float32 scalar tensor}`` for ``idx``."""
        image = self.images[idx]  # Already transformed and normalized
        label = self.labels[idx]
        return {'image': image, 'label': torch.tensor(label, dtype=torch.float32)}


In [19]:
# class MRI_Dataset(Dataset):
#     def __init__(self, tumor_dir, healthy_dir, image_size=(128, 128)):
#         self.tumor_images = []
#         self.healthy_images = []
#         self.labels = []
        
#         self.image_size = image_size
        
#         # Load tumor images
#         for filename in os.listdir(tumor_dir):
#             img = cv2.imread(os.path.join(tumor_dir, filename))
#             img = cv2.resize(img, self.image_size)
#             b, g, r = cv2.split(img)
#             img = cv2.merge([r, g, b])  # Convert from BGR to RGB
#             self.tumor_images.append(img)
#             self.labels.append(1)  # Tumor = 1
        
#         # Load healthy images
#         for filename in os.listdir(healthy_dir):
#             img = cv2.imread(os.path.join(healthy_dir, filename))
#             img = cv2.resize(img, self.image_size)
#             b, g, r = cv2.split(img)
#             img = cv2.merge([r, g, b])  # Convert from BGR to RGB
#             self.healthy_images.append(img)
#             self.labels.append(0)  # Healthy = 0
        
#         self.images = np.array(self.tumor_images + self.healthy_images)
#         self.labels = np.array(self.labels)

#     def __len__(self):
#         return len(self.images)

#     def __getitem__(self, idx):
#         image = self.images[idx].astype(np.float32) / 255.0  # Normalize
#         label = self.labels[idx]
#         return {'image': torch.tensor(image).permute(2, 0, 1), 'label': torch.tensor(label, dtype=torch.float32)}


In [20]:
# Locations of the tumor / no-tumor image folders; the Testing folders are
# separate from the Training ones.
train_tumor_dir = './Dataset/Training/aug_tumor'
train_healthy_dir = './Dataset/Training/aug_notumor'
test_tumor_dir = './Dataset/Testing/aug_tumor'
test_healthy_dir = './Dataset/Testing/aug_notumor'

# Training split: batches of 32, reshuffled on every pass.
train_dataset = MRI_Dataset(tumor_dir=train_tumor_dir, healthy_dir=train_healthy_dir)
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)

# Testing split: same batch size, fixed order.
test_dataset = MRI_Dataset(tumor_dir=test_tumor_dir, healthy_dir=test_healthy_dir)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

KeyboardInterrupt: 

In [None]:
# Run on the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Start from a pre-trained ResNet18 and swap its final fully-connected layer
# for a single-unit head followed by a sigmoid, so the network emits one
# probability per image (binary classification).
# NOTE(review): recent torchvision releases deprecate `pretrained=` in favour
# of `weights=` -- confirm the installed version before switching.
model = resnet18(pretrained=True)
num_features = model.fc.in_features
model.fc = nn.Sequential(nn.Linear(num_features, 1), nn.Sigmoid())

In [None]:
# Move the model onto the selected device so it sits on the same device as
# the batches that the training loop sends there.
model = model.to(device)

In [None]:
# Binary cross-entropy on probabilities (the model head ends in a sigmoid),
# optimized with Adam over every model parameter.
criterion = nn.BCELoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
# Number of full passes the training loop makes over train_loader
# (adjust as needed).
epochs = 30

In [None]:
# Best validation accuracy seen so far and the path of the matching checkpoint.
best_val_accuracy = 0.0
best_model_path = ""

# Each epoch runs one optimization pass over train_loader, then one evaluation
# pass over test_loader, and checkpoints the weights whenever the evaluation
# accuracy improves.
# NOTE(review): the test split doubles as the validation set here, so the
# accuracy of the saved "best" model is an optimistic estimate -- consider
# holding out a separate validation split for model selection.
for epoch in range(epochs):
    model.train()  # Enable training behaviour (dropout, batch-norm updates)
    running_loss = 0.0
    correct = 0
    total = 0
    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
        images = batch['image'].to(device)
        labels = batch['label'].to(device)

        optimizer.zero_grad()  # Clear gradients left over from the previous step

        # The head emits one value per sample; squeeze(1) drops only that
        # feature axis. A bare squeeze() would also drop the batch axis when a
        # batch holds a single sample, and the loss would then reject the
        # mismatched input/target shapes.
        outputs = model(images).squeeze(1)

        loss = criterion(outputs, labels)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Threshold the probabilities at 0.5 to get hard 0/1 predictions
        predicted = (outputs > 0.5).float()
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    # Loss is averaged over batches, accuracy over samples
    avg_loss = running_loss / len(train_loader)
    accuracy = 100 * correct / total
    print(f"Train Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")

    # Validation pass (using the separate test dataset)
    model.eval()  # Switch to inference behaviour
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():  # No gradients are needed for evaluation
        for batch in test_loader:
            images = batch['image'].to(device)
            labels = batch['label'].to(device)

            outputs = model(images).squeeze(1)  # Same shape handling as in training
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            predicted = (outputs > 0.5).float()
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    avg_val_loss = val_loss / len(test_loader)
    val_accuracy = 100 * correct / total
    print(f"Validation Loss: {avg_val_loss:.4f}, Accuracy: {val_accuracy:.2f}%")

    # Keep only the weights of the best-scoring epoch; each improvement
    # overwrites the previous checkpoint file.
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        best_model_path = 'Resnet18_best_model.pth'
        torch.save(model.state_dict(), best_model_path)
        print(f"Best model saved at epoch {epoch+1} with accuracy: {val_accuracy:.2f}%")



In [None]:
# # Save model after all epochs
# torch.save(model.state_dict(), 'final_model.pth')