# Cross Validation using 5 folds and run the Training and Validation loops

In [None]:
import dataloader
from UModel import UNet
import Config
from torch.nn import BCEWithLogitsLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from torchvision import transforms
from imutils import paths
from tqdm import tqdm
import torch
import time
import os
from torchmetrics.functional import dice
import numpy as np
import albumentations as A
from sklearn.model_selection import KFold
import torch.optim.lr_scheduler as lr_scheduler
import Early_stopping as E
from albumentations.pytorch.transforms import ToTensorV2
import matplotlib.pyplot as plt

# load the image and mask filepaths in a sorted manner
imagePaths = np.array(sorted(list(paths.list_images(Config.Image_dataset_dir))))
maskPaths = np.array(sorted(list(paths.list_images(Config.Mask_dataset_dir))))

# Taking out the test set and leaving the rest of the data for cross validation
X_rest, X_test, y_rest, y_test = train_test_split(imagePaths, maskPaths, train_size=0.9, random_state=42, shuffle = True)

print("Saving testing image paths...")
f = open(Config.TEST_PATHS, "w")
f.write("\n".join(X_test))
f.close()

# Setting up the Kfold cross validation with 5 folds
kf = KFold(n_splits=5, shuffle=True)
index = 0
for train_index, val_index in kf.split(X_rest, y_rest):
    # Splitting the data into training and validation sets
    X_train, X_val = X_rest[train_index], X_rest[val_index]
    y_train, y_val = y_rest[train_index], y_rest[val_index]
    
    f = open(Config.Base_Out+'\\train_paths'+str(index)+'.txt', "w")
    f.write("\n".join(X_train))
    f.close()
    
    f = open(Config.Base_Out+'\\validation_paths'+str(index)+'.txt', "w")
    f.write("\n".join(X_val))
    f.close()
    
    # Define transformations
    train_transform = A.Compose([A.Resize(Config.Input_Height, Config.Input_Width),
                                 A.Normalize(mean=(0.0), std=(1.0)),
                                 A.HorizontalFlip(p=0.5), 
                                 A.RandomRotate90(p=0.5),
                                 ToTensorV2()])
    val_transform = A.Compose([A.Resize(Config.Input_Height, Config.Input_Width),
                               A.Normalize(mean=(0.0), std=(1.0)),
                               A.VerticalFlip(p=0.5), 
                               A.RandomRotate90(p=0.5),
                               ToTensorV2()])
    
    # Create the train and validation datasets
    trainDS = dataloader.dataset(imagePaths=X_train, maskPaths=y_train, transform=train_transform)
    valDS = dataloader.dataset(imagePaths=X_val, maskPaths=y_val, transform=val_transform)
    print(f"There are {len(trainDS)} samples in the training set")
    print(f"There are {len(valDS)} samples in the validation set")
    print('************************************************')

    # Create the train and validation dataloaders
    trainLoader = DataLoader(trainDS, shuffle=True, batch_size=Config.Batch_size,
                             pin_memory=Config.PIN_MEMORY, num_workers=os.cpu_count())

    valLoader = DataLoader(valDS, shuffle=True, batch_size=Config.Batch_size,
                           pin_memory=Config.PIN_MEMORY, num_workers=os.cpu_count())

    # initialize our UNet model
    unet = UNet(n_channels=Config.No_channels, n_classes=Config.No_classes, bilinear=False).to(Config.DEVICE)

    # initialize Binary cross entropy with logit loss function and Adam optimizer
    lossFunc = BCEWithLogitsLoss()
    opt = Adam(unet.parameters(), lr=Config.Init_LR)
    scheduler = lr_scheduler.ReduceLROnPlateau(opt, 'max', patience=10)
    early_stopper = E.EarlyStopper(patience=10, min_delta=0.01)

    # calculate steps per epoch for training, validation and test set
    trainSteps = len(trainDS) // Config.Batch_size
    valSteps = len(valDS) // Config.Batch_size

    # initialize a dictionary to store training history
    CE_loss = {"train_loss": [], "val_loss": []}
    dsc_loss = {"Dice_train": [], "Dice_val": []}

    # loop over epochs
    print("Training the network...")
    startTime = time.time()

    for e in tqdm(range(Config.Num_epochs)):
        # set the model in training mode
        unet.train()

        # initialize the total training and validation loss0
        totalTrainLoss, totalValLoss, dice_score_train, dice_score_val = 0, 0, 0, 0

        # loop over the training set
        for (i, (x, y)) in enumerate(trainLoader):
            # send the input to the device
            (x, y) = (x.to(Config.DEVICE), y.to(Config.DEVICE))

            # perform a forward pss and calculate the training loss
            pred = unet(x)
            CEloss = lossFunc(pred, y)
            dice_loss = dice(torch.sigmoid(pred), y.type(torch.int32))
            total_loss = (1 - dice_loss) + CEloss

            # first, zero out any previously accumulated gradients, then
            # perform backpropagation, and then update model parameters
            opt.zero_grad()
            total_loss.backward()
            opt.step()

            # add the loss to the total training loss so far
            totalTrainLoss += CEloss
            dice_score_train += dice_loss

        # switch off autograd
        with torch.no_grad():
            # set the model in evaluation mode
            unet.eval()

            # loop over the validation set
            for (x, y) in valLoader:
                # send the input to the device
                (x, y) = (x.to(Config.DEVICE), y.to(Config.DEVICE))

                # make the predictions and calculate the validation loss
                pred = unet(x)
                totalValLoss += lossFunc(pred, y)
                dice_score_val += dice(torch.sigmoid(pred), y.type(torch.int32))

        # calculate the average training and validation loss
        avgTrainLoss = totalTrainLoss / trainSteps
        avgValLoss = totalValLoss / valSteps
        avgDiceTrain = dice_score_train / trainSteps
        avgDiceVal = dice_score_val / valSteps
        
        # Checking the validation loss if not changing for early stopping
        if early_stopper.early_stop(avgValLoss):
            break
        
        # Adapting the learining rate based on maximized Dice Score
        scheduler.step(avgDiceVal)
        print(opt.state_dict()['param_groups'][0]['lr'])

        # update the training history
        CE_loss["train_loss"].append(avgTrainLoss.cpu().detach().numpy())
        CE_loss["val_loss"].append(avgValLoss.cpu().detach().numpy())
        dsc_loss["Dice_train"].append(avgDiceTrain.cpu().detach().numpy())
        dsc_loss["Dice_val"].append(avgDiceVal.cpu().detach().numpy())

        # print the model training and validation information
        print("EPOCH: {}/{}".format(e + 1, Config.Num_epochs))
        print("Train loss: {:.4f}, Validation loss: {:.4f}".format(avgTrainLoss, avgValLoss))
        print("Training Dice Score: {:.4f}% , Validation Dice Score: {:.4f}%".format(avgDiceTrain * 100, avgDiceVal * 100))
        del dice_score_train, dice_score_val, avgDiceTrain, avgDiceVal, totalTrainLoss, totalValLoss, avgTrainLoss, avgValLoss

    # display the total time needed to perform the training
    endTime = time.time()
    print("Total time taken to train the model: {:.2f}s".format(endTime - startTime))
    
    # plot the training loss
    plt.style.use("ggplot")
    fig1 = plt.figure(figsize=[15.0,15.0])
    ax1 = fig1.add_subplot(2, 1, 1)
    plt.plot(CE_loss["train_loss"], label="train_loss")
    plt.plot(CE_loss["val_loss"], label="validation_loss")
    plt.title("Training Loss vs. Validation Loss")
    plt.xlabel("Epoch #")
    plt.ylabel("Loss")
    plt.legend(loc="upper right")
    plt.show()
    plt.savefig(os.path.sep.join([Config.Base_Out+'\\plots', "CELoss"+str(index)+".png"]))

    plt.style.use("ggplot")
    fig1 = plt.figure(figsize=[15.0,15.0])
    ax1 = fig1.add_subplot(2, 1, 2)
    plt.plot(dsc_loss["Dice_train"], label="train_dice_score")
    plt.plot(dsc_loss["Dice_val"], label="validation_dice_score")
    plt.title("Training Dice Score vs. Validation Dice Score")
    plt.xlabel("Epoch #")
    plt.ylabel("Dice Score")
    plt.legend(loc="upper left")
    plt.show()
    plt.savefig(os.path.sep.join([Config.Base_Out+'\\plots', "DiceLoss"+str(index)+".png"]))

    index += 1
    # serialize the model to disk
    torch.save(unet, Config.MODEL_PATH)

# Plotting the CE loss and dice scores for training and validation sets

In [None]:
import matplotlib.pyplot as plt

# plot the training loss
plt.style.use("ggplot")
fig1 = plt.figure()
ax1 = fig1.add_subplot(2, 1, 1)
plt.plot(CE_loss["train_loss"], label="train_loss")
plt.plot(CE_loss["val_loss"], label="validation_loss")
plt.title("Training Loss vs. Validation Loss")
plt.xlabel("Epoch #")
plt.ylabel("Loss")
plt.legend(loc="upper right")
plt.savefig(Config.LOSS_PATH)

plt.style.use("ggplot")
fig1 = plt.figure()
ax1 = fig1.add_subplot(2, 1, 2)
plt.plot(dsc_loss["Dice_train"], label="train_dice_loss")
plt.plot(dsc_loss["Dice_val"], label="validation_dice_loss")
plt.title("Training Dice Loss vs. Validation Dice Loss")
plt.xlabel("Epoch #")
plt.ylabel("Dice Loss")
plt.legend(loc="upper right")
plt.savefig(Config.DICE_PATH)

# Make predictions and save them to disc

In [None]:
# import the necessary packages
from UModel import UNet
import Config
import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
from torchvision.utils import save_image
from torchvision.io import read_image
import os
import Test
import dataloader
from torch.utils.data import DataLoader
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from imutils import paths

X_test = []
y_test = []

with open("C:\\Users\\Jessica NT MCA\\Desktop\\MA_Abdelrahman\\Master Thesis Project\\5 layer Binary UNET\\Dataset\\output\\test_paths.txt", "r") as f:
    for path in f:
        filename = os.path.basename(path)
        X_test.append(path.rstrip('\n'))
        y = 'C:\\Users\\Jessica NT MCA\\Desktop\\MA_Abdelrahman\\Master Thesis Project\\5 layer Binary UNET\\Dataset\\Masks\\'+filename
        y_test.append(y.rstrip('\n'))
f.close()

X_test = np.array(sorted((X_test)))
y_test = np.array(sorted((y_test)))

test_transform = A.Compose([A.Normalize(mean=(0.0), std=(1.0)),
                            ToTensorV2()])

testDS = dataloader.dataset(imagePaths=X_test, maskPaths=y_test, transform=test_transform)
testLoader = DataLoader(testDS, shuffle=False, batch_size=1,
                             pin_memory=Config.PIN_MEMORY, num_workers=os.cpu_count())

model = torch.load(Config.MODEL_PATH)
Test.save_predictions_as_imgs(imagesPath = X_test, loader = testLoader, model = model, folder = Config.Base_Out+"//predictions", device = Config.DEVICE)

# Plot Predictions vs. Original image and GT mask

In [None]:
import cv2
import numpy as np
import Config
import torch
import matplotlib.pyplot as plt
import Test
import os

imagePaths = open(Config.TEST_PATHS).read().strip().split("\n")
imagePaths = np.random.choice(imagePaths, size=20)

# load our model from disk and flash it to the current device
print("[INFO] load up model...")
unet = torch.load(Config.MODEL_PATH).to(Config.DEVICE)

# iterate over the randomly selected test image paths
for path in imagePaths:
    #make predictions and visualize the results
    ground, pred = Test.make_predictions(unet, path)

# Skeletenation to detect the tips

In [None]:
import cv2
import numpy as np
from PIL import Image
from scipy.ndimage import generic_filter
from skimage.morphology import medial_axis
import matplotlib.pyplot as plt
from torchvision import transforms

# Line ends filter
def lineEnds(P):
    """Central pixel and just one other must be set to be a line end"""
    return 255 * ((P[4]==255) and np.sum(P)==510)

mask_dir = 'C:\\Users\\z004b1tz\\Desktop\\Master Thesis Project\\Dataset\\Masks\\01827_0006_976x976_16bpp.PNG'
img = Image.open(mask_dir)

# define custom transform function
transform = transforms.Compose([
    transforms.ToTensor()
])
 
# transform the pIL image to tensor
# image
img_tr = transform(img)
 
# calculate mean and std
mean, std = img_tr.mean([1,2]), img_tr.std([1,2])
 
# print mean and std
print("mean and std before normalize:")
print("Mean of the image:", mean)
print("Std of the image:", std)

transform_norm = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

# get normalized image
img_normalized = transform_norm(img)

# calculate mean and std
mean, std = img_normalized.mean([1,2]), img_normalized.std([1,2])
 
# print mean and std
print('****************************************')
print("mean and std after normalize:")
print("Mean of the image:", mean)
print("Std of the image:", std)
 
# plot the pixel values
plt.hist(img_normalized.ravel(), bins=50, density=True)
plt.xlabel("pixel values")
plt.ylabel("relative frequency")
plt.title("distribution of pixels")
# # Skeletonize
# skel = (medial_axis(img)*255).astype('uint8')
# skel
# # Find line ends
# result = generic_filter(skel, lineEnds, (3, 3))
# result = Image.fromarray(result)

In [None]:
image = pd.DataFrame(tip, columns = ['path'])
mask = pd.DataFrame(mask, columns = ['path'])
path = pd.concat([mask, image]).drop_duplicates(keep=False)
path

In [None]:
import dataloader
from UModel import UNet
import Config
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
import torchvision.transforms as T
from imutils import paths
from tqdm import tqdm
import torch
import time
import os
from torchmetrics.functional import dice
import numpy as np
import albumentations as A
from sklearn.model_selection import KFold
import torch.optim.lr_scheduler as lr_scheduler
import Early_stopping as E


imagePaths = ['C:\\Users\\lenovo\\Desktop\\Master Thesis Project\\Dataset\\Images\\01824_0000_976x976_16bpp.PNG',
             'C:\\Users\\lenovo\\Desktop\\Master Thesis Project\\Dataset\\Images\\01824_0001_976x976_16bpp.PNG',
             'C:\\Users\\lenovo\\Desktop\\Master Thesis Project\\Dataset\\Images\\01828_0000_1952x1952_16bpp.PNG']

maskPaths = ['C:\\Users\\lenovo\\Desktop\\Master Thesis Project\\Dataset\\Mask\\01824_0000_976x976_16bpp.PNG',
            'C:\\Users\\lenovo\\Desktop\\Master Thesis Project\\Dataset\\Mask\\01824_0001_976x976_16bpp.PNG',
            'C:\\Users\\lenovo\\Desktop\\Master Thesis Project\\Dataset\\Mask\\01828_0000_1952x1952_16bpp.PNG']

# img_trans = T.Compose([T.Resize(976),T.ToTensor()])
# mask_trans = T.Compose([T.Resize(976)])

trainDS = dataloader.dataset(imagePaths=imagePaths, maskPaths=maskPaths)
trainLoader = DataLoader(trainDS, shuffle=True, batch_size=1,
                        pin_memory=Config.PIN_MEMORY, num_workers=os.cpu_count())

unet = UNet(n_channels=3, n_classes=3, bilinear=False).to(Config.DEVICE)

for e in tqdm(range(5)):
    unet.train()

        
    for (i, (x, y)) in enumerate(trainLoader):
            # send the input to the device
        (x, y) = (x.to(Config.DEVICE), y.to(Config.DEVICE))

            # perform a forward pss and calculate the training loss
        pred = unet(x)

        loss = CrossEntropyLoss()

        CE_loss = loss(pred, y)
            
        print('The Shape of the network input is: {}'.format(x.shape))
        print('The Shape of the target is: {}'.format(y.shape))
        print('The Shape of the network output is: {}'.format(pred.shape))
        print(CE_loss)
        print('**********************************************************')
torch.save(unet.state_dict(), 'C:\\Users\\lenovo\\Desktop\\Master Thesis Project\\unet.pth')       

In [None]:
import matplotlib.pyplot as plt
import Test
import cv2
import numpy as np

imagePaths = ['C:\\Users\\lenovo\\Desktop\\Master Thesis Project\\Dataset\\Images\\01825_0000_976x976_16bpp.PNG']

img = cv2.imread(imagePaths[0])
img = np.transpose(img, (2,0,1))

image = np.expand_dims(img, 0)

unet = torch.load_state_dict('C:\\Users\\lenovo\\Desktop\\Master Thesis Project\\unet.pth')

unet.eval()


In [None]:
import Config
import os

for index in range(5):
    path = os.path.sep.join([Config.Base_Out+'\\plots', "DiceLoss"+str(index)+".png"])
    print(path)
