뇌의 위치를 일반화하고, 일반화된 뇌에 해마 mask를 적용하여 해마 영역을 추출

---




/drive/MyDrive/  
-hd_bet/ : 두개골 제거가 진행된 데이터셋  
-template/ : 공간 정렬 및 마스킹에 필요한 템플레이트 (https://git.fmrib.ox.ac.uk/fsl/data_standard)  
-normalize_mri/ : 최종 공간 정렬 및 마스킹이 진행된 데이터




In [None]:
pip install antspyx nibabel numpy scipy



In [None]:
# -*- coding: utf-8 -*-
"""
Colab(T4, High-RAM) 프리셋: 빠른 멀티프로세싱 + 2mm 저해상도 SyN
- 콘솔: 진행률 + 현재 시각(HH:MM:SS)
- 파일 로그: /content/drive/MyDrive/normalize_mri/batch_log.txt

기능:
1) 해마 마스크(subject space) 생성 및 저장
2) 해마만 남긴 원본 해상도 NIfTI(.nii.gz) 저장  ← NEW
3) 해마 ROI(64^3) 볼륨/마스크 및 .npy 저장
"""

import os
import re
import json
import logging
from pathlib import Path
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import get_context

import numpy as np
import nibabel as nib
import scipy.ndimage as ndi

# =============================================================================
# 경로/파라미터
# =============================================================================
ROOT = Path("/content/drive/MyDrive")
HD_BET_DIR   = ROOT / "hd_bet"
TEMPLATE_DIR = ROOT / "template"
OUT_ROOT     = ROOT / "normalize_mri"

TEMPLATE_IMG    = TEMPLATE_DIR / "MNI152_T1_1mm_brain.nii.gz"
HIPPO_MASK_MNI  = TEMPLATE_DIR / "MNI152_T1_1mm_Hipp_mask_dil8.nii.gz"

REG_MODE = "down2mm_syn"   # 'affine_only' | 'quick_syn' | 'down2mm_syn'
THRESH_MASK = 0.5
MARGIN_VOX  = 10
ROI_TARGET  = (64, 64, 64)
CLOSE_ITER  = 1

os.environ["ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"

N_JOBS = 10      # Colab High-RAM 권장
CHUNK  = 200

LOG_PATH  = OUT_ROOT / "batch_log.txt"

# =============================================================================
# 로깅/유틸
# =============================================================================
def setup_logger():
    OUT_ROOT.mkdir(parents=True, exist_ok=True)
    root = logging.getLogger()
    for h in list(root.handlers):
        root.removeHandler(h)
    root.setLevel(logging.INFO)
    fh = logging.FileHandler(LOG_PATH)
    fh.setLevel(logging.INFO)
    fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] [%(processName)s] %(message)s"))
    root.addHandler(fh)

def ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)

def save_np_as_nii_like(vol_np: np.ndarray, like_path: Path, out_path: Path):
    like = nib.load(str(like_path))
    img  = nib.Nifti1Image(vol_np.astype(np.float32), like.affine, like.header)
    nib.save(img, str(out_path))

def crop_bbox_with_margin(vol: np.ndarray, mask: np.ndarray, margin=10):
    idx = np.argwhere(mask > 0)
    if idx.size == 0:
        raise ValueError("마스크가 비어있습니다(>0 voxels 없음). 정합/임계값 확인 필요.")
    zmin, ymin, xmin = idx.min(0); zmax, ymax, xmax = idx.max(0)
    zmin = max(zmin - margin, 0); ymin = max(ymin - margin, 0); xmin = max(xmin - margin, 0)
    zmax = min(zmax + margin, vol.shape[0] - 1)
    ymax = min(ymax + margin, vol.shape[1] - 1)
    xmax = min(xmax + margin, vol.shape[2] - 1)
    vol_roi = vol[zmin:zmax+1, ymin:ymax+1, xmin:xmax+1]
    msk_roi = mask[zmin:zmax+1, ymin:ymax+1, xmin:xmax+1]
    return vol_roi, msk_roi

def resize3d_linear(vol: np.ndarray, out_shape):
    zoom = [o/i for o, i in zip(out_shape, vol.shape)]
    return ndi.zoom(vol, zoom, order=1)

def zscore(vol: np.ndarray):
    mu = float(vol.mean()); sd = float(vol.std()) + 1e-8
    return (vol - mu) / sd

def is_mask_file(path: Path):
    return bool(re.search(r"(?:^|[_\-])mask(?:\.|_|$)", path.stem, re.IGNORECASE))

def subject_id_from_name(path: Path):
    stem = path.stem
    m = re.match(r"^([A-Za-z]?\d+)", stem)
    return m.group(1) if m else stem

# =============================================================================
# 부모 프로세스: 스레드 제한 + ants 선행 임포트
# =============================================================================
os.environ.setdefault("ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS", "2")
os.environ.setdefault("OMP_NUM_THREADS", "2")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "2")
os.environ.setdefault("MKL_NUM_THREADS", "2")
os.environ.setdefault("NUMEXPR_NUM_THREADS", "2")

import ants  # antspyx

# =============================================================================
# 빠른 정합
# =============================================================================
def _register_fast(sub_img, tpl_img):
    reg_kwargs = dict(
        type_of_transform="SyN",
        reg_iterations=(40, 20, 0),
        shrink_factors=(8, 4, 2),
        smoothing_sigmas=(3, 2, 1),
        aff_metric="mattes",
        verbose=False,
        random_seed=0,
    )

    if REG_MODE == "affine_only":
        return ants.registration(
            fixed=tpl_img, moving=sub_img,
            type_of_transform="Affine",
            reg_iterations=(40, 20, 0),
            aff_metric="mattes",
            verbose=False, random_seed=0
        )

    elif REG_MODE == "quick_syn":
        return ants.registration(fixed=tpl_img, moving=sub_img, **reg_kwargs)

    elif REG_MODE == "down2mm_syn":
        # interp_type=0 : nearest / 1 : linear 등
        tpl_ds = ants.resample_image(tpl_img, (2.0, 2.0, 2.0), use_voxels=False, interp_type=0)
        sub_ds = ants.resample_image(sub_img, (2.0, 2.0, 2.0), use_voxels=False, interp_type=0)
        return ants.registration(fixed=tpl_ds, moving=sub_ds, **reg_kwargs)

    return ants.registration(fixed=tpl_img, moving=sub_img, type_of_transform="SyN")

# =============================================================================
# 워커
# =============================================================================
def process_one_subject(sub_path_str: str):
    setup_logger()
    p = Path(sub_path_str)
    try:
        sub_id  = subject_id_from_name(p)
        outdir  = OUT_ROOT / sub_id
        regdir  = outdir / "reg"
        maskdir = outdir / "masks"
        roidir  = outdir / "roi"
        for d in [outdir, regdir, maskdir, roidir]:
            ensure_dir(d)

        logging.info(f"[{sub_id}] 시작: {p.name}")
        sub_img = ants.image_read(str(p))
        tpl_img = ants.image_read(str(TEMPLATE_IMG))
        hip_mni = ants.image_read(str(HIPPO_MASK_MNI))

        # 1) 정합
        reg = _register_fast(sub_img, tpl_img)

        # 2) 해마 마스크(subject space) 생성 및 저장
        hip_mni_bin = (hip_mni > THRESH_MASK).astype("uint8")
        hip_sub = ants.apply_transforms(
            fixed=sub_img,
            moving=hip_mni_bin,
            transformlist=reg["invtransforms"],
            interpolator="nearestNeighbor",
        )
        hip_sub_path = maskdir / "hip_mask_subject_space.nii"
        ants.image_write(hip_sub, str(hip_sub_path))

        # 2-1) (NEW) 해마만 남긴 원본 전체 해상도 NIfTI 저장
        # 격자 확인 후 필요 시 마스크를 원본 격자에 맞춤
        hip_sub_aligned = ants.resample_image_to_target(hip_sub, sub_img, interp_type=0)


        sub_np_full = sub_img.numpy()
        hip_np_full = (hip_sub_aligned.numpy() > 0.5).astype(np.uint8)
        masked_np_full = sub_np_full * hip_np_full

        hip_only_full_path = outdir / "hip_only_subject_space.nii.gz"
        save_np_as_nii_like(masked_np_full, p, hip_only_full_path)

        # 3) ROI 추출 및 저장 (옵션)
        sub_np = sub_img.numpy()
        hip_np = (hip_sub_aligned.numpy() > 0.5).astype(np.uint8)
        if CLOSE_ITER > 0:
            hip_np = ndi.binary_closing(hip_np, iterations=CLOSE_ITER).astype(np.uint8)

        vol_roi, msk_roi = crop_bbox_with_margin(sub_np, hip_np, margin=MARGIN_VOX)
        vol_roi_rs = zscore(resize3d_linear(vol_roi, ROI_TARGET))
        msk_roi_rs = (resize3d_linear(msk_roi, ROI_TARGET) > 0.5).astype(np.uint8)

        roi_vol_path = roidir / f"hip_roi_{ROI_TARGET[0]}^3.nii"
        roi_msk_path = roidir / f"hip_roi_mask_{ROI_TARGET[0]}^3.nii"
        save_np_as_nii_like(vol_roi_rs, p, roi_vol_path)
        save_np_as_nii_like(msk_roi_rs, p, roi_msk_path)
        np.save(roidir / f"hip_roi_{ROI_TARGET[0]}^3.npy", vol_roi_rs[..., None].astype("float32"))

        logging.info(f"[{sub_id}] 완료")
        return (sub_path_str, True, None)
    except BaseException as e:
        logging.exception(f"[실패] {p.name}: {e}")
        return (sub_path_str, False, f"{type(e).__name__}: {e}")

# =============================================================================
# 배치 실행 (콘솔 진행률 + 시각)
# =============================================================================
def run_in_chunks(paths_str):
    total = len(paths_str)
    done = 0
    ok_total, fail_total = 0, 0

    try:
        ctx = get_context("fork")
    except ValueError:
        ctx = get_context("spawn")

    for i in range(0, total, CHUNK):
        batch = paths_str[i:i+CHUNK]
        with ProcessPoolExecutor(max_workers=N_JOBS, mp_context=ctx) as ex:
            for path_str, ok, _ in ex.map(process_one_subject, batch, chunksize=1):
                done += 1
                if ok: ok_total += 1
                else:  fail_total += 1
                now = datetime.now().strftime("%H:%M:%S")
                print(f"{done}/{total}  [{now}]", flush=True)
    return ok_total, fail_total

# =============================================================================
# 메인
# =============================================================================
def main():
    setup_logger()

    if not TEMPLATE_IMG.exists():
        print(f"템플릿 없음: {TEMPLATE_IMG}"); return
    if not HIPPO_MASK_MNI.exists():
        print(f"해마 마스크 없음: {HIPPO_MASK_MNI}"); return

    nii_list = sorted(
        p for p in HD_BET_DIR.glob("**/*.nii*")
        if p.is_file() and not is_mask_file(p)
    )
    if not nii_list:
        print(f"입력 NIfTI를 찾지 못함: {HD_BET_DIR}"); return

    paths_str = [str(p) for p in nii_list]
    print(f"총 {len(paths_str)}개, 워커 {N_JOBS}, 모드 {REG_MODE}")

    success, fail = run_in_chunks(paths_str)
    print(f"완료: 성공 {success}, 실패 {fail}")

    with open(OUT_ROOT / "batch_summary.json", "w", encoding="utf-8") as f:
        json.dump({"total": len(paths_str), "success": success, "fail": fail}, f, ensure_ascii=False, indent=2)

if __name__ == "__main__":
    main()


ModuleNotFoundError: No module named 'ants'

# 만들어진 hip_only파일명을 환자 식별번호로 변경하고, 하나의 폴더로 이동

In [None]:
from pathlib import Path
import shutil

# 경로 설정
ROOT     = Path("/content/drive/MyDrive")
SRC_ROOT = ROOT / "normalize_mri"   # 피험자별 결과 폴더들이 있는 곳
DST_ROOT = ROOT / "hip_only_mri"    # 모을 대상 폴더 (요청사항)
DST_ROOT.mkdir(parents=True, exist_ok=True)

def next_free_path(base_path: Path) -> Path:
    """이미 파일이 있으면 _1, _2 ... 로 충돌 피해서 경로 반환"""
    if not base_path.exists():
        return base_path
    stem, suffix = base_path.stem, base_path.suffix
    i = 1
    while True:
        cand = base_path.with_name(f"{stem}_{i}{suffix}")
        if not cand.exists():
            return cand
        i += 1

copied, skipped, missing = 0, 0, 0

for subdir in sorted(SRC_ROOT.iterdir()):
    if not subdir.is_dir():
        continue
    sid = subdir.name  # 식별번호는 폴더명으로 가정
    src = subdir / "hip_only_subject_space.nii.gz"
    if not src.exists():
        missing += 1
        print(f"[MISS] {sid}: hip_only_subject_space.nii.gz 없음")
        continue

    dst = DST_ROOT / f"{sid}.nii.gz"
    # 동일 파일이 이미 있으면 건너뛰고 싶다면 아래 두 줄을 사용(복사 대신 스킵)
    # if dst.exists():
    #     skipped += 1; print(f"[SKIP] {sid}: 이미 존재"); continue

    # 요청은 모으기이므로 기본은 복사. 충돌 시 _1, _2 ... 로 저장
    dst_final = next_free_path(dst)
    shutil.copy2(src, dst_final)
    copied += 1
    print(f"[OK] {sid} -> {dst_final.name}")

print(f"\n완료: 복사 {copied}건, 스킵 {skipped}건, 원본 누락 {missing}건")
print(f"출력 위치: {DST_ROOT}")


[OK] I1043686 -> I1043686.nii.gz
[OK] I1153585 -> I1153585.nii.gz
[OK] I218391 -> I218391.nii.gz
[OK] I224716 -> I224716.nii.gz
[OK] I224748 -> I224748.nii.gz
[OK] I225667 -> I225667.nii.gz
[OK] I225915 -> I225915.nii.gz
[OK] I228302 -> I228302.nii.gz
[OK] I228854 -> I228854.nii.gz
[OK] I229148 -> I229148.nii.gz
[OK] I229512 -> I229512.nii.gz
[OK] I232428 -> I232428.nii.gz
[OK] I234922 -> I234922.nii.gz
[OK] I235313 -> I235313.nii.gz
[OK] I236921 -> I236921.nii.gz
[OK] I237027 -> I237027.nii.gz
[OK] I237030 -> I237030.nii.gz
[OK] I237210 -> I237210.nii.gz
[OK] I238532 -> I238532.nii.gz
[OK] I238627 -> I238627.nii.gz
[OK] I238745 -> I238745.nii.gz
[OK] I239736 -> I239736.nii.gz
[OK] I239985 -> I239985.nii.gz
[OK] I240491 -> I240491.nii.gz
[OK] I240812 -> I240812.nii.gz
[OK] I240920 -> I240920.nii.gz
[OK] I240936 -> I240936.nii.gz
[OK] I241094 -> I241094.nii.gz
[OK] I241375 -> I241375.nii.gz
[OK] I241692 -> I241692.nii.gz
[OK] I242120 -> I242120.nii.gz
[OK] I242231 -> I242231.nii.gz
[OK]