In [1]:
import nibabel as nib
import os
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# Read the dataset

In [2]:
import os
import nibabel as nib
import matplotlib.pyplot as plt

# 폴더 경로 설정
base_path = "/workspace/eli"
folders = {
    "train": "01_training",
    "val": "02_validation",
    "test": "03_test"
}

# 다양한 파일 이름 패턴 목록 (이미지와 마스크 파일 분리)
possible_image_patterns = [
    "{patient_number}_CT-ax.nii.gz",
    "{patient_number}.nii.gz",
    "{patient_number}_CT-iso.nii.gz"
]

possible_mask_patterns = [
    "{patient_number}_CT-ax_seg.nii.gz",
    "{patient_number}_seg.nii.gz",
    "{patient_number}_CT-iso_seg.nii.gz"
]

# 각 폴더에서 환자 데이터를 불러오는 함수
def load_nii_files(data_type):
    image_arrays = []
    mask_arrays = []
    
    folder_path = os.path.join(base_path, folders[data_type])
    
    # 환자 번호 폴더 목록 가져오기
    patients_numbers = os.listdir(folder_path)
    
    for patient_number in patients_numbers:
        image_path = None
        mask_path = None
        
        # 이미지 파일 경로 찾기
        for img_pattern in possible_image_patterns:
            temp_image_path = os.path.join(folder_path, patient_number, img_pattern.format(patient_number=patient_number))
            
            if os.path.exists(temp_image_path):
                image_path = temp_image_path
                break
        
        # 마스크 파일 경로 찾기
        for mask_pattern in possible_mask_patterns:
            temp_mask_path = os.path.join(folder_path, patient_number, mask_pattern.format(patient_number=patient_number))
            
            if os.path.exists(temp_mask_path):
                mask_path = temp_mask_path
                break
        
        # 경로 출력 및 Nii 파일 로드
        if image_path and mask_path:
            print(f"Loading image: {image_path}")
            print(f"Loading mask: {mask_path}")
            
            image = nib.load(image_path).get_fdata()
            mask = nib.load(mask_path).get_fdata()
            
            image_arrays.append(image)
            mask_arrays.append(mask)
        else:
            print(f"Warning: Missing files for patient {patient_number}")
    
    return image_arrays, mask_arrays

# 트레이닝, 검증, 테스트 데이터 불러오기
train_image_array, train_mask_array = load_nii_files("train")
val_image_array, val_mask_array = load_nii_files("val")
test_image_array, test_mask_array = load_nii_files("test")

Loading image: /workspace/eli/01_training/verse825/verse825_CT-iso.nii.gz
Loading mask: /workspace/eli/01_training/verse825/verse825_CT-iso_seg.nii.gz
Loading image: /workspace/eli/01_training/verse091/verse091.nii.gz
Loading mask: /workspace/eli/01_training/verse091/verse091_seg.nii.gz
Loading image: /workspace/eli/01_training/verse097/verse097.nii.gz
Loading mask: /workspace/eli/01_training/verse097/verse097_seg.nii.gz
Loading image: /workspace/eli/01_training/verse586/verse586_CT-iso.nii.gz
Loading mask: /workspace/eli/01_training/verse586/verse586_CT-iso_seg.nii.gz
Loading image: /workspace/eli/01_training/GL003/GL003.nii.gz
Loading mask: /workspace/eli/01_training/GL003/GL003_seg.nii.gz
Loading image: /workspace/eli/01_training/verse539/verse539_CT-iso.nii.gz
Loading mask: /workspace/eli/01_training/verse539/verse539_CT-iso_seg.nii.gz
Loading image: /workspace/eli/01_training/verse544/verse544.nii.gz
Loading mask: /workspace/eli/01_training/verse544/verse544_seg.nii.gz
Loading ima

# Make the Dataloader

In [3]:
import numpy as np
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

class MedicalDataset(Dataset):
    def __init__(self, images, masks):
        self.images = images
        self.masks = masks

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        mask = self.masks[idx]

        # 필요시 numpy 배열을 tensor로 변환하고 차원 추가 (1, Depth, Height, Width)
        image = torch.tensor(image, dtype=torch.float32).unsqueeze(0)  # (1, Depth, Height, Width)
        mask = torch.tensor(mask, dtype=torch.float32).unsqueeze(0)    # (1, Depth, Height, Width)

        # 이미지와 마스크를 64x64x64로 리사이즈 (3D interpolate 사용)
        image = F.interpolate(image.unsqueeze(0), size=(64, 64, 64), mode='trilinear', align_corners=True).squeeze(0)
        mask = F.interpolate(mask.unsqueeze(0), size=(64, 64, 64), mode='trilinear', align_corners=True).squeeze(0)

        return image, mask

# 데이터셋 생성
train_dataset = MedicalDataset(train_image_array, train_mask_array)
val_dataset = MedicalDataset(val_image_array, val_mask_array)
test_dataset = MedicalDataset(test_image_array, test_mask_array)

# DataLoader 생성
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# 확인용 출력
for images, masks in train_loader:
    print("Image shape:", images.shape)
    print("Mask shape:", masks.shape)
    break  # 첫 배치만 확인

Image shape: torch.Size([1, 1, 64, 64, 64])
Mask shape: torch.Size([1, 1, 64, 64, 64])


: 

# Model Architecture

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class TransformerBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(TransformerBlock, self).__init__()
        self.attn = nn.MultiheadAttention(embed_dim=in_channels, num_heads=4)
        self.fc1 = nn.Linear(in_channels, out_channels)
        self.fc2 = nn.Linear(out_channels, in_channels)
        self.norm1 = nn.LayerNorm(in_channels)
        self.norm2 = nn.LayerNorm(in_channels)

    def forward(self, x):
        # Self-Attention
        attn_out, _ = self.attn(x, x, x)
        x = self.norm1(x + attn_out)

        # Feed Forward
        ff_out = F.relu(self.fc1(x))
        x = self.norm2(x + self.fc2(ff_out))

        return x

class UNet3D(nn.Module):
    def __init__(self, in_channels=1, num_classes=2):
        super(UNet3D, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv3d(in_channels, 32, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool3d(kernel_size=2, stride=2),
            nn.Conv3d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool3d(kernel_size=2, stride=2),
        )
        
        self.transformer = TransformerBlock(in_channels=64 * 16 * 16 * 16, out_channels=128)

        self.decoder = nn.Sequential(
            nn.ConvTranspose3d(64, 32, kernel_size=2, stride=2),
            nn.ReLU(inplace=True),
            nn.Conv3d(32, num_classes, kernel_size=1)
        )

    def forward(self, x):
        # Encoder
        x = self.encoder(x)
        
        # Flatten for transformer
        b, c, d, h, w = x.shape
        x = x.view(b, c, -1)  # (B, C, D*H*W)
        x = x.permute(0, 2, 1)  # (B, D*H*W, C)

        # Transformer
        x = self.transformer(x)
        
        # Reshape back to 3D
        x = x.permute(0, 2, 1).view(b, c, d, h, w)  # (B, C, D, H, W)

        # Decoder
        x = self.decoder(x)
        return x

# Instantiate the model
model = UNet3D(in_channels=1, num_classes=2)

In [5]:
def iou(preds, targets, smooth=1e-6):
    preds = preds > 0.5  # 이진 분류를 위한 임계값
    targets = targets > 0.5
    intersection = (preds & targets).float().sum((1, 2, 3))  # 각 배치의 교집합
    union = (preds | targets).float().sum((1, 2, 3))  # 각 배치의 합집합
    iou_score = (intersection + smooth) / (union + smooth)  # IoU 계산
    return iou_score.mean()  # 평균 IoU 반환

def dice_loss(preds, targets, smooth=1e-6):
    preds = preds.sigmoid()  # sigmoid로 이진 분류 확률값으로 변환
    targets = targets > 0.5  # 마스크를 이진화

    intersection = (preds * targets).sum(dim=(1, 2, 3, 4))
    dice = (2. * intersection + smooth) / (preds.sum(dim=(1, 2, 3, 4)) + targets.sum(dim=(1, 2, 3, 4)) + smooth)

    return 1 - dice.mean()

import torch.optim as optim

# Hyperparameters
num_epochs = 1000  # Number of epochs to train
learning_rate = 1e-6  # Learning rate
device = torch.device('cuda:3' if torch.cuda.is_available() else 'cpu')  # Use GPU if available

# Move model to device
model = UNet3D(in_channels=1, num_classes=2).to(device)

# Loss function and optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training function
def train_one_epoch(train_loader, model, optimizer):
    model.train()  # Set model to training mode
    total_loss = 0.0

    for images, masks in train_loader:
        # Move data to the selected device
        images, masks = images.to(device), masks.to(device)

        # GPU 메모리 정리
        torch.cuda.empty_cache()  # 이 위치에서 GPU 메모리 정리

        # Forward pass
        outputs = model(images)

        # Compute loss
        loss = dice_loss(outputs, masks)
        total_loss += loss.item()

        # Backward pass and optimization
        optimizer.zero_grad()  # Clear gradients
        loss.backward()  # Backpropagation
        optimizer.step()  # Update parameters

    return total_loss / len(train_loader)

# Validation function
def validate_one_epoch(val_loader, model):
    model.eval()  # Set model to evaluation mode
    total_loss = 0.0
    total_iou = 0.0

    with torch.no_grad():  # Disable gradient tracking
        for images, masks in val_loader:
            images, masks = images.to(device), masks.to(device)

            # GPU 메모리 정리
            torch.cuda.empty_cache()  # 이 위치에서 GPU 메모리 정리

            # Forward pass
            outputs = model(images)

            # Compute loss
            loss = dice_loss(outputs, masks)
            total_loss += loss.item()

            # Calculate IoU
            iou_score = iou(outputs, masks)
            total_iou += iou_score.item()

    return total_loss / len(val_loader), total_iou / len(val_loader)

# Training loop with model saving
best_val_loss = float('inf')  # To track the best validation loss

for epoch in range(num_epochs):
    train_loss = train_one_epoch(train_loader, model, optimizer)
    val_loss, val_iou = validate_one_epoch(val_loader, model)

    print(f'Epoch {epoch + 1}/{num_epochs}, '
          f'Train Loss: {train_loss:.4f}, '
          f'Validation Loss: {val_loss:.4f}, '
          f'Validation IoU: {val_iou:.4f}')

    # Save the model if validation loss has improved
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), f'unetr_best_epoch.pth')
        print(f"Model saved at epoch {epoch + 1} with validation loss {val_loss:.4f}")

AttributeError: 'torch.device' object has no attribute '_apply'