<a href="https://colab.research.google.com/github/MohamadHBaydoun/COMP9444/blob/MohamadBranch/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!git clone https://github.com/MohamadHBaydoun/COMP9444.git

In [None]:
# 0. Import packages
import os
import numpy as np
import pandas as pd
from PIL import Image
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# 1. Create model


# USE RESIDUAL NEURAL NETWORKS AND FCNs WITH 1x1 CONVOLUTIONS AND UPSAMPLING OPERATIONS
import torch

class CustomFCN(nn.Module):
    """Small encoder-decoder fully convolutional network.

    The encoder applies three conv/batch-norm/ReLU stages (8, 16 and 32
    channels), each followed by a 2x2 max-pool. The decoder upsamples
    bilinearly, reduces the channel count again and adds the matching
    encoder output as a skip connection. A final 1x1 convolution followed by
    a sigmoid produces ``out_channels`` maps with values in [0, 1] at the
    spatial size of the input.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels of the returned tensor.
    """

    def __init__(self, in_channels=3, out_channels=1):
        super().__init__()

        # Downsampling (Encoder).
        # NOTE: enc1/enc2/enc3 are only used by iterative_forward(); the
        # default forward() path uses the "a" stages and the pools.
        self.enc1a = self.conv_chain(in_channels, 3, 3, 8, 1, 1)
        self.enc1 = self.conv_chain(8, 3, 3, 8, 1, 1)
        self.pool1 = nn.MaxPool2d(2, stride=2)

        self.enc2a = self.conv_chain(8, 3, 3, 16, 1, 1)
        self.enc2 = self.conv_chain(16, 3, 3, 16, 1, 1)
        self.pool2 = nn.MaxPool2d(2, stride=2)

        self.enc3a = self.conv_chain(16, 3, 3, 32, 1, 1)
        self.enc3 = self.conv_chain(32, 3, 3, 32, 1, 1)
        self.pool3 = nn.MaxPool2d(2, stride=2)

        # Upsampling (Decoder).
        # The Upsample modules hold no parameters. They are kept so that
        # existing attribute access and print(model) still see them, but
        # forward() resizes to the size of the skip tensor instead (see
        # _upsample_like) so odd input sizes work as well.
        self.up1 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec1 = self.conv_chain(32, 3, 3, 16, 1, 1)

        self.up2 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec2 = self.conv_chain(16, 3, 3, 8, 1, 1)

        self.up3 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec3 = nn.Conv2d(in_channels=8, out_channels=out_channels, kernel_size=1, padding=0, stride=1)

    def conv_chain(self, Din, L, M, D, S, P):
        """Build a Conv2d -> BatchNorm2d -> ReLU stage.

        Args:
            Din: Input channels of the convolution.
            L: Kernel height.
            M: Kernel width.
            D: Output channels (also the BatchNorm feature count).
            S: Convolution stride.
            P: Convolution padding.

        Returns:
            An ``nn.Sequential`` holding the three layers.
        """
        return nn.Sequential(
            nn.Conv2d(in_channels=Din, out_channels=D, kernel_size=(L, M), padding=P, stride=S),
            nn.BatchNorm2d(D),
            nn.ReLU(inplace=True),
        )

    @staticmethod
    def _upsample_like(y, reference):
        """Bilinearly resize ``y`` to the spatial size of ``reference``.

        With ``align_corners=True`` this equals the fixed x2 upsampling
        whenever ``reference`` is exactly twice as large as ``y``. It also
        covers the case where a max-pool rounded an odd size down.
        """
        return F.interpolate(y, size=reference.shape[-2:], mode='bilinear', align_corners=True)

    def forward(self, x):
        """Run the network on ``x`` and return sigmoid-activated maps.

        Args:
            x: Input batch; indexed as (N, C, H, W) by the 2-D layers used
                here, with C equal to ``in_channels``.

        Returns:
            Tensor with ``out_channels`` channels, the same height and
            width as ``x``, and values in [0, 1].
        """
        # Encoder. The deeper variant of each stage is available through
        # iterative_forward(x, pool, enc_a, enc).
        x1 = self.pool1(self.enc1a(x))
        x2 = self.pool2(self.enc2a(x1))
        x3 = self.pool3(self.enc3a(x2))

        # Decoder: upsample, reduce channels, add the skip connection.
        y = self._upsample_like(x3, x2)
        y = self.dec1(y)
        y = y + x2

        y = self._upsample_like(y, x1)
        y = self.dec2(y)
        y = y + x1

        # Back to the input resolution, then 1x1 conv to the output maps.
        y = self._upsample_like(y, x)
        y = self.dec3(y)

        return torch.sigmoid(y)

    def iterative_forward(self, x, poolFunc, encFuncA, encFunc, repeats=7):
        """Run one deeper encoder stage.

        Applies ``encFuncA`` once, then the same ``encFunc`` module
        ``repeats`` times (so its weights are reused), and finally pools.

        Args:
            x: Input tensor of the stage.
            poolFunc: Pooling module applied last.
            encFuncA: Module that changes the channel count.
            encFunc: Channel-preserving module applied repeatedly.
            repeats: How many times ``encFunc`` is applied (default 7).

        Returns:
            The pooled stage output.
        """
        xNew = encFuncA(x)
        for _ in range(repeats):
            xNew = encFunc(xNew)
        return poolFunc(xNew)

# Create model instance (default arguments: 3 input channels, 1 output channel)
model = CustomFCN()
# Use the GPU when one is available, otherwise fall back to the CPU.
# `device` is reused by the training cell to move each batch.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Print the layer structure of the model
print(model)


In [None]:
# 2. Prepare data
# Directory that is listed to obtain the image file names (inside the cloned repo)
IMAGE_PATH = "./COMP9444/top_200_images/images"
# Root of the label folders; the dataset joins it with "crack" and the file name
LABEL_PATH = "./COMP9444/top_200_images/"

def load_image(path, isLabel=False, size=(480, 640)):
    """Load an image file, resize it and convert it to a tensor.

    Args:
        path: Path of the image file to open.
        isLabel: When true, the image is reduced to a single grayscale
            channel before resizing (used for the mask images).
        size: Target size handed to ``transforms.Resize``; defaults to the
            (480, 640) used throughout this notebook.

    Returns:
        The result of ``transforms.ToTensor()`` on the resized image.
    """
    # Only build the pipeline that is actually needed.
    steps = [
        transforms.Resize(size),  # Resize images
        transforms.ToTensor(),  # Convert to tensor
    ]
    if isLabel:
        steps.insert(0, transforms.Grayscale(num_output_channels=1))
        # NOTE(review): Resize interpolates, so a binary mask may come out
        # with in-between values at edges -- confirm this is intended.

    # convert() returns an independent in-memory copy, so the file can be
    # closed as soon as the conversion is done.
    with Image.open(path) as source:
        img = source.convert('RGB')

    return transforms.Compose(steps)(img)  # Apply transform

class QuakeCityDataset(torch.utils.data.Dataset):
    """Dataset that pairs every image with its crack mask.

    Items are addressed by position in ``file_list``. The same file name is
    looked up under ``IMAGE_PATH`` for the image and under
    ``LABEL_PATH/crack`` for the mask.
    """

    def __init__(self, file_list):
        """Remember the file names this dataset serves."""
        self.file_list = file_list

    def __len__(self):
        """Return the number of file names."""
        return len(self.file_list)

    def __getitem__(self, idx):
        """Return ``(image, crack_mask)`` for the file at position ``idx``."""
        name = self.file_list[idx]
        image_file = os.path.join(IMAGE_PATH, name)
        crack_file = os.path.join(LABEL_PATH, "crack", name)
        # Only the crack annotation is used as the label here.
        return load_image(image_file), load_image(crack_file, isLabel=True)

# Sort the directory listing so the split is the same on every run, then
# cut it in the middle: first half -> test, second half -> train.
all_files = sorted(os.listdir(IMAGE_PATH))
split_idx = len(all_files) // 2
test_files, train_files = all_files[:split_idx], all_files[split_idx:]

train_dataset, test_dataset = QuakeCityDataset(train_files), QuakeCityDataset(test_files)

# Only the training batches are shuffled.
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, shuffle=False, batch_size=64)

# Pull one test batch to check the tensor shapes.
img_test_batch, label_test_batch = next(iter(test_loader))
print("Test image batch shape:", img_test_batch.shape)
print("Test label batch shape:", label_test_batch.shape)


In [None]:
# 2.5 Visualise data
def visualize_sample(image, label):
    """Show an image and its crack mask side by side.

    Args:
        image: Image tensor; its axes are reordered with ``permute(1, 2, 0)``
            before plotting.
        label: Mask tensor; it is squeezed and drawn with a gray colormap.
    """
    label_names = ["Component", "Crack", "Spall", "Rebar", "Damage State", "Depth"]
    fig, (image_ax, mask_ax) = plt.subplots(1, 2, figsize=(20, 10))

    # (axis, pixels, title, extra imshow arguments) for the two panels
    panels = (
        (image_ax, image.permute(1, 2, 0), "Original Image", {}),
        (mask_ax, label.squeeze(), label_names[1], {"cmap": 'gray'}),
    )
    for ax, pixels, title, imshow_kwargs in panels:
        ax.imshow(pixels, **imshow_kwargs)
        ax.set_title(title)
        ax.axis("off")

    plt.tight_layout()
    plt.show()

# Take the first batch from the (shuffled) training loader and display its
# first image together with the matching mask.
img_batch, label_batch = next(iter(train_loader))
visualize_sample(img_batch[0], label_batch[0])

In [None]:
# 3. Choose optimizer

# The model's forward() already ends with a sigmoid, so its outputs are
# passed to BCELoss directly.
criterion = nn.BCELoss()  # binary classification loss
# Plain SGD with momentum over all model parameters
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

In [None]:
import matplotlib.pyplot as plt

losses = []  # one entry per processed batch
epochs = 500

# Make sure BatchNorm uses batch statistics, even when this cell is re-run
# after the evaluation cell has switched the model to eval mode.
model.train()

# The upper bound is epochs + 1 so that exactly `epochs` passes are made
# (range(1, epochs) stopped one epoch short).
for epoch in range(1, epochs + 1):
    for batch_id, (data, target) in enumerate(train_loader):
        data = data.to(device)
        target = target.to(device)

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)

        loss.backward()
        optimizer.step()

        # Save and print loss
        losses.append(loss.item())
        print('Epoch%3d: zero_grad(): loss=%7.4f output_mean=%7.4f target_mean=%7.4f' %
              (epoch, loss.item(), output.mean().item(), target.mean().item()))

        # Compare results: last sample of the batch, prediction vs. target
        output_image = output[-1].detach().cpu().numpy()
        target_image = target[-1].detach().cpu().numpy()

        fig, axes = plt.subplots(1, 2, figsize=(20, 10))

        # Left panel: the model output for that sample
        axes[0].imshow(output_image.squeeze(), cmap='gray')
        axes[0].set_title(f"Epoch {epoch} - Output Image")
        axes[0].axis("off")
        # Right panel: the label mask
        axes[1].imshow(target_image.squeeze(), cmap='gray')
        axes[1].set_title("Target Image")
        axes[1].axis("off")

        plt.tight_layout()
        plt.show()

        # Plot live loss curve (all batches seen so far)
        plt.figure(figsize=(10, 4))
        plt.plot(losses, label='Training Loss')
        plt.xlabel('Batch')
        plt.ylabel('Loss')
        plt.title('Live Loss Curve')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()



In [None]:
!pip install --upgrade torchmetrics


In [None]:
def iou_score(outputs, targets, threshold=0.5):
    """Intersection-over-union between thresholded outputs and targets.

    Args:
        outputs: Tensor of scores; entries above ``threshold`` count as 1,
            all others as 0.
        targets: Tensor of the same shape as ``outputs``; used as-is (it is
            not thresholded).
        threshold: Cut-off applied to ``outputs``.

    Returns:
        The IoU over all elements as a Python float. When the union is
        empty (nothing predicted and nothing in the target) the two agree
        perfectly, so 1.0 is returned instead of the NaN that 0/0 yields.
    """
    predicted = (outputs > threshold).float()
    intersection = (predicted * targets).sum()
    union = predicted.sum() + targets.sum() - intersection
    if union.item() == 0:
        return 1.0
    return (intersection / union).item()

def dice_coefficient(outputs, targets, threshold=0.5):
    """Dice coefficient between thresholded outputs and targets.

    Args:
        outputs: Tensor of scores; entries above ``threshold`` count as 1,
            all others as 0.
        targets: Tensor of the same shape as ``outputs``; used as-is (it is
            not thresholded).
        threshold: Cut-off applied to ``outputs``.

    Returns:
        ``2 * intersection / (|outputs| + |targets|)`` as a Python float.
        When both sums are zero (nothing predicted and nothing in the
        target) 1.0 is returned instead of the NaN that 0/0 yields.
    """
    predicted = (outputs > threshold).float()
    intersection = (predicted * targets).sum()
    total = predicted.sum() + targets.sum()
    if total.item() == 0:
        return 1.0
    return ((2 * intersection) / total).item()

# Evaluate on the test split: eval mode, no gradients.
model.eval()
total_iou, total_dice = 0, 0

with torch.no_grad():
    for images, masks in test_loader:
        # The model lives on `device`; the batches must be moved there too,
        # otherwise the forward pass fails as soon as a GPU is used.
        images, masks = images.to(device), masks.to(device)

        # Mixed precision only makes sense on CUDA; disable it on CPU.
        with torch.cuda.amp.autocast(enabled=device.type == 'cuda'):
            outputs = model(images)

        total_iou += iou_score(outputs, masks)
        total_dice += dice_coefficient(outputs, masks)

# Average the per-batch metrics over the number of test batches
avg_iou = total_iou / len(test_loader)
avg_dice = total_dice / len(test_loader)
print(f'Validation - IoU: {avg_iou:.4f}, Dice: {avg_dice:.4f}')