In [286]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
import random
import os
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score, average_precision_score
from torch.utils.data import TensorDataset, DataLoader

# from pytorch_tabnet.tab_model import TabNetClassifier

In [288]:
# 🧩 Attention + 예측 모두 manual_weights 반영
# 입력 특성마다 수동 가중치를 더해줌으로써 특정 특성의 중요도를 강조할 수 있음
class CustomAttentiveTransformer(nn.Module):
    def __init__(self, input_dim, manual_weights=None):
        super().__init__()
        self.fc = nn.Linear(input_dim, input_dim)  # 입력 특성 수만큼 attention 점수 계산하는 fully connected layer
        self.manual_weights = manual_weights  # 사용자가 지정한 수동 가중치

    def forward(self, x, prior):
        raw_scores = self.fc(x)  # 입력으로부터 기본 attention 점수를 생성
        if self.manual_weights is not None:
            # manual_weights를 텐서로 변환하여 attention score에 더함
            manual_tensor = torch.tensor(self.manual_weights, dtype=torch.float32).to(x.device)
            raw_scores = raw_scores + manual_tensor  # ✅ 선택 강조: 수동 가중치로 특정 특성의 선택 확률 증가
        mask = F.softmax(raw_scores * prior, dim=-1)  # prior와 곱하고 softmax로 정규화하여 마스크 생성
        return mask  # 최종 attention 마스크 반환


# ✅ TabNet 구조 정의 (간소화 버전)
# attention 마스크를 기반으로 특징을 선택하고, 각 스텝의 변환 결과를 누적해 예측함
class CustomTabNet(nn.Module):
    def __init__(self, input_dim, n_steps=3, decision_dim=64, manual_weights=None):
        super().__init__()
        self.input_dim = input_dim
        self.n_steps = n_steps  # 반복 횟수 (feature selection 및 transformation)
        self.decision_dim = decision_dim  # 각 스텝에서 생성되는 중간 표현 크기
        self.manual_weights = manual_weights  # 사용자가 직접 정의한 특성 중요도 가중치

        # attention 마스크를 생성할 여러 스텝의 attentive transformer 레이어 정의
        self.attentive_layers = nn.ModuleList([
            CustomAttentiveTransformer(input_dim, manual_weights=manual_weights)
            for _ in range(n_steps)
        ])

        # 선택된 특징에 대해 비선형 변환을 수행하는 feature transformer 레이어
        self.feature_transformers = nn.ModuleList([
            nn.Sequential(
                nn.Linear(input_dim, decision_dim),
                nn.ReLU(),
                nn.Linear(decision_dim, decision_dim),
                nn.ReLU()
            ) for _ in range(n_steps)
        ])

        self.output_layer = nn.Linear(decision_dim, 1)  # 출력층 (binary classification)

    def forward(self, x):
        prior = torch.ones_like(x)  # prior는 각 특성의 사용 정도를 관리하는 벡터 (초기에는 모든 특성이 동일 비중)
        outputs = []  # 스텝별 출력 저장 리스트

        for step in range(self.n_steps):
            mask = self.attentive_layers[step](x, prior)  # 현재 스텝에서의 attention 마스크 생성
            x_att = x * mask  # 마스크를 적용해 선택된 특성만 반영

            if self.manual_weights is not None:
                # ✅ 예측 반영 강조: 선택된 입력에 manual_weights를 한 번 더 곱함 (예측에 영향을 주도록)
                manual_tensor = torch.tensor(self.manual_weights, dtype=torch.float32).to(x.device)
                x_att = x_att * manual_tensor

            transformed = self.feature_transformers[step](x_att)  # 선택된 입력을 변환
            outputs.append(transformed)  # 변환 결과 저장
            prior = prior * (1 - mask)  # 선택된 특성은 다음 스텝에서 덜 사용되도록 prior 업데이트

        agg = torch.sum(torch.stack(outputs), dim=0)  # 모든 스텝의 출력 합산
        return torch.sigmoid(self.output_layer(agg))  # 최종 이진 확률값 출력


# ✅ 학습 함수: BCE loss 기준으로 최적화
# 검증 데이터의 PR AUC 기준으로 early stopping 수행
def train_model(model, X_train, y_train, X_val, y_val, lr=1e-3, epochs=100, patience=20):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    if (isinstance(X_train, torch.Tensor) and isinstance(X_val, torch.Tensor)):
        X_train = X_train.to(device)
        X_val = X_val.to(device)
    else:
        X_train = torch.tensor(X_train.to_numpy(), dtype=torch.float32).to(device)
        X_val = torch.tensor(X_val.to_numpy(), dtype=torch.float32).to(device)

    if (isinstance(y_train, torch.Tensor) and isinstance(y_val, torch.Tensor)):
        y_train = y_train.to(device)
        y_val = y_val.to(device)
    else:
        y_train = torch.tensor(y_train.to_numpy(), dtype=torch.float32).to(device)
        y_val = torch.tensor(y_val.to_numpy(), dtype=torch.float32).to(device)
        
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.BCELoss()  # 이진 분류를 위한 binary cross entropy 손실함수 사용

    best_auc = 0
    best_state = None
    patience_counter = 0

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        preds = model(X_train).squeeze()  # 학습 데이터에 대한 예측
        loss = criterion(preds, y_train)  # 손실 계산
        loss.backward()  # 역전파
        optimizer.step()  # 파라미터 업데이트

        # 검증 데이터로 PR AUC 측정
        model.eval()
        with torch.no_grad():
            val_preds = model(X_val).squeeze().detach().cpu().numpy()
            val_labels = y_val.detach().cpu().numpy()
            pr_auc = average_precision_score(val_labels, val_preds)

        # best 모델 저장 및 patience 조건 체크
        if pr_auc > best_auc:
            best_auc = pr_auc
            best_state = model.state_dict()
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                break  # patience 초과 시 학습 조기 종료

    if best_state:
        model.load_state_dict(best_state)  # 가장 성능이 좋았던 모델로 복원


# ✅ 평가 함수: ROC AUC, PR AUC 계산 (검증 or 테스트용)
def evaluate_model(model, X, y):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if (isinstance(X, torch.Tensor) and isinstance(y, torch.Tensor)):
        None
    else:
        X = torch.tensor(X.to_numpy(), dtype=torch.float32).to(device)
        y = torch.tensor(y.to_numpy(), dtype=torch.float32).to(device)
    model.eval()
    with torch.no_grad():
        probs = model(X).squeeze().cpu().numpy()
        labels = y.cpu().numpy()
    roc = roc_auc_score(labels, probs)
    pr = average_precision_score(labels, probs)
    return roc, pr


# ✅ 교차검증 함수: 주어진 데이터에 대해 K-fold PR AUC / ROC AUC 평균 성능 측정
def cross_validate_model(X, y, manual_weights=None, k=5):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)
    roc_list, pr_list = [], []

    for train_idx, val_idx in skf.split(X, y):
        X_train = torch.tensor(X[train_idx], dtype=torch.float32).to(device)
        y_train = torch.tensor(y[train_idx], dtype=torch.float32).to(device)
        X_val = torch.tensor(X[val_idx], dtype=torch.float32).to(device)
        y_val = torch.tensor(y[val_idx], dtype=torch.float32).to(device)

        model = CustomTabNet(input_dim=X.shape[1], manual_weights=manual_weights)
        train_model(model, X_train, y_train, X_val, y_val)
        roc, pr = evaluate_model(model, X_val, y_val)
        roc_list.append(roc)
        pr_list.append(pr)

    # 평균 및 표준편차 출력
    print(f"ROC AUC (mean ± std): {np.mean(roc_list):.4f} ± {np.std(roc_list):.4f}")
    print(f"PR  AUC (mean ± std): {np.mean(pr_list):.4f} ± {np.std(pr_list):.4f}")
    return roc_list, pr_list

# ✅ 랜덤 서치 함수: 다양한 하이퍼파라미터 조합 중 가장 좋은 성능을 가진 구성 탐색

def random_search(X, y, n_trials=10, param_grid=None):
    if param_grid is None:
        param_grid = {
            'n_steps': [2, 3, 4],  # 스텝 수
            'decision_dim': [32, 64, 128],  # 중간 표현 차원
            'lr': [1e-3, 5e-4],  # 학습률
            'patience': [5, 10],  # early stopping 기다림
        }

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
    results = []

    for trial in range(n_trials):
        # 랜덤하게 하이퍼파라미터 조합 선택
        config = {k: random.choice(v) for k, v in param_grid.items()}
        print(f"Trial {trial+1}/{n_trials} | Config: {config}")
        roc_scores, pr_scores = [], []

        for train_idx, val_idx in skf.split(X, y):
            X_train = torch.tensor(X[train_idx], dtype=torch.float32).to(device)
            y_train = torch.tensor(y[train_idx], dtype=torch.float32).to(device)
            X_val = torch.tensor(X[val_idx], dtype=torch.float32).to(device)
            y_val = torch.tensor(y[val_idx], dtype=torch.float32).to(device)

            model = CustomTabNet(
                input_dim=X.shape[1],
                n_steps=config['n_steps'],
                decision_dim=config['decision_dim']
            )
            train_model(model, X_train, y_train, X_val, y_val,
                        lr=config['lr'], patience=config['patience'])

            roc, pr = evaluate_model(model, X_val, y_val)
            roc_scores.append(roc)
            pr_scores.append(pr)

        avg_roc = np.mean(roc_scores)
        avg_pr = np.mean(pr_scores)
        results.append((config, avg_roc, avg_pr))

    # PR AUC 기준으로 내림차순 정렬하여 상위 결과 반환
    results.sort(key=lambda x: x[2], reverse=True)
    for i, (cfg, roc, pr) in enumerate(results):
        print(f"Rank {i+1}: PR AUC = {pr:.4f}, ROC AUC = {roc:.4f} | Config: {cfg}")

    return results

In [290]:
# 전체 시드 고정 코드 (한 번만 실행하면 됨)
def set_seed(seed=42):
    os.environ['PYTHONHASHSEED'] = str(seed)  # 해시 무작위성 제거
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # 여러 GPU 쓸 경우

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

In [292]:
# ✅ 3. 교차검증 성능 평가
cross_validate_model(X_train.to_numpy(), y_train.to_numpy())

ROC AUC (mean ± std): 0.4956 ± 0.0416
PR  AUC (mean ± std): 0.1586 ± 0.0183


([0.5, 0.5679140951954117, 0.5, 0.4481161963249988, 0.46197969976117365],
 [0.14659685863874344,
  0.19220159783593532,
  0.1467248908296943,
  0.16377347087382643,
  0.1435115605020268])

In [63]:
from sklearn.model_selection import train_test_split
data = pd.read_csv('data/(최종)_서울열선_광진도로.csv')
X = pd.get_dummies(data[['도로 종류', '도로폭', '경사각', '최근접_시설의_평균거리', '종합_평균_기온', '생활인구', '최근접_시설들_최소거리', '최근접_시설들_최대거리']])
y = data['열선']

X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.2, random_state=42)
X_train = X_train.astype('float')
X_test = X_test.astype('float')
X = X.astype('float')

In [296]:
# 사용예시

# TabNetClassifier 초기화
model = CustomTabNet(input_dim=15)

# 학습 전 수동 가중치 설정
# manual_weights = np.array([0.0, 1.5, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

# 모델 내부 Attention Layer에 가중치 전달
# CustomAttentiveTransformer(15, manual_weights)

# fit 호출
train_model(model, X_train=X_train, y_train=y_train, X_val = X_test, y_val = y_test)
evaluate_model(model, X= X_test, y= y_test)

(0.5878029771646793, 0.22561407283699864)

In [48]:
X_train.to_numpy()

array([[ 9.71511968e-01,  1.36137368e+02, -1.74322200e+00, ...,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00],
       [ 1.15915578e+00,  3.66472346e+02, -1.87292600e+00, ...,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00],
       [ 9.96087770e+00,  1.36263448e+02, -2.04503300e+00, ...,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00],
       ...,
       [ 1.61736308e-01,  3.88010033e+02, -1.61081200e+00, ...,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00],
       [ 5.48780740e+00,  2.74204454e+02, -1.47322000e-01, ...,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00],
       [ 3.24288688e-02,  2.01955198e+02, -1.14795500e+00, ...,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00]])

In [65]:
print(model.weight_updater)

<bound method TabNetClassifier.weight_updater of TabNetClassifier(n_d=8, n_a=8, n_steps=3, gamma=1.3, cat_idxs=[], cat_dims=[], cat_emb_dim=[], n_independent=2, n_shared=2, epsilon=1e-15, momentum=0.02, lambda_sparse=0.001, seed=0, clip_value=1, verbose=0, optimizer_fn=<class 'torch.optim.adam.Adam'>, optimizer_params={'lr': 0.02}, scheduler_fn=None, scheduler_params={}, mask_type='sparsemax', input_dim=None, output_dim=None, device_name='auto', n_shared_decoder=1, n_indep_decoder=1, grouped_features=[])>
