In [2]:
!pip install git+https://github.com/albumentations-team/albumentations.git

## Check the GPU being used

In [3]:
# setting device on GPU if available, else CPU
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)
print()

#Additional Info when using cuda
if device.type == 'cuda':
    print(torch.cuda.get_device_name(0))
    print('Memory Usage:')
    print('Allocated:', round(torch.cuda.memory_allocated(0)/1024**3,1), 'GB')
    print('Cached:   ', round(torch.cuda.memory_reserved(0)/1024**3,1), 'GB')

In [4]:
import glob
train_img_dir= '../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/train/image'
train_label_dir= '../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/train/label'

#checking len of images data set
train_img_iter=sorted(glob.glob(train_label_dir+'/*'))
print(len(train_img_iter))
#checking len of mask data set

label_img_iter=sorted(glob.glob(train_img_dir+'/*'))
print(len(label_img_iter))

## Dataloader

In [5]:
import os
import numpy as np


from PIL import Image
from torch.utils.data import Dataset
from pathlib import Path

class IMG_Dataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform = None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.images = os.listdir(image_dir)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        img_path = os.path.join(self.image_dir, self.images[index])
        mask_path = os.path.join(self.mask_dir, self.images[index].replace('.jpg', '_mask.png'))
        image = np.array(Image.open(img_path).convert('RGB'), dtype=np.float32)
        mask = np.array(Image.open(mask_path).convert('L'), dtype=np.float32)
        mask[mask == 255.0] = 1.0

        if self.transform is  not None:
            augmentations = self.transform(image = image, mask = mask)
            image = augmentations["image"]
            mask = augmentations["mask"]

        return image, mask

## Utils


In [6]:
import torch
import torchvision

from torch.utils.data import DataLoader
# 
# from Datasets.dataloader import IMG_Dataset



def get_loaders(
        train_dir,
        train_mask_dir,
        val_dir,
        val_mask_dir,
        batch_size,
        train_transform,
        val_transform,
        num_workers=4,
        pin_memory=True
):
    train_dataset = IMG_Dataset(
        image_dir=train_dir,
        mask_dir=train_mask_dir,
        transform=train_transform
    )

    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        shuffle=True
    )

    val_dataset = IMG_Dataset(
        image_dir=val_dir,
        mask_dir=val_mask_dir,
        transform=val_transform
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=pin_memory,
        shuffle=True
    )

    return train_loader, val_loader


def save_checkpoint(state, filename="Vessel_Net_checkpoint.pth.tar"):
    print("=> Saving checkpoint")
    torch.save(state, filename)


def load_checkpoint(checkpoint, model):
    print("=> Loading checkpoint")
    model.load_state_dict(checkpoint['state_dict'])

    epoch = checkpoint['epoch']

    return epoch


def save_predictions_as_imgs(loader, model, folder="saved_images/", device="cuda"):
    model.eval()
    for idx, (x, y) in enumerate(loader):
        x = x.to(device=device)
        with torch.no_grad():
            preds = torch.sigmoid(model(x))
            preds = (preds > 0.5).float()
        torchvision.utils.save_image(preds, f"{folder}/pred/{idx}.png")
        torchvision.utils.save_image(y.unsqueeze(1), f"{folder}/truth_labels/{idx}.png")
    model.train()

In [7]:
import pandas as pd
prediction = pd.DataFrame(
    columns=['Epoch_no', 'Val_Accuracy', 'Val_IoU', 'Val_Dice', 'Val_f1_score', 'Val_Precision', 'Val_Recall',
             'Val_Specificity', 'Train_Accuracy', 'Train_IoU', 'Train_Dice', 'Train_f1_scorVal_e', 'Train_Precision',
             'Train_Recall', 'Train_Specificity'])

# Mertic result in dataframe

In [8]:
def adding_metrics(epoch_no, train_accuracy, val_accuracy, train_iou, val_iou, train_dice, val_dice, train_f1_score,
                   val_f1_score, train_precision, val_precision, train_recall, val_recall, train_specificity,
                   val_specificity, train_mcc, val_mcc, train_loss, val_loss, load_model, metrics_dir):
    global prediction
    if bool(load_model):
        prediction = pd.read_csv(metrics_dir, index_col=False)

    new_row = {'Epoch_no': epoch_no,

               'Val_Accuracy': val_accuracy,
               'Val_IoU': val_iou,
               'Val_Dice': val_dice,
               'Val_f1_score': val_f1_score,
               'Val_Precision': val_precision,
               'Val_Recall': val_recall,
               'Val_Specificity': val_specificity,
               'Val_MCC': val_mcc,

               'Train_Accuracy': train_accuracy,
               'Train_IoU': train_iou,
               'Train_Dice': train_dice,
               'Train_f1_score': train_f1_score,
               'Train_Precision': train_precision,
               'Train_Recall': train_recall,
               'Train_Specificity': train_specificity,
               'Train_MCC': train_mcc,

               'Train_loss': train_loss,
               'Val_loss': val_loss,
               }
    prediction = prediction.append(new_row, ignore_index=True)
    convert_to_csv(prediction, metrics_dir)
    return prediction

# Testing of metrics scripts

In [9]:
import numpy
import torch
import torch.nn as nn
import torch.nn.functional as F

class DiceLoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = F.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice = (2. * intersection + smooth) / (inputs.sum() + targets.sum() + smooth)
        
        return 1 - dice


class DiceBCELoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceBCELoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = F.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice_loss = 1 - (2. * intersection + smooth) / (inputs.sum() + targets.sum() + smooth)
        BCE = F.binary_cross_entropy(inputs, targets, reduction='mean')
        Dice_BCE = BCE + dice_loss

        return Dice_BCE


class IoULoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(IoULoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = F.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        # intersection is equivalent to True Positive count
        # union is the mutually inclusive area of all labels & predictions
        intersection = (inputs * targets).sum()
        total = (inputs + targets).sum()
        union = total - intersection

        IoU = (intersection + smooth) / (union + smooth)

        return 1 - IoU


# Focal Loss

# PyTorch
ALPHA = 0.8
GAMMA = 2


class FocalLoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(FocalLoss, self).__init__()

    def forward(self, inputs, targets, alpha=ALPHA, gamma=GAMMA, smooth=1):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = F.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        # first compute binary cross-entropy
        BCE = F.binary_cross_entropy(inputs, targets, reduction='mean')
        BCE_EXP = torch.exp(-BCE)
        focal_loss = alpha * (1 - BCE_EXP) ** gamma * BCE

        return focal_loss


# Tversky Loss
ALPHA = 0.5
BETA = 0.5


class TverskyLoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(TverskyLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1, alpha=ALPHA, beta=BETA):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = F.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        # True Positives, False Positives & False Negatives
        TP = (inputs * targets).sum()
        FP = ((1 - targets) * inputs).sum()
        FN = (targets * (1 - inputs)).sum()

        Tversky = (TP + smooth) / (TP + alpha * FP + beta * FN + smooth)

        return 1 - Tversky


# Focal Tversky Loss
ALPHA = 0.5
BETA = 0.5
GAMMA = 1


class FocalTverskyLoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(FocalTverskyLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1, alpha=ALPHA, beta=BETA, gamma=GAMMA):
        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = F.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        # True Positives, False Positives & False Negatives
        TP = (inputs * targets).sum()
        FP = ((1 - targets) * inputs).sum()
        FN = (targets * (1 - inputs)).sum()

        Tversky = (TP + smooth) / (TP + alpha * FP + beta * FN + smooth)
        FocalTversky = (1 - Tversky) ** gamma

        return FocalTversky



# Combo Loss
ALPHA = 0.5  # < 0.5 penalises FP more, > 0.5 penalises FN more
CE_RATIO = 0.5  # weighted contribution of modified CE loss compared to Dice loss


class ComboLoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(ComboLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1, alpha=ALPHA, beta=BETA, eps=1e-9):
        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        # True Positives, False Positives & False Negatives
        intersection = (inputs * targets).sum()
        dice = (2. * intersection + smooth) / (inputs.sum() + targets.sum() + smooth)

        inputs = torch.clamp(inputs, eps, 1.0 - eps)
        out = - (ALPHA * ((targets * torch.log(inputs)) + ((1 - ALPHA) * (1.0 - targets) * torch.log(1.0 - inputs))))
        weighted_ce = out.mean(-1)
        combo = (CE_RATIO * weighted_ce) - ((1 - CE_RATIO) * dice)

        return combo

In [26]:
import torch
import numpy as np
import torchmetrics
import torchmetrics.functional
import pandas as pd
from sklearn import datasets, metrics
import matplotlib.pyplot as plt
import seaborn as sns
import os
from torch._C import _valgrind_toggle

step = 0


def dice_score(preds, y):
    return (2 * (preds * y).sum()) / (preds + y).sum() + 1e-8


def num_correct_pixels(preds, y):
    return (preds == y).sum()


def num_total_pixels(preds):
    return torch.numel(preds)


def accuracy_score(num_correct, num_pixels):
    return num_correct / num_pixels * 100


def iou(preds, y):
    preds = preds.type(torch.int)
    y = y.type(torch.int)
    intersection = np.logical_and(y, preds)
    union = np.logical_or(y, preds)
    iou_score = intersection.sum() / union.sum()
    return iou_score


def precision(preds, y):
    return torchmetrics.functional.precision(preds, y, num_classes=2, mdmc_average='global')


def recall(preds, y):
    return torchmetrics.functional.recall(preds, y, num_classes=2, mdmc_average='global')


def f1(preds, y):
    return torchmetrics.functional.f1(preds, y, num_classes=2, mdmc_average='global')


def specificity(preds, y):
    return torchmetrics.functional.specificity(preds, y, num_classes=2, mdmc_average='global')


def mcc(preds, y):
    return torchmetrics.functional.matthews_corrcoef(preds, y, num_classes=2)


def make_csv_copy(metrics_dir, prev_metrics_csv_dir):
    metrics_df_old = pd.read_csv(prev_metrics_csv_dir, index_col=False)
    convert_to_csv(metrics_df_old, metrics_dir)


def check_metrics(train_loader, val_loader, model, writer, epoch_no,
                  last_epoch, loss_fn, train_loss, load_model,
                  device="cuda", metrics_dir='./new_metrics.csv',
                  prev_metrics_csv_dir='./prev_metric.csv'):
    # global variables definitions
    global step
    global metrics_dir_path
    metrics_dir_path = metrics_dir
    batch_num_correct = 0
    batch_num_pixels = 0
    val_batch_dice_score = 0
    val_total_iou_score = 0
    val_total_precision = 0
    val_total_recall = 0
    val_total_f1 = 0
    val_total_specificity = 0
    val_total_mcc = 0

    if load_model == True:
        if (not os.path.isfile(metrics_dir)):
            make_csv_copy(metrics_dir, prev_metrics_csv_dir)

    # start of model evaluation
    model.eval()
    with torch.no_grad():
        for x, y in val_loader:
            x = x.to(device)  # storing the input image
            y = y.to(device).unsqueeze(1)  # storing the original mask
            # converting the predictions to a binary map
            preds = torch.sigmoid(model(x))
            predictions = model(x)
            val_loss = loss_fn(predictions, y).item()

            batch_num_pixels += num_total_pixels((preds > 0.5).float())
            y, preds = y.cpu(), (preds > 0.5).float().cpu()

            batch_num_correct += num_correct_pixels((preds > 0.5).float(), y)
            val_batch_dice_score += dice_score((preds > 0.5).float(), y)
            val_total_iou_score += iou((preds > 0.5).float(), y)

            y = y.type(torch.int)
            val_total_f1 += f1((preds > 0.5).float().type(torch.int), y)
            val_total_precision += precision((preds > 0.5).float().type(torch.int), y)
            val_total_recall += recall((preds > 0.5).float().type(torch.int), y)
            val_total_specificity += specificity((preds > 0.5).float().type(torch.int), y)
            val_total_mcc += mcc((preds > 0.5).float().type(torch.int), y)

    val_acc = accuracy_score(batch_num_correct, batch_num_pixels)
    writer["writer"].add_scalar("Training Accuracy", val_acc, global_step=step)
    step += 1
    print('VALIDATION SCORES')
    print(f"Got {batch_num_correct}/{batch_num_pixels} with val_acc {val_acc}")

    print(f"mean IoU score: {val_total_iou_score / len(val_loader)}")

    print(f"mean Dice Score: {val_batch_dice_score / len(val_loader)}")

    print(f"torch-metrics mean f1_score: {val_total_f1 / len(val_loader)}")

    print(f"torch-metrics mean precision: {val_total_precision / len(val_loader)}")

    print(f"torch-metrics mean recall: {val_total_recall / len(val_loader)}")

    print(f"torch-metrics mean specificity: {val_total_specificity / len(val_loader)}")

    print(f"Custom MCC metrics value:{val_total_mcc / len(val_loader)}")

    # global variables definitions
    batch_num_correct = 0
    batch_num_pixels = 0
    train_batch_dice_score = 0
    train_total_iou_score = 0
    train_total_precision = 0
    train_total_recall = 0
    train_total_f1 = 0
    train_total_specificity = 0
    train_total_mcc = 0
    # start of model evaluation
    model.eval()
    with torch.no_grad():
        for x, y in train_loader:
            x = x.to(device)  # storing the input image
            y = y.to(device).unsqueeze(1)  # storing the original mask
            # converting the predictions to a binary map
            preds = torch.sigmoid(model(x))
            predictions = model(x)
            # train_loss = loss_fn(predictions, y).item()

            batch_num_pixels += num_total_pixels((preds > 0.5).float())
            y, preds = y.cpu(), (preds > 0.5).float().cpu()

            batch_num_correct += num_correct_pixels((preds > 0.5).float(), y)
            train_batch_dice_score += dice_score((preds > 0.5).float(), y)
            train_total_iou_score += iou((preds > 0.5).float(), y)

            y = y.type(torch.int)
            train_total_f1 += f1((preds > 0.5).float().type(torch.int), y)
            train_total_precision += precision((preds > 0.5).float().type(torch.int), y)
            train_total_recall += recall((preds > 0.5).float().type(torch.int), y)
            train_total_specificity += specificity((preds > 0.5).float().type(torch.int), y)
            train_total_mcc += mcc((preds > 0.5).float().type(torch.int), y)

    train_acc = accuracy_score(batch_num_correct, batch_num_pixels)

    print('TRAINING SCORES')

    print(f"Got {batch_num_correct}/{batch_num_pixels} with train_acc {train_acc}")

    print(f"mean IoU score: {train_total_iou_score / len(train_loader)}")

    print(f"mean Dice Score: {train_batch_dice_score / len(train_loader)}")

    print(f"torch-metrics mean f1_score: {train_total_f1 / len(train_loader)}")

    print(f"torch-metrics mean precision: {train_total_precision / len(train_loader)}")

    print(f"torch-metrics mean recall: {train_total_recall / len(train_loader)}")

    print(f"torch-metrics mean specificity: {train_total_specificity / len(train_loader)}")

    print(f"Custom MCC metrics value:{train_total_mcc / len(train_loader)}")

    # Adding Metrics to CSV
    print(adding_metrics(epoch_no=int(epoch_no),

                         val_accuracy=val_acc.detach().cpu().numpy(),
                         val_iou=(val_total_iou_score.detach().cpu().numpy() / len(val_loader)),
                         val_dice=(val_batch_dice_score.detach().cpu().numpy() / len(val_loader)),
                         val_f1_score=(val_total_f1.detach().cpu().numpy() / len(val_loader)),
                         val_precision=(val_total_precision.detach().cpu().numpy() / len(val_loader)),
                         val_recall=(val_total_recall.detach().cpu().numpy() / len(val_loader)),
                         val_specificity=(val_total_specificity.detach().cpu().numpy() / len(val_loader)),
                         val_mcc=(val_total_mcc.detach().cpu().numpy() / len(val_loader)),

                         train_accuracy=train_acc.detach().cpu().numpy(),
                         train_iou=(train_total_iou_score.detach().cpu().numpy() / len(train_loader)),
                         train_dice=(train_batch_dice_score.detach().cpu().numpy() / len(train_loader)),
                         train_f1_score=(train_total_f1.detach().cpu().numpy() / len(train_loader)),
                         train_precision=(train_total_precision.detach().cpu().numpy() / len(train_loader)),
                         train_recall=(train_total_recall.detach().cpu().numpy() / len(train_loader)),
                         train_specificity=(train_total_specificity.detach().cpu().numpy() / len(train_loader)),
                         train_mcc=(train_total_mcc.detach().cpu().numpy() / len(train_loader)),

                         train_loss=train_loss,
                         val_loss=val_loss,

                         load_model=load_model,
                         metrics_dir=metrics_dir,
                         )
          )

    # Plotting the ROC and Precision vs recall curves
    preds = preds.numpy().ravel()
    y = y.numpy().ravel()
#    Results_dataframe = pd.read_csv(metrics_dir, index_col=False)
#     plotting_metrics(Results_dataframe)
#     plot_loss(Results_dataframe)
    roc_curve_plot(y, preds)
    precision_recall_curve_plot(y, preds)
    model.train()  # end of model evaluation



In [11]:
from sklearn import datasets, metrics
import matplotlib.pyplot as plt
import numpy as np

def roc_curve_plot(y_true, y_preds):
    
    fpr, tpr, thresholds = metrics.roc_curve(y_true, y_preds)
    AUC_ROC = metrics.roc_auc_score(y_true, y_preds)
    # test_integral = np.trapz(tpr,fpr) #trapz is numpy integration
    print ("\nArea under the ROC curve: " +str(AUC_ROC))
    roc_curve =plt.figure()
    plt.plot(fpr,tpr,'-',label='Area Under the Curve (AUC = %0.4f)' % AUC_ROC)
    plt.title('ROC curve')
    plt.xlabel("FPR (False Positive Rate)")
    plt.ylabel("TPR (True Positive Rate)")
    plt.legend(loc="lower right")
    
    plt.savefig('./metrics_plots/roc_curve_plot.png', dpi=300, bbox_inches='tight')
    plt.close()

    
def precision_recall_curve_plot(y_true, y_preds):
    
    precision, recall, thresholds = metrics.precision_recall_curve(y_true, y_preds)
    precision = np.fliplr([precision])[0]  #so the array is increasing (you won't get negative AUC)
    recall = np.fliplr([recall])[0]  #so the array is increasing (you won't get negative AUC)
    AUC_prec_rec = np.trapz(precision,recall)
    print ("\nArea under Precision-Recall curve: " +str(AUC_prec_rec))
    prec_rec_curve = plt.figure()
    plt.plot(recall,precision,'-',label='Area Under the Curve (AUC = %0.4f)' % AUC_prec_rec)
    plt.title('Precision - Recall curve')
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.legend(loc="lower right")
    
    plt.savefig('./metrics_plots/precision_recall_curve_plot.png', dpi=300, bbox_inches='tight')
    plt.close()




In [12]:
# y = np.array([1, 1, 2, 2])
# scores = np.array([0.1, 0.4, 0.35, 0.8])
# roc_curve_plot(y,scores)
# precision_recall_curve_plot(y,scores)

In [13]:
## Model def

In [14]:
import torch.nn as nn
import torchvision.transforms.functional as TF


class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                      kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False),
            nn.Dropout2d(p=0.1, inplace=True),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),

            nn.Conv2d(in_channels=out_channels, out_channels=out_channels,
                      kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False),

            nn.Dropout2d(p=0.1, inplace=True),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
    def forward(self, x):
        return self.conv(x)


class SingleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(SingleConv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                      kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False),
            nn.Dropout2d(p=0.1, inplace=True),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )
    def forward(self, x):
        x=self.conv(x)

        return x

In [15]:
import numpy as np
import torch
from torch import nn
from torch.nn import init


class ChannelAttention(nn.Module):
    def __init__(self, channel, reduction=16):
        super().__init__()
        self.maxpool = nn.AdaptiveMaxPool2d(1)
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.se = nn.Sequential(
            nn.Conv2d(channel, channel // reduction, 1, bias=False),
            nn.ReLU(),
            nn.Conv2d(channel // reduction, channel, 1, bias=False)
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        max_result = self.maxpool(x)
        avg_result = self.avgpool(x)
        max_out = self.se(max_result)
        avg_out = self.se(avg_result)
        output = self.sigmoid(max_out + avg_out)
        return output


class SpatialAttention(nn.Module):
    def __init__(self, kernel_size=7):
        super().__init__()
        self.conv = nn.Conv2d(2, 1, kernel_size=kernel_size, padding=kernel_size // 2)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        max_result, _ = torch.max(x, dim=1, keepdim=True)
        avg_result = torch.mean(x, dim=1, keepdim=True)
        result = torch.cat([max_result, avg_result], 1)
        output = self.conv(result)
        output = self.sigmoid(output)
        return output


class CBAMBlock(nn.Module):

    def __init__(self, channel=512, reduction=16, kernel_size=49):
        super().__init__()
        self.ca = ChannelAttention(channel=channel, reduction=reduction)
        self.sa = SpatialAttention(kernel_size=kernel_size)

    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                init.constant_(m.weight, 1)
                init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                init.normal_(m.weight, std=0.001)
                if m.bias is not None:
                    init.constant_(m.bias, 0)

    def forward(self, x):
        b, c, _, _ = x.size()
        residual = x
        out = x * self.ca(x)
        out = out * self.sa(out)
        return out + residual

In [16]:
# Import the libraries
import torch
import torch.nn as nn
import torchvision.transforms.functional as TF


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Defining the main Encoder decoder model using the double convolution class
class Vessel_net(nn.Module):
    def __init__(self, in_channels=3, out_channels=1, features=[32,64,128]):
        super(Vessel_net, self).__init__()
        self.downs = nn.ModuleList()
        self.ups = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.singleconv1 = SingleConv(in_channels=features[-1], out_channels=features[-1]*2)
        self.singleconv2 = SingleConv(in_channels=features[-1]*2, out_channels=features[-1]*2)

        # encoder part (down part)
        for feature in features:
            self.downs.append(DoubleConv(in_channels=in_channels, out_channels=feature))
            in_channels = feature

        # decoder part (up part)
        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(in_channels=feature * 2,
                                   out_channels=feature,
                                   kernel_size=2,
                                   stride=2
                                   )
            )
            self.ups.append(
                DoubleConv(in_channels=feature * 2, out_channels=feature)
            )

        self.bottleneck = CBAMBlock(channel=256)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        skip_connections = []
        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)


        x=self.singleconv1(x)
        x = self.bottleneck(x)
        x=self.singleconv2(x)
        #print('size before entering the decoder ' + str(x.size()))
        # reverse the skip connection list
        skip_connections = skip_connections[::-1]

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx // 2]

            if x.shape != skip_connection.shape:
                x = TF.resize(x, size=skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx + 1](concat_skip)
        x=self.final_conv(x)
        return x


def test():
    x = torch.randn((5, 3, 960, 960)).to(device)
    model = Vessel_net(in_channels=3, out_channels=1).to(device)
    preds = model(x).to(device)
    total_params = sum(p.numel() for p in model.parameters())
    #print(model)
    print("Shape of the Input image"+str(x.shape))
    print("shape of the output segmentation map"+str(preds.shape))

    print("Number of Model Parameters:", total_params)



if __name__ == '__main__':
    test()


## training

In [17]:
torch.cuda.empty_cache()

In [18]:
def forward_for_train(loss_func, target):
     with torch.cuda.amp.autocast():
            predictions = model(data)
            loss = loss_fn(predictions, targets)
            return loss
        
def backward_for_train(loss, optimizer):
    global running_loss
    optimizer.zero_grad()
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()

    img_grid = torchvision.utils.make_grid(data)
    writer.add_image("Input_image", img_grid)
    # writer.add_histogram("fc1", model.fc1.weight)
    writer.add_scalar("Training Loss", loss, global_step=step)
        
    loop.set_postfix(loss=loss.item())
    running_loss += loss.item()
       
    return running_loss

In [19]:
import os

import torch
import torchvision
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.tensorboard.writer import SummaryWriter
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim

torch.cuda.empty_cache()
# loss_list = pd.DataFrame(columns=['Epoch_no','FocalTverskyLoss'])

train_loss = 10000

step = 0

lr=1e-5
batch_size=8
device="cuda"
epochs= 50
num_workers=2
height=480
width= 480
pin_memory=True
load_model=False

train_dir="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/train/image"
train_mask="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/train/label"
val_dir="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/validate/image"
val_mask="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/validate/label"
test_dir="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/test/image"
test_mask="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/test/label"
load_weights="./Vessel_Net_checkpoint.pth.tar"
prev_metrics_csv_dir='./prev_metrics.csv'
metrics_csv_dir = './new_metrics.csv'

    
def train_fn(loader, model, optimizer, loss_fn, scaler, writer, epoch):
    global train_loss
    global step
    loop = tqdm(loader)
    
    for batch_idx, (data, targets) in enumerate(loop):
        data = data.to(device=device)
        targets = targets.float().unsqueeze(1).to(device=device)
        
        # forward
        with torch.cuda.amp.autocast():
            predictions = model(data)
            train_loss = loss_fn(predictions, targets)

        # backward
        optimizer.zero_grad()
        scaler.scale(train_loss).backward()
        scaler.step(optimizer)
        scaler.update()

        img_grid = torchvision.utils.make_grid(data)
        writer.add_image("Input_image", img_grid)
        # writer.add_histogram("fc1", model.fc1.weight)
        writer.add_scalar("Training Loss", train_loss, global_step = step)
        
        # update tqdm loop
        loop.set_postfix(loss=train_loss.item())
        
        step += 1
        
#     train_loss=running_loss/len(loader)
#     new_loss = {'Epoch_no':epoch,'FocalTverskyLoss':train_loss}
#     loss_list = loss_list.append(new_loss, ignore_index=True)
    
    
    
def main():
    global step,prediction
    
    if(os.path.isfile('metrics.csv')):
        prediction = pd.read_csv('metrics.csv')
        prediction.drop(prediction.columns[prediction.columns.str.contains('unnamed',case = False)],axis = 1, inplace = True)
        
    torch.cuda.empty_cache()
    train_transform = A.Compose(
        [
            A.Resize(height=height, width=width),
            A.Normalize(
                mean=[0.0, 0.0, 0.0],
                std=[1.0, 1.0, 1.0],
                max_pixel_value=255.0,
            ),
            ToTensorV2(),
        ],
    )

    val_transforms = A.Compose(
        [
            A.Resize(height=height, width=width),
            A.Normalize(
                mean=[0.0, 0.0, 0.0],
                std=[1.0, 1.0, 1.0],
                max_pixel_value=255.0,
            ),
            ToTensorV2(),
        ],
    )
    model = Vessel_net(in_channels=3, out_channels=1).to(device)

    train_loader, val_loader = get_loaders(
        train_dir,
        train_mask,
        val_dir,
        val_mask,
        batch_size,
        train_transform,
        val_transforms,
        num_workers,
        pin_memory,
    )

    # since no sigmoid on the output of the model we use with logits
    loss_fn = nn.BCEWithLogitsLoss() # for multiclass classification use cross entropy loss and change  #out_channels
    
    
    optimizer = optim.Adam(model.parameters(), lr=lr,weight_decay=1e-04)
    writer = SummaryWriter(
       f"runs/Dataset/Minibatch {batch_size} LR {lr}" 
    )
    
    
    images, _ = next(iter(train_loader))
    writer.add_graph(model, images.to(device))
    writer.close()

    PREV_EPOCHS = 0
    
    if load_model:
        PREV_EPOCHS = load_checkpoint(torch.load(
            load_weights),
            model)
    print("Loaded Model Metrics")

    scaler = torch.cuda.amp.GradScaler()

    for epoch in range(PREV_EPOCHS+1,epochs+1):
        print("Epoch : " + str(epoch))
        train_fn(train_loader, model, optimizer, loss_fn, scaler, writer=writer, epoch=int(epoch))
    
        if epoch == (epochs):
            last_epoch = True
            checkpoint = {
                    "epoch": epoch,
                    "state_dict": model.state_dict(),
                    "optimizer": optimizer.state_dict(),
                    "loss": loss_fn,
            }

            # save model
            checkpoint_name = './' + model_name + '_' + dataset_name + '_Epochs_' + str(epoch) + '_MODEL.pth.tar'
            save_checkpoint(checkpoint, checkpoint_name)

            # check accuracy
            try:
                os.mkdir('validation_saved_images')
                os.mkdir('validation_saved_images/pred')
                os.mkdir('validation_saved_images/truth_labels')
            except:
                print("Results directory already created")
                pass
            save_predictions_as_imgs(val_loader, model, folder="validation_saved_images", device=device)
        else:
            last_epoch = False
        
        print("Epoch Metrics are being printed for epoch num :" + str(epoch))

        check_metrics(train_loader,
                      val_loader,
                      model,
                      device=device,
                      epoch_no=int(epoch),
                      last_epoch=last_epoch,
                      loss_fn=loss_fn,
                      train_loss=train_loss.item(),
                      load_model=bool(load_model),
                      writer={"writer": writer, "step": step},
                      metrics_dir=metrics_csv_dir,
                      prev_metrics_csv_dir=prev_metrics_csv_dir
                      )
        step += 1




  



In [20]:
# import os
# import argparse
# import torch
# import torchvision
# import albumentations as A
# from albumentations.pytorch import ToTensorV2
# from torch.utils.tensorboard.writer import SummaryWriter
# from tqdm import tqdm
# import torch.nn as nn
# import torch.optim as optim

# # from models.VesselNet import Vessel_net
# # from results.metrics import check_metrics
# # from utils import load_checkpoint, save_checkpoint, get_loaders, save_predictions_as_imgs

# torch.cuda.empty_cache()

# train_loss = 10000

# step = 0

# train_dir="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/train/image"
# train_mask="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/train/label"
# val_dir="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/validate/image"
# val_mask="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/validate/label"
# test_dir="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/test/image"
# test_mask="../input/vesselnet-code/Data/Data/CHASE/CHASE_1000/test/label"
# load_weights="./Vessel_Net_checkpoint.pth.tar"




# # USIng Mixed precision training (FP-16 used )
# def train_fn(loader, model, optimizer, loss_fn, scaler, args, writer):
#     global train_loss
#     global step
#     loop = tqdm(loader)
#     for batch_idx, (data, targets) in enumerate(loop):
#         data = data.to(device=args.device)
#         targets = targets.float().unsqueeze(1).to(device=args.device)

#         # forward
#         with torch.cuda.amp.autocast():
#             predictions = model(data)
#             train_loss = loss_fn(predictions, targets)

#         # backward
#         optimizer.zero_grad()
#         scaler.scale(train_loss).backward()
#         scaler.step(optimizer)
#         scaler.update()

#         img_grid = torchvision.utils.make_grid(data)
#         writer.add_image("Input_image", img_grid)
#         # writer.add_histogram("fc1", model.fc1.weight)
#         writer.add_scalar("Training Loss", train_loss, global_step=step)

#         # update tqdm loop
#         loop.set_postfix(loss=train_loss.item())

#         step += 1


# def main(args):
#     global step
#     torch.cuda.empty_cache()
#     train_transform = A.Compose(
#         [
#             A.Resize(height=args.height, width=args.width),
#             A.Normalize(
#                 mean=[0.0, 0.0, 0.0],
#                 std=[1.0, 1.0, 1.0],
#                 max_pixel_value=255.0,
#             ),
#             ToTensorV2(),
#         ],
#     )

#     val_transforms = A.Compose(
#         [
#             A.Resize(height=args.height, width=args.width),
#             A.Normalize(
#                 mean=[0.0, 0.0, 0.0],
#                 std=[1.0, 1.0, 1.0],
#                 max_pixel_value=255.0,
#             ),
#             ToTensorV2(),
#         ],
#     )
#     model = Vessel_net(in_channels=3, out_channels=1).to(args.device)

#     train_loader, val_loader = get_loaders(
#         args.train_dir,
#         args.train_mask,
#         args.val_dir,
#         args.val_mask,
#         args.batch_size,
#         train_transform,
#         val_transforms,
#         args.num_workers,
#         args.pin_memory,
#     )

#     # since no sigmoid on the output of the model we use with logits
#     loss_fn = nn.BCEWithLogitsLoss()  # for multiclass classification use cross entropy loss and change  #out_channels
#     optimizer = optim.Adam(model.parameters(), lr=args.lr,weight_decay=1e-04)
#     writer = SummaryWriter(
#         f"runs/Dataset/Minibatch {args.batch_size} LR {args.lr}"
#     )

#     images, _ = next(iter(train_loader))
#     writer.add_graph(model, images.to(args.device))
#     writer.close()


#     PREV_EPOCHS = 0


#     if args.load_model:
#         PREV_EPOCHS = load_checkpoint(torch.load(
#             args.load_weights),
#             model)
#         print("Loaded Model Metrics")
#         #print("Epoch Metrics are being printed for epoch num :"+str(PREV_EPOCHS))
#         #check_metrics(loader=val_loader, model=model, device=args.device,epoch_no=PREV_EPOCHS, writer={"writer": writer, "step": step},last_epoch= False,load_model= args.load_model)


#     scaler = torch.cuda.amp.GradScaler()


#     for epoch in range(PREV_EPOCHS+1,args.epochs+1):

#         print("Epoch : " + str(epoch))

#         train_fn(train_loader, model, optimizer, loss_fn, scaler, args, writer=writer)

#         # save model and predictions
#         if epoch == (args.epochs):
#             last_epoch = True
#             checkpoint = {
#                 "epoch": epoch,
#                 "state_dict": model.state_dict(),
#                 "optimizer": optimizer.state_dict(),
#                 "loss": loss_fn,
#             }


#             checkpoint_name = './'+args.model_name+'_'+args.dataset_name+'_Epochs_' + str(epoch) + '_MODEL.pth.tar'
#             save_checkpoint(checkpoint, checkpoint_name)
#             try:
#                 os.mkdir('validation_saved_images')
#                 os.mkdir('validation_saved_images/pred')
#                 os.mkdir('validation_saved_images/truth_labels')
#             except:
#                 print("Results directory already created")
#                 pass
#             save_predictions_as_imgs(val_loader, model, folder="validation_saved_images", device=args.device)
#         else:
#             last_epoch = False
#         #CHECK METRICS
#         print("Epoch Metrics are being printed for epoch num :" + str(epoch))

#         check_metrics(train_loader,
#                       val_loader,
#                       model,
#                       device=args.device,
#                       epoch_no=int(epoch),
#                       last_epoch=last_epoch,
#                       loss_fn=loss_fn,
#                       train_loss=train_loss.item(),
#                       load_model=bool(args.load_model),
#                       writer={"writer": writer, "step": step},
#                       metrics_dir=args.metrics_csv_dir,
#                       prev_metrics_csv_dir=args.prev_metrics_csv_dir
#                       )
#         step += 1




    


In [21]:
# if __name__ == "__main__":
#     parser = argparse.ArgumentParser()
#     parser.add_argument("--model_name", default="VesselNet", type=str, help="Name of the saved model")
#     parser.add_argument("--dataset_name", default="CHASE", type=str, help="Name of the data set used")
#     parser.add_argument("--lr", default=1e-3, type=float, help="Learning Rate for training")
#     parser.add_argument("--batch_size", default=4, type=int, help="Batch Size for training")
#     parser.add_argument("--device", default="cuda", help="cuda or cpu")
#     parser.add_argument("--epochs", default=5, type=int, help="Number of Epochs")
#     parser.add_argument("--num_workers", default=2, help="Number of workers")
#     parser.add_argument("--height", default=256, type=int, help="Input Image Height")
#     parser.add_argument("--width", default=56, type=int, help="Input Image width")
#     parser.add_argument("--pin_memory", default=True, help="Pin Memory")
#     parser.add_argument("--load_model",type=bool, default=False, help="Load Pretrained Model")
#     parser.add_argument("--train_dir", default=train_dir, type=str,
#                         help="Training images directory")
#     parser.add_argument("--train_mask", default=train_mask, type=str,
#                         help="Training mask directory")
#     parser.add_argument("--val_dir", default=val_dir, type=str,
#                         help="Validation Image directory")
#     parser.add_argument("--val_mask", type=str, default=val_mask,
#                         help="Validation label directory")
#     parser.add_argument("--test_dir", default=test_dir, type=str,
#                         help="Test image directory")
#     parser.add_argument("--test_mask", default=test_mask, type=str,
#                         help="Test mask directory")
#     parser.add_argument("--load_weights", default="trained.pth.tar", type=str, help="Add training weight path.")
#     parser.add_argument("--metrics_csv_dir",default="./new_metrics.csv",type=str,help="File path to the metrics csv file.")
#     parser.add_argument("--prev_metrics_csv_dir", default="./prev_metrics.csv", type=str,
#                         help="File path to the metrics csv file of the prev loaded model.")
# args = parser.parse_args()


# Saving to csv

In [22]:
def convert_to_csv(prediction, file_name):
    if(os.path.isfile(file_name)):
        os.remove(file_name)
    prediction.to_csv(file_name, header=True)


In [23]:
import seaborn as sns
def plotting_metrics(Results_dataframe):
    try:
        os.mkdir('./metrics_plots')

    except:
        print(" Metrics Plots directory already created")
        pass

    # accuracy
    fig, ax = plt.subplots(figsize=(8, 8))
    sns.lineplot(x='Epoch_no', y='Train_Accuracy', data=Results_dataframe, color='red')
    sns.lineplot(x='Epoch_no', y='Val_Accuracy', data=Results_dataframe, color='blue')
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.title("Accuracy Graph")
    plt.legend(["Train", "Val"], loc="upper right")

    plt.savefig('./metrics_plots/accuracy.png', dpi=300, bbox_inches='tight')
    plt.close()

    # IOU
    fig, ax = plt.subplots(figsize=(8, 8))
    sns.lineplot(x='Epoch_no', y='Train_IoU', data=Results_dataframe, color='red')
    sns.lineplot(x='Epoch_no', y='Val_IoU', data=Results_dataframe, color='blue')
    plt.xlabel("Epoch")
    plt.ylabel("IoU")
    plt.title("IoU Score")
    plt.legend(["Train", "Val"], loc="upper right")

    plt.savefig('./metrics_plots/IoU.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Dice
    fig, ax = plt.subplots(figsize=(8, 8))
    sns.lineplot(x='Epoch_no', y='Train_Dice', data=Results_dataframe, color='red')
    sns.lineplot(x='Epoch_no', y='Val_Dice', data=Results_dataframe, color='blue')
    plt.xlabel("Epoch")
    plt.ylabel("Dice")
    plt.title("Dice")
    plt.legend(["Train", "Val"], loc="upper right")

    plt.savefig('./metrics_plots/dice.png', dpi=300, bbox_inches='tight')
    plt.close()

    # f1_score
    fig, ax = plt.subplots(figsize=(8, 8))
    sns.lineplot(x='Epoch_no', y='Train_f1_score', data=Results_dataframe, color='red')
    sns.lineplot(x='Epoch_no', y='Val_f1_score', data=Results_dataframe, color='blue')
    plt.xlabel("Epoch")
    plt.ylabel("F1 score")
    plt.title("F1 score")
    plt.legend(["Train", "Val"], loc="upper right")

    plt.savefig('./metrics_plots/f1_score.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Precision
    fig, ax = plt.subplots(figsize=(8, 8))
    sns.lineplot(x='Epoch_no', y='Train_Precision', data=Results_dataframe, color='red')
    sns.lineplot(x='Epoch_no', y='Val_Precision', data=Results_dataframe, color='blue')
    plt.xlabel("Epoch")
    plt.ylabel("Precision")
    plt.title("Precision")
    plt.legend(["Train", "Val"], loc="upper right")

    plt.savefig('./metrics_plots/precision.png', dpi=300, bbox_inches='tight')
    plt.close()
    plt.legend(["Train", "Val"], loc="upper right")

    # Recall
    fig, ax = plt.subplots(figsize=(8, 8))
    sns.lineplot(x='Epoch_no', y='Train_Recall', data=Results_dataframe, color='red')
    sns.lineplot(x='Epoch_no', y='Val_Recall', data=Results_dataframe, color='blue')
    plt.xlabel("Epoch")
    plt.ylabel("Recall")
    plt.title("Recall")
    plt.legend(["Train", "Val"], loc="upper right")

    plt.savefig('./metrics_plots/recall.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Specificity
    fig, ax = plt.subplots(figsize=(8, 8))
    sns.lineplot(x='Epoch_no', y='Train_Specificity', data=Results_dataframe, color='red')
    sns.lineplot(x='Epoch_no', y='Val_Specificity', data=Results_dataframe, color='blue')
    plt.xlabel("Epoch")
    plt.ylabel("Specificity")
    plt.title("Specificity")
    plt.legend(["Train", "Val"], loc="upper right")

    plt.savefig('./metrics_plots/specificity.png', dpi=300, bbox_inches='tight')
    plt.close()

    # MCC
    fig, ax = plt.subplots(figsize=(8, 8))
    sns.lineplot(x='Epoch_no', y='Train_MCC', data=Results_dataframe, color='red')
    sns.lineplot(x='Epoch_no', y='Val_MCC', data=Results_dataframe, color='blue')
    plt.xlabel("Epoch")
    plt.ylabel("MCC")
    plt.title("MCC Score")
    plt.legend(["Train", "Val"], loc="upper right")

    plt.savefig('./metrics_plots/MCC.png', dpi=300, bbox_inches='tight')
    plt.close()


In [None]:
main()

In [24]:
# Plot Training and Validation loss vs EPOCHS
def plot_loss(dataframe):
    # IOU
    fig, ax = plt.subplots(figsize=(8, 8))
    sns.lineplot(x='Epoch_no', y='Train_loss', data=dataframe, color='red')
    sns.lineplot(x='Epoch_no', y='Val_loss', data=dataframe, color='blue')
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("LOSS vs EPOCH")
    plt.legend(["Train", "Val"], loc="upper right")
    plt.savefig('./metrics_plots/loss.png', dpi=300, bbox_inches='tight')
    plt.close()


# function to convert it to csv file
def convert_to_csv(prediction, metrics_dir):
    prediction.to_csv(metrics_dir, header=True, index=False)


if __name__ == "__main__":
    # Test Metric Plotting
    metrics_dir_path = 'metrics_plots'
    Results_dataframe = pd.read_csv(metrics_dir_path, index_col=False)
    # plotting_metrics(Results_dataframe)
    plot_loss(Results_dataframe)

In [None]:
Results_dataframe = pd.read_csv(metrics_dir, index_col=False)
plotting_metrics(Results_dataframe)
plot_loss(Results_dataframe)