# Import Libraries

In [None]:
import os
import zipfile
import pandas as pd
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torchvision.models.segmentation import fcn_resnet50
from torch.utils.tensorboard import SummaryWriter
from PIL import Image
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Unzip the dataset

In [None]:
!tar -xzvf /content/drive/MyDrive/"Image Segmentation Workshop"/IDDSPLIT.tar.gz -C /content

# Custom Dataset Class

In [None]:
# Root folder of the dataset; the three CSV files listing the splits live
# directly inside it.
data_path = '/content/IDDCLEAN'
train_csv_path, val_csv_path, test_csv_path = (
    os.path.join(data_path, csv_name)
    for csv_name in ('train.csv', 'valid.csv', 'test.csv')
)


class IDDDataClass(Dataset):
    """Dataset of (image, mask) pairs listed in a CSV file.

    Column 0 of the CSV holds the image file name (joined onto ``img_dir``)
    and column 1 the mask file name (joined onto ``mask_dir``).

    Args:
        csv_file: Path (or file-like object) read with ``pd.read_csv``.
        img_dir: Directory containing the input images.
        mask_dir: Directory containing the colour-coded masks.
        transform: Optional callable invoked as
            ``transform(image=..., mask=...)`` that returns a mapping with
            ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, csv_file, img_dir, mask_dir, transform=None):
        self.data_frame = pd.read_csv(csv_file)
        self.img_dir, self.mask_dir = img_dir, mask_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the CSV."""
        return len(self.data_frame)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, index_mask)``."""
        frame = self.data_frame
        img_path = os.path.join(self.img_dir, frame.iloc[idx, 0])
        mask_path = os.path.join(self.mask_dir, frame.iloc[idx, 1])

        # Both files are decoded as 3-channel RGB arrays; the mask colours
        # are translated to class indices further down.
        image = np.array(Image.open(img_path).convert('RGB'))
        mask = np.array(Image.open(mask_path).convert('RGB'))

        if self.transform:
            result = self.transform(image=image, mask=mask)
            image, mask = result['image'], result['mask']

        # Colour mask -> per-pixel class-index mask.
        return image, index_mapping(mask)
    
# Random geometric augmentations, each applied with probability 0.5.
_random_augmentations = [
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
]

# Pipeline order: resize to 512x512 -> random augmentations ->
# per-channel normalisation -> conversion to a torch tensor.
transform = A.Compose(
    [A.Resize(512, 512)]
    + _random_augmentations
    + [
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
)

# DataLoaders

In [None]:
# All three splits read from the same image and mask folders; the CSV files
# decide which samples belong to which split.
train_images_dir = os.path.join(data_path, 'img')
train_mask_dir = os.path.join(data_path, 'mask')
val_img_dir = os.path.join(data_path, 'img')
val_mask_dir = os.path.join(data_path, 'mask')
test_img_dir = os.path.join(data_path, 'img')
test_mask_dir = os.path.join(data_path, 'mask')

# Validation and test data must not be randomly flipped or rotated, otherwise
# the reported metrics change from run to run. This pipeline keeps only the
# deterministic steps of the training transform (resize, normalise, to-tensor).
eval_transform = A.Compose([
    A.Resize(512, 512),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])

train_dataset = IDDDataClass(train_csv_path, train_images_dir, train_mask_dir, transform=transform)
val_dataset = IDDDataClass(val_csv_path, val_img_dir, val_mask_dir, transform=eval_transform)
test_dataset = IDDDataClass(test_csv_path, test_img_dir, test_mask_dir, transform=eval_transform)

# Only the training loader shuffles; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=4)

# Mask Mapping to Index

In [None]:
# Maps each RGB colour used in the mask images to a class index.
#
# The indices are contiguous (0 .. len(COLOR_DICT) - 1): nn.CrossEntropyLoss
# requires every target value to be smaller than the number of model output
# channels (num_classes = 11 in this notebook), so indices such as 12 or 14
# would be out of range.
#
# NOTE: index_mapping() starts from an all-zero mask, so pixels whose colour
# is not listed here also end up as class 0.
COLOR_DICT = {
    (128, 64, 128): 0,    # Road
    (244, 35, 232): 1,    # Sidewalk
    (220, 20, 60): 2,     # Person
    (255, 0, 0): 3,       # Rider
    (0, 0, 230): 4,       # Motorcycle
    (119, 11, 32): 5,     # Bicycle
    (0, 0, 142): 6,       # Car
    (0, 0, 70): 7,        # Truck
    (0, 60, 100): 8,      # Bus
    (0, 80, 100): 9,      # Train
    (102, 102, 156): 10,  # Wall
}

def index_mapping(mask, color_dict=None):
    """Convert an RGB colour mask into a mask of class indices.

    Args:
        mask: Array-like or ``torch.Tensor`` of shape (H, W, 3) holding one
            RGB colour per pixel. Both NumPy arrays (dataset used without a
            transform) and tensors (output of ``ToTensorV2``) are accepted.
        color_dict: Optional mapping ``{(r, g, b): class_index}``. Defaults
            to the module-level ``COLOR_DICT``.

    Returns:
        ``torch.long`` tensor of shape (H, W). Pixels whose colour is not a
        key of the mapping keep the initial value 0.

    Raises:
        ValueError: If ``mask`` is not a 3-dimensional (H, W, C) array.
    """
    if color_dict is None:
        color_dict = COLOR_DICT

    if isinstance(mask, torch.Tensor):
        mask = mask.detach().cpu().numpy()
    else:
        mask = np.asarray(mask)

    if mask.ndim != 3:
        raise ValueError(
            f"expected a mask of shape (H, W, 3), got shape {mask.shape}"
        )

    index_mask = np.zeros(mask.shape[:2], dtype=np.int64)
    for rgb, idx in color_dict.items():
        # A pixel matches only when all of its channels equal the colour.
        index_mask[(mask == np.asarray(rgb)).all(axis=2)] = idx

    return torch.from_numpy(index_mask)

# Check image size

In [None]:
# Fetch only the first training batch (if there is one) and report the
# shapes of its image and mask tensors.
first_batch = next(iter(train_loader), None)
if first_batch is not None:
    images, masks = first_batch
    print(images.shape, masks.shape)

# Set up environment

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models.segmentation import fcn_resnet50
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Load Pretrained Model

In [None]:
num_classes = 11

# Start from the pretrained FCN-ResNet50 and replace the last layer (index 4)
# of the main and the auxiliary head with a 1x1 convolution that outputs one
# channel per class.
model = fcn_resnet50(pretrained=True)
for head, in_channels in ((model.classifier, 512), (model.aux_classifier, 256)):
    head[4] = nn.Conv2d(in_channels, num_classes, kernel_size=1)

# Move to the selected device and switch to training mode.
model = model.to(device).train()

# Per-pixel cross-entropy loss, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Tensorboard

In [None]:
# TensorBoard event files are written below /content/logs.
writer = SummaryWriter('/content/logs')

def mean_iou(pred, target, n_classes=21):
    """Return the mean intersection-over-union over the classes present.

    Args:
        pred: Score tensor whose dimension 1 indexes the classes; the
            predicted label of each pixel is the argmax over that dimension.
        target: Integer tensor of ground-truth class indices with the same
            shape as ``pred`` without its class dimension.
        n_classes: Number of class indices (0 .. n_classes - 1) to consider.

    Returns:
        0-dim float tensor on the CPU. Classes that occur in neither the
        prediction nor the target are skipped, so an ``n_classes`` larger
        than the real number of classes no longer turns the result into NaN.
        NaN is returned only when no class is present at all.
    """
    pred_labels = torch.argmax(pred, dim=1)

    per_class_iou = []
    for cls in range(n_classes):
        pred_inds = pred_labels == cls
        target_inds = target == cls
        union = (pred_inds | target_inds).sum()
        if union == 0:
            # Class absent from both prediction and target: IoU undefined.
            continue
        intersection = (pred_inds & target_inds).sum()
        per_class_iou.append(intersection.float() / union.float())

    if not per_class_iou:
        return torch.tensor(float('nan'))
    return torch.stack(per_class_iou).mean().cpu()

# Training Loop

In [None]:
num_epochs = 25

for epoch in range(num_epochs):
    # The evaluation helpers in this notebook call model.eval(); re-enter
    # training mode so that re-running this cell after an evaluation still
    # trains with batch-norm in training mode.
    model.train()

    epoch_loss = 0.0
    iou_sum = 0.0
    iou_batches = 0

    for images, masks in tqdm(train_loader):
        images = images.to(device)
        masks = masks.to(device)

        optimizer.zero_grad()

        outputs = model(images)['out']
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

        # IoU is only a monitoring metric: compute it on a detached tensor
        # and over the model's real number of classes.
        batch_iou = mean_iou(outputs.detach(), masks, n_classes=num_classes).item()
        # A NaN batch value (undefined IoU) must not poison the epoch average.
        if not np.isnan(batch_iou):
            iou_sum += batch_iou
            iou_batches += 1

    epoch_loss /= len(train_loader)
    epoch_iou = iou_sum / iou_batches if iou_batches else float('nan')

    writer.add_scalar('Loss/train', epoch_loss, epoch)
    writer.add_scalar('IoU/train', epoch_iou, epoch)

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Mean IoU: {epoch_iou:.4f}")

writer.close()

# Visualize the results in TensorBoard
print("Training complete. Run 'tensorboard --logdir=/content/logs' to visualize the results.")

In [None]:
%load_ext tensorboard
%tensorboard --logdir=runs

# IOU

In [None]:
def mean_iou(pred, target, n_classes=num_classes):
    """Return the mean intersection-over-union over the classes present.

    Args:
        pred: Score tensor whose dimension 1 indexes the classes; the
            predicted label of each pixel is the argmax over that dimension.
        target: Integer tensor of ground-truth class indices with the same
            shape as ``pred`` without its class dimension.
        n_classes: Number of class indices (0 .. n_classes - 1) to consider.

    Returns:
        0-dim float tensor on the CPU. Classes that occur in neither the
        prediction nor the target are skipped instead of turning the whole
        mean into NaN. NaN is returned only when no class is present at all.
    """
    pred_labels = torch.argmax(pred, dim=1)

    per_class_iou = []
    for cls in range(n_classes):
        pred_inds = pred_labels == cls
        target_inds = target == cls
        union = (pred_inds | target_inds).sum()
        if union == 0:
            # Class absent from both prediction and target: IoU undefined.
            continue
        intersection = (pred_inds & target_inds).sum()
        per_class_iou.append(intersection.float() / union.float())

    if not per_class_iou:
        return torch.tensor(float('nan'))
    return torch.stack(per_class_iou).mean().cpu()

# Performance Metrics

In [None]:
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
import numpy as np

def calculate_metrics(y_true, y_pred, num_classes):
    """Compute one-vs-rest metrics for every class from flat label arrays.

    Args:
        y_true: 1-D array of ground-truth class indices.
        y_pred: 1-D array of predicted class indices, same length.
        num_classes: Classes 0 .. num_classes - 1 are evaluated.

    Returns:
        Dict mapping ``'pixel_accuracy'``, ``'precision'``, ``'recall'``,
        ``'f1_score'``, ``'iou'`` and ``'ap'`` to lists with one value per
        class. Ratios with a zero denominator are reported as 0.

        NOTE(review): the ``'ap'`` entry holds the binary F-score returned
        by ``precision_recall_fscore_support``, not an average precision;
        a real AP would need prediction scores, which are not passed in.
        The key is kept for backward compatibility.
    """
    metrics = {
        'pixel_accuracy': [],
        'precision': [],
        'recall': [],
        'f1_score': [],
        'iou': [],
        'ap': []
    }

    cm = confusion_matrix(y_true, y_pred, labels=range(num_classes))
    total = cm.sum()  # loop-invariant: number of entries in the matrix

    for i in range(num_classes):
        # One-vs-rest counts for class i (rows: true label, cols: prediction).
        tp = cm[i, i]
        fp = cm[:, i].sum() - tp
        fn = cm[i, :].sum() - tp
        tn = total - (tp + fp + fn)

        pixel_accuracy = (tp + tn) / total
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        # Named class_f1 so the sklearn f1_score import is not shadowed.
        class_f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        iou = tp / (tp + fp + fn) if (tp + fp + fn) > 0 else 0

        metrics['pixel_accuracy'].append(pixel_accuracy)
        metrics['precision'].append(precision)
        metrics['recall'].append(recall)
        metrics['f1_score'].append(class_f1)
        metrics['iou'].append(iou)

        # zero_division=0 yields the same value as the default but without
        # an UndefinedMetricWarning for classes that never occur.
        _, _, binary_fscore, _ = precision_recall_fscore_support(
            y_true == i, y_pred == i, average='binary', zero_division=0
        )
        metrics['ap'].append(binary_fscore)

    return metrics

# Model Evaluation

In [None]:
def evaluate_model(model, data_loader, device, num_classes):
    """Run the model over a loader and return per-class metrics.

    The model is put into eval mode, predictions are the argmax over
    dimension 1 of the ``'out'`` output, and the flattened predictions and
    labels of all batches are handed to ``calculate_metrics``.

    Args:
        model: Model whose forward pass returns a mapping with an ``'out'``
            entry.
        data_loader: Iterable yielding ``(images, masks)`` batches.
        device: Device the batches are moved to.
        num_classes: Forwarded to ``calculate_metrics``.

    Returns:
        The metrics dictionary produced by ``calculate_metrics``.
    """
    model.eval()
    pred_batches, label_batches = [], []

    with torch.no_grad():
        for images, masks in tqdm(data_loader):
            images = images.to(device)
            masks = masks.to(device)

            logits = model(images)['out']
            batch_preds = torch.argmax(logits, dim=1)

            pred_batches.append(batch_preds.cpu().numpy())
            label_batches.append(masks.cpu().numpy())

    # Collapse every batch into one long 1-D array of labels.
    flat_preds = np.concatenate(pred_batches, axis=0).reshape(-1)
    flat_labels = np.concatenate(label_batches, axis=0).reshape(-1)

    return calculate_metrics(flat_labels, flat_preds, num_classes)

# Evaluate the trained model on the test split and print the metrics dict.
num_classes = 11
test_metrics = evaluate_model(
    model=model,
    data_loader=test_loader,
    device=device,
    num_classes=num_classes,
)
print(test_metrics)

# Visualize Predictions

In [None]:
def mean_iou_per_image(pred, target, n_classes=num_classes):
    """Return the IoU of every class as a 1-D tensor of length ``n_classes``.

    Args:
        pred: Score tensor whose dimension 1 indexes the classes.
        target: Integer tensor of ground-truth class indices.
        n_classes: Number of class indices (0 .. n_classes - 1) to score.

    Returns:
        Tensor with one IoU per class; entries are NaN for classes that
        appear in neither the prediction nor the target.
    """
    labels = torch.argmax(pred, dim=1)

    scores = []
    for class_id in range(n_classes):
        in_pred = labels == class_id
        in_target = target == class_id
        overlap = (in_pred & in_target).sum().float()
        combined = in_pred.sum() + in_target.sum() - overlap
        # An empty union means the class is absent: IoU is undefined.
        scores.append(float('nan') if combined == 0 else overlap / combined)

    return torch.tensor(scores)

In [None]:
def evaluate_model_per_image(model, data_loader, device, num_classes):
    """Collect inputs, predictions, labels and per-class IoUs for a loader.

    Args:
        model: Model whose forward pass returns a mapping with an ``'out'``
            entry.
        data_loader: Iterable yielding ``(images, masks)`` batches.
        device: Device the batches are moved to.
        num_classes: Number of classes scored for each image. It is now
            forwarded to ``mean_iou_per_image`` instead of being ignored.

    Returns:
        Tuple ``(all_images, all_preds, all_labels, all_ious)`` of NumPy
        arrays. The first three are the batches concatenated along axis 0;
        ``all_ious`` has one row per image and one column per class, with
        NaN for classes absent from that image.
    """
    model.eval()
    all_preds = []
    all_labels = []
    all_ious = []
    all_images = []

    with torch.no_grad():
        for images, masks in tqdm(data_loader):
            images = images.to(device)
            masks = masks.to(device)
            outputs = model(images)['out']
            preds = torch.argmax(outputs, dim=1)

            all_preds.append(preds.cpu().numpy())
            all_labels.append(masks.cpu().numpy())
            all_images.append(images.cpu().numpy())

            # Score each image on its own; slicing with i:i + 1 keeps the
            # leading batch dimension the IoU helper expects.
            for i in range(images.size(0)):
                ious = mean_iou_per_image(
                    outputs[i:i + 1], masks[i:i + 1], n_classes=num_classes
                )
                all_ious.append(ious.cpu().numpy())

    all_preds = np.concatenate(all_preds, axis=0)
    all_labels = np.concatenate(all_labels, axis=0)
    all_images = np.concatenate(all_images, axis=0)
    all_ious = np.array(all_ious)

    print("Evaluation Complete")
    print("All Images Shape:", all_images.shape)
    print("All Predictions Shape:", all_preds.shape)
    print("All Labels Shape:", all_labels.shape)
    print("All IOUs Shape:", all_ious.shape)

    return all_images, all_preds, all_labels, all_ious

# Gather the inputs, predictions, labels and per-class IoUs of the test split.
_per_image_results = evaluate_model_per_image(model, test_loader, device, num_classes)
test_images, test_preds, test_labels, test_ious = _per_image_results

In [None]:
import matplotlib.pyplot as plt

def visualize_low_iou_images(images, preds, labels, ious, class_idx, num_images=3,
                             mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Plot random samples whose IoU for one class is at most 0.5.

    For every selected sample the input image, the ground-truth mask and the
    predicted mask are shown side by side.

    Args:
        images: Array of channel-first images, indexed by sample.
        preds: Array of predicted class-index masks, indexed by sample.
        labels: Array of ground-truth class-index masks, indexed by sample.
        ious: 2-D array with one row per sample and one column per class.
            NaN entries never satisfy the ``<= 0.5`` filter.
        class_idx: Column of ``ious`` used to select the samples.
        num_images: Maximum number of samples to plot.
        mean: Per-channel mean used to undo the input normalisation. The
            default matches the ``A.Normalize`` step of this notebook.
        std: Per-channel std used to undo the input normalisation. Pass
            ``mean=None`` or ``std=None`` if ``images`` already holds 0-255
            pixel values.
    """
    low_iou_indices = np.where(ious[:, class_idx] <= 0.5)[0]
    num_images = min(num_images, len(low_iou_indices))
    selected_indices = np.random.choice(low_iou_indices, num_images, replace=False)

    for idx in selected_indices:
        image = images[idx].transpose(1, 2, 0)  # channel-first -> channel-last
        if mean is not None and std is not None:
            # Undo (x - mean) / std; casting the normalised floats straight
            # to uint8 would display an almost black image.
            image = np.clip(image * np.asarray(std) + np.asarray(mean), 0.0, 1.0)
        else:
            image = image.astype(np.uint8)

        # Use one colour scale for both masks so that a class gets the same
        # colour in the ground truth and in the prediction.
        vmax = max(labels[idx].max(), preds[idx].max())

        plt.figure(figsize=(15, 5))
        plt.subplot(1, 3, 1)
        plt.title('Input Image')
        plt.imshow(image)
        plt.subplot(1, 3, 2)
        plt.title('Ground Truth')
        plt.imshow(labels[idx], vmin=0, vmax=vmax)
        plt.subplot(1, 3, 3)
        plt.title('Prediction')
        plt.imshow(preds[idx], vmin=0, vmax=vmax)
        plt.show()

# Show poorly segmented test samples for class index 0.
visualize_low_iou_images(
    images=test_images,
    preds=test_preds,
    labels=test_labels,
    ious=test_ious,
    class_idx=0,
)