In [1]:
import os
import re
import argparse
from typing import List, Tuple, Dict

import torch
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel
from contextlib import contextmanager

from sklearn.metrics import (
    accuracy_score,
    precision_recall_fscore_support,
    confusion_matrix,
    classification_report,
)

from tqdm.auto import tqdm

# =====================================================
# 1. 기본 설정
# =====================================================
BASE_MODEL  = os.getenv("BASE_MODEL", "LGAI-EXAONE/EXAONE-4.0-1.2B")
ADAPTER_DIR = "./korsmishing-qlora-smishing-expl_확장" 
CACHE_DIR   = r"C:\hf_models\_cache"
HF_TOKEN    = os.getenv("HUGGINGFACE_HUB_TOKEN", "").strip() or None

SEED = 42
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# 추론 설정 (학습 때와 동일하게 맞춤)
GEN_MAX_NEW_TOKENS = 200 
GEN_TEMPERATURE    = 0.1 
GEN_REP_PENALTY    = 1.1 
NO_REPEAT_NGRAM    = 4

# =====================================================
# 2. 유틸: 프롬프트 및 파싱
# =====================================================
@contextmanager
def left_pad(tok):
    old = tok.padding_side
    tok.padding_side = "left"
    try:
        yield
    finally:
        tok.padding_side = old

def build_prompt(text: str) -> str:
    # 시스템 메시지
    system_msg = "너는 문자를 분석하여 스미싱 여부를 판단하고, 그 이유를 설명하는 보안 전문가야."
    
    # 유저 메시지 (간결한 답변 유도)
    user_msg = (
        "다음 $$문자$$를 보고 먼저 $$스미싱 여부$$를 판단한 뒤, 그 이유를 한두 문장으로 제시하세요.\n"
        "형식:\n"
        "$$스미싱 여부$$: (스미싱/정상)\n"
        "$$설명$$: (간단한 설명)\n\n"
        f"$$문자$$\n{text}"
        "<답변>\n"
    )
    
    # EXAONE 공식 포맷 조립
    return (
        f"[|system|]{system_msg}[|endofturn|]\n"
        f"[|user|]{user_msg}[|endofturn|]\n"
        "[|assistant|]"
    )

def normalize_for_label(text: str) -> str:
    return text.replace(" ", "").replace("\n", "")

def parse_label_and_expl(text: str) -> Tuple[str, str]:
    """
    강건한 파싱 로직 적용:
    - 오타(스미링, 스미encing 등) 대응
    - 깨진 태그 대응
    """
    # 1. 특수 토큰 제거
    text = text.replace("[|endofturn|]", "").replace("</s>", "").strip()
    
    label = None
    
    # ------------------------------------------------------------------
    # [1단계] 강력한 정규식 탐지 (오타 포함 패턴 매칭)
    # ------------------------------------------------------------------
    # "$$스미"로 시작해서 "여부$$"로 끝나는 구간 사이에 무엇이 있든 잡아냄
    smishing_pattern = r"\$\$스미.{0,10}여부\$\$[:：]?\s*(스미싱|정상)"
    
    m = re.search(smishing_pattern, text, flags=re.IGNORECASE)
    if m:
        label = m.group(1)
        
    # [2단계] 태그가 아예 깨졌을 때 (예: 스미싱여부: 정상)
    if label is None:
        clean_text = text.replace(" ", "")
        # 스미싱/스미신/스미링 등 다양한 오타 허용
        m2 = re.search(r"스미[싱신링닝핑a-z]*여부[:：]?\s*(스미싱|정상)", clean_text, flags=re.IGNORECASE)
        if m2:
            label = m2.group(1)

    # [3단계] 최후의 수단: 문장 내 키워드 우선순위 확인
    if label is None:
        head = text[:50] # 앞부분 50자만 확인
        has_smishing = "스미싱" in head
        has_normal = "정상" in head
        
        if has_smishing and not has_normal:
            label = "스미싱"
        elif has_normal and not has_smishing:
            label = "정상"
        elif has_smishing and has_normal:
            # 둘 다 있으면 뒤에 나온 걸 선택 (보통 '스미싱 여부: 정상' 형식이므로)
            label = "정상" if head.rfind("정상") > head.rfind("스미싱") else "스미싱"
        else:
            label = "판단불가" # 끝내 못 찾음

    # ------------------------------------------------------------------
    # [설명 파싱] $$설명$$ 뒤에 ``` 같은 거 붙어도 처리
    # ------------------------------------------------------------------
    exp = text
    m_exp = re.search(r"\$\$설명.{0,5}[:：]?\s*(.+)", text, flags=re.S)
    
    if m_exp:
        exp = m_exp.group(1).strip()
        # 불필요한 기호 제거
        exp = re.sub(r"^[```:：\s]+", "", exp)
    else:
        # 태그 없으면 원본 반환 (혹은 추가 정제 가능)
        pass

    return label, exp

# =====================================================
# 3. 배치 추론 함수
# =====================================================
@torch.no_grad()
def batch_generate_raw(texts: List[str], model, tokenizer, batch_size: int = 8) -> List[str]:
    results = []
    
    for i in tqdm(range(0, len(texts), batch_size), desc="Generating", unit="batch"):
        chunk = texts[i:i + batch_size]
        prompts = [build_prompt(t) for t in chunk]
        
        with left_pad(tokenizer):
            enc = tokenizer(
                prompts,
                return_tensors="pt",
                truncation=True,
                max_length=1024, 
                padding=True,
            )
        enc.pop("token_type_ids", None)
        enc = {k: v.to(model.device) for k, v in enc.items()}

        out = model.generate(
            **enc,
            max_new_tokens=GEN_MAX_NEW_TOKENS,
            do_sample=False,
            temperature=GEN_TEMPERATURE,
            repetition_penalty=GEN_REP_PENALTY,
            no_repeat_ngram_size=NO_REPEAT_NGRAM,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            use_cache=True,
        )

        in_len = enc["input_ids"].shape[1]
        for j in range(out.size(0)):
            gen_text = tokenizer.decode(
                out[j][in_len:], skip_special_tokens=True
            ).strip()
            results.append(gen_text)

    return results

# =====================================================
# 4. 평가 실행 및 CSV 저장
# =====================================================
def evaluate_csv(
    csv_path: str,
    model, 
    tokenizer,
    text_col: str = "content",
    label_col: str = "label",
    batch_size: int = 8,
    encoding: str = "utf-8-sig",
):
    print(f"[INFO] Loading CSV from {csv_path}")
    try:
        if encoding:
            df = pd.read_csv(csv_path, encoding=encoding)
        else:
            df = pd.read_csv(csv_path)
    except Exception as e:
        print(f"[Error] CSV 로드 실패: {e}")
        return None, None

    # 컬럼 확인
    if text_col not in df.columns or label_col not in df.columns:
        print(f"[Error] CSV에 '{text_col}' 또는 '{label_col}' 컬럼이 없습니다.")
        return None, None

    texts   = df[text_col].astype(str).tolist()
    y_true  = df[label_col].astype(str).tolist()

    print(f"[INFO] Total samples: {len(df)}")
    print("[INFO] Running inference...")
    
    # 1. 추론
    raw_outputs = batch_generate_raw(texts, model, tokenizer, batch_size=batch_size)
    
    # 2. 파싱
    parsed = [parse_label_and_expl(o) for o in raw_outputs]
    y_pred, exp_pred = zip(*parsed)

    # 3. 지표 계산
    labels = ["스미싱", "정상"]
    
    # 라벨에 없는 값('판단불가' 등)은 통계에서 경고를 띄우지 않기 위해 처리 필요할 수 있음
    # 여기서는 sklearn이 알아서 처리하도록 둠 (Warning 뜰 수 있음)

    acc = accuracy_score(y_true, y_pred)
    precision, recall, f1, support = precision_recall_fscore_support(
        y_true, y_pred, labels=labels, average=None, zero_division=0
    )
    
    macro_f1 = precision_recall_fscore_support(y_true, y_pred, average="macro", zero_division=0)[2]

    print("\n========== Evaluation Results ==========")
    print(f"Accuracy: {acc:.4f}")
    print(f"Macro F1: {macro_f1:.4f}")
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, labels=labels, zero_division=0))
    print("========================================")

    # 4. 데이터 저장 준비
    metrics = {
        "Accuracy": acc,
        "Macro_F1": macro_f1,
        "F1_Smishing": f1[0] if len(f1) > 0 else 0,
        "F1_Normal": f1[1] if len(f1) > 1 else 0,
        "Precision_Smishing": precision[0] if len(precision) > 0 else 0,
        "Recall_Smishing": recall[0] if len(recall) > 0 else 0,
    }

    df_preds = pd.DataFrame({
        "text": texts,
        "label_true": y_true,
        "label_pred": list(y_pred),
        "explanation_pred": list(exp_pred),
        "raw_output": raw_outputs,
    })

    return metrics, df_preds

# =====================================================
# 5. 메인 함수 (Jupyter 에러 수정 버전)
# =====================================================
def main():
    parser = argparse.ArgumentParser()
    # --csv_path 기본값 설정
    parser.add_argument("--csv_path",       type=str, default="test.csv", help="Path to test CSV file")
    parser.add_argument("--text_col",       type=str, default="content")
    parser.add_argument("--label_col",      type=str, default="label")
    parser.add_argument("--batch_size",     type=int, default=8)
    parser.add_argument("--encoding",       type=str, default="utf-8-sig")
    parser.add_argument("--metrics_out",    type=str, default="metrics_result.csv")
    parser.add_argument("--preds_out",      type=str, default="predictions_result.csv")
    
    # 어댑터 사용 여부 스위치
    parser.add_argument("--no_adapter",     action="store_true", help="Use base model without LoRA adapter")

    # ★★★ [핵심 수정] parse_args() -> parse_known_args()로 변경 ★★★
    # 이렇게 하면 주피터가 넘기는 -f 인자를 'unknown' 변수로 받아내고 에러를 내지 않습니다.
    args, unknown = parser.parse_known_args()

    if unknown:
        print(f"[INFO] Jupyter 내부 인자 무시됨: {unknown}")

    # 디바이스 설정
    device = "cuda" if torch.cuda.is_available() else "cpu"
    dtype = torch.bfloat16 if (device=="cuda" and torch.cuda.get_device_capability(0)[0] >= 8) else torch.float32
    print(f"[INFO] Using device: {device}, dtype: {dtype}")

    # 토크나이저 로드
    tokenizer = AutoTokenizer.from_pretrained(
        BASE_MODEL, cache_dir=CACHE_DIR, token=HF_TOKEN, use_fast=True
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "left"

    # 모델 로드
    print(f"[INFO] Loading Base Model: {BASE_MODEL}")
    base_model = AutoModelForCausalLM.from_pretrained(
        BASE_MODEL,
        cache_dir=CACHE_DIR,
        token=HF_TOKEN,
        device_map="auto" if device == "cuda" else None,
        torch_dtype=dtype,
    )

    # 스위치: LoRA 쓸지 말지 결정
    if args.no_adapter:
        print("▶▶▶ [Mode] Running with BASE MODEL ONLY (No Adapter) ◀◀◀")
        model = base_model
    else:
        print(f"▶▶▶ [Mode] Loading LoRA Adapter from: {ADAPTER_DIR} ◀◀◀")
        try:
            model = PeftModel.from_pretrained(base_model, ADAPTER_DIR)
        except Exception as e:
            print(f"[Error] 어댑터 로드 실패: {e}")
            print("어댑터 경로가 정확한지, 학습이 제대로 끝났는지 확인해주세요.")
            return
    
    model.eval()

    # 평가 수행
    metrics, df_preds = evaluate_csv(
        csv_path=args.csv_path,
        model=model,
        tokenizer=tokenizer,
        text_col=args.text_col,
        label_col=args.label_col,
        batch_size=args.batch_size,
        encoding=args.encoding,
    )

    if metrics is not None:
        # 결과 저장
        pd.DataFrame([metrics]).to_csv(args.metrics_out, index=False, encoding="utf-8-sig")
        print(f"[INFO] Metrics saved to: {args.metrics_out}")
        
        df_preds.to_csv(args.preds_out, index=False, encoding="utf-8-sig")
        print(f"[INFO] Predictions saved to: {args.preds_out}")

if __name__ == "__main__":
    main()

[INFO] Jupyter 내부 인자 무시됨: ['-f', 'C:\\Users\\m\\AppData\\Roaming\\jupyter\\runtime\\kernel-5f66ebf9-b0d4-4e4f-8e55-e4167922009a.json']
[INFO] Using device: cuda, dtype: torch.bfloat16
[INFO] Loading Base Model: LGAI-EXAONE/EXAONE-4.0-1.2B


`torch_dtype` is deprecated! Use `dtype` instead!


▶▶▶ [Mode] Loading LoRA Adapter from: ./korsmishing-qlora-smishing-expl_확장 ◀◀◀
[INFO] Loading CSV from test.csv
[INFO] Total samples: 2828
[INFO] Running inference...


Generating:   0%|          | 0/354 [00:00<?, ?batch/s]

The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.



Accuracy: 0.9261
Macro F1: 0.9217

Classification Report:
              precision    recall  f1-score   support

         스미싱       0.89      0.99      0.94      1654
          정상       0.99      0.83      0.90      1174

    accuracy                           0.93      2828
   macro avg       0.94      0.91      0.92      2828
weighted avg       0.93      0.93      0.92      2828

[INFO] Metrics saved to: metrics_result.csv
[INFO] Predictions saved to: predictions_result.csv
