In [1]:
# Imports
import os
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Parameters
img_size = 256  # Size of the images
DataPath = "/Users/connormorgan-matthews/Documents/Neuro Project/data/lgg-mri-segmentation/kaggle_3m/"
batch_size = 16  # Adjust based on your MacBook's memory capacity
epochs = 10  # Number of epochs for training
learning_rate = 0.001  # Learning rate for the optimizer


In [2]:
# Load and preprocess data
def load_and_preprocess_images(images, masks, img_size, filter_blank=True):
    X = []
    Y = []
    for image_path, mask_path in zip(images, masks):
        img = Image.open(image_path).resize((img_size, img_size))
        img = np.array(img) / 255.0  # Normalize to [0, 1]
        mask = Image.open(mask_path).convert("L").resize((img_size, img_size))
        mask = (np.array(mask) > 0).astype(np.uint8)  # Binary mask
        if filter_blank and np.any(mask):
            X.append(img)
            Y.append(mask)
        elif not filter_blank:
            X.append(img)
            Y.append(mask)
    return np.array(X), np.array(Y)

# Populate images and masks lists
images = []
masks = []

for root, _, files in os.walk(DataPath):
    for file in files:
        if 'mask' in file:
            masks.append(os.path.join(root, file))
            images.append(os.path.join(root, file.replace('_mask', '')))

# Load data
X, Y = load_and_preprocess_images(images, masks, img_size)
Y = np.expand_dims(Y, axis=-1)  # Add channel dimension for compatibility

# Split data into training and validation sets
X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=0.2, random_state=42)

# Check shapes of the datasets
print("Training set shape:", X_train.shape, Y_train.shape)


Training set shape: (1098, 256, 256, 3) (1098, 256, 256, 1)


In [3]:
# Prepare the training data
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).permute(0, 3, 1, 2)  # Convert to (batch, channels, height, width)
Y_train_tensor = torch.tensor(Y_train, dtype=torch.float32).permute(0, 3, 1, 2)  # Reshape masks to (batch, 1, height, width)

# Create a DataLoader
train_dataset = TensorDataset(X_train_tensor, Y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


In [16]:
class UNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=1):
        super(UNet, self).__init__()

        # Encoder
        self.enc1 = self.conv_block(in_channels, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)

        # Bottleneck
        self.bottleneck = self.conv_block(512, 1024)

        # Decoder
        self.upconv1 = self.upconv_block(1024, 512)
        self.upconv2 = self.upconv_block(512, 256)
        self.upconv3 = self.upconv_block(256, 128)
        self.upconv4 = self.upconv_block(128, 64)

        # Final layer
        self.final_conv = nn.Conv2d(64, out_channels, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU()
        )

    def upconv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2),
            nn.ReLU()
        )

    def forward(self, x):
        # Encoder
        enc1 = self.enc1(x)
        enc2 = self.enc2(enc1)
        enc3 = self.enc3(enc2)
        enc4 = self.enc4(enc3)

        # Bottleneck
        bottleneck = self.bottleneck(enc4)

        # Decoder
        up1 = self.upconv1(bottleneck)
        up2 = self.upconv2(up1)
        up3 = self.upconv3(up2)
        up4 = self.upconv4(up3)

        # Final convolution layer
        out = self.final_conv(up4)
        out = torch.sigmoid(out)  # Apply sigmoid for binary output
        return out


In [None]:
# Initialize and set up the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ImprovedCNN().to(device)
criterion = nn.BCEWithLogitsLoss()  # For binary segmentation

optimizer = optim.Adam(model.parameters(), lr=0.0005)  # Adjust to 0.0001 or 0.0005




In [15]:
# Training loop
for epoch in range(epochs):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(images)

        # Compute loss
        loss = criterion(outputs, masks)  # Masks should already have the shape (batch, 1, height, width)
        loss.backward()  # Backward pass
        optimizer.step()  # Optimize

        running_loss += loss.item()
    
    print(f"Epoch [{epoch + 1}/{epochs}], Loss: {running_loss / len(train_loader):.4f}")

print("Training complete!")


Epoch [1/10], Loss: 0.8999
Epoch [2/10], Loss: 0.7251
Epoch [3/10], Loss: 0.6931
Epoch [4/10], Loss: 0.6923
Epoch [5/10], Loss: 0.6918
Epoch [6/10], Loss: 0.6913
Epoch [7/10], Loss: 0.6910


KeyboardInterrupt: 

In [None]:
import os
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms

# Parameters
img_size = 256  # Size of the images
DataPath = "/Users/connormorgan-matthews/Documents/Neuro Project/data/lgg-mri-segmentation/kaggle_3m/"
batch_size = 16  # Adjust based on your MacBook's memory capacity
epochs = 50  # Number of epochs for training
learning_rate = 0.0005  # Learning rate for the optimizer
patience = 5  # Early stopping patience
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Use GPU if available

# Define transformations for data augmentation
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(img_size, scale=(0.8, 1.0)),
    transforms.ToTensor()
])

# Load and preprocess data
def load_and_preprocess_images(images, masks, img_size, filter_blank=True):
    X = []
    Y = []
    for image_path, mask_path in zip(images, masks):
        img = Image.open(image_path).resize((img_size, img_size))
        img = np.array(img) / 255.0  # Normalize to [0, 1]
        mask = Image.open(mask_path).convert("L").resize((img_size, img_size))
        mask = (np.array(mask) > 0).astype(np.uint8)  # Binary mask
        if filter_blank and np.any(mask):
            X.append(img)
            Y.append(mask)
        elif not filter_blank:
            X.append(img)
            Y.append(mask)
    return np.array(X), np.array(Y)

# Populate images and masks lists
images = []
masks = []

for root, _, files in os.walk(DataPath):
    for file in files:
        if 'mask' in file:
            masks.append(os.path.join(root, file))
            images.append(os.path.join(root, file.replace('_mask', '')))

# Load data
X, Y = load_and_preprocess_images(images, masks, img_size)
Y = np.expand_dims(Y, axis=-1)  # Add channel dimension for compatibility

# Split data into training and validation sets
X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=0.2, random_state=42)

# Prepare the training data
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).permute(0, 3, 1, 2)  # Convert to (batch, channels, height, width)
Y_train_tensor = torch.tensor(Y_train, dtype=torch.float32).permute(0, 3, 1, 2)  # Reshape masks to (batch, 1, height, width)

# Create a DataLoader
train_dataset = TensorDataset(X_train_tensor, Y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Define the U-Net architecture
class UNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=1):
        super(UNet, self).__init__()

        # Encoder
        self.enc1 = self.conv_block(in_channels, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)

        # Bottleneck
        self.bottleneck = self.conv_block(512, 1024)

        # Decoder
        self.upconv1 = self.upconv_block(1024, 512)
        self.upconv2 = self.upconv_block(512, 256)
        self.upconv3 = self.upconv_block(256, 128)
        self.upconv4 = self.upconv_block(128, 64)

        # Final layer
        self.final_conv = nn.Conv2d(64, out_channels, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU()
        )

    def upconv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2),
            nn.ReLU()
        )

    def forward(self, x):
        # Encoder
        enc1 = self.enc1(x)
        enc2 = self.enc2(enc1)
        enc3 = self.enc3(enc2)
        enc4 = self.enc4(enc3)

        # Bottleneck
        bottleneck = self.bottleneck(enc4)

        # Decoder
        up1 = self.upconv1(bottleneck)
        up2 = self.upconv2(up1)
        up3 = self.upconv3(up2)
        up4 = self.upconv4(up3)

        # Final convolution layer
        out = self.final_conv(up4)
        out = torch.sigmoid(out)  # Apply sigmoid for binary output
        return out

# Initialize and set up the model
model = UNet().to(device)

# Loss function: Combined BCE and Dice Loss
def dice_loss(pred, target, smooth=1e-6):
    intersection = torch.sum(pred * target)
    return 1 - (2. * intersection + smooth) / (torch.sum(pred) + torch.sum(target) + smooth)

def combined_loss(pred, target):
    bce_loss = nn.BCEWithLogitsLoss()(pred, target)
    dice = dice_loss(pred, target)
    return bce_loss + dice

criterion = combined_loss

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Learning rate scheduler
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)  # Reduce LR by half every 5 epochs

# Early stopping
best_loss = np.inf
patience_counter = 0

# Training loop
for epoch in range(epochs):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(images)

        # Compute loss
        loss = criterion(outputs, masks)  # Masks should already have the shape (batch, 1, height, width)
        loss.backward()  # Backward pass
        optimizer.step()  # Optimize

        running_loss += loss.item()

    # Scheduler step
    scheduler.step()

    print(f"Epoch [{epoch + 1}/{epochs}], Loss: {running_loss / len(train_loader):.4f}")

    # Early stopping
    if running_loss < best_loss:
        best_loss = running_loss
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered")
            break

print("Training complete!")
