#### EVALUATION FILE

##### This notebook evaluates the loaded models on a specific set of validation keys

In [1]:
import torch
import os
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt 
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision.transforms.functional import crop
import gc

from torch.utils.data import Dataset
from Utils.logger import initialize_logger, get_logger
import numpy as np
import torch
from scipy import signal
from Utils.config import (
    USE_PHYSICAL_DATA,
    PATH_DATASET,
    IMG_PATH,
)

from Utils import (
    metrics,
)
# Metrics
# NOTE: this rebinds the name `metrics` from the imported `Utils.metrics`
# module to the list of metric callables below; after this statement the
# module is no longer reachable under that name.
metrics = [
    torch.nn.MSELoss(),
    metrics.PerCS()
]

# Run on the GPU when one is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dataset roots are resolved relative to the current working directory:
# three levels up for the local layout, five levels up for the server layout.
# `os.getcwd` must be *called*; passing the function object itself to
# `os.path.dirname` raises TypeError.
LOCAL_SLP_DATASET_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd()))),'SLP/danaLab')
SERVER_SLP_DATASET_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd()))))),'mnt/DADES2/SLP/SLP/danaLab')

In [18]:
# List of the paths of the test arrays

import json
import os

current_directory = os.getcwd()

# Load the JSON file that lists, per key, the paths of the test arrays.
# A context manager guarantees the file handle is closed after parsing.
with open(os.path.join(os.path.dirname((current_directory)),'Models/TestJson/test_paths_20240506230502.json')) as f:
    data = json.load(f)

# Strip the machine-specific prefix from every stored path: everything up to
# and including the first "SLP" occurrence is dropped, and leading slashes are
# removed from what remains.
dic_paths = {}

for key, paths in data.items():
    dic_paths[key] = []
    for path in paths:
        parts = path.split("SLP", 2)
        result = "SLP".join(parts[1:]).lstrip('/')
        dic_paths[key].append(result)

print(dic_paths)

# Pick the dataset root that matches the configured environment.
if PATH_DATASET == 'Server':
    path_arrays = SERVER_SLP_DATASET_PATH
else:
    path_arrays = LOCAL_SLP_DATASET_PATH

# Physical data table: strip whitespace from the gender labels, one-hot encode
# them, drop the subject index column and store the dummies as integers.
# NOTE(review): downstream code looks rows up by position (subject number - 1)
# and reads the weight from column position 1 — confirm the CSV row and column
# order before reordering anything here.
p_data = pd.read_csv(os.path.join(path_arrays, 'physiqueData.csv'))
p_data['gender'] = p_data['gender'].str.strip()
p_data = (
    pd.get_dummies(p_data, columns=['gender'])
    .drop('sub_idx', axis=1)
    .astype({'gender_male': int, 'gender_female': int})
)


{'pm': ['SLP/danaLab/00032/PMarray/cover2/000014.npy', 'SLP/danaLab/00032/PMarray/cover2/000017.npy', 'SLP/danaLab/00032/PMarray/cover2/000041.npy', 'SLP/danaLab/00032/PMarray/cover1/000014.npy', 'SLP/danaLab/00032/PMarray/cover1/000017.npy', 'SLP/danaLab/00032/PMarray/cover1/000041.npy', 'SLP/danaLab/00032/PMarray/uncover/000014.npy', 'SLP/danaLab/00032/PMarray/uncover/000017.npy', 'SLP/danaLab/00032/PMarray/uncover/000041.npy', 'SLP/danaLab/00100/PMarray/cover2/000014.npy', 'SLP/danaLab/00100/PMarray/cover2/000017.npy', 'SLP/danaLab/00100/PMarray/cover2/000041.npy', 'SLP/danaLab/00100/PMarray/cover1/000014.npy', 'SLP/danaLab/00100/PMarray/cover1/000017.npy', 'SLP/danaLab/00100/PMarray/cover1/000041.npy', 'SLP/danaLab/00100/PMarray/uncover/000014.npy', 'SLP/danaLab/00100/PMarray/uncover/000017.npy', 'SLP/danaLab/00100/PMarray/uncover/000041.npy', 'SLP/danaLab/00034/PMarray/cover2/000014.npy', 'SLP/danaLab/00034/PMarray/cover2/000017.npy', 'SLP/danaLab/00034/PMarray/cover2/000041.npy',

In [None]:
# Function to get all the files of a specific cover or a specific patient

# Cover conditions that appear as a directory component of the array paths.
covers = ['cover1', 'cover2', 'uncover']

def filter_files(dic,filter):
    """Return a copy of `dic` keeping only the paths that contain `filter`.

    Args:
        dic: mapping from a key to a list of path strings.
        filter: substring to look for; a path is kept when the substring
            occurs anywhere in it.

    Returns:
        A new dict with the same keys as `dic`, each mapped to the list of
        matching paths in their original order (possibly empty).
    """
    return {
        key: [candidate for candidate in candidates if filter in candidate]
        for key, candidates in dic.items()
    }

In [1]:
# class CustomDataset

class CustomDataset(Dataset):
    """Paired dataset of IR input arrays and pressure-map (PM) target arrays.

    Each item loads one IR array and the PM array at the same index, rescales
    the PM array using the subject's weight taken from the physical-data
    table, and optionally applies the configured transforms.

    Args:
        ir_paths: sequence of paths to the input ``.npy`` arrays.
        pm_paths: sequence of paths to the target ``.npy`` arrays, aligned
            index-by-index with ``ir_paths``.
        p_data: pandas DataFrame of physical data, looked up by position
            (subject number - 1); the value at column position 1 is used as
            the subject's weight.
        transform: optional dict with ``'input'`` and ``'output'`` callables.
            When omitted, the arrays are returned without those transforms.
    """

    def __init__(self, ir_paths, pm_paths, p_data, transform=None):

        self.ir_paths = ir_paths
        self.pm_paths = pm_paths
        self.p_data = p_data

        self.transform = transform

    def __len__(self):
        """Return the number of samples (one per IR path)."""
        return len(self.ir_paths)

    @staticmethod
    def _subject_number(path):
        """Parse the subject number from the 4th-from-last component of `path`.

        Both '/' and '\\' are accepted as separators, so the same path list
        works regardless of the operating system that produced it.
        """
        components = str(path).replace("\\", "/").split("/")
        return int(components[-4])

    def __getitem__(self, index):
        """Return the sample at `index`.

        Returns:
            ``(input, target, physical_data)`` when ``USE_PHYSICAL_DATA`` is
            truthy, otherwise ``(input, target)``.
        """
        input_path = self.ir_paths[index]
        output_path = self.pm_paths[index]

        input_array = self.load_array(input_path).astype(np.float32)
        output_array = self.load_array(output_path).astype(np.float32)

        # Physical data row of the subject this sample belongs to.
        number = self._subject_number(output_path)
        p_vector = self.p_data.iloc[number-1]
        # Explicit positional access: plain `p_vector[1]` on a label-indexed
        # Series is deprecated positional fallback in recent pandas.
        weight = p_vector.iloc[1]
        tensor_data = torch.tensor(p_vector.values)

        # Applying median filter, then keep the element-wise maximum of the
        # raw and the filtered map.
        median_array = signal.medfilt2d(output_array)
        max_array = np.maximum(output_array, median_array)

        # NOTE(review): 1.03226 / 10000 is presumably a single sensor area
        # converted from cm^2 to m^2, and the division by 1000 presumably
        # converts Pa to kPa — confirm against the dataset documentation.
        area_m = 1.03226 / 10000
        ideal_pressure = weight * 9.81 / (area_m * 1000)

        # Rescale the map so that its values sum to `ideal_pressure`.
        output_array = (max_array / np.sum(max_array)) * ideal_pressure

        # Transforms are optional; without them the arrays are returned as
        # NumPy arrays instead of the method silently returning None.
        if self.transform:
            input_array = self.transform['input'](input_array)
            output_array = self.transform['output'](output_array)

        if USE_PHYSICAL_DATA:
            return input_array, output_array, tensor_data
        return input_array, output_array

    def load_array(self, path):
        """Load and return the NumPy array stored at `path`."""
        return np.load(path)


In [2]:
# Select the files we want, define the transforms and create the DataLoader that feeds the model

# Keep only the test paths that contain the substring 'cover1'.
filter_dic = filter_files(dic_paths,'cover1')

def crop_array(array):
    """Crop `array` to a fixed 140x66 window whose top-left corner is at row 7, column 29."""
    top, left = 7, 29
    height, width = 140, 66
    return crop(array, top, left, height, width)

# Data transformation if needed
# 'input': to tensor -> fixed crop -> resize to 192x84 -> normalize with
# mean 0.5 and std 0.5. 'output': to tensor only.
transform = {}
transform['input'] = transforms.Compose([
    transforms.ToTensor(),
    transforms.Lambda(crop_array),
    transforms.Resize((192, 84)),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])
transform['output'] = transforms.Compose([transforms.ToTensor()])


def create_dataloader(dic,p_data,transform):
    """Build the evaluation DataLoader for the given path dictionary.

    Args:
        dic: dict providing the input paths under 'ir' and the target paths
            under 'pm'.
        p_data: physical-data table handed to the dataset.
        transform: transform dict handed to the dataset.

    Returns:
        A DataLoader over a CustomDataset, with batch size 1, no shuffling
        and no worker processes.
    """
    test_set = CustomDataset(dic['ir'], dic['pm'], p_data, transform=transform)
    print('Len dataset:',len(test_set))

    loader = DataLoader(
        test_set,
        batch_size=1,
        shuffle=False,
        num_workers=0,
        drop_last=True,
    )

    # Short summary of the loader, printed for a quick sanity check.
    summary = {}
    summary['Number of samples'] = len(loader.dataset)
    summary['Batch size'] = loader.batch_size
    summary['Number of batches'] = len(loader)

    print(f"Val loader info: {summary}")

    return loader


NameError: name 'cover_files' is not defined

In [2]:
# Function to evaluate a loaded model on the selected files with the given metrics

def evaluation(model,model_file,metrics,test_loader):
    """Evaluate `model` on `test_loader` and plot the last input/target/output.

    The weights stored in `model_file` are loaded into `model`, every metric
    is averaged over the batches of `test_loader`, the metrics and their
    averages are printed, and a three-panel figure of the last batch is saved
    as 'Comparing_output_model.png' under IMG_PATH and shown.

    Args:
        model: torch module whose architecture matches the saved state dict.
        model_file: path to the saved state dict.
        metrics: sequence of callables ``metric(output, target)``; each is
            assumed to return a scalar (number or one-element tensor).
        test_loader: loader yielding ``(input, target)`` batches. Loaders
            that yield extra items (e.g. physical data) are not supported.

    Raises:
        ValueError: if `test_loader` has no batches.
    """
    n_batches = len(test_loader)
    if n_batches == 0:
        raise ValueError("test_loader is empty: nothing to evaluate")

    # NOTE(review): torch.load unpickles the file; only load checkpoints from
    # a trusted source (consider weights_only=True where the installed torch
    # version supports it).
    model.load_state_dict(torch.load(model_file, map_location=torch.device('cpu')))
    model.to(DEVICE)

    # One accumulator per metric instead of a hard-coded pair.
    total_metric = [0.0] * len(metrics)

    model.eval()

    with torch.no_grad():

        for input_images, target_images in test_loader:

            input_img = input_images.to(DEVICE)
            target_img = target_images.to(DEVICE)

            output_img = model(input_img)

            for i, metric in enumerate(metrics):
                # Accumulate plain floats so the averages can be formatted.
                total_metric[i] += float(metric(output_img, target_img))

            # Free memory in each iteration. The batch tensors are kept
            # alive on purpose: the last batch is plotted after the loop.
            torch.cuda.empty_cache()  # Clean CUDA Cache if used GPU
            gc.collect()  # Collect trash to free memory not used

    epoch_metric = [total / n_batches for total in total_metric]

    print(metrics)
    print(epoch_metric)

    fig, axes = plt.subplots(1, 3, figsize=(15, 5))

    # One panel per image: input, target and model output of the last batch.
    axes[0].imshow(input_images.squeeze().cpu().numpy())
    axes[0].set_title('Input Image')

    axes[1].imshow(target_images.squeeze().cpu().numpy())
    axes[1].set_title('Target Image')

    axes[2].imshow(output_img.squeeze().cpu().numpy())
    axes[2].set_title('Output Image')

    # Format each average on its own; a list cannot take a float format spec.
    metric_summary = ', '.join(
        f'{type(metric).__name__}: {value:.4f}'
        for metric, value in zip(metrics, epoch_metric)
    )

    fig.suptitle('Input, Target and Output Image', fontsize=12)
    fig.text(0.5, 0.01, metric_summary, ha='center')
    plt.savefig(os.path.join(IMG_PATH,'Comparing_output_model.png'))
    plt.show()
