In [2]:
!pip install transformers accelerate datasets peft bitsandbytes sentencepiece --quiet


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.4/59.4 MB[0m [31m18.6 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25h

In [3]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Mon Dec  1 12:11:28 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   55C    P8             10W /   70W |       0MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [4]:
from huggingface_hub import login
login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [5]:
!git clone https://github.com/aivle-agent/complaint_system_AI.git
%cd complaint_system_AI

!ls

!unzip complain_data.zip -d data

!ls data


Cloning into 'complaint_system_AI'...
remote: Enumerating objects: 21, done.[K
remote: Counting objects: 100% (21/21), done.[K
remote: Compressing objects: 100% (16/16), done.[K
remote: Total 21 (delta 5), reused 8 (delta 3), pack-reused 0 (from 0)[K
Receiving objects: 100% (21/21), 12.13 MiB | 7.80 MiB/s, done.
Resolving deltas: 100% (5/5), done.
/content/complaint_system_AI
 bandit_prompt_system.py   complain_quality_shap.py   README.md
 complain_data.zip	  'Complaint data.zip'	      system_test.ipynb
Archive:  complain_data.zip
  inflating: data/중앙행정기관.csv  
  inflating: data/국립아시아문화전당.csv  
  inflating: data/지방행정기관.csv  
  inflating: data/국민신문고.csv  
국립아시아문화전당.csv	지방행정기관.csv  중앙행정기관.csv  국민신문고.csv


In [None]:
import pandas as pd

df = pd.read_csv('data/중앙행정기관.csv')

In [None]:
# data1.head(1)['consulting_content']

In [None]:
import numpy as np
import json
from dataclasses import dataclass
from typing import List, Dict, Any, Optional

import transformers
import torch

# 0. LLaMA 파이프라인 초기화

MODEL_ID = "meta-llama/Meta-Llama-3-8B-Instruct"

llama_pipe = transformers.pipeline(
    "text-generation",
    model=MODEL_ID,
    model_kwargs={"torch_dtype": torch.bfloat16},
    device_map="auto",
)

# 1. 프롬프트 ARM 정의

@dataclass
class PromptArmConfig:
    """
    각 프롬프트 전략(arm)에 대한 메타 정보.
    feature_vector는 이 전략의 성향을 나타내는 임베딩(스타일/목적 가중치 등).
    """
    name: str
    description: str
    feature_vector: np.ndarray  # shape = (d,)

# 2. Logistic Bandit (arm별 w, H + UCB)

class LogisticPromptBandit:
    """
    프롬프트 전략 선택용 logistic bandit.
    - 각 arm별로 logistic 모델 w_a, H_a 유지
    - UCB 스타일로 exploration (mean + beta * uncertainty)
    """
    def __init__(
        self,
        arms: List[PromptArmConfig],
        lambda_reg: float = 1.0,
        eta: float = 0.1,
        beta: float = 1.0,
    ):
        self.arms = arms
        self.n_arms = len(arms)
        self.d = arms[0].feature_vector.shape[0]

        # arm별 parameter, Hessian 근사
        self.w = np.zeros((self.n_arms, self.d))  # w_a
        self.H = [lambda_reg * np.eye(self.d) for _ in range(self.n_arms)]

        self.eta = eta   # step size
        self.beta = beta # exploration 강도

    def _sigmoid(self, z: float) -> float:
        return 1.0 / (1.0 + np.exp(-z))

    def select_arm(self) -> int:
        """
        현재까지의 w, H를 기반으로 어느 프롬프트 전략 arm을 쓸지 선택.
        score_a = (w_a^T x_a) + beta * sqrt( x_a^T H_a^{-1} x_a )
        """
        scores = []
        for a_idx, arm in enumerate(self.arms):
            x = arm.feature_vector  # (d,)
            mean = float(self.w[a_idx].dot(x))

            invH = np.linalg.inv(self.H[a_idx])
            var = float(x.T @ invH @ x)
            ucb = self.beta * np.sqrt(max(var, 1e-12))

            scores.append(mean + ucb)

        chosen = int(np.argmax(scores))
        return chosen

    def update(self, arm_idx: int, reward: float):
        """
        bandit reward(0~1 범위)를 받아 online logistic regression update.
        reward는 verifier에서 나온 스칼라 점수.
        """
        arm = self.arms[arm_idx]
        x = arm.feature_vector
        z = float(self.w[arm_idx].dot(x))
        p = self._sigmoid(z)  # 현재 model이 보는 "성공 확률"

        # logistic loss gradient: -(y - p) * x
        grad = -(reward - p) * x

        # Hessian 근사 업데이트: H_a += x x^T
        self.H[arm_idx] += np.outer(x, x)

        # Online Newton-style step: w <- w - eta * H^{-1} grad
        invH = np.linalg.inv(self.H[arm_idx])
        step = self.eta * (invH @ grad)
        self.w[arm_idx] -= step

    def explain_current_strategy(self) -> List[str]:
        """
        각 arm에 대해 현재 bandit가 보는 "예상 성공도"를 텍스트로 정리.
        """
        lines = []
        for a_idx, arm in enumerate(self.arms):
            x = arm.feature_vector
            z = float(self.w[a_idx].dot(x))
            p = self._sigmoid(z)
            lines.append(
                f"[{arm.name}] "
                f"예상 성공도 ≈ {p:.3f} | "
                f"feature={np.round(x, 2)} | "
                f"설명: {arm.description}"
            )
        return lines

# 3. LLM Generator (Meta-Llama-3 기반)


class LlamaChatGenerator:
    def __init__(self, base_model, lora_path=None):
        from peft import PeftModel

        self.tokenizer = transformers.AutoTokenizer.from_pretrained(base_model)
        model = transformers.AutoModelForCausalLM.from_pretrained(
            base_model, torch_dtype=torch.bfloat16, device_map="auto"
        )

        if lora_path:
            model = PeftModel.from_pretrained(model, lora_path)

        self.pipe = transformers.pipeline(
            "text-generation",
            model=model,
            tokenizer=self.tokenizer,
            device_map="auto",
        )

    def generate(self, system_prompt, user_prompt):
        msgs = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        out = self.pipe(
            msgs,
            max_new_tokens=256,
            do_sample=True,
            temperature=0.2,
            top_p=0.9,
        )[0]["generated_text"]

        # assistant role만 추출
        if isinstance(out, list):
            for m in out:
                if m.get("role") == "assistant":
                    return m["content"]
        return str(out)



def build_prompt_from_arm(
    base_instruction: str,
    complaint_text: str,
    arm: PromptArmConfig,
) -> str:
    """
    arm의 전략 설명을 user 프롬프트에 녹여서 스타일/방향을 제어.
    - base_instruction: system 쪽에 들어가는 역할/전략 기본 설명
    - 여기서 반환하는 건 user_prompt에 들어갈 텍스트
    """
    return (
        f"아래 민원에 대해 답변을 작성하되, 다음 전략 프로필을 따르세요.\n\n"
        f"[전략 이름]\n{arm.name}\n\n"
        f"[전략 설명]\n{arm.description}\n\n"
        f"[민원 내용]\n{complaint_text}\n\n"
        f"[추가 지침]\n"
        f"- 관련 법령/정책과의 일치성을 확인하고, 처리 가능/불가능을 명확히 구분하세요.\n"
        f"- 민원인이 이해하기 쉬운 구조로 답변하고, 필요한 경우 대안/절차를 제시하세요.\n"
        f"- 과도한 약속이나 확정적인 표현은 피하고, '담당 부서의 최종 판단'이 필요함을 밝혀주세요.\n"
    )

# 4. Verifier: LLM 기반 점수 → reward


@dataclass
class VerificationScores:
    """각 품질 지표 스코어 (0~1)."""
    resolution_likelihood: float    # 실제로 해결/처리가 될 가능성
    policy_legal_alignment: float   # 정책/법령·과거 답변과의 일치도
    explanation_clarity: float      # 구조/논리/명료성
    empathy_tone: float             # 민원인 친화적 톤, 갈등 완화
    risk_safety: float              # 기관 입장에서의 안전성 (높을수록 안전)


class SmallVerifier:
    def __init__(self, base_model, lora_path=None):
        from peft import PeftModel

        self.tokenizer = transformers.AutoTokenizer.from_pretrained(base_model)
        model = transformers.AutoModelForSequenceClassification.from_pretrained(
            base_model,
            num_labels=1,   # 0~1 점수
            torch_dtype=torch.bfloat16,
            device_map="auto"
        )

        if lora_path:
            model = PeftModel.from_pretrained(model, lora_path)

        self.model = model

    def evaluate(self, complaint, answer):
        text = f"[COMPLAINT]\n{complaint}\n\n[ANSWER]\n{answer}"
        inputs = self.tokenizer(text, truncation=True, max_length=2048, return_tensors="pt").to("cuda")

        with torch.no_grad():
            score = torch.sigmoid(self.model(**inputs).logits)[0].item()
        return score


def aggregate_reward(
    scores: VerificationScores,
    weights: Optional[Dict[str, float]] = None,
) -> float:
    """
    reward 구조:
      r = w1 * 해결 가능성
        + w2 * 정책/법률 일치
        + w3 * 명료성
        + w4 * 공감
        + w5 * 안전성
    """
    if weights is None:
        weights = {
            "resolution_likelihood": 0.35,
            "policy_legal_alignment": 0.25,
            "explanation_clarity": 0.20,
            "empathy_tone": 0.10,
            "risk_safety": 0.10,
        }

    s = (
        scores.resolution_likelihood * weights["resolution_likelihood"]
        + scores.policy_legal_alignment * weights["policy_legal_alignment"]
        + scores.explanation_clarity * weights["explanation_clarity"]
        + scores.empathy_tone * weights["empathy_tone"]
        + scores.risk_safety * weights["risk_safety"]
    )

    return float(max(0.0, min(1.0, s)))

# 5. 전체 엔진: bandit → generator → verifier → bandit update

class PromptBanditEngine:
    """
    - bandit이 프롬프트 전략 선택
    - LLM이 답변 생성
    - verifier가 품질 측정 → reward 산출
    - bandit 업데이트
    """
    def __init__(
        self,
        arms: List[PromptArmConfig],
        base_instruction: str,
        llm: LlamaChatGenerator,
        verifier: LlamaVerifier,
    ):
        self.bandit = LogisticPromptBandit(arms=arms)
        self.base_instruction = base_instruction
        self.llm = llm
        self.verifier = verifier

    def step(
        self,
        complaint_text: str,
        reward_weights: Optional[Dict[str, float]] = None,
        extra_context: str = "",
    ) -> Dict[str, Any]:
        """
        민원 1건에 대해:
          1) bandit으로 전략 arm 선택
          2) 해당 전략으로 user_prompt 구성 + LLM 답변
          3) verifier로 평가 + reward
          4) bandit update
          5) 결과/로그 반환
        """
        # 1. arm 선택
        arm_idx = self.bandit.select_arm()
        arm = self.bandit.arms[arm_idx]

        # 2. prompt 구성 + LLM 답변
        user_prompt = build_prompt_from_arm(
            base_instruction=self.base_instruction,
            complaint_text=complaint_text,
            arm=arm,
        )
        answer = self.llm.generate(
            system_prompt=self.base_instruction,
            user_prompt=user_prompt,
        )

        # 3. verifier 평가
        scores = self.verifier.evaluate(
            complaint_text=complaint_text,
            answer_text=answer,
            extra_context=extra_context,
        )
        reward = aggregate_reward(scores, weights=reward_weights)

        # 4. bandit 업데이트
        self.bandit.update(arm_idx=arm_idx, reward=reward)

        # 5. 현재 전략 방향성 요약
        strategy_view = self.bandit.explain_current_strategy()

        return {
            "chosen_arm_idx": arm_idx,
            "chosen_arm_name": arm.name,
            "chosen_arm_description": arm.description,
            "user_prompt": user_prompt,
            "answer": answer,
            "verification_scores": scores,
            "reward": reward,
            "strategy_view": strategy_view,
        }


# 6. to be decided

def build_default_arms() -> List[PromptArmConfig]:
    """
      [법률/정책 중시 정도, 해결책 제안 정도, 공감/톤, 단호함(제한 강조), 근거 제시 강조]
    """
    return [
        PromptArmConfig(
            name="법률-정책 최우선",
            description="법령, 조례, 내부지침과의 일치성을 최우선으로 하고 책임 범위를 분명히 하는 보수적 답변.",
            feature_vector=np.array([0.9, 0.6, 0.4, 0.8, 0.7]),
        ),
        PromptArmConfig(
            name="민원인 공감형",
            description="민원인의 감정과 상황을 충분히 공감하고, 이해하기 쉬운 언어로 절차와 한계를 설명하는 답변.",
            feature_vector=np.array([0.6, 0.7, 0.9, 0.3, 0.5]),
        ),
        PromptArmConfig(
            name="해결책 제안형",
            description="현실적으로 가능한 대안과 행동 옵션을 최대한 많이 제시하는 해결 중심 답변.",
            feature_vector=np.array([0.7, 0.9, 0.7, 0.4, 0.8]),
        ),
    ]


def main():
    arms = build_default_arms()

    generator = LlamaChatGenerator(
        base_model="meta-llama/Meta-Llama-3-8B-Instruct",
        lora_path="./gen_lora"
    )

    verifier = SmallVerifier(
        base_model="microsoft/Phi-3-mini-4k-instruct",
        lora_path="./verifier_lora"
    )

    engine = PromptBanditEngine(
        arms=arms,
        base_instruction=BASE_SYS_PROMPT,
        llm=generator,
        verifier=verifier,
    )

    results = []
    for idx, row in tqdm(wide.iterrows(), total=len(wide)):
        complaint = row["complaint_text"]
        context = row.get("topic", "")

        out = engine.step(complaint, extra_context=context)

        results.append({
            "complaint": complaint,
            "answer": out["answer"],
            "reward": out["reward"],
            "arm": out["chosen_arm_name"]
        })

    df_results = pd.DataFrame(results)
    df_results.to_csv("full_batch_results.csv", index=False)




     

if __name__ == "__main__":
    main()

In [None]:
!pip install transformers peft accelerate bitsandbytes sentencepiece sentence-transformers scikit-learn --quiet

In [None]:

import os, re, json
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from tqdm import tqdm

import torch
import transformers
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    AutoModelForSequenceClassification
)
from peft import PeftModel

from sentence_transformers import SentenceTransformer
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import cosine_similarity
import re


#############################################
# 1) 민원 텍스트에서 Q / A 분리
#############################################

def parse_qa(text: str):
    if not isinstance(text, str):
        return "", ""

    # Q: ~~~ A: ~~~ 구조에서 Q, A 각각 분리
    q_match = re.search(r"Q\s*:\s*(.*?)(?:A\s*:|$)", text, flags=re.DOTALL)
    a_match = re.search(r"A\s*:\s*(.*)", text, flags=re.DOTALL)

    q = q_match.group(1).strip() if q_match else ""
    a = a_match.group(1).strip() if a_match else ""
    return q, a

def clean_text(x: Any) -> str:
    if not isinstance(x, str):
        return ""
    x = re.sub(r"\d{2,3}-\d{3,4}-\d{4}", "[TEL]", x)
    x = re.sub(r"\d{6}-\d{7}", "[RRN]", x)
    x = re.sub(r"[가-힣]{2,3}씨", "[NAME]", x)
    return x.strip()

def convert_df_to_qa(df: pd.DataFrame) -> pd.DataFrame:
    rows = []
    for _, row in df.iterrows():
        content = row.get("consulting_content", "")
        q_raw, a_raw = parse_qa(content)

        q = clean_text(q_raw)
        a = clean_text(a_raw)

        if len(q) < 5 or len(a) < 5:
            continue

        rows.append({
            "question": q,
            "answer": a,
            "source": row.get("source", ""),
            "date": row.get("consulting_date", ""),
            "category": row.get("consulting_category", ""),
            "classification": row.get("classification", "")
        })
    return pd.DataFrame(rows)


# 4) Dataset Builder (generator/verifier 학습용)


def build_dataset(df: pd.DataFrame):
    data = []
    for _, row in df.iterrows():
        q = clean_text(row.get("question", ""))
        a = clean_text(row.get("answer", ""))

        if len(q) < 5 or len(a) < 5:
            continue

        data.append({"question": q, "answer": a})
    return data

# 5) Train/Test Split


def split_dataset(all_data, test_ratio=0.1):
    total = len(all_data)
    test_size = max(1, int(total * test_ratio))
    return all_data[:-test_size], all_data[-test_size:]




# ============================================================
# 2. 프롬프트 전략 ARM 정의


@dataclass
class PromptArmConfig:
    name: str
    description: str
    feature_vector: np.ndarray   


def build_prompt_arms() -> List[PromptArmConfig]:
    return [
        PromptArmConfig(
            name="법률-정책 최우선",
            description="법령·조례·내부지침과의 일치성을 최우선으로 하고, 책임 범위를 명확히 하는 보수적 답변.",
            feature_vector=np.array([0.9, 0.4, 0.5]),
        ),
        PromptArmConfig(
            name="민원인 공감형",
            description="민원인의 감정과 상황을 충분히 공감하고, 이해하기 쉬운 언어로 절차와 한계를 설명하는 답변.",
            feature_vector=np.array([0.6, 0.9, 0.6]),
        ),
        PromptArmConfig(
            name="해결책 제안형",
            description="현실적으로 가능한 대안과 행동 옵션을 적극적으로 제시하는 해결 중심 답변.",
            feature_vector=np.array([0.7, 0.6, 0.9]),
        ),
    ]

# 3. Contextual Logistic Bandit
#    (arm feature + complaint context feature)
# 

class ContextualLogisticPromptBandit:
    """
    Contextual Bandit:
      - 각 arm마다 고정 feature_vector
      - 각 민원마다 context_vec (질문 길이, 구체성, 클러스터 등)
      - 입력 phi = concat(arm_feature, ctx_vec)
    """
    def __init__(
        self,
        arms: List[PromptArmConfig],
        ctx_dim: int,
        lambda_reg: float = 1.0,
        eta: float = 0.1,
        beta: float = 1.0,
    ):
        self.arms = arms
        self.n_arms = len(arms)
        self.d_arm = arms[0].feature_vector.shape[0]
        self.d_ctx = ctx_dim
        self.d = self.d_arm + self.d_ctx

        # arm별 parameter, Hessian 근사
        self.w = np.zeros((self.n_arms, self.d))
        self.H = [lambda_reg * np.eye(self.d) for _ in range(self.n_arms)]

        self.eta = eta
        self.beta = beta

    def _sigmoid(self, z: float) -> float:
        return 1.0 / (1.0 + np.exp(-z))

    def _phi(self, arm_idx: int, ctx_vec: np.ndarray) -> np.ndarray:
        arm_feat = self.arms[arm_idx].feature_vector
        return np.concatenate([arm_feat, ctx_vec])  # (d,)

    def select_arm(self, ctx_vec: np.ndarray) -> int:
        scores = []
        for a_idx in range(self.n_arms):
            x = self._phi(a_idx, ctx_vec)
            mean = float(self.w[a_idx].dot(x))

            invH = np.linalg.inv(self.H[a_idx])
            var = float(x.T @ invH @ x)
            ucb = self.beta * np.sqrt(max(var, 1e-12))

            scores.append(mean + ucb)
        return int(np.argmax(scores))

    def update(self, arm_idx: int, ctx_vec: np.ndarray, reward: float):
        x = self._phi(arm_idx, ctx_vec)
        z = float(self.w[arm_idx].dot(x))
        p = self._sigmoid(z)

        grad = -(reward - p) * x
        self.H[arm_idx] += np.outer(x, x)

        invH = np.linalg.inv(self.H[arm_idx])
        step = self.eta * (invH @ grad)
        self.w[arm_idx] -= step

    def explain_current_strategy(self, ctx_vec: np.ndarray) -> List[str]:
        lines = []
        for a_idx, arm in enumerate(self.arms):
            x = self._phi(a_idx, ctx_vec)
            z = float(self.w[a_idx].dot(x))
            p = self._sigmoid(z)
            lines.append(
                f"[{arm.name}] 예상 성공도 ≈ {p:.3f} | arm_feat={np.round(arm.feature_vector, 2)}"
            )
        return lines


# ============================================================
# 4. Question Refiner (간단 버전; 나중에 LLM으로 교체 가능)
# ============================================================

class SimpleQuestionRefiner:
    """
    지금은 간단히 공백 정리 + 기본 클리닝만 수행.
    나중에 LLM 기반으로 '질문 구조 교정' 로직으로 교체 가능.
    """
    def refine(self, raw_text: str) -> str:
        t = clean_text(raw_text)
        # TODO: 여기에 LLM 기반 재구성 로직을 넣어도 됨.
        return t


# ============================================================
# 5. DBSCAN 기반 민원 유형화 (임베딩 + 클러스터)
# ============================================================

class ComplaintClusterer:
    """
    - SentenceTransformer로 임베딩
    - DBSCAN으로 군집
    - 새로운 민원 → 임베딩 → 가장 유사한 기존 포인트의 클러스터 라벨 사용
    """
    def __init__(self, model_name="jhgan/ko-sroberta-multitask", eps=0.4, min_samples=5):
        self.encoder = SentenceTransformer(model_name)
        self.dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric="cosine")
        self.embeddings = None
        self.labels = None

    def fit(self, texts: List[str]):
        self.embeddings = self.encoder.encode(
            texts,
            batch_size=32,
            show_progress_bar=True,
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
        self.dbscan.fit(self.embeddings)
        self.labels = self.dbscan.labels_
        print("클러스터 개수(노이즈 포함):", len(set(self.labels)))

    def predict_cluster(self, text: str) -> int:
        if self.embeddings is None or self.labels is None:
            return -1
        emb = self.encoder.encode([text], convert_to_numpy=True, normalize_embeddings=True)
        sims = cosine_similarity(emb, self.embeddings)[0]
        idx = int(np.argmax(sims))
        return int(self.labels[idx])


# ============================================================
# 6. Context Vector 생성 (질문 특성 + 클러스터)
# ============================================================

def build_context_vector(refined_q: str, cluster_id: int) -> np.ndarray:
    # 길이 (0~1 스케일)
    length = min(len(refined_q) / 500.0, 1.0)
    # 문장 수 (0~1 스케일)
    num_sent = refined_q.count("다.") + refined_q.count(".")
    num_sent = min(num_sent / 10.0, 1.0)
    # 법률 관련 키워드 여부
    has_law = 1.0 if any(k in refined_q for k in ["법", "조항", "제", "시행령"]) else 0.0
    # 클러스터 id normalize (대충 0~1로)
    cluster_norm = 0.0 if cluster_id == -1 else (cluster_id % 20) / 20.0

    return np.array([length, num_sent, has_law, cluster_norm], dtype=float)


# 7. Generator (LLaMA) + Verifier (Phi-3)


class LlamaChatGenerator:
    def __init__(self, model_id="meta-llama/Meta-Llama-3-8B-Instruct"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )
        self.pipe = transformers.pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            device_map="auto",
        )

    def generate(self, system_prompt: str, user_prompt: str, max_new_tokens: int = 256):
        msgs = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        out = self.pipe(
            msgs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=0.2,
            top_p=0.9,
        )[0]["generated_text"]
        return str(out)


class PhiMiniVerifier:
    """
    Q/A를 입력으로 받아 0~1 신뢰도 점수 출력.
    (지금은 단일 score지만, 나중에 다중 지표 확장 가능)
    """
    def __init__(self, model_id="microsoft/Phi-3-mini-4k-instruct"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_id,
            num_labels=1,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )

    def score(self, question: str, answer: str) -> float:
        text = f"[QUESTION]\n{question}\n\n[ANSWER]\n{answer}"
        inputs = self.tokenizer(
            text,
            truncation=True,
            max_length=2048,
            return_tensors="pt",
        ).to("cuda")

        with torch.no_grad():
            logits = self.model(**inputs).logits
            prob = torch.sigmoid(logits)[0].item()
        return float(prob)

# 8. 프롬프트 조립


BASE_SYS_PROMPT = (
    "당신은 한국 공공기관의 민원 보조 AI입니다. "
    "사실과 법령, 정책과의 일치성, 절차 안내를 중시하며, "
    "과도한 약속이나 확정적인 표현은 피해야 합니다."
)

def build_prompt_from_arm(
    complaint_text: str,
    arm: PromptArmConfig,
) -> str:
    return (
        f"아래 민원에 대해 답변을 작성하되, 제시된 전략 프로필을 따르세요.\n\n"
        f"[전략 이름]\n{arm.name}\n\n"
        f"[전략 설명]\n{arm.description}\n\n"
        f"[민원 내용]\n{complaint_text}\n\n"
        f"[추가 지침]\n"
        f"- 관련 법령/정책과의 일치성을 확인하고, 처리 가능/불가능을 명확히 구분하세요.\n"
        f"- 민원인이 이해하기 쉬운 구조(요약 → 근거 → 절차 → 주의사항)로 답변하세요.\n"
        f"- 과도한 약속이나 확정적인 표현은 피하고, "
        f"'실제 처리 여부는 담당 부서의 최종 판단에 따릅니다.'라는 문장을 반드시 포함하세요.\n"
    )


# ============================================================
# 9. 전체 엔진: Refiner → Cluster → Bandit → Generator → Verifier
# ============================================================

class ComplaintEngine:
    def __init__(
        self,
        arms: List[PromptArmConfig],
        generator: LlamaChatGenerator,
        verifier: PhiMiniVerifier,
        clusterer: ComplaintClusterer,
        refiner: SimpleQuestionRefiner,
        ctx_dim: int,
    ):
        self.arms = arms
        self.generator = generator
        self.verifier = verifier
        self.clusterer = clusterer
        self.refiner = refiner
        self.bandit = ContextualLogisticPromptBandit(arms=arms, ctx_dim=ctx_dim)

    def step(self, raw_complaint: str) -> Dict[str, Any]:
        # 1) 질문 교정
        refined_q = self.refiner.refine(raw_complaint)

        # 2) DBSCAN 기반 유형 클러스터
        cluster_id = self.clusterer.predict_cluster(refined_q)

        # 3) Context feature 벡터
        ctx_vec = build_context_vector(refined_q, cluster_id)

        # 4) Bandit으로 arm 선택
        arm_idx = self.bandit.select_arm(ctx_vec)
        arm = self.arms[arm_idx]

        # 5) Generator 호출 프롬프트 생성
        user_prompt = build_prompt_from_arm(refined_q, arm)
        answer = self.generator.generate(BASE_SYS_PROMPT, user_prompt)

        # 6) Verifier 점수 → reward
        reward = self.verifier.score(refined_q, answer)

        # 7) Bandit 업데이트
        self.bandit.update(arm_idx, ctx_vec, reward)

        # 8) 전략 요약 (현재 context 기준 추정)
        strat_view = self.bandit.explain_current_strategy(ctx_vec)

        return {
            "raw_complaint": raw_complaint,
            "refined_complaint": refined_q,
            "cluster_id": cluster_id,
            "context_vec": ctx_vec,
            "chosen_arm_idx": arm_idx,
            "chosen_arm_name": arm.name,
            "answer": answer,
            "reward": reward,
            "strategy_view": strat_view,
        }


# ============================================================
# 10. 배치 평가 (Train/Test 중 Test 파트에 대해)
# ============================================================

def evaluate_batch(engine: ComplaintEngine, test_data, out_csv="batch_results.csv"):
    logs = []
    for item in tqdm(test_data):
        q = item["question"]
        out = engine.step(q)
        logs.append({
            "question": q,
            "refined_question": out["refined_complaint"],
            "cluster_id": out["cluster_id"],
            "arm": out["chosen_arm_name"],
            "score": out["reward"],
            "answer": out["answer"],
        })
    df = pd.DataFrame(logs)
    df.to_csv(out_csv, index=False)
    return df


# ============================================================
# 11. 실시간 서비스용 래퍼
# ============================================================

def ask(engine: ComplaintEngine, complaint: str):
    out = engine.step(complaint)
    print("========================================")
    print("[선택된 전략] ", out["chosen_arm_name"])
    print("[클러스터 ID]", out["cluster_id"])
    print("[컨텍스트 벡터]", np.round(out["context_vec"], 3))
    print("\n[교정된 민원]\n", out["refined_complaint"])
    print("\n[예상 답변 앞부분]\n", out["answer"][:800])
    print("\n[Verifier 기반 신뢰도 score] ", round(out["reward"], 3))
    print("\n[현재 Bandit 전략 뷰]")
    for line in out["strategy_view"]:
        print("  ", line)
    print("========================================")
    return out


# 12. MAIN: 전체 흐름 연결


def main():
    # 0) 원본 CSV 로드
    raw = pd.read_csv('data/중앙행정기관.csv')

    # 1) Q/A 분리된 DF 생성
    qa_df = convert_df_to_qa(raw)
    print("QA rows:", len(qa_df))
    print(qa_df.head(2))

    # 2) build_dataset + train/test split
    all_data = build_dataset(qa_df)
    print("Total usable Q/A pairs:", len(all_data))

    train_data, test_data = split_dataset(all_data, test_ratio=0.1)
    print("Train size:", len(train_data), "Test size:", len(test_data))

    # 3) 클러스터링에 쓸 질문 텍스트
    train_questions = [d["question"] for d in train_data]

    print("Train questions for clustering:", len(train_questions))
    if len(train_questions) == 0:
        raise RuntimeError("No questions available for clustering. Check parsing/filters.")

    # 4) 클러스터러 학습
    clusterer = ComplaintClusterer()
    clusterer.fit(train_questions)

    # 3) 컴포넌트 초기화
    arms = build_prompt_arms()
    generator = LlamaChatGenerator("meta-llama/Meta-Llama-3-8B-Instruct")
    verifier = PhiMiniVerifier("microsoft/Phi-3-mini-4k-instruct")
    refiner = SimpleQuestionRefiner()

    engine = ComplaintEngine(
        arms=arms,
        generator=generator,
        verifier=verifier,
        clusterer=clusterer,
        refiner=refiner,
        ctx_dim=4,   # build_context_vector에서 4차원 벡터 생성
    )

    # 4) Test 일부에 대한 배치 평가
    print("\n Test set 일부(예: 20개)에 대한 평가 실행 중...")
    df_res = evaluate_batch(engine, test_data[:20], out_csv="batch_results_sample.csv")
    print("샘플 결과 saved to batch_results_sample.csv")
    display(df_res.head())

    # 5) 실시간 예시
    print("\n 실시간 예시 민원 입력 테스트")
    _ = ask(engine, "우리 아파트 단지 앞 도로에 불법주차 차량이 많아서 사고 위험이 큽니다. 어떤 조치를 받을 수 있나요?")


if __name__ == "__main__":
    main()


real test zone

In [6]:
!pip install -U "transformers>=4.40.0" "accelerate" "peft"

Collecting transformers>=4.40.0
  Downloading transformers-4.57.3-py3-none-any.whl.metadata (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.0/44.0 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
Downloading transformers-4.57.3-py3-none-any.whl (12.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.0/12.0 MB[0m [31m144.4 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[?25hInstalling collected packages: transformers
  Attempting uninstall: transformers
    Found existing installation: transformers 4.57.2
    Uninstalling transformers-4.57.2:
      Successfully uninstalled transformers-4.57.2
Successfully installed transformers-4.57.3


In [12]:
# ===================== Generator LoRA 학습 =====================
import math
import re
from typing import Any, List, Dict, Tuple

import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
)
from peft import LoraConfig, get_peft_model

# ---- 0) 공통 유틸 (질문/답변 추출용) ----

def clean_text(x: Any) -> str:
    """전화번호, 주민번호, 이름 패턴 등을 마스킹하고 strip."""
    if not isinstance(x, str):
        return ""
    x = re.sub(r"\d{2,3}-\d{3,4}-\d{4}", "[TEL]", x)
    x = re.sub(r"\d{6}-\d{7}", "[RRN]", x)
    x = re.sub(r"[가-힣]{2,3}씨", "[NAME]", x)
    return x.strip()


def parse_question_answer(full_text: str) -> Tuple[str, str]:
    """
    중앙행정기관 데이터의 consulting_content/complaint_text에서
    Q/A 부분을 분리한다.

    대략적인 패턴:
      "제목 : ...\n\nQ : ...\n...\n\nA : ..."

    Q/A 구분이 애매한 경우엔 heuristic.
    """
    if not isinstance(full_text, str):
        return "", ""

    text = full_text.strip()

    # 1) "A :" 기준으로 먼저 자르기
    a_idx = text.find("\nA :")
    if a_idx == -1:
        a_idx = text.find("\n\nA :")
    if a_idx == -1:
        a_idx = text.find("A :")

    if a_idx != -1:
        q_part = text[:a_idx].strip()
        a_part = text[a_idx:].strip()
        # A : 제거
        a_part = re.sub(r"^A\s*:\s*", "", a_part, flags=re.MULTILINE).strip()
    else:
        q_idx = text.find("Q :")
        if q_idx != -1:
            q_part = text[q_idx:].strip()
            a_part = ""
        else:

            q_part, a_part = text, ""

    # Q : / 제목 : 제거
    q_part = re.sub(r"^제목\s*:\s*", "", q_part, flags=re.MULTILINE)
    q_part = re.sub(r"^Q\s*:\s*", "", q_part, flags=re.MULTILINE)

    q_part = clean_text(q_part)
    a_part = clean_text(a_part)
    return q_part, a_part


def load_wide_from_csv(path: str) -> pd.DataFrame:
    """
    중앙행정기관.csv → wide 포맷으로 피벗.
    complaint_text 컬럼에 원본 질의+답변 전체가 들어있다고 가정.
    """
    df = pd.read_csv(path)
    group_keys = ["source", "consulting_date", "consulting_category", "consulting_content"]

    wide = (
        df.pivot_table(
            index=group_keys,
            columns="classification_category",
            values="classification",
            aggfunc=lambda x: " / ".join(sorted(set(x))),
        )
        .reset_index()
    )

    wide = wide.rename(
        columns={
            "consulting_content": "complaint_text",
            "상담 주제": "topic",
            "상담 사유": "reason",
            "상담 결과": "outcome",
            "상담 요건": "requirement",
            "상담 내용": "summary",
        }
    )
    return wide


def build_qa_dataset_from_wide(wide: pd.DataFrame) -> List[Dict[str, str]]:
    """
    wide['complaint_text']에서 Q/A 추출.
    - parse_question_answer()를 사용하므로 기존 전처리와 동일한 로직.
    리턴: [{"question": q, "answer": a}, ...]
    """
    data = []
    for _, row in wide.iterrows():
        full_text = row.get("complaint_text", "")
        q, a = parse_question_answer(full_text)

        if len(q) < 5 or len(a) < 5:
            continue

        data.append(
            {
                "question": q,
                "answer": a,
                "topic": row.get("topic", ""),
                "reason": row.get("reason", ""),
                "outcome": row.get("outcome", ""),
                "requirement": row.get("requirement", ""),
                "summary": row.get("summary", ""),
            }
        )
    return data


def split_train_val_test(
    all_data: List[Dict[str, str]],
    val_ratio: float = 0.1,
    test_ratio: float = 0.1,
    seed: int = 42,
) -> Tuple[List[Dict], List[Dict], List[Dict]]:
    """
    all_data를 train / val / test로 분리.
    순서를 섞은 뒤 비율대로 잘라서 사용.
    """
    n = len(all_data)
    if n == 0:
        return [], [], []

    idx = np.arange(n)
    rng = np.random.default_rng(seed)
    rng.shuffle(idx)

    val_size = int(n * val_ratio)
    test_size = int(n * test_ratio)

    val_idx = idx[:val_size]
    test_idx = idx[val_size : val_size + test_size]
    train_idx = idx[val_size + test_size :]

    def pick(idxs):
        return [all_data[i] for i in idxs]

    return pick(train_idx), pick(val_idx), pick(test_idx)


# ---- 1) Generator LoRA 하이퍼파라미터 ----

GEN_BASE_MODEL = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
GEN_LORA_OUT_DIR = "./gen_lora"
BASE_SYS_PROMPT = (
    "당신은 한국 공공기관의 민원 답변을 작성하는 공무원 보조 AI입니다. "
    "법령과 정책에 기반하여 답변하고, 실제 처리 여부는 담당 부서의 최종 판단에 따름을 명시해야 합니다."
)

MAX_LEN_GEN = 512
GEN_BATCH_SIZE = 4
GEN_NUM_EPOCHS = 3
GEN_LR = 5e-5
GEN_LOG_STEPS = 100


# ---- 2) SFT Dataset ----

class GeneratorSFTDataset(Dataset):
    """
    Generator SFT용 Dataset
    prompt = system + question
    target = answer 전체 (teacher forcing)
    """

    def __init__(self, data, tokenizer, max_length=512, base_sys_prompt=""):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.base_sys_prompt = base_sys_prompt

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        q = item["question"]
        a = item["answer"]

        prompt = (
            f"<|system|>\n{self.base_sys_prompt}\n\n"
            f"<|user|>\n{q}\n\n"
            f"<|assistant|>\n"
        )
        full_text = prompt + a

        enc = self.tokenizer(
            full_text,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt",
        )

        input_ids = enc["input_ids"][0]
        attention_mask = enc["attention_mask"][0]
        labels = input_ids.clone()  # causal LM

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        }


# ---- 3) LoRA 학습 함수 (train + val 평가) ----

def train_lora_generator_with_eval(
    train_data: List[Dict[str, str]],
    val_data: List[Dict[str, str]],
    output_dir: str = GEN_LORA_OUT_DIR,
    base_model_name: str = GEN_BASE_MODEL,
    base_sys_prompt: str = BASE_SYS_PROMPT,
    max_length: int = MAX_LEN_GEN,
    batch_size: int = GEN_BATCH_SIZE,
    num_epochs: int = GEN_NUM_EPOCHS,
    lr: float = GEN_LR,
):
    # 1) tokenizer, base model
    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    base_model = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )

    # 2) LoRA 래핑
    peft_config = LoraConfig(
        r=16,
        lora_alpha=32,
        lora_dropout=0.1,
        bias="none",
        task_type="CAUSAL_LM",
    )
    model = get_peft_model(base_model, peft_config)
    print(
        "[Generator] LoRA trainable params:",
        sum(p.numel() for p in model.parameters() if p.requires_grad),
    )

    # 3) Dataset 구성
    train_dataset = GeneratorSFTDataset(
        train_data,
        tokenizer=tokenizer,
        max_length=max_length,
        base_sys_prompt=base_sys_prompt,
    )
    val_dataset = GeneratorSFTDataset(
        val_data,
        tokenizer=tokenizer,
        max_length=max_length,
        base_sys_prompt=base_sys_prompt,
    )

    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False,
    )

    # 4) TrainingArguments
    training_args = TrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=batch_size,
        num_train_epochs=num_epochs,
        learning_rate=lr,
        logging_steps=GEN_LOG_STEPS,
        save_strategy="epoch", 
        bf16=True,
        report_to="none",
    )

    # 5) Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,   # val 데이터로 eval
        data_collator=data_collator,
    )

    # 6) 학습
    trainer.train()

    # 7) 학습 후 val set 평가
    metrics = trainer.evaluate()
    print("[Generator] Eval metrics:", metrics)
    if "eval_loss" in metrics:
        try:
            ppl = math.exp(metrics["eval_loss"])
            print(f"[Generator] Eval Perplexity ≈ {ppl:.3f}")
        except OverflowError:
            print("[Generator] Perplexity 계산 실패 (loss가 너무 큼).")

    # 8) LoRA + tokenizer 저장 (마지막 상태)
    trainer.model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)
    print(f"[Generator LoRA] Saved to {output_dir}")


# ---- 4) main: 전체 플로우 ----

def main():
    # 1) CSV 로드
    csv_path = "data/중앙행정기관.csv" 
    wide = load_wide_from_csv(csv_path)

    # 2) Q/A 추출
    qa_data = build_qa_dataset_from_wide(wide)
    print(f"총 Q/A 개수: {len(qa_data)}")
    if len(qa_data) < 50:
        print("Q/A가 너무 적음. CSV/파싱 로직 다시 확인 필요.")
        return

    # 3) train/val/test 분리
    train_data, val_data, test_data = split_train_val_test(
        qa_data,
        val_ratio=0.1,
        test_ratio=0.1,
        seed=42,
    )
    print(f"Train: {len(train_data)}, Val: {len(val_data)}, Test: {len(test_data)}")

    # 4) LoRA 학습 (train으로 학습, val로 검증)
    train_lora_generator_with_eval(
        train_data=train_data,
        val_data=val_data,
        output_dir=GEN_LORA_OUT_DIR,
        base_model_name=GEN_BASE_MODEL,
        base_sys_prompt=BASE_SYS_PROMPT,
        max_length=MAX_LEN_GEN,
        batch_size=GEN_BATCH_SIZE,
        num_epochs=GEN_NUM_EPOCHS,
        lr=GEN_LR,
    )


if __name__ == "__main__":
    main()


총 Q/A 개수: 4447
Train: 3559, Val: 444, Test: 444


The model is already on multiple devices. Skipping the move to device specified in `args`.


[Generator] LoRA trainable params: 2252800


Step,Training Loss
100,1.2006
200,0.9622
300,0.9545
400,0.9279
500,0.9244
600,0.8988
700,0.8979
800,0.8944
900,0.8958
1000,0.8877


[Generator] Eval metrics: {'eval_loss': 0.8639466762542725, 'eval_runtime': 219.7461, 'eval_samples_per_second': 2.021, 'eval_steps_per_second': 0.255, 'epoch': 3.0}
[Generator] Eval Perplexity ≈ 2.373
[Generator LoRA] Saved to ./gen_lora


In [16]:
# ============================================================
# 0. 기본 import
# ============================================================
import os
from dataclasses import dataclass
from typing import List, Dict, Any, Optional

import numpy as np
import torch
import transformers
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    AutoModelForSequenceClassification,
)
from peft import PeftModel

# ============================================================
# 1. Bandit용 Arm 정의 + UCB 로지스틱 밴딧
# ============================================================

@dataclass
class PromptArmConfig:
    name: str
    description: str
    feature_vector: np.ndarray  # shape = (d,)


class LogisticPromptBandit:
    """
    프롬프트 전략 선택용 logistic bandit (UCB).
    """

    def __init__(
        self,
        arms: List[PromptArmConfig],
        lambda_reg: float = 1.0,
        eta: float = 0.1,
        beta: float = 1.0,
    ):
        self.arms = arms
        self.n_arms = len(arms)
        self.d = arms[0].feature_vector.shape[0]

        # arm별 파라미터 w, 정보 행렬 H
        self.w = np.zeros((self.n_arms, self.d))
        self.H = [lambda_reg * np.eye(self.d) for _ in range(self.n_arms)]

        self.eta = eta
        self.beta = beta

    def _sigmoid(self, z: float) -> float:
        return 1.0 / (1.0 + np.exp(-z))

    def select_arm(self) -> int:
        """
        UCB 기반 arm 선택
        """
        scores = []
        for a_idx, arm in enumerate(self.arms):
            x = arm.feature_vector
            mean = float(self.w[a_idx].dot(x))
            invH = np.linalg.inv(self.H[a_idx])
            var = float(x.T @ invH @ x)
            ucb = self.beta * np.sqrt(max(var, 1e-12))
            scores.append(mean + ucb)
        return int(np.argmax(scores))

    def update(self, arm_idx: int, reward: float):
        """
        선택한 arm에 대해 로지스틱 회귀 gradient 업데이트
        """
        x = self.arms[arm_idx].feature_vector
        z = float(self.w[arm_idx].dot(x))
        p = self._sigmoid(z)  # 예측 성공 확률

        grad = -(reward - p) * x  # 로지스틱 회귀 gradient
        self.H[arm_idx] += np.outer(x, x)
        invH = np.linalg.inv(self.H[arm_idx])
        step = self.eta * (invH @ grad)
        self.w[arm_idx] -= step

    def explain_current_strategy(self) -> List[str]:
        lines = []
        for a_idx, arm in enumerate(self.arms):
            x = arm.feature_vector
            z = float(self.w[a_idx].dot(x))
            p = self._sigmoid(z)
            lines.append(
                f"[{arm.name}] 예상 성공도 ≈ {p:.3f} | "
                f"feature={np.round(x, 2)} | 설명: {arm.description}"
            )
        return lines


def build_default_arms() -> List[PromptArmConfig]:
    """
    기본 3가지 전략 프로필 정의
    """
    return [
        PromptArmConfig(
            name="법률-정책 최우선",
            description="법령, 조례, 내부지침과의 일치성을 최우선으로 하고 책임 범위를 분명히 하는 보수적 답변.",
            feature_vector=np.array([0.9, 0.6, 0.4, 0.8, 0.7]),
        ),
        PromptArmConfig(
            name="민원인 공감형",
            description="민원인의 감정과 상황을 공감하며, 이해하기 쉬운 언어로 절차와 한계를 설명.",
            feature_vector=np.array([0.6, 0.7, 0.9, 0.3, 0.5]),
        ),
        PromptArmConfig(
            name="해결책 제안형",
            description="현실적으로 가능한 대안과 행동 옵션을 최대한 제시하는 해결 중심 답변.",
            feature_vector=np.array([0.7, 0.9, 0.7, 0.4, 0.8]),
        ),
    ]


# ============================================================
# 2. 시스템 프롬프트 + arm 기반 user 프롬프트 구성
# ============================================================

BASE_SYS_PROMPT = (
    "당신은 한국 공공기관의 민원 처리 보조 AI입니다. "
    "사실과 법령에 기반해 답변하고, 정책과 조례를 임의로 확대 해석하지 마세요. "
    "항상 처리 가능 범위와 한계를 분명히 설명하고, "
    "답변 마지막에는 '실제 처리 여부는 담당 부서의 최종 판단에 따릅니다.'라고 명시하세요."
)


def build_prompt_from_arm(complaint_text: str, arm: PromptArmConfig) -> str:
    return (
        f"아래 민원에 대해 답변을 작성하되, 다음 전략 프로필을 따르세요.\n\n"
        f"[전략 이름]\n{arm.name}\n\n"
        f"[전략 설명]\n{arm.description}\n\n"
        f"[민원 내용]\n{complaint_text}\n\n"
        f"[추가 지침]\n"
        f"- 관련 법령/정책과의 일치성을 확인하고, 처리 가능/불가능을 명확히 구분하세요.\n"
        f"- 민원인이 이해하기 쉬운 구조로 답변하고, 필요한 경우 대안/절차를 제시하세요.\n"
        f"- 과도한 약속이나 확정적인 표현은 피하고, '담당 부서의 최종 판단'이 필요함을 밝혀주세요.\n"
    )


# ============================================================
# 3. Generator 래퍼 (TinyLlama + LoRA)
# ============================================================

class LlamaGeneratorWrapper:
    def __init__(self, base_model_name: str, lora_dir: Optional[str] = None):
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )

        if lora_dir is not None and os.path.isdir(lora_dir):
            model = PeftModel.from_pretrained(model, lora_dir)
            print(f"[Generator] LoRA loaded from {lora_dir}")
        else:
            print("[Generator] LoRA not found. Using base model only.")

        self.pipe = transformers.pipeline(
            "text-generation",
            model=model,
            tokenizer=self.tokenizer,
            device_map="auto",
        )

    def generate(self, system_prompt: str, user_prompt: str, max_new_tokens: int = 512) -> str:
        """
        Chat-style 입력으로 TinyLlama 호출
        """
        msgs = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        out = self.pipe(
            msgs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=0.3,
            top_p=0.9,
        )[0]["generated_text"]

        # 일부 버전에서는 generated_text가 message list일 수 있음
        if isinstance(out, list):
            for m in out:
                if isinstance(m, dict) and m.get("role") == "assistant":
                    return m.get("content", "").strip()
        return str(out)


# ============================================================
# 4. Verifier 래퍼 (DistilBERT + LoRA(optional))
# ============================================================

class VerifierWrapper:
    """
    (질문, 답변) → [0,1] 품질 점수
    """

    def __init__(self, base_model_name: str, lora_dir: Optional[str] = None, device: str = "cpu"):
        self.device = device
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)

        model = AutoModelForSequenceClassification.from_pretrained(
            base_model_name,
            num_labels=1,
            problem_type="regression",
            torch_dtype=torch.float32 if device == "cpu" else torch.bfloat16,
        )

        if lora_dir is not None and os.path.isdir(lora_dir):
            model = PeftModel.from_pretrained(model, lora_dir)
            print(f"[Verifier] LoRA loaded from {lora_dir}")
        else:
            print("[Verifier] LoRA not found. Using base model only.")

        self.model = model.to(self.device)
        self.model.eval()

    def evaluate(self, complaint: str, answer: str) -> float:
        text = f"[COMPLAINT]\n{complaint}\n\n[ANSWER]\n{answer}"
        enc = self.tokenizer(
            text,
            truncation=True,
            max_length=256,
            padding="max_length",
            return_tensors="pt",
        ).to(self.device)

        with torch.no_grad():
            logits = self.model(**enc).logits
        score = torch.sigmoid(logits)[0].item()  # [0,1]로 squash
        return float(max(0.0, min(1.0, score)))


# ============================================================
# 5. (간단 버전) MetaVerifier: LM 점수 + 전략 일치도
# ============================================================

class MetaVerifier:
    """
    - LM 기반 품질 점수: f_lm(Q, A)
    - 전략 일치도 점수: f_arm(A, arm)
    을 합쳐서 최종 reward 반환.
    (클러스터 기반 평가는 완전 온라인 모드에서는 사용 안 함)
    """

    def __init__(self, lm_verifier: VerifierWrapper):
        self.lm_verifier = lm_verifier

    def _arm_alignment(self, answer: str, arm: PromptArmConfig) -> float:
        """
        arm 전략과 답변 내용이 얼추 맞는지 간단한 키워드 매칭으로 평가.
        """
        txt = answer[:2000]

        if "법률-정책 최우선" in arm.name:
            keywords = ["법", "조항", "시행령", "조례", "규정"]
        elif "공감형" in arm.name:
            keywords = ["이해", "불편을 드려", "죄송", "공감", "도움이 되셨으면"]
        elif "해결책 제안형" in arm.name:
            keywords = ["대안", "조치", "절차", "신청", "문의", "방법"]
        else:
            keywords = []

        if not keywords:
            return 0.5

        hit = sum(1 for kw in keywords if kw in txt)
        return max(0.3, min(1.0, hit / max(1, len(keywords))))

    def evaluate(self, question: str, answer: str, arm: PromptArmConfig) -> float:
        # 1) LM 기반 품질
        s_lm = self.lm_verifier.evaluate(question, answer)

        # 2) 전략 일치도
        s_arm = self._arm_alignment(answer, arm)

        # 3) (옵션) 클러스터 일관성은 완전 온라인 모드에서는 사용 X → 상수로 둠
        s_cluster = 0.7

        reward = (
            0.5 * s_lm +
            0.3 * s_cluster +
            0.2 * s_arm
        )
        return float(max(0.0, min(1.0, reward)))


# ============================================================
# 6. 질문 정제 stub (나중에 TinyLlama로 개선 가능)
# ============================================================

def refine_question_with_cluster_hints(
    raw_question: str,
) -> str:
    """
    완전 온라인 버전에서는 일단 그대로 반환.
    (원하면 여기서 '질문 요약/구조화'용 LLM 한 번 더 태울 수 있음)
    """
    return raw_question.strip()


# ============================================================
# 7. PromptBanditEngine: 전체 파이프라인 묶기
# ============================================================

class PromptBanditEngine:
    def __init__(
        self,
        arms: List[PromptArmConfig],
        generator: LlamaGeneratorWrapper,
        lm_verifier: VerifierWrapper,
    ):
        self.bandit = LogisticPromptBandit(arms=arms)
        self.generator = generator
        self.meta_verifier = MetaVerifier(lm_verifier=lm_verifier)

    def step(self, raw_complaint: str) -> Dict[str, Any]:
        # 1) bandit으로 arm 선택
        arm_idx = self.bandit.select_arm()
        arm = self.bandit.arms[arm_idx]

        # 2) (옵션) 질문 정제
        refined_q = refine_question_with_cluster_hints(raw_complaint)

        # 3) 프롬프트 생성 + 답변
        user_prompt = build_prompt_from_arm(refined_q, arm)
        answer = self.generator.generate(BASE_SYS_PROMPT, user_prompt)

        # 4) 메타-verifier로 reward 계산
        reward = self.meta_verifier.evaluate(refined_q, answer, arm)

        # 5) bandit 업데이트
        self.bandit.update(arm_idx, reward)

        return {
            "arm_idx": arm_idx,
            "arm_name": arm.name,
            "arm_description": arm.description,
            "question": refined_q,
            "answer": answer,
            "reward": reward,
            "strategy_view": self.bandit.explain_current_strategy(),
        }

def evaluate_batch(engine: PromptBanditEngine, test_data, out_csv="batch_results.csv"):
    logs = []
    for item in test_data:
        q = item["question"]
        out = engine.step(q)
        logs.append({
            "question": q,
            "answer": out["answer"],
            "reward": out["reward"],
            "arm": out["arm_name"],
        })
    df = pd.DataFrame(logs)
    df.to_csv(out_csv, index=False)
    return df


# ============================================================
# 8. 인터랙티브 루프 + 온라인 엔진 빌더
# ============================================================

def interactive_loop(engine: PromptBanditEngine):
    """
    터미널/Colab에서 바로 돌리는 인터랙티브 모드
    """
    print("=== 민원 질문 입력 (종료: 빈 줄 + 엔터) ===")
    while True:
        q = input("\n[민원] > ").strip()
        if not q:
            print("종료합니다.")
            break

        out = engine.step(q)

        print(f"\n[선택된 전략] {out['arm_name']}")
        print("\n[답변]\n", out["answer"][:2000])
        print(f"\n[Meta-Reward] {out['reward']:.3f}")
        print("\n[밴딧 전략 상태]")
        for line in out["strategy_view"]:
            print(" ", line)


def build_online_engine(
    gen_base_model: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    gen_lora_dir: str = "./gen_lora",
    ver_base_model: str = "distilbert-base-multilingual-cased",
    ver_lora_dir: str = "./verifier_lora",
    device: str = "cpu",
) -> PromptBanditEngine:
    """
    완전 온라인 서비스용 엔진 빌드 함수
    (학습된 LoRA 디렉토리만 있으면 바로 사용)
    """
    generator = LlamaGeneratorWrapper(
        base_model_name=gen_base_model,
        lora_dir=gen_lora_dir,
    )
    verifier = VerifierWrapper(
        base_model_name=ver_base_model,
        lora_dir=ver_lora_dir,
        device=device,
    )
    arms = build_default_arms()
    engine = PromptBanditEngine(
        arms=arms,
        generator=generator,
        lm_verifier=verifier,
    )
    return engine


# ============================================================
# 9. main: 완전 온라인 서비스 entry point
# ============================================================

def main():
    """
    - ./gen_lora 에서 Generator LoRA 불러옴
    - ./verifier_lora 가 있으면 Verifier LoRA도 같이 로드 (없으면 base만)
    - 이후 인터랙티브 루프 실행
    """
    engine = build_online_engine(
        gen_base_model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
        gen_lora_dir="./gen_lora",
        ver_base_model="distilbert-base-multilingual-cased",
        ver_lora_dir="./verifier_lora",
        device="cpu",  
    )
    interactive_loop(engine)\
    
    test_df = pd.read_csv("test_data.csv")
    df_res = evaluate_batch(engine, test_df)
    print(df_res.head())


if __name__ == "__main__":
    main()


Device set to use cuda:0


[Generator] LoRA loaded from ./gen_lora


Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-multilingual-cased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


[Verifier] LoRA not found. Using base model only.
=== 민원 질문 입력 (종료: 빈 줄 + 엔터) ===

[선택된 전략] 해결책 제안형

[답변]
 [첨단 태양광 기술 및 노동 전문 업체]

[민원 내용]

현황 및 문제점 교량주변은 밤에도 밝아서 산책하기에 좋고 미적으로도 아름다움을 주지만 교량밑은 어둡고 컴컴해 위험하고 또 칙칙하다는 느낌을 주기도 해 경관 상 안좋다는 문제가 있다 개선방안 교량밑에 미디어아트 설치한다 기대효과 시각적 효과 상승 및 미적경관 향상

[추가 지침]
- 관련 법령/정책과의 일치성을 확인하고, 처리 가능/불가능을 명확히 구분하세요.
- 민원인이 이해하기 쉬운 구조로 답변하고, 필요한 경우 대안/절차를 제시하세요.
- 과도한 약속이나 확정적인 표현은 피하고, '담당 부서의 최종 판단'이 필요함을 밝혀주세요.

[민원 내용

[Meta-Reward] 0.524

[밴딧 전략 상태]
  [법률-정책 최우선] 예상 성공도 ≈ 0.500 | feature=[0.9 0.6 0.4 0.8 0.7] | 설명: 법령, 조례, 내부지침과의 일치성을 최우선으로 하고 책임 범위를 분명히 하는 보수적 답변.
  [민원인 공감형] 예상 성공도 ≈ 0.500 | feature=[0.6 0.7 0.9 0.3 0.5] | 설명: 민원인의 감정과 상황을 공감하며, 이해하기 쉬운 언어로 절차와 한계를 설명.
  [해결책 제안형] 예상 성공도 ≈ 0.500 | feature=[0.7 0.9 0.7 0.4 0.8] | 설명: 현실적으로 가능한 대안과 행동 옵션을 최대한 제시하는 해결 중심 답변.
종료합니다.


FileNotFoundError: [Errno 2] No such file or directory: 'test_data.csv'

In [28]:
import os

os.chdir("/content/complaint_system_AI")
print("CWD:", os.getcwd())
print(os.listdir("."))
print("\n[gen_lora 내용]")
print(os.listdir("gen_lora"))


CWD: /content/complaint_system_AI
['system_test.ipynb', 'Complaint data.zip', 'data', 'complain_data.zip', 'gen_lora', 'complain_quality_shap.py', '.git', 'README.md', 'gen_lora.zip', 'bandit_prompt_system.py']

[gen_lora 내용]
['tokenizer_config.json', 'checkpoint-1780', 'checkpoint-890', 'chat_template.jinja', 'tokenizer.model', 'tokenizer.json', 'adapter_config.json', 'special_tokens_map.json', 'adapter_model.safetensors', 'README.md', 'checkpoint-2670']


In [30]:
!rm -f gen_lora.zip

In [31]:
%cd /content/complaint_system_AI
!zip -r gen_lora.zip gen_lora

/content/complaint_system_AI
  adding: gen_lora/ (stored 0%)
  adding: gen_lora/tokenizer_config.json (deflated 69%)
  adding: gen_lora/checkpoint-1780/ (stored 0%)
  adding: gen_lora/checkpoint-1780/tokenizer_config.json (deflated 69%)
  adding: gen_lora/checkpoint-1780/training_args.bin (deflated 53%)
  adding: gen_lora/checkpoint-1780/scheduler.pt (deflated 61%)
  adding: gen_lora/checkpoint-1780/trainer_state.json (deflated 72%)
  adding: gen_lora/checkpoint-1780/chat_template.jinja (deflated 60%)
  adding: gen_lora/checkpoint-1780/tokenizer.model (deflated 55%)
  adding: gen_lora/checkpoint-1780/tokenizer.json (deflated 85%)
  adding: gen_lora/checkpoint-1780/adapter_config.json (deflated 57%)
  adding: gen_lora/checkpoint-1780/rng_state.pth (deflated 26%)
  adding: gen_lora/checkpoint-1780/special_tokens_map.json (deflated 79%)
  adding: gen_lora/checkpoint-1780/adapter_model.safetensors (deflated 8%)
  adding: gen_lora/checkpoint-1780/optimizer.pt (deflated 8%)
  adding: gen_lor

In [32]:
!ls -lh gen_lora.zip

-rw-r--r-- 1 root root 83M Dec  1 16:45 gen_lora.zip


In [35]:
!ls -lh /content/complaint_system_AI/gen_lora.zip

-rw-r--r-- 1 root root 83M Dec  1 16:45 /content/complaint_system_AI/gen_lora.zip


In [None]:
!ls -al /content/complaint_system_AI


total 110084
drwxr-xr-x 5 root root     4096 Dec  1 16:18  .
drwxr-xr-x 1 root root     4096 Dec  1 12:11  ..
-rw-r--r-- 1 root root    17137 Dec  1 12:12  bandit_prompt_system.py
-rw-r--r-- 1 root root 13075801 Dec  1 12:12  complain_data.zip
-rw-r--r-- 1 root root    15137 Dec  1 12:12  complain_quality_shap.py
-rw-r--r-- 1 root root 13075801 Dec  1 12:12 'Complaint data.zip'
drwxr-xr-x 2 root root     4096 Dec  1 12:12  data
drwxr-xr-x 5 root root     4096 Dec  1 15:53  gen_lora
-rw-r--r-- 1 root root 86402045 Dec  1 16:18  gen_lora.zip
drwxr-xr-x 8 root root     4096 Dec  1 12:12  .git
-rw-r--r-- 1 root root      978 Dec  1 12:12  README.md
-rw-r--r-- 1 root root    98443 Dec  1 12:12  system_test.ipynb


In [25]:
from IPython.display import FileLink, display

zip_path = "/content/complaint_system_AI/gen_lora.zip"
display(FileLink(zip_path))

In [26]:
import os

print("현재 작업 디렉토리:", os.getcwd())
print("\n[현재 폴더 내용]")
print(os.listdir("."))

현재 작업 디렉토리: /content/complaint_system_AI

[현재 폴더 내용]
['system_test.ipynb', 'Complaint data.zip', 'data', 'complain_data.zip', 'gen_lora', 'complain_quality_shap.py', '.git', 'README.md', 'gen_lora.zip', 'bandit_prompt_system.py']


In [36]:
import os
os.getcwd()

'/content/complaint_system_AI'

In [41]:
import os, sys

print("CWD:", os.getcwd())
print("gen_lora.zip exists? ->", os.path.exists("/content/complaint_system_AI/gen_lora.zip"))
!ls -lh /content
!ls -lh /content/complaint_system_AI


CWD: /content/complaint_system_AI
gen_lora.zip exists? -> True
total 8.0K
drwxr-xr-x 5 root root 4.0K Dec  1 16:45 complaint_system_AI
drwxr-xr-x 1 root root 4.0K Nov 20 14:30 sample_data
total 108M
-rw-r--r-- 1 root root  17K Dec  1 12:12  bandit_prompt_system.py
-rw-r--r-- 1 root root  13M Dec  1 12:12  complain_data.zip
-rw-r--r-- 1 root root  15K Dec  1 12:12  complain_quality_shap.py
-rw-r--r-- 1 root root  13M Dec  1 12:12 'Complaint data.zip'
drwxr-xr-x 2 root root 4.0K Dec  1 12:12  data
drwxr-xr-x 5 root root 4.0K Dec  1 15:53  gen_lora
-rw-r--r-- 1 root root  83M Dec  1 16:45  gen_lora.zip
-rw-r--r-- 1 root root  978 Dec  1 12:12  README.md
-rw-r--r-- 1 root root  97K Dec  1 12:12  system_test.ipynb


In [39]:
import sys
print('google.colab' in sys.modules)

True


In [45]:
!pip install google.colab



In [50]:
%cd /content/complaint_system_AI

!git config --global user.email "spikasta@snu.ac.kr"
!git config --global user.name "wonnowone"
# 혹시 git 설정 안 돼 있으면 remote 확인
!git remote -v

# zip 파일 add
!git add gen_lora.zip

# 커밋
!git commit -m "Add gen_lora LoRA checkpoint"

# 푸시 (브랜치 이름은 너 repo에 맞춰: main / master / 등)
!git push origin main


/content/complaint_system_AI
origin	https://github.com/aivle-agent/complaint_system_AI.git (fetch)
origin	https://github.com/aivle-agent/complaint_system_AI.git (push)
On branch main
Your branch is ahead of 'origin/main' by 1 commit.
  (use "git push" to publish your local commits)

Untracked files:
  (use "git add <file>..." to include in what will be committed)
	[31mdata/[m
	[31mgen_lora/[m

nothing added to commit but untracked files present (use "git add" to track)
fatal: could not read Username for 'https://github.com': No such device or address


In [None]:
# =====================================================================
# 0. 기본 import & 공통 유틸
# =====================================================================
import os
import re
import json
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Tuple

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

import torch
from torch.utils.data import Dataset

import transformers
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    AutoModelForSequenceClassification,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
)

from peft import LoraConfig, get_peft_model, PeftModel

from sklearn.cluster import DBSCAN
from sentence_transformers import SentenceTransformer


# =====================================================================
# 1. 데이터 전처리: CSV → (question, answer) 리스트
# =====================================================================

def clean_text(x: Any) -> str:
    """전화번호, 주민번호, 이름 패턴 등을 마스킹하고 strip."""
    if not isinstance(x, str):
        return ""
    x = re.sub(r"\d{2,3}-\d{3,4}-\d{4}", "[TEL]", x)
    x = re.sub(r"\d{6}-\d{7}", "[RRN]", x)
    x = re.sub(r"[가-힣]{2,3}씨", "[NAME]", x)
    return x.strip()


def parse_question_answer(full_text: str) -> Tuple[str, str]:
    """
    중앙행정기관 데이터의 consulting_content에서
    Q/A 부분을 분리한다.

    대략적인 패턴:
      "제목 : ...\n\nQ : ...\n...\n\nA : ..."

    Q/A 구분이 애매한 경우엔 heuristics로 처리.
    """
    if not isinstance(full_text, str):
        return "", ""

    text = full_text.strip()

    # 1) "A :" 기준으로 먼저 자르기
    a_idx = text.find("\nA :")
    if a_idx == -1:
        a_idx = text.find("\n\nA :")
    if a_idx == -1:
        a_idx = text.find("A :")

    if a_idx != -1:
        q_part = text[:a_idx].strip()
        a_part = text[a_idx:].strip()
        a_part = re.sub(r"^A\s*:\s*", "", a_part, flags=re.MULTILINE).strip()
    else:
        q_idx = text.find("Q :")
        if q_idx != -1:
            q_part = text[q_idx:].strip()
            a_part = ""
        else:

            q_part, a_part = text, ""

    # Q : 제거
    q_part = re.sub(r"^제목\s*:\s*", "", q_part, flags=re.MULTILINE)
    q_part = re.sub(r"^Q\s*:\s*", "", q_part, flags=re.MULTILINE)

    q_part = clean_text(q_part)
    a_part = clean_text(a_part)
    return q_part, a_part


def load_wide_from_csv(csv_path: str) -> pd.DataFrame:
    df = pd.read_csv(csv_path)
    group_keys = ["source", "consulting_date", "consulting_category", "consulting_content"]

    wide = (
        df.pivot_table(
            index=group_keys,
            columns="classification_category",
            values="classification",
            aggfunc=lambda x: " / ".join(sorted(set(x)))
        )
        .reset_index()
    )

    wide = wide.rename(columns={
        "consulting_content": "full_text",
        "상담 주제": "topic",
        "상담 사유": "reason",
        "상담 결과": "outcome",
        "상담 요건": "requirement",
        "상담 내용": "summary",
    })
    return wide


def build_qa_dataset_from_wide(wide: pd.DataFrame) -> List[Dict[str, str]]:
    """
    wide DataFrame에서 question / answer 추출.
    """
    data = []
    for _, row in wide.iterrows():
        full_text = row.get("full_text", "")
        q, a = parse_question_answer(full_text)

        if len(q) < 5 or len(a) < 5:
            continue

        data.append({
            "question": q,
            "answer": a,
            "topic": row.get("topic", ""),
            "reason": row.get("reason", ""),
            "outcome": row.get("outcome", ""),
            "requirement": row.get("requirement", ""),
            "summary": row.get("summary", ""),
        })
    return data


def split_dataset(all_data: List[Dict[str, str]], test_ratio: float = 0.1):
    total = len(all_data)
    test_size = max(1, int(total * test_ratio))
    train_data = all_data[:-test_size]
    test_data = all_data[-test_size:]
    return train_data, test_data


# =====================================================================
# 2. LoRA Generator 학습용 Dataset & Trainer (SFT)
# =====================================================================

class GeneratorSFTDataset(Dataset):
    """
    Generator LoRA 학습용 SFT Dataset.
    prompt = 시스템 지침 + Q
    target = A
    """

    def __init__(self, data, tokenizer, max_length=512, base_sys_prompt=""):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.base_sys_prompt = base_sys_prompt

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        q = item["question"]
        a = item["answer"]

        # 간단한 chat-style 프롬프트 템플릿
        prompt = (
            f"<|system|>\n{self.base_sys_prompt}\n\n"
            f"<|user|>\n{q}\n\n"
            f"<|assistant|>\n"
        )
        # LM 학습은 prompt + answer 전체를 labels로 사용
        full_text = prompt + a

        enc = self.tokenizer(
            full_text,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt",
        )

        input_ids = enc["input_ids"][0]
        attention_mask = enc["attention_mask"][0]

        # causal LM 이므로 labels = input_ids
        labels = input_ids.clone()
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        }


def train_lora_generator(
    train_data,
    output_dir="./gen_lora",
    base_model_name="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    base_sys_prompt="당신은 한국 공공기관의 민원 답변을 작성하는 공무원 보조 AI입니다.",
    max_length=512,
    batch_size=2,
    num_epochs=1,
    lr=5e-5,
):
    """
    Generator LoRA SFT 학습 스켈레톤.
    실제로는 epoch, lr, batch_size 등을 상황에 맞게 조정.
    """
    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    base_model = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )

    # LoRA 설정 
    peft_config = LoraConfig(
        r=16,
        lora_alpha=32,
        lora_dropout=0.1,
        bias="none",
        task_type="CAUSAL_LM",
    )
    model = get_peft_model(base_model, peft_config)

    train_dataset = GeneratorSFTDataset(
        train_data,
        tokenizer=tokenizer,
        max_length=max_length,
        base_sys_prompt=base_sys_prompt,
    )

    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False,
    )

    training_args = TrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=batch_size,
        num_train_epochs=num_epochs,
        learning_rate=lr,
        logging_steps=10,
        save_strategy="epoch",
        bf16=True,
        report_to="none",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        data_collator=data_collator,
        train_dataset=train_dataset,
    )

    trainer.train()

    # LoRA 어댑터만 저장
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)
    print(f"[Generator LoRA] Saved to {output_dir}")


# =====================================================================
# 3. LoRA Verifier 학습용 Dataset & Trainer (회귀)
# =====================================================================

class VerifierDataset(Dataset):
    """
    (질문 + 답변) -> quality_score(0~1) 회귀용 Dataset.
    """

    def __init__(self, data, tokenizer, max_length=256):
        self.tokenizer = tokenizer
        self.max_length = max_length

        texts = []
        labels = []

        for item in data:
            q = item["question"]
            a = item["answer"]

            # 간단한 텍스트 결합
            text = f"[COMPLAINT]\n{q}\n\n[ANSWER]\n{a}"
            texts.append(text)

            L = len(a)
            score = max(0.3, min(1.0, L / 1500.0))
            labels.append(score)

        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        enc = self.tokenizer(
            self.texts[idx],
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt",
        )
        input_ids = enc["input_ids"][0]
        attention_mask = enc["attention_mask"][0]
        label = torch.tensor(self.labels[idx], dtype=torch.float)

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": label,
        }


def train_lora_verifier(
    train_data,
    output_dir="./verifier_lora",
    base_model_name="distilbert-base-multilingual-cased",
    max_length=256,
    batch_size=8,
    num_epochs=1,
    lr=2e-5,
):
    """
    Verifier LoRA 회귀 학습 스켈레톤.
    지금은 pseudo label 사용. 나중에 실제 품질 라벨로 교체.
    """
    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    base_model = AutoModelForSequenceClassification.from_pretrained(
        base_model_name,
        num_labels=1,
        problem_type="regression",
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )

    peft_config = LoraConfig(
        r=8,
        lora_alpha=16,
        lora_dropout=0.1,
        bias="none",
        task_type="SEQ_CLS",
    )
    model = get_peft_model(base_model, peft_config)

    train_dataset = VerifierDataset(
        train_data,
        tokenizer=tokenizer,
        max_length=max_length,
    )

    training_args = TrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=batch_size,
        num_train_epochs=num_epochs,
        learning_rate=lr,
        logging_steps=10,
        save_strategy="epoch",
        bf16=True,
        report_to="none",
    )

    def compute_metrics(eval_pred):
        return {}

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
    )

    trainer.train()

    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)
    print(f"[Verifier LoRA] Saved to {output_dir}")


# =====================================================================
# 4. DBSCAN 기반 질문 클러스터러 
# =====================================================================

class ComplaintClusterer:
    def __init__(self, model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
        self.embedder = SentenceTransformer(model_name)
        self.dbscan = DBSCAN(eps=0.7, min_samples=5, metric="cosine")
        self.embeddings = None
        self.labels = None

    def fit(self, texts: List[str]):
        self.embeddings = self.embedder.encode(texts, convert_to_numpy=True, show_progress_bar=True)
        self.dbscan.fit(self.embeddings)
        self.labels = self.dbscan.labels_
        print("클러스터 개수(노이즈 포함):", len(set(self.labels)))

    def describe_cluster(self, k: int, texts: List[str], topn: int = 5) -> List[str]:
        if self.labels is None:
            return []
        idxs = [i for i, lab in enumerate(self.labels) if lab == k]
        return [texts[i][:200] for i in idxs[:topn]]

    def similar_examples(self, text: str, texts: List[str], topn: int = 5) -> List[str]:
        if self.embeddings is None:
            return []
        v = self.embedder.encode([text], convert_to_numpy=True)[0]
        sims = self.embeddings @ v / (np.linalg.norm(self.embeddings, axis=1) * np.linalg.norm(v) + 1e-8)
        idxs = np.argsort(-sims)[:topn]
        return [texts[i][:200] for i in idxs]


def refine_question_with_cluster_hints(
    raw_question: str,
    clusterer: ComplaintClusterer,
    train_questions: List[str],
    generator_model_id: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
) -> str:
    """
    Cluster 힌트를 이용해서 질문을 정제하는 LLM 호출 (스켈레톤).
    실제로는 TinyLlama나 다른 경량 모델로 '질문 개선'만 수행.
    여기서는 placeholder: 그냥 raw_question 반환.
    """
    
    return raw_question.strip()


# 5. Bandit + Generator + Verifier 엔진


@dataclass
class PromptArmConfig:
    name: str
    description: str
    feature_vector: np.ndarray  # shape = (d,)


class LogisticPromptBandit:
    """
    프롬프트 전략 선택용 logistic bandit (UCB).
    """
    def __init__(
        self,
        arms: List[PromptArmConfig],
        lambda_reg: float = 1.0,
        eta: float = 0.1,
        beta: float = 1.0,
    ):
        self.arms = arms
        self.n_arms = len(arms)
        self.d = arms[0].feature_vector.shape[0]

        self.w = np.zeros((self.n_arms, self.d))
        self.H = [lambda_reg * np.eye(self.d) for _ in range(self.n_arms)]

        self.eta = eta
        self.beta = beta

    def _sigmoid(self, z: float) -> float:
        return 1.0 / (1.0 + np.exp(-z))

    def select_arm(self) -> int:
        scores = []
        for a_idx, arm in enumerate(self.arms):
            x = arm.feature_vector
            mean = float(self.w[a_idx].dot(x))
            invH = np.linalg.inv(self.H[a_idx])
            var = float(x.T @ invH @ x)
            ucb = self.beta * np.sqrt(max(var, 1e-12))
            scores.append(mean + ucb)
        return int(np.argmax(scores))

    def update(self, arm_idx: int, reward: float):
        x = self.arms[arm_idx].feature_vector
        z = float(self.w[arm_idx].dot(x))
        p = self._sigmoid(z)
        grad = -(reward - p) * x
        self.H[arm_idx] += np.outer(x, x)
        invH = np.linalg.inv(self.H[arm_idx])
        step = self.eta * (invH @ grad)
        self.w[arm_idx] -= step

    def explain_current_strategy(self) -> List[str]:
        lines = []
        for a_idx, arm in enumerate(self.arms):
            x = arm.feature_vector
            z = float(self.w[a_idx].dot(x))
            p = self._sigmoid(z)
            lines.append(
                f"[{arm.name}] 예상 성공도 ≈ {p:.3f} | "
                f"feature={np.round(x, 2)} | 설명: {arm.description}"
            )
        return lines


def build_default_arms() -> List[PromptArmConfig]:
    return [
        PromptArmConfig(
            name="법률-정책 최우선",
            description="법령, 조례, 내부지침과의 일치성을 최우선으로 하고 책임 범위를 분명히 하는 보수적 답변.",
            feature_vector=np.array([0.9, 0.6, 0.4, 0.8, 0.7]),
        ),
        PromptArmConfig(
            name="민원인 공감형",
            description="민원인의 감정과 상황을 공감하며, 이해하기 쉬운 언어로 절차와 한계를 설명.",
            feature_vector=np.array([0.6, 0.7, 0.9, 0.3, 0.5]),
        ),
        PromptArmConfig(
            name="해결책 제안형",
            description="현실적으로 가능한 대안과 행동 옵션을 최대한 제시하는 해결 중심 답변.",
            feature_vector=np.array([0.7, 0.9, 0.7, 0.4, 0.8]),
        ),
    ]


BASE_SYS_PROMPT = (
    "당신은 한국 공공기관의 민원 처리 보조 AI입니다. "
    "사실과 법령에 기반해 답변하고, 정책과 조례를 임의로 확대 해석하지 마세요. "
    "항상 처리 가능 범위와 한계를 분명히 설명하고, "
    "답변 마지막에는 '실제 처리 여부는 담당 부서의 최종 판단에 따릅니다.'라고 명시하세요."
)


def build_prompt_from_arm(complaint_text: str, arm: PromptArmConfig) -> str:
    return (
        f"아래 민원에 대해 답변을 작성하되, 다음 전략 프로필을 따르세요.\n\n"
        f"[전략 이름]\n{arm.name}\n\n"
        f"[전략 설명]\n{arm.description}\n\n"
        f"[민원 내용]\n{complaint_text}\n\n"
        f"[추가 지침]\n"
        f"- 관련 법령/정책과의 일치성을 확인하고, 처리 가능/불가능을 명확히 구분하세요.\n"
        f"- 민원인이 이해하기 쉬운 구조로 답변하고, 필요한 경우 대안/절차를 제시하세요.\n"
        f"- 과도한 약속이나 확정적인 표현은 피하고, '담당 부서의 최종 판단'이 필요함을 밝혀주세요.\n"
    )


class LlamaGeneratorWrapper:
    def __init__(self, base_model_name, lora_dir: Optional[str] = None):
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )
        if lora_dir is not None and os.path.isdir(lora_dir):
            model = PeftModel.from_pretrained(model, lora_dir)
            print(f"[Generator] LoRA loaded from {lora_dir}")
        else:
            print("[Generator] LoRA not found. Using base model only.")

        self.pipe = transformers.pipeline(
            "text-generation",
            model=model,
            tokenizer=self.tokenizer,
            device_map="auto",
        )

    def generate(self, system_prompt: str, user_prompt: str, max_new_tokens=512) -> str:
        msgs = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        out = self.pipe(
            msgs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=0.3,
            top_p=0.9,
        )[0]["generated_text"]

        if isinstance(out, list):
            for m in out:
                if isinstance(m, dict) and m.get("role") == "assistant":
                    return m.get("content", "").strip()
        return str(out)
    
class VerifierWrapper:
    def __init__(self, base_model_name, lora_dir: Optional[str] = None, device="cpu"):
        self.device = device
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        model = AutoModelForSequenceClassification.from_pretrained(
            base_model_name,
            num_labels=1,
            problem_type="regression",
            torch_dtype=torch.float32 if device == "cpu" else torch.bfloat16,
        )
        if lora_dir is not None and os.path.isdir(lora_dir):
            model = PeftModel.from_pretrained(model, lora_dir)
            print(f"[Verifier] LoRA loaded from {lora_dir}")
        else:
            print("[Verifier] LoRA not found. Using base model only.")

        self.model = model.to(self.device)
        self.model.eval()

    def evaluate(self, complaint: str, answer: str) -> float:
        text = f"[COMPLAINT]\n{complaint}\n\n[ANSWER]\n{answer}"
        enc = self.tokenizer(
            text,
            truncation=True,
            max_length=256,
            padding="max_length",
            return_tensors="pt",
        ).to(self.device)

        with torch.no_grad():
            logits = self.model(**enc).logits
        score = torch.sigmoid(logits)[0].item()
        return float(max(0.0, min(1.0, score)))

class MetaVerifier:
    """
    - LM 기반 품질 점수: f_lm(Q, A)
    - 클러스터 일관성 점수: f_cluster(Q, A, clusterer)
    - 전략 일치도 점수: f_arm(A, arm)
    을 합쳐서 최종 reward 반환.
    """

    def __init__(self, lm_verifier: VerifierWrapper,
                 clusterer: Optional[ComplaintClusterer] = None,
                 train_questions: Optional[List[str]] = None):
        self.lm_verifier = lm_verifier
        self.clusterer = clusterer
        self.train_questions = train_questions or []

    def _cluster_consistency(self, question: str, answer: str) -> float:
        """
        DBSCAN 기반으로 '이 답변이 이 질문 유형에 잘 맞는지'를
        아주 거친 형태로 점수화하는 자리.
        지금은 placeholder로 0.5~1.0 사이 정도로만 두고,
        나중에 sentence-transformers로 Q/A 임베딩 비교해서 넣자.
        """
        if self.clusterer is None or self.clusterer.embeddings is None:
            return 0.7 

        # Q 임베딩 → 가까운 클러스터/예시 Q들 → A도 embed해서 유사도 비교
        return 0.7

    def _arm_alignment(self, answer: str, arm: PromptArmConfig) -> float:
        """
        arm 전략과 답변 내용이 얼추 맞는지 간단한 키워드 매칭으로 평가.     
        """
        txt = answer[:2000]

        if "법률-정책 최우선" in arm.name:
            keywords = ["법", "조항", "시행령", "조례", "규정"]
        elif "공감형" in arm.name:
            keywords = ["이해", "불편을 드려", "죄송", "공감", "도움이 되셨으면"]
        elif "해결책 제안형" in arm.name:
            keywords = ["대안", "조치", "절차", "신청", "문의", "방법"]
        else:
            keywords = []

        if not keywords:
            return 0.5

        hit = sum(1 for kw in keywords if kw in txt)
        return max(0.3, min(1.0, hit / max(1, len(keywords))))

    def evaluate(self, question: str, answer: str, arm: PromptArmConfig) -> float:
        # 1) LM 기반 품질
        s_lm = self.lm_verifier.evaluate(question, answer)

        # 2) 클러스터 일관성
        s_cluster = self._cluster_consistency(question, answer)

        # 3) 전략 일치도
        s_arm = self._arm_alignment(answer, arm)

        # 4) 가중합으로 최종 reward
        reward = (
            0.5 * s_lm +
            0.3 * s_cluster +
            0.2 * s_arm
        )
        return float(max(0.0, min(1.0, reward)))


class PromptBanditEngine:
    def __init__(
        self,
        arms: List[PromptArmConfig],
        generator: LlamaGeneratorWrapper,
        lm_verifier: VerifierWrapper,
        clusterer: Optional[ComplaintClusterer] = None,
        train_questions: Optional[List[str]] = None,
    ):
        self.bandit = LogisticPromptBandit(arms=arms)
        self.generator = generator
        self.meta_verifier = MetaVerifier(
            lm_verifier=lm_verifier,
            clusterer=clusterer,
            train_questions=train_questions,
        )

    def step(self, raw_complaint: str) -> Dict[str, Any]:
        arm_idx = self.bandit.select_arm()
        arm = self.bandit.arms[arm_idx]

        refined_q = refine_question_with_cluster_hints(raw_complaint, self.meta_verifier.clusterer, ...)

        user_prompt = build_prompt_from_arm(refined_q, arm)
        answer = self.generator.generate(BASE_SYS_PROMPT, user_prompt)

        reward = self.meta_verifier.evaluate(refined_q, answer, arm)

        self.bandit.update(arm_idx, reward)

        return {
            "arm_idx": arm_idx,
            "arm_name": arm.name,
            "answer": answer,
            "reward": reward,
            "strategy_view": self.bandit.explain_current_strategy(),
        }


# 6. 배치 평가 + 인터랙티브 질의


def evaluate_batch(engine: PromptBanditEngine, test_data, out_csv="batch_results_sample.csv"):
    logs = []
    for item in tqdm(test_data):
        q = item["question"]
        out = engine.step(q)
        logs.append({
            "question": q,
            "answer": out["answer"],
            "reward": out["reward"],
            "arm": out["arm_name"],
        })
    df = pd.DataFrame(logs)
    df.to_csv(out_csv, index=False)
    return df


def interactive_loop(engine: PromptBanditEngine):
    """
    실시간 질의 응답 루프
    """
    print("=== 민원 질문 입력 (종료: 빈 줄 + 엔터) ===")
    while True:
        q = input("\n[민원] > ").strip()
        if not q:
            print("종료합니다.")
            break
        out = engine.step(q)
        print(f"\n[선택된 전략] {out['arm_name']}")
        print("\n[답변]\n", out["answer"][:1000])
        print(f"\n[Verifier score / reward] {out['reward']:.3f}")
        print("\n[전략 상태]")
        for line in out["strategy_view"]:
            print(" ", line)

# main 함수

def main(
    csv_path: str = "중앙행정기관.csv",
    train_gen_lora: bool = True,      # True로 두면 Generator LoRA 학습 수행
    train_ver_lora: bool = False,      # True로 두면 Verifier LoRA 학습 수행
    eval_samples: int = 100,            # test set에서 평가할 샘플 개수
):
    # 1) 데이터 로드 + Q/A 구축
    print(f"[MAIN] CSV 로드: {csv_path}")
    wide = load_wide_from_csv(csv_path)
    qa_data = build_qa_dataset_from_wide(wide)
    print(f"[MAIN] 총 Q/A 쌍 개수: {len(qa_data)}")

    if len(qa_data) < 20:
        print("[MAIN] 데이터가 너무 적습니다. CSV/파싱을 확인하세요.")
        return

    train_data, test_data = split_dataset(qa_data, test_ratio=0.1)
    print(f"[MAIN] Train: {len(train_data)}, Test: {len(test_data)}")

    # 2) LoRA 학습
    if train_gen_lora:
        print("\n[MAIN] >>> Generator LoRA 학습 시작")
        train_lora_generator(
            train_data,
            output_dir="./gen_lora",
            base_model_name="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
        )
        print("[MAIN] >>> Generator LoRA 학습 완료")

    if train_ver_lora:
        print("\n[MAIN] >>> Verifier LoRA 학습 시작")
        train_lora_verifier(
            train_data,
            output_dir="./verifier_lora",
            base_model_name="distilbert-base-multilingual-cased",
        )
        print("[MAIN] >>> Verifier LoRA 학습 완료")

    # 3) DBSCAN 기반 질문 클러스터링
    train_questions = [d["question"] for d in train_data]
    clusterer = ComplaintClusterer()
    clusterer.fit(train_questions)

    # 4) Generator / Verifier / Bandit 엔진 초기화
    print("\n[MAIN] Generator / Verifier / Bandit 엔진 초기화")

    generator = LlamaGeneratorWrapper(
        base_model_name="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
        lora_dir="./gen_lora",   
    )
    verifier = VerifierWrapper(
        base_model_name="distilbert-base-multilingual-cased",
        lora_dir="./verifier_lora", 
        device="cpu",             
    )

    arms = build_default_arms()
    engine = PromptBanditEngine(arms=arms, generator=generator, verifier=verifier)

    # 5) Test 일부 평가
    n_eval = min(eval_samples, len(test_data))
    print(f"\n[MAIN] >>> Test set 일부({n_eval}개)에 대한 평가 실행")
    df_res = evaluate_batch(engine, test_data[:n_eval], out_csv="batch_results_sample.csv")
    print(df_res.head())

    # 6) 실시간 질의 루프 (서비스 시뮬레이션)
    # interactive_loop(engine)


if __name__ == "__main__":
    # main(csv_path="중앙행정기관.csv", train_gen_lora=False, train_ver_lora=False, eval_samples=10)
    main(csv_path="중앙행정기관.csv", train_gen_lora=True, train_ver_lora=False, eval_samples=10)

    # main()
    
