In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
from glob import glob
import skimage as ski
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import pandas as pd
from sklearn.model_selection import KFold
from skimage import transform as ski_transform
from torchvision.transforms import ToTensor, Normalize, Compose, RandomRotation
from torchvision import models
import torchvision.transforms.functional as FT
from torcheval.metrics import functional as FM
from torchinfo import summary
import segmentation_models_pytorch as smp

In [None]:
# Image-only pipeline: tensor conversion, a random rotation of up to 10 degrees,
# then per-channel normalisation with the mean/std listed below
# (presumably the ImageNet statistics -- TODO confirm).
# NOTE(review): `transforms` is not referenced anywhere else in the visible
# notebook; the dataset class builds its own pipeline and uses the joint_*
# helpers below so that image and mask receive the same geometric transform.
transforms = Compose([
    ToTensor(),
    RandomRotation(10),
    Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
])

def joint_random_rotation(image, label, degrees):
    """Rotate an image and its label mask by one shared random angle.

    Args:
        image: Input image, passed to ``FT.rotate``.
        label: Label mask that must stay aligned with ``image``.
        degrees: Half-width of the sampling range; the angle is drawn from
            ``[-degrees, degrees]``.

    Returns:
        Tuple ``(rotated_image, rotated_label)``.
    """
    # One draw only, so both inputs undergo exactly the same rotation.
    shared_angle = RandomRotation.get_params([-degrees, degrees])

    # Bilinear resampling for the picture; nearest-neighbour for the mask so
    # that no new (interpolated) label values are created.
    resampling_plan = ((image, Image.BILINEAR), (label, Image.NEAREST))
    rotated_image, rotated_label = (
        FT.rotate(item, shared_angle, interpolation=mode)
        for item, mode in resampling_plan
    )
    return rotated_image, rotated_label

def joint_random_horizontal_flip(image, label):
    """Mirror image and label left-right together with probability 0.5.

    A single random draw decides for both inputs, so they stay aligned.

    Returns:
        Tuple ``(image, label)``, either both flipped or both untouched.
    """
    flip_now = bool(torch.rand(1) < 0.5)
    if not flip_now:
        return image, label
    return FT.hflip(image), FT.hflip(label)

def joint_random_vertical_flip(image, label):
    """Mirror image and label top-bottom together with probability 0.5.

    A single random draw decides for both inputs, so they stay aligned.

    Returns:
        Tuple ``(image, label)``, either both flipped or both untouched.
    """
    flip_now = bool(torch.rand(1) < 0.5)
    if not flip_now:
        return image, label
    return FT.vflip(image), FT.vflip(label)

def joint_random_crop(image, label, output_size):
    """Cut the same random window out of an image and its label mask.

    Args:
        image: Object exposing ``.size`` as ``(width, height)`` (a PIL image in
            this notebook).
        label: Label mask aligned with ``image``; cropped with the same window.
        output_size: Requested crop size as ``(height, width)``.

    Returns:
        Tuple ``(cropped_image, cropped_label)``.

    Note:
        If the image is smaller than ``output_size`` along an axis, the window
        is clamped to the image extent on that axis instead of failing inside
        ``torch.randint``; the returned crop is then smaller than requested and
        the caller is expected to resize it.
    """
    # PIL reports (width, height); output_size is given as (height, width).
    w, h = image.size
    th, tw = output_size

    # Clamp so the upper bound handed to randint is always >= 1.
    th = min(th, h)
    tw = min(tw, w)

    # Top-left corner of the window, drawn once and shared by both inputs.
    top = torch.randint(0, h - th + 1, (1,)).item()
    left = torch.randint(0, w - tw + 1, (1,)).item()

    image = FT.crop(image, top, left, th, tw)
    label = FT.crop(label, top, left, th, tw)
    return image, label


In [None]:
# Global run configuration shared by the cells below.
# Samples per batch handed to the DataLoaders.
BATCH_SIZE = 4
# Maximum number of training epochs per fold.
EPOCHS = 60
# NOTE(review): the training loop further down hard-codes its own `lr` and
# does not read this list -- confirm which one is intended.
LEARNING_RATE = [0.0001]#,0.0001,0.00001]
# Use the GPU when one is available, otherwise fall back to the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# Passed to CustomDataset as `resize_dim` (crop / resize target).
RESIZE_DIM = (256, 256)
# NOTE(review): `repeats` is not referenced elsewhere in the visible notebook.
repeats = 3
val_step = 3 # Run validation every 3 epochs
# Seed NumPy and PyTorch for reproducibility.
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

In [None]:
## Dataset locations: image folders and the accompanying CSV files.
# NOTE(review): these are absolute, machine-specific paths; the notebook only
# runs unchanged on a machine with this exact directory layout.
train_path = r'/home/biomedialab/Desktop/Codes/Assignments/MLDS/Assignment_2/Q2/mlds_assignmet_2_ml_dl/Dataset/train'
test_path = r'/home/biomedialab/Desktop/Codes/Assignments/MLDS/Assignment_2/Q2/mlds_assignmet_2_ml_dl/Dataset/test'

train_csv_path = r'/home/biomedialab/Desktop/Codes/Assignments/MLDS/Assignment_2/Q2/mlds_assignmet_2_ml_dl/train.csv'
test_csv_path = r'/home/biomedialab/Desktop/Codes/Assignments/MLDS/Assignment_2/Q2/mlds_assignmet_2_ml_dl/test.csv'

In [None]:
"""
id : Name of the image
Binary prediction
    Healthy = 0
    Diseased = 1
segmentation prediction
    Probably the id of the diseased pixel
"""
# Read the training annotations and order the rows by image id in one chain,
# then preview the first ten rows as the cell output.
train_csv_data = pd.read_csv(train_csv_path).sort_values(by='id')
train_csv_data.head(10)


In [None]:
class CustomDataset(Dataset):
    """Dataset that returns one sample per image path as a dictionary.

    With ``is_train=True`` each sample holds ``'image'``, ``'mask'``,
    ``'class_label'`` and ``'id'``; with ``is_train=False`` only ``'image'``
    and ``'id'`` are returned and the image is not resized.

    Args:
        images: Sequence of image file paths.
        masks: Sequence of mask file paths, index-aligned with ``images``.
            Ignored when ``is_train`` is False.
        labels: DataFrame whose second column (``iloc[idx, 1]``) is read as the
            class label -- presumably the binary healthy/diseased flag; verify
            against the CSV.
        is_train: Whether masks and class labels are loaded.
        resize_dim: Size used for the random crop and the final resize.
        augment: Whether the random rotation / flips / crop are applied to
            labelled samples. ``None`` (the default) follows ``is_train``, which
            is the historical behaviour. Pass ``False`` for evaluation data
            that still needs its masks.
    """

    def __init__(self, images, masks, labels, is_train= True, resize_dim = (224, 224), augment=None):
        self.resize_dim = resize_dim
        self.images = images
        self.is_train = is_train
        self.augment = is_train if augment is None else augment
        self.image_transform = Compose([
            ToTensor(),
            Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
        ])
        self.label_transform = ToTensor()
        # Masks are only kept for labelled data.
        self.masks = masks if is_train else None
        self.labels = labels

    def __len__(self):
        """Return the number of image paths."""
        return len(self.images)

    def __getitem__(self, idx):
        """Load and preprocess the sample at position ``idx``.

        Raises:
            IndexError: If ``idx`` is negative or not smaller than ``len(self)``.
                Raising (rather than returning ``None``) keeps the sequence
                protocol intact and avoids feeding ``None`` to batch collation.
        """
        if idx >= len(self.images):
            raise IndexError('Reduce the index count as it is greater than the length of the dataset')
        if idx < 0:
            raise IndexError('Index should be greater than or equal to 0')

        image_path = self.images[idx]
        data = {'id': os.path.basename(image_path)}
        image = Image.open(image_path)

        if not self.is_train:
            data['image'] = self.image_transform(image)
            return data

        mask = Image.open(self.masks[idx])
        if self.augment:
            # Every geometric transform is applied jointly so that image and
            # mask stay pixel-aligned.
            image, mask = joint_random_rotation(image, mask, 10)
            image, mask = joint_random_horizontal_flip(image, mask)
            image, mask = joint_random_vertical_flip(image, mask)
            image, mask = joint_random_crop(image, mask, self.resize_dim)

        image = image.resize(self.resize_dim)
        # Nearest-neighbour keeps mask values discrete if a resize is needed.
        mask = mask.resize(self.resize_dim, resample=Image.NEAREST)

        data['image'] = self.image_transform(image)
        data['mask'] = self.label_transform(mask)
        data['class_label'] = torch.tensor(self.labels.iloc[idx, 1]).float()
        return data
    

In [None]:

# Split the images in val and train from train_path
def create_k_fold_split(train_path, train_csv_path, n_splits=3, random_state=42):
    """Collect training files and build shuffled K-fold index splits.

    Images, masks and label rows are paired purely by position after sorting
    (paths alphabetically, CSV rows by ``id``), so the counts are checked
    before any split is produced.

    Args:
        train_path: Directory containing ``images/*.png`` and ``masks/*.png``.
        train_csv_path: CSV file with an ``id`` column.
        n_splits: Number of folds.
        random_state: Seed for the fold shuffling.

    Returns:
        Tuple ``(split, train_img_paths, train_mask_paths, train_labels)``,
        where ``split`` is a list of ``(train_indices, val_indices)`` pairs and
        ``train_labels`` is the CSV sorted by ``id`` with a fresh 0..n-1 index.

    Raises:
        ValueError: If no images are found, or the image and mask counts differ.
    """
    import warnings

    train_img_paths = sorted(glob(os.path.join(train_path, 'images', '*.png')))
    train_mask_paths = sorted(glob(os.path.join(train_path, 'masks', '*.png')))

    if not train_img_paths:
        raise ValueError(f"No PNG images found under {os.path.join(train_path, 'images')}")
    if len(train_img_paths) != len(train_mask_paths):
        raise ValueError(
            f'Found {len(train_img_paths)} images but {len(train_mask_paths)} masks; '
            'they are paired by sorted position and must match one-to-one'
        )

    train_labels = (
        pd.read_csv(train_csv_path)
        .sort_values(by='id')
        .reset_index(drop=True)
    )
    if len(train_labels) != len(train_img_paths):
        # Labels are looked up by position, so a count mismatch means the
        # rows cannot all line up with the images. Warn, don't fail.
        warnings.warn(
            f'{len(train_labels)} label rows for {len(train_img_paths)} images; '
            'positional label lookup may be misaligned'
        )

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    split = list(kf.split(train_img_paths))
    return split, train_img_paths, train_mask_paths, train_labels


In [None]:
def get_datasets(train_img_paths, train_mask_paths, train_labels, val_img_paths, val_mask_paths, val_labels):
    """Wrap the training and validation file lists in ``CustomDataset`` objects.

    Returns:
        Tuple ``(train_dataset, val_dataset)``.
    """
    # NOTE(review): the validation dataset is also built with is_train=True.
    # That flag is what makes CustomDataset return masks, but it is also the
    # path that applies the random rotation / flips / crop, so validation
    # samples are randomly augmented as well -- confirm this is intended.
    shared_options = dict(is_train=True, resize_dim=RESIZE_DIM)
    train_dataset = CustomDataset(train_img_paths, train_mask_paths, train_labels, **shared_options)
    val_dataset = CustomDataset(val_img_paths, val_mask_paths, val_labels, **shared_options)
    return train_dataset, val_dataset

def get_dataloader(train_dataset, val_dataset, batch_size):
    """Create the DataLoaders for one fold.

    The training loader reshuffles every epoch; the validation loader keeps
    the dataset order.

    Returns:
        Tuple ``(train_loader, val_loader)``.
    """
    loader_plan = ((train_dataset, True), (val_dataset, False))
    train_loader, val_loader = (
        DataLoader(dataset, batch_size=batch_size, shuffle=reshuffle)
        for dataset, reshuffle in loader_plan
    )
    return train_loader, val_loader

def get_splitted_data(train_indices, val_indices):
    """Select the training and validation subsets for one fold.

    Reads the module-level ``train_img_paths``, ``train_mask_paths`` and
    ``train_labels``; indices are positional.

    Returns:
        Tuple ``(train, train_masks, training_labels, val, val_masks, val_labels)``.
    """
    def _subset(indices):
        # Paths are plain lists, labels are a DataFrame sliced by position.
        picked_images = [train_img_paths[k] for k in indices]
        picked_masks = [train_mask_paths[k] for k in indices]
        return picked_images, picked_masks, train_labels.iloc[indices]

    return (*_subset(train_indices), *_subset(val_indices))

# Build the K-fold split once at module level. get_splitted_data() reads
# train_img_paths / train_mask_paths / train_labels from these globals.
split, train_img_paths, train_mask_paths, train_labels = create_k_fold_split(train_path, train_csv_path)

In [None]:

def calculate_dice_score(preds, targets):
    """Mean per-sample Dice score of thresholded predictions.

    Args:
        preds: Raw model outputs (logits). A sigmoid and a 0.5 threshold are
            applied here, so already-binarised masks must not be passed in.
            All elements of a sample are flattened together.
        targets: Target masks; only channel 0 of each sample is used
            (``targets[i, 0]``).

    Returns:
        Python float: the mean Dice score over the samples in ``targets``.
        A sample scores 1.0 when prediction and target are both empty and 0.0
        when exactly one of them is empty. An empty batch gives ``nan``.
    """
    smooth = 1e-6
    batch = targets.size(0)
    if batch == 0:
        # Mean over nothing; matches the result of averaging an empty list.
        return float('nan')

    hard_preds = (torch.sigmoid(preds) > 0.5).float()[:batch].reshape(batch, -1)
    flat_targets = targets[:batch, 0].reshape(batch, -1)

    intersection = (hard_preds * flat_targets).sum(dim=1)
    pred_sum = hard_preds.sum(dim=1)
    target_sum = flat_targets.sum(dim=1)

    # When both masks are empty this evaluates to smooth / smooth == 1.0.
    dice = (2. * intersection + smooth) / (pred_sum + target_sum + smooth)

    # Exactly one side empty -> score is defined as 0 rather than ~smooth.
    exactly_one_empty = (pred_sum == 0) != (target_sum == 0)
    dice = dice * (~exactly_one_empty).float()

    return dice.mean().item()

def calculate_f1_score(preds, targets):
    """Pixel-level F1, precision and recall pooled over the whole batch.

    Args:
        preds: Raw model outputs (logits); a sigmoid followed by a 0.5
            threshold is applied here.
        targets: Target masks with the same number of elements as ``preds``.

    Returns:
        Tuple ``(f1_score, precision, recall)`` of Python floats.
    """
    eps = 1e-6
    hard_preds = (torch.sigmoid(preds) > 0.5).float().reshape(-1)
    flat_targets = targets.reshape(-1)

    true_pos = torch.sum(hard_preds * flat_targets)
    false_pos = torch.sum(hard_preds) - true_pos
    false_neg = torch.sum(flat_targets) - true_pos

    precision = true_pos / (true_pos + false_pos + eps)
    recall = true_pos / (true_pos + false_neg + eps)
    f1_score = 2 * (precision * recall) / (precision + recall + eps)
    return f1_score.item(), precision.item(), recall.item()


class Trainer:
    """Train a binary segmentation model with periodic validation.

    Uses a Dice loss on logits, keeps the weights of the best validation loss,
    writes them to disk and stops early when validation stops improving.

    Relies on names defined elsewhere in the notebook: ``DEVICE``,
    ``BATCH_SIZE``, ``EPOCHS``, ``val_step``, ``seed``,
    ``calculate_dice_score`` and ``calculate_f1_score``.

    Args:
        model: Network returning logits of shape ``(B, C, H, W)``; only
            channel 0 is used.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable of batches (dicts with ``'image'`` and ``'mask'``).
        val_loader: Same structure as ``train_loader``, used for validation.
        scheduler: Optional LR scheduler whose ``step`` accepts the validation
            loss (presumably ``ReduceLROnPlateau`` -- confirm).
        model_name: Name recorded in the hyperparameters and checkpoint file name.
        lr: Learning rate recorded in the hyperparameters and file name.
        fold_number: Fold index recorded in the hyperparameters and file name.
        early_stopping_patience: Number of consecutive validation runs without
            a new best loss after which training stops. ``None`` or ``0``
            disables early stopping.
    """

    def __init__(self, model, optimizer, train_loader, val_loader, scheduler=None, model_name=None,lr = 0.001, fold_number=None, early_stopping_patience=5):

        self.scheduler = scheduler
        self.early_stopping_patience = early_stopping_patience
        self.hyperparameters = {
            'model_name': model_name,
            'batch_size': BATCH_SIZE,
            'learning_rate': lr,
            'epochs': EPOCHS,
            'val_step': val_step,
            'seed': seed,
            'fold_number': fold_number,
        }
        self.model = model
        self.criterion = smp.losses.DiceLoss(mode='binary', from_logits=True , smooth=1e-7)
        self.optimizer = optimizer
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.best_model_seg = None
        self.best_val_loss = float('inf')
        self.results = {
            'train_loss': [],
            'train_dice_score': [],
            'train_precision': [],
            'train_recall': [],
            'train_f1_score': [],
            'val_loss': [],
            'val_dice_score': [],
            'val_f1_score': [],
            'val_precision': [],
            'val_recall': [],
        }

    def train(self, num_epochs):
        """Run up to ``num_epochs`` epochs, validating every ``val_step`` epochs.

        Per-epoch training metrics are appended to ``self.results``. After each
        validation run the scheduler (if any) is stepped with the validation
        loss and the early-stopping criterion is evaluated.
        """
        for epoch in range(num_epochs):
            self.model.train()
            train_loss = 0
            dice_score = 0
            precision = 0
            recall = 0
            f1_score = 0
            for data in self.train_loader:
                image = data['image'].float().to(DEVICE)
                label = data['mask'].float().to(DEVICE)
                label = label[:, 0:1, :, :]  # keep only the first channel, as (B, 1, H, W)
                output_seg, loss = self.train_step(image, label)
                train_loss += loss
                dice_score += calculate_dice_score(output_seg, label)
                batch_f1, batch_precision, batch_recall = calculate_f1_score(output_seg, label)
                f1_score += batch_f1
                precision += batch_precision
                recall += batch_recall

            num_batches = len(self.train_loader)
            train_loss /= num_batches
            self.results['train_loss'].append(train_loss)
            self.results['train_dice_score'].append(dice_score / num_batches)
            self.results['train_f1_score'].append(f1_score / num_batches)
            self.results['train_precision'].append(precision / num_batches)
            self.results['train_recall'].append(recall / num_batches)
            print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Dice Score: {self.results["train_dice_score"][-1]:.4f}, Train F1 score: {self.results["train_f1_score"][-1]:.4f}')

            if (epoch + 1) % self.hyperparameters['val_step'] != 0:
                continue

            val_loss = self.validate(epoch)
            # Step only with a real validation loss; a placeholder value on
            # non-validation epochs would corrupt a plateau-based scheduler.
            if self.scheduler is not None:
                self.scheduler.step(val_loss)
            if self._should_stop_early():
                print(f'Early stopping at epoch {epoch+1}: no improvement in the last {self.early_stopping_patience} validation runs')
                break

    def _should_stop_early(self):
        """Return True when none of the last ``patience`` validation losses is the best one."""
        patience = self.early_stopping_patience
        history = self.results['val_loss']
        if not patience or not history:
            return False
        # best_val_loss is the minimum over the whole history, so it is
        # strictly below the recent minimum only when it is older than the window.
        return min(history[-patience:]) > self.best_val_loss

    def train_step(self, image, label):
        """Run one optimisation step.

        Returns:
            Tuple ``(logits, loss)``: channel-0 logits as ``(B, 1, H, W)``,
            detached from the graph (they are only used for metrics), and the
            loss as a Python float.
        """
        self.optimizer.zero_grad()
        output = self.model(image)
        output = output[:, 0:1, :, :]  # first channel only, as (B, 1, H, W)
        loss = self.criterion(output, label)
        loss.backward()
        self.optimizer.step()
        return output.detach(), loss.item()

    def validate(self, epoch):
        """Evaluate on the validation loader and checkpoint on improvement.

        Appends the averaged validation metrics to ``self.results``; when the
        loss beats ``self.best_val_loss`` a copy of the weights is stored in
        ``self.best_model_seg`` and written to disk.

        Returns:
            Mean validation loss over the loader (Python float).
        """
        self.model.eval()
        val_loss = 0
        val_dice_score = 0
        val_precision = 0
        val_recall = 0
        val_f1_score = 0
        with torch.no_grad():
            for data in self.val_loader:
                image = data['image'].float().to(DEVICE)
                label = data['mask'].float().to(DEVICE)
                label = label[:, 0:1, :, :]  # first channel only, as (B, 1, H, W)
                output = self.model(image)
                output = output[:, 0:1, :, :]
                loss = self.criterion(output, label)
                val_loss += loss.item()
                val_dice_score += calculate_dice_score(output, label)
                batch_f1, batch_precision, batch_recall = calculate_f1_score(output, label)
                val_f1_score += batch_f1
                val_precision += batch_precision
                val_recall += batch_recall

            num_batches = len(self.val_loader)
            val_loss /= num_batches
            val_dice_score /= num_batches
            self.results['val_loss'].append(val_loss)
            self.results['val_dice_score'].append(val_dice_score)
            self.results['val_f1_score'].append(val_f1_score / num_batches)
            self.results['val_precision'].append(val_precision / num_batches)
            self.results['val_recall'].append(val_recall / num_batches)
            print(f'\n Validation Loss: {val_loss:.4f}, Validation Dice Score: {val_dice_score:.4f}, Validation f1 score: {self.results["val_f1_score"][-1]:.4f}')
            if val_loss < self.best_val_loss:
                self.best_val_loss = val_loss
                # state_dict() returns references to the live parameters; clone
                # them so later optimisation steps do not change the snapshot.
                self.best_model_seg = {
                    key: value.detach().clone()
                    for key, value in self.model.state_dict().items()
                }
                self.save_model()
                print(f'Best model updated at epoch {epoch+1} with val loss: {val_loss:.4f}')

            return val_loss

    def save_model(self):
        """Write ``self.best_model_seg`` to the checkpoint path built from the hyperparameters."""
        filename = f'/home/biomedialab/Desktop/Codes/Assignments/MLDS/Assignment_2/Q2/models/best_seg_{self.hyperparameters["model_name"]}_resnet34_model_lr_{self.hyperparameters["learning_rate"]}_batch_{self.hyperparameters["batch_size"]}_fold_{self.hyperparameters["fold_number"]}.pth'
        # Create the target directory on first use so torch.save cannot fail on it.
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        torch.save(self.best_model_seg, filename)
        print(f'Model saved to {filename}')



# Architectures to train. To try more models, add the name here AND a matching
# construction branch in the loop below.
model_names = ['Segformer']

for name in model_names:
    lr = 0.0001
    # Iterate the folds produced by create_k_fold_split rather than a
    # hard-coded count, so the loop always matches the split.
    for fold_number, (train_indices, val_indices) in enumerate(split):
        print(f'Training model {name} for fold {fold_number+1}...')
        train, train_masks, training_labels, val, val_masks, val_labels = get_splitted_data(train_indices, val_indices)
        train_dataset, val_dataset = get_datasets(train, train_masks, training_labels, val, val_masks, val_labels)
        train_loader, val_loader = get_dataloader(train_dataset, val_dataset, BATCH_SIZE)

        if name == 'Segformer':
            model = smp.Segformer(
                encoder_name="resnet34",
                encoder_weights="imagenet",
                in_channels=3,
                classes=1,
            )
        else:
            # Fail loudly instead of silently reusing the previous model
            # (or hitting a NameError) for a name without a branch.
            raise ValueError(f'Unknown model name: {name!r}')

        # A fresh model and optimizer are created for every fold.
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
        model.to(DEVICE)
        trainer = Trainer(
            model=model,
            optimizer=optimizer,
            train_loader=train_loader,
            val_loader=val_loader,
            model_name=name,
            lr=lr,
            fold_number=fold_number,
        )

        trainer.train(num_epochs=EPOCHS)

In [None]:
# Test split: images only (no masks), served one at a time in sorted path order.
test_data_csv = pd.read_csv(test_csv_path)
test_img_path = sorted(glob(os.path.join(test_path, 'images', '*.png')))

test_dataset = CustomDataset(
    images=test_img_path,
    masks=None,
    labels=test_data_csv,
    is_train=False,
    resize_dim=RESIZE_DIM,
)
test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

In [None]:
import sys
from PIL import Image
# Directory holding the per-fold checkpoints that form the ensemble.
# NOTE(review): Trainer.save_model writes checkpoints to .../Q2/models (one
# level above `res34`); confirm the files were moved here.
model_path = r'/home/biomedialab/Desktop/Codes/Assignments/MLDS/Assignment_2/Q2/models/res34'
model_paths = sorted(glob(os.path.join(model_path, 'best_seg_Segformer*.pth')))
print(f'Model Paths: {model_paths}')

# Load test CSV to maintain ID ordering
test_csv_data = (
    pd.read_csv(test_csv_path)
    .sort_values(by='id')
    .reset_index(drop=True)
)

# Per-image sum of the binary masks predicted by each model (vote count).
ensemble_preds = {}
name = None
for model_file in model_paths:
    print(f'Processing model: {model_file}')

    if 'Segformer' in model_file:
        name = 'Segformer'
        # The checkpoint overwrites every weight, so no pretrained encoder
        # weights need to be fetched here.
        model_unet = smp.Segformer(
            encoder_name="resnet34",
            encoder_weights=None,
            in_channels=3,
            classes=1)
    else:
        raise ValueError(f'Unsupported checkpoint: {model_file}')

    model_instance = model_unet
    # map_location lets checkpoints saved on a GPU load on a CPU-only machine.
    model_instance.load_state_dict(torch.load(model_file, map_location=DEVICE))
    model_instance.eval()
    model_instance.to(DEVICE)

    with torch.no_grad():
        for data in test_loader:
            images = data['image'].to(DEVICE)
            ids = data['id']
            outputs = model_instance(images)
            probs = torch.sigmoid(outputs)
            # Binarise each model's prediction before voting.
            preds = (probs > 0.5).float().cpu()  # [B, 1, H, W]

            for id_val, pred_mask in zip(ids, preds):
                vote = pred_mask.squeeze()
                if id_val in ensemble_preds:
                    ensemble_preds[id_val] += vote
                else:
                    # Clone so the running sum owns its storage.
                    ensemble_preds[id_val] = vote.clone()

# Majority vote: a pixel is foreground when more than half the models say so.
num_models = len(model_paths)
for id_val in ensemble_preds:
    ensemble_preds[id_val] = ((ensemble_preds[id_val] / num_models) > 0.5).float()

# NOTE(review): the directory name says "mobilenet_v2" although the encoder
# above is resnet34, and the RLE cell reads its masks from
# .../models/mobilenet_v2/..., not from this `res34` folder -- confirm the paths.
ensemble_output_dir = os.path.join(model_path, f'submission_masks_{name}_mobilenet_v2')
os.makedirs(ensemble_output_dir, exist_ok=True)

for id_val, pred_mask in ensemble_preds.items():
    # Convert single-channel mask to uint8 image
    mask_np = (pred_mask.squeeze().numpy() * 255).astype(np.uint8)

    mask_img = Image.fromarray(mask_np)

    # id_val is the source file name, so each mask is saved under its image's name.
    mask_img.save(os.path.join(ensemble_output_dir,f"{id_val}"))


# The following cell converts the saved masks to run-length encoding (RLE) for the submission file.


In [None]:
import numpy as np
import os
from glob import glob
from PIL import Image
import pandas as pd

def mask2rle(img):
    """Run-length encode a binary mask in column-major order.

    Adapted from
    https://www.kaggle.com/code/paulorzp/rle-functions-run-lenght-encode-decode

    Args:
        img: 2-D numpy array; zero is background, any constant non-zero value
            (e.g. 1 or 255) is mask.

    Returns:
        String of space-separated ``start length`` pairs with 1-based starts,
        or an empty string when the mask has no foreground.
    """
    pixels = img.T.flatten()
    # Pad with background on both sides so runs touching the borders are closed.
    pixels = np.concatenate([[0], pixels, [0]])
    # 1-based positions where the value changes: run starts at even slots,
    # run ends (exclusive) at odd slots.
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    # Turn each end position into a length; the starts must stay untouched.
    runs[1::2] -= runs[::2]
    return ' '.join(str(x) for x in runs)

def rle2mask(mask_rle, shape=(256,256)):
    """Decode a run-length string into a binary mask.

    Args:
        mask_rle: Space-separated ``start length`` pairs with 1-based starts,
            in column-major order.
        shape: ``(width, height)`` of the mask to build.

    Returns:
        uint8 numpy array with 1 for mask and 0 for background.
    """
    tokens = mask_rle.split()
    # Even positions are starts (made 0-based), odd positions are lengths.
    run_starts = np.asarray(tokens[0::2], dtype=int) - 1
    run_lengths = np.asarray(tokens[1::2], dtype=int)

    flat = np.zeros(shape[0]*shape[1], dtype=np.uint8)
    for begin, length in zip(run_starts, run_lengths):
        flat[begin:begin + length] = 1

    # Runs are column-major, so fill as (width, height) and transpose.
    return flat.reshape(shape).T

# NOTE(review): the inference cell saves its masks under .../models/res34/...,
# while this cell reads from .../models/mobilenet_v2/... -- confirm the folder.
img_path = r'/home/biomedialab/Desktop/Codes/Assignments/MLDS/Assignment_2/Q2/models/mobilenet_v2/submission_masks_Segformer_mobilenet_v2'
submission_path = r'/home/biomedialab/Desktop/Codes/Assignments/MLDS/Assignment_2/Q2/submission_class_resnet34.csv'
submission_path_dir = r'/home/biomedialab/Desktop/Codes/Assignments/MLDS/Assignment_2/Q2'

images = glob(os.path.join(img_path, '*.png'))

# One (file name, RLE string) pair per saved mask; the file name is the id.
encoding = [
    (os.path.basename(mask_file), mask2rle(np.array(Image.open(mask_file))))
    for mask_file in images
]

submission_path_df = pd.read_csv(submission_path)

# An empty RLE string means no foreground pixel was predicted.
for mask_id, seg_pred in encoding:
    row_selector = submission_path_df['id'] == mask_id
    submission_path_df.loc[row_selector, 'segmentation_pred'] = seg_pred if seg_pred else 'Healthy'

# NOTE(review): the output name mentions DeepLabV3Plus / mobilenet_v2, whereas
# the masks come from a folder named after Segformer -- confirm the file name.
submission_path_df.to_csv(os.path.join(submission_path_dir,'submission_seg_DeepLabV3Plus_mobilenet_v2_class_resnet34.csv'), index=False)
