In [None]:
import os
import random
from tqdm import tqdm
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

import cv2 as cv
import numpy as np

%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg 
from matplotlib.pyplot import imshow

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torchsummary import summary
import segmentation_models_pytorch as smp

In [None]:
if torch.cuda.is_available():
    dev = 'cuda:0'
else:
    dev = 'cpu'
device = torch.device(dev)

In [None]:
model = smp.FPN(in_channels=1)
model.load_state_dict(torch.load('''Enter Model Path Here'''))
model.to(device)

In [None]:
class dataUpload:
    
    def __init__(self, images_path, masks_path):
        self.image_path = images_path
        self.mask_path = masks_path
        self.images_array, self.masks_array = [], []
    
    def loadData(self):
        for i in tqdm(os.listdir(self.image_path)):
            image = os.path.join(self.image_path, i)
            image = cv.imread(image)
            
            self.images_array.append(image)
        
        for m in tqdm(os.listdir(self.mask_path)):
            mask = os.path.join(self.mask_path, m)
            mask = cv.imread(mask)
            
            self.masks_array.append(mask)
        
        return self.images_array, self.masks_array

In [None]:
def resizeImages(images, masks, height, width):
    
    for i in range(len(images)):
        images[i] = cv.resize(images[i], (width, height))
        masks[i] = cv.resize(masks[i], (width, height))
    
    return images, masks

def toGrayScale(images, masks):
    gray_images, gray_masks = [], []
    for image, mask in zip(images, masks):
        gray_images.append(cv.cvtColor(image, cv.COLOR_BGR2GRAY))
        gray_masks.append(cv.cvtColor(mask, cv.COLOR_BGR2GRAY))
    
    return gray_images, gray_masks

In [None]:
def preProcess(images, masks):
    images, masks = resizeImages(images, masks, 64, 64)
    images, masks = toGrayScale(images, masks)
    images, masks = np.array(images) / 255.0, np.array(masks) / 255.0
    return images, masks

In [None]:
def dataGenerator(images, masks, batch_size, shuffle=True, train=True):
    X, Y = images, masks
    
    index = 0
    num_images = len(images)
    indices = [*range(num_images)]
    if shuffle:
        random.shuffle(indices)
    
    while True:
        if index >= num_images:
            index = 0
            if shuffle:
                random.shuffle(indices)
        index += 1
        
        for i in range(0, num_images, batch_size):
            try:
                x = X[indices[i:i+batch_size]]
                y = Y[indices[i:i+batch_size]]
            except:
                x = X[indices[i:]]
                y = Y[indices[i:]]
            
            if len(x) != batch_size or i == num_images - batch_size:
                yield x, y, 1
            else:
                yield x, y, 0

In [None]:
def diceScore(Y_output, Y):
    gamma = 1
    batch_size = Y_output.size(dim=0)
    
    model_outputs = Y_output.reshape(batch_size, -1).float()
    original_outputs = Y.reshape(batch_size, -1).float()
    intersection = (model_outputs * original_outputs).sum().float()

    return batch_size * (2 * intersection + gamma) / (model_outputs.sum() + original_outputs.sum() + gamma)

In [None]:
def valLoop(val_generator, height, width):
    Y_pred = []
    num_images, dice_score = 0, 0
    
    with torch.no_grad():
        while True:
            X, Y, count = next(val_generator)
            
            X = torch.tensor(X)
            X = torch.reshape(X, (-1, 1, height, width)).float().to(device)
            Y = torch.tensor(Y)
            Y = torch.reshape(Y, (-1, 1, height, width)).to(device)
            
            Y_output = model(X)
            num_images += X.size(dim=0)
            
            Y_output = torch.reshape(Y_output, (-1, height, width, 1))
            sigmoid = nn.Sigmoid()
            Y_output = sigmoid(Y_output)
            
            dice_score += diceScore(Y_output, Y)
            
            Y_pred += list(np.reshape(Y_output.detach().cpu().numpy(), (-1, height, width)))
            
            if count == 1:
                break
    
    return dice_score / num_images, Y_pred

In [None]:
def displayOutputs(val_images, val_masks, Y_pred, number, time):
    num_images = len(val_images)
    
    for index in range(number):
        cv.imshow('Original Grayscale Image', cv.resize(val_images[index], (256, 256)))
        
        cv.imshow('Original Mask', cv.resize(val_masks[index], (256, 256)))
        cv.moveWindow('Original Mask', 300, 0)
        
        cv.imshow('Predicted Mask', cv.resize((Y_pred[index] * 255).astype(np.uint8), (256, 256)))
        cv.moveWindow('Predicted Mask', 300, 300)
        cv.waitKey(int(time * 1000))
    
    cv.destroyAllWindows()

In [None]:
def takeInputs(images_path, masks_path, view=False, number=10, time=2):
    upload = dataUpload(images_path, masks_path)
    images, masks = upload.loadData()
    images, masks = preProcess(images, masks)
    
    val_generator = dataGenerator(images, masks, batch_size=8, shuffle=False, train=False)
    average_dice_score, Y_pred = valLoop(val_generator, height=64, width=64)
    
    print(f'Average Dice Score over Test Set: {average_dice_score: .6f}')
    
    if view:
        displayOutputs(images, masks, Y_pred, number, time)

In [None]:
images_path = input('Path to the Folder Containing Test Images: ')
masks_path = input('Path to the Folder Containing Testing Ground Truth Masks: ')
view = input('Would you like to view the outputs? [y/n]: ')

if view == 'y':
    number = int(input('How many images would you like to view? '))
    time = float(input('How long would you like each image to be displayed (in seconds)? '))

In [None]:
takeInputs(images_path, masks_path, view, number, time)