# Installing required libraries

In [None]:
!pip install coremltools
!pip install pycocotools
!pip install transformers
!pip install scikit-learn
!pip install pytorch-lightning
!pip install torch torchvision torchaudio

# conda install -c conda-forge gcc libstdcxx-ng

# Importing required packages

In [None]:
import os
import cv2
import torch
import numpy as np
import torchmetrics
import coremltools as ct
import pytorch_lightning as pl
from PIL import Image,ImageDraw
import torch.nn.functional as F
from pycocotools.coco import COCO
from matplotlib import pyplot as plt
from pytorch_lightning import Trainer
from torchmetrics import JaccardIndex
import torchvision.transforms as transforms
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.callbacks import ModelCheckpoint
from torch.utils.data import DataLoader, IterableDataset
from transformers import SegformerForSemanticSegmentation
from scipy.ndimage import label, find_objects, distance_transform_edt

# Setting SLURM variables

In [2]:
os.environ['SLURM_NTASKS_PER_NODE'] = '8'
os.environ.pop('SLURM_NTASKS', None)

# Print SLURM environment variables to verify
print("SLURM Environment Variables:")
print("SLURM_JOB_ID:", os.environ.get('SLURM_JOB_ID', 'Not Set'))
print("SLURM_NTASKS:", os.environ.get('SLURM_NTASKS', 'Not Set'))
print("SLURM_NTASKS_PER_NODE:", os.environ.get('SLURM_NTASKS_PER_NODE', 'Not Set'))
print("SLURM_JOB_NODELIST:", os.environ.get('SLURM_JOB_NODELIST', 'Not Set'))
print("SLURM_JOB_NAME:", os.environ.get('SLURM_JOB_NAME', 'Not Set'))

# Set float32 matmul precision for Tensor Cores
torch.set_float32_matmul_precision('high')

SLURM Environment Variables:
SLURM_JOB_ID: 6072005
SLURM_NTASKS: Not Set
SLURM_NTASKS_PER_NODE: 8
SLURM_JOB_NODELIST: s1cmp004
SLURM_JOB_NAME: sys/dashboard/sys/jupyternotebook


# Defining file locations

In [3]:
# Image paths
test_dir = './images/merged_test'
train_dir = './images/merged_train'
val_dir = './images/merged_val'

#Annotation Paths
test_ann_file = './annotations/test_merged_7_24_24.json'
train_ann_file = './annotations/train_merged_7_24_24.json'
val_ann_file = './annotations/merged_val_07_24_24.json'

# Normalization

In [4]:
def dynamic_normalization(img_dir, annotation_file):
    coco = COCO(annotation_file)
    img_ids = list(coco.imgs.keys())
    means = []
    stds = []

    for img_id in img_ids:
        img_info = coco.imgs[img_id]
        path = os.path.join(img_dir, img_info['file_name'])
        image = Image.open(path).convert('RGB')

        # Convert image to tensor without normalization
        to_tensor = transforms.ToTensor()
        image_tensor = to_tensor(image)
        
        means.append(image_tensor.mean(dim=(1, 2)))
        stds.append(image_tensor.std(dim=(1, 2)))

    mean = torch.stack(means).mean(dim=0)
    std = torch.stack(stds).mean(dim=0)

    return transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])

# Defining COCO loader 

In [5]:
# Loader for COCO formatted data
def get_coco_loader(img_dir, annotation_file, transform, augment=False, num_augmentations=1):
    coco = COCO(annotation_file)
    img_ids = list(coco.imgs.keys())
    def loader():
        for img_id in img_ids:
            img_info = coco.imgs[img_id]
            path = os.path.join(img_dir, img_info['file_name'])
            image = Image.open(path).convert('RGB')
            image_tensor = transform(image)
            ann_ids = coco.getAnnIds(imgIds=[img_id])
            anns = coco.loadAnns(ann_ids)
            mask = np.zeros((img_info['height'], img_info['width']), dtype=np.int64)
            for ann in anns:
                mask = np.maximum(mask, coco.annToMask(ann) * ann['category_id'])
            mask_tensor = torch.tensor(mask)
            yield image_tensor, mask_tensor
            
            if augment:
                for _ in range(num_augmentations):
                    aug_image, aug_mask = apply_transform(image, mask)
                    aug_image_tensor = transform(aug_image)
                    aug_mask_tensor = torch.tensor(aug_mask, dtype=torch.int64)
                    yield aug_image_tensor, aug_mask_tensor
    return loader

def get_position_transform():
    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),  
        transforms.RandomRotation(15),      
    ])
    return transform

def get_color_transform():
    transform = transforms.Compose([
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),  # Randomly changes the brightness, contrast, saturation, and hue
    ])
    return transform

def apply_transform(image, mask):
    # Ensure the mask is in uint8 format then convert to PIL Image
    mask = mask.astype(np.uint8)  
    mask = Image.fromarray(mask) 

    # seed for random transformations
    seed = torch.randint(0, 2**32, (1,)).item()

    # Apply the same position transformation to both image and mask
    torch.manual_seed(seed)
    image = get_position_transform()(image)
    torch.manual_seed(seed)
    mask = get_position_transform()(mask)

    # apply color transformation to image
    image = get_color_transform()(image)

    # Convert mask back to numpy array
    mask = np.array(mask)

    return image, mask

# Custom DataLoader for COCO data
class COCOLoader(IterableDataset):
    def __init__(self, img_dir, annotation_file, transform, augment=False, num_augmentations=1):
        super().__init__()
        self.img_dir = img_dir
        self.annotation_file = annotation_file
        self.transform = transform
        self.augment = augment
        self.num_augmentations = num_augmentations

    def __iter__(self):
        return get_coco_loader(self.img_dir, self.annotation_file, self.transform, self.augment, self.num_augmentations)()

# Defining Test-Train-Val loaders

In [None]:
# Setup data tranforms
test_transform = dynamic_normalization(test_dir, test_ann_file)
train_transform = dynamic_normalization(train_dir, train_ann_file)
val_transform = dynamic_normalization(val_dir, val_ann_file)


# # Setup data loaders
test_loader = DataLoader(COCOLoader(test_dir, test_ann_file, test_transform, False), batch_size=32, num_workers=4)
train_loader = DataLoader(COCOLoader(train_dir, train_ann_file, train_transform), batch_size=32, num_workers=4)
val_loader = DataLoader(COCOLoader(val_dir, val_ann_file, val_transform), batch_size=32, num_workers=4)

# Visualize Images with annotations

In [None]:
for images, masks in val_loader:
    # Convert tensors to numpy arrays
    images_np = images.permute(0, 2, 3, 1).numpy()  # Convert from (N, C, H, W) to (N, H, W, C)
    masks_np = masks.numpy()

    # Plot images and masks
    for i in range(len(images)):
        image_np = images_np[i]
        mask_np = masks_np[i]

        # Overlay mask on image
        plt.figure(figsize=(10, 5))
        plt.subplot(1, 2, 2)
        plt.imshow(image_np)
        plt.imshow(mask_np, alpha=0.5, cmap='jet', interpolation='nearest')  # Overlay mask on image
        plt.title('Image with Pupil Highlighted')
        plt.axis('off')

        plt.show()

# Function to calculate Mean-Iou

In [9]:
def calculate_iou(pred, target, num_classes):
    iou = []
    pred = pred.view(-1)
    target = target.view(-1)
    for cls in range(num_classes):
        pred_inds = pred == cls
        target_inds = target == cls
        intersection = (pred_inds[target_inds]).sum().item()
        union = pred_inds.sum().item() + target_inds.sum().item() - intersection
        if union == 0:
            iou.append(float('nan'))  # If no ground truth pixels exist in this class
        else:
            iou.append(intersection / union)
    return torch.tensor(iou).mean()  # Return mean IoU over classes

# Segformer Model Definition

In [10]:
# PyTorch Lightning Module for Segformer
class SegformerModule(pl.LightningModule):
    
    def __init__(self, num_classes=2):
        super().__init__()
        self.model = SegformerForSemanticSegmentation.from_pretrained(
            'nvidia/segformer-b0-finetuned-ade-512-512',
            num_labels=num_classes,
            ignore_mismatched_sizes=True
        )
        self.test_losses = []
        self.test_ious = []
        self.metrics = torchmetrics.JaccardIndex(task='multiclass', num_classes=self.model.config.num_labels, average='macro')

    def forward(self, pixel_values, labels=None):
        return self.model(pixel_values=pixel_values, labels=labels)

    def training_step(self, batch, batch_idx):
        images, labels = batch
        outputs = self(images, labels=labels)
        loss = outputs.loss
        self.log('train_loss', loss)

        # Upsample logits to match the size of labels
        logits = outputs.logits
        logits = F.interpolate(logits, size=labels.shape[-2:], mode='bilinear', align_corners=False)
        preds = torch.argmax(logits, dim=1)
        self.metrics.update(preds, labels)
        self.log('train_iou', self.metrics, on_step=False, on_epoch=True)

        return loss

    def validation_step(self, batch, batch_idx):
        images, labels = batch
        outputs = self(images, labels=labels)
        val_loss = outputs.loss
        self.log('val_loss', val_loss)

        # Upsample logits to match the size of labels
        logits = outputs.logits
        logits = F.interpolate(logits, size=labels.shape[-2:], mode='bilinear', align_corners=False)
        preds = torch.argmax(logits, dim=1)
        self.metrics.update(preds, labels)
        self.log('val_iou', self.metrics, on_step=False, on_epoch=True)

        return val_loss

    def test_step(self, batch, batch_idx):
        images, labels = batch
        outputs = self(images, labels=labels)
        loss = outputs.loss
        self.log('test_loss', loss)

        # Upsample logits to match the size of labels
        logits = outputs.logits
        logits = F.interpolate(logits, size=labels.shape[-2:], mode='bilinear', align_corners=False)
        preds = torch.argmax(logits, dim=1)
        self.metrics.update(preds, labels)
        self.log('test_iou', self.metrics, on_step=False, on_epoch=True)

        self.test_losses.append(loss.detach())
        return {'test_loss': loss}

    def on_test_epoch_end(self):
        avg_loss = torch.stack(self.test_losses).mean()
        avg_iou = self.metrics.compute()  # Compute the final IoU score
        self.log('avg_test_loss', avg_loss)
        self.log('avg_iou_score', avg_iou)
        self.test_losses.clear()
        self.test_ious.clear()
        self.metrics.reset()  # Reset metric states for the next epoch

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.parameters(), lr=0.0001)
        return optimizer

In [None]:
#Check If cuda is available
# print(torch.cuda.is_available())
# print(torch.cuda.device_count())

# Model initialization

In [None]:
# Initialize model and checkpoint
num_classes = 2 
segformer = SegformerModule(num_classes)
checkpoint_callback = ModelCheckpoint(monitor = 'val_loss', mode = 'min', dirpath = './checkpoints')
logger = TensorBoardLogger("tb_logs", name="segformer")

# Defining trainer attributes

In [None]:
trainer = Trainer(max_epochs= 128,
                  accelerator='gpu' if torch.cuda.is_available() else 'cpu',
                  devices = 1, 
                  callbacks=[checkpoint_callback]) 

# Training the model

In [None]:
# Train the model
trainer.fit(segformer, train_loader, val_loader)

# Testing the model

In [15]:
# Test the model
trainer.test(model = segformer, dataloaders = test_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


loading annotations into memory...
loading annotations into memory...
Done (t=0.02s)Done (t=0.02s)
creating index...

creating index...index created!
loading annotations into memory...

index created!
Done (t=0.01s)
creating index...
index created!
loading annotations into memory...
Done (t=0.01s)
creating index...
index created!


Testing: |          | 0/? [00:00<?, ?it/s]

────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
      avg_iou_score         0.8706876039505005
      avg_test_loss        0.026751693338155746
        test_iou            0.8706876039505005
        test_loss          0.030302807688713074
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_loss': 0.030302807688713074,
  'test_iou': 0.8706876039505005,
  'avg_test_loss': 0.026751693338155746,
  'avg_iou_score': 0.8706876039505005}]

# Saving the model

In [16]:
model_save_path = './checkpoints/segformer_model.pth'
torch.save(segformer.state_dict(), model_save_path)

# Loading model from checkpoint

In [17]:
# Assuming `checkpoint_callback.best_model_path` contains the path to the best model checkpoint
model_path = checkpoint_callback.best_model_path
segformer = SegformerModule.load_from_checkpoint(model_path, num_classes=2)
segformer.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
segformer.to(device)

Some weights of SegformerForSemanticSegmentation were not initialized from the model checkpoint at nvidia/segformer-b0-finetuned-ade-512-512 and are newly initialized because the shapes did not match:
- decode_head.classifier.bias: found shape torch.Size([150]) in the checkpoint and torch.Size([2]) in the model instantiated
- decode_head.classifier.weight: found shape torch.Size([150, 256, 1, 1]) in the checkpoint and torch.Size([2, 256, 1, 1]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


SegformerModule(
  (model): SegformerForSemanticSegmentation(
    (segformer): SegformerModel(
      (encoder): SegformerEncoder(
        (patch_embeddings): ModuleList(
          (0): SegformerOverlapPatchEmbeddings(
            (proj): Conv2d(3, 32, kernel_size=(7, 7), stride=(4, 4), padding=(3, 3))
            (layer_norm): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
          )
          (1): SegformerOverlapPatchEmbeddings(
            (proj): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
            (layer_norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
          )
          (2): SegformerOverlapPatchEmbeddings(
            (proj): Conv2d(64, 160, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
            (layer_norm): LayerNorm((160,), eps=1e-05, elementwise_affine=True)
          )
          (3): SegformerOverlapPatchEmbeddings(
            (proj): Conv2d(160, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
            (layer

# Visualizing model's predictions

In [None]:
# Function to de-normalize the image
def denormalize(image_tensor, mean, std):
    image_np = image_tensor.permute(1, 2, 0).cpu().numpy()  # Convert to HWC format
    mean = np.array(mean)
    std = np.array(std)
    image_np = std * image_np + mean  # De-normalize
    image_np = np.clip(image_np * 255, 0, 255).astype(np.uint8)  # Convert to uint8
    return image_np


# def denormalize(image_tensor):
#     mean = np.array([0.485, 0.456, 0.406])
#     std = np.array([0.229, 0.224, 0.225])
#     image_np = image_tensor.permute(1, 2, 0).cpu().numpy()  # Convert to HWC format
#     image_np = std * image_np + mean  # De-normalize
#     image_np = np.clip(image_np * 255, 0, 255).astype(np.uint8)  # Convert to uint8
#     return image_np

color_map = {
    0: (0, 0, 0),   # Background
    1: (255, 0, 0), # Original mask
    2: (0, 255, 0), # Predicted mask
}

# Define mean and std for normalization
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Ensure your existing model instance is in evaluation mode
segformer.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
segformer.to(device)  # Move the model to the appropriate device

# Function to overlay mask
def overlay_mask(image_np, mask_np, color):
    vis_shape = image_np.shape
    overlay = image_np.copy()
    for i in range(3):
        overlay[:, :, i] = np.where(mask_np == 1, color[i], overlay[:, :, i])
    return Image.blend(Image.fromarray(image_np), Image.fromarray(overlay), alpha=0.5)

with torch.no_grad():
    for images, masks in test_loader:
        images = images.to(device)
        masks = masks.to(device)

        outputs = segformer(images)
        logits = outputs.logits if hasattr(outputs, 'logits') else outputs

        upsampled_logits = torch.nn.functional.interpolate(logits, size=masks.shape[-2:], mode="bilinear", align_corners=False)
        predicted = upsampled_logits.argmax(dim=1).cpu().numpy()
        masks = masks.cpu().numpy()

        for i in range(len(images)):
            image_np = denormalize(images[i], mean, std)
            original_mask_np = masks[i]
            pred_mask_np = predicted[i]

            original_overlay = overlay_mask(image_np, original_mask_np, color_map[1])
            predicted_overlay = overlay_mask(image_np, pred_mask_np, color_map[1])

            plt.figure(figsize=(10, 5))
            plt.subplot(1, 2, 1)
            plt.imshow(original_overlay)
            plt.title('Image with Original Mask')
            plt.axis('off')

            plt.subplot(1, 2, 2)
            plt.imshow(predicted_overlay)
            plt.title('Image with Predicted Mask')
            plt.axis('off')

            plt.show()
            

# Creating CoreMl object of the model

In [None]:
# Ensure that the existing model instance is in evaluation mode
segformer.eval()

# Move model to the appropriate device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
segformer.to(device)

# Extract the underlying HuggingFace model from the LightningModule
underlying_model = segformer.model

# Dummy input to match the expected input size (batch_size, channels, height, width)
dummy_input = torch.randn(32, 3, 640, 480).to(device)

# Trace the model
traced_model = torch.jit.trace(underlying_model, dummy_input)

# Convert the traced model to Core ML
coreml_model = ct.convert(
    traced_model,
    inputs=[ct.ImageType(name="input", shape=dummy_input.shape, scale=1/255.0, bias=[-0.485, -0.456, -0.406])]
)

# Save the Core ML model
coreml_model.save('cyclops_coreml.mlmodel')