## Preprocessing for 2.5D

.mha -> .npy

In [None]:
import SimpleITK as sitk
import numpy as np
import os
import matplotlib.pyplot as plt
from dotenv import load_dotenv
from pathlib import Path
import cv2
import random
from tqdm import tqdm

In [None]:
# Pull the dataset locations from the .env file into the process environment.
load_dotenv()

# Source .mha locations; each value is None when the variable is not set.
MHA_PATH_ORIGIN = os.environ.get("MHA_PATH_ORIGIN")
MHA_PATH_AB = os.environ.get("MHA_PATH_AB")
MHA_PATH_TH = os.environ.get("MHA_PATH_TH")
MHA_PATH_HN = os.environ.get("MHA_PATH_HN")

# Destination root for the generated .npy files (AB subset).
npy_dir = os.path.join(MHA_PATH_ORIGIN, 'Task2_npy', 'AB')

In [None]:
def get_all_files_pathlib(root_dir, type):
    """Recursively collect every file named ``<type>.mha`` below ``root_dir``.

    Args:
        root_dir: Directory to search; anything ``pathlib.Path`` accepts.
        type: File stem to match, e.g. ``'ct'`` or ``'cbct'``. The name
            shadows the builtin but is kept so existing callers still work.

    Returns:
        list[Path]: The matching files in sorted order. Directory traversal
        order depends on the filesystem, so the result is sorted to make
        anything derived from it (splits, patient indices) reproducible.
    """
    target_name = f'{type}.mha'
    # Compare the name first (cheap) and only then touch the filesystem;
    # is_file() also rejects a directory that happens to carry the name.
    matches = [p for p in Path(root_dir).rglob('*')
               if p.name == target_name and p.is_file()]
    return sorted(matches)

def normalize_and_resize_to_float(slice_img, target_size):
    """
    Window a single 2D slice to a fixed HU range, rescale it linearly to
    [-1.0, 1.0] and resize it.

    Args:
        slice_img: 2D array of intensities for one slice.
        target_size: Output size, handed to ``cv2.resize`` as ``dsize``.

    Returns:
        The resized slice as a float32 array.
    """
    hu_floor, hu_ceil = -1000, 2000

    # HU windowing: clamp everything outside [hu_floor, hu_ceil].
    windowed = np.clip(slice_img, hu_floor, hu_ceil)

    # Min-max scaling: first to [0, 1], then stretched to [-1, 1].
    unit_range = (windowed - hu_floor) / (hu_ceil - hu_floor)
    signed_range = unit_range * 2.0 - 1.0

    # Resize while still in float so no quantisation happens beforehand.
    # INTER_AREA is the interpolation generally recommended for shrinking.
    as_float32 = signed_range.astype(np.float32)
    return cv2.resize(as_float32, target_size, interpolation=cv2.INTER_AREA)

def process_volume_to_2_5d(volume_path, output_dir, patient_name):
    """
    Convert one MHA volume into 2.5D (3-channel) .npy files.

    For every interior slice ``z`` the normalised slices ``z-1``, ``z`` and
    ``z+1`` are stacked along the last axis and saved as
    ``<patient_name>_slice_<z:04d>.npy`` inside ``output_dir``. The first and
    last slices never become a centre slice, so ``z`` runs from 1 to
    ``num_slices - 2``.

    Args:
        volume_path: Path of the .mha volume to read.
        output_dir: Existing directory that receives the .npy files.
        patient_name: Prefix used in every output file name.

    Returns:
        None. Volumes with fewer than 3 slices are skipped with a message.
        Any exception is caught and reported via ``print`` so that a batch
        run continues with the next volume.
    """
    TARGET_SIZE = (256, 256)
    try:
        # 1. Load the MHA file
        sitk_img = sitk.ReadImage(volume_path)

        # 2. Convert to a NumPy array laid out as [Depth, Height, Width] (z, y, x)
        volume_data = sitk.GetArrayFromImage(sitk_img)

        num_slices = volume_data.shape[0]

        if num_slices < 3:
            print(f"Skipping {patient_name}: Not enough slices (< 3)")
            return

        # 3. 2.5D (3-slice) processing with a rolling window: each slice is
        #    normalised and resized exactly once and then reused as the
        #    "next", "current" and "previous" channel of successive outputs.
        slice_prev = normalize_and_resize_to_float(volume_data[0, :, :], TARGET_SIZE)
        slice_curr = normalize_and_resize_to_float(volume_data[1, :, :], TARGET_SIZE)

        for z in range(1, num_slices - 1):
            slice_next = normalize_and_resize_to_float(volume_data[z + 1, :, :], TARGET_SIZE)

            # 4. Stack along the channel axis -> [Height, Width, 3] float32
            stacked_image = np.stack([slice_prev, slice_curr, slice_next], axis=-1)

            # 5. Save as a NumPy .npy file
            output_filename = f"{patient_name}_slice_{z:04d}.npy"
            output_path = os.path.join(output_dir, output_filename)

            np.save(output_path, stacked_image)

            # Slide the window forward by one slice
            slice_prev, slice_curr = slice_curr, slice_next

    except Exception as e:
        # Best-effort batch conversion: report the failure and move on.
        print(f"Error processing {volume_path}: {e}")

In [None]:
# Collect the cbct.mha / ct.mha files below each source root.
# Note: the "TN" lists hold the volumes found under MHA_PATH_TH.
pList_AB_cbct, pList_AB_ct = (
    get_all_files_pathlib(MHA_PATH_AB, modality) for modality in ('cbct', 'ct')
)
pList_TN_cbct, pList_TN_ct = (
    get_all_files_pathlib(MHA_PATH_TH, modality) for modality in ('cbct', 'ct')
)
pList_HN_cbct, pList_HN_ct = (
    get_all_files_pathlib(MHA_PATH_HN, modality) for modality in ('cbct', 'ct')
)

abdominal image만 고려

train / test 분할

In [None]:
# Fraction of the volumes held out for testing; the rest is used for training.
TEST_SPLIT_RATIO = 0.3
# Fixed seed so that re-running this cell reproduces the same split. With an
# unseeded shuffle every run produced a different split, and re-running over
# existing output folders could mix train and test volumes.
SPLIT_SEED = 42

# Sort first so the starting order does not depend on how the lists were
# built, then shuffle in place with a dedicated, seeded generator (the global
# random state is left untouched).
_split_rng = random.Random(SPLIT_SEED)
pList_AB_cbct.sort()
pList_AB_ct.sort()
_split_rng.shuffle(pList_AB_cbct)
_split_rng.shuffle(pList_AB_ct)
# NOTE(review): the CBCT and CT lists are shuffled independently, so files
# that belong together may end up in different splits - confirm this is
# intended for the unpaired setup.

# Number of volumes that go to the test split (rounded down).
cbct_split_idx = int(len(pList_AB_cbct) * TEST_SPLIT_RATIO)
ct_split_idx = int(len(pList_AB_ct) * TEST_SPLIT_RATIO)

test_pList_AB_cbct = pList_AB_cbct[:cbct_split_idx]
train_pList_AB_cbct = pList_AB_cbct[cbct_split_idx:]

test_pList_AB_ct = pList_AB_ct[:ct_split_idx]
train_pList_AB_ct = pList_AB_ct[ct_split_idx:]

In [None]:
# Build the CycleGAN folder layout: the "A" folders receive the CBCT volumes,
# the "B" folders receive the CT volumes.
dir_map = {
    os.path.join(npy_dir, "trainA"): train_pList_AB_cbct,
    os.path.join(npy_dir, "trainB"): train_pList_AB_ct,
    os.path.join(npy_dir, "testA"): test_pList_AB_cbct,
    os.path.join(npy_dir, "testB"): test_pList_AB_ct,
}


for output_dir, file_list in dir_map.items():
    os.makedirs(output_dir, exist_ok=True)
    split_name = os.path.basename(output_dir)
    print(f"\nProcessing files for: {split_name}")
    print(f"Found {len(file_list)} volumes.")

    # Derive the domain from the folder name only. Testing the whole path for
    # "A" is always true here because npy_dir itself ends in 'AB', which
    # labelled the CT folders (trainB / testB) as "cbct" as well.
    domain_prefix = "cbct" if split_name.endswith("A") else "ct"

    for i, f_path in enumerate(tqdm(file_list, desc=f"Creating {split_name}")):
        # The index within the split's list identifies the volume in file names.
        patient_name = f"{domain_prefix}_patient_{i}"
        process_volume_to_2_5d(f_path, output_dir, patient_name)
print("\n---------------------------------")
print("Dataset creation complete")
print("All files are saved as .npy (float32, -1.0 to 1.0)")
print("---------------------------------")