In [None]:
# 라이브러리 호출
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder, MaxAbsScaler
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, classification_report
from imblearn.over_sampling import ADASYN
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm import tqdm

In [None]:
# Load the training and test tables from Google Drive
train, test = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/content/drive/MyDrive/Data/train.csv",
        "/content/drive/MyDrive/Data/test.csv",
    )
)

In [None]:
# Feature frames: every column except the identifier (and, for the training
# table, the SUBCLASS target)
X = train.drop(['SUBCLASS', 'ID'], axis='columns')
test_X = test.drop(['ID'], axis='columns')

# Target labels (SUBCLASS); still raw here, integer-encoded further down
y_subclass = train['SUBCLASS']

# Binarize mutation annotations and add a per-row mutation-count feature
def mutation_features(df):
    """Convert raw mutation annotations into binary indicator features.

    Every cell equal to the string 'WT' becomes 0 and every other cell
    becomes 1. A ``mutation_count`` column holding the row-wise number of
    non-'WT' cells is appended as the last column.

    Args:
        df: DataFrame whose cells hold 'WT' or some other annotation.

    Returns:
        A new DataFrame containing the original columns as int64 0/1
        indicators followed by ``mutation_count``. ``df`` is not modified.
    """
    # A single vectorized comparison replaces two element-wise applymap
    # passes over the whole frame (DataFrame.applymap is also deprecated in
    # recent pandas releases).
    binary_features = df.ne('WT').astype('int64')
    # Reuse the indicator frame for the count instead of recomputing it.
    mutation_count = binary_features.sum(axis=1).rename("mutation_count")
    return pd.concat([binary_features, mutation_count], axis=1)

# Build the binary feature matrices for the training and test inputs
X_binary, test_X_binary = map(mutation_features, (X, test_X))

In [None]:
# Split into training and validation sets (20% held out, stratified on the
# SUBCLASS labels)
X_train, X_val, y_train, y_val = train_test_split(
    X_binary,
    y_subclass,
    test_size=0.2,
    stratify=y_subclass,
    random_state=42,
)

In [None]:
# Oversample the training split with ADASYN.
# NOTE(review): sampling_strategy='minority' is documented as resampling only
# the minority class - confirm this matches the intent of reinforcing the
# frequently confused classes.
adasyn = ADASYN(random_state=42, sampling_strategy='minority')
X_train_resampled, y_train_resampled = adasyn.fit_resample(X_train, y_train)

In [None]:
# Encode class labels as integers (fit on the resampled training labels)
le = LabelEncoder()
y_train_encoded = le.fit_transform(y_train_resampled)
y_val_encoded = le.transform(y_val)
num_classes = len(le.classes_)

# Convert features and labels to tensors
X_train_tensor = torch.tensor(X_train_resampled.values, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
X_val_tensor = torch.tensor(X_val.values, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val_encoded, dtype=torch.long)

# Build the datasets and DataLoaders
BATCH_SIZE = 64
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)

# The network uses BatchNorm1d, which raises in training mode when a batch
# holds a single sample. Drop the trailing batch only when it would contain
# exactly one row, so no data is discarded in any other case.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=(len(train_dataset) % BATCH_SIZE == 1),
)
# Validation runs in eval mode, so a one-sample batch is harmless there.
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Extra weight multipliers for classes that get confused with each other
custom_class_weights = {
    'GBMLGG': 1.5,  # GBMLGG vs. OV confusion
    'OV': 1.5,
    'SARC': 1.5,    # SARC vs. SKCM confusion
    'SKCM': 1.5,
}

# Start from 'balanced' class weights computed on the encoded training labels
unique_classes = le.classes_
final_class_weights = compute_class_weight(
    'balanced',
    classes=np.unique(y_train_encoded),
    y=y_train_encoded,
)

# Scale the weight of each listed class; unlisted classes are multiplied by
# 1.0 and therefore keep their balanced weight
for idx, class_label in enumerate(unique_classes):
    final_class_weights[idx] *= custom_class_weights.get(class_label, 1.0)

# Convert to a float32 tensor for use with PyTorch
class_weights = torch.tensor(final_class_weights, dtype=torch.float32)

In [None]:
# Model definition
class GeneMutationNet(nn.Module):
    """Fully connected classifier over the mutation feature vector.

    Six hidden stages of Linear -> BatchNorm1d -> ReLU -> Dropout with widths
    2048, 1024, 512, 256, 128 and 64, followed by a Linear layer that emits
    one logit per class.
    """

    # Output width and dropout probability of each hidden stage, in order.
    _HIDDEN_STAGES = (
        (2048, 0.4),
        (1024, 0.4),
        (512, 0.4),
        (256, 0.4),
        (128, 0.4),
        (64, 0.3),
    )

    def __init__(self, input_size, num_classes):
        """Build the layer stack.

        Args:
            input_size: Number of input features per sample.
            num_classes: Number of output logits.
        """
        super().__init__()

        stack = []
        in_width = input_size
        for out_width, drop_p in self._HIDDEN_STAGES:
            stack.append(nn.Linear(in_width, out_width))
            stack.append(nn.BatchNorm1d(out_width))
            stack.append(nn.ReLU())
            stack.append(nn.Dropout(drop_p))
            in_width = out_width
        stack.append(nn.Linear(in_width, num_classes))

        # Modules are added in the same order as a hand-written Sequential,
        # so the state-dict keys (layers.0, layers.1, ...) are unchanged.
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Return the raw class logits for a batch of feature vectors."""
        return self.layers(x)

In [None]:
# Seed PyTorch's global RNG so weight initialisation and dropout are
# repeatable when the notebook is run top to bottom, matching the
# random_state=42 already used for the train/validation split and ADASYN.
torch.manual_seed(42)

# Model initialisation: the input width is the number of feature columns
input_size = X_train_tensor.shape[1]
model = GeneMutationNet(input_size, num_classes)

# Loss function (with per-class weights) and optimizer
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)

# Model evaluation helper
def evaluate_model(model, val_loader, criterion):
    """Score ``model`` on ``val_loader`` without updating any weights.

    The model is switched to eval mode and gradients are disabled for the
    duration of the pass.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        val_loader: Sized iterable of ``(inputs, labels)`` batches.
        criterion: Callable returning a scalar loss tensor for
            ``(outputs, labels)``.

    Returns:
        ``(avg_val_loss, accuracy)``: the mean of the per-batch losses and
        the fraction of samples whose highest-scoring class equals the label.
    """
    model.eval()
    loss_sum = 0.0
    hits = 0
    seen = 0

    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            logits = model(batch_x)
            loss_sum += criterion(logits, batch_y).item()

            guesses = logits.argmax(dim=1)
            hits += (guesses == batch_y).sum().item()
            seen += batch_y.size(0)

    # The loss is averaged over batches, the accuracy over samples.
    return loss_sum / len(val_loader), hits / seen

In [None]:
# Confusion matrix and per-class metrics report
def evaluate_with_metrics(model, val_loader):
    """Print and plot a confusion matrix and classification report.

    Runs ``model`` in eval mode over ``val_loader`` with gradients disabled,
    collects the true and predicted class indices, then prints the confusion
    matrix, shows it as a heatmap and prints a classification report. Class
    names are taken from the module-level label encoder ``le``.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        val_loader: Iterable of ``(inputs, labels)`` batches.

    Returns:
        None. Output is printed and displayed.
    """
    model.eval()
    all_labels = []
    all_predictions = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    # Pin the label set to every encoded class. Otherwise scikit-learn infers
    # it from the data, and a class that is absent from both the labels and
    # the predictions would misalign the tick labels and make
    # classification_report reject target_names.
    class_ids = np.arange(len(le.classes_))

    cm = confusion_matrix(all_labels, all_predictions, labels=class_ids)
    print("Confusion Matrix:")
    print(cm)

    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=le.classes_, yticklabels=le.classes_)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

    print("Classification Report:")
    print(classification_report(all_labels, all_predictions, labels=class_ids, target_names=le.classes_))

In [None]:
# Training loop with early stopping
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=50,
                patience=5, save_path='/content/drive/MyDrive/best_model.pth'):
    """Train ``model`` with early stopping, then report validation metrics.

    After every epoch the model is scored with ``evaluate_model``. Whenever
    the validation loss improves, the weights are written to ``save_path``.
    Training stops once ``patience`` consecutive epochs fail to improve. The
    best saved weights are loaded back into ``model`` before the final
    ``evaluate_with_metrics`` call, so the report describes the checkpoint on
    disk rather than the last epoch.

    Args:
        model: Module to train; updated in place.
        train_loader: Sized iterable of ``(inputs, labels)`` training batches.
        val_loader: Sized iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        epochs: Maximum number of epochs to run.
        patience: Number of consecutive non-improving epochs tolerated before
            stopping early.
        save_path: File path the best ``state_dict`` is saved to.

    Returns:
        None. Progress and metrics are printed.
    """
    best_val_loss = float('inf')
    trigger_times = 0
    checkpoint_written = False

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs}', leave=False)

        for inputs, labels in progress_bar:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            progress_bar.set_postfix(loss=loss.item())

        avg_train_loss = running_loss / len(train_loader)
        print(f'Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}')

        val_loss, val_accuracy = evaluate_model(model, val_loader, criterion)
        print(f'Epoch {epoch+1}/{epochs}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            trigger_times = 0
            torch.save(model.state_dict(), save_path)
            checkpoint_written = True
        else:
            trigger_times += 1
            if trigger_times >= patience:
                print('Early stopping!')
                break

    # Once training ends, the in-memory weights may be several epochs past the
    # best checkpoint. Restore it so the final report matches the saved model.
    # Skipped if no epoch ever improved on the initial loss (e.g. epochs=0).
    if checkpoint_written:
        model.load_state_dict(torch.load(save_path))

    evaluate_with_metrics(model, val_loader)

# Train the model
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=50,
)


In [None]:
# Run predictions on the test data
test_tensor = torch.tensor(test_X_binary.values, dtype=torch.float32)

# Load the best saved weights and switch to eval mode
best_state = torch.load('/content/drive/MyDrive/best_model.pth')
model.load_state_dict(best_state)
model.eval()

# Take the highest-scoring class index for each test row
with torch.no_grad():
    test_outputs = model(test_tensor)
    test_predictions = test_outputs.argmax(dim=1)

# Map the class indices back to the original label strings
predicted_labels = le.inverse_transform(test_predictions.numpy())

# Assemble the submission table: one row per test ID
submission = pd.DataFrame({'ID': test['ID'], 'SUBCLASS': predicted_labels})

# Write the submission to CSV
submission.to_csv(
    '/content/drive/MyDrive/dacon_submission.csv',
    index=False,
    encoding='UTF-8-sig',
)