In [None]:
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
import numpy as np
import nibabel as nib
import pandas as pd
import torch
import csv

import monai
from monai.inferers import sliding_window_inference
from monai.transforms import (
	AsDiscrete,
	EnsureChannelFirstd,
	Compose,
	CropForegroundd,
	LoadImaged,
	Orientationd,
	Activations,
	EnsureType,
	LoadImage,
	Orientation
)

from monai.networks.nets import UNETR
from monai.metrics import DiceMetric, HausdorffDistanceMetric, compute_hausdorff_distance
from monai.data import CacheDataset, DataLoader, decollate_batch

import numpy as np
from scipy import stats
from scipy.stats import wilcoxon
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# ---------------------------------------------------------------------------
# Experiment configuration: everything a reader may want to tune lives here.
# ---------------------------------------------------------------------------

# Checkpoint file whose weights are loaded into the network.
model_path = "results/Exp1/Reorient-50-50/best_metric_model.pth"

# Directory that receives the validation predictions.
output_dir = "output/Reorient-50-50"

# Spatial size of one model input window (passed as the ROI size to
# sliding-window inference).
model_input_size = (128,) * 3

# Number of input patches (passed to sliding-window inference as its
# window batch size).
patch_num = 4

# Is this the data without reorientation?  When True, the validation paths
# have `reorient_dir` substituted by `resized_dir` before loading.
resized_dir = "resized_data"
reorient_dir = "reoriented_data"
is_resized = True

# CSV describing the validation split.
val_csv = "CSV/data_val.csv"

# CSV files collecting the Dice and Hausdorff scores.
dice_csv = "dice_Exp1.csv"
hausdorff_csv = "hd_Exp1.csv"

In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Network definition. The hyper-parameters must match the ones the checkpoint
# was trained with, otherwise `load_state_dict` below fails.
model = UNETR(
    in_channels=1,
    out_channels=1,
    # Same (128, 128, 128) window that sliding-window inference feeds the model;
    # taken from the config cell so the two cannot drift apart.
    img_size=model_input_size,
    feature_size=16,
    hidden_size=768,
    mlp_dim=3072,
    num_heads=12,
    proj_type="perceptron",
    norm_name="instance",
    res_block=True,
    dropout_rate=0.0,
).to(device)

os.makedirs(output_dir, exist_ok=True)

model.load_state_dict(torch.load(model_path, map_location=device))
# This notebook only runs inference: put every layer in evaluation mode.
model.eval()

# Post-processing: labels are binarised at 0.5; raw network outputs go through
# a sigmoid first and are then binarised at 0.5.
post_label = AsDiscrete(threshold=0.5)
post_pred = Compose([Activations(sigmoid=True), AsDiscrete(threshold=0.5)])
to_numpy = Compose([EnsureType(data_type="numpy")])

# Metrics (the Hausdorff metric is configured with percentile=95).
dice_metric = DiceMetric(include_background=True, reduction="mean", get_not_nans=False)
hausdorff_metric = HausdorffDistanceMetric(include_background=True, reduction="mean", get_not_nans=False, percentile=95)

# Pre-processing applied to every image/label pair: load, move the channel
# axis first, crop to the foreground of the image, reorient to RAS.
infer_transforms = Compose(
    [
        LoadImaged(keys=["image", "label"]),
        EnsureChannelFirstd(keys=["image", "label"]),
        CropForegroundd(keys=["image", "label"], source_key="image"),
        Orientationd(keys=["image", "label"], axcodes="RAS"),
    ]
)

# Score columns every freshly created score CSV starts with.
_SCORE_COLUMNS = ('Normal', 'TTA_1', 'TTA_2', 'TTA_3', 'TTA_4', 'TTA_5')


def _init_score_csv(csv_path, case_ids, columns=_SCORE_COLUMNS):
    """Create `csv_path` with a header and one blank row per case.

    Does nothing when the file already exists, so scores recorded by earlier
    runs are never wiped.

    Args:
        csv_path: Path of the score CSV to create.
        case_ids: Iterable of case identifiers; each becomes the first cell
            (`dir` column) of one row.
        columns: Names of the score columns following the `dir` column.
    """
    if os.path.exists(csv_path):
        return
    with open(csv_path, 'w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(['dir', *columns])
        # Exactly one empty cell per score column keeps every row as wide as
        # the header (the rows used to carry two surplus cells).
        blanks = [''] * len(columns)
        for case_id in case_ids:
            writer.writerow([case_id, *blanks])


# The case ids are read straight from the validation split here: the `dirs`
# series is only created in a later cell, so relying on it made this cell
# raise NameError on a fresh kernel.
_val_case_ids = pd.read_csv(val_csv)['case_id']

_init_score_csv(dice_csv, _val_case_ids)
_init_score_csv(hausdorff_csv, _val_case_ids)

In [None]:
# Validation split: one row per case with image path, label path and case id.
df_val = pd.read_csv(val_csv)

if is_resized:
    # Point both path columns at the resized (non-reoriented) copy of the data.
    for path_column in ('TOF_pre', 'labels'):
        df_val[path_column] = df_val[path_column].str.replace(reorient_dir, resized_dir, regex=False)

TOF_pre_val_path = df_val['TOF_pre']
label_val_path = df_val['labels']
dirs = df_val['case_id']

val_dict = [{"image": image_names, "label": label_names} for image_names, label_names in zip(TOF_pre_val_path, label_val_path)]
val_ds = CacheDataset(data=val_dict, transform=infer_transforms, cache_rate=1.0, num_workers=4)
# The loader is created without shuffling, so batch `i` corresponds to
# row `i` of `df_val`.
val_loader = DataLoader(val_ds, batch_size=1, num_workers=4)

# case id (str) -> [dice score, hausdorff score]
eval_dict = {}

# Inference only: make sure the network is in evaluation mode.
model.eval()

with torch.no_grad():
    for i, val_data in enumerate(val_loader):
        # Positional lookup, and normalised to `str` so the id can be used
        # both as a path component and as a key matching the CSV contents.
        case_id = str(dirs.iloc[i])

        val_outputs = sliding_window_inference(val_data["image"].to(device), model_input_size, patch_num, model, progress=True)
        val_outputs = [post_pred(pred) for pred in decollate_batch(val_outputs)]
        val_labels = [post_label(label) for label in decollate_batch(val_data["label"].to(device))]
        dice_metric(y_pred=val_outputs, y=val_labels)
        hausdorff_metric(y_pred=val_outputs, y=val_labels)

        # Each case gets its own sub-directory; create it before saving,
        # otherwise writing the NIfTI file fails on a fresh output tree.
        case_output_dir = os.path.join(output_dir, case_id)
        os.makedirs(case_output_dir, exist_ok=True)

        for output in val_outputs:
            # Drop the channel axis before writing the volume.
            output_np = to_numpy(output).squeeze(0)
            # NOTE(review): the mask is stored with an identity affine, not the
            # affine of the source image — confirm this is intended.
            nib_image = nib.Nifti1Image(output_np, affine=np.eye(4))

            output_filename = f"{case_id}_infer.nii.gz"
            output_filepath = os.path.join(case_output_dir, output_filename)

            nib.save(nib_image, output_filepath)

        # Aggregate and reset per iteration so each score belongs to one case.
        dice_score = dice_metric.aggregate().item()
        hausdorff_score = hausdorff_metric.aggregate().item()
        dice_metric.reset()
        hausdorff_metric.reset()
        print(f"Dir: {case_id}. Dice Metric: {dice_score:.2f}\tHausdorrf: {hausdorff_score:.2f}\t{i+1}/{len(dirs)}")

        eval_dict[case_id] = [dice_score, hausdorff_score]

def _fill_score_column(csv_path, column, scores):
    """Write per-case scores into one column of an existing score CSV.

    The column is appended to the header when it is not there yet. Rows are
    matched on their first cell. A cell that already holds a value is left
    untouched, so re-running never overwrites previously recorded scores.

    Args:
        csv_path: Path of the CSV file to update in place.
        column: Header name of the column to fill.
        scores: Mapping from case id (str) to a numeric score.

    Raises:
        ValueError: If the CSV file contains no header row.
    """
    with open(csv_path, 'r', newline='') as file:
        rows = list(csv.reader(file))
    if not rows:
        raise ValueError(f"{csv_path} is empty; expected a header row")

    header = rows[0]
    if column not in header:
        header.append(column)
    col_index = header.index(column)

    for row in rows[1:]:
        # Pad short rows so the target column exists.
        row.extend([''] * (len(header) - len(row)))
        score = scores.get(row[0])
        if score is not None and row[col_index] == '':
            row[col_index] = f"{score:.4f}"

    with open(csv_path, 'w', newline='') as file:
        csv.writer(file).writerows(rows)


# Both score files use the experiment name (last component of the output
# directory) as the column header.
dice_col = os.path.basename(output_dir)
haus_col = dice_col

# `eval_dict` maps case id -> [dice, hausdorff]. Keys are normalised to `str`
# because the ids read back from the CSV are always strings.
_dice_by_case = {str(case): pair[0] for case, pair in eval_dict.items()}
_hausdorff_by_case = {str(case): pair[1] for case, pair in eval_dict.items()}

_fill_score_column(dice_csv, dice_col, _dice_by_case)
_fill_score_column(hausdorff_csv, haus_col, _hausdorff_by_case)