In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
import numpy as np
import random
from PIL import Image
import tqdm

import torch
from torch.utils.data import DataLoader, Dataset, random_split
from torch.autograd import Variable
from torch import nn
from torch.nn import functional as F

import torchvision
import torchvision.transforms as transforms
from torchvision import models
import torchvision.transforms.functional as TF

#### DataLoader Requirements

In [None]:
channel_means = [0.485, 0.456, 0.406]
channel_stds  = [0.229, 0.224, 0.225]
#################################################################################################

class MyDataset(Dataset):
    def __init__(self, image_paths, target_paths, train=True):
        self.image_paths = image_paths
        self.target_paths = target_paths
        self.train = train
        self.files = os.listdir(self.image_paths)
        self.labels = os.listdir(self.target_paths)
        
        self.color_jitter = transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.1, hue=0.1)

    def transform(self, image, mask):
        # Resize
        resize = transforms.Resize(size=(256, 256))
        image = resize(image)
        mask = resize(mask)
        
        # Random transformations only for training
        if self.train:
            # Random horizontal flipping
            if random.random() > 0.5:
                image = TF.hflip(image)
                mask = TF.hflip(mask)

            # Random vertical flipping
            if random.random() > 0.5:
                image = TF.vflip(image)
                mask = TF.vflip(mask)

        # Transform to tensor
        image = TF.to_tensor(image)
        mask = TF.to_tensor(mask)
        
        # Normalize
        image = TF.normalize(image, channel_means, channel_stds)
        
        return image, mask
    
    def __len__(self):
        return len(self.files)
    
    def __getitem__(self, idx):
        img_name = self.files[idx]
        label_name = self.labels[idx]
        image = Image.open(os.path.join(self.image_paths, img_name))

        # Color jitter only for training
        if self.train:
            image = self.color_jitter(image)
        
        mask = Image.open(os.path.join(self.target_paths, label_name)).convert("L")
        x, y = self.transform(image, mask)
        return x, y, img_name, label_name

In [None]:
DIR_IMG_tr  = os.path.join("./Data_FR/" # this path here changes based on the step at stage 3
                        , 'images')
DIR_MASK_tr = os.path.join("./Data_FR/" # this path here changes based on the step at stage 3
                        , 'masks')
DIR_IMG_v  = os.path.join("./validation/"
                        , 'images')
DIR_MASK_v = os.path.join("./validation/"
                        , 'masks')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
dataset_T = MyDataset(DIR_IMG_tr, DIR_MASK_tr)
dataset_V = MyDataset(DIR_IMG_v, DIR_MASK_v, train=False)
#Batch Size and Loaders
batch_size = 4
train_loader = DataLoader(dataset_T, batch_size, shuffle=True, 
                          pin_memory=torch.cuda.is_available())
valid_loader = DataLoader(dataset_V, batch_size, shuffle=False, 
                          pin_memory= torch.cuda.is_available())

In [None]:
for i, (input, target, img_name, label_name) in enumerate(train_loader):

    #To Device
    input_var  = input.cuda()
    target_var = torch.round(target).cuda()
    print(input_var.shape, target_var.shape)
    plt.imshow(input_var[1].cpu().permute(1, 2, 0))
    plt.show()
    plt.imshow(target_var[1].cpu().permute(1, 2, 0))
    plt.show()
    break

#### Training Procedure

In [None]:
import copy
import scipy.ndimage as ndimage
from torch.cuda.amp import GradScaler, autocast

def train(train_loader, model, optimizer, validation, scheduler=None, G_A = 2):
    #best_model_wts = model.state_dict()

    #List to Append all losses of Train and Val Epochs
    Validation_losses = []
    Train_losses = []
    
    #Initial min_val_loss
    min_val_los = 10
    
    #Gradient ACcumaltion Size
    gradient_accumulations = G_A
    
    #torch amp scaler
    scaler = GradScaler()
    
    model.zero_grad()
    for epoch in range(start_epoch, end_epoch):
        
        if scheduler is not None:
            scheduler.step()
            
        trn_loss = []
        valid_losses = []
        losses = AverageMeter()

        model.train()
        for i, (input, target, fname, mname) in enumerate(train_loader):
            
            #To Device
            input_var  = input.cuda()
            target_var = torch.round(target).cuda()
            
            #Torch Autocast
            with autocast():
                
                #Model the input
                masks_pred = model(input_var)
                masks_pred = torch.sigmoid(masks_pred)
                
                #Calcualte the Loss
                loss = seg_loss(masks_pred, target_var.squeeze(1).long())
            
            #Step loss appending
            trn_loss.append(loss)
            losses.update(loss)

            # compute gradient and do SGD step            
            scaler.scale(loss / gradient_accumulations).backward()
                
            #updating parameters of the model
            if (i + 1) % gradient_accumulations == 0:
                scaler.step(optimizer)
                scaler.update()
                model.zero_grad()
                
        #Validation for one epoch
        valid_metrics = validation(model, valid_loader)
        valid_loss = valid_metrics['valid_loss']
        
        #Saving Epochs Loss for Train and Validation
        Train_losses.append(np.mean(torch.stack(trn_loss).detach().cpu().numpy()))
        Validation_losses.append(valid_loss)
        
        ##PRITNIGN ONE EPOCH OUTCOME
        print("Epoch:", epoch)
        print(f'\tTrain_loss = {np.mean(torch.stack(trn_loss).detach().cpu().numpy()):.5f}')
        print(f'\tValid_loss = {valid_loss:.5f}')
        print("LR at the end of epoch=", get_lr(optimizer))
        print()

        #Save the model of the current epoch
        if valid_loss < min_val_los:
            print("valid_loss < min_val_los")
            min_val_los = valid_loss
            best_model_wts = copy.deepcopy(model.state_dict())

    model.load_state_dict(best_model_wts)
    return model, Train_losses, Validation_losses, best_model_wts

def validate(model, val_loader):
    losses = AverageMeter()
    model.train(False)
    with torch.no_grad():
        for i, (input, target, fname, mname) in enumerate(val_loader):
            input_var  = input.cuda()
            target_var = torch.round(target).cuda()
            masks_pred = model(input_var)
            masks_pred = torch.sigmoid(masks_pred)
            loss = seg_loss(masks_pred, target_var.squeeze(1).long())
            losses.update(loss.item(), input_var.size(0))
    return {'valid_loss': losses.avg}

In [None]:
class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()
    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0
    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count   
def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']
    
def save_check_point(state, is_best, file_name = 'checkpoint.pth.tar'):
    torch.save(state, file_name)
    if is_best:
        shutil.copy(file_name, 'model_best.pth.tar')

def calc_crack_pixel_weight(mask_dir):
    avg_w = 0.0
    n_files = 0
    for path in Path(mask_dir).glob('*.*'):
        n_files += 1
        m = ndimage.imread(path)
        ncrack = np.sum((m > 0)[:])
        w = float(ncrack)/(m.shape[0]*m.shape[1])
        avg_w = avg_w + (1-w)
    avg_w /= float(n_files)
    return avg_w / (1.0 - avg_w)

### Model Creation

In [None]:
import segmentation_models_pytorch as smp

model = smp.Unet(
    encoder_name="resnet18",        # choose encoder, e.g. mobilenet_v2 or efficientnet-b7
    encoder_weights="imagenet",     # use `imagenet` pre-trained weights for encoder initialization
    in_channels=3,                  # model input channels (1 for gray-scale images, 3 for RGB, etc.)
    classes=1,                      # model output channels (number of classes in your dataset)
    activation = None
)

In [None]:
x  = torch.rand(1,3,256,256)
model(x).shape

## Loss

In [None]:
from Loss import IoULoss
seg_loss = IoULoss()

### Settings

In [None]:
##Epochs
start_epoch = 0 #start from 0
end_epoch = 150 # ends in 9

##Directory of the Model
model_dir = os.getcwd()

In [None]:
optimizer = torch.optim.SGD(model.parameters(), 
                            lr = 0.01,
                            momentum=0.9,
                            weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.9)

### Load Model

In [None]:
#model.load_state_dict(torch.load(".PTH"))
model = model.cuda()

### Training

In [None]:
model, Train_losses, Validation_losses, best_model_wts = train(train_loader, model, optimizer, validate, scheduler, G_A = 2)

In [None]:
torch.save(model.state_dict(), "Sup_St3-Stn.pth")

In [None]:
validate(model, valid_loader)

### Validate Model

###### Dataloader

In [None]:
class MyDataset_A(Dataset):
    def __init__(self, image_paths, target_paths):
        self.image_paths = image_paths
        self.target_paths = target_paths
        self.files = os.listdir(self.image_paths)
        self.lables = os.listdir(self.target_paths)
        
    def transform(self, image, mask):
        # Resize
        resize = transforms.Resize(size=(256, 256))
        image = resize(image)
        mask = resize(mask)

        # Transform to tensor
        image = TF.to_tensor(image)
        mask = TF.to_tensor(mask)
        
        #Normalise
        image = TF.normalize(image, channel_means, channel_stds)
        
        return image, mask
    
    def __len__(self):
        return len(self.files)
    
    def __getitem__(self,idx):
        img_name = self.files[idx]
        label_name = self.lables[idx]
        image = Image.open(os.path.join(self.image_paths,img_name))
        mask = Image.open(os.path.join(self.target_paths,label_name)).convert("L")
        x, y = self.transform(image, mask)
        return x, y, img_name, label_name

In [None]:
import cv2
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt


def dice(y_true, y_pred):
    return (2 * (y_true * y_pred).sum() + 1e-15) / (y_true.sum() + y_pred.sum() + 1e-15)

def jaccard(y_true, y_pred):
    intersection = (y_true * y_pred).sum()
    union = y_true.sum() + y_pred.sum() - intersection
    return (intersection + 1e-15) / (union + 1e-15)

In [None]:
# Test data path
DIR_IMG_test  = os.path.join("./test/", 
                             'images')
DIR_MASK_test = os.path.join("./test/", 
                             'masks')
dataset_test = MyDataset_A(DIR_IMG_test, DIR_MASK_test)

#Batch Size and Loader
batch_size = 1
test_loader_A = DataLoader(dataset_test, batch_size, shuffle=False, 
                          pin_memory=torch.cuda.is_available())
print(len(test_loader_A.dataset))

DICE = []
MIOU = []
k = 0


# running the test loop   
with torch.no_grad():

    model.eval()
    for i, (input, target, fname, MNAME) in enumerate(test_loader_A):

        #To Device
        input_var  = input.cuda()
        target_var = torch.round(target).cuda()
        k = k + 1
        #Torch Autocast
        with autocast():

            #Model the input
            masks_pred = model(input_var)
            ## Probabiliteis
            masks_pred = F.sigmoid(masks_pred)
            #round the probabiliteis to zeros and ones
            masks_pred = torch.round(masks_pred)

            #Making sure they are both tensors
            target_var = torch.tensor(target_var)
            masks_pred = torch.tensor(masks_pred)

            #calcualting Dice and IoU
            dice_score = dice(masks_pred, target_var[0])
            jaccard_score = jaccard(masks_pred, target_var[0])

            #Appending the calculated metrics
            DICE.append(dice_score.item())
            MIOU.append(jaccard_score.item())


print("Dice", np.mean(np.array(DICE)), np.std(np.array(DICE)))

print("IoU", np.mean(np.array(MIOU)), np.std(np.array(MIOU)))

In [None]:
#test one image 
plt.imshow(torch.tensor(masks_pred)[0][0].cpu())