# Script for `Testing`

In [None]:
import os
import sys
import re
import traceback
from pathlib import Path
from typing import List, Dict
from datetime import datetime
from copy import deepcopy
from glob import glob
import json
import toml

from tqdm.auto import tqdm
import pandas as pd
import numpy as np
import cv2

import torch
from torch import nn, utils
from torch.utils.data import Dataset, DataLoader
import torchvision

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report


rel_module_path = "./../../modules/"
sys.path.append( str(Path(rel_module_path).resolve()) ) # add path to scan customized module

from logger import init_logger
from fileop import create_new_dir
from dl_utils import set_gpu, ImgDataset, caulculate_metrics, save_model, plot_training_trend, \
                     confusion_matrix_with_class, get_sortedClassMapper_from_dir
from plt_show import plot_in_rgb # server can't use `cv.imshow()`

config_dir = Path( "./../../Config/" ).resolve()

# print("="*100, "\n")

In [None]:
PredByImg_logger = init_logger(r"Predict By Image")

Load `db_path_plan.toml`

In [None]:
with open(config_dir.joinpath("db_path_plan.toml"), mode="r") as f_reader:
    dbpp_config = toml.load(f_reader)

db_root = Path(dbpp_config["root"])

Load `(PredByImg)_vit_b_16.toml`

In [None]:
with open(config_dir.joinpath("(PredByImg)_vit_b_16.toml"), mode="r") as f_reader:
    config = toml.load(f_reader)
    
batch_size        = config["test_opts"]["batch_size"]
debug_mode        = config["test_opts"]["debug_mode"]["enable"]
debug_rand_select = config["test_opts"]["debug_mode"]["rand_select"]
cuda_idx          = config["test_opts"]["cuda"]["index"]

model_history = config["model_prediction"]["history"]
model_desc    = config["model_prediction"]["desc"]

Load `train_config.toml`

In [None]:
load_dir_root = db_root.joinpath(dbpp_config["model_prediction"])
load_dir = os.path.join(load_dir_root, model_history)
train_config_path = os.path.join(load_dir, r"train_config.toml")

with open(train_config_path, mode="r") as f_reader:
    train_config = toml.load(f_reader)

dataset_name             = train_config["dataset"]["name"]
dataset_result_alias     = train_config["dataset"]["result_alias"]
dataset_gen_method       = train_config["dataset"]["gen_method"]
dataset_classif_strategy = train_config["dataset"]["classif_strategy"]
dataset_param_name       = train_config["dataset"]["param_name"]

model_name = train_config["model"]["model_name"]

rand_seed = train_config["train_opts"]["random_seed"]
use_hsv   = train_config["train_opts"]["data"]["use_hsv"]

Generate `path_vars`

In [None]:
dataset_cropped_root = db_root.joinpath(dbpp_config["dataset_cropped"])
dataset_dir = dataset_cropped_root.joinpath(dataset_name, dataset_result_alias, dataset_gen_method, 
                                                      dataset_classif_strategy, dataset_param_name)
assert dataset_dir.exists(), f"Can't find directory: '{dataset_dir}'"

test_selected_dir = os.path.join(dataset_dir, "test", "selected")

Run

In [None]:
# Set GPU
device, device_name = set_gpu(cuda_idx)
PredByImg_logger.info(f"Using '{device}', device_name = '{device_name}'")


# Get datetime
time_stamp = datetime.now().strftime('%Y%m%d_%H_%M_%S')


# Set 'np.random.seed'
np.random.seed(rand_seed)


# Scan classes to create 'class_mapper'
num2class_list, class2num_dict = get_sortedClassMapper_from_dir(test_selected_dir)
PredByImg_logger.info(f"num2class_list = {num2class_list}, class2num_dict = {class2num_dict}")


# Scan tiff
test_img_list = glob(os.path.normpath(f"{test_selected_dir}/*/*.tiff"))
PredByImg_logger.info(f"total = {len(test_img_list)}")
## debug mode: random select [debug_rand_select] images
if debug_mode:
    test_img_list = np.random.choice(test_img_list, size=debug_rand_select, replace=False)
    PredByImg_logger.info(f"Debug mode, only select first {len(test_img_list)}")


# Save 'testing_amount'
testing_amount = f"{{ datatest_{len(test_img_list)} }}_{{ test_{len(test_img_list)} }}"
with open(os.path.normpath(f"{load_dir}/{testing_amount}"), mode="w") as f_writer: pass


# Create 'test_set', 'test_dataloader'
PredByImg_logger.info(f"test_data ({len(test_img_list)})")
[PredByImg_logger.info(f"{i} : img_path = {test_img_list[i]}") for i in range(5)]
test_set = ImgDataset(test_img_list, class_mapper=class2num_dict, resize=(224, 224), 
                      use_hsv=use_hsv)
test_dataloader = DataLoader(test_set, batch_size=batch_size, shuffle=False, pin_memory=True)
PredByImg_logger.info(f"※ : total test batches: {len(test_dataloader)}")


# Read test ( debug mode only )
if debug_mode:
    PredByImg_logger.info(f"Read Test: {test_img_list[-1]}")
    plot_in_rgb(test_img_list[-1], (512, 512))


# Create model ( ref: https://github.com/pytorch/vision/issues/7397 )
PredByImg_logger.info((f"load model from `torchvision`, "
                          f"model_name: '{model_name}', weights: '{model_name}/{model_history}/{model_desc}_model.pth'"))
model = getattr(torchvision.models, model_name)
model = model(weights=None)
## modify model structure
model.heads.head = nn.Linear(in_features=768, out_features=len(class2num_dict), bias=True)
model.to(device)
# print(model)
## load 'model_state_dict'
model_path = os.path.join(load_dir, f"{model_desc}_model.pth")
pth_file = torch.load(model_path, map_location=device) # unpack to device directly
model.load_state_dict(pth_file["model_state_dict"])


# Testing
## testing variable
test_f1_log = {}
pred_list = []
gt_list = []
## progress bar
pbar_n_test = tqdm(total=len(test_dataloader), desc="Test ")
## start testing
## set to evaluation mode
model.eval()
with torch.no_grad(): 
    for batch, data in enumerate(test_dataloader):
        x_test, y_test, crop_name_batch = data
        x_test, y_test = x_test.to(device), y_test.to(device) # move to GPU
        preds = model(x_test)
        pred_prob = torch.nn.functional.softmax(preds, dim=1)
        _, pred_test = torch.max(pred_prob, 1)
        
        ## extend 'pred_list', 'gt_list'
        pred_list.extend(pred_test.cpu().numpy().tolist())
        gt_list.extend(y_test.cpu().numpy().tolist())
        
        ## show predict_status of current_batch in CLI
        PredByImg_logger.info((f"Batch[ {(batch+1):0{len(str(len(test_dataloader)))}} / {len(test_dataloader)} ], "
                               f"# of (ground truth == prediction) in this batch : "
                               f"{(pred_test.cpu() == y_test.cpu()).sum().item():{len(str(len(y_test)))}}/{len(y_test)}"))
        
        ## update 'pbar_n_test'
        pbar_n_test.update(1)
        pbar_n_test.refresh()

pbar_n_test.close()
## end testing


gt_list_to_name = [ num2class_list[i] for i in gt_list ]
pred_list_to_name = [ num2class_list[i] for i in pred_list ]

caulculate_metrics(test_f1_log, None,
                   gt_list_to_name, pred_list_to_name, class2num_dict)
# print(json.dumps(test_f1_log, indent=4))


# Save `test_f1_log`
with open(os.path.normpath(f"{load_dir}/{{Logs}}_PredByImg_maweavg_f1_{test_f1_log['maweavg_f1']}.toml"), mode="w") as f_writer:
    f_writer.write(toml.dumps(test_f1_log))

# Save infomations to a file
with open(os.path.normpath(f"{load_dir}/{{Report}}_PredByImg.log"), mode="w") as f_writer:

    ## change direction of 'sys.stdout'
    orig_stdout = sys.stdout # store original 'sys.stdout'
    sys.stdout = f_writer

    ## write `config`
    config_in_report = deepcopy(config["model_prediction"])
    print("[ model_prediction ]"); print(toml.dumps(config_in_report))
    
    config_in_report = deepcopy(train_config["dataset"])
    print("[ dataset ]"); print(toml.dumps(config_in_report))
    
    print(f"※ For more detail info please refer to its 'train_config' file...\n\n") # newline


    ## write 'classification_report'
    cls_report = classification_report(y_true=gt_list_to_name, y_pred=pred_list_to_name, digits=5)
    print("Classification Report:\n"); print(f"{cls_report}\n")

    ## write 'confusion_matrix'
    #   row: Ground truth
    #   column: predict
    #  *　0　1　2
    #  0 [] [] []
    #  1 [] [] []
    #  2 [] [] []
    #
    confusion_mat = confusion_matrix_with_class(ground_truth=gt_list_to_name, prediction=pred_list_to_name)

    ## recover direct of 'sys.stdout'
    sys.stdout = orig_stdout


# Rename 'load_dir'
## new_name_format = {time_stamp}_{state}_{target_epochs_with_ImgLoadOptions}_{test_f1}
## state = {EarlyStop, Interrupt, Completed, Tested, etc.}
model_history_list = re.split("{|}", model_history)
new_name = f"{model_history_list[0]}{{Tested_PredByImg}}_{{{model_history_list[3]}}}_{{{model_desc}}}_{{maweavg_f1_{test_f1_log['maweavg_f1']}}}" 
os.rename(load_dir, os.path.join(load_dir_root, new_name))