
Problem: **Mask Segmentation Using U-Net** (5 Marks) \
i. Train a U-Net model for precise segmentation of mask regions in the images. \
ii. Compare the performance of U-Net with the traditional segmentation method
using metrics like IoU or Dice score.

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#!pip install numpy opencv-python tqdm scikit-learn torch torchvision tensorflow

In [None]:
#pip install keras

In [8]:
import os
import json
import numpy as np
import cv2
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader, Subset
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tqdm.keras import TqdmCallback
import tensorflow.keras.backend as K
import random

# Define Paths & Parameters

In [9]:
# Set base directory
base_dir = "/content/drive/MyDrive/Segmentation Dataset/1"

# Define sub-directories
cropped_images_dir = os.path.join(base_dir, "face_crop")
segmented_images_dir = os.path.join(base_dir, "face_crop_segmentation")
csv_path = os.path.join(base_dir, "dataset.csv")

# Image dimensions
IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS = 256, 256, 3
BATCH_SIZE = 32


In [10]:
# Check number of images in directories
num_cropped_images = len([f for f in os.listdir(cropped_images_dir) if f.endswith('.jpg') or f.endswith('.png')])
num_segmented_images = len([f for f in os.listdir(segmented_images_dir) if f.endswith('.jpg') or f.endswith('.png')])

print(f"Number of images in cropped_images_dir: {num_cropped_images}")
print(f"Number of images in segmented_images_dir: {num_segmented_images}")


Number of images in cropped_images_dir: 9452
Number of images in segmented_images_dir: 9471


# Map Cropped and Segmented Images


In [11]:
def map_images(cropped_dir, segmented_dir):
    valid_extensions = ('.jpg', '.jpeg', '.png', '.JPG', '.JPEG', '.PNG')
    cropped_files = [f for f in os.listdir(cropped_dir) if f.endswith(valid_extensions)]
    segmented_files = [f for f in os.listdir(segmented_dir) if f.endswith(valid_extensions)]

    mapped_pairs = []
    for cropped_file in cropped_files:
        base_name, _ = os.path.splitext(cropped_file)
        matched_seg_file = None
        for seg_file in segmented_files:
            if os.path.splitext(seg_file)[0] == base_name:
                matched_seg_file = seg_file
                break
        if matched_seg_file:
            mapped_pairs.append((
                os.path.join(cropped_dir, cropped_file),
                os.path.join(segmented_dir, matched_seg_file)
            ))
    return mapped_pairs

mapped_images = map_images(cropped_images_dir, segmented_images_dir)
print(f"✅ Mapped {len(mapped_images)} image pairs!")
# Save mapped pairs to JSON (if not already saved)
with open("mapped_images.json", "w") as f:
    json.dump(mapped_images, f)

✅ Mapped 9382 image pairs!


# Define Custom Dataset for PyTorch DataLoader

In [12]:
class MaskDataset(Dataset):
    def __init__(self, mapped_images, img_height, img_width):
        self.mapped_images = mapped_images
        self.img_height = img_height
        self.img_width = img_width

    def __len__(self):
        return len(self.mapped_images)

    def __getitem__(self, idx):
        img_path, mask_path = self.mapped_images[idx]
        # Load and resize image
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(f"⚠️ Image not found: {img_path}")
        img = cv2.resize(img, (self.img_width, self.img_height))
        img = img.astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))  # (C, H, W)

        # Load and resize mask
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"⚠️ Mask not found: {mask_path}")
        mask = cv2.resize(mask, (self.img_width, self.img_height))
        mask = np.where(mask > 0, 1, 0).astype(np.uint8)
        mask = np.expand_dims(mask, axis=0)  # (1, H, W)

        return torch.tensor(img, dtype=torch.float32), torch.tensor(mask, dtype=torch.float32)


# 4. Load Dataset and Create DataLoaders

In [13]:
# Set random seed for reproducibility
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Load mapped images from JSON
with open("mapped_images.json", "r") as f:
    mapped_images = json.load(f)

# Initialize dataset
dataset = MaskDataset(mapped_images, IMG_HEIGHT, IMG_WIDTH)

# Get all indices
indices = list(range(len(dataset)))

# Split dataset: 70% training, 30% validation
train_indices, val_indices = train_test_split(indices, test_size=0.3, random_state=RANDOM_SEED)

# Create subsets
train_dataset = Subset(dataset, train_indices)
val_dataset = Subset(dataset, val_indices)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

print(f"✅ Data ready! Train size: {len(train_indices)}, Validation size: {len(val_indices)}")



✅ Data ready! Train size: 6567, Validation size: 2815




In [None]:
# print(f"X_train shape: {X_train.shape}, Y_train shape: {Y_train.shape}")
# print(f"X_val shape: {X_val.shape}, Y_val shape: {Y_val.shape}")


## Build Unet Model

In [29]:
def build_unet(input_shape=(256, 256, 3)):
    inputs = Input(input_shape)
    # Encoder
    c1 = Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
    c1 = Conv2D(64, (3, 3), activation='relu', padding='same')(c1)
    p1 = MaxPooling2D((2, 2))(c1)

    c2 = Conv2D(128, (3, 3), activation='relu', padding='same')(p1)
    c2 = Conv2D(128, (3, 3), activation='relu', padding='same')(c2)
    p2 = MaxPooling2D((2, 2))(c2)

    c3 = Conv2D(256, (3, 3), activation='relu', padding='same')(p2)
    c3 = Conv2D(256, (3, 3), activation='relu', padding='same')(c3)
    p3 = MaxPooling2D((2, 2))(c3)

    # Bottleneck
    c4 = Conv2D(512, (3, 3), activation='relu', padding='same')(p3)
    c4 = Conv2D(512, (3, 3), activation='relu', padding='same')(c4)

    # Decoder
    u5 = UpSampling2D((2, 2))(c4)
    u5 = concatenate([u5, c3])
    c5 = Conv2D(256, (3, 3), activation='relu', padding='same')(u5)
    c5 = Conv2D(256, (3, 3), activation='relu', padding='same')(c5)

    u6 = UpSampling2D((2, 2))(c5)
    u6 = concatenate([u6, c2])
    c6 = Conv2D(128, (3, 3), activation='relu', padding='same')(u6)
    c6 = Conv2D(128, (3, 3), activation='relu', padding='same')(c6)

    u7 = UpSampling2D((2, 2))(c6)
    u7 = concatenate([u7, c1])
    c7 = Conv2D(64, (3, 3), activation='relu', padding='same')(u7)
    c7 = Conv2D(64, (3, 3), activation='relu', padding='same')(c7)

    outputs = Conv2D(1, (1, 1), activation='sigmoid')(c7)
    model = Model(inputs, outputs)
    return model



In [30]:

# Define image dimensions and channels
IMG_HEIGHT = 256
IMG_WIDTH = 256
IMG_CHANNELS = 3

# Correct call with input shape
model = build_unet(input_shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))
model.summary()


# 6. Define Combined Loss: Dice Loss + Binary Cross-Entropy

In [16]:
def dice_coefficient(y_true, y_pred, smooth=1):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

def dice_loss(y_true, y_pred):
    return 1 - dice_coefficient(y_true, y_pred)

def combined_loss(y_true, y_pred):
    bce = tf.keras.losses.BinaryCrossentropy()(y_true, y_pred)
    return 0.5 * bce + 0.5 * dice_loss(y_true, y_pred)

model.compile(optimizer=Adam(learning_rate=1e-4),
              loss=combined_loss,
              metrics=[dice_coefficient])

# 7. Create a Generator from PyTorch DataLoader

In [17]:
def data_generator(loader):
    """
    Generator that yields batches from a PyTorch DataLoader.
    It converts the batch (in shape (batch, channels, height, width))
    to the shape TensorFlow expects: (batch, height, width, channels).
    """
    while True:  # Loop indefinitely (fit() will control number of epochs/steps)
        for batch in loader:
            X_batch, Y_batch = batch
            # Convert PyTorch tensor (C, H, W) to NumPy array in (H, W, C)
            X_batch = X_batch.numpy().transpose(0, 2, 3, 1)
            Y_batch = Y_batch.numpy().transpose(0, 2, 3, 1)
            yield X_batch, Y_batch

# Determine steps per epoch and validation steps
steps_per_epoch = len(train_loader)
validation_steps = len(val_loader)





# 8. Define Callbacks: Early Stopping & Learning Rate Scheduler

In [None]:
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1
)
lr_scheduler = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=5,
    min_lr=1e-6,
    verbose=1
)


# 9. Train the Model using the Generator

In [18]:
def load_trained_model(model_path):
    """
    Loads the trained model from disk along with custom objects.

    Parameters:
        model_path (str): Path to the saved model file (e.g., "unet_model.h5")

    Returns:
        model (tf.keras.Model): Loaded Keras model.
    """
    # Define custom objects: combined_loss and dice_coefficient need to be passed
    custom_objects = {
        'combined_loss': combined_loss,
        'dice_coefficient': dice_coefficient
    }

    # Load and return the model
    model = tf.keras.models.load_model(model_path, custom_objects=custom_objects)
    print(f"✅ Model loaded from {model_path}")
    return model

# Example usage:
model = load_trained_model(os.path.join(base_dir, "unet_model.h5"))




✅ Model loaded from /content/drive/MyDrive/Segmentation Dataset/1/unet_model.h5


In [None]:
# # ----------------------------
# # Model Training
# # ----------------------------
# EPOCHS = 25
# BATCH_SIZE = 32

# # Convert data to numpy for TensorFlow training
# X_train, Y_train = next(iter(train_loader))
# X_val, Y_val = next(iter(val_loader))

# X_train = X_train.numpy().transpose(0, 2, 3, 1)
# Y_train = Y_train.numpy().transpose(0, 2, 3, 1)
# X_val = X_val.numpy().transpose(0, 2, 3, 1)
# Y_val = Y_val.numpy().transpose(0, 2, 3, 1)

# # Train U-Net
# history = model.fit(
#     X_train, Y_train,
#     validation_data=(X_val, Y_val),
#     batch_size=BATCH_SIZE,
#     epochs=EPOCHS,
#     verbose=1
# )

# # Save trained model
# model.save(os.path.join(base_dir, "unet_model.h5"))
# print("✅ Model training complete and saved!")


In [None]:
# # ----------------------------
# # IoU and Dice Score Calculation
# # ----------------------------
# def iou(y_true, y_pred):
#     intersection = np.logical_and(y_true, y_pred).sum()
#     union = np.logical_or(y_true, y_pred).sum()
#     return intersection / union if union != 0 else 0

# def dice_score(y_true, y_pred):
#     intersection = np.logical_and(y_true, y_pred).sum()
#     return (2.0 * intersection) / (y_true.sum() + y_pred.sum())

# # Evaluate on validation data
# preds = model.predict(X_val)
# preds_binary = (preds > 0.5).astype(np.uint8)

# ious, dices = [], []
# for i in range(len(Y_val)):
#     ious.append(iou(Y_val[i], preds_binary[i]))
#     dices.append(dice_score)


In [None]:
# ----------------------------
# Evaluation on the Entire Validation Set
# ----------------------------



# Initialize lists to collect predictions and ground truth masks
all_preds = []
all_Y_val = []

# Determine the number of validation steps (batches)
print(f"Evaluating on {validation_steps} validation batches...")

# Iterate over the validation DataLoader
for _ in range(validation_steps):
    X_batch, Y_batch = next(data_generator(val_loader))
    preds_batch = model.predict(X_batch)
    preds_binary_batch = (preds_batch > 0.5).astype(np.uint8)
    all_preds.append(preds_binary_batch)
    all_Y_val.append(Y_batch)

# Concatenate all batches into single arrays
all_preds = np.concatenate(all_preds, axis=0)
all_Y_val = np.concatenate(all_Y_val, axis=0)

# Define IoU and Dice functions
def iou(y_true, y_pred):
    intersection = np.logical_and(y_true, y_pred).sum()
    union = np.logical_or(y_true, y_pred).sum()
    return intersection / union if union != 0 else 0

def dice_score(y_true, y_pred):
    intersection = np.logical_and(y_true, y_pred).sum()
    return (2.0 * intersection) / (y_true.sum() + y_pred.sum() + 1e-7)

# Calculate IoU and Dice Score for each sample in the validation set
iou_scores = []
dice_scores = []
for i in range(len(all_Y_val)):
    iou_scores.append(iou(all_Y_val[i], all_preds[i]))
    dice_scores.append(dice_score(all_Y_val[i], all_preds[i]))

# Print the evaluation results
print(f"✅ Mean IoU: {np.mean(iou_scores):.4f}")
print(f"✅ Mean Dice Score: {np.mean(dice_scores):.4f}")

# Optionally, print per-sample scores
for i, (iou_val, dice_val) in enumerate(zip(iou_scores, dice_scores)):
    print(f"Sample {i+1:3d} - IoU: {iou_val:.4f}, Dice: {dice_val:.4f}")


Evaluating on 88 validation batches...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m55s[0m 55s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m52s[0m 52s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m52s[0m 52s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 53s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 53s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m51s[0m 51s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m51s[0m 51s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m51s[0m 51s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m59s[0m 59s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m55s[0m 55s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 53s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 53s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 53s/step
[1m1/1[0m [32m━━━━━━━━