In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
import os
from skimage.morphology import flood, binary_closing, disk
from skimage import img_as_ubyte

What follows below is the part c of the project, until the demarcation where part d starts.
______________________________________________________________________________________________________________________________


#------------------------- NOTE : SET THE PATHS CORRECTLY IN THE NEXT CELL BEFORE PROCEDING ---------------------------------#

Here for the part c of the project taht requires us to use traditional region-based segmenteaion techniques, we use the images available at <> where we have the correct corresponding mask so that our output can be validated>


In [None]:
input_folder = '../MSFD/1/face_crop' # cropped faces with mask
output_folder = '../output/part_c' # Path to the folder where you want the output segemented images to be saved
ground_truth_folder= '../MSFD/1/face_crop_segmentation' # folder that contains actual face mask

Technique we choose is  clustering based region-based segmentation using the K-means algorithm.

k=2, one for mask region and another for backround.

Here for the choice of the 2 initial centroids, we use domain knowledge. The images are cropped to face-size which implies that it is higly likely that some region of the mask must be in the center of image. 

So we choose one centoid at center and another at corner.

We try one more technique that is segmentaion by 'Region-growing', choice of initail seed here(only one) is center ofthe image, the 'why' of it backed by the reasoning provided above.

In [None]:
def segment_mask_region(image_path, viz = False, to_save = False, save_separetely = None):
    # Load and preprocess image
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 8)

    # Compute Gradient for Additional Feature
    sobelx = cv2.Sobel(blurred, cv2.CV_64F, 1, 0, ksize=5)
    sobely = cv2.Sobel(blurred, cv2.CV_64F, 0, 1, ksize=5)
    gradient_magnitude = np.sqrt(sobelx**2 + sobely**2)
    gradient_magnitude = cv2.normalize(gradient_magnitude, None, 0, 255, cv2.NORM_MINMAX)

    # Extract features (Intensity, Spatial Location, Gradient)
    height, width = gray.shape
    X_features = []
    for y in range(height):
        for x in range(width):
            X_features.append([gray[y, x], x / width, y / height, gradient_magnitude[y, x]])

    X_features = np.array(X_features)

    # Manual Seed Selection
    center_x, center_y = width // 2, height // 2
    seed1 = [gray[center_y, center_x], center_x / width, center_y / height, gradient_magnitude[center_y, center_x]]
    seed2 = [gray[0, 0], 0, 0, gradient_magnitude[0, 0]]  # Top-left corner as background

    # Apply K-Means with Manual Seeds
    kmeans = KMeans(n_clusters=2, init=np.array([seed1, seed2]), n_init=1, max_iter=100, random_state=42)
    labels = kmeans.fit_predict(X_features)
   
    # Invert if needed (as Mask should always be white)
    #if kmeans.cluster_centers_[0][0] > kmeans.cluster_centers_[1][0]:  
    labels = 1 - labels  # Swap labels

    #print(kmeans.cluster_centers_)
    #print(f'{type(labels)}, {len(labels)}')


    # Reshaping the labels to match the image shape
    segmented_image = labels.reshape((height, width))

    # Convert segmented image to uint8 (0-255 range)
    segmented_display = (segmented_image * 255).astype(np.uint8)

    # Ensuring binary segmentation before Canny
    segmented_display = cv2.threshold(segmented_display, 127, 255, cv2.THRESH_BINARY)[1]

    # Morphological Closing to Refine Segmentation
    kernel = np.ones((5, 5), np.uint8)
    segmented_display = cv2.morphologyEx(segmented_display, cv2.MORPH_CLOSE, kernel)

    # Applying Canny Edge Detection
    edges = cv2.Canny(segmented_display, 50, 150)

    if(viz==True):

        fig, axes = plt.subplots(1, 3, figsize=(15, 5))

        # Display Original Image
        axes[0].imshow(image)
        axes[0].set_title("Original Image")
        axes[0].axis("off")

        # Display Segmented Region (Fix Scaling Issue)
        axes[1].imshow(segmented_display, cmap="gray", vmin=0, vmax=255)
        axes[1].set_title("Segmented Mask Region")
        axes[1].axis("off")

        # Display Edge Detection Result
        axes[2].imshow(edges, cmap="gray", vmin=0, vmax=255)
        axes[2].set_title("Edges of Segmentation")
        axes[2].axis("off")

        plt.show()
    
    if(to_save == True):
        if(save_separetely != None):
            cv2.imwrite(os.path.join(output_folder, image_path), segmented_display)
        else:
            os.makedirs(output_folder, exist_ok=True)
            cv2.imwrite(os.path.join(output_folder, image_path), segmented_display)

    return segmented_display

def segment_region_growing(image_path, tolerance=0.4, viz=False, to_save=False, save_path=None):
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  
    
    # Convert to grayscale & apply Gaussian Blur
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    blurred = cv2.GaussianBlur(gray, (17, 17), 8)

    # Normalize image to [0,1] for flood fill
    norm_image = blurred.astype(np.float32) / 255.0  

    # Define seed point at image center
    height, width = norm_image.shape
    seed_point = (height // 2, width // 2)
    
    # Perform region growing using flood fill
    mask = flood(norm_image, seed_point, tolerance=tolerance)
    
    # Convert mask to uint8 binary image (0 or 255)
    segmented_display = img_as_ubyte(mask)

    # Apply morphological closing to refine segmentation
    segmented_display = binary_closing(segmented_display, disk(3)).astype(np.uint8) * 255

    # Detect edges using Canny
    edges = cv2.Canny(segmented_display, 50, 150)

    # Visualization
    if viz:
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))

        axes[0].imshow(image)
        axes[0].set_title("Original Image")
        axes[0].axis("off")

        axes[1].imshow(segmented_display, cmap="gray")
        axes[1].set_title("Segmented Region")
        axes[1].axis("off")

        axes[2].imshow(edges, cmap="gray")
        axes[2].set_title("Edges of Segmentation")
        axes[2].axis("off")

        plt.show()
    
    # Save result if needed
    if to_save:
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        cv2.imwrite(save_path, segmented_display)

    return segmented_display

def compute_metrics(pred_mask, true_mask_path):
    true_mask = cv2.imread(true_mask_path, cv2.IMREAD_GRAYSCALE)  # Read as grayscale
    if true_mask is None:
        raise ValueError(f"Error: Could not load {true_mask_path}")

    # Convert to binary masks
    pred_mask = (pred_mask > 0).astype(np.uint8)
    true_mask = (true_mask > 0).astype(np.uint8)

    # Compute IoU
    intersection = np.logical_and(pred_mask, true_mask).sum()
    union = np.logical_or(pred_mask, true_mask).sum()
    iou = intersection / union if union > 0 else 0

    # Compute Dice
    dice = (2 * intersection) / (pred_mask.sum() + true_mask.sum()) if (pred_mask.sum() + true_mask.sum()) > 0 else 0

    return {"IoU": iou, "Dice": dice}

To test on individual images:

In [None]:
#pred_mask = segment_mask_region('../MSFD/1/face_crop/000007_1.jpg', viz=True)
pred_mask = segment_region_growing(image_path='../MSFD/1/face_crop/000007_1.jpg', viz=True, tolerance=0.19)
true_mask_path = '../MSFD/1/face_crop_segmentation/000007_1.jpg'
print(compute_metrics(pred_mask, true_mask_path))

Below we attemp to find average IOU and Dice scores across all images.

In [None]:
avg_iou = 0
avg_dice = 0
count = 0  # Count of valid images

# Process each image
for img in os.listdir(input_folder):
    if img.lower().endswith(('.png', '.jpg', '.jpeg')):  # Filter images
        img_path = os.path.join(input_folder, img)
        gt_mask_path = os.path.join(ground_truth_folder, img)  # Assuming same filename

        if not os.path.exists(gt_mask_path):
            print(f"Skipping {img} (No ground truth found)")
            continue
        
        predicted_segment = segment_region_growing(image_path=img_path, tolerance=0.15)  # Generate mask
        
        # Load ground truth mask
        true_mask = cv2.imread(gt_mask_path, cv2.IMREAD_GRAYSCALE)
        
        if true_mask is None:
            print(f"Skipping {img} (Cannot load ground truth mask)")
            continue
        
        # Check shape compatibility
        if predicted_segment.shape != true_mask.shape:
            print(f"Skipping {img} (Shape mismatch: {predicted_segment.shape} vs {true_mask.shape})")
            continue
        
        metrics = compute_metrics(predicted_segment, gt_mask_path)  # Compute IoU & Dice
        
        if metrics:  # If computation was successful
            avg_iou += metrics["IoU"]
            avg_dice += metrics["Dice"]
            count += 1  

if count > 0:
    avg_iou /= count
    avg_dice /= count
    print(f"\nAverage IoU: {avg_iou:.4f}, Average Dice: {avg_dice:.4f}")
else:
    print("\nNo valid images processed.")


Part d
______________________________________________________________________________________________________________________

In [1]:
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

Looking in indexes: https://download.pytorch.org/whl/cu118
^C
Note: you may need to restart the kernel to use updated packages.


In [None]:
# ...existing code...

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
import os
from PIL import Image
import numpy as np

def evaluate_model(model, data_loader, device, criterion=None):
    model.eval()
    dice_list, iou_list, prec_list = [], [], []
    recall_list, f1_list = [], []
    loss_list = []

    with torch.no_grad():
        for images, masks in data_loader:
            images, masks = images.to(device), masks.to(device)
            logits = model(images)
            preds = torch.sigmoid(logits) > 0.5

            # Compute loss if a criterion is provided
            if criterion is not None:
                loss_val = criterion(logits, masks)
                loss_list.append(loss_val.item())

            intersection = (preds & (masks > 0.5)).float().sum(dim=(1,2,3))
            union = (preds | (masks > 0.5)).float().sum(dim=(1,2,3))

            dice = 2.0 * intersection / (preds.sum(dim=(1,2,3)) + masks.sum(dim=(1,2,3)) + 1e-8)
            iou = intersection / (union + 1e-8)
            precision = intersection / (preds.sum(dim=(1,2,3)) + 1e-8)
            recall = intersection / ((masks > 0.5).float().sum(dim=(1,2,3)) + 1e-8)
            f1 = 2 * precision * recall / (precision + recall + 1e-8)

            dice_list.extend(dice.cpu().numpy())
            iou_list.extend(iou.cpu().numpy())
            prec_list.extend(precision.cpu().numpy())
            recall_list.extend(recall.cpu().numpy())
            f1_list.extend(f1.cpu().numpy())

    metrics = {
        "Dice": np.mean(dice_list),
        "IoU": np.mean(iou_list),
        "Precision": np.mean(prec_list),
        "Recall": np.mean(recall_list),
        "F1": np.mean(f1_list)
    }
    if criterion is not None and len(loss_list) > 0:
        metrics["Loss"] = np.mean(loss_list)
    return metrics

class SimpleUNet(nn.Module):
    def __init__(self, num_classes=1):
        super(SimpleUNet, self).__init__()
        self.enc1 = nn.Sequential(nn.Conv2d(3, 64, 3, padding=1), nn.ReLU(inplace=True))
        self.enc2 = nn.Sequential(nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(inplace=True))
        self.dec1 = nn.Sequential(nn.ConvTranspose2d(128, 64, 2, stride=2), nn.ReLU(inplace=True))
        self.outc = nn.Conv2d(64, num_classes, 1)
        
    def forward(self, x):
        x1 = self.enc1(x)
        x2 = nn.MaxPool2d(2)(x1)
        x2 = self.enc2(x2)
        x3 = self.dec1(x2)
        x_out = self.outc(x3)
        return x_out

class FaceMaskDataset(Dataset):
    def __init__(self, img_dir, seg_dir, transform=None):
        self.img_paths = sorted([os.path.join(img_dir, f) for f in os.listdir(img_dir)])
        self.seg_paths = sorted([os.path.join(seg_dir, f) for f in os.listdir(seg_dir)])
        self.transform = transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        image = Image.open(self.img_paths[idx]).convert('RGB')
        mask = Image.open(self.seg_paths[idx]).convert('L')
        if self.transform:
            image = self.transform(image)
            mask = self.transform(mask)
        return image, mask

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define transforms
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])

# Create dataset
full_dataset = FaceMaskDataset(
    img_dir="../MSFD/1/img",
    seg_dir="../MSFD/1/face_crop_segmentation",
    transform=transform
)

# Split into train/val/test (for example, 70/20/10)
total_len = len(full_dataset)
train_len = int(0.7 * total_len)
val_len = int(0.2 * total_len)
test_len = total_len - train_len - val_len
train_dataset, val_dataset, test_dataset = random_split(full_dataset, [train_len, val_len, test_len])

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=4, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=4, shuffle=False)

model = SimpleUNet(num_classes=1).to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 5
best_val_dice = 0.0
best_model_path = "best_face_mask_unet.pth"

for epoch in range(num_epochs):
    model.train()
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

    # Evaluate on validation set
    val_metrics = evaluate_model(model, val_loader, device)
    print(f"Epoch {epoch + 1}/{num_epochs} - Val Metrics:", val_metrics)
    if val_metrics["Dice"] > best_val_dice:
        best_val_dice = val_metrics["Dice"]
        torch.save(model.state_dict(), best_model_path)

# Load best model and compute final test metrics
model.load_state_dict(torch.load(best_model_path))
test_metrics = evaluate_model(model, test_loader, device)
print("Test Metrics:", test_metrics)

Epoch 1/5 - Val Metrics: {'Dice': np.float32(0.0020804573), 'IoU': np.float32(0.0011399745), 'Precision': np.float32(0.030992325)}
Epoch 2/5 - Val Metrics: {'Dice': np.float32(0.020634636), 'IoU': np.float32(0.012148887), 'Precision': np.float32(0.15607879)}
Epoch 3/5 - Val Metrics: {'Dice': np.float32(0.017132705), 'IoU': np.float32(0.010042136), 'Precision': np.float32(0.18905637)}
Epoch 4/5 - Val Metrics: {'Dice': np.float32(0.036102004), 'IoU': np.float32(0.021313338), 'Precision': np.float32(0.34320235)}
Epoch 5/5 - Val Metrics: {'Dice': np.float32(0.020815808), 'IoU': np.float32(0.012030196), 'Precision': np.float32(0.32748264)}
Test Metrics: {'Dice': np.float32(0.035041634), 'IoU': np.float32(0.020639274), 'Precision': np.float32(0.35789925)}


In [7]:
def evaluate_model(model, data_loader, device, criterion=None):
    model.eval()
    dice_list, iou_list, prec_list = [], [], []
    recall_list, f1_list = [], []
    loss_list = []

    with torch.no_grad():
        for images, masks in data_loader:
            images, masks = images.to(device), masks.to(device)
            logits = model(images)
            preds = torch.sigmoid(logits) > 0.5

            # Compute loss if a criterion is provided
            if criterion is not None:
                loss_val = criterion(logits, masks)
                loss_list.append(loss_val.item())

            intersection = (preds & (masks > 0.5)).float().sum(dim=(1,2,3))
            union = (preds | (masks > 0.5)).float().sum(dim=(1,2,3))

            dice = 2.0 * intersection / (preds.sum(dim=(1,2,3)) + masks.sum(dim=(1,2,3)) + 1e-8)
            iou = intersection / (union + 1e-8)
            precision = intersection / (preds.sum(dim=(1,2,3)) + 1e-8)
            recall = intersection / ((masks > 0.5).float().sum(dim=(1,2,3)) + 1e-8)
            f1 = 2 * precision * recall / (precision + recall + 1e-8)

            dice_list.extend(dice.cpu().numpy())
            iou_list.extend(iou.cpu().numpy())
            prec_list.extend(precision.cpu().numpy())
            recall_list.extend(recall.cpu().numpy())
            f1_list.extend(f1.cpu().numpy())

    metrics = {
        "Dice": np.mean(dice_list),
        "IoU": np.mean(iou_list),
        "Precision": np.mean(prec_list),
        "Recall": np.mean(recall_list),
        "F1": np.mean(f1_list)
    }
    if criterion is not None and len(loss_list) > 0:
        metrics["Loss"] = np.mean(loss_list)
    return metrics

model.load_state_dict(torch.load(best_model_path))

# Continue training for 10 more epochs
more_epochs = 10
for epoch in range(more_epochs):
    model.train()
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

    val_metrics = evaluate_model(model, val_loader, device)
    print(f"Extended Epoch {epoch + 1}/{more_epochs} - Val Metrics:", val_metrics)
    if val_metrics["Dice"] > best_val_dice:
        best_val_dice = val_metrics["Dice"]
        torch.save(model.state_dict(), best_model_path)

# Test with the new best model
model.load_state_dict(torch.load(best_model_path))
test_metrics2 = evaluate_model(model, test_loader, device)
print("New Test Metrics:", test_metrics2)

Extended Epoch 1/10 - Val Metrics: {'Dice': np.float32(0.049665358), 'IoU': np.float32(0.029051635), 'Precision': np.float32(0.41305828), 'Recall': np.float32(0.034094), 'F1': np.float32(0.049328446)}
Extended Epoch 2/10 - Val Metrics: {'Dice': np.float32(0.018553255), 'IoU': np.float32(0.010536993), 'Precision': np.float32(0.3385195), 'Recall': np.float32(0.011891935), 'F1': np.float32(0.018424)}
Extended Epoch 3/10 - Val Metrics: {'Dice': np.float32(0.042415082), 'IoU': np.float32(0.02445732), 'Precision': np.float32(0.40350398), 'Recall': np.float32(0.028059736), 'F1': np.float32(0.042117607)}
Extended Epoch 4/10 - Val Metrics: {'Dice': np.float32(0.019119015), 'IoU': np.float32(0.010661044), 'Precision': np.float32(0.35684618), 'Recall': np.float32(0.011836304), 'F1': np.float32(0.01898374)}
Extended Epoch 5/10 - Val Metrics: {'Dice': np.float32(0.023304809), 'IoU': np.float32(0.013222252), 'Precision': np.float32(0.36518505), 'Recall': np.float32(0.014911198), 'F1': np.float32(0.0

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import (Conv2D, MaxPooling2D, Conv2DTranspose, Concatenate,
                                     Dropout)
from sklearn.model_selection import train_test_split

# Example placeholders for images/masks arrays:
# images, masks = load_your_data()  # (N, H, W, 3), (N, H, W, 1) for example

def dice_coefficient(y_true, y_pred, smooth=1e-6):
    y_true_f = tf.keras.backend.flatten(tf.cast(y_true, tf.float32))
    y_pred_f = tf.keras.backend.flatten(tf.cast(y_pred, tf.float32))
    intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (
        tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) + smooth
    )

def dice_loss(y_true, y_pred):
    return 1 - dice_coefficient(y_true, y_pred)

def bce_dice_loss(y_true, y_pred):
    return tf.keras.losses.BinaryCrossentropy()(y_true, y_pred) + dice_loss(y_true, y_pred)

def unet(input_shape, output_layer=1):
    inputs = Input(input_shape)
    # Encoder
    c1 = Conv2D(16, (3,3), activation='relu', padding='same')(inputs)
    c1 = Conv2D(16, (3,3), activation='relu', padding='same')(c1)
    p1 = MaxPooling2D((2,2))(c1)

    c2 = Conv2D(32, (3,3), activation='relu', padding='same')(p1)
    c2 = Conv2D(32, (3,3), activation='relu', padding='same')(c2)
    p2 = MaxPooling2D((2,2))(c2)

    c3 = Conv2D(64, (3,3), activation='relu', padding='same')(p2)
    c3 = Conv2D(64, (3,3), activation='relu', padding='same')(c3)
    p3 = MaxPooling2D((2,2))(c3)

    c4 = Conv2D(128, (3,3), activation='relu', padding='same')(p3)
    c4 = Conv2D(128, (3,3), activation='relu', padding='same')(c4)
    p4 = MaxPooling2D((2,2))(c4)

    # Bottom
    c5 = Conv2D(256, (3,3), activation='relu', padding='same')(p4)
    c5 = Conv2D(256, (3,3), activation='relu', padding='same')(c5)

    # Decoder
    u6 = Conv2DTranspose(128, (2,2), strides=(2,2), padding='same')(c5)
    x6 = Concatenate()([c4, u6])
    x6 = Dropout(0.2)(x6)
    c6 = Conv2D(128, (3,3), activation='relu', padding='same')(x6)
    c6 = Conv2D(128, (3,3), activation='relu', padding='same')(c6)

    u7 = Conv2DTranspose(64, (2,2), strides=(2,2), padding='same')(c6)
    x7 = Concatenate()([c3, u7])
    x7 = Dropout(0.2)(x7)
    c7 = Conv2D(64, (3,3), activation='relu', padding='same')(x7)
    c7 = Conv2D(64, (3,3), activation='relu', padding='same')(c7)

    u8 = Conv2DTranspose(32, (2,2), strides=(2,2), padding='same')(c7)
    x8 = Concatenate()([c2, u8])
    x8 = Dropout(0.1)(x8)
    c8 = Conv2D(32, (3,3), activation='relu', padding='same')(x8)
    c8 = Conv2D(32, (3,3), activation='relu', padding='same')(c8)

    u9 = Conv2DTranspose(16, (2,2), strides=(2,2), padding='same')(c8)
    x9 = Concatenate()([c1, u9])
    x9 = Dropout(0.1)(x9)
    c9 = Conv2D(16, (3,3), activation='relu', padding='same')(x9)
    c9 = Conv2D(16, (3,3), activation='relu', padding='same')(c9)

    outputs = Conv2D(output_layer, (1,1), activation='sigmoid')(c9)
    return Model(inputs=[inputs], outputs=[outputs])

# Example data splitting
# images, masks = load_images(), load_masks() # shape e.g. (N, 256, 256, 3), (N, 256, 256, 1)
# train_images, val_images, train_masks, val_masks = train_test_split(images, masks, test_size=0.2, random_state=87)
# train_images, test_images, train_masks, test_masks = train_test_split(train_images, train_masks, test_size=0.2, random_state=87)

model = unet((256,256,3))
optimizer = Adam(learning_rate=2e-4)
model.compile(optimizer=optimizer, loss=bce_dice_loss, metrics=[dice_coefficient])

# Example training
# from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
# checkpoint = ModelCheckpoint("best.h5", monitor="val_dice_coefficient", save_best_only=True, mode="max")
# lr_scheduler = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, mode="min", verbose=1)
# history = model.fit(
#     train_images, train_masks,
#     batch_size=4,
#     epochs=20,
#     validation_data=(val_images, val_masks),
#     callbacks=[checkpoint, lr_scheduler]
# )
# model.load_weights("best.h5")
# test_results = model.evaluate(test_images, test_masks, verbose=0)
# print("Test Results:", test_results)

Part d [continued....]

In [None]:
import os
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import cv2
import random
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.image import flip_left_right, flip_up_down, adjust_brightness
from PIL import Image

Data Loading and Preprocessing

In this section, we define functions to load and preprocess the data. The `load_data` function retrieves image filenames from the specified folder, while the `preprocess_data` function resizes the images and masks to the desired input size and applies optional data augmentation techniques such as flipping.

The preprocessed data is then split into training, validation, and test sets for model training and evaluation.


In [None]:
#load data
def load_data(folder_path):
    folder=os.listdir(folder_path)
    images=[]
    for filename in folder:
        if(filename not in ["000601_1.jpg"]):
            images.append(filename)
    images = sorted(images)[:6000]
    return np.array(images)

### change paths to the correct

In [None]:
#change path
image_filenames=load_data("../MSFD/1/face_crop") 
mask_filenames = load_data("../MSFD/1/face_crop_segmentation")


```markdown
### Display Data Function

The `display_data` function is designed to visualize pairs of images and their corresponding masks. It takes the following parameters:

- `actual_img`: Path to the folder containing the original images.
- `mask_img`: Path to the folder containing the mask images.
- `image_paths`: List of filenames for the images.
- `mask_paths`: List of filenames for the masks.

#### Functionality:
1. Creates a grid of subplots with 5 rows and 2 columns to display 5 image-mask pairs.
2. Iterates through the provided image and mask paths:
    - Reads the image and mask using `plt.imread`.
    - Displays the image in the left column and the corresponding mask in the right column.
3. Adjusts the layout for better spacing using `plt.tight_layout`.
4. Displays the plot using `plt.show`.

This function is useful for visually inspecting the dataset to ensure the images and masks are correctly aligned and formatted.
```

In [None]:
def display_data(actual_img, mask_img, image_paths, mask_paths):

    fig, axes = plt.subplots(5, 2, figsize=(10, 20))
    for i, (image_path, mask_path) in enumerate(zip(image_paths, mask_paths)):
        image = plt.imread(actual_img + image_path)
        mask = plt.imread(mask_img + mask_path)
        print(image_path, mask_path)
        axes[i, 0].imshow(image)
        axes[i, 0].set_title('Image')
        axes[i, 0].axis('off')

        axes[i, 1].imshow(mask)
        axes[i, 1].set_title('Mask')
        axes[i, 1].axis('off')

    # Adjust the spacing between subplots
    plt.tight_layout()

    # Show the plot
    plt.show()
    return

```markdown
### Preprocessing Function

The `preprocess_data` function is designed to prepare image and mask data for training and evaluation. It performs the following steps:

#### Parameters:
- `actual_img`: Path to the folder containing the original images.
- `mask_img`: Path to the folder containing the mask images.
- `image_names`: List of filenames for the images.
- `mask_names`: List of filenames for the masks.
- `input_size`: Tuple specifying the target size for resizing the images and masks (e.g., `(256, 256)`).
- `augmented`: Boolean flag to indicate whether data augmentation (flipping) should be applied.

#### Functionality:
1. **Image and Mask Loading**:
    - Reads each image and its corresponding mask using `load_img`.
    - Resizes them to the specified `input_size`.
    - Converts the images to RGB and masks to grayscale.

2. **Normalization**:
    - Normalizes the image pixel values to the range `[0, 1]`.
    - Converts the mask to a boolean array.

3. **Data Augmentation** (if `augmented=True`):
    - Applies horizontal and vertical flipping to both images and masks to increase dataset diversity.

4. **Conversion to Numpy Arrays**:
    - Converts the lists of images and masks into numpy arrays for efficient processing.

#### Returns:
- `images`: Numpy array of preprocessed images.
- `masks`: Numpy array of preprocessed masks.

This function ensures that the data is properly formatted and optionally augmented for training deep learning models.
```

In [None]:
#preprocessing
def preprocess_data(actual_img, mask_img, image_names, mask_names, input_size, augmented=False):

    images = []
    masks = []
    for img_file, mask_file in zip(image_names, mask_names):
        img = load_img(actual_img + img_file, target_size=input_size, color_mode='rgb')
        mask = load_img(mask_img + mask_file, target_size=input_size, color_mode='grayscale')

        # Convert image and mask to arrays
        img_array = img_to_array(img)
        img_array = img_array / 255.0

        mask_array = img_to_array(mask, dtype=np.bool_)

        # Append images and masks to the lists
        images.append(img_array)
        masks.append(mask_array)

        if augmented:
            images.append(flip_left_right(img_array))
            masks.append(flip_left_right(mask_array))

            images.append(flip_up_down(img_array))
            masks.append(flip_up_down(mask_array))

    # Convert lists to numpy arrays
    images = np.array(images)
    masks = np.array(masks)

    return images, masks

### change to correct path

In [None]:
INPUT_SIZE = (256, 256)
INPUT_SHAPE = (256, 256, 3)

images,masks=preprocess_data("../MSFD/1/face_crop/","../MSFD/1/face_crop_segmentation/",image_filenames,mask_filenames,INPUT_SIZE)
print('Shape of image data: ' + str(images.shape))
print('Shape of mask data: ' + str(masks.shape))
# Split the dataset into training and validation sets
train_images, val_images, train_masks, val_masks = train_test_split(images, masks, test_size=0.1, random_state=87)
train_images, test_images, train_masks, test_masks = train_test_split(train_images, train_masks, test_size=0.1, random_state=87)

In [None]:
lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(factor=1/5, patience=3, verbose=1)
checkpoint = tf.kerasa.callbacks.ModelCheckpoint('save_best.keras', verbose=1, save_best_only=True)


### function to calculate dice score


In [None]:
def dice_coefficient(y_true, y_pred, smooth=1e-6):
    y_true = tf.cast(y_true, tf.float32)  # Convert y_true to float32
    y_pred = tf.cast(y_pred, tf.float32)
    y_true_f = tf.keras.backend.flatten(y_true)
    y_pred_f = tf.keras.backend.flatten(y_pred)
    intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) + smooth)

def dice_loss(y_true, y_pred):
    return 1 - dice_coefficient(y_true, y_pred)

def bce_dice_loss(y_true, y_pred):
    return tf.keras.losses.BinaryCrossentropy()(y_true, y_pred) + dice_loss(y_true, y_pred)

U-Net Model Implementation

The following code defines a U-Net model architecture using TensorFlow and Keras. U-Net is a popular convolutional neural network architecture for image segmentation tasks.

Key Components:
1. **Imports**:
    - TensorFlow and Keras modules are imported for building the model.
    - Layers such as `Conv2D`, `MaxPooling2D`, `Dropout`, `UpSampling2D`, `Concatenate`, and `Conv2DTranspose` are used to construct the U-Net.

2. **Function Definition**:
    - The `unet` function takes two parameters:
      - `input_shape`: The shape of the input images (e.g., `(256, 256, 3)` for RGB images).
      - `output_layer`: The number of output channels (e.g., `1` for binary segmentation).

3. **Architecture**:
    - **Encoder**:
      - Consists of convolutional layers (`Conv2D`) followed by max-pooling layers (`MaxPooling2D`).
      - Each block doubles the number of filters, starting from 16.
    - **Bottleneck**:
      - The deepest part of the network with the highest number of filters (256).
    - **Decoder**:
      - Uses transposed convolutional layers (`Conv2DTranspose`) for upsampling.
      - Skip connections (`Concatenate`) are used to combine features from the encoder with the decoder.

4. **Output**:
    - The final layer uses a `Conv2D` layer with a sigmoid activation function to produce the segmentation mask.

Usage:
- Call the `unet` function with the desired input shape and output layer to create the model.
- Compile the model with an appropriate loss function and optimizer before training.
```python
model = unet((256, 256, 3), output_layer=1)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
```

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dropout, UpSampling2D, Concatenate, Conv2DTranspose

def unet(input_shape, output_layer):
    inputs = Input(input_shape)

    # Encoder
    conv1 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(inputs)
    #conv1 = tf.keras.layers.BatchNormalization()(conv1)
    conv1 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv1)
    #conv1 = tf.keras.layers.BatchNormalization()(conv1)
    #conv1 = Dropout(0.1)(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(pool1)
    #conv2 = tf.keras.layers.BatchNormalization()(conv2)
    conv2 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv2)
    #conv2 = tf.keras.layers.BatchNormalization()(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    #conv2 = Dropout(0.1)(conv2)

    conv3 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(pool2)
    #conv3 = tf.keras.layers.BatchNormalization()(conv3)
    conv3 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv3)
    #conv3 = tf.keras.layers.BatchNormalization()(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    #conv3 = Dropout(0.2)(conv3)

    conv4 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(pool3)
    #conv4 = tf.keras.layers.BatchNormalization()(conv4)
    conv4 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv4)
    #conv4 = tf.keras.layers.BatchNormalization()(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)
    #conv4 = Dropout(0.2)(conv4)

    # Bottom
    conv5 = Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(pool4)
    #conv5 = tf.keras.layers.BatchNormalization()(conv5)
    conv5 = Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv5)
    #conv5 = tf.keras.layers.BatchNormalization()(conv5)
    #conv5 = Dropout(0.3)(conv5)

    # Decoder
    up6 = Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(conv5)
    merge6 = Concatenate()([conv4, up6])
    conv6 = Dropout(0.2)(merge6)
    conv6 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv6)
    #conv6 = tf.keras.layers.BatchNormalization()(conv6)
    conv6 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv6)
    #conv6 = tf.keras.layers.BatchNormalization()(conv6)

    up7 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv6)
    merge7 = Concatenate()([conv3, up7])
    conv7 = Dropout(0.2)(merge7)
    conv7 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv7)
    #conv7 = tf.keras.layers.BatchNormalization()(conv7)
    conv7 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv7)
    #conv7 = tf.keras.layers.BatchNormalization()(conv7)

    up8 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv7)
    merge8 = Concatenate()([conv2, up8])
    conv8 = Dropout(0.1)(merge8)
    conv8 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv8)
    #conv8 = tf.keras.layers.BatchNormalization()(conv8)
    conv8 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv8)
    #conv8 = tf.keras.layers.BatchNormalization()(conv8)

    up9 = Conv2DTranspose(16, (2, 2), strides=(2, 2), padding='same')(conv8)
    merge9 = Concatenate()([conv1, up9])
    conv9 = Dropout(0.1)(merge9)
    conv9 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv9)
    #conv9 = tf.keras.layers.BatchNormalization()(conv9)
    conv9 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(conv9)
    #conv9 = tf.keras.layers.BatchNormalization()(conv9)

    # Output
    output = Conv2D(output_layer, (1, 1), activation='sigmoid')(conv9)

    model = Model(inputs=inputs, outputs=output)
    return model

# compiling model and setting learning rate

In [None]:
model = unet(INPUT_SHAPE, output_layer=1)
optimizer = Adam(learning_rate=2e-4)

# Compile the model with the optimizer instance
model.compile(optimizer=optimizer, loss=bce_dice_loss, metrics=[dice_coefficient])

In [None]:
gpus = tf.config.list_physical_devices('GPU')
print(gpus)

training

In [None]:
epochs = 20
history = model.fit(train_images, train_masks, batch_size=4, epochs=epochs, validation_data=(val_images, val_masks), callbacks=[checkpoint,lr_scheduler])

## Model being stored so that we don't have to train again and again

In [None]:
from tensorflow.keras.models import load_model

# Load the best saved model after training
best_model = load_model('save_best.keras',custom_objects={'bce_dice_loss': bce_dice_loss, 'dice_coefficient': dice_coefficient})
best_model.save('best_model.h5')
eval = best_model.evaluate(test_images, test_masks)
print('Test accuracy: ' + "{:.2f}".format(eval[1]))

# load the saved model due to prior interuption
model = load_model('best_model.h5',custom_objects={'bce_dice_loss': bce_dice_loss, 'dice_coefficient': dice_coefficient})

showing our results on te images, 5 random iamges are being shown

In [None]:
# display 5 random predictions
random.seed(87)
predictions = model.predict(test_images)
predictions = (predictions > 0.5).astype(np.uint8)

fig, axes = plt.subplots(5, 4, figsize=(5, 3*5))

# Iterate over the image and mask pairs and display them in subplots
for i in range(5):

    image = (test_images[i] * 255).astype(np.uint8)
    mask = predictions[i]
    ground_truth = test_masks[i] * np.array([255, 255, 255]) # convert the forground into yellow color to achieve the desired aesthetic

    overlay = image.copy()

    mask = np.repeat(mask, 3, axis=2) # matching the size of the channel of the mask and the image to perform an overlay
    inverted_mask = 1 - mask

    yellow_mask = np.array([255, 255, 255]) * mask

    # Apply the mask on the image
    result = image * inverted_mask + yellow_mask
    alpha = 0.2
    predicted_overlay = cv2.addWeighted(overlay, alpha, result.astype(overlay.dtype), 1 - alpha, 0)

    # Plot the image and mask in the corresponding subplot
    axes[i, 0].imshow(image)
    axes[i, 0].set_title('Original')
    axes[i, 0].axis('off')
    
    axes[i, 1].imshow(ground_truth)
    axes[i, 1].set_title('Ground Truth')
    axes[i, 1].axis('off')
    
    axes[i, 2].imshow(yellow_mask)
    axes[i, 2].set_title('Predicted')
    axes[i, 2].axis('off')
    
    axes[i, 3].imshow(predicted_overlay)
    axes[i, 3].set_title('Predicted Overlay')
    axes[i, 3].axis('off')
    
# Adjust the spacing between subplots
plt.tight_layout()
plt.savefig('../images/result.png', bbox_inches='tight')  # Save as PNG image

# # Show the plot
# plt.show()

In [None]:
predictions=model.predict(test_images)
predictions = (predictions > 0.5).astype(np.uint8)

showing model scores

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, jaccard_score

def flatten_masks(masks):
    return masks.reshape(masks.shape[0], -1)  # Flatten each image in the batch

def dice_score(y_true, y_pred):
    intersection = np.sum(y_true * y_pred)
    return (2.0 * intersection) / (np.sum(y_true) + np.sum(y_pred) + 1e-7)  # Add small epsilon to avoid division by zero

# Flatten the masks and predictions
flat_predictions = flatten_masks(predictions)
flat_test_masks = flatten_masks(test_masks)

total_accuracy = 0
total_precision = 0
total_recall = 0
total_iou = 0
total_dice = 0
num_samples = predictions.shape[0]

# Loop through each sample and compute the metrics
for i in range(num_samples):
    y_true = flat_test_masks[i]
    y_pred = flat_predictions[i]
    
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    iou = jaccard_score(y_true, y_pred, zero_division=0)
    dice = dice_score(y_true, y_pred)
    
    total_accuracy += accuracy
    total_precision += precision
    total_recall += recall
    total_iou += iou
    total_dice += dice

# Compute average metrics
avg_accuracy = total_accuracy / num_samples
avg_precision = total_precision / num_samples
avg_recall = total_recall / num_samples
avg_iou = total_iou / num_samples
avg_dice = total_dice / num_samples

# Print the results
print(f'Average Accuracy: {avg_accuracy:.4f}')
print(f'Average Precision: {avg_precision:.4f}')
print(f'Average Recall: {avg_recall:.4f}')
print(f'Average IoU: {avg_iou:.4f}')
print(f'Average Dice Score: {avg_dice:.4f}')    