In [None]:
import torch
import os
import numpy as np
import pandas as pd
import skimage as ski
import torchvision.transforms as transforms

from sklearn.preprocessing import OneHotEncoder
from torch.utils.data import Dataset, DataLoader

In [59]:
# Get cpu, gpu or mps device for training.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using mps device


In [60]:
# Load the Dataset
path = ""
file_list = np.load(path + "data/mos_dataset_processed.npy")
label_list = np.load(path + "data/mos_dataset_labels.npy")
fold_list = np.load(path + "data/mos_dataset_folds.npy")
path_list = np.load(path +"data/mos_dataset_paths.npy")

In [5]:
# Load the Models
model_dict = {}
for file in os.listdir("models/final_models"):
    if file.endswith(".pt"):
        FOLD = file.split("_")[1]
        model_dict[FOLD] = torch.load("models/final_models/" + file, map_location=torch.device('cpu'))

  model_dict[FOLD] = torch.load("models/final_models/" + file, map_location=torch.device('cpu'))


In [None]:
# Copy the DataLoader from the trainer
class CustomDataset(Dataset):
    def __init__(self, file_list, label_list,
                input_transforms,
                color_transforms=None,
                geo_transforms=None):

        # Initialize the list of files and labels
        self.file_list = file_list
        self.label_list = label_list
        self.input_transforms = input_transforms
        self.color_transforms = color_transforms
        self.geo_transforms = geo_transforms


    def __len__(self):
        return len(self.file_list)

    def CLAHE_transform(self, image):
            # redice dimension
            image = torch.mean(image, dim=0).numpy()
            # apply CLAHE
            equalized_img = ski.exposure.equalize_adapthist(image, clip_limit=.5, nbins=32) # prevous was clip=.6, nbins=48
            # Use mediean filter to reduce noise
            equalized_img = ski.filters.median(equalized_img, ski.morphology.disk(2))

            return torch.tensor(equalized_img, dtype=torch.float32)

    def __getitem__(self, idx):
        # Get image and mask
        file = self.file_list[idx]
        input = file[:,:,:3] / 255
        mask = file[:,:,3] > 0

        # Get label
        output = self.label_list[idx]

        # Apply Albumentations color transforms
        if self.color_transforms is not None:
            input = self.color_transforms(image=input.astype('float32'))["image"]

        # Apply transforms
        input = self.input_transforms(input)
        mask = self.input_transforms(mask)

        # Apply CLAHE equalization
        input = self.CLAHE_transform(input)

        # set all values outside the mask to 0
        input[~mask.squeeze(0)] = 0
        input = input.unsqueeze(0)

        # Apply geometric transforms
        if self.geo_transforms is not None:
            input = self.geo_transforms(input)

        return (input, output)

In [None]:
def get_dataset_split(FOLD):
    oh_encoder = OneHotEncoder()
    oh_label_list = oh_encoder.fit_transform(label_list.reshape(-1,1)).toarray().astype(np.uint8)

    # Split the dataset into train and test based on fold
    test_file_list = file_list[(fold_list == FOLD) & (fold_list != -1)]
    test_label_list = oh_label_list[(fold_list == FOLD) & (fold_list != -1)]
    test_path_list = path_list[(fold_list == FOLD) & (fold_list != -1)]

    return test_file_list, test_label_list, test_path_list, oh_encoder

def evaluation(dataloader, model):
    predictions = []
    targets = []
    with torch.no_grad():
        for batch, (X, y) in enumerate(dataloader):
                # Compute prediction and loss
                X = X.to(device)
                y = y.to(device)

                pred = model(X)

                predictions.append(pred.cpu().detach().numpy())
                targets.append(y.cpu().detach().numpy())

    return np.concatenate(predictions), np.concatenate(targets)

In [None]:
image_height, image_width = 192, 384
batch_size = 32

for FOLD in range(5):
    FOLD += 1

    # Get the file list and label list for the test set
    test_file_list, test_label_list, test_path_list, oh_encoder = get_dataset_split(FOLD)

    # Define the transformations
    input_trans = transforms.Compose([transforms.ToTensor(), transforms.Resize((image_height, image_width))])

    # Create an instance of the CustomDataset
    test_dataset = CustomDataset(test_file_list, test_label_list, input_trans)

    # Create a DataLoader for the dataset
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

    # Get the model to device
    model = model_dict[str(FOLD)].to(device)
    # Get feature extractor from the model
    feature_extractor = torch.nn.Sequential(*list(model.children())[:-1])

    # Evaluate the model
    predictions, targets = evaluation(test_dataloader, model)
    feature_maps, _ = evaluation(test_dataloader, feature_extractor)

    test_df = pd.DataFrame()
    test_df["PATH"] = test_path_list
    test_df["PRED"] = oh_encoder.inverse_transform(predictions).ravel()
    test_df["PRED%"] = np.max(predictions, axis=1)
    test_df["TARGET"] = oh_encoder.inverse_transform(targets).ravel()

    pd.to_pickle(test_df, path + "results/final_models/test_df_{}.pkl".format(FOLD))
    np.save(path + "results/final_models/feature_maps_{}.npy".format(FOLD), np.asarray(feature_maps))