In [2]:
# analyze_keyword_weighting_fixed.py
# 목적: 클래스별 키워드 가중치(loss)에 따른 성능 향상 여부 검증
# 비교: (A) 기본 CrossEntropy, (B) 키워드가중 CrossEntropy
# 지표: weighted F1
# 입력: data/train.csv  (columns: conversation, class)

import os, json, re
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import List, Dict, Tuple

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, f1_score, confusion_matrix
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import (
    AutoTokenizer, AutoModelForSequenceClassification,
    get_linear_schedule_with_warmup, set_seed
)

# -----------------------
# 설정
# -----------------------
@dataclass
class CFG:
    """Experiment configuration.

    The rest of the script reads these defaults straight off the class
    (e.g. ``CFG.out_dir``), so no instance is required.
    """

    # --- model and data ---
    # Hugging Face checkpoint name; replace it to try another model.
    model_name: str = "beomi/KcBERT-base"
    # CSV file holding the training data.
    data_path: str = "data/train.csv"
    # Column with the input text.
    text_col: str = "conversation"
    # Column with the class label.
    label_col: str = "class"

    # --- optimisation ---
    # Upper bound on sequence length; the effective length is the minimum of
    # this value and the model's max_position_embeddings.
    max_len: int = 512
    batch_size: int = 16
    lr: float = 2e-5
    epochs: int = 4
    # Fraction of the total optimisation steps used for LR warm-up.
    warmup_ratio: float = 0.1
    seed: int = 2025
    # Number of stratified cross-validation folds.
    kfold: int = 3
    # Directory that receives the result files.
    out_dir: str = "kw_weighting_results"

    # --- keyword weighting ---
    # Loss multiplier applied to a sample when a class keyword is found in it.
    token_weight: float = 1.25
    # Minimum document frequency for a token to enter the TF-IDF vocabulary.
    min_df: int = 3
    # Number of top-ranked tokens kept per class.
    topk_per_class: int = 50

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Make sure the results directory exists before anything writes into it.
os.makedirs(name=CFG.out_dir, exist_ok=True)

# Seed once at start-up through transformers' set_seed helper.
set_seed(CFG.seed)

# -----------------------
# 데이터 / 라벨 매핑
# -----------------------
def load_data(path: str) -> pd.DataFrame:
    """Load the training CSV and return only the text and label columns.

    Rows with a missing text or label are dropped, the index is reset, and
    both columns are cast to ``str``.

    Args:
        path: Location of the CSV file.

    Returns:
        A two-column DataFrame (``CFG.text_col``, ``CFG.label_col``).

    Raises:
        ValueError: If either required column is absent from the file.
    """
    df = pd.read_csv(path)

    # Validate explicitly: an `assert` would vanish under `python -O` and the
    # failure would then surface later as a much less helpful KeyError.
    required = (CFG.text_col, CFG.label_col)
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError(
            f"{path}: missing required column(s) {missing}; "
            f"found {list(df.columns)}"
        )

    df = df[[CFG.text_col, CFG.label_col]].dropna().reset_index(drop=True)
    df[CFG.text_col] = df[CFG.text_col].astype(str)
    df[CFG.label_col] = df[CFG.label_col].astype(str)
    return df

def build_label_maps(labels: List[str]) -> Tuple[List[str], Dict[str,int], Dict[int,str]]:
    """Derive a stable label <-> integer-id mapping from raw label values.

    Returns:
        ``(names, name_to_id, id_to_name)`` where ``names`` is the sorted list
        of distinct labels and ids are positions in that list.
    """
    names = sorted(set(labels))
    name_to_id: Dict[str, int] = {}
    id_to_name: Dict[int, str] = {}
    for idx, name in enumerate(names):
        name_to_id[name] = idx
        id_to_name[idx] = name
    return names, name_to_id, id_to_name

# -----------------------
# 데이터셋 / 콜레이트
# -----------------------
class RawDataset(Dataset):
    """Dataset of raw (untokenised) texts with optional labels.

    Each item is a ``(text, label)`` pair; when no labels were supplied the
    label slot holds the placeholder ``-1``.
    """

    def __init__(self, texts, labels=None):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, i):
        label = self.labels[i] if self.labels is not None else -1
        return self.texts[i], label

def make_collate(tokenizer, max_len: int):
    """Build a DataLoader ``collate_fn`` that tokenises a batch of raw texts.

    Args:
        tokenizer: Callable tokenizer; invoked with the list of texts plus
            padding/truncation options and expected to return a mapping that
            contains ``input_ids`` and ``attention_mask``.
        max_len: Length every sequence is padded or truncated to.

    Returns:
        A function mapping a list of ``(text, label)`` pairs to
        ``(features, labels)``, where ``features`` holds ``input_ids`` and
        ``attention_mask`` and ``labels`` is a ``torch.long`` tensor.
    """
    wanted_keys = ("input_ids", "attention_mask")

    def _fn(batch):
        texts, targets = zip(*batch)
        encoded = tokenizer(
            list(texts),
            padding="max_length",
            truncation=True,
            max_length=max_len,
            return_tensors="pt",
        )
        # Forward only the tensors the model call needs.
        features = {key: encoded[key] for key in wanted_keys}
        label_tensor = torch.tensor(targets, dtype=torch.long)
        return features, label_tensor

    return _fn

# -----------------------
# 키워드 사전 생성 (TF-IDF, 토큰 단위)
# -----------------------
def extract_class_keywords(df: pd.DataFrame, labels_order: List[str], tokenizer) -> Dict[int, List[str]]:
    """Pick the most characteristic tokenizer tokens for every class via TF-IDF.

    Each text is split with ``tokenizer.tokenize``, a TF-IDF matrix is fitted
    over the resulting token strings, and for every class the tokens with the
    highest mean TF-IDF score across that class's documents are kept.

    Args:
        df: Frame with the ``CFG.text_col`` and ``CFG.label_col`` columns.
        labels_order: Class names; the position of a name is its class id.
        tokenizer: Object exposing ``tokenize(text) -> list[str]`` and,
            optionally, ``model_max_length``.

    Returns:
        ``{class_id: [token, ...]}`` with at most ``CFG.topk_per_class`` tokens
        per class, best first. Classes with no rows map to an empty list.
    """
    # Only look at the part of each text the model can actually see: respect
    # the tokenizer's own limit when it is tighter than CFG.max_len, and leave
    # two positions free (presumably for the [CLS]/[SEP] specials).
    tok_limit = min(CFG.max_len, int(getattr(tokenizer, "model_max_length", CFG.max_len)))
    body_len = max(1, tok_limit - 2)
    texts_tok = [
        " ".join(tokenizer.tokenize(text)[:body_len])
        for text in df[CFG.text_col].tolist()
    ]

    # lowercase=False keeps token strings exactly as the tokenizer emitted
    # them; the default lowercasing would turn cased pieces (e.g. "[UNK]")
    # into strings that no longer exist in the tokenizer vocabulary.
    tfidf = TfidfVectorizer(min_df=CFG.min_df, token_pattern=r"[^ ]+", lowercase=False)
    X = tfidf.fit_transform(texts_tok)
    vocab = np.array(tfidf.get_feature_names_out())

    kw: Dict[int, List[str]] = {}
    y = df[CFG.label_col].values
    for li, lab in enumerate(labels_order):
        mask = (y == lab)
        if not mask.any():
            kw[li] = []
            continue
        # np.asarray(...).ravel() works whether the sparse mean comes back as
        # an np.matrix or a plain ndarray.
        mean_tfidf = np.asarray(X[mask].mean(axis=0)).ravel()
        # Stable sort => deterministic order among tied scores.
        order = np.argsort(-mean_tfidf, kind="stable")[:CFG.topk_per_class]
        # A token that never occurs in this class scores 0 and is not a
        # keyword for it, even if fewer than top-k tokens remain.
        order = order[mean_tfidf[order] > 0]
        kw[li] = vocab[order].tolist()
    return kw

# -----------------------
# 손실 함수
# -----------------------
def loss_ce(logits, labels):
    """Baseline objective: batch-mean cross-entropy of ``logits`` against ``labels``."""
    return F.cross_entropy(input=logits, target=labels, reduction="mean")

def loss_ce_token_weighted(logits, labels, input_ids, tokenizer, kw_map: Dict[int, List[str]], weight: float):
    """Cross-entropy in which keyword-bearing samples count ``weight`` times.

    A sample's loss is multiplied by ``weight`` when its gold label is class
    ``c`` and its ``input_ids`` row contains at least one keyword token of
    class ``c``. All other samples keep a multiplier of 1. The result is the
    mean of the scaled per-sample losses.

    Args:
        logits: Class scores, one row per sample.
        labels: Gold class ids, one per sample.
        input_ids: Token ids, one row per sample (batch, sequence).
        tokenizer: Object exposing ``get_vocab() -> dict[str, int]``.
        kw_map: ``{class_id: [token, ...]}``; ``None`` or empty disables the
            weighting and yields plain mean cross-entropy.
        weight: Multiplier for matching samples.

    Returns:
        Scalar loss tensor.
    """
    base = F.cross_entropy(logits, labels, reduction="none")
    if not kw_map:
        return base.mean()

    # Fetch the vocabulary once per call. Asking the tokenizer for it per
    # keyword can rebuild the whole token->id dict each time on tokenizers
    # where `.vocab` is a computed property.
    vocab = tokenizer.get_vocab()

    B = input_ids.size(0)
    mult = torch.ones(B, device=logits.device)
    for cls_idx, toks in kw_map.items():
        # Keywords unknown to the tokenizer are ignored.
        ids = sorted({vocab[tok] for tok in toks if tok in vocab})
        if not ids:
            continue
        id_tensor = torch.tensor(ids, device=input_ids.device)
        has_key = torch.isin(input_ids, id_tensor).any(dim=1)
        mask = (labels == cls_idx) & has_key
        mult = torch.where(mask, mult * weight, mult)
    return (base * mult).mean()

# -----------------------
# 학습/평가 루틴
# -----------------------
def train_one(model, loader, optimizer, scheduler, loss_mode, tokenizer, kw_map=None):
    """Train ``model`` for one pass over ``loader`` and return the mean batch loss.

    Args:
        model: Sequence-classification model whose output exposes ``.logits``.
        loader: Yields ``(features, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        scheduler: LR scheduler stepped once per batch, after the optimizer.
        loss_mode: ``"weighted"`` selects the keyword-weighted loss; any other
            value selects plain cross-entropy.
        tokenizer: Passed through to the keyword-weighted loss.
        kw_map: Class keyword map, used only when ``loss_mode == "weighted"``.

    Returns:
        Sum of per-batch losses divided by the number of batches (0.0 for an
        empty loader).
    """
    model.train()
    use_keyword_loss = loss_mode == "weighted"
    running = 0.0

    for features, targets in loader:
        features = {name: tensor.to(device) for name, tensor in features.items()}
        targets = targets.to(device)
        logits = model(**features).logits

        if use_keyword_loss:
            batch_loss = loss_ce_token_weighted(
                logits, targets, features["input_ids"], tokenizer, kw_map, CFG.token_weight
            )
        else:
            batch_loss = loss_ce(logits, targets)

        batch_loss.backward()
        # Cap the global gradient norm at 1.0 before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad(set_to_none=True)

        running += batch_loss.item()

    n_batches = len(loader)
    # Guard against division by zero on an empty loader.
    return running / (n_batches if n_batches > 0 else 1)

@torch.no_grad()
def evaluate(model, loader, label_names):
    """Score ``model`` on ``loader`` and return weighted F1 plus diagnostics.

    Args:
        model: Sequence-classification model whose output exposes ``.logits``.
        loader: Yields ``(features, labels)`` batches.
        label_names: Class names; the position of a name is its class id.

    Returns:
        ``(weighted_f1, report, confusion)`` where ``report`` is the dict form
        of ``classification_report`` and ``confusion`` is a square matrix with
        one row/column per entry of ``label_names``.
    """
    model.eval()
    ys, ps = [], []
    for enc, y in loader:
        enc = {k: v.to(device) for k, v in enc.items()}
        logits = model(**enc).logits
        # Labels are only compared on the host, so they never need to visit
        # the accelerator.
        ys.append(y.cpu())
        ps.append(logits.argmax(1).cpu())
    y_true = torch.cat(ys).numpy()
    y_pred = torch.cat(ps).numpy()

    # Pin the label set. Without it, a class that is absent from both y_true
    # and y_pred makes classification_report reject `target_names` (length
    # mismatch) and shrinks the confusion matrix, so folds would not line up.
    label_ids = list(range(len(label_names)))

    # weighted F1; zero_division=0 keeps the score of never-predicted classes
    # at 0 without emitting a warning per call.
    overall_f1 = f1_score(y_true, y_pred, labels=label_ids, average='weighted', zero_division=0)
    rep = classification_report(
        y_true, y_pred,
        labels=label_ids, target_names=label_names,
        digits=4, output_dict=True, zero_division=0,
    )
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    return overall_f1, rep, cm

def run_fold(X_tr, y_tr, X_va, y_va, label_names, loss_mode, global_tokenizer, kw_map, seed=None):
    """Fine-tune a fresh model on one train/validation split.

    Args:
        X_tr, y_tr: Training texts and integer labels.
        X_va, y_va: Validation texts and integer labels.
        label_names: Class names; the position of a name is its class id.
        loss_mode: ``"weighted"`` for the keyword-weighted loss, anything else
            for plain cross-entropy.
        global_tokenizer: Shared tokenizer. Its ``model_max_length`` is
            overwritten with the effective sequence length.
        kw_map: Class keyword map for the weighted loss (may be ``None`` for
            the baseline).
        seed: RNG seed for this run; defaults to ``CFG.seed``.

    Returns:
        ``(best_f1, best_report, best_confusion)`` taken from the epoch with
        the highest validation weighted F1.
    """
    # Reseed at the start of every run. With a single seed call at import time
    # the second arm of an A/B pair would start from whatever RNG state the
    # first arm left behind (different classifier-head init, shuffle order and
    # dropout masks), confounding the loss comparison with seed noise.
    set_seed(CFG.seed if seed is None else seed)

    # Load the model
    model = AutoModelForSequenceClassification.from_pretrained(
        CFG.model_name, num_labels=len(label_names)
    ).to(device)

    # Never exceed the model's positional-embedding capacity.
    model_max = int(getattr(model.config, "max_position_embeddings", CFG.max_len))
    use_len = min(CFG.max_len, model_max)
    # Keep the tokenizer's own limit in sync with the length actually used.
    tokenizer = global_tokenizer
    tokenizer.model_max_length = use_len

    # Data loaders
    train_ds = RawDataset(X_tr, y_tr)
    val_ds   = RawDataset(X_va, y_va)
    collate  = make_collate(tokenizer, use_len)
    tr_loader = DataLoader(train_ds, batch_size=CFG.batch_size, shuffle=True,  collate_fn=collate)
    va_loader = DataLoader(val_ds,   batch_size=CFG.batch_size, shuffle=False, collate_fn=collate)

    # One scheduler step per batch => total steps = batches * epochs.
    steps = len(tr_loader) * CFG.epochs
    opt = torch.optim.AdamW(model.parameters(), lr=CFG.lr, weight_decay=0.01)
    sch = get_linear_schedule_with_warmup(opt, int(CFG.warmup_ratio * steps), steps)

    best_f1, best_rep, best_cm = -1.0, None, None
    for ep in range(1, CFG.epochs + 1):
        tr_loss = train_one(model, tr_loader, opt, sch, loss_mode, tokenizer, kw_map)
        f1, rep, cm = evaluate(model, va_loader, label_names)
        print(f"[{loss_mode}] ep{ep:02d} loss={tr_loss:.4f} f1={f1:.4f} (use_len={use_len})")
        # Keep the metrics of the best epoch seen so far.
        if f1 > best_f1:
            best_f1, best_rep, best_cm = f1, rep, cm
    return best_f1, best_rep, best_cm

# -----------------------
# 메인
# -----------------------
def main():
    """Run the baseline-vs-keyword-weighted comparison with stratified k-fold CV.

    For every fold, two models are trained on the same split: one with plain
    cross-entropy and one with the keyword-weighted loss. Best-epoch weighted
    F1, classification report and confusion matrix are collected per fold and
    written to ``summary.json`` in ``CFG.out_dir``; the extracted keywords are
    written alongside as JSON.
    """
    df = load_data(CFG.data_path)
    label_names, lid, _ = build_label_maps(df[CFG.label_col].tolist())
    y = np.array([lid[v] for v in df[CFG.label_col].tolist()])
    X = df[CFG.text_col].tolist()

    # Shared tokenizer; its length limit is synchronised with the model inside
    # each fold.
    global_tok = AutoTokenizer.from_pretrained(CFG.model_name, use_fast=True)

    # Keywords over the whole dataset are written for inspection only. They
    # depend on tokenizer.tokenize, so re-run after changing the model.
    kw_all = extract_class_keywords(df, label_names, global_tok)
    with open(os.path.join(CFG.out_dir, "class_keywords.json"), "w", encoding="utf-8") as f:
        json.dump({label_names[k]: v for k,v in kw_all.items()}, f, ensure_ascii=False, indent=2)

    kf = StratifiedKFold(n_splits=CFG.kfold, shuffle=True, random_state=CFG.seed)
    results = {"baseline": [], "weighted": []}

    for fold_id, (tr_idx, va_idx) in enumerate(kf.split(np.arange(len(X)), y), start=1):
        print(f"\n===== Fold {fold_id}/{CFG.kfold} =====")
        X_tr = [X[i] for i in tr_idx]; y_tr = y[tr_idx].tolist()
        X_va = [X[i] for i in va_idx]; y_va = y[va_idx].tolist()

        # The keywords that drive the weighted loss must come from this fold's
        # training rows only. Deriving them from the full dataset would let
        # the validation texts and labels shape the training objective.
        kw_map = extract_class_keywords(
            df.iloc[tr_idx].reset_index(drop=True), label_names, global_tok
        )
        with open(os.path.join(CFG.out_dir, f"class_keywords_fold{fold_id}.json"), "w", encoding="utf-8") as f:
            json.dump({label_names[k]: v for k,v in kw_map.items()}, f, ensure_ascii=False, indent=2)

        # (A) plain cross-entropy
        f1_a, rep_a, cm_a = run_fold(X_tr, y_tr, X_va, y_va, label_names,
                                     loss_mode="baseline", global_tokenizer=global_tok, kw_map=None)
        # (B) keyword-weighted cross-entropy
        f1_b, rep_b, cm_b = run_fold(X_tr, y_tr, X_va, y_va, label_names,
                                     loss_mode="weighted", global_tokenizer=global_tok, kw_map=kw_map)

        results["baseline"].append({"f1": f1_a, "report": rep_a, "cm": cm_a.tolist()})
        results["weighted"].append({"f1": f1_b, "report": rep_b, "cm": cm_b.tolist()})

    # Summary: mean and standard deviation of the per-fold F1 for each arm.
    def summarize(key):
        vals = [r["f1"] for r in results[key]]
        return {"f1_mean": float(np.mean(vals)), "f1_std": float(np.std(vals))}
    summ = {k: summarize(k) for k in results}

    print("\n=== Summary (weighted F1) ===")
    print("Baseline :", summ["baseline"])
    print("Weighted :", summ["weighted"])

    with open(os.path.join(CFG.out_dir, "summary.json"), "w", encoding="utf-8") as f:
        json.dump({"summary": summ, "details": results}, f, ensure_ascii=False, indent=2)

    # Per-fold difference between the two arms.
    b = np.array([r["f1"] for r in results["baseline"]])
    w = np.array([r["f1"] for r in results["weighted"]])
    diff = w - b
    print(f"Per-fold ΔF1 (weighted - baseline): {diff.tolist()}")
    print(f"Δ mean={diff.mean():.4f}")

# Start the experiment only when this module is executed directly, not when
# it is imported.
if __name__ == "__main__":
    main()


Token indices sequence length is longer than the specified maximum sequence length for this model (357 > 300). Running this sequence through the model will result in indexing errors



===== Fold 1/3 =====


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at beomi/KcBERT-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


[baseline] ep01 loss=0.7468 f1=0.9002 (use_len=300)
[baseline] ep02 loss=0.2051 f1=0.9103 (use_len=300)
[baseline] ep03 loss=0.0818 f1=0.9180 (use_len=300)
[baseline] ep04 loss=0.0373 f1=0.9329 (use_len=300)


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at beomi/KcBERT-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


KeyboardInterrupt: 