In [1]:
# import
import os
import pandas as pd
import numpy as np
import nibabel as nib
import shutil
from scipy.ndimage import zoom
from joblib import Parallel, delayed
from tqdm import tqdm
import psutil
from collections import Counter
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader,random_split
from torch.utils.tensorboard import SummaryWriter

import torchvision
import torchvision.models as models
from sklearn.metrics import f1_score, roc_auc_score, confusion_matrix, classification_report,precision_recall_curve,roc_curve,accuracy_score


ImportError: cannot import name 'zoom' from 'scipy.ndimage' (unknown location)

In [2]:
#nii 파일 로드 + 전처리 함수들
# NIfTI 파일 로드
def load_nii(file_path):
    try:
        data = nib.load(file_path).get_fdata(dtype=np.float64)
        if data.ndim == 4 and data.shape[-1] == 1:
            data = data.squeeze(axis=-1)
        elif data.ndim != 3:
            raise ValueError(f"Unexpected data dimensions: {data.shape}. Expected 3D data.")
        return data
    except Exception as e:
        print(f"Error loading NIfTI file {file_path}: {e}")
        return None

# PyTorch 기반 리사이즈 함수
def resize_volume_torch(volume, target_shape=(128, 128, 128)):
    try:
        volume_tensor = torch.tensor(volume, dtype=torch.float32).unsqueeze(0).unsqueeze(0)  # Add batch and channel dims
        resized = F.interpolate(volume_tensor, size=target_shape, mode='trilinear', align_corners=False)
        return resized.squeeze().numpy()
    except Exception as e:
        print(f"Error resizing volume: {e}")
        return None

# 단일 파일 처리
def process_file(file_name, mmse_score, base_path, target_shape):
    file_path = os.path.join(base_path, file_name)
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return None

    volume = load_nii(file_path)
    if volume is None:
        return None

    resized_volume = resize_volume_torch(volume, target_shape)
    if resized_volume is None:
        return None

    return [resized_volume, mmse_score]

# 데이터셋 생성 - Chunking + 병렬 처리
def create_dataset(dataset, base_path, target_shape=(128, 128, 128), chunk_size=20):
    processed_data = []
    total_chunks = len(dataset) // chunk_size + (1 if len(dataset) % chunk_size != 0 else 0)

    # tqdm으로 진행 상황 표시
    with tqdm(total=total_chunks, desc="Processing Dataset") as pbar:
        for i in range(0, len(dataset), chunk_size):
            chunk = dataset[i:i + chunk_size]
            # 병렬 처리
            processed_chunk = Parallel(n_jobs=4)(
                delayed(process_file)(file_name, mmse_score, base_path, target_shape) for file_name, mmse_score in chunk
            )
            # 유효한 데이터만 추가
            processed_data.extend([item for item in processed_chunk if item is not None])
            # 메모리 디버깅 함수
            # print(f"{psutil.virtual_memory().percent}%")
            # Progress bar 업데이트
            pbar.update(1)
    return processed_data

In [3]:
#file 받아오고 처리하는 함수들
def get_file_list(folder_path, extension=None):
    try:
        file_list = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f)) and (not extension or f.endswith(extension))]
        return file_list
    except Exception as e:
        print(f"Error getting file list from {folder_path}: {e}")
        return []

def rename_and_copy_files(folder_path, target_folder, extension=None):
    try:
        # 대상 폴더 생성
        os.makedirs(target_folder, exist_ok=True)
        file_list = get_file_list(folder_path, extension)
        for file_name in file_list:
            # 날짜 제거를 위한 파일 이름 분리
            parts = file_name.split(",")
            if len(parts) == 3:
                new_name = f"{parts[0]},{parts[2]}"
                old_path = os.path.join(folder_path, file_name)
                new_path = os.path.join(target_folder, new_name)
                shutil.copy(old_path, new_path)
                print(f"Copied and Renamed: {old_path} -> {new_path}")
    except Exception as e:
        print(f"파일 이름 변경 및 복사 중 에러 발생: {e}")

In [4]:
#renamed 폴더로 이름에 환자번호 - mri번호만 남김 - 날짜 없는 파일에 대한 처리
# folder_path = "niis/ADNI4"  # 폴더 경로를 적어주세요
# target_folder = "niis/ADNI4_renamed"  # 복사할 대상 폴더 경로를 적어주세요
# extension = ".gz"  # 필터링할 확장자를 적어주세요 (예: ".gz")
# rename_and_copy_files(folder_path, target_folder, extension)

In [5]:
def load_metadata_and_files(file_list, metadata, metadata_columns):
    dataset = []
    for file in file_list:
        f = file
        file_parts = file.split('.')[0].split(',')
        if len(file_parts) < 2:
            print(f"Invalid file format: {file}")
            continue
        
        mmse = metadata.loc[
            (metadata[metadata_columns[0]] == file_parts[0]) &
            (metadata[metadata_columns[1]] == file_parts[1]), 
            'MMSCORE'
        ]
        
        mmse_value = mmse.values[0] if not mmse.empty else None
        if mmse_value is not None:
            dataset.append([f, mmse_value])
        else:
            pass
            # print(f"No matching MMSE score for file: {file}")
    
    return dataset

In [6]:
# 경로 및 데이터 설정
adni3_base_path = 'niis/ADNI3_renamed'
adni4_base_path = 'niis/ADNI4_renamed'

adni3_metadata = pd.read_csv('metadatas/adni3_metadata_cleaned.csv')
adni4_metadata = pd.read_csv('metadatas/adni4_metadata_cleaned.csv')

adni3_file_list = get_file_list(adni3_base_path, extension='.nii.gz')
adni4_file_list = get_file_list(adni4_base_path, extension='.nii.gz')

adni3_dataset = load_metadata_and_files(adni3_file_list, adni3_metadata, ['PTID', 'MRI Number'])
adni4_dataset = load_metadata_and_files(adni4_file_list, adni4_metadata, ['PTID', 'MRI Number'])

### 데이터셋 생성

In [None]:
# 데이터셋 생성
target_shape = (128, 128, 128)
train_dataset = create_dataset(adni3_dataset, adni3_base_path, target_shape)
test_dataset = create_dataset(adni4_dataset, adni4_base_path, target_shape)
# 디버깅용 출력
for volume, score in train_dataset[:5]:
    print(f"Volume shape: {volume.shape}, MMSE Score: {score}")

In [None]:
for d in train_dataset[:3]:
    for e in d:
        print(e)

In [19]:
class ADNIDataset(Dataset):
    def __init__(self, dataset, normalization=True, score_threshold=27, score_threshold2=23):
        self.dataset = dataset
        self.normalization = normalization
        self.score_threshold = score_threshold
        self.score_threshold2 = score_threshold2

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        volume, score = self.dataset[idx]
        volume = torch.tensor(volume, dtype=torch.float16)

        # 정규화 처리
        if self.normalization:
            mean = volume.mean()
            std = volume.std()
            
            if std>0:
                volume = (volume - mean) / std
            else:
                volume = torch.zeros_like(volume)
        #3채널은 아니잖아요?
        volume = volume.unsqueeze(0)
        
        # regression
        # score = torch.tensor(score, dtype=torch.float32)
        # return volume, score
        
        # two_class Classification - CN이 0이고 MCI 이상이 1
        label = torch.tensor(0.0 if score >= self.score_threshold else 1.0, dtype=torch.float32)
        return volume, label
        
        # three_class Classification - CN, MCI, AD로 3개로 나눔 (0, 1, 2로 라벨링)
        # label = torch.tensor(0.0 if score >= self.score_threshold else (1.0 if score >= self.score_threshold2 else 2.0), dtype=torch.float32)
        # return volume, label


In [50]:
# 모델정의 여기

import torch
import torch.nn as nn
import torch.nn.functional as F

class DenseLayer3D(nn.Module):
    def __init__(self, in_channels, growth_rate, bn_size=4, drop_rate=0):
        super(DenseLayer3D, self).__init__()
        self.bn1 = nn.BatchNorm3d(in_channels)
        self.conv1 = nn.Conv3d(in_channels, bn_size * growth_rate, kernel_size=1, stride=1, bias=False)
        self.bn2 = nn.BatchNorm3d(bn_size * growth_rate)
        self.conv2 = nn.Conv3d(bn_size * growth_rate, growth_rate, kernel_size=3, stride=1, padding=1, bias=False)
        self.drop_rate = drop_rate

    def forward(self, x):
        new_features = self.conv1(F.relu(self.bn1(x)))
        new_features = self.conv2(F.relu(self.bn2(new_features)))
        if self.drop_rate > 0:
            new_features = F.dropout(new_features, p=self.drop_rate, training=self.training)
        return torch.cat([x, new_features], 1)


class DenseBlock3D(nn.Module):
    def __init__(self, num_layers, in_channels, growth_rate, bn_size, drop_rate):
        super(DenseBlock3D, self).__init__()
        layers = []
        for i in range(num_layers):
            layers.append(DenseLayer3D(
                in_channels + i * growth_rate, growth_rate, bn_size, drop_rate
            ))
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        return self.block(x)


class TransitionLayer3D(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(TransitionLayer3D, self).__init__()
        self.bn = nn.BatchNorm3d(in_channels)
        self.conv = nn.Conv3d(in_channels, out_channels, kernel_size=1, stride=1, bias=False)
        self.pool = nn.AvgPool3d(kernel_size=2, stride=2)

    def forward(self, x):
        x = self.conv(F.relu(self.bn(x)))
        x = self.pool(x)
        return x


class DenseNet3D(nn.Module):
    def __init__(self, growth_rate=32, block_config=(6, 12, 24, 16), num_init_features=64, bn_size=4, drop_rate=0, num_classes=2):
        super(DenseNet3D, self).__init__()

        # Initial convolution
        self.features = nn.Sequential(
            nn.Conv3d(1, num_init_features, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm3d(num_init_features),
            nn.ReLU(inplace=True),
            nn.MaxPool3d(kernel_size=3, stride=2, padding=1),
        )

        # Dense Blocks and Transition Layers
        num_features = num_init_features
        for i, num_layers in enumerate(block_config):
            block = DenseBlock3D(num_layers, num_features, growth_rate, bn_size, drop_rate)
            self.features.add_module(f"denseblock{i+1}", block)
            num_features += num_layers * growth_rate
            if i != len(block_config) - 1:
                trans = TransitionLayer3D(num_features, num_features // 2)
                self.features.add_module(f"transition{i+1}", trans)
                num_features = num_features // 2

        # Final batch norm
        self.features.add_module("norm5", nn.BatchNorm3d(num_features))

        # Classifier
        self.classifier = nn.Linear(num_features, num_classes)

    def forward(self, x):
        features = self.features(x)
        out = F.adaptive_avg_pool3d(features, (1, 1, 1)).view(features.size(0), -1)
        out = self.classifier(out)
        return out


def densenet121_3d(num_classes=2):
    return DenseNet3D(
        growth_rate=32,
        block_config=(6, 12, 24, 16),  # DenseNet-121 구성
        num_init_features=64,
        bn_size=4,
        drop_rate=0,
        num_classes=num_classes,
    )


In [None]:
# 모델 학습 기본 설정 - 뭐로하든 여기는 동일해야합니다
def split_dataset(dataset, train_ratio=0.8):
    train_size = int(len(dataset) * train_ratio)
    val_size = len(dataset) - train_size
    return random_split(dataset, [train_size, val_size])

# cuda 안뜨면 다시 설정해줘야함
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 데이터 로더 생성
train_dataset_adni = ADNIDataset(train_dataset)  # ADNIDataset 클래스에서 데이터셋 생성
label_counts = Counter()
for _, label in train_dataset_adni:
    label_counts[int(label.item())]+=1
for label, count in label_counts.items():
    print(f"{label}:{count}")    

train_set, val_set = split_dataset(train_dataset_adni)  # Train-Val 분리

# DataLoader 생성
batch_size = 8  # 배치 크기 설정
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

In [None]:
#디버깅
print(train_dataset_adni[1][0].shape)

In [None]:
# 이거는 단순 Regression

# 설정

import datetime
now = datetime.datetime.now()
writer = SummaryWriter(log_dir=f"./logs_resnet18_3d/regression/{now.day}-{now.hour}_{now.minute}")

# 모델, 손실 함수, 옵티마이저
model = resnet18_3d().to(device)
register_hooks_for_tensorboard(model, writer)  # TensorBoard를 위한 Hook 등록
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# 학습 함수
def train_model_with_validation_regression(model, train_loader, val_loader, criterion, optimizer, writer, device, epochs):
    global global_step
    global_step = 0

    for epoch in range(epochs):
        # Training 단계
        model.train()
        running_train_loss = 0.0

        for i, (volumes, scores) in enumerate(train_loader):
            global_step += 1

            volumes, scores = volumes.to(device), scores.to(device)

            optimizer.zero_grad()
            outputs = model(volumes)
            loss = criterion(outputs.squeeze(), scores)
            loss.backward()
            optimizer.step()

            running_train_loss += loss.item()

            # TensorBoard에 Training Loss 기록
            writer.add_scalar("Loss/Train", loss.item(), global_step)

        train_loss = running_train_loss / len(train_loader)
        print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}")

        # Validation 단계
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for volumes, scores in val_loader:
                volumes, scores = volumes.to(device), scores.to(device)
                outputs = model(volumes)
                loss = criterion(outputs.squeeze(), scores)
                running_val_loss += loss.item()

        val_loss = running_val_loss / len(val_loader)
        print(f"Epoch {epoch + 1}/{epochs}, Validation Loss: {val_loss:.4f}")

        # TensorBoard에 Validation Loss 기록
        writer.add_scalar("Loss/Validation", val_loss, epoch)

# 학습
epochs = 5  # Epoch 설정
train_model_with_validation_regression(model, train_loader, val_loader, criterion, optimizer, writer, device, epochs)

# TensorBoard 종료
writer.close()


In [None]:
# Two_Class Classification으로 사용할 떄
print('device : ', device)
import datetime
now = datetime.datetime.now()
writer = SummaryWriter(log_dir=f"./logs_resnet18_3d/two_class/{now.day}-{now.hour}_{now.minute}_{now.second}")
print(f"TensorBoard logs are being written to: {writer.log_dir}")

model = densenet121_3d().to(device)
criterion = nn.BCEWithLogitsLoss()  # 이진 분류 손실 함수
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
from sklearn.metrics import f1_score, roc_auc_score, roc_curve

def train_model_with_validation_two_class(model, train_loader, val_loader, criterion, optimizer, writer, device, epochs):
    global global_step
    global_step = 0

    for epoch in range(epochs):
        # Training 단계
        model.train()
        running_train_loss = 0.0

        for i, (volumes, labels) in enumerate(train_loader):  # scores -> labels
            global_step += 1

            volumes, labels = volumes.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(volumes).squeeze()  # [batch_size, 1]

            loss = criterion(outputs, labels.float())  # BCEWithLogitsLoss expects float labels
            loss.backward()
            optimizer.step()

            running_train_loss += loss.item()

            # TensorBoard에 Training Loss 기록
            writer.add_scalar("Loss/Train", loss.item(), global_step)

        train_loss = running_train_loss / len(train_loader)
        print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}")

        # Validation 단계
        model.eval()
        running_val_loss = 0.0
        all_labels = []
        all_outputs = []
        with torch.no_grad():
            for volumes, labels in val_loader:  # scores -> labels
                volumes, labels = volumes.to(device), labels.to(device)
                outputs = model(volumes).squeeze()  # [batch_size, 1]
                loss = criterion(outputs, labels.float())
                running_val_loss += loss.item()

                # F1-score 및 ROC-AUC 계산을 위한 값 저장
                all_labels.extend(labels.cpu().numpy())
                all_outputs.extend(torch.sigmoid(outputs).cpu().numpy())

        val_loss = running_val_loss / len(val_loader)
        print(f"Epoch {epoch + 1}/{epochs}, Validation Loss: {val_loss:.4f}")

        # TensorBoard에 Validation Loss 기록
        writer.add_scalar("Loss/Validation", val_loss, epoch)

        # F1-score 계산 및 기록
        val_f1 = f1_score(all_labels, (np.array(all_outputs) > 0.5).astype(int))
        writer.add_scalar("Metrics/F1_Score", val_f1, epoch)

        # ROC-AUC 계산 및 기록
        val_roc_auc = roc_auc_score(all_labels, all_outputs)
        writer.add_scalar("Metrics/ROC_AUC", val_roc_auc, epoch)

        # ROC Curve 계산 및 TensorBoard에 기록
        fpr, tpr, _ = roc_curve(all_labels, all_outputs)
        writer.add_pr_curve("ROC_Curve", torch.tensor(fpr), torch.tensor(tpr), epoch)
        print(f"Epoch {epoch + 1}/{epochs}, F1 Score: {val_f1:.4f}, ROC AUC: {val_roc_auc:.4f}")
epochs = 20
train_model_with_validation_two_class(model, train_loader, val_loader, criterion, optimizer, writer, device, epochs)


In [48]:
torch.save(model.state_dict(),'resnet3d_18_epoch30.pth')

In [None]:
# Three_class Classification일 경우
# 모델, 손실 함수, 옵티마이저
writer = SummaryWriter(log_dir=f"./logs_resnet18_3d/three_class/{now.day}-{now.hour}_{now.minute}")
model = resnet18_3d().to(device)
register_hooks_for_tensorboard(model, writer)  # TensorBoard를 위한 Hook 등록
criterion = nn.CrossEntropyLoss()  # 다중 클래스 분류 손실 함수
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)

# 학습 함수
def train_model_with_validation_three_class(model, train_loader, val_loader, criterion, optimizer, writer, device, epochs):
    global global_step
    global_step = 0

    for epoch in range(epochs):
        # Training 단계
        model.train()
        running_train_loss = 0.0

        for i, (volumes, labels) in enumerate(train_loader): 
            global_step += 1

            volumes, labels = volumes.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(volumes) 
            loss = criterion(outputs, labels)  
            loss.backward()
            optimizer.step()

            running_train_loss += loss.item()

            # TensorBoard에 Training Loss 기록
            writer.add_scalar("Loss/Train", loss.item(), global_step)

        train_loss = running_train_loss / len(train_loader)
        print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}")

        # Validation 단계
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for volumes, labels in val_loader: 
                volumes, labels = volumes.to(device), labels.to(device)
                outputs = model(volumes) 
                loss = criterion(outputs, labels)
                running_val_loss += loss.item()

        val_loss = running_val_loss / len(val_loader)
        print(f"Epoch {epoch + 1}/{epochs}, Validation Loss: {val_loss:.4f}")

        # TensorBoard에 Validation Loss 기록
        writer.add_scalar("Loss/Validation", val_loss, epoch)

train_model_with_validation_three_class(model, train_loader, val_loader, criterion, optimizer, writer, device, epochs)


In [None]:
model = resnet18_3d().to(device)
model.load_state_dict(torch.load('resnet3d_18_epoch30.pth',weights_only=True),)
test_dataset_adni = ADNIDataset(test_dataset)
test_loader = DataLoader(test_dataset_adni, batch_size=4, shuffle=False)




# Test 데이터 평가 및 성능 지표 계산 함수
def evaluate_classification_task(model, test_loader, device):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for volumes, labels in test_loader:
            volumes, labels = volumes.to(device), labels.to(device)
            outputs = model(volumes).squeeze()
            preds = torch.sigmoid(outputs)  # BCEWithLogitsLoss의 경우 sigmoid 적용
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # NumPy 배열로 변환
    all_preds = np.array(all_preds)
    print(all_preds)
    all_labels = np.array(all_labels)

    # 이진 분류 기준 (0.5 threshold)
    binary_preds = (all_preds >= 0.5).astype(int)

    # Accuracy
    accuracy = accuracy_score(all_labels, binary_preds)
    print(f"Accuracy: {accuracy:.4f}")

    # F1 Score
    f1 = f1_score(all_labels, binary_preds)
    print(f"F1 Score: {f1:.4f}")

    # ROC-AUC
    roc_auc = roc_auc_score(all_labels, all_preds)
    print(f"ROC-AUC: {roc_auc:.4f}")

    # Confusion Matrix
    conf_matrix = confusion_matrix(all_labels, binary_preds)
    print("Confusion Matrix:")
    print(conf_matrix)

    # ROC Curve
    fpr, tpr, _ = roc_curve(all_labels, all_preds)
    plt.figure()
    plt.plot(fpr, tpr, label=f"ROC Curve (AUC = {roc_auc:.4f})")
    plt.plot([0, 1], [0, 1], 'k--', label="Random Guess")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("ROC Curve")
    plt.legend()
    plt.grid()
    plt.show()

    # Precision-Recall Curve
    precision, recall, _ = precision_recall_curve(all_labels, all_preds)
    plt.figure()
    plt.plot(recall, precision, label="Precision-Recall Curve")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision-Recall Curve")
    plt.legend()
    plt.grid()
    plt.show()


# 테스트 데이터셋 평가
evaluate_classification_task(model, test_loader, device)
