# Import Required Libraries & Setup Device

In [1]:
import os
import random
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as T
from torchvision.transforms import functional as F

# Pick the compute device: CUDA when a usable GPU is present, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Mixed-precision helpers: use PyTorch's AMP utilities when this build provides
# them, otherwise fall back to no-op stand-ins so calling code runs unchanged.
try:
    from torch.cuda.amp import autocast, GradScaler
except ImportError:
    from contextlib import contextmanager

    @contextmanager
    def autocast(enabled=True, **kwargs):
        """No-op stand-in for ``torch.cuda.amp.autocast``.

        ``enabled`` and any further keyword arguments are accepted so that
        call sites written for the real context manager still work; all of
        them are ignored.
        """
        yield

    class GradScaler:
        """No-op stand-in for ``torch.cuda.amp.GradScaler``."""

        def __init__(self, *args, **kwargs):
            # Accept and ignore the real constructor's arguments
            # (e.g. ``enabled=...``) instead of raising TypeError.
            pass

        def scale(self, loss):
            """Return ``loss`` unchanged; no scaling is applied."""
            return loss

        def unscale_(self, optimizer):
            """Do nothing; gradients were never scaled."""
            pass

        def step(self, optimizer):
            """Run a plain optimizer step."""
            optimizer.step()

        def update(self):
            """Do nothing; there is no scale factor to adjust."""
            pass

# Resolve the project root as the parent of the current working directory.
# The working directory itself is not changed.
from pathlib import Path
project_root = Path.cwd().parent



# Data Augmentation: JointRandomFlipRotate
Define the JointRandomFlipRotate class, which applies the same random horizontal flip and the same random rotation angle to an image and its corresponding mask so that the two stay aligned.

In [2]:
class JointRandomFlipRotate:
    """Apply one shared random flip and rotation to an image/mask pair.

    Args:
        p_flip: Probability of mirroring both inputs horizontally.
        degrees: Rotation angle is drawn uniformly from
            ``[-degrees, degrees]``; uncovered areas are filled with 0.
    """

    def __init__(self, p_flip=0.5, degrees=15):
        self.p_flip, self.degrees = p_flip, degrees

    def __call__(self, image, mask):
        """Return ``(image, mask)`` after the shared flip and rotation."""
        pair = (image, mask)

        # A single draw decides the mirror for both inputs.
        mirror = random.random() < self.p_flip
        if mirror:
            pair = tuple(F.hflip(item) for item in pair)

        # A single angle is used for both inputs.
        theta = random.uniform(-self.degrees, self.degrees)
        rotated_image, rotated_mask = (F.rotate(item, theta, fill=0) for item in pair)
        return rotated_image, rotated_mask

# Ultrasound Segmentation Dataset
Create a custom Dataset class to load ultrasound images and masks, apply joint and individual transforms, and return the processed data.

In [3]:
# Dataset class for segmentation
class UltrasoundSegmentationDataset(Dataset):
    """Image/mask pairs read from two directories.

    Files are paired by position after sorting each directory's file names,
    so the image and mask files must sort into the same order.

    Args:
        images_dir: Directory holding the input images.
        masks_dir: Directory holding the masks.
        joint_transform: Optional callable ``(image, mask) -> (image, mask)``
            applied first, to both PIL images together.
        transform_img: Optional callable applied to the image afterwards.
        transform_mask: Optional callable applied to the mask afterwards.

    Raises:
        ValueError: If the two directories hold a different number of files.
    """

    def __init__(self, images_dir, masks_dir, joint_transform=None, transform_img=None, transform_mask=None):
        self.images_dir = images_dir
        self.masks_dir = masks_dir
        self.image_files = self._list_files(images_dir)
        self.mask_files = self._list_files(masks_dir)
        # Pairing is positional, so a count mismatch means misaligned pairs.
        if len(self.image_files) != len(self.mask_files):
            raise ValueError(
                f"Found {len(self.image_files)} images in {images_dir} but "
                f"{len(self.mask_files)} masks in {masks_dir}"
            )
        self.joint_transform = joint_transform
        self.transform_img = transform_img
        self.transform_mask = transform_mask

    @staticmethod
    def _list_files(directory):
        """Return the sorted names of regular, non-hidden files in ``directory``."""
        # Skips entries such as .DS_Store and the .ipynb_checkpoints folder.
        return sorted(
            name
            for name in os.listdir(directory)
            if not name.startswith(".") and os.path.isfile(os.path.join(directory, name))
        )

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load pair ``idx`` as RGB image and single-channel mask, then transform."""
        img_path = os.path.join(self.images_dir, self.image_files[idx])
        mask_path = os.path.join(self.masks_dir, self.mask_files[idx])
        # convert() returns a loaded copy, so the files can be closed right away.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")
        with Image.open(mask_path) as raw_mask:
            mask = raw_mask.convert("L")

        # Apply joint augmentation if provided
        if self.joint_transform is not None:
            image, mask = self.joint_transform(image, mask)

        # Apply individual transforms if provided
        if self.transform_img is not None:
            image = self.transform_img(image)
        if self.transform_mask is not None:
            mask = self.transform_mask(mask)

        return image, mask

# Transforms (resize and to-tensor)
transform_img = T.Compose([
    T.Resize((256, 256)),
    T.ToTensor(),
])
# Masks are resized with nearest-neighbour sampling: the default bilinear
# filter would blend foreground and background values along object borders.
transform_mask = T.Compose([
    T.Resize((256, 256), interpolation=T.InterpolationMode.NEAREST),
    T.ToTensor(),
])

# Update these paths as needed
images_dir = project_root / "Dataset" / "Images"
masks_dir = project_root / "Dataset" / "Masks"

# Create dataset and loaders
joint_augmentation = JointRandomFlipRotate(p_flip=0.5, degrees=15)
dataset = UltrasoundSegmentationDataset(images_dir, masks_dir, joint_transform=joint_augmentation,
                                        transform_img=transform_img, transform_mask=transform_mask)
# Same files without the random flip/rotation; used for validation only so
# the validation loss is measured on unaugmented samples.
eval_dataset = UltrasoundSegmentationDataset(images_dir, masks_dir, joint_transform=None,
                                             transform_img=transform_img, transform_mask=transform_mask)
total_samples = len(dataset)
train_size = int(0.8 * total_samples)
val_size = total_samples - train_size
train_dataset, val_split = random_split(dataset, [train_size, val_size])
# Re-point the validation indices at the unaugmented dataset.
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

# Data Transforms and Loader Creation
Define transforms for images and masks (resize and to-tensor), then create dataset instances. Split the dataset into training and validation sets and create DataLoaders.

In [4]:
# Transforms (resize and to-tensor)
transform_img = T.Compose([
    T.Resize((256, 256)),
    T.ToTensor(),
])
# Nearest-neighbour resizing keeps mask values unblended; the default
# bilinear filter would produce in-between values along object borders.
transform_mask = T.Compose([
    T.Resize((256, 256), interpolation=T.InterpolationMode.NEAREST),
    T.ToTensor(),
])

# Update these paths as needed
images_dir = project_root / "Dataset" / "Images"
masks_dir = project_root / "Dataset" / "Masks"

# Create dataset and loaders
joint_augmentation = JointRandomFlipRotate(p_flip=0.5, degrees=15)
dataset = UltrasoundSegmentationDataset(images_dir, masks_dir, joint_transform=joint_augmentation,
                                        transform_img=transform_img, transform_mask=transform_mask)
# Unaugmented view of the same files, so validation samples are not
# randomly flipped or rotated.
eval_dataset = UltrasoundSegmentationDataset(images_dir, masks_dir, joint_transform=None,
                                             transform_img=transform_img, transform_mask=transform_mask)
total_samples = len(dataset)
train_size = int(0.8 * total_samples)
val_size = total_samples - train_size
train_dataset, val_split = random_split(dataset, [train_size, val_size])
# Reuse the held-out indices, but read them from the unaugmented dataset.
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

# U-Net Model Definition
Implement the U-Net segmentation model architecture using PyTorch modules including encoders, decoders, a bottleneck, and the final convolution with sigmoid activation.

In [5]:
class UNet(nn.Module):
    """Two-level U-Net mapping a 3-channel input to a 1-channel sigmoid map.

    The input passes through two 2x poolings and two 2x transposed
    convolutions, and the encoder outputs are concatenated back in as skips.
    Height and width should therefore be multiples of 4 so the skip tensors
    match in size.
    """

    @staticmethod
    def _double_conv(in_channels, out_channels):
        """Build two 3x3 same-padding convolutions, each followed by ReLU."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, 3, padding=1),
            nn.ReLU(),
        )

    def __init__(self):
        super().__init__()
        # Contracting path
        self.enc_conv1 = self._double_conv(3, 16)
        self.pool1 = nn.MaxPool2d(2)
        self.enc_conv2 = self._double_conv(16, 32)
        self.pool2 = nn.MaxPool2d(2)
        # Deepest stage
        self.bottleneck = self._double_conv(32, 64)
        # Expanding path; each decoder stage takes upsampled + skip channels
        self.upconv2 = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
        self.dec_conv2 = self._double_conv(64, 32)
        self.upconv1 = nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2)
        self.dec_conv1 = self._double_conv(32, 16)
        # 1x1 projection down to a single output channel
        self.final_conv = nn.Conv2d(16, 1, kernel_size=1)

    def forward(self, x):
        """Return per-pixel values in (0, 1) at the input's spatial size."""
        skip1 = self.enc_conv1(x)
        skip2 = self.enc_conv2(self.pool1(skip1))
        deepest = self.bottleneck(self.pool2(skip2))

        merged2 = torch.cat([self.upconv2(deepest), skip2], dim=1)
        decoded2 = self.dec_conv2(merged2)
        merged1 = torch.cat([self.upconv1(decoded2), skip1], dim=1)
        decoded1 = self.dec_conv1(merged1)

        return torch.sigmoid(self.final_conv(decoded1))

# Build the network, move its parameters to the selected device, and print
# the layer summary.
model = UNet()
model.to(device)
print(model)

UNet(
  (enc_conv1): Sequential(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU()
  )
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (enc_conv2): Sequential(
    (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU()
  )
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (bottleneck): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU()
  )
  (upconv2): ConvTranspose2d(64, 32, kernel_size=(2, 2), stride=(2, 2))
  (dec_conv2): Sequential(
    (0): Conv2d(64, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): 

# Loss, Optimizer, and Scheduler
Define the loss function (BCELoss), set up the Adam optimizer, and configure a StepLR scheduler for learning rate adjustments.

In [6]:
# Binary cross-entropy on the model's sigmoid outputs
criterion = nn.BCELoss()

# Adam over all model parameters with a 1e-4 learning rate
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

# Halve the learning rate every 20 scheduler steps
scheduler = optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=20, gamma=0.5)

# Training Loop and Model Saving
Implement the training and validation loops, calculate average losses, and save the model weights when the validation loss improves.

In [None]:
# Best validation loss seen so far; train_model updates it in place.
best_val_loss = float("inf")


def train_model(model, train_loader, val_loader, epochs, device):
    """Train ``model`` and keep the weights with the lowest validation loss.

    Uses the module-level ``criterion``, ``optimizer``, ``scheduler`` and
    ``project_root``. After every epoch the mean per-batch training and
    validation losses are printed. When the validation loss beats
    ``best_val_loss``, the state dict is written to
    ``project_root / "Model Weights" / "hypothesis_best_model_weights.pth"``.

    Args:
        model: Network to optimise; expected to already be on ``device``.
        train_loader: Loader yielding ``(images, masks)`` training batches.
        val_loader: Loader yielding ``(images, masks)`` validation batches.
        epochs: Number of passes over ``train_loader``.
        device: Device the batches are moved to.

    Raises:
        ValueError: If either loader yields no batches.
    """
    global best_val_loss

    # Fail early with a clear message rather than dividing by zero below.
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError("train_loader and val_loader must each yield at least one batch")

    weights_path = project_root / "Model Weights" / "hypothesis_best_model_weights.pth"

    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        for images, masks in train_loader:
            images = images.to(device)
            masks = masks.to(device)
            optimizer.zero_grad()
            preds = model(images)
            loss = criterion(preds, masks)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        # The scheduler advances once per epoch, after the optimizer steps.
        scheduler.step()
        avg_train_loss = train_loss / len(train_loader)

        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for images, masks in val_loader:
                images = images.to(device)
                masks = masks.to(device)
                preds = model(images)
                loss = criterion(preds, masks)
                val_loss += loss.item()
        avg_val_loss = val_loss / len(val_loader)
        print(f"Epoch [{epoch+1}/{epochs}] - Train Loss: {avg_train_loss:.4f} - Val Loss: {avg_val_loss:.4f}")

        # Save model weights if validation loss improves
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            # torch.save does not create missing parent directories.
            weights_path.parent.mkdir(parents=True, exist_ok=True)
            torch.save(model.state_dict(), weights_path)
    print("Training complete.")

# Number of full passes over the training data
epochs = 30

# Start training with the model, loaders and device prepared earlier
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=epochs,
    device=device,
)