# MONAI의 UNet 클래스
- https://github.com/Project-MONAI/MONAI/blob/dev/docs/source/modules.md
- 참고 코드 : https://github.com/czimaginginstitute/2024_czii_mlchallenge_notebooks/blob/main/3d_unet_monai/train.ipynb

In [None]:
import os
import numpy as np
from pathlib import Path
import torch
import torchinfo
import zarr, copick
from tqdm import tqdm
from monai.data import DataLoader, Dataset, CacheDataset, decollate_batch
from monai.transforms import (
    Compose,
    EnsureChannelFirstd,
    Orientationd,
    AsDiscrete,
    RandFlipd,
    RandRotate90d,
    NormalizeIntensityd,
    RandCropByLabelClassesd,
)
from monai.networks.nets import UNet
from monai.losses import DiceLoss, FocalLoss, TverskyLoss
from monai.metrics import DiceMetric, ConfusionMatrixMetric
import mlflow                                                   # MLflow는 모델을 학습할 때 발생하는 각종 실험 데이터를 기록하는 기능
import mlflow.pytorch

In [None]:
root = copick.from_file(copick_config_path)

copick_user_name = "copickUtils"
copick_segmentation_name = "paintedPicks"
voxel_size = 10
tomo_type = "denoised"

## tomogram types
- raw
- denoside(노이즈 제거)
- filtered(필터링) : 가우시안, median 필터 등 사용
- reconstructed : CT / MRI
- segmented : 특정한 구조나 관심 영역을 분리한 형태, 각 픽셀에 특정 레이블이 할당
- enhanced : 특정 부분의 대비 up, 엣지 강화
- normalized : 픽셀 값 정규화, 스케일링
- cropped(자르기)

In [None]:
# 세그먼트 하는 과정 이해하기.
from copick_utils.segmentation import segmentation_from_picks # 세그먼트(이미지를 분할하는 작업)를 진행
import copick_utils.writers.write as write
from collections import defaultdict # 기본값을 가지는 딕셔너리로, 키가 없을 때에도 기본값을 반환

In [None]:
generate_masks = True # 마스크(이미지를 분할하는 과정에서 객체를 식별하는 데 사용되는 배열) 생성 옵션

In [None]:
# 이미지 세그멘테이션(특정 객체를 분할하는 작업)을 수행하고 결과를 저장하는 과정
if generate_masks:
    target_objects = defaultdict(dict)                              # 각 객체에 대한 정보를 담은 딕셔너리 생성
    for object in root.pickable_objects:
        if object.is_particle:                                      # object.is_particle?
            target_objects[object.name]['label'] = object.label     # 객체 label 저장
            target_objects[object.name]['radius'] = object.radius   # 객체 반지름 정보 저장 => 왜?

    for run in tqdm(root.runs):                     # tqdm을 사용해서 진행 상황을 보여줍니다.
        tomo = run.get_voxel_spacing(10)            # Voxel은 3차원 이미지에서 픽셀, 10이라는 값으로 spacing
        tomo = tomo.get_tomogram(tomo_type).numpy() # tomogram(3차원 이미지를 표현하는 데이터)을 넘파이 배열 형태로 변환
        target = np.zeros(tomo.shape, dtype=np.uint8)
        for pickable_object in root.pickable_objects:                                         # 선택 가능한 객체들(root.pickable_objects) 반복 처리
            pick = run.get_picks(object_name=pickable_object.name, user_id="curation")        # 사용자가 "curation"인 픽 데이터를 가져옴
            if len(pick):
                target = segmentation_from_picks.from_picks(pick[0],
                                                            target,
                                                            target_objects[pickable_object.name]['radius'] * 0.8, # why 0.8? -> resize 개념인건지? , 반경에 0.8을 곱하는 것은 세그멘테이션의 정확도 및 품질을 높이기 위한 조정 작업
                                                            target_objects[pickable_object.name]['label']
                                                            )
        write.segmentation(run, target, copick_user_name, name=copick_segmentation_name)

In [None]:
data_dicts = []
for run in tqdm(root.runs):
    tomogram = run.get_voxel_spacing(voxel_size).get_tomogram(tomo_type).numpy()      # get_voxel_spacing(voxel_size)는 특정 voxel_size(3차원 픽셀 크기)를 적용하여 tomogram 데이터를 불러오는 메소드 #tomo_type은 tomogram의 종류나 형식
    segmentation = run.get_segmentations(name=copick_segmentation_name, user_id=copick_user_name, voxel_size=voxel_size, is_multilabel=True)[0].numpy()
    data_dicts.append({"image": tomogram, "label": segmentation})                     # tomogram과 segmentation 데이터를 하나의 딕셔너리로 묶어 data_dicts 리스트에 추가

print(np.unique(data_dicts[0]['label']))

# 데이터 전처리

In [None]:
my_num_samples = 16                     # 랜덤 크롭(RandCrop) 과정에서 사용할 샘플의 수
train_batch_size = 1
val_batch_size = 1

train_files, val_files = data_dicts[:5], data_dicts[5:7]
print(f"Number of training samples: {len(train_files)}")
print(f"Number of validation samples: {len(val_files)}")

- Non-random transforms (비랜덤 변환): 훈련 및 검증 데이터에 항상 적용될 변환들을 정의
- Random transforms (랜덤 변환): 데이터 증강을 위해 훈련 중 무작위로 적용되는 변환들을 정의

In [None]:
non_random_transforms = Compose([
    EnsureChannelFirstd(keys=["image", "label"], channel_dim="no_channel"),     # EnsureChannelFirstd: 이미지와 라벨의 채널이 첫 번째 축으로 생성
    NormalizeIntensityd(keys="image"),                                          # NormalizeIntensityd: 이미지의 강도를 정규화
    Orientationd(keys=["image", "label"], axcodes="RAS")                        # Orientationd: 이미지와 라벨의 방향을 RAS(좌표계)로 설정 / LPS(Left-Posterior-Superior)
])

random_transforms = Compose([
    RandCropByLabelClassesd(                                                    # RandCropByLabelClassesd: 라벨을 기준으로 특정 클래스가 포함되도록 이미지와 라벨을 무작위로 크롭
        keys=["image", "label"],
        label_key="label",
        spatial_size=[96, 96, 96],                                              # spatial_size는 크롭의 크기, num_samples는 각 이미지에서 자를 샘플의 수.
        num_classes=8,
        num_samples=my_num_samples
    ),
    RandRotate90d(keys=["image", "label"], prob=0.5, spatial_axes=[0, 2]),      # 특정 축 기준으로 90도 회전
    RandFlipd(keys=["image", "label"], prob=0.5, spatial_axis=0)                # 뒤집기
])

- R (Right): 오른쪽 (Right) 방향
- A (Anterior): 앞쪽 (Anterior) 방향
- S (Superior): 위쪽 (Superior) 방향

학습 데이터 설정

In [None]:
train_ds = CacheDataset(data=train_files, transform=non_random_transforms, cache_rate=1.0)  # non_random_transforms를 적용한 후 데이터를 캐시, cache_rate에 따라 데이터 비율
train_ds = Dataset(data=train_ds, transform=random_transforms)                              # 캐시된 데이터셋에 random_transforms를 적용하여 데이터 증강을 진행

데이터로드 설정

In [None]:
train_loader = DataLoader(
    train_ds,
    batch_size=train_batch_size,
    shuffle=True,                             # 데이터 무작위로 섞음
    num_workers=4,                            # 프로세스의 수
    pin_memory=torch.cuda.is_available()      #pin_memory는 GPU 메모리 전송 속도 up
)

검증데이터 설정

In [None]:
val_ds = CacheDataset(data=val_files, transform=non_random_transforms, cache_rate=1.0)
val_ds = Dataset(data=val_ds, transform=random_transforms)

val_loader = DataLoader(
    val_ds,
    batch_size=val_batch_size,
    num_workers=4,
    pin_memory=torch.cuda.is_available(),
    shuffle=False
)

# 모델 설정

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# Create UNet, DiceLoss and Adam optimizer
model = UNet(
    spatial_dims=3,
    in_channels=1,
    out_channels=len(root.pickable_objects)+1,
    channels=(48, 64, 80, 80),
    strides=(2, 2, 1),
    num_res_units=1,
).to(device)

lr = 1e-3
optimizer = torch.optim.Adam(model.parameters(), lr)
#loss_function = DiceLoss(include_background=True, to_onehot_y=True, softmax=True)  # softmax=True for multiclass
loss_function = TverskyLoss(include_background=True, to_onehot_y=True, softmax=True)  # softmax=True for multiclass , # TverskyLoss는 이미지 분할(세그먼테이션) 작업에서 사용하는 손실 함수
dice_metric = DiceMetric(include_background=False, reduction="mean", ignore_empty=True)  # must use onehot for multiclass
recall_metric = ConfusionMatrixMetric(include_background=False, metric_name="recall", reduction="None")

## Loss 함수
- TverskyLoss : FN(false negative), FP에 대해 가중치 조절, 불균형 클래스 문제에서 성능 up
-  Dice Loss
- 매개변수 :
  - include_background=True : 배경 클래스를 포함할지 여부를 결정
  - to_onehot_y=True : 라벨 데이터를 원-핫 인코딩(One-Hot Encoding)으로 변환
  - softmax=True : 출력값에 소프트맥스를 적용