In [41]:
# %% [markdown]
# ## Cell 1 — Setup & Config (Improved: MPNet + mutual kNN + smoothing)

import os, sys, json, random, time
from pathlib import Path

import numpy as np
import pandas as pd

# Sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, label_binarize
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, confusion_matrix, roc_auc_score,
                             average_precision_score)

# Torch / PyG
import torch
from torch import nn
import torch.nn.functional as F
import torch_geometric
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
from torch_geometric.nn.norm import BatchNorm  # PyG 2.5.x

# Embed/Plot
from sentence_transformers import SentenceTransformer
import matplotlib.pyplot as plt
import networkx as nx

# --------------------
# Repro & Device
# --------------------
RNG_SEED = 42
random.seed(RNG_SEED); np.random.seed(RNG_SEED); torch.manual_seed(RNG_SEED)

FORCE_CPU = False
if (not FORCE_CPU) and torch.cuda.is_available():
    DEVICE = torch.device("cuda"); torch.cuda.manual_seed_all(RNG_SEED)
elif (not FORCE_CPU) and hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    DEVICE = torch.device("mps")
else:
    DEVICE = torch.device("cpu")

print(f"torch={torch.__version__} | pyg={torch_geometric.__version__} | device={DEVICE}")

# --------------------
# Paths (internal only)
#  - 네가 사용하던 경로를 그대로 씀
# --------------------
DATA_PATH = Path('../data/0923preprocessed/template_predicate.csv')
OUTDIR = Path('../resgcn_runs')
OUTDIR.mkdir(parents=True, exist_ok=True)

# --------------------
# Columns
# --------------------
TEXT_COL       = 'String'
PREDICATE_COL  = 'predicate'
TYPE_COL       = 'Type'   # optional

# --------------------
# Labels
# --------------------
NOT_DP_NAME = 'Not Dark Pattern'
EXPECTED_CLASS_COUNT = 10
ND_SYN = {
    "none","null","nan","na","n/a","","-","_",
    "no dp","notdp","not a dark pattern","not dark pattern",
    "non dark pattern","nondarkpattern",
    "비다크","비 다크","없음","해당없음","해당 없음","미해당","무"
}

# --------------------
# Embedding (UPGRADED)
# --------------------
EMBEDDER_NAME = 'sentence-transformers/all-mpnet-base-v2'   # ★ 업그레이드 포인트
EMB_TAG = 'mpnet'                                           # 캐시 구분 태그
KNN_METRIC = 'cosine'
DEFAULT_KNN_K = 10
USE_MUTUAL_KNN = True                                       # ★ mutual kNN 사용
EMBED_CACHE = OUTDIR / f'embeddings_{EMB_TAG}.npy'
LABMAP_JSON = OUTDIR / 'label_mapping.json'

# --------------------
# Split ratios
# --------------------
TRAIN_RATIO, VAL_RATIO, TEST_RATIO = 0.7, 0.1, 0.2

# --------------------
# Improved baseline HP (이전 튜닝 베스트를 기본값으로)
# --------------------
IMPROVED_HP = dict(
    hidden=128,
    layers=2,
    dropout=0.1,
    lr=1e-3,
    weight_decay=1e-5,
    knn_k=DEFAULT_KNN_K,
    epochs=60
)

# --------------------
# Loss tweaks
# --------------------
LABEL_SMOOTHING = 0.05    # ★ label smoothing
W_CLIP_MIN, W_CLIP_MAX = 0.5, 2.0  # 1/sqrt(freq) 가중치 클리핑 범위

print("DATA_PATH exists:", DATA_PATH.exists())
print("OUTDIR:", OUTDIR)
print("HP:", IMPROVED_HP)
print(f"Embedder: {EMBEDDER_NAME} | mutual_kNN={USE_MUTUAL_KNN} | cache={EMBED_CACHE.name}")
print(f"Loss: label_smoothing={LABEL_SMOOTHING}, class_weight_clip=({W_CLIP_MIN},{W_CLIP_MAX})")


torch=2.3.1 | pyg=2.5.3 | device=mps
DATA_PATH exists: True
OUTDIR: ../resgcn_runs
HP: {'hidden': 128, 'layers': 2, 'dropout': 0.1, 'lr': 0.001, 'weight_decay': 1e-05, 'knn_k': 10, 'epochs': 60}
Embedder: sentence-transformers/all-mpnet-base-v2 | mutual_kNN=True | cache=embeddings_mpnet.npy
Loss: label_smoothing=0.05, class_weight_clip=(0.5,2.0)


In [42]:
# %% [markdown]
# ## Cell 2 — Load data, normalize labels → enforce 10 classes (Improved run)

import re, json
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from IPython.display import display

# 1) Load
assert DATA_PATH.exists(), f"데이터 없음: {DATA_PATH}"
df = pd.read_csv(DATA_PATH)

# 2) 유연 컬럼 매핑 (대소문자/공백/변형 허용)
lower = {c.lower(): c for c in df.columns}
assert 'string' in lower and 'predicate' in lower, \
    f"CSV에 'String', 'predicate' 열이 필요합니다. 현재 열: {df.columns.tolist()}"

df = df.rename(columns={
    lower['string']: TEXT_COL,
    lower['predicate']: PREDICATE_COL
})
if 'type' in lower:
    df = df.rename(columns={lower['type']: TYPE_COL})

print("Columns:", df.columns.tolist())
print("Rows:", len(df))
display(df.head(3))

# 3) 라벨 정규화: None/동의어 -> Not Dark Pattern
def normalize_pred(v):
    if pd.isna(v):
        return NOT_DP_NAME
    s_raw = str(v).strip()
    s = s_raw.lower()
    s_simple = re.sub(r"[‐-‒–—\-_/\\\.\(\)\[\]\s]+", "", s)  # 기호/공백 제거 버전
    if (s in ND_SYN) or (s_simple in ND_SYN):
        return NOT_DP_NAME
    return s_raw

df[PREDICATE_COL] = df[PREDICATE_COL].map(normalize_pred)

# 4) 클래스 분포 확인 + 10개 강제 체크
vc = df[PREDICATE_COL].value_counts()
print("\nPredicate counts (after normalize):")
display(vc)

assert NOT_DP_NAME in vc.index, f"'{NOT_DP_NAME}' 라벨이 없습니다. ND_SYN 세트를 보강하세요."

uniq = vc.index.nunique()
if uniq != EXPECTED_CLASS_COUNT:
    print(f"⚠️ 클래스 개수 기대={EXPECTED_CLASS_COUNT}, 현재={uniq}")
    print("현재 라벨 목록:", sorted(vc.index.tolist()))
    raise AssertionError("Class count mismatch. CSV 표기/동의어를 점검하세요.")
else:
    print(f"✅ 클래스 {EXPECTED_CLASS_COUNT}개 확인.")

# 5) LabelEncoder 적합 + 매핑 저장
le = LabelEncoder()
y = le.fit_transform(df[PREDICATE_COL].astype(str))

with open(LABMAP_JSON, 'w', encoding='utf-8') as f:
    json.dump({i: c for i, c in enumerate(le.classes_)}, f, ensure_ascii=False, indent=2)

num_classes = len(le.classes_)
print("num_classes:", num_classes)
print("classes:", list(le.classes_))

# 이후 셀에서 df, y, le, num_classes 사용


Columns: ['String', 'Type', 'label', 'predicate']
Rows: 3200


Unnamed: 0,String,Type,label,predicate
0,Seen on product page 79 clicks today,Social Proof,1,Activity Notifications
1,Flash Sale ends in SHOP NOW,Urgency,1,Limited-time Messages
2,17 people have viewed this wine today,Social Proof,1,Activity Notifications



Predicate counts (after normalize):


predicate
Not Dark Pattern                    1600
Activity Notifications               361
Low-stock Messages                   360
Limited-time Messages                240
Countdown Timers                     160
Pressured Selling                    157
Confirmshaming                       137
Trick Questions                      106
High-demand Messages                  40
Testimonials of Uncertain Origin      39
Name: count, dtype: int64

✅ 클래스 10개 확인.
num_classes: 10
classes: ['Activity Notifications', 'Confirmshaming', 'Countdown Timers', 'High-demand Messages', 'Limited-time Messages', 'Low-stock Messages', 'Not Dark Pattern', 'Pressured Selling', 'Testimonials of Uncertain Origin', 'Trick Questions']


In [43]:
# %% [markdown]
# ## Cell 3 — Text → Embeddings with MPNet (cache)

import numpy as np
from sentence_transformers import SentenceTransformer

if EMBED_CACHE.exists():
    X = np.load(EMBED_CACHE)
    print(f"Loaded cached embeddings: {X.shape} from {EMBED_CACHE}")
else:
    st_model = SentenceTransformer(EMBEDDER_NAME, device=str(DEVICE))
    texts = df[TEXT_COL].astype(str).tolist()
    X = st_model.encode(
        texts,
        batch_size=256,
        show_progress_bar=True,
        convert_to_numpy=True,
        normalize_embeddings=True,  # cosine kNN에 유리
    )
    np.save(EMBED_CACHE, X)
    print(f"Computed embeddings and cached: {X.shape} -> {EMBED_CACHE}")


Batches: 100%|██████████| 13/13 [00:12<00:00,  1.07it/s]

Computed embeddings and cached: (3200, 768) -> ../resgcn_runs/embeddings_mpnet.npy





In [44]:
# %% [markdown]
# ## Cell 4 — Build kNN graph (mutual-kNN) and make PyG Data

import numpy as np
import torch
from torch_geometric.data import Data
from sklearn.neighbors import NearestNeighbors

# --- kNN helpers ---
def build_knn_indices(embeddings: np.ndarray, k: int, metric: str):
    """Return neighbor index array of shape (N, k) excluding self."""
    assert k >= 1, "k must be >= 1"
    nbrs = NearestNeighbors(n_neighbors=k+1, metric=metric)
    nbrs.fit(embeddings)
    _, idxs = nbrs.kneighbors(embeddings)
    return idxs[:, 1:]  # drop self

def build_edge_index_from_neighbors(neigh_idx: np.ndarray, mutual: bool = False):
    """
    neigh_idx: (N, k) int array
    mutual=False: standard kNN -> make directed edges i->j, and also add reverse to treat as undirected.
    mutual=True : keep edge (i,j) only if i in N(j) and j in N(i); add both directions for PyG.
    """
    N, K = neigh_idx.shape
    if not mutual:
        rows = np.repeat(np.arange(N), K)
        cols = neigh_idx.reshape(-1)
        # make undirected by adding reverse
        ei = np.vstack([np.concatenate([rows, cols]), np.concatenate([cols, rows])])
        # unique columns
        ei = np.unique(ei, axis=1)
        return ei
    else:
        # Build neighbor sets for mutual test
        neigh_sets = [set(neigh_idx[i].tolist()) for i in range(N)]
        undirected_pairs = []
        for i in range(N):
            for j in neigh_sets[i]:
                if i < j and i in neigh_sets[j]:
                    undirected_pairs.append((i, j))
        undirected_pairs = np.array(undirected_pairs, dtype=np.int64)
        if undirected_pairs.size == 0:
            # Fallback: if mutual is too strict and yields empty graph, revert to standard kNN
            rows = np.repeat(np.arange(N), K)
            cols = neigh_idx.reshape(-1)
            ei = np.vstack([np.concatenate([rows, cols]), np.concatenate([cols, rows])])
            ei = np.unique(ei, axis=1)
            return ei

        # expand to directed for PyG: (i,j) and (j,i)
        rows = np.concatenate([undirected_pairs[:,0], undirected_pairs[:,1]])
        cols = np.concatenate([undirected_pairs[:,1], undirected_pairs[:,0]])
        ei = np.vstack([rows, cols])
        ei = np.unique(ei, axis=1)
        return ei

def make_pyg_data(X_np: np.ndarray, y_np: np.ndarray, edge_index_np: np.ndarray, device: torch.device):
    data = Data(
        x=torch.tensor(X_np, dtype=torch.float32),
        y=torch.tensor(y_np, dtype=torch.long),
        edge_index=torch.tensor(edge_index_np, dtype=torch.long),
    )
    return data.to(device)

# --- build graph with mutual-kNN (if enabled) ---
k_use = IMPROVED_HP['knn_k']
idxs = build_knn_indices(X, k=k_use, metric=KNN_METRIC)
edge_index_np = build_edge_index_from_neighbors(idxs, mutual=USE_MUTUAL_KNN)

data = make_pyg_data(X, y, edge_index_np, DEVICE)

# --- stats print ---
N = X.shape[0]
directed_E = edge_index_np.shape[1]
# estimate undirected pair count (since we added both directions)
undirected_pairs = directed_E // 2
avg_degree = directed_E / N  # since directed includes both directions, this equals undirected*2/N
print(f"nodes={N}, directed_edges={directed_E}, undirected_pairs≈{undirected_pairs}, avg_degree≈{avg_degree:.2f}")
print(f"data.x: {tuple(data.x.shape)}, data.y: {tuple(data.y.shape)}, edge_index: {tuple(data.edge_index.shape)}")
print(f"mutual_kNN={'ON' if USE_MUTUAL_KNN else 'OFF'} | k={k_use}")


nodes=3200, directed_edges=17208, undirected_pairs≈8604, avg_degree≈5.38
data.x: (3200, 768), data.y: (3200,), edge_index: (2, 17208)
mutual_kNN=ON | k=10


In [45]:
# %% [markdown]
# ## Cell 5 — Train/Val/Test masks (7:1:2, stratified)

import numpy as np
import torch
from sklearn.model_selection import train_test_split

N = len(y)
indices = np.arange(N)

# 1) train vs (val+test)
train_idx, tmp_idx, y_train, y_tmp = train_test_split(
    indices, y, test_size=(1 - TRAIN_RATIO),
    random_state=RNG_SEED, stratify=y
)

# 2) val vs test (0.1 : 0.2 유지)
val_rel = VAL_RATIO / (VAL_RATIO + TEST_RATIO)  # 0.1 / 0.3 = 0.333...
val_idx, test_idx, y_val, y_test = train_test_split(
    tmp_idx, y_tmp, test_size=(1 - val_rel),
    random_state=RNG_SEED, stratify=y_tmp
)

# boolean masks
train_mask = torch.zeros(N, dtype=torch.bool); train_mask[train_idx] = True
val_mask   = torch.zeros(N, dtype=torch.bool); val_mask[val_idx]   = True
test_mask  = torch.zeros(N, dtype=torch.bool); test_mask[test_idx] = True

# attach to PyG data (on DEVICE)
data.train_mask = train_mask.to(DEVICE)
data.val_mask   = val_mask.to(DEVICE)
data.test_mask  = test_mask.to(DEVICE)

print(f"Total={N} | train={train_mask.sum().item()} | val={val_mask.sum().item()} | test={test_mask.sum().item()}")
print("Disjoint? ", (train_mask & val_mask).sum().item()==0 and
                   (train_mask & test_mask).sum().item()==0 and
                   (val_mask & test_mask).sum().item()==0)
print("Cover all?", (train_mask | val_mask | test_mask).sum().item()==N)


Total=3200 | train=2239 | val=320 | test=641
Disjoint?  True
Cover all? True


In [46]:
# %% [markdown]
# ## Cell 6 — ResGCN model + moderated class weights & label smoothing (setup)

import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn.norm import BatchNorm  # PyG 2.5.x
import numpy as np
import torch

# ------------------------
# 1) ResGCN 정의
# ------------------------
class ResidualGCNBlock(nn.Module):
    def __init__(self, dim_in, dim_out, dropout=0.0):
        super().__init__()
        self.conv = GCNConv(dim_in, dim_out, improved=True)
        self.bn = BatchNorm(dim_out)
        self.dropout = dropout
        self.res_proj = nn.Linear(dim_in, dim_out) if dim_in != dim_out else None

    def forward(self, x, edge_index):
        identity = x
        out = self.conv(x, edge_index)
        out = self.bn(out)
        out = F.relu(out, inplace=True)
        out = F.dropout(out, p=self.dropout, training=self.training)
        if self.res_proj is not None:
            identity = self.res_proj(identity)
        return out + identity

class ResGCN(nn.Module):
    def __init__(self, in_dim, hidden, out_dim, layers=2, dropout=0.1):
        super().__init__()
        dims = [in_dim] + [hidden] * layers
        self.blocks = nn.ModuleList([
            ResidualGCNBlock(dims[i], dims[i+1], dropout=dropout) for i in range(layers)
        ])
        self.head = nn.Linear(hidden, out_dim)

    def forward(self, data):
        x, ei = data.x, data.edge_index
        for blk in self.blocks:
            x = blk(x, ei)
        return self.head(x)

# ------------------------
# 2) 클래스 가중치 (1/sqrt(freq), 평균=1 정규화, [0.5, 2.0] 클리핑)
# ------------------------
train_labels = data.y[data.train_mask].detach().cpu().numpy()
counts = np.bincount(train_labels, minlength=num_classes).astype(float)
inv_sqrt = 1.0 / np.sqrt(np.maximum(counts, 1.0))
class_weights_np = inv_sqrt / inv_sqrt.mean()
class_weights_np = np.clip(class_weights_np, W_CLIP_MIN, W_CLIP_MAX)
class_weights = torch.tensor(class_weights_np, dtype=torch.float32, device=DEVICE)

print("Train counts per class:", counts.tolist())
print("Moderated class weights:", np.round(class_weights_np, 3).tolist())
print(f"Label smoothing ε={LABEL_SMOOTHING}")

# ------------------------
# 3) 모델/옵티마/손실 세팅
# ------------------------
model = ResGCN(
    in_dim=data.x.size(1),
    hidden=int(IMPROVED_HP['hidden']),
    out_dim=num_classes,
    layers=int(IMPROVED_HP['layers']),
    dropout=float(IMPROVED_HP['dropout'])
).to(DEVICE)

optimizer = torch.optim.Adam(
    model.parameters(),
    lr=float(IMPROVED_HP['lr']),
    weight_decay=float(IMPROVED_HP['weight_decay'])
)

# torch>=1.10: F.cross_entropy에 label_smoothing 있음 → 클래스 가중치와 함께 사용
def ce_with_smoothing(logits, targets):
    return F.cross_entropy(
        logits, targets,
        weight=class_weights,
        label_smoothing=float(LABEL_SMOOTHING)
    )

total_params = sum(p.numel() for p in model.parameters())
print(model)
print(f"Total parameters: {total_params:,}")
print("Ready for improved training.")


Train counts per class: [253.0, 96.0, 112.0, 28.0, 168.0, 252.0, 1119.0, 110.0, 27.0, 74.0]
Moderated class weights: [0.615, 0.998, 0.924, 1.848, 0.755, 0.616, 0.5, 0.932, 1.882, 1.137]
Label smoothing ε=0.05
ResGCN(
  (blocks): ModuleList(
    (0): ResidualGCNBlock(
      (conv): GCNConv(768, 128)
      (bn): BatchNorm(128)
      (res_proj): Linear(in_features=768, out_features=128, bias=True)
    )
    (1): ResidualGCNBlock(
      (conv): GCNConv(128, 128)
      (bn): BatchNorm(128)
    )
  )
  (head): Linear(in_features=128, out_features=10, bias=True)
)
Total parameters: 215,178
Ready for improved training.


In [47]:
# %% [markdown]
# ## Cell 7 — Train (improved) + internal test metrics/CM + save model & reports

import time, json
import numpy as np
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
from sklearn.preprocessing import label_binarize
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, confusion_matrix, roc_auc_score,
                             average_precision_score, classification_report)
from pathlib import Path

# -------- helpers --------
def evaluate_subset(logits_subset, y_true_subset, n_classes: int):
    y_true = y_true_subset.detach().cpu().numpy()
    y_pred = logits_subset.argmax(-1).detach().cpu().numpy()

    acc  = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, average='macro', zero_division=0)
    rec  = recall_score(y_true, y_pred, average='macro', zero_division=0)
    f1   = f1_score(y_true, y_pred, average='macro', zero_division=0)

    # macro AUCs (OvR)
    try:
        y_true_ovr = label_binarize(y_true, classes=np.arange(n_classes))
        y_score = F.softmax(logits_subset, dim=-1).detach().cpu().numpy()
        roc = roc_auc_score(y_true_ovr, y_score, average='macro', multi_class='ovr')
        pr  = average_precision_score(y_true_ovr, y_score, average='macro')
    except Exception:
        roc, pr = float('nan'), float('nan')

    cm = confusion_matrix(y_true, y_pred)
    return dict(acc=acc, prec=prec, rec=rec, f1=f1, roc_auc=roc, pr_auc=pr, cm=cm)

# -------- train --------
PATIENCE = 8  # early stopping patience
best_val, best_state = -1.0, None
stale = 0
hist = []
start = time.time()

for ep in range(1, int(IMPROVED_HP['epochs']) + 1):
    model.train()
    optimizer.zero_grad()
    logits = model(data)
    loss = ce_with_smoothing(logits[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()

    model.eval()
    with torch.no_grad():
        logits_eval = model(data)
        tr = evaluate_subset(logits_eval[data.train_mask], data.y[data.train_mask], num_classes)
        va = evaluate_subset(logits_eval[data.val_mask],   data.y[data.val_mask],   num_classes)

    hist.append((ep, tr['f1'], va['f1']))
    print(f"[Imp] epoch {ep:03d}  loss={loss.item():.4f}  trainF1={tr['f1']:.4f}  valF1={va['f1']:.4f}")

    if va['f1'] > best_val + 1e-6:
        best_val = va['f1']
        best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
        stale = 0
    else:
        stale += 1
        if stale >= PATIENCE:
            print("Early stopping (improved).")
            break

# load best
if best_state is not None:
    model.load_state_dict({k: v.to(DEVICE) for k, v in best_state.items()})

# -------- eval on test --------
model.eval()
with torch.no_grad():
    logits_final = model(data)
test_metrics = evaluate_subset(logits_final[data.test_mask], data.y[data.test_mask], num_classes)

elapsed = time.time() - start
print(f"\n[Improved] Finished in {elapsed:.1f}s")
print("# Internal Test (macro) — Improved")
for k in ['acc','prec','rec','f1','roc_auc','pr_auc']:
    print(f"{k.upper():>8}: {test_metrics[k]:.4f}")

# -------- save confusion matrix --------
cm = test_metrics['cm']
fig, ax = plt.subplots(figsize=(8,6))
im = ax.imshow(cm, interpolation='nearest'); ax.figure.colorbar(im, ax=ax)
ax.set_title('Confusion Matrix — Internal Test (Improved)')
ax.set_xlabel('Predicted'); ax.set_ylabel('True')
ax.set_xticks(range(num_classes)); ax.set_yticks(range(num_classes))
ax.set_xticklabels([c[:14] for c in list(le.classes_)], rotation=45, ha='right')
ax.set_yticklabels([c[:14] for c in list(le.classes_)])
for (i,j), v in np.ndenumerate(cm):
    ax.text(j, i, int(v), ha='center', va='center', fontsize=8)
plt.tight_layout()
cm_path = OUTDIR / 'cm_internal_improved.png'
plt.savefig(cm_path, dpi=220); plt.close(fig)
print("Saved CM ->", cm_path)

# -------- save classification report --------
y_true_test = data.y[data.test_mask].detach().cpu().numpy()
y_pred_test = logits_final[data.test_mask].argmax(-1).detach().cpu().numpy()
cr_txt = classification_report(y_true_test, y_pred_test, target_names=list(le.classes_), zero_division=0)
cr_path = OUTDIR / 'classification_report_Improved.txt'
with open(cr_path, 'w', encoding='utf-8') as f:
    f.write(cr_txt)
print("Saved classification report ->", cr_path)

# -------- save model (with configs) --------
ckpt_path = OUTDIR / 'resgcn_improved.pt'
torch.save({
    'state_dict': model.state_dict(),
    'hp': IMPROVED_HP,
    'knn_k': int(IMPROVED_HP['knn_k']),
    'config': {
        'in_dim': int(data.x.size(1)),
        'out_dim': int(num_classes),
        'label_mapping': dict(enumerate([str(c) for c in list(le.classes_)])),
        'metric': KNN_METRIC,
        'embedder': EMBEDDER_NAME,
        'mutual_kNN': True
    },
    'class_weights': class_weights.detach().cpu().numpy().tolist(),
    'label_smoothing': float(LABEL_SMOOTHING)
}, ckpt_path)
size_mb = ckpt_path.stat().st_size / (1024*1024)
print(f"Saved improved model -> {ckpt_path} ({size_mb:.2f} MB)")

# -------- append/overwrite summary CSV/JSON --------
summary_row = {
    "Model": "Improved",
    "acc": test_metrics['acc'],
    "prec": test_metrics['prec'],
    "rec": test_metrics['rec'],
    "f1": test_metrics['f1'],
    "roc_auc": test_metrics['roc_auc'],
    "pr_auc": test_metrics['pr_auc'],
    "ModelSizeMB": round(size_mb, 2),
    "kNN_k": int(IMPROVED_HP['knn_k']),
    "embedder": EMBEDDER_NAME,
    "mutual_kNN": True,
    "label_smoothing": LABEL_SMOOTHING
}
# CSV/JSON 저장
import pandas as pd
csv_path = OUTDIR / 'metrics_internal_summary.csv'
if csv_path.exists():
    df_old = pd.read_csv(csv_path)
    df_new = pd.concat([df_old, pd.DataFrame([summary_row])], ignore_index=True)
else:
    df_new = pd.DataFrame([summary_row])
df_new.to_csv(csv_path, index=False)
with open(OUTDIR / 'metrics_internal_summary.json', 'w') as f:
    json.dump(df_new.to_dict(orient='records'), f, indent=2)
print("Saved metrics ->", csv_path)


[Imp] epoch 001  loss=2.4344  trainF1=0.1406  valF1=0.1404
[Imp] epoch 002  loss=2.0355  trainF1=0.3328  valF1=0.3497
[Imp] epoch 003  loss=1.7192  trainF1=0.4150  valF1=0.4126
[Imp] epoch 004  loss=1.4785  trainF1=0.4397  valF1=0.4296
[Imp] epoch 005  loss=1.3057  trainF1=0.4507  valF1=0.4465
[Imp] epoch 006  loss=1.1704  trainF1=0.4518  valF1=0.4546
[Imp] epoch 007  loss=1.0740  trainF1=0.4564  valF1=0.4590
[Imp] epoch 008  loss=1.0010  trainF1=0.4543  valF1=0.4622
[Imp] epoch 009  loss=0.9466  trainF1=0.4558  valF1=0.4632
[Imp] epoch 010  loss=0.8976  trainF1=0.4566  valF1=0.4638
[Imp] epoch 011  loss=0.8651  trainF1=0.4549  valF1=0.4541
[Imp] epoch 012  loss=0.8360  trainF1=0.4545  valF1=0.4447
[Imp] epoch 013  loss=0.8133  trainF1=0.4613  valF1=0.4665
[Imp] epoch 014  loss=0.7903  trainF1=0.4668  valF1=0.4695
[Imp] epoch 015  loss=0.7644  trainF1=0.4856  valF1=0.4695
[Imp] epoch 016  loss=0.7516  trainF1=0.4905  valF1=0.4704
[Imp] epoch 017  loss=0.7354  trainF1=0.5346  valF1=0.47

In [48]:
# %% [markdown]
# ## Cell 8 — Sanity check: 중복/유사문장(누수) 점검 (train ↔ test)

import numpy as np, pandas as pd
from pathlib import Path

# 1) 인덱스 준비
train_idx = np.where(data.train_mask.detach().cpu().numpy())[0]
val_idx   = np.where(data.val_mask.detach().cpu().numpy())[0]
test_idx  = np.where(data.test_mask.detach().cpu().numpy())[0]

# 2) 문자열 정규화(소문자, 공백/기호 최소화)로 "완전 중복" 탐지
def norm_txt(s: str) -> str:
    s = str(s).strip().lower()
    # 너무 과격하게 지우면 다른 문장도 뭉개질 수 있어 최소한만
    return " ".join(s.split())

tr_norm = pd.Series(df.loc[train_idx, TEXT_COL].map(norm_txt).values, index=train_idx)
te_norm = pd.Series(df.loc[test_idx,  TEXT_COL].map(norm_txt).values, index=test_idx)

tr_set = set(tr_norm.values.tolist())
exact_dups = te_norm[te_norm.isin(tr_set)]
exact_dup_rate = len(exact_dups) / len(te_norm) if len(te_norm) else 0.0
print(f"[Exact duplicate check] test 내 train과 완전 동일 문장 비율: {exact_dup_rate:.3%}  (count={len(exact_dups)}/{len(te_norm)})")

# 3) 임베딩 기반 유사도(코사인)로 "근접 중복" 탐지
#    - 각 test 문장에 대해 train 임베딩과의 cosine sim 최대값을 계산
#    - MPNet 임베딩 X는 이미 L2-normalize 되어 있으므로 dot이 곧 cosine
X_tr = X[train_idx]
X_te = X[test_idx]
sims = X_te @ X_tr.T                              # (n_test, n_train)
max_sim = sims.max(axis=1)
argmax_tr = sims.argmax(axis=1)

# 임계치별 요약
for thr in [0.90, 0.95, 0.98, 0.99]:
    rate = float((max_sim >= thr).mean())
    print(f"[Near-dup] cos≥{thr:.2f}: {rate:.3%}  ({int((max_sim>=thr).sum())}/{len(max_sim)})")

# 4) 상위 의심 케이스 테이블 저장
top_k = min(100, len(test_idx))
order = np.argsort(-max_sim)[:top_k]
sus = pd.DataFrame({
    "test_idx": test_idx[order],
    "train_match_idx": train_idx[argmax_tr[order]],
    "cos_sim": max_sim[order],
    "test_text": df.loc[test_idx[order], TEXT_COL].values,
    "train_text": df.loc[train_idx[argmax_tr[order]], TEXT_COL].values,
    "test_label": df.loc[test_idx[order], PREDICATE_COL].values,
    "train_label": df.loc[train_idx[argmax_tr[order]], PREDICATE_COL].values,
})
out_csv = OUTDIR / "leak_audit_top_matches.csv"
sus.to_csv(out_csv, index=False)
print("Saved potential near-duplicates ->", out_csv)

# 5) 클래스별 유사도 분포 요약(테스트 기준)
test_labels = y[test_idx]
df_sim = pd.DataFrame({"label": test_labels, "max_sim": max_sim})
class_names = list(le.classes_)
cls_rows = []
for ci, cname in enumerate(class_names):
    arr = df_sim.loc[df_sim["label"]==ci, "max_sim"].values
    if len(arr)==0:
        continue
    cls_rows.append({
        "class": cname,
        "count": len(arr),
        "mean_max_sim": float(np.mean(arr)),
        "p95_max_sim": float(np.percentile(arr, 95)),
        "p99_max_sim": float(np.percentile(arr, 99)),
        "frac_ge_0.98": float((arr>=0.98).mean()),
    })
cls_df = pd.DataFrame(cls_rows).sort_values("frac_ge_0.98", ascending=False)
cls_csv = OUTDIR / "leak_audit_by_class.csv"
cls_df.to_csv(cls_csv, index=False)
print("Saved class-wise near-dup summary ->", cls_csv)

display(cls_df)


[Exact duplicate check] test 내 train과 완전 동일 문장 비율: 4.524%  (count=29/641)
[Near-dup] cos≥0.90: 19.969%  (128/641)
[Near-dup] cos≥0.95: 11.700%  (75/641)
[Near-dup] cos≥0.98: 6.552%  (42/641)
[Near-dup] cos≥0.99: 6.084%  (39/641)
Saved potential near-duplicates -> ../resgcn_runs/leak_audit_top_matches.csv
Saved class-wise near-dup summary -> ../resgcn_runs/leak_audit_by_class.csv


Unnamed: 0,class,count,mean_max_sim,p95_max_sim,p99_max_sim,frac_ge_0.98
5,Low-stock Messages,72,0.909665,1.000001,1.000001,0.25
2,Countdown Timers,32,0.846843,0.997878,1.0,0.21875
1,Confirmshaming,27,0.802925,0.997182,1.0,0.185185
3,High-demand Messages,8,0.807432,0.986048,0.990256,0.125
4,Limited-time Messages,48,0.81075,0.988279,1.0,0.0625
9,Trick Questions,21,0.717859,0.955315,0.983358,0.047619
0,Activity Notifications,72,0.717467,0.959712,1.0,0.027778
6,Not Dark Pattern,321,0.638126,0.906987,0.99979,0.015576
7,Pressured Selling,32,0.672717,0.867191,0.894432,0.0
8,Testimonials of Uncertain Origin,8,0.637128,0.873781,0.92407,0.0


In [49]:
# %% [markdown]
# ## Cell 9a — Clean Test evaluation (exclude near-dup ≥ 0.98 vs train)

import numpy as np, pandas as pd, torch, torch.nn.functional as F
import matplotlib.pyplot as plt
from pathlib import Path
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, confusion_matrix, roc_auc_score, average_precision_score,
                             classification_report)
from sklearn.preprocessing import label_binarize

# 0) 보조: 평가 함수
def eval_pack_logits(logits, y_true, n_classes: int):
    y_np  = y_true.detach().cpu().numpy()
    yhat  = logits.argmax(-1).detach().cpu().numpy()
    acc   = accuracy_score(y_np, yhat)
    prec  = precision_score(y_np, yhat, average='macro', zero_division=0)
    rec   = recall_score(y_np, yhat, average='macro', zero_division=0)
    f1    = f1_score(y_np, yhat, average='macro', zero_division=0)
    try:
        y_ovr  = label_binarize(y_np, classes=np.arange(n_classes))
        y_prob = F.softmax(logits, dim=-1).detach().cpu().numpy()
        roc    = roc_auc_score(y_ovr, y_prob, average='macro', multi_class='ovr')
        pr     = average_precision_score(y_ovr, y_prob, average='macro')
    except Exception:
        roc, pr = float('nan'), float('nan')
    cm    = confusion_matrix(y_np, yhat)
    return dict(acc=acc, prec=prec, rec=rec, f1=f1, roc_auc=roc, pr_auc=pr, cm=cm,
                y_true=y_np, y_pred=yhat)

# 1) Improved 모델로 전체 그래프에서 logits 계산
model.eval()
with torch.no_grad():
    logits_full = model(data)

# 2) 필요 시 max_sim 재계산 (Cell 8을 건너뛴 경우 대비)
try:
    max_sim
    train_idx
    test_idx
except NameError:
    train_idx = np.where(data.train_mask.detach().cpu().numpy())[0]
    test_idx  = np.where(data.test_mask.detach().cpu().numpy())[0]
    X_tr, X_te = X[train_idx], X[test_idx]
    sims = X_te @ X_tr.T
    max_sim = sims.max(axis=1)

# 3) clean mask (near-dup ≥ 0.98 제거)
thr = 0.98
clean_keep = max_sim < thr
clean_ids  = np.where(data.test_mask.detach().cpu().numpy())[0][clean_keep]
drop_ids   = np.where(data.test_mask.detach().cpu().numpy())[0][~clean_keep]

print(f"Clean test size: {clean_keep.sum()}/{len(clean_keep)}  (removed {len(drop_ids)} with cos≥{thr})")

# 클래스별 제거 비율 요약
y_test_all = y[data.test_mask.detach().cpu().numpy()]
removed = pd.DataFrame({
    "label": y_test_all[~clean_keep]
})
keeped  = pd.DataFrame({
    "label": y_test_all[clean_keep]
})
by_cls = []
for ci, cname in enumerate(list(le.classes_)):
    tot = int((y_test_all==ci).sum())
    rm  = int((removed["label"]==ci).sum())
    kp  = int((keeped["label"]==ci).sum())
    by_cls.append({"class": cname, "test_count": tot, "removed": rm, "kept": kp, "rm_rate": (rm/tot if tot else 0.0)})
cls_tbl = pd.DataFrame(by_cls).sort_values("rm_rate", ascending=False)
print(cls_tbl.to_string(index=False))

# 4) clean test 로지츠/라벨 추출 → 지표/CM/리포트
mask_clean = torch.zeros(len(y), dtype=torch.bool, device=logits_full.device)
mask_clean[clean_ids] = True
metrics_clean = eval_pack_logits(logits_full[mask_clean], data.y[mask_clean], num_classes)

print("\n# Internal Test (CLEAN, cos<thr)")
for k in ['acc','prec','rec','f1','roc_auc','pr_auc']:
    print(f"{k.upper():>8}: {metrics_clean[k]:.4f}")

# 혼동행렬 저장
fig, ax = plt.subplots(figsize=(8,6))
im = ax.imshow(metrics_clean['cm'], interpolation='nearest'); ax.figure.colorbar(im, ax=ax)
ax.set_title(f'Confusion Matrix — Clean Test (cos<{thr})')
ax.set_xlabel('Predicted'); ax.set_ylabel('True')
ax.set_xticks(range(num_classes)); ax.set_yticks(range(num_classes))
ax.set_xticklabels([c[:14] for c in list(le.classes_)], rotation=45, ha='right')
ax.set_yticklabels([c[:14] for c in list(le.classes_)])
for (i,j), v in np.ndenumerate(metrics_clean['cm']):
    ax.text(j, i, int(v), ha='center', va='center', fontsize=8)
plt.tight_layout()
cm_clean_path = OUTDIR / f'cm_internal_improved_clean{str(thr).replace(".","p")}.png'
plt.savefig(cm_clean_path, dpi=220); plt.close(fig)
print("Saved CM ->", cm_clean_path)

# 분류 리포트 저장
cr_txt = classification_report(metrics_clean['y_true'], metrics_clean['y_pred'],
                               target_names=list(le.classes_), zero_division=0)
cr_path = OUTDIR / f'classification_report_Improved_clean{str(thr).replace(".","p")}.txt'
with open(cr_path, 'w', encoding='utf-8') as f:
    f.write(cr_txt)
print("Saved classification report ->", cr_path)


Clean test size: 599/641  (removed 42 with cos≥0.98)
                           class  test_count  removed  kept  rm_rate
              Low-stock Messages          72       18    54 0.250000
                Countdown Timers          32        7    25 0.218750
                  Confirmshaming          27        5    22 0.185185
            High-demand Messages           8        1     7 0.125000
           Limited-time Messages          48        3    45 0.062500
                 Trick Questions          21        1    20 0.047619
          Activity Notifications          72        2    70 0.027778
                Not Dark Pattern         321        5   316 0.015576
               Pressured Selling          32        0    32 0.000000
Testimonials of Uncertain Origin           8        0     8 0.000000

# Internal Test (CLEAN, cos<thr)
     ACC: 0.9366
    PREC: 0.8732
     REC: 0.8659
      F1: 0.8674
 ROC_AUC: 0.9899
  PR_AUC: 0.9150
Saved CM -> ../resgcn_runs/cm_internal_improved_clea

In [50]:
# %% [markdown]
# ## Cell 10 — Internal graph visuals (Improved: full + test overlay)

import numpy as np
import matplotlib.pyplot as plt
import networkx as nx
from pathlib import Path

def draw_full(coords: np.ndarray, edge_index_np: np.ndarray, labels: np.ndarray,
              class_names, out_path: Path, title: str):
    fig = plt.figure(figsize=(16, 10)); ax = fig.add_subplot(111); ax.set_title(title)

    # 엣지(연하게)
    for (u, v) in zip(edge_index_np[0], edge_index_np[1]):
        ax.plot([coords[u,0], coords[v,0]], [coords[u,1], coords[v,1]], alpha=0.02)

    # 노드(클래스별)
    for ci, cname in enumerate(class_names):
        idx = np.where(labels == ci)[0]
        if len(idx):
            ax.scatter(coords[idx,0], coords[idx,1], s=8, alpha=0.85, label=cname)

    ax.legend(loc='center left', bbox_to_anchor=(1.0, 0.5), frameon=False)
    ax.axis('off'); plt.tight_layout()
    plt.savefig(out_path, dpi=220, bbox_inches='tight'); plt.close(fig)
    print("Saved:", out_path)

def draw_test_overlay(coords: np.ndarray, edge_index_np: np.ndarray, labels: np.ndarray,
                      test_mask_tensor, class_names, out_path: Path, title: str):
    fig = plt.figure(figsize=(16, 10)); ax = fig.add_subplot(111); ax.set_title(title)

    # 엣지(연하게)
    for (u, v) in zip(edge_index_np[0], edge_index_np[1]):
        ax.plot([coords[u,0], coords[v,0]], [coords[u,1], coords[v,1]], alpha=0.02)

    # 배경 전체(회색, 희미하게)
    ax.scatter(coords[:,0], coords[:,1], s=6, alpha=0.12, c='grey')

    # 테스트 노드만 크게 + 검은테두리
    tm = test_mask_tensor.detach().cpu().numpy().astype(bool)
    for ci, cname in enumerate(class_names):
        idx = np.where((labels == ci) & tm)[0]
        if len(idx):
            ax.scatter(coords[idx,0], coords[idx,1],
                       s=140, edgecolor='k', linewidths=0.9, label=cname)

    ax.legend(loc='center left', bbox_to_anchor=(1.0, 0.5), frameon=False)
    ax.axis('off'); plt.tight_layout()
    plt.savefig(out_path, dpi=240, bbox_inches='tight'); plt.close(fig)
    print("Saved:", out_path)

# Improved 러닝의 edge_index_np / data / y / le가 이미 세션에 존재.
# 좌표가 아직 없으면 생성
if 'coords_improved' not in globals():
    G = nx.Graph()
    G.add_nodes_from(range(X.shape[0]))
    G.add_edges_from(list(zip(edge_index_np[0].tolist(), edge_index_np[1].tolist())))
    pos = nx.spring_layout(G, seed=RNG_SEED, iterations=50)
    coords_improved = np.array([pos[i] for i in range(X.shape[0])])

full_path    = OUTDIR / 'graph_internal_full_improved.png'
overlay_path = OUTDIR / 'graph_internal_test_overlay_improved.png'

draw_full(coords_improved, edge_index_np, y, list(le.classes_),
          full_path, f"kNN Graph (mutual={USE_MUTUAL_KNN}, k={IMPROVED_HP['knn_k']}) — all nodes")
draw_test_overlay(coords_improved, edge_index_np, y, data.test_mask,
                  list(le.classes_), overlay_path, "Internal Test Overlay — Improved")


Saved: ../resgcn_runs/graph_internal_full_improved.png
Saved: ../resgcn_runs/graph_internal_test_overlay_improved.png


In [51]:
# %% [markdown]
# ## Cell 11 — Hyperparameter Tuning (Improved pipeline) + retrain best

import time, itertools, random
import numpy as np, pandas as pd, torch
import torch.nn.functional as F
from pathlib import Path
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, confusion_matrix, roc_auc_score, average_precision_score)
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt

# ---------- 검색 공간 ----------
SPACE = {
    "hidden":       [128, 256],
    "layers":       [2, 3, 4],
    "dropout":      [0.1, 0.3, 0.5],
    "lr":           [1e-3, 5e-4],
    "weight_decay": [1e-5, 5e-4, 1e-3],
    "knn_k":        [8, 10, 12, 15],
}
N_TRIALS = 14   # 부담되면 줄여도 OK
PATIENCE  = 8

def evaluate_subset(logits, y_true, n_classes):
    y_np = y_true.detach().cpu().numpy()
    y_pred = logits.argmax(-1).detach().cpu().numpy()
    acc  = accuracy_score(y_np, y_pred)
    prec = precision_score(y_np, y_pred, average='macro', zero_division=0)
    rec  = recall_score(y_np, y_pred, average='macro', zero_division=0)
    f1   = f1_score(y_np, y_pred, average='macro', zero_division=0)
    try:
        y_ovr = label_binarize(y_np, classes=np.arange(n_classes))
        y_prob = F.softmax(logits, dim=-1).detach().cpu().numpy()
        roc = roc_auc_score(y_ovr, y_prob, average='macro', multi_class='ovr')
        pr  = average_precision_score(y_ovr, y_prob, average='macro')
    except Exception:
        roc, pr = float('nan'), float('nan')
    return dict(acc=acc, prec=prec, rec=rec, f1=f1, roc_auc=roc, pr_auc=pr)

def run_one_trial(hp):
    # 1) 그래프 재구성 (mutual kNN, k=hp['knn_k'])
    idxs = build_knn_indices(X, k=int(hp['knn_k']), metric=KNN_METRIC)
    ei_np = build_edge_index_from_neighbors(idxs, mutual=True)

    data_k = Data(
        x=data.x.detach().clone(),
        y=data.y.detach().clone(),
        edge_index=torch.tensor(ei_np, dtype=torch.long, device=DEVICE),
    )
    data_k.train_mask = data.train_mask
    data_k.val_mask   = data.val_mask
    data_k.test_mask  = data.test_mask

    # 2) 모델/옵티마
    model_k = ResGCN(
        in_dim=data_k.x.size(1),
        hidden=int(hp['hidden']),
        out_dim=num_classes,
        layers=int(hp['layers']),
        dropout=float(hp['dropout'])
    ).to(DEVICE)

    opt = torch.optim.Adam(
        model_k.parameters(),
        lr=float(hp['lr']),
        weight_decay=float(hp['weight_decay'])
    )

    # 3) 학습
    best_val, best_state, stale = -1.0, None, 0
    t0 = time.time()
    for ep in range(1, int(IMPROVED_HP['epochs']) + 1):
        model_k.train()
        opt.zero_grad()
        logits = model_k(data_k)
        loss = ce_with_smoothing(logits[data_k.train_mask], data_k.y[data_k.train_mask])
        loss.backward()
        opt.step()

        model_k.eval()
        with torch.no_grad():
            logits_eval = model_k(data_k)
            val = evaluate_subset(logits_eval[data_k.val_mask], data_k.y[data_k.val_mask], num_classes)
        if val['f1'] > best_val + 1e-6:
            best_val = val['f1']
            best_state = {k: v.detach().cpu().clone() for k, v in model_k.state_dict().items()}
            stale = 0
        else:
            stale += 1
            if stale >= PATIENCE:
                break

    # 4) 베스트 로드 후 test 평가
    if best_state is not None:
        model_k.load_state_dict({k: v.to(DEVICE) for k, v in best_state.items()})
    model_k.eval()
    with torch.no_grad():
        logits_final = model_k(data_k)
        val_metrics  = evaluate_subset(logits_final[data_k.val_mask],  data_k.y[data_k.val_mask],  num_classes)
        test_metrics = evaluate_subset(logits_final[data_k.test_mask], data_k.y[data_k.test_mask], num_classes)

    sec = time.time() - t0
    return val_metrics, test_metrics, sec, ei_np, model_k

# --------- 샘플링 목록 만들기 (랜덤 탐색) ----------
grid = list(itertools.product(
    SPACE['hidden'], SPACE['layers'], SPACE['dropout'],
    SPACE['lr'], SPACE['weight_decay'], SPACE['knn_k']
))
random.Random(RNG_SEED).shuffle(grid)
cands = grid[:N_TRIALS]

records = []
best_row = None
best_model = None
best_ei_np = None

print(f"Tuning {len(cands)} trials...")
for t, (hidden, layers, dropout, lr, weight_decay, knn_k) in enumerate(cands, 1):
    hp = dict(hidden=hidden, layers=layers, dropout=dropout, lr=lr,
              weight_decay=weight_decay, knn_k=knn_k)
    val_m, test_m, sec, ei_np, m = run_one_trial(hp)
    row = dict(trial=t, **hp, sec=round(sec,1),
               val_f1=val_m['f1'], val_acc=val_m['acc'],
               test_f1=test_m['f1'], test_acc=test_m['acc'])
    records.append(row)
    # 베스트 갱신 기준: val_f1 → ties 시 val_acc
    if (best_row is None) or (row['val_f1'] > best_row['val_f1'] + 1e-9) or \
       (abs(row['val_f1']-best_row['val_f1'])<1e-9 and row['val_acc']>best_row['val_acc']):
        best_row = row
        best_model = m
        best_ei_np = ei_np
    print(f"[{t}/{len(cands)}] valF1={row['val_f1']:.4f} valAcc={row['val_acc']:.3f}  hp={hp}")

# 결과 저장/표시
df_trials = pd.DataFrame(records).sort_values(["val_f1","val_acc"], ascending=False)
tune_csv = OUTDIR / "tuning_trials_improved.csv"
df_trials.to_csv(tune_csv, index=False)
print("\nSaved tuning summary ->", tune_csv)
display(df_trials.head(10))

print("\n# Best HP:", {k: best_row[k] for k in ["hidden","layers","dropout","lr","weight_decay","knn_k"]})

# ---------- 베스트 HP로 재학습(그래프 고정=best knn_k) & 평가/저장 ----------
# 그래프/데이터 고정
ei_np = best_ei_np
data_best = Data(
    x=data.x.detach().clone(),
    y=data.y.detach().clone(),
    edge_index=torch.tensor(ei_np, dtype=torch.long, device=DEVICE),
)
data_best.train_mask = data.train_mask
data_best.val_mask   = data.val_mask
data_best.test_mask  = data.test_mask

# 모델/옵티마
hp = {k: best_row[k] for k in ["hidden","layers","dropout","lr","weight_decay","knn_k"]}
model_best = ResGCN(
    in_dim=data_best.x.size(1),
    hidden=int(hp['hidden']),
    out_dim=num_classes,
    layers=int(hp['layers']),
    dropout=float(hp['dropout'])
).to(DEVICE)
opt = torch.optim.Adam(model_best.parameters(), lr=float(hp['lr']), weight_decay=float(hp['weight_decay']))

# 학습
best_val, best_state, stale = -1.0, None, 0
for ep in range(1, int(IMPROVED_HP['epochs']) + 1):
    model_best.train(); opt.zero_grad()
    logits = model_best(data_best)
    loss = ce_with_smoothing(logits[data_best.train_mask], data_best.y[data_best.train_mask])
    loss.backward(); opt.step()
    model_best.eval()
    with torch.no_grad():
        logits_eval = model_best(data_best)
        val = evaluate_subset(logits_eval[data_best.val_mask], data_best.y[data_best.val_mask], num_classes)
    if val['f1'] > best_val + 1e-6:
        best_val = val['f1']
        best_state = {k: v.detach().cpu().clone() for k, v in model_best.state_dict().items()}
        stale = 0
    else:
        stale += 1
        if stale >= PATIENCE:
            break
if best_state is not None:
    model_best.load_state_dict({k: v.to(DEVICE) for k, v in best_state.items()})

# 평가
model_best.eval()
with torch.no_grad():
    logits_fin = model_best(data_best)
    test_m = evaluate_subset(logits_fin[data_best.test_mask], data_best.y[data_best.test_mask], num_classes)

print("\n# Internal Test (macro) — Improved+BestHP")
for k in ['acc','prec','rec','f1','roc_auc','pr_auc']:
    print(f"{k.upper():>8}: {test_m[k]:.4f}")

# 혼동행렬 저장
cm = confusion_matrix(
    data_best.y[data_best.test_mask].detach().cpu().numpy(),
    logits_fin[data_best.test_mask].argmax(-1).detach().cpu().numpy()
)
fig, ax = plt.subplots(figsize=(8,6))
im = ax.imshow(cm, interpolation='nearest'); ax.figure.colorbar(im, ax=ax)
ax.set_title('Confusion Matrix — Internal Test (Improved+BestHP)')
ax.set_xlabel('Predicted'); ax.set_ylabel('True')
ax.set_xticks(range(num_classes)); ax.set_yticks(range(num_classes))
ax.set_xticklabels([c[:14] for c in list(le.classes_)], rotation=45, ha='right')
ax.set_yticklabels([c[:14] for c in list(le.classes_)])
for (i,j), v in np.ndenumerate(cm):
    ax.text(j, i, int(v), ha='center', va='center', fontsize=8)
plt.tight_layout()
cm_path = OUTDIR / 'cm_internal_improved_besthp.png'
plt.savefig(cm_path, dpi=220); plt.close(fig)
print("Saved CM ->", cm_path)

# 모델 저장
ckpt_path = OUTDIR / 'resgcn_improved_besthp.pt'
torch.save({
    'state_dict': model_best.state_dict(),
    'hp': hp,
    'knn_k': int(hp['knn_k']),
    'config': {
        'in_dim': int(data_best.x.size(1)),
        'out_dim': int(num_classes),
        'label_mapping': dict(enumerate([str(c) for c in list(le.classes_)])),
        'metric': KNN_METRIC,
        'embedder': EMBEDDER_NAME,
        'mutual_kNN': True
    },
    'class_weights': class_weights.detach().cpu().numpy().tolist(),
    'label_smoothing': float(LABEL_SMOOTHING)
}, ckpt_path)
size_mb = ckpt_path.stat().st_size / (1024*1024)
print(f"Saved model -> {ckpt_path} ({size_mb:.2f} MB)")

# 메트릭 요약에 추가
row = {
    "Model": "Improved+BestHP",
    "acc": test_m['acc'],
    "prec": test_m['prec'],
    "rec": test_m['rec'],
    "f1": test_m['f1'],
    "roc_auc": test_m['roc_auc'],
    "pr_auc": test_m['pr_auc'],
    "ModelSizeMB": round(size_mb, 2),
    "kNN_k": int(hp['knn_k']),
    "embedder": EMBEDDER_NAME,
    "mutual_kNN": True,
    "label_smoothing": LABEL_SMOOTHING
}
sum_csv = OUTDIR / 'metrics_internal_summary.csv'
if sum_csv.exists():
    df_old = pd.read_csv(sum_csv)
    df_new = pd.concat([df_old, pd.DataFrame([row])], ignore_index=True)
else:
    df_new = pd.DataFrame([row])
df_new.to_csv(sum_csv, index=False)
print("Updated metrics ->", sum_csv)
display(df_new.tail(3))


Tuning 14 trials...
[1/14] valF1=0.2727 valAcc=0.675  hp={'hidden': 256, 'layers': 4, 'dropout': 0.5, 'lr': 0.001, 'weight_decay': 0.001, 'knn_k': 10}
[2/14] valF1=0.4166 valAcc=0.750  hp={'hidden': 256, 'layers': 2, 'dropout': 0.1, 'lr': 0.0005, 'weight_decay': 1e-05, 'knn_k': 8}
[3/14] valF1=0.5203 valAcc=0.822  hp={'hidden': 256, 'layers': 2, 'dropout': 0.5, 'lr': 0.0005, 'weight_decay': 1e-05, 'knn_k': 12}
[4/14] valF1=0.8258 valAcc=0.925  hp={'hidden': 128, 'layers': 3, 'dropout': 0.1, 'lr': 0.0005, 'weight_decay': 0.001, 'knn_k': 8}
[5/14] valF1=0.7779 valAcc=0.912  hp={'hidden': 128, 'layers': 3, 'dropout': 0.5, 'lr': 0.0005, 'weight_decay': 0.0005, 'knn_k': 15}
[6/14] valF1=0.4264 valAcc=0.784  hp={'hidden': 256, 'layers': 4, 'dropout': 0.5, 'lr': 0.0005, 'weight_decay': 1e-05, 'knn_k': 10}
[7/14] valF1=0.7752 valAcc=0.912  hp={'hidden': 128, 'layers': 2, 'dropout': 0.1, 'lr': 0.0005, 'weight_decay': 1e-05, 'knn_k': 12}
[8/14] valF1=0.4020 valAcc=0.750  hp={'hidden': 256, 'laye

Unnamed: 0,trial,hidden,layers,dropout,lr,weight_decay,knn_k,sec,val_f1,val_acc,test_f1,test_acc
11,12,128,2,0.1,0.001,0.001,15,3.3,0.826849,0.921875,0.841466,0.921997
3,4,128,3,0.1,0.0005,0.001,8,3.8,0.825769,0.925,0.838005,0.917317
8,9,128,3,0.1,0.0005,0.001,10,3.7,0.82292,0.934375,0.844765,0.928237
4,5,128,3,0.5,0.0005,0.0005,15,4.3,0.777945,0.9125,0.837959,0.918877
6,7,128,2,0.1,0.0005,1e-05,12,3.3,0.775151,0.9125,0.830548,0.917317
2,3,256,2,0.5,0.0005,1e-05,12,1.5,0.520326,0.821875,0.46169,0.787832
10,11,256,2,0.5,0.001,1e-05,15,1.4,0.50248,0.79375,0.466161,0.765991
13,14,128,4,0.1,0.001,0.0005,10,0.9,0.427038,0.68125,0.357869,0.639626
5,6,256,4,0.5,0.0005,1e-05,10,1.5,0.426357,0.784375,0.377386,0.75195
1,2,256,2,0.1,0.0005,1e-05,8,1.2,0.416649,0.75,0.396484,0.730109



# Best HP: {'hidden': 128, 'layers': 2, 'dropout': 0.1, 'lr': 0.001, 'weight_decay': 0.001, 'knn_k': 15}

# Internal Test (macro) — Improved+BestHP
     ACC: 0.6490
    PREC: 0.3103
     REC: 0.2448
      F1: 0.2420
 ROC_AUC: 0.9607
  PR_AUC: 0.7564
Saved CM -> ../resgcn_runs/cm_internal_improved_besthp.png
Saved model -> ../resgcn_runs/resgcn_improved_besthp.pt (0.83 MB)
Updated metrics -> ../resgcn_runs/metrics_internal_summary.csv


Unnamed: 0,Model,acc,prec,rec,f1,roc_auc,pr_auc,ModelSizeMB,kNN_k,embedder,mutual_kNN,label_smoothing
0,Improved,0.939158,0.882171,0.875349,0.876785,0.990689,0.922132,0.83,10,sentence-transformers/all-mpnet-base-v2,True,0.05
1,Improved+BestHP,0.648986,0.310345,0.244827,0.241957,0.960709,0.756355,0.83,15,sentence-transformers/all-mpnet-base-v2,True,0.05


In [52]:
# %% [markdown]
# ## Cell 11 — Hotfix: use best_model from tuning (no retrain), evaluate & save

import numpy as np, pandas as pd, torch, torch.nn.functional as F
import matplotlib.pyplot as plt
from pathlib import Path
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, confusion_matrix, roc_auc_score, average_precision_score)
from sklearn.preprocessing import label_binarize

# 0) 가드: 이전 셀의 결과물이 있어야 함
assert 'best_model' in globals() and 'best_row' in globals() and 'best_ei_np' in globals(), \
    "tuning 셀을 먼저 실행해 best_model/best_row/best_ei_np가 메모리에 있어야 합니다."

# 1) best knn_k로 데이터 고정
ei_np = best_ei_np
data_best = Data(
    x=data.x.detach().clone(),
    y=data.y.detach().clone(),
    edge_index=torch.tensor(ei_np, dtype=torch.long, device=DEVICE),
)
data_best.train_mask = data.train_mask
data_best.val_mask   = data.val_mask
data_best.test_mask  = data.test_mask

# 2) best_model 바로 평가
best_model.eval()
with torch.no_grad():
    logits_fin = best_model(data_best)

def evaluate_subset(logits, y_true, n_classes):
    y_np = y_true.detach().cpu().numpy()
    y_pred = logits.argmax(-1).detach().cpu().numpy()
    acc  = accuracy_score(y_np, y_pred)
    prec = precision_score(y_np, y_pred, average='macro', zero_division=0)
    rec  = recall_score(y_np, y_pred, average='macro', zero_division=0)
    f1   = f1_score(y_np, y_pred, average='macro', zero_division=0)
    try:
        y_ovr = label_binarize(y_np, classes=np.arange(n_classes))
        y_prob = F.softmax(logits, dim=-1).detach().cpu().numpy()
        roc = roc_auc_score(y_ovr, y_prob, average='macro', multi_class='ovr')
        pr  = average_precision_score(y_ovr, y_prob, average='macro')
    except Exception:
        roc, pr = float('nan'), float('nan')
    return dict(acc=acc, prec=prec, rec=rec, f1=f1, roc_auc=roc, pr_auc=pr)

val_m  = evaluate_subset(logits_fin[data_best.val_mask],  data_best.y[data_best.val_mask],  num_classes)
test_m = evaluate_subset(logits_fin[data_best.test_mask], data_best.y[data_best.test_mask], num_classes)

print("# Using best_model from tuning (no retrain)")
print(" Best HP:", {k: best_row[k] for k in ['hidden','layers','dropout','lr','weight_decay','knn_k']})
print("\nValidation (should match table ±ε):",
      f"F1={val_m['f1']:.4f}, ACC={val_m['acc']:.4f}")
print("Test        (should match table ±ε):",
      f"F1={test_m['f1']:.4f}, ACC={test_m['acc']:.4f}")

# 3) 혼동행렬 저장
cm = confusion_matrix(
    data_best.y[data_best.test_mask].detach().cpu().numpy(),
    logits_fin[data_best.test_mask].argmax(-1).detach().cpu().numpy()
)
fig, ax = plt.subplots(figsize=(8,6))
im = ax.imshow(cm, interpolation='nearest'); ax.figure.colorbar(im, ax=ax)
ax.set_title('Confusion Matrix — Internal Test (Improved+BestHP from tuning)')
ax.set_xlabel('Predicted'); ax.set_ylabel('True')
ax.set_xticks(range(num_classes)); ax.set_yticks(range(num_classes))
ax.set_xticklabels([c[:14] for c in list(le.classes_)], rotation=45, ha='right')
ax.set_yticklabels([c[:14] for c in list(le.classes_)])
for (i,j), v in np.ndenumerate(cm):
    ax.text(j, i, int(v), ha='center', va='center', fontsize=8)
plt.tight_layout()
cm_path = OUTDIR / 'cm_internal_improved_besthp_from_tuning.png'
plt.savefig(cm_path, dpi=220); plt.close(fig)
print("Saved CM ->", cm_path)

# 4) 모델 저장
ckpt_path = OUTDIR / 'resgcn_improved_besthp_from_tuning.pt'
torch.save({
    'state_dict': best_model.state_dict(),
    'hp': {k: best_row[k] for k in ['hidden','layers','dropout','lr','weight_decay','knn_k']},
    'knn_k': int(best_row['knn_k']),
    'config': {
        'in_dim': int(data_best.x.size(1)),
        'out_dim': int(num_classes),
        'label_mapping': dict(enumerate([str(c) for c in list(le.classes_)])),
        'metric': KNN_METRIC,
        'embedder': EMBEDDER_NAME,
        'mutual_kNN': True
    },
    'class_weights': class_weights.detach().cpu().numpy().tolist(),
    'label_smoothing': float(LABEL_SMOOTHING)
}, ckpt_path)
size_mb = ckpt_path.stat().st_size / (1024*1024)
print(f"Saved model -> {ckpt_path} ({size_mb:.2f} MB)")

# 5) 메트릭 요약 업데이트
row = {
    "Model": "Improved+BestHP(tuning)",
    "acc": test_m['acc'],
    "prec": test_m['prec'],
    "rec": test_m['rec'],
    "f1": test_m['f1'],
    "roc_auc": test_m['roc_auc'],
    "pr_auc": test_m['pr_auc'],
    "ModelSizeMB": round(size_mb, 2),
    "kNN_k": int(best_row['knn_k']),
    "embedder": EMBEDDER_NAME,
    "mutual_kNN": True,
    "label_smoothing": LABEL_SMOOTHING
}
csv_path = OUTDIR / 'metrics_internal_summary.csv'
if csv_path.exists():
    df_old = pd.read_csv(csv_path)
    df_new = pd.concat([df_old, pd.DataFrame([row])], ignore_index=True)
else:
    df_new = pd.DataFrame([row])
df_new.to_csv(csv_path, index=False)
print("Updated metrics ->", csv_path)
display(df_new.tail(3))


# Using best_model from tuning (no retrain)
 Best HP: {'hidden': 128, 'layers': 2, 'dropout': 0.1, 'lr': 0.001, 'weight_decay': 0.001, 'knn_k': 15}

Validation (should match table ±ε): F1=0.8268, ACC=0.9219
Test        (should match table ±ε): F1=0.8415, ACC=0.9220
Saved CM -> ../resgcn_runs/cm_internal_improved_besthp_from_tuning.png
Saved model -> ../resgcn_runs/resgcn_improved_besthp_from_tuning.pt (0.83 MB)
Updated metrics -> ../resgcn_runs/metrics_internal_summary.csv


Unnamed: 0,Model,acc,prec,rec,f1,roc_auc,pr_auc,ModelSizeMB,kNN_k,embedder,mutual_kNN,label_smoothing
0,Improved,0.939158,0.882171,0.875349,0.876785,0.990689,0.922132,0.83,10,sentence-transformers/all-mpnet-base-v2,True,0.05
1,Improved+BestHP,0.648986,0.310345,0.244827,0.241957,0.960709,0.756355,0.83,15,sentence-transformers/all-mpnet-base-v2,True,0.05
2,Improved+BestHP(tuning),0.921997,0.839287,0.848213,0.841466,0.987748,0.900755,0.83,15,sentence-transformers/all-mpnet-base-v2,True,0.05


In [53]:
# %% [markdown]
# ## BestHP (from tuning) — visuals & curves & report (internal)

import numpy as np, pandas as pd, torch, torch.nn.functional as F
import matplotlib.pyplot as plt, networkx as nx
from pathlib import Path
from sklearn.metrics import (classification_report, confusion_matrix,
                             roc_curve, auc, precision_recall_curve,
                             accuracy_score, precision_score, recall_score, f1_score,
                             roc_auc_score, average_precision_score)
from sklearn.preprocessing import label_binarize

# ---------- 0) 준비: 모델/데이터 확보 ----------
# OUTDIR / label encoder / classes
classes = list(le.classes_)
n_classes = len(classes)

# best knn_k의 edge_index / data_best 확보
if 'best_ei_np' not in globals():
    # 튜닝을 안 거친 세션이라면 HP와 모델을 파일에서 복구
    ckpt = OUTDIR / 'resgcn_improved_besthp_from_tuning.pt'
    assert ckpt.exists(), "best_ei_np가 없고 ckpt도 없습니다. 튜닝/핫픽스 셀을 먼저 실행하세요."
    ck = torch.load(ckpt, map_location='cpu')
    knn_k = int(ck.get('knn_k', 15))
    # mutual kNN으로 edge 재구성
    idxs = build_knn_indices(X, k=knn_k, metric=KNN_METRIC)
    best_ei_np = build_edge_index_from_neighbors(idxs, mutual=True)

data_best = Data(
    x=data.x.detach().clone(),
    y=data.y.detach().clone(),
    edge_index=torch.tensor(best_ei_np, dtype=torch.long, device=DEVICE),
)
data_best.train_mask = data.train_mask
data_best.val_mask   = data.val_mask
data_best.test_mask  = data.test_mask

# best_model 확보 (메모리 없으면 ckpt에서 로드)
if 'best_model' not in globals():
    from copy import deepcopy
    ckpt = OUTDIR / 'resgcn_improved_besthp_from_tuning.pt'
    ck = torch.load(ckpt, map_location=DEVICE)
    hp = ck['hp']
    model_tmp = ResGCN(
        in_dim=int(data_best.x.size(1)),
        hidden=int(hp['hidden']),
        out_dim=n_classes,
        layers=int(hp['layers']),
        dropout=float(hp['dropout'])
    ).to(DEVICE)
    model_tmp.load_state_dict(ck['state_dict'])
    best_model = model_tmp

best_model.eval()
with torch.no_grad():
    logits_all = best_model(data_best)

test_mask = data_best.test_mask.detach().cpu().numpy().astype(bool)
y_true = data_best.y.detach().cpu().numpy()[test_mask]
probs  = F.softmax(logits_all, dim=-1).detach().cpu().numpy()[test_mask]
y_pred = probs.argmax(1)

# ---------- 1) 리포트/지표 저장 ----------
report_txt = classification_report(y_true, y_pred, target_names=classes, zero_division=0)
acc  = accuracy_score(y_true, y_pred)
prec = precision_score(y_true, y_pred, average='macro', zero_division=0)
rec  = recall_score(y_true, y_pred, average='macro', zero_division=0)
f1   = f1_score(y_true, y_pred, average='macro', zero_division=0)
y_ovr = label_binarize(y_true, classes=np.arange(n_classes))
roc  = roc_auc_score(y_ovr, probs, average='macro', multi_class='ovr')
pr   = average_precision_score(y_ovr, probs, average='macro')

rep_path = OUTDIR / 'classification_report_internal_besthp.txt'
with open(rep_path, 'w', encoding='utf-8') as f:
    f.write(report_txt + "\n\n")
    f.write(f"ACC={acc:.4f}  PREC={prec:.4f}  REC={rec:.4f}  F1={f1:.4f}\n")
    f.write(f"ROC_AUC={roc:.4f}  PR_AUC={pr:.4f}\n")
print("Saved report ->", rep_path)

# ---------- 2) ROC / PR 커브 ----------
# per-class curves + micro/macro
fpr, tpr, roc_auc = {}, {}, {}
prec_c, rec_c, pr_auc = {}, {}, {}

# micro
fpr["micro"], tpr["micro"], _ = roc_curve(y_ovr.ravel(), probs.ravel())
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])
prec_c["micro"], rec_c["micro"], _ = precision_recall_curve(y_ovr.ravel(), probs.ravel())
pr_auc["micro"] = auc(rec_c["micro"], prec_c["micro"])

# per-class
for i, cname in enumerate(classes):
    fpr[i], tpr[i], _ = roc_curve(y_ovr[:, i], probs[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])
    prec_c[i], rec_c[i], _ = precision_recall_curve(y_ovr[:, i], probs[:, i])
    pr_auc[i] = auc(rec_c[i], prec_c[i])

# macro = per-class 평균
roc_auc["macro"] = np.mean([roc_auc[i] for i in range(n_classes)])
pr_auc["macro"]  = np.mean([pr_auc[i]  for i in range(n_classes)])

# Plot ROC
fig = plt.figure(figsize=(10,7)); ax = fig.add_subplot(111)
ax.plot(fpr["micro"], tpr["micro"], lw=2.5, label=f"micro-average (AUC={roc_auc['micro']:.3f})")
for i, cname in enumerate(classes):
    ax.plot(fpr[i], tpr[i], lw=1.2, alpha=0.8, label=f"{cname[:18]} (AUC={roc_auc[i]:.3f})")
ax.plot([0,1],[0,1],'k--',lw=1)
ax.set_xlabel('False Positive Rate'); ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curves — Internal Test (BestHP)')
ax.legend(bbox_to_anchor=(1.02,0.5), loc='center left', frameon=False)
plt.tight_layout()
roc_path = OUTDIR / 'roc_curves_internal_besthp.png'
plt.savefig(roc_path, dpi=220, bbox_inches='tight'); plt.close(fig)
print("Saved ROC ->", roc_path)

# Plot PR
fig = plt.figure(figsize=(10,7)); ax = fig.add_subplot(111)
ax.plot(rec_c["micro"], prec_c["micro"], lw=2.5, label=f"micro-average (AUC={pr_auc['micro']:.3f})")
for i, cname in enumerate(classes):
    ax.plot(rec_c[i], prec_c[i], lw=1.2, alpha=0.8, label=f"{cname[:18]} (AUC={pr_auc[i]:.3f})")
ax.set_xlabel('Recall'); ax.set_ylabel('Precision')
ax.set_title('PR Curves — Internal Test (BestHP)')
ax.legend(bbox_to_anchor=(1.02,0.5), loc='center left', frameon=False)
plt.tight_layout()
pr_path = OUTDIR / 'pr_curves_internal_besthp.png'
plt.savefig(pr_path, dpi=220, bbox_inches='tight'); plt.close(fig)
print("Saved PR ->", pr_path)

# ---------- 3) Per-class bar (Precision/Recall/F1) ----------
per_prec = precision_score(y_true, y_pred, average=None, zero_division=0)
per_rec  = recall_score(y_true, y_pred, average=None, zero_division=0)
per_f1   = f1_score(y_true, y_pred, average=None, zero_division=0)

fig, ax = plt.subplots(figsize=(12,6))
x = np.arange(n_classes)
ax.bar(x-0.25, per_prec, width=0.25, label='Precision')
ax.bar(x,       per_rec,  width=0.25, label='Recall')
ax.bar(x+0.25,  per_f1,   width=0.25, label='F1')
ax.set_xticks(x); ax.set_xticklabels([c[:14] for c in classes], rotation=45, ha='right')
ax.set_ylim(0,1.05); ax.legend()
ax.set_title('Per-class metrics — Internal Test (BestHP)')
plt.tight_layout()
bar_path = OUTDIR / 'per_class_bars_internal_besthp.png'
plt.savefig(bar_path, dpi=220); plt.close(fig)
print("Saved bars ->", bar_path)

# ---------- 4) 그래프 시각화 (BestHP: k=best, mutual) ----------
# 레이아웃은 best 그래프 기반으로 새로 계산
G = nx.Graph(); G.add_nodes_from(range(X.shape[0])); G.add_edges_from(list(zip(best_ei_np[0], best_ei_np[1])))
pos = nx.spring_layout(G, seed=RNG_SEED, iterations=50)
coords_best = np.array([pos[i] for i in range(X.shape[0])])

# 전체
fig = plt.figure(figsize=(16,10)); ax = fig.add_subplot(111)
ax.set_title(f"kNN Graph (mutual=True, k={int(best_row['knn_k'])}) — all nodes (BestHP)")
# edges
for (u,v) in zip(best_ei_np[0], best_ei_np[1]):
    ax.plot([coords_best[u,0], coords_best[v,0]],[coords_best[u,1], coords_best[v,1]], alpha=0.03)
# nodes by class
for ci, cname in enumerate(classes):
    idx = np.where(data_best.y.detach().cpu().numpy() == ci)[0]
    if len(idx): ax.scatter(coords_best[idx,0], coords_best[idx,1], s=8, alpha=0.85, label=cname)
ax.legend(loc='center left', bbox_to_anchor=(1.0,0.5), frameon=False)
ax.axis('off'); plt.tight_layout()
full_best_path = OUTDIR / 'graph_internal_full_besthp.png'
plt.savefig(full_best_path, dpi=220, bbox_inches='tight'); plt.close(fig)
print("Saved graph(full) ->", full_best_path)

# 테스트 오버레이 (정답/오답 테두리)
fig = plt.figure(figsize=(16,10)); ax = fig.add_subplot(111)
ax.set_title("Internal Test Overlay — BestHP")
for (u,v) in zip(best_ei_np[0], best_ei_np[1]):
    ax.plot([coords_best[u,0], coords_best[v,0]],[coords_best[u,1], coords_best[v,1]], alpha=0.02, color='gray')
ax.scatter(coords_best[:,0], coords_best[:,1], s=6, alpha=0.1, c='lightgray')
tm = test_mask
# per class test nodes (큰 점 + 검정 테두리, 정답은 초록/오답은 빨강 테두리)
test_pred = y_pred
test_true = y_true
correct = (test_pred == test_true)
for ci, cname in enumerate(classes):
    idx = np.where((data_best.y.detach().cpu().numpy()==ci) & tm)[0]
    if len(idx):
        edgecols = ['k']*len(idx)
        ax.scatter(coords_best[idx,0], coords_best[idx,1], s=140, edgecolor='k', linewidths=0.9, label=cname)
# 정답/오답 테두리 강조
idx_test = np.where(tm)[0]
ax.scatter(coords_best[idx_test[ correct],0], coords_best[idx_test[ correct],1],
           s=160, facecolors='none', edgecolors='lime', linewidths=1.8, label='Correct')
ax.scatter(coords_best[idx_test[~correct],0], coords_best[idx_test[~correct],1],
           s=160, facecolors='none', edgecolors='red',  linewidths=1.8, label='Misclassified')
ax.legend(loc='center left', bbox_to_anchor=(1.0,0.5), frameon=False)
ax.axis('off'); plt.tight_layout()
overlay_best_path = OUTDIR / 'graph_internal_test_overlay_besthp.png'
plt.savefig(overlay_best_path, dpi=220, bbox_inches='tight'); plt.close(fig)
print("Saved graph(overlay) ->", overlay_best_path)

print("\nDone. Files written under:", OUTDIR)


Saved report -> ../resgcn_runs/classification_report_internal_besthp.txt
Saved ROC -> ../resgcn_runs/roc_curves_internal_besthp.png
Saved PR -> ../resgcn_runs/pr_curves_internal_besthp.png
Saved bars -> ../resgcn_runs/per_class_bars_internal_besthp.png
Saved graph(full) -> ../resgcn_runs/graph_internal_full_besthp.png
Saved graph(overlay) -> ../resgcn_runs/graph_internal_test_overlay_besthp.png

Done. Files written under: ../resgcn_runs
