In [2]:
# encoder_only_kfold_tuning.py – versi 1x training per fold, tuning dilakukan per fold

import os, glob, gc, copy, math, random, time, json
import numpy as np, pandas as pd, matplotlib.pyplot as plt
import torch, torch.nn as nn, torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix
from tqdm import tqdm

# ====== KONFIGURASI ======
DATA_DIR    = "/workspace/SPLIT_BEATS_NPY/train"
LABEL_MAP   = {'N':0,'L':1,'R':2,'V':3,'Q':4}
SEED        = 42
N_SPLITS    = 5
EPOCHS      = 20
MAX_LEN     = 512
EMB_DIM     = 512
N_HEADS     = 8
FF_DIM      = 2048
N_LAYERS    = 12
GRID = [{"lr": 2e-5, "batch_size": 32}]  # hanya 1 kombinasi agar train 1x/fold
OUTPUT_BASE = "/workspace/HASIL_ENCODER_Tuned/HASIL_1"

os.makedirs(OUTPUT_BASE, exist_ok=True)
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PAD_ID, CLS_ID  = 256, 257
VOCAB_SIZE      = 258
cls_names       = list(LABEL_MAP.keys())

# ====== UTIL ======
def signal_to_ids(sig):
    norm = ((sig - sig.min()) / (sig.ptp() + 1e-8) * 255).astype(int)
    ids  = np.concatenate(([CLS_ID], norm))[:MAX_LEN]
    mask = np.ones_like(ids)
    if len(ids) < MAX_LEN:
        pad_len = MAX_LEN - len(ids)
        ids  = np.concatenate((ids,  np.full(pad_len, PAD_ID)))
        mask = np.concatenate((mask, np.zeros(pad_len)))
    return ids, mask

# ====== MODEL DEFINITION ======
class EncoderOnlyClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_emb = nn.Embedding(VOCAB_SIZE, EMB_DIM, padding_idx=PAD_ID)
        self.pos_emb   = nn.Parameter(torch.zeros(1, MAX_LEN, EMB_DIM))
        enc_layer = nn.TransformerEncoderLayer(d_model=EMB_DIM, nhead=N_HEADS, dim_feedforward=FF_DIM, dropout=0.1, batch_first=True)
        self.encoder  = nn.TransformerEncoder(enc_layer, num_layers=N_LAYERS)
        self.fc       = nn.Linear(EMB_DIM, len(LABEL_MAP))
        nn.init.normal_(self.pos_emb, std=0.02)

    def forward(self, input_ids, attention_mask):
        x = self.token_emb(input_ids) + self.pos_emb
        x = self.encoder(x, src_key_padding_mask=~attention_mask.bool())
        x = (x * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(1, keepdim=True).clamp(min=1e-9)
        return self.fc(x)

# ====== LOAD DATA ======
files, labels = [], []
for cls, idx in LABEL_MAP.items():
    for f in glob.glob(os.path.join(DATA_DIR, cls, "*.npy")):
        files.append(f); labels.append(idx)
files, labels = np.array(files), np.array(labels)

print("Pre‑encoding semua sampel …")
all_ids, all_mask = [], []
for f in tqdm(files):
    ids, msk = signal_to_ids(np.load(f))
    all_ids.append(ids); all_mask.append(msk)
all_ids  = torch.tensor(all_ids,  dtype=torch.long)
all_mask = torch.tensor(all_mask, dtype=torch.long)
labels_t = torch.tensor(labels,   dtype=torch.long)

# ====== K-FOLD TRAINING ======
skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)
rows_all = []; fold_val_losses = []

for fold, (tr, va) in enumerate(skf.split(all_ids, labels), 1):
    print(f"\n════ FOLD {fold}/{N_SPLITS} ════")
    gc.collect(); torch.cuda.empty_cache()
    
    best_loss = math.inf; best_state = None; best_history = None
    for hp in GRID:
        lr, bs = hp['lr'], hp['batch_size']
        train_ds = TensorDataset(all_ids[tr], all_mask[tr], labels_t[tr])
        val_ds   = TensorDataset(all_ids[va], all_mask[va], labels_t[va])
        train_loader = DataLoader(train_ds, batch_size=bs, shuffle=True)
        val_loader   = DataLoader(val_ds, batch_size=bs)

        model = EncoderOnlyClassifier().to(DEVICE)
        optim = torch.optim.AdamW(model.parameters(), lr=lr)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, EPOCHS)

        history = []
        for epoch in range(1, EPOCHS + 1):
            model.train(); tot_loss = 0; correct = 0; total = 0
            for ids, msk, lbl in train_loader:
                ids, msk, lbl = ids.to(DEVICE), msk.to(DEVICE), lbl.to(DEVICE)
                optim.zero_grad(); out = model(ids, msk)
                loss = F.cross_entropy(out, lbl)
                loss.backward(); torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0); optim.step()
                tot_loss += loss.item() * len(lbl)
                correct += (out.argmax(1) == lbl).sum().item(); total += len(lbl)
            scheduler.step()
            train_loss = tot_loss / total
            train_acc  = correct / total

            model.eval(); val_loss, correct, total = 0, 0, 0
            with torch.no_grad():
                for ids, msk, lbl in val_loader:
                    ids, msk, lbl = ids.to(DEVICE), msk.to(DEVICE), lbl.to(DEVICE)
                    out = model(ids, msk)
                    val_loss += F.cross_entropy(out, lbl, reduction='sum').item()
                    correct += (out.argmax(1) == lbl).sum().item(); total += len(lbl)
            val_loss /= total
            val_acc  = correct / total

            history.append({"epoch": epoch, "train_loss": train_loss, "val_loss": val_loss, "train_acc": train_acc, "val_acc": val_acc})
            print(f"[Fold {fold}][Epoch {epoch}] TL: {train_loss:.4f} | VL: {val_loss:.4f} | TA: {train_acc:.4f} | VA: {val_acc:.4f}")

            if val_loss < best_loss:
                best_loss = val_loss
                best_state = copy.deepcopy(model.state_dict())
                best_history = history.copy()

    # Simpan hasil terbaik fold ini
    out_dir = os.path.join(OUTPUT_BASE, f"fold{fold}"); os.makedirs(out_dir, exist_ok=True)
    torch.save(best_state, os.path.join(out_dir, "best_model.pt"))
    with open(os.path.join(out_dir, "model_config.json"), "w") as f:
        json.dump({"emb_dim": EMB_DIM, "n_layers": N_LAYERS, "n_heads": N_HEADS, "ff_dim": FF_DIM, "max_len": MAX_LEN,
                   "vocab_size": VOCAB_SIZE, "pad_id": PAD_ID, "cls_id": CLS_ID, "label_map": LABEL_MAP}, f, indent=2)
    with open(os.path.join(out_dir, "vocab.txt"), "w") as f:
        f.writelines([f"{i}\n" for i in range(256)] + ["[PAD]\n", "[CLS]\n"])
    pd.DataFrame(best_history).to_csv(os.path.join(out_dir, "history_epoch.csv"), index=False)

    # Evaluasi dan simpan confusion matrix
    model.load_state_dict(best_state); model.eval()
    val_loader = DataLoader(val_ds, batch_size=bs)
    logits, y_true = [], []
    with torch.no_grad():
        for ids, msk, lbl in val_loader:
            ids, msk = ids.to(DEVICE), msk.to(DEVICE)
            logits.append(model(ids, msk).cpu()); y_true.append(lbl)
    preds = torch.cat(logits).argmax(1).numpy(); y_true = torch.cat(y_true).numpy()
    cm = confusion_matrix(y_true, preds, labels=list(range(len(cls_names))))
    plt.figure(figsize=(6,5)); plt.imshow(cm, cmap='Blues'); plt.title(f"Confusion Fold {fold}")
    plt.xticks(range(len(cls_names)), cls_names); plt.yticks(range(len(cls_names)), cls_names)
    for r in range(len(cm)):
        for c in range(len(cm)):
            plt.text(c, r, cm[r,c], ha='center', va='center')
    plt.xlabel('Predicted'); plt.ylabel('True'); plt.tight_layout()
    plt.savefig(os.path.join(out_dir, "confusion_fold.png")); plt.close()

    # Simpan metrik per kelas
    for i, cls in enumerate(cls_names):
        TP = cm[i,i]; FN = cm[i].sum()-TP; FP = cm[:,i].sum()-TP; TN = cm.sum()-TP-FN-FP
        ACC = (TP+TN)/cm.sum(); REC = TP/(TP+FN+1e-8); SPEC = TN/(TN+FP+1e-8); F1 = 2*TP/(2*TP+FP+FN+1e-8)
        rows_all.append({"fold": fold, "kelas": cls, "akurasi": round(ACC,4), "f1": round(F1,4), "recall": round(REC,4), "spesifisitas": round(SPEC,4)})
    fold_val_losses.append((fold, best_loss))

# Simpan metrik akhir
out_df = pd.DataFrame(rows_all)
out_df.to_csv(os.path.join(OUTPUT_BASE, "final_summary_perkelas.csv"), index=False)
best_fold = sorted(fold_val_losses, key=lambda x: x[1])[0][0]
out_df[out_df.fold == best_fold].to_csv(os.path.join(OUTPUT_BASE, "final_summary_best_only.csv"), index=False)

Pre‑encoding semua sampel …


100%|██████████| 28000/28000 [00:02<00:00, 9476.04it/s]



════ FOLD 1/5 ════
[Fold 1][Epoch 1] TL: 0.8875 | VL: 0.6864 | TA: 0.6675 | VA: 0.7498
[Fold 1][Epoch 2] TL: 0.5396 | VL: 0.5824 | TA: 0.8171 | VA: 0.7941
[Fold 1][Epoch 3] TL: 0.4140 | VL: 0.3921 | TA: 0.8582 | VA: 0.8677
[Fold 1][Epoch 4] TL: 0.3408 | VL: 0.4136 | TA: 0.8824 | VA: 0.8623
[Fold 1][Epoch 5] TL: 0.2933 | VL: 0.3507 | TA: 0.9011 | VA: 0.8834
[Fold 1][Epoch 6] TL: 0.2580 | VL: 0.3499 | TA: 0.9139 | VA: 0.8854
[Fold 1][Epoch 7] TL: 0.2210 | VL: 0.2894 | TA: 0.9258 | VA: 0.9109
[Fold 1][Epoch 8] TL: 0.1973 | VL: 0.2977 | TA: 0.9332 | VA: 0.9073
[Fold 1][Epoch 9] TL: 0.1732 | VL: 0.2568 | TA: 0.9413 | VA: 0.9202
[Fold 1][Epoch 10] TL: 0.1483 | VL: 0.2657 | TA: 0.9505 | VA: 0.9189
[Fold 1][Epoch 11] TL: 0.1300 | VL: 0.2606 | TA: 0.9575 | VA: 0.9229
[Fold 1][Epoch 12] TL: 0.1163 | VL: 0.2181 | TA: 0.9612 | VA: 0.9395
[Fold 1][Epoch 13] TL: 0.1003 | VL: 0.2223 | TA: 0.9667 | VA: 0.9387
[Fold 1][Epoch 14] TL: 0.0894 | VL: 0.2448 | TA: 0.9717 | VA: 0.9373
[Fold 1][Epoch 15] TL: 

In [2]:
# test_encoder_only.py
# Uji model EncoderOnlyClassifier pada set uji, hasil: confusion matrix & metrik per-kelas
import os, glob, json, math, copy
import numpy as np
import pandas as pd
import torch, torch.nn as nn, torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score
import matplotlib.pyplot as plt

# ========== KONFIGURASI ========== 
MODEL_DIR     = r"D:\KULIAH\TELKOM_UNIVERSITY\SEMESTER_8\TA\TA_SKRIPSI_GUE\HASIL_TRAIN\BEATS\HASIL_Encoder_Beats_tuning\HASIL_Encoder_Beats_tuning\fold3"   # ganti ke fold terbaik Anda
TEST_DIR      = r"D:\KULIAH\TELKOM_UNIVERSITY\SEMESTER_8\TA\TA_SKRIPSI_GUE\DATA_UJI_INFERENCE\Beats_TEST"                # root data uji, berisi subfolder kelas
OUTPUT_DIR    = os.path.join(MODEL_DIR, "TEST_EVAL")             # hasil evaluasi uji
BATCH_SIZE    = 64
DEVICE        = torch.device("cuda" if torch.cuda.is_available() else "cpu")
EPS           = 1e-8

os.makedirs(OUTPUT_DIR, exist_ok=True)

# ========== BACA KONFIGURASI MODEL ========== 
config_path = os.path.join(MODEL_DIR, "model_config.json")
state_path  = os.path.join(MODEL_DIR, "best_model.pt")

assert os.path.isfile(config_path), f"Tidak menemukan model_config.json di {config_path}"
assert os.path.isfile(state_path),  f"Tidak menemukan best_model.pt di {state_path}"

with open(config_path, "r") as f:
    CFG = json.load(f)

EMB_DIM    = CFG["emb_dim"]
N_LAYERS   = CFG["n_layers"]
N_HEADS    = CFG["n_heads"]
FF_DIM     = CFG["ff_dim"]
MAX_LEN    = CFG["max_len"]
VOCAB_SIZE = CFG["vocab_size"]
PAD_ID     = CFG["pad_id"]
CLS_ID     = CFG["cls_id"]
LABEL_MAP  = CFG["label_map"]  # dict nama->id seperti {'N':0,'L':1,'R':2,'V':3,'Q':4}

# urutan nama kelas untuk konsistensi tampilan & confusion_matrix
CLS_NAMES = list(LABEL_MAP.keys())
NUM_CLASSES = len(LABEL_MAP)

# ========== UTIL: ENCODING SAMA DENGAN TRAIN ==========
def signal_to_ids(sig: np.ndarray):
    sig = np.asarray(sig).astype(np.float32)
    # Normalisasi ke [0,255] sesuai train
    norm = ((sig - sig.min()) / (sig.ptp() + EPS) * 255).astype(np.int64)
    ids  = np.concatenate(([CLS_ID], norm))[:MAX_LEN]
    mask = np.ones_like(ids, dtype=np.int64)
    if ids.shape[0] < MAX_LEN:
        pad_len = MAX_LEN - ids.shape[0]
        ids  = np.concatenate((ids,  np.full(pad_len, PAD_ID, dtype=np.int64)))
        mask = np.concatenate((mask, np.zeros(pad_len, dtype=np.int64)))
    return ids, mask

# ========== DEFINISI MODEL HARUS IDENTIK ==========
class EncoderOnlyClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_emb = nn.Embedding(VOCAB_SIZE, EMB_DIM, padding_idx=PAD_ID)
        self.pos_emb   = nn.Parameter(torch.zeros(1, MAX_LEN, EMB_DIM))
        enc_layer = nn.TransformerEncoderLayer(
            d_model=EMB_DIM, nhead=N_HEADS, dim_feedforward=FF_DIM,
            dropout=0.1, batch_first=True
        )
        self.encoder  = nn.TransformerEncoder(enc_layer, num_layers=N_LAYERS)
        self.fc       = nn.Linear(EMB_DIM, NUM_CLASSES)
        # Positional embedding inisialisasi spt training
        nn.init.normal_(self.pos_emb, std=0.02)

    def forward(self, input_ids, attention_mask):
        x = self.token_emb(input_ids) + self.pos_emb
        x = self.encoder(x, src_key_padding_mask=~attention_mask.bool())
        x = (x * attention_mask.unsqueeze(-1)).sum(1) / attention_mask.sum(1, keepdim=True).clamp(min=1e-9)
        return self.fc(x)

# ========== MUAT MODEL ==========
model = EncoderOnlyClassifier().to(DEVICE)
state = torch.load(state_path, map_location=DEVICE)
model.load_state_dict(state)
model.eval()

# ========== SIAPKAN DATA UJI ==========
test_files, test_labels = [], []
for cls_name, cls_id in LABEL_MAP.items():
    pattern = os.path.join(TEST_DIR, cls_name, "*.npy")
    files = sorted(glob.glob(pattern))
    test_files.extend(files)
    test_labels.extend([cls_id] * len(files))

test_files = np.array(test_files)
test_labels = np.array(test_labels, dtype=np.int64)

assert len(test_files) > 0, f"Tidak ada berkas .npy ditemukan di {TEST_DIR} (cek struktur TEST_DIR/<kelas>/*.npy)."

# Pre-encode
all_ids, all_mask = [], []
for f in test_files:
    sig = np.load(f)
    ids, msk = signal_to_ids(sig)
    all_ids.append(ids); all_mask.append(msk)

all_ids  = torch.tensor(np.stack(all_ids),  dtype=torch.long)
all_mask = torch.tensor(np.stack(all_mask), dtype=torch.long)
y_true_t = torch.tensor(test_labels,       dtype=torch.long)

test_ds = TensorDataset(all_ids, all_mask, y_true_t)
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)

# ========== INFERENSI ==========
logits_list, y_true_list = [], []
with torch.no_grad():
    for ids, msk, lbl in test_loader:
        ids, msk = ids.to(DEVICE), msk.to(DEVICE)
        out = model(ids, msk)
        logits_list.append(out.cpu())
        y_true_list.append(lbl)

logits = torch.cat(logits_list, dim=0)
y_true = torch.cat(y_true_list, dim=0).numpy()
y_pred = logits.argmax(dim=1).numpy()

# ========== METRIK GLOBAL ==========
acc  = accuracy_score(y_true, y_pred)
f1_m = f1_score(y_true, y_pred, average="macro")
f1_w = f1_score(y_true, y_pred, average="weighted")

print(f"[TEST] Accuracy: {acc:.4f} | F1-macro: {f1_m:.4f} | F1-weighted: {f1_w:.4f}")

# ========== CONFUSION MATRIX ==========
labels_order = list(range(NUM_CLASSES))  # pastikan urutan konsisten dengan CLS_NAMES
cm = confusion_matrix(y_true, y_pred, labels=labels_order)

# Simpan numerik CM
pd.DataFrame(cm, index=CLS_NAMES, columns=CLS_NAMES).to_csv(os.path.join(OUTPUT_DIR, "confusion_matrix_numeric.csv"))

# Plot CM (angka putih pada diagonal, hitam di luar diagonal)
plt.figure(figsize=(7,6))
plt.imshow(cm, cmap="Blues")
plt.title("Confusion Matrix - TEST SET (Encoder)", pad=12)
plt.xticks(range(NUM_CLASSES), CLS_NAMES)
plt.yticks(range(NUM_CLASSES), CLS_NAMES)
for r in range(NUM_CLASSES):
    for c in range(NUM_CLASSES):
        color = "white" if r == c else "black"
        plt.text(c, r, str(cm[r, c]), ha="center", va="center", fontsize=10, color=color)
plt.xlabel("Predicted"); plt.ylabel("True")
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, "confmat_test.png"), dpi=300)
plt.close()

# (Opsional) Confusion Matrix ter-normalisasi baris
row_sums = cm.sum(axis=1, keepdims=True) + EPS
cm_norm = cm / row_sums
plt.figure(figsize=(7,6))
plt.imshow(cm_norm, cmap="Blues")
plt.title("Normalized Confusion Matrix - TEST SET (Encoder)", pad=12)
plt.xticks(range(NUM_CLASSES), CLS_NAMES)
plt.yticks(range(NUM_CLASSES), CLS_NAMES)
for r in range(NUM_CLASSES):
    for c in range(NUM_CLASSES):
        val = cm_norm[r, c]
        txt = f"{val:.2f}"
        color = "white" if (r == c and val > 0.5) else "black"
        plt.text(c, r, txt, ha="center", va="center", fontsize=9, color=color)
plt.xlabel("Predicted"); plt.ylabel("True")
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, "confmat_test_normalized.png"), dpi=300)
plt.close()

# ========== METRIK PER-KELAS ==========
rows = []
total = cm.sum()
for i, cls in enumerate(CLS_NAMES):
    TP = cm[i, i]
    FN = cm[i, :].sum() - TP
    FP = cm[:, i].sum() - TP
    TN = total - TP - FN - FP

    ACC  = (TP + TN) / (total + EPS)
    REC  = TP / (TP + FN + EPS)            # Sensitivitas/Recall
    SPEC = TN / (TN + FP + EPS)            # Spesifisitas
    F1   = (2 * TP) / (2 * TP + FP + FN + EPS)

    rows.append({
        "kelas": cls,
        "akurasi": round(ACC, 6),
        "recall": round(REC, 6),
        "spesifisitas": round(SPEC, 6),
        "f1": round(F1, 6),
        "support": int(cm[i, :].sum())
    })

per_class_df = pd.DataFrame(rows)
per_class_df.to_csv(os.path.join(OUTPUT_DIR, "metrics_per_class_test.csv"), index=False)

# ========== RINGKASAN GLOBAL ==========
summary = {
    "accuracy": round(float(acc), 6),
    "f1_macro": round(float(f1_m), 6),
    "f1_weighted": round(float(f1_w), 6),
    "num_samples": int(len(y_true)),
    "classes": CLS_NAMES
}
with open(os.path.join(OUTPUT_DIR, "metrics_summary_test.json"), "w") as f:
    json.dump(summary, f, indent=2)

print("\n[TEST] Evaluasi selesai.")
print(f"- Confusion matrix: {os.path.join(OUTPUT_DIR, 'confmat_test.png')}")
print(f("- CM normalized: {os.path.join(OUTPUT_DIR, 'confmat_test_normalized.png')}") )
print(f"- Per-kelas CSV : {os.path.join(OUTPUT_DIR, 'metrics_per_class_test.csv')}")
print(f"- Ringkasan JSON: {os.path.join(OUTPUT_DIR, 'metrics_summary_test.json')}")


  output = torch._nested_tensor_from_mask(


[TEST] Accuracy: 0.8546 | F1-macro: 0.8555 | F1-weighted: 0.8555

[TEST] Evaluasi selesai.
- Confusion matrix: D:\KULIAH\TELKOM_UNIVERSITY\SEMESTER_8\TA\TA_SKRIPSI_GUE\HASIL_TRAIN\BEATS\HASIL_Encoder_Beats_tuning\HASIL_Encoder_Beats_tuning\fold3\TEST_EVAL\confmat_test.png


TypeError: '_io.TextIOWrapper' object is not callable