In [2]:
import os
import numpy as np
import pandas as pd
from PIL import Image, ImageOps
# import torch
from sklearn.model_selection import train_test_split

In [3]:
# Paths
# Directory whose entries are listed (os.listdir) to obtain the training image filenames.
TRAIN_IMAGES = "train_images/"
# Root directory that is walked recursively (os.walk) to collect the mask file paths.
MASK_OUTPUT = "mask_output/"

# Target size for resizing, passed to resize_with_aspect_ratio as
# (target_height, target_width).
IMG_SIZE = (512, 512)

In [4]:
# Resize images and masks while preserving aspect ratio
def resize_with_aspect_ratio(image, target_size, resample=None, fill=0):
    """
    Fit an image inside ``target_size`` without distorting it, then pad it to the exact size.

    The image is only ever shrunk (``Image.thumbnail`` never enlarges), so an input
    that is already smaller than the target keeps its resolution and is simply
    centred on the padded canvas.

    Args:
        image: The input image (as a PIL Image). It is not modified; a copy is resized.
        target_size: Tuple (target_height, target_width).
        resample: Pillow resampling filter used when shrinking. ``None`` (the default)
            selects ``Image.Resampling.LANCZOS``. Pass ``Image.Resampling.NEAREST``
            for label masks so that no interpolated in-between values are created.
        fill: Value used for the padding border (default 0, i.e. black).
    Returns:
        Resized and padded image as a PIL Image whose ``size`` is
        (target_width, target_height).
    """
    # Resolved here rather than in the signature so the default is looked up lazily.
    if resample is None:
        resample = Image.Resampling.LANCZOS

    target_h, target_w = target_size[0], target_size[1]

    img = image.copy()
    # PIL sizes are (width, height), hence the swapped order relative to target_size.
    img.thumbnail((target_w, target_h), resample)

    pad_w = target_w - img.size[0]
    pad_h = target_h - img.size[1]
    left, top = pad_w // 2, pad_h // 2
    # Any odd leftover pixel goes to the right / bottom edge.
    border = (left, top, pad_w - left, pad_h - top)
    return ImageOps.expand(img, border, fill=fill)

In [5]:
# Load and preprocess images and masks
# Image entries are bare filenames (joined with TRAIN_IMAGES when opened later);
# mask entries are full paths gathered recursively from every folder under MASK_OUTPUT.
image_files = sorted(os.listdir(TRAIN_IMAGES))
mask_files = sorted(
    os.path.join(dirpath, filename)
    for dirpath, _dirnames, filenames in os.walk(MASK_OUTPUT)
    for filename in filenames
)

In [6]:
# Images and masks are paired purely by their position in the two sorted lists, so a
# count mismatch means at least one pair would be wrong. Fail loudly instead of
# letting zip() silently drop the surplus entries.
if len(image_files) != len(mask_files):
    raise ValueError(
        f"Found {len(image_files)} images but {len(mask_files)} masks; "
        "cannot pair them by sorted order."
    )

images = []
masks = []

for img_path, mask_path in zip(image_files, mask_files):
    # Load and preprocess the image. The context manager closes the file handle as
    # soon as convert() has produced an independent in-memory copy.
    with Image.open(os.path.join(TRAIN_IMAGES, img_path)) as img_file:
        img = img_file.convert("RGB")  # Ensure RGB format
    img_resized = resize_with_aspect_ratio(img, IMG_SIZE)
    # Build float32 directly so no float64 copy of every image is kept in the list.
    img_array = np.asarray(img_resized, dtype=np.float32) / 255.0  # Normalize to [0, 1]
    images.append(img_array)

    # Load and preprocess the mask. Forcing mode "L" guarantees a single 8-bit channel,
    # so RGB / palette / 1-bit mask files all yield an (H, W) array in the 0..255 range.
    with Image.open(mask_path) as mask_file:
        mask = mask_file.convert("L")
    mask_resized = resize_with_aspect_ratio(mask, IMG_SIZE)
    # NOTE(review): resize_with_aspect_ratio shrinks with Lanczos resampling, which can
    # leave soft values along mask edges; threshold here if strictly binary masks are needed.
    mask_array = np.asarray(mask_resized, dtype=np.float32) / 255.0  # Scale between [0, 1]
    masks.append(mask_array)

In [7]:
# Convert to numpy arrays
# Stack the per-sample arrays into one float32 array each
masks = np.array(masks, dtype=np.float32)
images = np.array(images, dtype=np.float32)

# Hold out 20% of the samples for validation; the fixed seed keeps the split repeatable
X_train, X_val, y_train, y_val = train_test_split(
    images,
    masks,
    test_size=0.2,
    random_state=42,
)

In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class UNet(nn.Module):
    """
    Three-level U-Net for dense per-pixel prediction.

    The encoder applies three double-convolution stages (64, 128, 256 channels), each
    followed by 2x2 max pooling; a 512-channel bottleneck follows. The decoder mirrors
    this with three stride-2 transposed convolutions, concatenating the matching
    encoder feature map (skip connection) before each double convolution. A final
    1x1 convolution plus sigmoid produces values in [0, 1].

    Input is (N, in_channels, H, W); output is (N, out_channels, H, W). H and W must
    be divisible by 8: the three pooling steps halve them and the three up-sampling
    steps double them, so any other size makes the skip concatenation shapes disagree.

    Args:
        in_channels: Number of channels in the input image (default 3).
        out_channels: Number of output maps (default 1).
    """

    def __init__(self, in_channels=3, out_channels=1):
        super().__init__()

        # Layers are created in the same order as before and each stage keeps the
        # (conv, relu, conv, relu) Sequential layout, so state_dict keys are unchanged.

        # Encoder
        self.enc_conv1 = self._double_conv(in_channels, 64)
        self.pool1 = nn.MaxPool2d(2, 2)

        self.enc_conv2 = self._double_conv(64, 128)
        self.pool2 = nn.MaxPool2d(2, 2)

        self.enc_conv3 = self._double_conv(128, 256)
        self.pool3 = nn.MaxPool2d(2, 2)

        # Bottleneck
        self.bottleneck = self._double_conv(256, 512)

        # Decoder: each dec_conv takes twice the up-sampled channel count because the
        # skip connection from the encoder is concatenated along the channel axis.
        self.up3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec_conv3 = self._double_conv(512, 256)

        self.up2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec_conv2 = self._double_conv(256, 128)

        self.up1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec_conv1 = self._double_conv(128, 64)

        # Output
        self.output = nn.Conv2d(64, out_channels, kernel_size=1)

    @staticmethod
    def _double_conv(in_ch, out_ch):
        """Return two 3x3 same-padding convolutions, each followed by an in-place ReLU."""
        return nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1), nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1), nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """Run the network on a batch and return sigmoid-activated output maps."""
        # Encoder
        enc1 = self.enc_conv1(x)
        enc2 = self.enc_conv2(self.pool1(enc1))
        enc3 = self.enc_conv3(self.pool2(enc2))

        # Bottleneck
        bottleneck = self.bottleneck(self.pool3(enc3))

        # Decoder
        dec3 = self.dec_conv3(torch.cat([self.up3(bottleneck), enc3], dim=1))
        dec2 = self.dec_conv2(torch.cat([self.up2(dec3), enc2], dim=1))
        dec1 = self.dec_conv1(torch.cat([self.up1(dec2), enc1], dim=1))

        # Output
        return torch.sigmoid(self.output(dec1))


In [10]:
from torch.utils.data import DataLoader, Dataset
import numpy as np

# Custom Dataset class
class SegmentationDataset(Dataset):
    """
    Index-aligned pairing of images and masks served as float32 tensors.

    Each item is ``(image, mask)`` where the image's last axis is moved to the front
    (channels-last to channels-first) and the mask gains a leading axis of size 1.
    """

    def __init__(self, images, masks):
        self.images, self.masks = images, masks

    def __len__(self):
        # The dataset length is defined by the image collection alone.
        return len(self.images)

    def __getitem__(self, idx):
        image_chw = torch.tensor(self.images[idx], dtype=torch.float32).permute(2, 0, 1)
        mask_plain = torch.tensor(self.masks[idx], dtype=torch.float32)
        # Leading singleton axis so the mask carries an explicit channel dimension.
        return image_chw, mask_plain.unsqueeze(0)

# Create datasets
# Wrap each split in a dataset
train_dataset = SegmentationDataset(images=X_train, masks=y_train)
val_dataset = SegmentationDataset(images=X_val, masks=y_val)

# Batches of 8; only the training data is reshuffled, validation keeps a fixed order
train_loader = DataLoader(dataset=train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=8, shuffle=False)


In [12]:
# Step 4: Define Metrics
def iou(pred, target, threshold=0.5):
    """
    Intersection-over-union between thresholded predictions and a target mask.

    The score is computed over all elements of the tensors at once (a whole batch
    yields a single number).

    Args:
        pred: Tensor of prediction scores; values above ``threshold`` count as foreground.
        target: Tensor of the same shape as ``pred``. It is used as given (not
            thresholded) — presumably a 0/1 mask; verify against the caller.
        threshold: Cut-off applied to ``pred`` (default 0.5).
    Returns:
        0-dim tensor holding the IoU. When neither the prediction nor the target has
        any foreground the union is zero; that case returns 1.0 (both agree on
        "nothing") instead of the NaN that 0/0 would produce.
    """
    pred = (pred > threshold).float()
    intersection = (pred * target).sum()
    union = pred.sum() + target.sum() - intersection
    if union == 0:
        # Keep the result a tensor on the same device so callers can still use .item().
        return torch.ones_like(union)
    return intersection / union

def dice_coefficient(pred, target, threshold=0.5):
    """
    Dice coefficient between thresholded predictions and a target mask.

    The score is computed over all elements of the tensors at once (a whole batch
    yields a single number).

    Args:
        pred: Tensor of prediction scores; values above ``threshold`` count as foreground.
        target: Tensor of the same shape as ``pred``. It is used as given (not
            thresholded) — presumably a 0/1 mask; verify against the caller.
        threshold: Cut-off applied to ``pred`` (default 0.5).
    Returns:
        0-dim tensor holding ``2 * |pred ∩ target| / (|pred| + |target|)``. When both
        the prediction and the target are empty the denominator is zero; that case
        returns 1.0 (both agree on "nothing") instead of the NaN that 0/0 would produce.
    """
    pred = (pred > threshold).float()
    intersection = (pred * target).sum()
    denominator = pred.sum() + target.sum()
    if denominator == 0:
        # Keep the result a tensor on the same device so callers can still use .item().
        return torch.ones_like(denominator)
    return (2 * intersection) / denominator

<__main__.SegmentationDataset object at 0x000001A73C30F9D0>


In [11]:
# Use CUDA when PyTorch reports it as available, otherwise stay on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = UNet().to(device)

# Loss and optimizer used by the training / evaluation code
criterion = nn.BCELoss()  # Binary cross-entropy for binary segmentation
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

# Training loop
def train_model(model, train_loader, val_loader, epochs):
    """
    Train ``model`` for ``epochs`` passes and print one summary line per epoch.

    Every epoch runs an optimisation pass over ``train_loader`` followed by a
    gradient-free pass over ``val_loader``. The printed line reports the mean
    training loss, the mean validation loss, and the mean per-batch Dice and IoU.

    Relies on the module-level ``device``, ``optimizer`` and ``criterion`` objects and
    on the ``dice_coefficient`` / ``iou`` helpers. Returns nothing.

    Args:
        model: Network to train; updated in place through ``optimizer``.
        train_loader: Iterable of (images, masks) batches used for optimisation.
        val_loader: Iterable of (images, masks) batches used for validation.
        epochs: Number of full passes to run.
    """
    for epoch in range(epochs):
        # ---- optimisation pass ----
        model.train()
        train_loss = 0
        for batch_images, batch_masks in train_loader:
            batch_images = batch_images.to(device)
            batch_masks = batch_masks.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(batch_images), batch_masks)
            batch_loss.backward()
            optimizer.step()

            train_loss += batch_loss.item()

        # ---- validation pass (no gradients tracked) ----
        model.eval()
        val_loss = 0
        dice_scores = []
        iou_scores = []
        with torch.no_grad():
            for batch_images, batch_masks in val_loader:
                batch_images = batch_images.to(device)
                batch_masks = batch_masks.to(device)

                predictions = model(batch_images)
                val_loss += criterion(predictions, batch_masks).item()
                dice_scores.append(dice_coefficient(predictions, batch_masks).item())
                iou_scores.append(iou(predictions, batch_masks).item())

        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss/len(train_loader):.4f}, "
              f"Val Loss: {val_loss/len(val_loader):.4f}, "
              f"Mean Dice: {np.mean(dice_scores):.4f}, Mean IoU: {np.mean(iou_scores):.4f}")

# Evaluation
def evaluate_model(model, loader, criterion):
    """
    Evaluate ``model`` on every batch of ``loader`` and print per-batch and overall metrics.

    For each batch the loss (via ``criterion``), Dice and IoU are computed without
    tracking gradients. One line per batch is printed, followed by a line with the
    averages over all batches. Relies on the module-level ``device`` and on the
    ``dice_coefficient`` / ``iou`` helpers.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        loader: Iterable of (images, masks) batches.
        criterion: Loss callable invoked as ``criterion(outputs, masks)``.
    Returns:
        Tuple ``(mean_loss, mean_dice, mean_iou)`` averaged over batches.
    Raises:
        ValueError: If ``loader`` yields no batches, since there is nothing to average.
    """
    model.eval()
    batch_results = []

    with torch.no_grad():
        for batch_idx, (images, masks) in enumerate(loader):
            images, masks = images.to(device), masks.to(device)

            outputs = model(images)
            batch_results.append({
                "Batch": batch_idx + 1,
                "Loss": criterion(outputs, masks).item(),
                "Dice": dice_coefficient(outputs, masks).item(),
                "IoU": iou(outputs, masks).item(),
            })

    # Guard explicitly: averaging over zero batches would otherwise surface as an
    # unexplained ZeroDivisionError further down.
    if not batch_results:
        raise ValueError("evaluate_model received a loader that yielded no batches; nothing to evaluate.")

    for result in batch_results:
        print(f"Batch {result['Batch']:>3}: Loss = {result['Loss']:.4f}, Dice = {result['Dice']:.4f}, IoU = {result['IoU']:.4f}")

    # Divide by the number of batches actually seen, so loaders without a usable
    # __len__ are handled too.
    mean_loss = sum(result["Loss"] for result in batch_results) / len(batch_results)
    mean_dice = np.mean([result["Dice"] for result in batch_results])
    mean_iou = np.mean([result["IoU"] for result in batch_results])

    print(f"Overall Metrics - Loss: {mean_loss:.4f}, Dice: {mean_dice:.4f}, IoU: {mean_iou:.4f}")
    return mean_loss, mean_dice, mean_iou

In [None]:
# Train for 25 epochs; train_model prints the losses and mean Dice / IoU after each epoch.
train_model(model, train_loader, val_loader, epochs=25)

In [None]:
# Step 6: Save and Load Model
# Persist only the learned parameters (the state_dict), then read them back as a round-trip check.
torch.save(model.state_dict(), "unet_model.pth")
# map_location=device remaps the stored tensors onto the active device, so a checkpoint
# written on a GPU machine still loads when only the CPU is available.
model.load_state_dict(torch.load("unet_model.pth", map_location=device))
model.eval()

In [None]:
import matplotlib.pyplot as plt

# Run a single validation batch through the model. Batch-local names are used on
# purpose: rebinding `images` / `masks` here would overwrite the full dataset arrays
# built earlier and make earlier cells behave differently when re-run.
model.eval()
with torch.no_grad():
    sample_images, sample_masks = next(iter(val_loader))
    sample_images, sample_masks = sample_images.to(device), sample_masks.to(device)
    sample_preds = (model(sample_images) > 0.5).float()  # Threshold predictions

# Visualize the first sample of the batch: input, ground truth and prediction side by side
fig, (ax_input, ax_truth, ax_pred) = plt.subplots(1, 3, figsize=(12, 6))
ax_input.imshow(sample_images[0].permute(1, 2, 0).cpu().numpy())  # Convert back to [H, W, C]
ax_input.set_title("Input Image")
ax_truth.imshow(sample_masks[0][0].cpu().numpy(), cmap="gray")
ax_truth.set_title("Ground Truth Mask")
ax_pred.imshow(sample_preds[0][0].cpu().numpy(), cmap="gray")
ax_pred.set_title("Predicted Mask")
plt.show()

In [None]:
# Step 7: Test on Unseen Data
# Assuming `test_images` is a NumPy array of preprocessed test images
test_images = []  # Placeholder: replace with the preprocessed test images, shaped (N, H, W, 3)
test_images = np.asarray(test_images, dtype=np.float32)

if len(test_images) == 0:
    # An empty dataset yields no batches, and evaluate_model cannot average over zero batches.
    print("No test images provided; skipping test set evaluation.")
else:
    # Dummy all-zero masks since the true masks are unknown. They must be (N, H, W) -
    # one plane per image, not image-shaped - because SegmentationDataset adds the
    # channel axis itself and the model outputs a single channel.
    dummy_masks = np.zeros(test_images.shape[:3], dtype=np.float32)
    test_dataset = SegmentationDataset(test_images, dummy_masks)
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    # Evaluate model on test set
    # NOTE(review): with dummy masks the reported loss / Dice / IoU only compare the
    # predictions against "all background"; they are not accuracy figures.
    print("Test Set Evaluation:")
    test_loss, test_dice, test_iou = evaluate_model(model, test_loader, criterion)

    # Save test results to a CSV for further analysis
    results = {"Test Loss": test_loss, "Mean Dice": test_dice, "Mean IoU": test_iou}
    pd.DataFrame([results]).to_csv("test_results_summary.csv", index=False)

    print("Test evaluation complete! Results saved to 'test_results_summary.csv'.")
