Mask Segmentation Using U-Net

In [23]:
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


Using device: cpu


In [29]:
import torch.nn as nn

class UNet(nn.Module):
    def __init__(self, in_channels=1, out_channels=1):
        super(UNet, self).__init__()

        def conv_block(in_c, out_c):
            return nn.Sequential(
                nn.Conv2d(in_c, out_c, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_c),  # BatchNorm speeds up training
                nn.LeakyReLU(inplace=True),
                nn.Conv2d(out_c, out_c, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_c),
                nn.LeakyReLU(inplace=True)
            )

        self.encoder1 = conv_block(in_channels, 16)  # Reduced from 64 to 16
        self.encoder2 = conv_block(16, 32)           # Reduced from 128 to 32
        self.encoder3 = conv_block(32, 64)           # Reduced from 256 to 64

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.bottleneck = conv_block(64, 128)        # Reduced from 512 to 128

        self.upconv3 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.decoder3 = conv_block(128, 64)
        self.upconv2 = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
        self.decoder2 = conv_block(64, 32)
        self.upconv1 = nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2)
        self.decoder1 = conv_block(32, 16)

        self.final_conv = nn.Conv2d(16, out_channels, kernel_size=1)

    def forward(self, x):
        e1 = self.encoder1(x)
        e2 = self.encoder2(self.pool(e1))
        e3 = self.encoder3(self.pool(e2))

        b = self.bottleneck(self.pool(e3))

        d3 = self.decoder3(torch.cat([self.upconv3(b), e3], dim=1))
        d2 = self.decoder2(torch.cat([self.upconv2(d3), e2], dim=1))
        d1 = self.decoder1(torch.cat([self.upconv1(d2), e1], dim=1))

        return torch.sigmoid(self.final_conv(d1))

# Initialize model on CPU
model = UNet().to(device)
print("U-Net Model Initialized Successfully on CPU")


U-Net Model Initialized Successfully on CPU


In [None]:
import os
import cv2
import torch
import multiprocessing
from torch.utils.data import Dataset, DataLoader

# Dataset Class
class SegmentationDataset(Dataset):
    def __init__(self, image_dir, mask_dir):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.image_files = [f for f in os.listdir(image_dir) if f.endswith('.jpg') or f.endswith('.png')]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_path = os.path.join(self.image_dir, self.image_files[idx])
        mask_path = os.path.join(self.mask_dir, self.image_files[idx])

        image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        # Resize to 32x32 for faster training
        image = cv2.resize(image, (32, 32)) / 255.0
        mask = cv2.resize(mask, (32, 32)) / 255.0

        image = torch.tensor(image, dtype=torch.float32).unsqueeze(0)
        mask = torch.tensor(mask, dtype=torch.float32).unsqueeze(0)

        return image, mask

# Get available CPU cores
num_workers = min(2, multiprocessing.cpu_count())  # Avoid system overload

# Load dataset
image_dir = "image/MSFD/1/face_crop"
mask_dir = "image/MSFD/1/face_crop_segmentation"
dataset = SegmentationDataset(image_dir, mask_dir)

# Optimized DataLoader for CPU
dataloader = DataLoader(
    dataset, batch_size=2, shuffle=True,
    num_workers=num_workers, pin_memory=False  # No pin_memory for CPU
)

print(f"Dataset Loaded: {len(dataset)} images")


Dataset Loaded: 9382 images


In [33]:
import torch.optim as optim
import torch.nn as nn

def train_unet(model, dataloader, criterion, optimizer, device, epochs=5):
    model.train()
    for epoch in range(epochs):
        epoch_loss = 0
        for images, masks in dataloader:
            images, masks = images.to(device), masks.to(device)

            optimizer.zero_grad()  # Reset gradients

            outputs = model(images)
            loss = criterion(outputs, masks)

            loss.backward()  # Compute gradients
            optimizer.step()  # Update model weights

            epoch_loss += loss.item()

        print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss/len(dataloader):.4f}")

# Initialize Loss and Optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

# Train the model on CPU
train_unet(model, dataloader, criterion, optimizer, device="cpu", epochs=5)


Epoch [1/5], Loss: 0.1621
Epoch [2/5], Loss: 0.1425
Epoch [3/5], Loss: 0.1355
Epoch [4/5], Loss: 0.1294
Epoch [5/5], Loss: 0.1266


In [34]:
import numpy as np

def evaluate(model, dataloader, device):
    model.eval()
    iou_values = []
    dice_values = []
    with torch.no_grad():
        for images, masks in dataloader:
            images, masks = images.to(device), masks.to(device)
            outputs = model(images)

            pred_masks = (outputs > 0.5).float()

            intersection = (pred_masks * masks).sum()
            union = (pred_masks + masks).sum() - intersection
            dice = (2. * intersection) / (pred_masks.sum() + masks.sum())

            if union > 0:
                iou = intersection / union
                iou_values.append(iou.item())
            dice_values.append(dice.item())

    print(f"Average IoU: {np.mean(iou_values):.4f}")
    print(f"Average Dice Score: {np.mean(dice_values):.4f}")

# Evaluate Model
evaluate(model, dataloader, device)

Average IoU: 0.8589
Average Dice Score: 0.9218
