<a href="https://colab.research.google.com/github/eeuunn/NeuroSync/blob/main/Emotional_Persona.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [64]:
import os
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

In [65]:
!curl -O https://raw.githubusercontent.com/monologg/KoBERT-Transformers/master/kobert_transformers/tokenization_kobert.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 10894  100 10894    0     0  48984      0 --:--:-- --:--:-- --:--:-- 49072


In [66]:
!pip install kobert-transformers



In [67]:
import json, itertools, random, pathlib
from typing import List, Dict, Any
import torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from kobert_transformers import get_tokenizer, get_kobert_model
from transformers import get_linear_schedule_with_warmup
from tqdm.auto import tqdm

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

Device: cuda


In [68]:
from tokenization_kobert import KoBertTokenizer
Tokenizer = KoBertTokenizer.from_pretrained('monologg/kobert')

In [69]:
class CFG:
    model_name      = "bert-base-multilingual-cased"
    max_length      = 256          # 텍스트 토큰 최대
    emotion_dim     = 4            # 불안·슬픔·분노·기쁨
    emotion_hidden  = 256
    batch_size      = 32
    num_epochs      = 6
    lr_bert         = 1e-5
    lr_other        = 5e-5
    warmup_ratio    = 0.1
    drop_emotion_p  = 0.1          # 감정 드롭 비율

# 고정 길이 (프레임 수)
T_Q, T_C, T_R, N_CH = 30, 8, 30, 5

In [70]:
class QADataset(Dataset):
    def __init__(self, paths):
        # ── NEW: 경로 리스트 처리 ─────────────────────────
        if isinstance(paths, (str, pathlib.Path)):
            paths = [paths]
        self.data = []
        for p in paths:
            with open(p, encoding="utf-8") as f:
                self.data += [json.loads(line) for line in f]
        # ────────────────────────────────────────────────

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        r = self.data[idx]
        qid       = r.get("qid", f"idx{idx:06d}")
        question  = r["question_text"]
        choices   = r["choices"]       # len==5 고정

        # 감정 필드가 없으면 zero-tensor 주입
        if "emotion_series" in r:
            q_emo = torch.tensor(r["emotion_series"]["question"],  dtype=torch.float)
            c_emo = torch.tensor(r["emotion_series"]["choices"],   dtype=torch.float)
        else:
            q_emo = torch.zeros(T_Q, CFG.emotion_dim)        # [T_Q=60,  D=8]
            c_emo = torch.zeros(N_CH, T_C, CFG.emotion_dim)     # [N_CH=5,  T_C=15, D=8]

        ans_idx   = r.get("answer_index", -1)  # 테스트 세트용 -1 허용

        return qid, question, choices, q_emo, c_emo, ans_idx

In [71]:
def build_collate_fn(tokenizer, *, max_len=256, N_CH=5):
    def collate(batch):
        qids, q_texts, choices_all, _, c_emo_all, ans_all = zip(*batch)
        B = len(batch)

        # 1) 질문 + 선택지 플랫
        flat = list(q_texts)
        for chs in choices_all:
            flat.extend(chs)              # 5개씩 추가

        # 2) 한꺼번에 토크나이즈 → enc: dict of [B*6, L]
        enc = tokenizer(flat, padding="max_length",
                        truncation=True, max_length=max_len,
                        return_tensors="pt")

        # 3) 선택지별로 묶기
        tpcs = []
        for j in range(N_CH):
            idxs = torch.arange(B) * N_CH + j + B   # +B: 질문 부분 지나감
            tpcs.append({k: v[idxs] for k, v in enc.items()})
            # 이제 v[idxs]  →  [B, L]

        c_emo = torch.stack(c_emo_all)   # [B,5,15,8]
        ans   = torch.tensor(ans_all)    # [B]

        return list(qids), tpcs, c_emo, ans
    return collate

In [72]:
class EmotionEncoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.gru = nn.GRU(CFG.emotion_dim, CFG.emotion_hidden, batch_first=True)
    def forward(self, seq):        # [B,T,D]
        _, h = self.gru(seq)
        return h.squeeze(0)        # [B,H_e]

In [73]:
class DecisionModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.bert = get_kobert_model()
        self.emo  = EmotionEncoder()
        self.fc   = nn.Sequential(
            nn.Linear(self.bert.config.hidden_size + CFG.emotion_hidden, 768),
            nn.GELU(), nn.Dropout(0.2),
            nn.Linear(768, 1)
        )
    def forward(self, tpcs, c_emo, use_emotion=True):
        B = c_emo.size(0)
        outs = []
        for i in range(N_CH):
            txt = self.bert(**{k:v.to(device) for k,v in tpcs[i].items()}).pooler_output
            emo = self.emo(c_emo[:,i].to(device)) if use_emotion else \
                  torch.zeros(B, CFG.emotion_hidden, device=device)
            outs.append(self.fc(torch.cat([txt, emo], 1)))
        return torch.cat(outs, 1)   # [B,5]

In [106]:
import torch.nn.functional as F

@torch.no_grad()
def evaluate(model, loader):
    model.eval()
    hit = tot = 0

    # --- 디버깅을 위해 추가 ---
    print("===== 디버깅 시작: 개선된 evaluate 함수 내부 값 확인 =====")
    printed_once = False
    # -------------------------

    for qids, tpcs, _, ans in loader: # c_emo는 무시
        # 배치 크기에 맞는 더미 텐서 생성
        batch_size = ans.size(0)
        dummy_emo = torch.zeros(batch_size, N_CH, T_C, CFG.emotion_dim, device=device)

        # 더미 텐서를 사용해 모델 예측
        logits = model(tpcs, dummy_emo, use_emotion=False)

        # --- 디버깅을 위해 추가 ---
        if not printed_once:
            preds = logits.argmax(-1).cpu()
            print(f"모델 예측값 (preds): {preds}")
            print(f"실제 정답 (ans):   {ans}")
            print(f"예측과 정답 비교:   {preds == ans}")
            printed_once = True
        # -------------------------

        hit   += (logits.argmax(-1).cpu() == ans).sum().item()
        tot   += ans.size(0)

    print("======================== 디버깅 종료 ========================")
    return hit / tot

TEMPERATURE = 2.0      # KL softening
ALPHA       = 0.7      # KD 가중치 (0~1)

def train_epoch(model, loader, opt, sched,
                *, ALPHA=0.5, TEMPERATURE=2.0, device="cuda"):
    model.train()
    ce = nn.CrossEntropyLoss()

    for batch_idx, (qids, tpcs, c_emo, ans) in enumerate(loader):

        # --- 데이터 이동
        ans   = ans.to(device)
        c_emo = c_emo.to(device)

        # --- 1) Teacher (감정 사용) -----------------------------
        logits_T = model(tpcs, c_emo, use_emotion=True)    # [B,5]

        # --- 2) Student (감정 OFF) -----------------------------
        logits_S = model(tpcs, c_emo, use_emotion=False)   # [B,5]

        # --- 3) 손실 계산 --------------------------------------
        loss_ce = ce(logits_S, ans)                        # CE

        kd_loss = F.kl_div(
            F.log_softmax(logits_S / TEMPERATURE, dim=-1),
            F.softmax(    logits_T / TEMPERATURE, dim=-1),
            reduction="batchmean"
        ) * (TEMPERATURE ** 2)

        loss = (1 - ALPHA) * loss_ce + ALPHA * kd_loss

        # --- 🔥 디버깅을 위해 추가할 코드 🔥 ---
        if batch_idx % 20 == 0:  # 20 배치마다 한 번씩 출력
            print(f"  [Train] Epoch {epoch}, Batch {batch_idx}/{len(loader)}, Loss: {loss.item():.4f}")
        # ------------------------------------

        # --- 4) 역전파 & 옵티마이저 ----------------------------
        opt.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt.step()
        sched.step()

In [118]:
collate_fn  = build_collate_fn(Tokenizer)

train_ds = QADataset(["set4_Anger_Disgust_Fear_Joy.jsonl", "set2_Anger_Disgust_Fear_Joy.jsonl", "set3_Anger_Disgust_Fear_Joy.jsonl", "set5_Anger_Disgust_Fear_Joy.jsonl"])
val_ds   = QADataset("set1_Anger_Disgust_Fear_Joy.jsonl")

train_ld = DataLoader(train_ds, batch_size=CFG.batch_size, shuffle=True,
                      collate_fn=collate_fn)
val_ld   = DataLoader(val_ds,   batch_size=CFG.batch_size, shuffle=False,
                      collate_fn=collate_fn)

In [119]:
model = DecisionModel().to(device)
bert_p  = [p for n,p in model.named_parameters() if n.startswith("bert.")]
other_p = [p for n,p in model.named_parameters() if not n.startswith("bert.")]

opt = torch.optim.AdamW(
    [{"params": bert_p,  "lr": CFG.lr_bert},
     {"params": other_p, "lr": CFG.lr_other}], weight_decay=0.01)

steps = len(train_ld)*CFG.num_epochs
sch   = get_linear_schedule_with_warmup(opt, int(CFG.warmup_ratio*steps), steps)

In [120]:
best = 0
for epoch in range(1, CFG.num_epochs+1):
    print(f"\nEpoch {epoch}/{CFG.num_epochs}")
    train_epoch(model, train_ld, opt, sch)
    acc = evaluate(model, val_ld)
    print("val acc =", acc)
    if acc > best:
        best = acc
        torch.save(model.state_dict(), "best.pt")
        print("✅ best saved")


Epoch 1/6
  [Train] Epoch 1, Batch 0/1, Loss: 0.8089
===== 디버깅 시작: 개선된 evaluate 함수 내부 값 확인 =====
모델 예측값 (preds): tensor([0])
실제 정답 (ans):   tensor([0])
예측과 정답 비교:   tensor([True])
val acc = 1.0
✅ best saved

Epoch 2/6
  [Train] Epoch 2, Batch 0/1, Loss: 0.7811
===== 디버깅 시작: 개선된 evaluate 함수 내부 값 확인 =====
모델 예측값 (preds): tensor([0])
실제 정답 (ans):   tensor([0])
예측과 정답 비교:   tensor([True])
val acc = 1.0

Epoch 3/6
  [Train] Epoch 3, Batch 0/1, Loss: 0.8267
===== 디버깅 시작: 개선된 evaluate 함수 내부 값 확인 =====
모델 예측값 (preds): tensor([0])
실제 정답 (ans):   tensor([0])
예측과 정답 비교:   tensor([True])
val acc = 1.0

Epoch 4/6
  [Train] Epoch 4, Batch 0/1, Loss: 0.7729
===== 디버깅 시작: 개선된 evaluate 함수 내부 값 확인 =====
모델 예측값 (preds): tensor([0])
실제 정답 (ans):   tensor([0])
예측과 정답 비교:   tensor([True])
val acc = 1.0

Epoch 5/6
  [Train] Epoch 5, Batch 0/1, Loss: 0.7496
===== 디버깅 시작: 개선된 evaluate 함수 내부 값 확인 =====
모델 예측값 (preds): tensor([0])
실제 정답 (ans):   tensor([0])
예측과 정답 비교:   tensor([True])
val acc = 1.0

Epoch 6/6
 

In [121]:
model.load_state_dict(torch.load("best.pt", map_location=device))
model.eval()

def predict(question:str, choices:List[str]):
    tok = Tokenizer
    pair = [f"{question} [SEP] {c}" for c in choices]
    enc = tok(pair, padding=True, truncation=True,
              max_length=CFG.max_length, return_tensors="pt")
    tpcs = [{k:v[i:i+1].to(device) for k,v in enc.items()} for i in range(N_CH)]
    dummy = torch.zeros(1, N_CH, T_C, CFG.emotion_dim, device=device)
    with torch.no_grad():
        out = model(tpcs, dummy, use_emotion=False).softmax(-1).squeeze()
    return int(out.argmax()), out.tolist()

idx, probs = predict(
    "유람선이 침몰했다. 누구를 먼저 구할까?",
    ["A","B","C","D","E"])
print("Predicted:", idx, "| Probs:", probs)

Predicted: 0 | Probs: [0.20132966339588165, 0.2008781135082245, 0.19965198636054993, 0.1993076056241989, 0.19883258640766144]
