In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## UNET

In [1]:
import torch
import torch.nn as nn

class conv_block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()

        self.conv1 = nn.Conv2d(in_c, out_c, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_c)

        self.conv2 = nn.Conv2d(out_c, out_c, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_c)

        self.relu = nn.ReLU()

    def forward(self, inputs):
        x = self.conv1(inputs)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        return x

class encoder_block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()

        self.conv = conv_block(in_c, out_c)
        self.pool = nn.MaxPool2d((2, 2))

    def forward(self, inputs):
        x = self.conv(inputs)
        p = self.pool(x)

        return x, p

class decoder_block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()

        self.up = nn.ConvTranspose2d(in_c, out_c, kernel_size=2, stride=2, padding=0)
        self.conv = conv_block(out_c+out_c, out_c)

    def forward(self, inputs, skip):
        x = self.up(inputs)
        x = torch.cat([x, skip], axis=1)
        x = self.conv(x)
        return x

class build_unet(nn.Module):
    def __init__(self):
        super().__init__()

        """ Encoder """
        self.e1 = encoder_block(3, 64)
        self.e2 = encoder_block(64, 128)
        self.e3 = encoder_block(128, 256)
        self.e4 = encoder_block(256, 512)

        """ Bottleneck """
        self.b = conv_block(512, 1024)

        """ Decoder """
        self.d1 = decoder_block(1024, 512)
        self.d2 = decoder_block(512, 256)
        self.d3 = decoder_block(256, 128)
        self.d4 = decoder_block(128, 64)

        """ Classifier """
        self.outputs = nn.Conv2d(64, 1, kernel_size=1, padding=0)

    def forward(self, inputs):
        """ Encoder """
        s1, p1 = self.e1(inputs)
        s2, p2 = self.e2(p1)
        s3, p3 = self.e3(p2)
        s4, p4 = self.e4(p3)

        """ Bottleneck """
        b = self.b(p4)

        """ Decoder """
        d1 = self.d1(b, s4)
        d2 = self.d2(d1, s3)
        d3 = self.d3(d2, s2)
        d4 = self.d4(d3, s1)

        outputs = self.outputs(d4)

        return outputs

if __name__ == "__main__":
    x = torch.randn((2, 3, 512, 512))
    f = build_unet()
    y = f(x)
    print(y.shape)


torch.Size([2, 1, 512, 512])


## LOSS

In [2]:

import torch
import torch.nn as nn
import torch.nn.functional as F

class DiceLoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):

        #comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        #flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice = (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth)

        return 1 - dice

class DiceBCELoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceBCELoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):

        #comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        #flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice_loss = 1 - (2.*intersection + smooth)/(inputs.sum() + targets.sum() + smooth)
        BCE = F.binary_cross_entropy(inputs, targets, reduction='mean')
        Dice_BCE = BCE + dice_loss

        return Dice_BCE

## UTILS

In [3]:

import os
import time
import random
import numpy as np
import cv2
import torch

""" Seeding the randomness. """
def seeding(seed):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

""" Create a directory. """
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

""" Calculate the time taken """
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs


## DATA

In [35]:

import os
import numpy as np
import cv2
import torch
from torch.utils.data import Dataset

class DriveDataset(Dataset):
    def __init__(self, images_path, masks_path):

        self.images_path = images_path
        self.masks_path = masks_path
        self.n_samples = len(images_path)

    def __getitem__(self, index):
        """ Reading image """
        image = cv2.imread(self.images_path[index], cv2.IMREAD_COLOR)
        image = image/255.0 ## (512, 512, 3)
        image = np.transpose(image, (2, 0, 1))  ## (3, 512, 512)
        image = image.astype(np.float32)
        image = torch.from_numpy(image)

        """ Reading mask """
        mask = cv2.imread(self.masks_path[index], cv2.IMREAD_GRAYSCALE)
        mask = mask/255.0   ## (512, 512)
        mask = np.expand_dims(mask, axis=0) ## (1, 512, 512)
        mask = mask.astype(np.float32)
        mask = torch.from_numpy(mask)

        return image, mask

    def __len__(self):
        return self.n_samples


## Training

In [38]:
class DriveDataset(Dataset):
    def __init__(self, images_path, masks_path):
        self.images_path = images_path
        self.masks_path = masks_path
        self.n_samples = len(images_path)

    def __getitem__(self, index):
        """ Reading image """
        image_path = self.images_path[index]
        mask_path = self.masks_path[index]

        # Debugging: Print paths
        print(f"Image path: {image_path}")
        print(f"Mask path: {mask_path}")

        # Load image
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        if image is None:
            print(f"Warning: Could not read image at {image_path}. Returning placeholder.")
            image = np.zeros((512, 512, 3), dtype=np.float32)  # Placeholder

        image = image / 255.0  # Normalize to [0, 1]
        image = np.transpose(image, (2, 0, 1))  # (3, 512, 512)
        image = image.astype(np.float32)
        image = torch.from_numpy(image)

        # Load mask
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            print(f"Warning: Could not read mask at {mask_path}. Returning placeholder.")
            mask = np.zeros((512, 512), dtype=np.float32)  # Placeholder

        mask = mask / 255.0  # Normalize to [0, 1]
        mask = np.expand_dims(mask, axis=0)  # Add channel dimension (1, 512, 512)
        mask = mask.astype(np.float32)
        mask = torch.from_numpy(mask)

        return image, mask

    def __len__(self):
        return self.n_samples


In [39]:
print(f"Train images: {train_x}")
print(f"Train masks: {train_y}")


Train images: ['/content/drive/MyDrive/boundary_drive_file/split_data/train/images']
Train masks: ['/content/drive/MyDrive/boundary_drive_file/split_data/train/masks']


In [40]:
import os
for path in train_x:
    if not os.path.exists(path):
        print(f"Missing file: {path}")


In [None]:
# Train function
def train(model, loader, optimizer, loss_fn, device):
    epoch_loss = 0.0
    model.train()

    for x, y in loader:
        print(f"Batch input shape: {x.shape}")  # Debugging: Print batch input shape
        print(f"Batch target shape: {y.shape}")  # Debugging: Print batch target shape

        x = x.to(device, dtype=torch.float32)
        y = y.to(device, dtype=torch.float32)

        optimizer.zero_grad()
        y_pred = model(x)
        print(f"Model output shape: {y_pred.shape}")  # Debugging: Print model output shape

        loss = loss_fn(y_pred, y)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    epoch_loss = epoch_loss / len(loader)
    return epoch_loss


# Evaluate function
def evaluate(model, loader, loss_fn, device):
    epoch_loss = 0.0
    model.eval()

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device, dtype=torch.float32)
            y = y.to(device, dtype=torch.float32)

            y_pred = model(x)
            loss = loss_fn(y_pred, y)
            epoch_loss += loss.item()

    epoch_loss = epoch_loss / len(loader)
    return epoch_loss


# Main script
if __name__ == "__main__":
    """ Seeding """
    torch.manual_seed(42)

    """ Directories """
    os.makedirs("files", exist_ok=True)

    """ Load dataset """
    train_x = sorted(glob("/content/drive/MyDrive/boundary_drive_file/split_data/train/images/*"))
    train_y = sorted(glob("/content/drive/MyDrive/boundary_drive_file/split_data/train/masks/*"))
    valid_x = sorted(glob("/content/drive/MyDrive/boundary_drive_file/split_data/test/images/*"))
    valid_y = sorted(glob("/content/drive/MyDrive/boundary_drive_file/split_data/test/masks/*"))

    assert len(train_x) == len(train_y), "Mismatch between training images and masks count!"
    assert len(valid_x) == len(valid_y), "Mismatch between validation images and masks count!"

    print(f"Dataset Size: Train: {len(train_x)}, Valid: {len(valid_x)}")

    """ Hyperparameters """
    H, W = 512, 512
    batch_size = 2
    num_epochs = 50
    lr = 1e-4
    checkpoint_path = "files/checkpoint.pth"

    """ Dataset and loader """
    train_dataset = DriveDataset(train_x, train_y)
    valid_dataset = DriveDataset(valid_x, valid_y)

    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=2
    )

    valid_loader = DataLoader(
        dataset=valid_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=2
    )

    """ Device and model """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = build_unet()  # Replace with your U-Net model
    model = model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, verbose=True)
    loss_fn = DiceBCELoss()  # Replace with your custom loss function

    """ Training loop """
    best_valid_loss = float("inf")

    for epoch in range(num_epochs):
        start_time = time.time()

        train_loss = train(model, train_loader, optimizer, loss_fn, device)
        valid_loss = evaluate(model, valid_loader, loss_fn, device)

        """ Save the best model """
        if valid_loss < best_valid_loss:
            print(f"Validation loss improved from {best_valid_loss:.4f} to {valid_loss:.4f}. Saving model...")
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), checkpoint_path)

        scheduler.step(valid_loss)
        end_time = time.time()

        epoch_mins, epoch_secs = divmod(int(end_time - start_time), 60)
        print(f"Epoch {epoch+1}/{num_epochs} | Time: {epoch_mins}m {epoch_secs}s")
        print(f"Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f}")

Dataset Size: Train: 72, Valid: 18




Image path: /content/drive/MyDrive/boundary_drive_file/split_data/train/images/p11_aug_31.png
Mask path: /content/drive/MyDrive/boundary_drive_file/split_data/train/masks/p11_mask_aug_31.png
Image path: /content/drive/MyDrive/boundary_drive_file/split_data/train/images/p11_aug_32.png
Mask path: /content/drive/MyDrive/boundary_drive_file/split_data/train/masks/p11_mask_aug_32.png
Image path: /content/drive/MyDrive/boundary_drive_file/split_data/train/images/p11_aug_34.png
Mask path: /content/drive/MyDrive/boundary_drive_file/split_data/train/masks/p11_mask_aug_34.png
Image path: /content/drive/MyDrive/boundary_drive_file/split_data/train/images/lek_shore2_aug_8.png
Mask path: /content/drive/MyDrive/boundary_drive_file/split_data/train/masks/lek_shore2_mask_aug_8.png
Image path: /content/drive/MyDrive/boundary_drive_file/split_data/train/images/p11_aug_30.png
Mask path: /content/drive/MyDrive/boundary_drive_file/split_data/train/masks/p11_mask_aug_30.png
Batch input shape: torch.Size([2,

In [46]:
!ls /content/drive/MyDrive/boundary_drive_file/split_data/test/images
# !ls /content/drive/MyDrive/boundary_drive_file/split_data/test/masks


lek_shore1_aug_0.png	     lek_shore5__2014_aug_22.png  p14_aug_40.png  p5_aug_70.png
lek_shore1_aug_4.png	     p10_aug_26.png		  p14_aug_44.png  p8_aug_81.png
lek_shore3__2018_aug_10.png  p10_aug_28.png		  p17_aug_55.png  p9_aug_86.png
lek_shore3__2018_aug_12.png  p13_aug_35.png		  p17_aug_56.png
lek_shore4__2016_aug_18.png  p13_aug_39.png		  p4_aug_65.png
