In [None]:
# Importing libraries
import os
import yaml
import torch
import pandas as pd
import torch.nn as nn
import torch.optim as optim

from glob import glob
from tqdm import tqdm
from collections import OrderedDict
from albumentations.augmentations import transforms
from sklearn.model_selection import train_test_split
from albumentations.core.composition import Compose, OneOf

# from train import train, validate
# from source.network import UNetPP
# from source.dataset import DataSet

In [None]:
import os
import cv2
import numpy as np
import torch.utils.data

class DataSet(torch.utils.data.Dataset):
    """Segmentation dataset pairing each image with one single-channel mask.

    A sample's files are located by joining the directory with the sample ID
    followed by the corresponding extension.
    """

    def __init__(self, img_ids, img_dir, mask_dir, img_ext, mask_ext, transform=None):
        """Store the file-location details; no file is opened here.

        Args:
            img_ids: Sequence of sample IDs (file names without extension).
            img_dir: Directory containing the input images.
            mask_dir: Directory containing the mask images.
            img_ext: Extension appended to an ID to form the image file name.
            mask_ext: Extension appended to an ID to form the mask file name.
            transform: Optional callable invoked as
                ``transform(image=img, mask=mask)`` that returns a mapping
                with ``'image'`` and ``'mask'`` entries.
        """
        self.img_ids = img_ids
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.img_ext = img_ext
        self.mask_ext = mask_ext
        self.transform = transform  # Data augmentation or transformation

    def __len__(self):
        """Return the number of samples in the dataset."""
        return len(self.img_ids)

    def __getitem__(self, idx):
        """Load, optionally transform, and scale the sample at position ``idx``.

        Returns:
            A tuple ``(img, mask, info)``: ``img`` and ``mask`` are float32
            arrays divided by 255 and transposed to channels-first order, and
            ``info`` is ``{'img_id': <sample ID>}``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        img_id = self.img_ids[idx]
        img_file = os.path.join(self.img_dir, img_id + self.img_ext)
        mask_file = os.path.join(self.mask_dir, img_id + self.mask_ext)

        # cv2.imread reports failure by returning None rather than raising, so
        # check explicitly to surface the offending path.
        img = cv2.imread(img_file)
        if img is None:
            raise FileNotFoundError(f"Could not read image file: {img_file}")

        mask_gray = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
        if mask_gray is None:
            raise FileNotFoundError(f"Could not read mask file: {mask_file}")

        # Give the grayscale mask a trailing channel axis and stack along depth
        mask = np.dstack([mask_gray[..., None]])

        if self.transform is not None:
            # Apply the same transformation to the image and its mask
            augmented = self.transform(image=img, mask=mask)
            img = augmented['image']
            mask = augmented['mask']

        # NOTE(review): the image is divided by 255 even when `transform` has
        # already normalised it; left unchanged here, confirm it is intended.
        img = img.astype('float32') / 255
        img = img.transpose(2, 0, 1)  # channels-last -> channels-first
        mask = mask.astype('float32') / 255
        mask = mask.transpose(2, 0, 1)

        return img, mask, {'img_id': img_id}


In [None]:
import torch
from torch import nn

class VGGBlock(nn.Module):
    """Two 3x3 convolution stages, each followed by batch norm and ReLU."""

    def __init__(self, in_channels, middle_channels, out_channels):
        super().__init__()
        # A single in-place ReLU module is shared by both stages
        self.relu = nn.ReLU(inplace=True)
        # Stage 1: in_channels -> middle_channels
        self.conv1 = nn.Conv2d(in_channels, middle_channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(middle_channels)
        # Stage 2: middle_channels -> out_channels
        self.conv2 = nn.Conv2d(middle_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        # conv -> batch norm -> ReLU, applied twice
        hidden = self.relu(self.bn1(self.conv1(x)))
        return self.relu(self.bn2(self.conv2(hidden)))

class UNetPP(nn.Module):
    """UNet++ (nested U-Net) built from VGG-style blocks.

    Nodes are named ``x{i}_{j}``: ``i`` is the depth (number of 2x poolings
    applied) and ``j`` is the position along the skip pathway at that depth.
    Each node with ``j > 0`` receives every earlier node at the same depth plus
    the upsampled node ``x{i+1}_{j-1}`` from one level below.
    """

    def __init__(self, num_classes, input_channels=3, deep_supervision=False, **kwargs):
        """
        Args:
            num_classes: Number of output channels of each final 1x1 convolution.
            input_channels: Number of channels of the input tensor.
            deep_supervision: If true, one output head is attached to each of
                x0_1..x0_4 and ``forward`` returns all four outputs; otherwise
                a single head on x0_4 is used.
            **kwargs: Accepted and ignored.
        """
        super().__init__()

        nb_filter = [32, 64, 128, 256, 512]

        self.deep_supervision = deep_supervision

        self.pool = nn.MaxPool2d(2, 2)
        self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # Encoder column (j = 0)
        self.conv0_0 = VGGBlock(input_channels, nb_filter[0], nb_filter[0])
        self.conv1_0 = VGGBlock(nb_filter[0], nb_filter[1], nb_filter[1])
        self.conv2_0 = VGGBlock(nb_filter[1], nb_filter[2], nb_filter[2])
        self.conv3_0 = VGGBlock(nb_filter[2], nb_filter[3], nb_filter[3])
        self.conv4_0 = VGGBlock(nb_filter[3], nb_filter[4], nb_filter[4])

        # Nested nodes: the input width is j same-depth feature maps plus one
        # upsampled map from the level below.
        self.conv0_1 = VGGBlock(nb_filter[0]+nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_1 = VGGBlock(nb_filter[1]+nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_1 = VGGBlock(nb_filter[2]+nb_filter[3], nb_filter[2], nb_filter[2])
        self.conv3_1 = VGGBlock(nb_filter[3]+nb_filter[4], nb_filter[3], nb_filter[3])

        self.conv0_2 = VGGBlock(nb_filter[0]*2+nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_2 = VGGBlock(nb_filter[1]*2+nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_2 = VGGBlock(nb_filter[2]*2+nb_filter[3], nb_filter[2], nb_filter[2])

        self.conv0_3 = VGGBlock(nb_filter[0]*3+nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_3 = VGGBlock(nb_filter[1]*3+nb_filter[2], nb_filter[1], nb_filter[1])

        self.conv0_4 = VGGBlock(nb_filter[0]*4+nb_filter[1], nb_filter[0], nb_filter[0])

        if self.deep_supervision:
            # One 1x1 output head per top-row node x0_1..x0_4
            self.final1 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
            self.final2 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
            self.final3 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
            self.final4 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
        else:
            # Single 1x1 output head on x0_4
            self.final = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)

    def forward(self, input):
        """Run the nested encoder-decoder.

        Returns:
            With deep supervision, a list ``[output1, output2, output3,
            output4]`` computed from x0_1..x0_4; otherwise the single output
            computed from x0_4.
        """
        x0_0 = self.conv0_0(input)
        x1_0 = self.conv1_0(self.pool(x0_0))
        x0_1 = self.conv0_1(torch.cat([x0_0, self.up(x1_0)], 1))

        x2_0 = self.conv2_0(self.pool(x1_0))
        x1_1 = self.conv1_1(torch.cat([x1_0, self.up(x2_0)], 1))
        x0_2 = self.conv0_2(torch.cat([x0_0, x0_1, self.up(x1_1)], 1))

        x3_0 = self.conv3_0(self.pool(x2_0))
        x2_1 = self.conv2_1(torch.cat([x2_0, self.up(x3_0)], 1))
        # x1_2 takes the upsampled x2_1 (node one level down, one step back);
        # x2_2 does not exist yet at this point and is consumed by x1_3 below.
        x1_2 = self.conv1_2(torch.cat([x1_0, x1_1, self.up(x2_1)], 1))
        x0_3 = self.conv0_3(torch.cat([x0_0, x0_1, x0_2, self.up(x1_2)], 1))

        x4_0 = self.conv4_0(self.pool(x3_0))
        x3_1 = self.conv3_1(torch.cat([x3_0, self.up(x4_0)], 1))
        x2_2 = self.conv2_2(torch.cat([x2_0, x2_1, self.up(x3_1)], 1))
        x1_3 = self.conv1_3(torch.cat([x1_0, x1_1, x1_2, self.up(x2_2)], 1))
        x0_4 = self.conv0_4(torch.cat([x0_0, x0_1, x0_2, x0_3, self.up(x1_3)], 1))

        if self.deep_supervision:
            output1 = self.final1(x0_1)
            output2 = self.final2(x0_2)
            output3 = self.final3(x0_3)
            output4 = self.final4(x0_4)
            return [output1, output2, output3, output4]
        else:
            output = self.final(x0_4)
            return output


In [None]:
import torch

class AverageMeter(object):
    """Keeps the most recent value of a metric and its weighted running mean."""

    def __init__(self):
        self.reset()

    def reset(self):
        # Clear the latest value, running mean, weighted total and weight count
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        # Record `val` as the latest observation, counted as `n` samples
        self.val = val
        self.sum += val * n
        self.count += n
        # Mean over every sample seen since the last reset
        self.avg = self.sum / self.count

def iou_score(output, target):
    """Return the smoothed Intersection over Union of two masks.

    A tensor `output` is passed through a sigmoid before thresholding; any
    other `output` is thresholded as given. Both inputs are binarised at 0.5.
    """
    eps = 1e-5  # keeps the ratio defined when both masks are empty

    # Tensors are moved to host memory as NumPy arrays
    if torch.is_tensor(output):
        output = torch.sigmoid(output).data.cpu().numpy()
    if torch.is_tensor(target):
        target = target.data.cpu().numpy()

    predicted_fg, actual_fg = output > 0.5, target > 0.5

    overlap = (predicted_fg & actual_fg).sum()
    combined = (predicted_fg | actual_fg).sum()

    return (overlap + eps) / (combined + eps)


In [None]:
import os
import yaml
import torch
import pandas as pd
import torch.nn as nn
import torch.optim as optim

from glob import glob
from tqdm import tqdm
from collections import OrderedDict
from source.utils import iou_score, AverageMeter
from albumentations import Resize
from albumentations.augmentations import transforms
from sklearn.model_selection import train_test_split
from albumentations.core.composition import Compose, OneOf
from albumentations.augmentations.geometric.rotate import RandomRotate90
from source.network import UNetPP
from source.dataset import DataSet

# Define a function for training the segmentation model
def train(deep_sup, train_loader, model, criterion, optimizer):
    """Run one training pass over `train_loader`.

    Args:
        deep_sup: When true, `model(input)` is treated as a sequence of
            outputs: the loss is averaged over all of them and the IoU is
            computed on the last one.
        train_loader: Sized iterable yielding `(input, target, extra)` triples.
        model: Network to optimise; it is switched to training mode.
        criterion: Callable `criterion(output, target)` returning the loss.
        optimizer: Optimizer stepped once per batch.

    Returns:
        OrderedDict with the sample-weighted mean `'loss'` and `'iou'`.
    """
    loss_meter, iou_meter = AverageMeter(), AverageMeter()

    model.train()

    progress = tqdm(total=len(train_loader))

    # Batches go to the first CUDA device when available, otherwise the CPU
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    for images, masks, _ in train_loader:
        images, masks = images.to(device), masks.to(device)

        if deep_sup:
            heads = model(images)
            # Mean of the per-head losses; the last head is the one scored
            loss = sum(criterion(head, masks) for head in heads) / len(heads)
            scored = heads[-1]
        else:
            scored = model(images)
            loss = criterion(scored, masks)
        iou = iou_score(scored, masks)

        # Standard update: clear old gradients, backpropagate, step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight each batch by its size so the averages are per sample
        batch_size = images.size(0)
        loss_meter.update(loss.item(), batch_size)
        iou_meter.update(iou, batch_size)

        progress.set_postfix(OrderedDict([('loss', loss_meter.avg), ('iou', iou_meter.avg)]))
        progress.update(1)

    progress.close()

    return OrderedDict([('loss', loss_meter.avg), ('iou', iou_meter.avg)])


# Define a function for validating the segmentation model
def validate(deep_sup, val_loader, model, criterion):
    """Evaluate `model` on `val_loader` without updating its parameters.

    Args:
        deep_sup: When true, `model(input)` is treated as a sequence of
            outputs: the loss is averaged over all of them and the IoU is
            computed on the last one.
        val_loader: Sized iterable yielding `(input, target, extra)` triples.
        model: Network to evaluate; it is switched to evaluation mode.
        criterion: Callable `criterion(output, target)` returning the loss.

    Returns:
        OrderedDict with the sample-weighted mean `'loss'` and `'iou'`.
    """
    # Initialize average meters to track loss and IoU
    avg_meters = {'loss': AverageMeter(), 'iou': AverageMeter()}

    # Set the model to evaluation mode
    model.eval()

    # Batches go to the first CUDA device when available, otherwise the CPU
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # No gradients are needed for evaluation
    with torch.no_grad():
        pbar = tqdm(total=len(val_loader))

        for input, target, _ in val_loader:
            input = input.to(device)
            target = target.to(device)

            if deep_sup:
                # Average the loss over every output; score the last one
                outputs = model(input)
                loss = 0
                for output in outputs:
                    loss += criterion(output, target)
                loss /= len(outputs)
                iou = iou_score(outputs[-1], target)
            else:
                output = model(input)
                loss = criterion(output, target)
                iou = iou_score(output, target)

            # Weight each batch by its size so the averages are per sample
            avg_meters['loss'].update(loss.item(), input.size(0))
            avg_meters['iou'].update(iou, input.size(0))

            # Show the running averages on the progress bar
            postfix = OrderedDict([('loss', avg_meters['loss'].avg), ('iou', avg_meters['iou'].avg)])
            pbar.set_postfix(postfix)
            pbar.update(1)

        pbar.close()

    return OrderedDict([('loss', avg_meters['loss'].avg), ('iou', avg_meters['iou'].avg)])


In [None]:
import cv2
import yaml
import torch
import numpy as np
import matplotlib.pyplot as plt

from source.network import UNetPP
from argparse import ArgumentParser
from albumentations.augmentations import transforms
from albumentations.core.composition import Compose

# Preprocessing pipeline applied to images at validation/inference time.
# NOTE(review): `Resize` is looked up on `albumentations.augmentations.transforms`;
# whether it is exposed there depends on the installed albumentations version
# (another cell imports it with `from albumentations import Resize`). Confirm.
val_transform = Compose([
    transforms.Resize(256, 256),  # Resize the image to 256x256 pixels
    transforms.Normalize(),  # Normalize using the transform's default arguments
])

# Define a function for loading and preprocessing an image
def image_loader(image_name):
    """Read an image file and preprocess it for the model.

    The image is read with OpenCV, passed through `val_transform`, converted
    to float32, divided by 255 and transposed to channels-first order.

    Args:
        image_name: Path of the image file to load.

    Returns:
        The preprocessed image as a float32 array in (channels, height, width)
        order.

    Raises:
        FileNotFoundError: If OpenCV cannot read `image_name`.
    """
    # cv2.imread reports failure by returning None rather than raising
    img = cv2.imread(image_name)
    if img is None:
        raise FileNotFoundError(f"Could not read image file: {image_name}")

    # Apply the validation transformations to the image
    img = val_transform(image=img)["image"]

    # NOTE(review): this divides by 255 after `val_transform` has run; the
    # dataset class applies the same scaling, so it is kept for consistency.
    img = img.astype('float32') / 255

    # channels-last -> channels-first, the layout PyTorch expects
    img = img.transpose(2, 0, 1)

    return img


In [None]:
# Read the run configuration from "config.yaml" in the working directory
with open("config.yaml") as f:
    # safe_load builds only plain Python objects. Calling yaml.load(f) without
    # a Loader argument is rejected by PyYAML >= 6 and is unsafe on older
    # versions.
    config = yaml.safe_load(f)

# Extract configuration values from the dictionary
extn = config["extn"]  # File extension (e.g., ".jpg", ".png")
epochs = config["epochs"]  # Number of training epochs
log_path = config["log_path"]  # Path of the CSV training log
mask_path = config["mask_path"]  # Directory containing the mask images
image_path = config["image_path"]  # Directory containing the input images
model_path = config["model_path"]  # Path where the best model weights are saved


---

## Create log file

In [None]:
# Training history: one list per column, each extended by one entry per epoch
# (epoch number, training loss/IoU, validation loss/IoU)
log = OrderedDict(
    (column, [])
    for column in ('epoch', 'loss', 'iou', 'val_loss', 'val_iou')
)

# Highest validation IoU observed so far
best_iou = 0

# Number of epochs since the validation IoU last improved
trigger = 0


---

## Split images into train and validation set

In [None]:
# Wildcard pattern matching every file that ends with the configured extension
extn_ = f"*{extn}"

# glob returns paths in arbitrary order; sort them so the split below is
# repeatable from run to run
img_ids = sorted(glob(os.path.join(image_path, extn_)))

# Keep only the base file name without its extension as the sample ID
img_ids = [os.path.splitext(os.path.basename(p))[0] for p in img_ids]

# Hold out 20% of the IDs for validation; the fixed seed makes the split
# reproducible across kernel restarts
train_img_ids, val_img_ids = train_test_split(img_ids, test_size=0.2, random_state=42)


---

## Define data transformations

In [None]:
# Augmentation and preprocessing pipeline for the training dataset.
# NOTE(review): `RandomRotate90`, `Flip` and `Resize` are looked up on
# `albumentations.augmentations.transforms`; whether they are exposed there
# depends on the installed albumentations version. Confirm.
train_transform = Compose([
    transforms.RandomRotate90(),  # Random rotation in 90-degree steps
    transforms.Flip(),  # Randomly flip the image horizontally or vertically
    OneOf([
        transforms.HueSaturationValue(),  # Randomly change hue, saturation, and value
        transforms.RandomBrightness(),  # Randomly adjust brightness
        transforms.RandomContrast(),  # Randomly adjust contrast
    ], p=1),  # p=1: the group is always applied, using one of the three colour augmentations
    transforms.Resize(256, 256),  # Resize the image to 256x256 pixels
    transforms.Normalize(),  # Normalize using the transform's default arguments
])

# Preprocessing pipeline for the validation dataset (no augmentation)
val_transform = Compose([
    transforms.Resize(256, 256),  # Resize the image to 256x256 pixels
    transforms.Normalize(),  # Normalize using the transform's default arguments
])


---

## Create train and validation dataset

In [None]:
# Build the training and validation datasets. Both read from the same image
# and mask directories and use the same extension for images and masks; only
# the ID list and the transform differ.
train_dataset, val_dataset = (
    DataSet(
        img_ids=ids,
        img_dir=image_path,
        mask_dir=mask_path,
        img_ext=extn,
        mask_ext=extn,
        transform=pipeline,
    )
    for ids, pipeline in (
        (train_img_ids, train_transform),  # augmented training samples
        (val_img_ids, val_transform),      # preprocessing only for validation
    )
)


---

## Create train and validation data loaders

In [None]:
# Training batches of 16: order reshuffled on each pass, trailing partial batch dropped
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=16, shuffle=True, drop_last=True)

# Validation batches of 16: fixed order, every sample kept
val_loader = torch.utils.data.DataLoader(dataset=val_dataset, batch_size=16, shuffle=False, drop_last=False)


---

## Create the model object

In [None]:
# UNet++ with deep supervision: one output channel (binary mask), three input channels
model = UNetPP(num_classes=1, input_channels=3, deep_supervision=True)

# Run on the GPU when one is present
if torch.cuda.is_available():
    model.cuda()

# Binary cross-entropy evaluated directly on the model's logits
criterion = nn.BCEWithLogitsLoss()

# Only parameters that require gradients are handed to the optimizer
params = (p for p in model.parameters() if p.requires_grad)

# Adam with learning rate 1e-3 and weight decay 1e-4
optimizer = optim.Adam(params, lr=1e-3, weight_decay=1e-4)


---

## Run the train data loop

In [None]:
# Train for the configured number of epochs, validating after each one
for epoch in range(epochs):
    print(f'Epoch [{epoch}/{epochs}]')

    # One pass over the training data, then one over the validation data
    train_log = train(True, train_loader, model, criterion, optimizer)
    val_log = validate(True, val_loader, model, criterion)

    print('loss %.4f - iou %.4f - val_loss %.4f - val_iou %.4f'
          % (train_log['loss'], train_log['iou'], val_log['loss'], val_log['iou']))

    # Append this epoch's metrics to the history
    log['epoch'].append(epoch)
    log['loss'].append(train_log['loss'])
    log['iou'].append(train_log['iou'])
    log['val_loss'].append(val_log['loss'])
    log['val_iou'].append(val_log['iou'])

    # Rewrite the CSV log every epoch so progress survives an interruption
    pd.DataFrame(log).to_csv(log_path, index=False)

    if val_log['iou'] > best_iou:
        # New best validation IoU: checkpoint the weights and restart the counter
        torch.save(model.state_dict(), model_path)
        best_iou = val_log['iou']
        print("=> saved best model")
        trigger = 0
    else:
        # No improvement: one more epoch since the last best model
        trigger += 1


In [None]:
import cv2
import yaml
import torch
import numpy as np
import matplotlib.pyplot as plt

from predict import image_loader
from source.network import UNetPP
from argparse import ArgumentParser
from albumentations.augmentations import transforms
from albumentations.core.composition import Compose

%matplotlib inline

---

## Create validation transforms

In [None]:
# Preprocessing pipeline for inference images. It has the same contents as the
# `val_transform` pipelines defined in earlier cells and rebinds that name.
# NOTE(review): `Resize` is looked up on `albumentations.augmentations.transforms`;
# whether it is exposed there depends on the installed albumentations version. Confirm.
val_transform = Compose([
    transforms.Resize(256, 256),  # Resize images to a fixed size of 256x256 pixels
    transforms.Normalize(),  # Normalize using the transform's default arguments
])


In [None]:
# Read the configuration from "config.yaml" in the working directory.
# safe_load builds only plain Python objects. Calling yaml.load(f) without a
# Loader argument is rejected by PyYAML >= 6 and is unsafe on older versions.
with open("config.yaml") as f:
    config = yaml.safe_load(f)

In [None]:
# Inference settings taken from the loaded configuration
im_width, im_height = config["im_width"], config["im_height"]  # target size for the predicted mask
model_path, output_path = config["model_path"], config["output_path"]  # saved weights / output location

---

## Load the model

In [None]:
# Create a UNet++ model object with the same arguments used for training
model = UNetPP(1, 3, True)

# Load the saved weights. map_location="cpu" lets a checkpoint written from a
# GPU run load on a CPU-only machine; the model is moved to the GPU below when
# one is available.
model.load_state_dict(torch.load(model_path, map_location="cpu"))

# Check if a GPU is available and move the model to the GPU
if torch.cuda.is_available():
    model.cuda()

# Set the model's mode to evaluation
model.eval()


---

## Load the test image

In [None]:
test_img = "input/PNG/Original/115.png"

# Preprocess the test image, prepend a batch axis of length 1, and wrap the
# resulting NumPy array as a PyTorch tensor
image = torch.from_numpy(image_loader(test_img)[np.newaxis, ...])


In [None]:
# Put the input on the GPU when CUDA is available; otherwise leave it where it is
image = image.to(device="cuda") if torch.cuda.is_available() else image


---

## Make prediction

In [None]:
# Run the input image through the model. This is inference only, so autograd
# is disabled and no computation graph is built for the forward pass.
with torch.no_grad():
    mask = model(image)

In [None]:
# Take the last element of the model output.
# NOTE(review): the model is built with deep supervision enabled, so the output
# is presumably a list of predictions and this picks the final one. Confirm.
mask = mask[-1]

# Convert the torch tensor to a numpy array
mask = mask.detach().cpu().numpy()

# Drop the two leading length-1 axes, leaving a 2D array
mask = np.squeeze(np.squeeze(mask, axis=0), axis=0)

# Binarise in place: values above -2.5 become 255, then the remaining values
# (<= -2.5) become 0. The order matters: swapping the two lines would turn the
# zeros into 255, because 0 > -2.5.
# NOTE(review): -2.5 is compared against the model output as-is (no sigmoid is
# applied in this cell); confirm the intended threshold.
mask[mask > -2.5] = 255
mask[mask <= -2.5] = 0

# Resize the output mask to the configured size (im_width, im_height)
mask = cv2.resize(mask, (im_width, im_height))

plt.imshow(mask, cmap="gray")

---

## Read and plot the ground truth mask

In [None]:
# Path of the ground-truth mask image to compare against the prediction
actual_mask = "input/PNG/Ground Truth/115.png"

# Read the mask file into an array with matplotlib
am = plt.imread(actual_mask)

# Display the ground-truth mask in grayscale
plt.imshow(am, cmap="gray")


---