In [1]:
from IPython.display import clear_output
!pip3 install tensorflow_addons
clear_output()

In [2]:
import sys
sys.path.append('/home/jovyan/ChestXray-14')

In [3]:
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_addons as tfa

from modules.utils import get_dataset
from modules.dataset import LABELS, Dataset

2023-03-22 08:03:27.236720: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


In [4]:
import os
CURRENT_PATH = os.path.abspath("")
CURRENT_PATH

'/home/jovyan/ChestXray-14/evaluates'

In [5]:
from pathlib import Path

def create_path_for_save_evaluate_result(
        loss_function_of_model=None, 
        model_name=None,
        weight_option=None,
        fold_num=None
        ):
    if loss_function_of_model is None:
        print("loss_function is None")
        loss_function_of_model = "cross_entropy"
    if model_name is None:
        print("model_name is None")
        model_name = "EfficientNetB0"
    if weight_option is None:
        print("weight_option is None")
        weight_option="None"
    if fold_num is None:
        print("fold_num is None")
        fold_num = 1

    RESULT_EVALUATE_PATH = os.path.join(CURRENT_PATH, "results", "evaluate", loss_function_of_model, f"{model_name}_{weight_option}", f"fold_{fold_num}")
    Path(RESULT_EVALUATE_PATH).mkdir(parents=True, exist_ok=True)
    return RESULT_EVALUATE_PATH

# create_path_for_save_evaluate_result()

In [6]:
import json
import pprint

with open("model_path_config.json", "r") as in_file:
    MODELS_PATHS = json.load(in_file)

pprint.pprint(MODELS_PATHS)

{'DenseNet121': {'Imagenet': {'fold_1': '/home/jovyan/ChestXray-14/results/models/DenseNet121_imagenet_fold_1.h5',
                              'fold_2': '/home/jovyan/ChestXray-14/results/models/DenseNet121_imagenet_fold_2.h5',
                              'fold_3': '/home/jovyan/ChestXray-14/results/models/DenseNet121_imagenet_fold_3.h5',
                              'fold_4': '/home/jovyan/ChestXray-14/results/models/DenseNet121_imagenet_fold_4.h5',
                              'fold_5': '/home/jovyan/ChestXray-14/results/models/DenseNet121_imagenet_fold_5.h5'},
                 'None': {'fold_1': '/home/jovyan/ChestXray-14/results/models/DenseNet121_None_fold_1.h5',
                          'fold_2': '/home/jovyan/ChestXray-14/results/models/DenseNet121_None_fold_2.h5',
                          'fold_3': '/home/jovyan/ChestXray-14/results/models/DenseNet121_None_fold_3.h5',
                          'fold_4': '/home/jovyan/ChestXray-14/results/models/DenseNet121_None_fold_4.h

EVALUATE FUNCTION
---

In [7]:
import pytz
import datetime

ROOT_PATH = "/home/jovyan/ChestXray-14"
num_class = 15 # TODO: change to 15 for multi-labels

class Evaluate:
    def __init__(self, model_path):
        self.y_true = None
        self.y_preds = None
        self.model_path = model_path
        self.model = self.get_model(model_path)
        self.best_thresholds = None
        self.thresholds_200 = None
    
    def get_model(self, path):
        return tf.keras.models.load_model(path)
    
    def get_y_true(self, data):
        y_true=[]
        for X,y in data:
            for label in y:
                y_true.append(label)
        y_true = tf.Variable(y_true)
        self.y_true = y_true
        return y_true

    def get_confusion_metrics(self, y_true, y_preds):
        m = tf.keras.metrics.AUC(multi_label=True)
        m.update_state(y_true, y_preds)

        thresholds = m.thresholds
        variables = m.variables
        TP = variables[0]
        TN = variables[1]
        FP = variables[2]
        FN = variables[3]
        return thresholds, TP, TN, FP, FN

    def model_predict(self, test_dataset):
        return self.model.predict(test_dataset)

    def get_f1_scores_200_thresholds(self, test_dataset):
        self.y_true = self.get_y_true(test_dataset)
        self.y_preds = self.model_predict(test_dataset)
        
        confusion_metrics = self.get_confusion_metrics(self.y_true, self.y_preds)
        thresholds, TP, TN, FP, FN = confusion_metrics
        self.thresholds_200 = thresholds
        f1_class_dict = dict()
        for i in range(len(thresholds)):
            tp, tn, fp, fn = TP[i], TN[i], FP[i], FN[i]
            for label_index in range(num_class):
                f1_score = 2*tp[label_index] / (2*tp[label_index] + fp[label_index] + fn[label_index])
                try:
                    f1_class_dict[LABELS[label_index]].append(f1_score)
                except KeyError:
                    f1_class_dict[LABELS[label_index]] = [f1_score]
        print(LABELS)
        return f1_class_dict
    
    def get_f1_scores(self, test_dataset):
        self.y_true = self.get_y_true(test_dataset)
        self.y_preds = self.model_predict(test_dataset)
        metric = tfa.metrics.MultiLabelConfusionMatrix(num_classes=num_class)
        metric.update_state(self.y_true,
                            np.greater_equal(self.y_preds, self.best_thresholds).astype('int8'))
        result = metric.result()
        
        f1_class_dict = dict()
        for idx, confusion in enumerate(result):
            label = LABELS[idx]
            TP, TN, FP, FN = (confusion[1, 1],
                              confusion[0, 0],
                              confusion[0, 1],
                              confusion[1, 0])
            f1_score = 2*TP / (2*TP + FP + FN)
            f1_class_dict[label] = [f1_score.numpy()]
        return f1_class_dict
    
    def get_precision_scores(self, test_dataset, new_calculate=True):
        if new_calculate is True:
            self.y_true = self.get_y_true(test_dataset)
            self.y_preds = self.model_predict(test_dataset)
        metric = tfa.metrics.MultiLabelConfusionMatrix(num_classes=num_class)
        metric.update_state(self.y_true,
                            np.greater_equal(self.y_preds, self.best_thresholds).astype('int8'))
        result = metric.result()
        
        precision_class_dict = dict()
        for idx, confusion in enumerate(result):
            label = LABELS[idx]
            TP, TN, FP, FN = (confusion[1, 1],
                              confusion[0, 0],
                              confusion[0, 1],
                              confusion[1, 0])
            precision = TP / (TP + FP)
            precision_class_dict[label] = [precision.numpy()]
        return precision_class_dict
    
    def get_recall_scores(self, test_dataset, new_calculate=True):
        if new_calculate is True:
            self.y_true = self.get_y_true(test_dataset)
            self.y_preds = self.model_predict(test_dataset)
        metric = tfa.metrics.MultiLabelConfusionMatrix(num_classes=num_class)
        metric.update_state(self.y_true,
                            np.greater_equal(self.y_preds, self.best_thresholds).astype('int8'))
        result = metric.result()
        
        recall_class_dict = dict()
        for idx, confusion in enumerate(result):
            label = LABELS[idx]
            TP, TN, FP, FN = (confusion[1, 1],
                              confusion[0, 0],
                              confusion[0, 1],
                              confusion[1, 0])
            recall = TP / (TP + FN)
            recall_class_dict[label] = [recall.numpy()]
        return recall_class_dict
    
    def get_best_threshold(self,
                           test_dataset=None,
                           save_best_thresholds=f"{ROOT_PATH}/results/paper/table3_1/best_thresholds.csv",
                           save_200_thresholds=f"{ROOT_PATH}/results/paper/table3_1/f1_per_thresholds.csv"):
        if test_dataset is None:
            fold_num = int(self.model_path.split(".")[0][-1])
            test_dataset = datasets[fold_num-1]
        
        f1_scores_dict = self.get_f1_scores_200_thresholds(test_dataset)
        best_thresholds_dict = {"thresholds": [], "f1_most": [], "label": []}
        for key, value in f1_scores_dict.items():
            f1_arg_max = np.argmax(value)
            best_thresholds_dict["f1_most"].append(value[f1_arg_max].numpy())
            best_thresholds_dict["label"].append(key)
            best_thresholds_dict["thresholds"].append(self.thresholds_200[f1_arg_max])
        
        df = pd.DataFrame(best_thresholds_dict)
        df = df.set_index("label")
        df.to_csv(save_best_thresholds, index=True)
        print(f"{save_best_thresholds} was success!")
        # print(df)
        
        df_200_thresholds = pd.DataFrame(f1_scores_dict)
        df_200_thresholds.to_csv(save_200_thresholds, index=True)
        print(f"{save_200_thresholds} was success!")
        self.best_thresholds = df.copy()["thresholds"].values

    def __enter__(self):
        now = datetime.datetime.now(
            pytz.timezone('Asia/Bangkok')
        )
        print("====" * 3, now.strftime("%Y-%m-%d %H:%M:%S"), "====" * 3)
        return self

    def __exit__(self, *arg):
        self.y_true = None
        self.y_preds = None
        print("\n")

README
---

```
Dear: Chanisara

# Go to change a name of the models in `results` folder first
|_ '/home/jovyan/ChestXray-14/results/models/Resnet50_fold_1.h5' >> '/home/jovyan/ChestXray-14/results/models/Resnet50_None_fold_1.h5'
|_ '/home/jovyan/ChestXray-14/results/models/Resnet50_fold_2.h5' >> '/home/jovyan/ChestXray-14/results/models/Resnet50_None_fold_2.h5'
|_ '/home/jovyan/ChestXray-14/results/models/Resnet50_fold_3.h5' >> '/home/jovyan/ChestXray-14/results/models/Resnet50_None_fold_3.h5'
|_ '/home/jovyan/ChestXray-14/results/models/Resnet50_fold_4.h5' >> '/home/jovyan/ChestXray-14/results/models/Resnet50_None_fold_4.h5'
|_ '/home/jovyan/ChestXray-14/results/models/Resnet50_fold_5.h5' >> '/home/jovyan/ChestXray-14/results/models/Resnet50_None_fold_5.h5'

Regrad: Chonsawat
```

In [8]:
def get_test_dataset_5_fold():
    dataset = Dataset()
    _, test_dataset_fold_1 = dataset.get_kfold(fold_number=1, sample=False)
    _, test_dataset_fold_2 = dataset.get_kfold(fold_number=2, sample=False)
    _, test_dataset_fold_3 = dataset.get_kfold(fold_number=3, sample=False)
    _, test_dataset_fold_4 = dataset.get_kfold(fold_number=4, sample=False)
    _, test_dataset_fold_5 = dataset.get_kfold(fold_number=5, sample=False)
    return {
        1: test_dataset_fold_1,
        2: test_dataset_fold_2,
        3: test_dataset_fold_3,
        4: test_dataset_fold_4,
        5: test_dataset_fold_5
    }

test_datasets = get_test_dataset_5_fold()
test_datasets

2023-03-22 08:03:29.874802: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-22 08:03:31.666305: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 31661 MB memory:  -> device: 0, name: NVIDIA A100-SXM4-40GB, pci bus id: 0000:17:00.0, compute capability: 8.0


{1: <PrefetchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 15), dtype=tf.int64, name=None))>,
 2: <PrefetchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 15), dtype=tf.int64, name=None))>,
 3: <PrefetchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 15), dtype=tf.int64, name=None))>,
 4: <PrefetchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 15), dtype=tf.int64, name=None))>,
 5: <PrefetchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 15), dtype=tf.int64, name=None))>}

In [12]:
def main():
    for model_name, dict_of_weight in MODELS_PATHS.items():
        
        for weight_option, dict_of_folds in dict_of_weight.items():
            
            for fold_name, path in dict_of_folds.items():
                
                fold_number = int(fold_name.split("_")[-1])
                print(f"{model_name}: {weight_option}: {fold_number}: {path}")
                
                model = Evaluate(path)
                
                test_dataset = test_datasets[fold_number]
                y_preds = Evaluate(path).model_predict(test_dataset)
                y_true = Evaluate(path).get_y_true(test_dataset)
                
                RESULT_EVALUATE_PATH = create_path_for_save_evaluate_result(
                    loss_function_of_model="cross_entropy",
                    model_name=model_name,
                    weight_option=weight_option,
                    fold_num=fold_number
                    )
                
                model.get_best_threshold(
                    test_dataset=test_dataset,
                    save_best_thresholds=f"{RESULT_EVALUATE_PATH}/best_thresholds.csv",
                    save_200_thresholds=f"{RESULT_EVALUATE_PATH}/f1_per_thresholds.csv"
                    )
                
                with model:
                    f1_each_class = model.get_f1_scores(test_dataset)
                    # print(f1_each_class)
                    df = pd.DataFrame(f1_each_class)
                    df.to_csv(f"{RESULT_EVALUATE_PATH}/f1_scores.csv", index=False)

                    precision_each_class = model.get_precision_scores(test_dataset, new_calculate=False)
                    pd.DataFrame(precision_each_class)\
                        .to_csv(f"{RESULT_EVALUATE_PATH}/precision.csv", index=False)

                    recall_each_class = model.get_recall_scores(test_dataset, new_calculate=False)
                    pd.DataFrame(recall_each_class)\
                        .to_csv(f"{RESULT_EVALUATE_PATH}/recall.csv", index=False)
                
                # return None
        print()
        
main()

DenseNet121: None: 1: /home/jovyan/ChestXray-14/results/models/DenseNet121_None_fold_1.h5
['No Finding', 'Atelectasis', 'Consolidation', 'Infiltration', 'Pneumothorax', 'Edema', 'Emphysema', 'Fibrosis', 'Effusion', 'Pneumonia', 'Pleural_Thickening', 'Cardiomegaly', 'Nodule', 'Mass', 'Hernia']
/home/jovyan/ChestXray-14/evaluates/results/evaluate/cross_entropy/DenseNet121_None/fold_1/best_thresholds.csv was success!
/home/jovyan/ChestXray-14/evaluates/results/evaluate/cross_entropy/DenseNet121_None/fold_1/f1_per_thresholds.csv was success!


DenseNet121: None: 2: /home/jovyan/ChestXray-14/results/models/DenseNet121_None_fold_2.h5
['No Finding', 'Atelectasis', 'Consolidation', 'Infiltration', 'Pneumothorax', 'Edema', 'Emphysema', 'Fibrosis', 'Effusion', 'Pneumonia', 'Pleural_Thickening', 'Cardiomegaly', 'Nodule', 'Mass', 'Hernia']
/home/jovyan/ChestXray-14/evaluates/results/evaluate/cross_entropy/DenseNet121_None/fold_2/best_thresholds.csv was success!
/home/jovyan/ChestXray-14/evaluates/