In [None]:
# 1. GitHub 레포지토리 설치
!git clone https://github.com/HaeinGeek/CancerSubtypesAI.git

Cloning into 'CancerSubtypesAI'...
remote: Enumerating objects: 905, done.[K
remote: Counting objects: 100% (49/49), done.[K
remote: Compressing objects: 100% (29/29), done.[K
remote: Total 905 (delta 23), reused 38 (delta 20), pack-reused 856 (from 1)[K
Receiving objects: 100% (905/905), 83.84 MiB | 22.71 MiB/s, done.
Resolving deltas: 100% (493/493), done.
Updating files: 100% (58/58), done.
Downloading data/processed/embeddings/mut_embeddings_mean_max_cls_1.zip (138 MB)
Error downloading object: data/processed/embeddings/mut_embeddings_mean_max_cls_1.zip (d6bca3f): Smudge error: Error downloading data/processed/embeddings/mut_embeddings_mean_max_cls_1.zip (d6bca3f495793afd9b1c3027571dc3daec4558e9646f43eae0e40097294bbe09): batch response: This repository is over its data quota. Account responsible for LFS bandwidth should purchase more data packs to restore access.

Errors logged to /content/CancerSubtypesAI/.git/lfs/logs/20241020T023855.215762571.log
Use `git lfs logs last` to v

In [None]:
# 2. Change into the cloned repository so the relative paths used below resolve inside it
%cd CancerSubtypesAI

/content/CancerSubtypesAI


In [None]:
# 3. Make the repository root importable (so the processing module is found as a package).
# The membership check keeps the cell idempotent: re-running it does not add
# duplicate entries to sys.path.
import sys
repo_root = "/content/CancerSubtypesAI"
if repo_root not in sys.path:
    sys.path.append(repo_root)

In [None]:
# 4. Upload data
# Create the directory that will receive the uploaded files (no error if it already exists).
import os
upload_dir = 'data/processed'  # target directory for the uploaded files
os.makedirs(upload_dir, exist_ok=True)

In [None]:
# Upload files through the Colab file picker.
# The keys of `uploaded` are iterated as file names by the next cell.
from google.colab import files
uploaded = files.upload()

In [None]:
# Move every uploaded file into upload_dir.
# The original loop only printed the "Moved ..." message and left the files where
# they were; os.replace performs the actual move (overwriting an older copy).
# NOTE(review): assumes files.upload() stored each file in the current working
# directory under the same name that is used as the key of `uploaded` — confirm on Colab.
for filename in uploaded.keys():
    file_path = os.path.join(upload_dir, filename)
    os.replace(filename, file_path)
    print(f'Moved {filename} to {file_path}/')

# 데이터 로딩

In [None]:
# Load data
# Read the previously saved model input for the training set.
# NOTE(review): load_model_input is not defined or imported in any visible cell;
# it presumably comes from the cloned repository — add the import before this call.
train_input = load_model_input('data/processed/train/input_data.json')

data/processed/train/input_data.json에서 모델 입력 데이터를 불러왔습니다.


In [None]:
# Create the output directories for checkpoints and logs (no-op when they already exist)
for output_dir in ('./checkpoints', './logs'):
    os.makedirs(output_dir, exist_ok=True)

In [None]:
# Show the distinct values of the SUBCLASS column.
# NOTE(review): train_df is not defined in any visible cell — this only works with
# leftover kernel state; load the frame explicitly before this cell.
train_df['SUBCLASS'].unique()

array(['KIPAN', 'SARC', 'SKCM', 'KIRC', 'GBMLGG', 'STES', 'BRCA', 'THCA',
       'LIHC', 'HNSC', 'PAAD', 'OV', 'PRAD', 'UCEC', 'LAML', 'COAD',
       'ACC', 'LGG', 'LUSC', 'LUAD', 'CESC', 'PCPG', 'THYM', 'BLCA',
       'TGCT', 'DLBC'], dtype=object)

# 모델링

In [None]:
import json
import os
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import ReduceLROnPlateau
import numpy as np

In [None]:
# Hyperparameters

# -- data split / batching --
VAL_SIZE = 0.2
RANDOM_SEED = 42
BATCH_SIZE = 16

# -- network architecture --
HIDDEN_LAYERS = [512, 256]
DROPOUT_RATE = 0.5

# -- optimisation --
LEARNING_RATE = 1e-4
NUM_EPOCHS = 50

# -- learning-rate scheduling and early stopping --
# NOTE(review): both patience values are equal, so early stopping may fire before the
# plateau scheduler ever lowers the learning rate — confirm this is intended.
SCHEDULER_PATIENCE = 3
SCHEDULER_FACTOR = 0.5
EARLY_STOPPING_PATIENCE = 3

In [None]:
# Map each SUBCLASS label to an integer index and back.
# Indices follow the alphabetical order of the labels, not the order of the list below.
subclasses = [
    'KIPAN', 'SARC', 'SKCM', 'KIRC', 'GBMLGG', 'STES', 'BRCA', 'THCA',
    'LIHC', 'HNSC', 'PAAD', 'OV', 'PRAD', 'UCEC', 'LAML', 'COAD',
    'ACC', 'LGG', 'LUSC', 'LUAD', 'CESC', 'PCPG', 'THYM', 'BLCA',
    'TGCT', 'DLBC',
]
ordered_subclasses = sorted(subclasses)
subclass_to_idx = dict(zip(ordered_subclasses, range(len(ordered_subclasses))))
idx_to_subclass = dict(enumerate(ordered_subclasses))

In [None]:
# Map each mutation type to an integer index (position in the list below)
mutation_types = [
    'WT', 'Silent_Missense', 'Missense', 'Nonsense',
    'Complex_mutation', 'Frameshift', 'Silent_Nonsense', 'Deletion',
    'Insertion', 'Delins', 'Unknown',
]
mutation_type_to_idx = dict(zip(mutation_types, range(len(mutation_types))))

In [None]:
import torch
from torch.utils.data import Dataset
import numpy as np

class CancerDataset(Dataset):
    """Map-style dataset turning one sample dict into a flat float32 feature vector and a label.

    Feature layout produced by ``__getitem__`` (in this order):
      1. ``num_mutated_genes``
      2. ``avg_mut_num``, ``max_mut_num``
      3. min / max / mean / std for each of hydrophobicity, polarity, mw, pI, charge
      4. the embedding ``mean``, ``min``, ``max`` and ``std`` vectors
      5. one ratio per entry of ``mutation_type_to_idx``   (never normalized)
      6. ``status_ratio``, ``status_prot_ratio``           (never normalized)

    Args:
        data: indexable collection of sample dicts.
        subclass_to_idx: mapping from subclass name to class index.
        mutation_type_to_idx: mapping from mutation type name to its slot in group 5.
        feature_mean: per-feature mean for the normalized features (groups 1-4), or None.
        feature_std: per-feature standard deviation matching ``feature_mean``, or None.
        normalize: apply ``(x - feature_mean) / feature_std`` to groups 1-4 when both
            statistics are given.

    Attributes:
        exclude_norm_indices: list of feature positions that are not normalized
            (groups 5 and 6). It is None until the first item has been read.
    """

    _AA_PROPERTIES = ('hydrophobicity', 'polarity', 'mw', 'pI', 'charge')
    _AA_STATS = ('min', 'max', 'mean', 'std')
    _EMBEDDING_STATS = ('mean', 'min', 'max', 'std')

    def __init__(self, data, subclass_to_idx, mutation_type_to_idx, feature_mean=None, feature_std=None, normalize=False):
        self.data = data
        self.subclass_to_idx = subclass_to_idx
        self.mutation_type_to_idx = mutation_type_to_idx
        self.normalize = normalize
        self.feature_mean = feature_mean
        self.feature_std = feature_std

        # Feature positions excluded from normalization (filled in on first item access)
        self.exclude_norm_indices = None
        # Cached index tensor of the positions that ARE normalized (built on first use)
        self._include_norm_indices = None

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    @staticmethod
    def _scalar(value, default=0):
        """Return ``value``, or ``default`` when the stored value is None."""
        return default if value is None else value

    def __getitem__(self, idx):
        """Build the feature vector and label for sample ``idx``.

        Returns:
            ``(features, label)`` where ``features`` is a 1-D float32 tensor and
            ``label`` is a long tensor holding the class index, or -1 when the
            sample has no ``'subclass'`` entry.

        Raises:
            KeyError: if the sample's subclass is not in ``subclass_to_idx``.
        """
        sample = self.data[idx]
        features = []

        # --- numeric features ---
        mutation_stats = sample.get('mutation_stats') or {}
        # A missing count falls back to 1, as in the original default
        num_mutated_genes = self._scalar(mutation_stats.get('num_mutated_genes'), 1)
        features.append(num_mutated_genes)

        additional_stats = sample.get('additional_stats') or {}
        features.append(self._scalar(additional_stats.get('avg_mut_num')))
        features.append(self._scalar(additional_stats.get('max_mut_num')))
        # The two ratios are appended at the very end so they stay out of normalization
        status_ratio = self._scalar(additional_stats.get('status_ratio'))
        status_prot_ratio = self._scalar(additional_stats.get('status_prot_ratio'))

        aa_change_stats = sample.get('aa_change_stats') or {}
        for prop in self._AA_PROPERTIES:
            prop_stats = aa_change_stats.get(prop) or {}
            for stat in self._AA_STATS:
                features.append(self._scalar(prop_stats.get(f'{prop}_{stat}')))

        # Embedding statistics: every vector must have the length of 'mean';
        # a missing or mismatched vector is replaced by zeros of that length.
        embedding_stats = sample.get('embedding_stats') or {}
        embedding_size = len(embedding_stats.get('mean') or [])
        for stat in self._EMBEDDING_STATS:
            embedding_values = embedding_stats.get(stat)
            if embedding_values is not None and len(embedding_values) == embedding_size:
                features.extend(embedding_values)
            else:
                features.extend([0.0] * embedding_size)

        # --- mutation type frequency ratios (not normalized) ---
        mutation_type_freq = mutation_stats.get('mutation_type_freq') or {}
        # The .get default above only covers a missing key; an explicit 0 would
        # still divide by zero, so the denominator is guarded separately.
        denominator = num_mutated_genes if num_mutated_genes > 0 else 1
        mutation_type_features = [0.0] * len(self.mutation_type_to_idx)
        for mt, count in mutation_type_freq.items():
            idx_mt = self.mutation_type_to_idx.get(mt)
            if idx_mt is not None:
                mutation_type_features[idx_mt] = count / denominator
        features.extend(mutation_type_features)

        # --- ratios excluded from normalization ---
        features.extend([status_ratio, status_prot_ratio])

        features = torch.tensor(features, dtype=torch.float32)
        # NaN / inf in the stored statistics would poison the normalization
        # statistics and the loss; treat them like missing values (0).
        features = torch.nan_to_num(features, nan=0.0, posinf=0.0, neginf=0.0)

        # Positions excluded from normalization (computed once). They are stored as
        # absolute, non-negative positions: negative indices such as -2 / -1 never
        # compare equal to the positions produced by range(), so they would silently
        # fail to exclude anything.
        if self.exclude_norm_indices is None:
            total_features = len(features)
            num_excluded = len(self.mutation_type_to_idx) + 2  # +2: the two status ratios
            self.exclude_norm_indices = list(range(total_features - num_excluded, total_features))

        # Normalization of the remaining positions
        if self.normalize and self.feature_mean is not None and self.feature_std is not None:
            if self._include_norm_indices is None:
                # Built once and reused; presumes every sample yields a vector of the
                # same length (default batching requires that as well).
                excluded = set(self.exclude_norm_indices)
                self._include_norm_indices = torch.tensor(
                    [i for i in range(len(features)) if i not in excluded], dtype=torch.long)
            include = self._include_norm_indices
            features[include] = (features[include] - self.feature_mean) / self.feature_std

        # Label: class index, or -1 for unlabeled samples
        subclass = sample.get('subclass', None)
        if subclass is not None:
            label = torch.tensor(self.subclass_to_idx[subclass], dtype=torch.long)
        else:
            label = torch.tensor(-1, dtype=torch.long)

        return features, label

In [None]:
# Model definition
class CancerModel(nn.Module):
    """Fully connected classifier.

    Each hidden width contributes a Linear -> BatchNorm1d -> ReLU -> Dropout group;
    a final Linear layer maps the last width to ``num_classes`` outputs.

    Args:
        input_size: number of input features.
        num_classes: number of output classes.
        hidden_layers: iterable of hidden layer widths, in order.
        dropout_rate: dropout probability used after every hidden layer.
    """

    def __init__(self, input_size, num_classes, hidden_layers, dropout_rate):
        super().__init__()
        # Consecutive pairs of this list are the (in, out) sizes of the hidden layers
        widths = [input_size, *hidden_layers]
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.BatchNorm1d(fan_out))
            stack.append(nn.ReLU())
            stack.append(nn.Dropout(dropout_rate))
        stack.append(nn.Linear(widths[-1], num_classes))
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Return the output of the layer stack for input ``x``."""
        return self.model(x)

In [None]:
# Split the loaded samples into training and validation subsets
train_data, val_data = train_test_split(train_input, test_size=VAL_SIZE, random_state=RANDOM_SEED)

# Per-feature mean / standard deviation, computed on the training subset only
from torch.utils.data import DataLoader

# One un-normalized pass over the training subset collects the raw feature matrix
temp_dataset = CancerDataset(train_data, subclass_to_idx, mutation_type_to_idx, normalize=False)
temp_loader = DataLoader(temp_dataset, batch_size=64, shuffle=False)
all_features = torch.cat([batch_features for batch_features, _ in temp_loader], dim=0)

# exclude_norm_indices is only filled in once an item has been read, so it must be
# queried after the pass above
exclude_indices = temp_dataset.exclude_norm_indices
include_indices = [col for col in range(all_features.shape[1]) if col not in exclude_indices]

# Statistics of the columns that get normalized
features_to_normalize = all_features[:, include_indices]
feature_mean = features_to_normalize.mean(dim=0)
feature_std = features_to_normalize.std(dim=0)
# A zero standard deviation is replaced by 1 so the division stays defined
feature_std = torch.where(feature_std == 0, torch.ones_like(feature_std), feature_std)

# Final datasets: both subsets are normalized with the training statistics
normalization = dict(feature_mean=feature_mean, feature_std=feature_std, normalize=True)
train_dataset = CancerDataset(train_data, subclass_to_idx, mutation_type_to_idx, **normalization)
val_dataset = CancerDataset(val_data, subclass_to_idx, mutation_type_to_idx, **normalization)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Select the compute device: the GPU when CUDA is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Seed torch so weight initialisation and batch shuffling are repeatable across runs
torch.manual_seed(RANDOM_SEED)

# Model sizes are derived from the data instead of relying on leftover kernel state:
# the input width is the length of one feature vector, the output width is the
# number of known subclasses.
input_size = train_dataset[0][0].numel()
num_classes = len(subclass_to_idx)

# Model initialisation
model = CancerModel(input_size, num_classes, HIDDEN_LAYERS, DROPOUT_RATE)
model = model.to(device)

# Loss function, optimizer and plateau-based learning-rate scheduler
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# NOTE(review): `verbose` is deprecated in recent PyTorch releases — confirm it is still
# accepted by the installed version.
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=SCHEDULER_PATIENCE, factor=SCHEDULER_FACTOR, verbose=True)

In [None]:
# Per-epoch history (used afterwards for logging and plotting)
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []
train_f1_scores = []
val_f1_scores = []

best_val_loss = np.inf
epochs_without_improvement = 0

# Checkpoints are written below the working directory. The former absolute path
# '/checkpoints/...' pointed at the filesystem root, not at the './checkpoints'
# directory the notebook creates. The directory is (re)created here so this cell
# does not depend on an earlier setup cell.
checkpoint_dir = './checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)
checkpoint_path = None  # path of the best checkpoint saved so far

for epoch in range(NUM_EPOCHS):
    # ---- training phase ----
    model.train()
    running_loss = 0.0
    all_preds = []
    all_labels = []
    for features, labels in train_loader:
        features = features.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(features)
        loss = criterion(outputs, labels)
        # A NaN/inf loss can never improve best_val_loss, so training would just run
        # into early stopping without ever saving a checkpoint. Fail loudly instead.
        if not torch.isfinite(loss):
            raise RuntimeError(
                f'Non-finite training loss ({loss.item()}) in epoch {epoch+1}; '
                'check the input features for NaN/inf values.')
        loss.backward()
        optimizer.step()

        # Weight the batch loss by the batch size so the epoch value is a per-sample mean
        running_loss += loss.item() * features.size(0)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    epoch_loss = running_loss / len(train_dataset)
    train_losses.append(epoch_loss)
    epoch_acc = accuracy_score(all_labels, all_preds)
    train_accuracies.append(epoch_acc)
    epoch_f1 = f1_score(all_labels, all_preds, average='macro')
    train_f1_scores.append(epoch_f1)

    # ---- validation phase ----
    model.eval()
    val_running_loss = 0.0
    val_all_preds = []
    val_all_labels = []
    with torch.no_grad():
        for features, labels in val_loader:
            features = features.to(device)
            labels = labels.to(device)
            outputs = model(features)
            loss = criterion(outputs, labels)
            val_running_loss += loss.item() * features.size(0)
            _, preds = torch.max(outputs, 1)
            val_all_preds.extend(preds.cpu().numpy())
            val_all_labels.extend(labels.cpu().numpy())
    val_loss = val_running_loss / len(val_dataset)
    val_losses.append(val_loss)
    val_acc = accuracy_score(val_all_labels, val_all_preds)
    val_accuracies.append(val_acc)
    val_f1 = f1_score(val_all_labels, val_all_preds, average='macro')
    val_f1_scores.append(val_f1)

    if not np.isfinite(val_loss):
        raise RuntimeError(
            f'Non-finite validation loss ({val_loss}) in epoch {epoch+1}; '
            'check the validation features for NaN/inf values.')

    # The scheduler watches the validation loss
    scheduler.step(val_loss)

    print('Epoch {}/{} Train Loss: {:.4f} Acc: {:.4f} F1: {:.4f} Val Loss: {:.4f} Acc: {:.4f} F1: {:.4f}'.format(
        epoch+1, NUM_EPOCHS, epoch_loss, epoch_acc, epoch_f1, val_loss, val_acc, val_f1))

    # Checkpoint on improvement, otherwise count towards early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_without_improvement = 0
        checkpoint_path = os.path.join(checkpoint_dir, f'best_model_epoch_{epoch+1}.pt')
        torch.save(model.state_dict(), checkpoint_path)
        print(f'Model checkpoint saved at {checkpoint_path}')
    else:
        epochs_without_improvement += 1
        if epochs_without_improvement >= EARLY_STOPPING_PATIENCE:
            print('Early stopping triggered.')
            break

Epoch 1/50 Train Loss: nan Acc: 0.0129 F1: 0.0034 Val Loss: nan Acc: 0.0113 F1: 0.0009
Epoch 2/50 Train Loss: nan Acc: 0.0117 F1: 0.0009 Val Loss: nan Acc: 0.0113 F1: 0.0009
Epoch 3/50 Train Loss: nan Acc: 0.0117 F1: 0.0009 Val Loss: nan Acc: 0.0113 F1: 0.0009
Early stopping triggered.


## 학습결과 확인

In [None]:
# Save the training history
log_data = {
    'train_losses': train_losses,
    'val_losses': val_losses,
    'train_accuracies': train_accuracies,
    'val_accuracies': val_accuracies,
    'train_f1_scores': train_f1_scores,
    'val_f1_scores': val_f1_scores
}
# Relative path: the former '/logs/...' pointed at the filesystem root rather than at
# the './logs' directory the notebook creates. The directory is (re)created here so
# the cell also works on its own.
log_path = './logs/training_logs.npy'
os.makedirs(os.path.dirname(log_path), exist_ok=True)
# np.save stores a dict as a pickled object array; read it back with
# np.load(log_path, allow_pickle=True).item()
np.save(log_path, log_data)
print(f'Training logs saved at {log_path}')

In [None]:
# Visualisation: loss, accuracy and macro F1 per epoch for training and validation
epochs_range = range(1, len(train_losses) + 1)

# Relative path: the former '/logs/...' pointed at the filesystem root rather than at
# the './logs' directory the notebook creates.
plot_path = './logs/training_plots.png'
os.makedirs(os.path.dirname(plot_path), exist_ok=True)

# (y-axis label, title, training series, validation series, training legend, validation legend)
panels = [
    ('Loss', 'Loss per Epoch', train_losses, val_losses, 'Train Loss', 'Val Loss'),
    ('Accuracy', 'Accuracy per Epoch', train_accuracies, val_accuracies, 'Train Acc', 'Val Acc'),
    ('Macro F1 Score', 'Macro F1 Score per Epoch', train_f1_scores, val_f1_scores, 'Train F1', 'Val F1'),
]

plt.figure(figsize=(15, 5))
for position, (y_label, title, train_series, val_series, train_label, val_label) in enumerate(panels, start=1):
    plt.subplot(1, len(panels), position)
    plt.plot(epochs_range, train_series, label=train_label)
    plt.plot(epochs_range, val_series, label=val_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
    plt.title(title)

plt.tight_layout()
# Save before show(): the figure may be cleared once it has been displayed
plt.savefig(plot_path)
plt.show()

## 테스트 데이터 예측

In [None]:
# Prediction on the test data
# Load the test data
test_input = load_model_input('data/processed/test/input_data.json')

# The test features must go through the same normalization (training-set mean / std)
# as the features the model was trained on; without it the model receives inputs on
# a different scale.
test_dataset = CancerDataset(test_input, subclass_to_idx, mutation_type_to_idx,
                             feature_mean=feature_mean, feature_std=feature_std, normalize=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Load the best model: checkpoint_path holds the most recently saved checkpoint, and
# checkpoints are only saved when the validation loss improves.
best_model_path = checkpoint_path
# map_location lets a checkpoint saved on GPU be restored on a CPU-only runtime
model.load_state_dict(torch.load(best_model_path, map_location=device))

model.eval()
test_predictions = []
with torch.no_grad():
    for features, _ in test_loader:
        outputs = model(features.to(device))
        _, preds = torch.max(outputs, 1)
        # tolist() yields plain Python ints, usable directly as idx_to_subclass keys
        test_predictions.extend(preds.cpu().tolist())

# The loader does not shuffle, so predictions come out in the order of test_input
test_ids = [sample['id'] for sample in test_input]
assert len(test_ids) == len(test_predictions), 'every test sample needs exactly one prediction'

# Pair every ID with its predicted subclass name
test_results = [
    {'id': sample_id, 'predicted_subclass': idx_to_subclass[pred]}
    for sample_id, pred in zip(test_ids, test_predictions)
]

# Write the results to a file
with open('test_predictions.json', 'w') as f:
    json.dump(test_results, f)
print('Test predictions saved to test_predictions.json')