# 할머니(마가렛) LoRA 학습 데이터 생성기 (OpenAI)

## 목적
어둡고 불안정한 동화 세계관 속 **생명력을 모두 빼앗긴 할머니 마가렛**의 발화 데이터를 OpenAI API로 생성한다.  
LoRA 미세조정 학습용 `{"input": ..., "output": ...}` JSONL 형식.

## 기존 rule-base 대비 차이점
- rule-base: 고정된 input/output 풀에서 랜덤 조합
- **OpenAI**: 의미 축 설명과 few-shot 예시를 기반으로 GPT가 자유롭게 생성 → 더 다양하고 자연스러운 표현

## 캐릭터 설정
- 생명력을 거의 빼앗겨 의식이 희미한 노인
- 플레이어가 생기를 나눠주면 일시적으로 의식이 돌아옴
- 말투: 쉰 목소리, 힘없고 떨리는 단문, 고어한 비유 (기름/바늘/실/뼈)
- **LoRA 학습 범위**: 어조(쉰 목소리, 고어한 비유)와 의미만. 말 끊김/파편화는 후처리(rule-base)에서 처리

## 의미 축 (5개)
| 축 | input 방향 | output 귀결 |
|---|---|---|
| 세계관 설명 | 생기를 나눠줌, 새엄마 동기 질문 | 새엄마가 왜 이러는지 파편적 진실 |
| 탈출 경고 | 탈출/도망 관련 행동 | 빨리 나가라는 파편적 경고 |
| 고어한 비유 | 상태/몸 관련 접근 | 몸이 부식되는 이미지, 기름/바늘/실 비유 |
| 과거 기억 | 과거 가족 언급 | 희미한 기억 파편 |
| 도움 요청 | 손잡기, 접근 | 나가달라는/도망쳐달라는 간청 |

## 생성 수량
- 세트당 400개 × 2세트 = 총 800개
- 모델: `gpt-4o-mini`
- 예상 비용: ~$0.08-0.12

In [None]:
!pip install -q openai tiktoken

In [None]:
import json
import random
import time
from pathlib import Path
from typing import List, Dict, Tuple
from collections import Counter

from openai import OpenAI
import tiktoken

## 1. API 키 설정

In [None]:
from google.colab import userdata

OPENAI_API_KEY = userdata.get("OPENAI_API_KEY")
client = OpenAI(api_key=OPENAI_API_KEY)

MODEL = "gpt-4o-mini"
print(f"모델: {MODEL}")
print("API 키 설정 완료.")

## 2. 프롬프트 설계

In [None]:
SYSTEM_PROMPT = """너는 게임 시나리오용 대화 데이터를 생성하는 작가다.

## 세계관
어둡고 불안정한 동화 세계. 새엄마가 집 안의 모든 생명력을 빼앗아 자신의 마법에 사용한다.
할머니 마가렛은 생명력을 거의 빼앗겨 의식이 희미한 채로 집 구석에 버려져 있다.

## 할머니(마가렛) 캐릭터
- 생명력을 거의 빼앗겨 몸이 쇠락한 노인.
- 가끔 플레이어가 생기를 나눠주면 의식이 일시적으로 돌아온다.
- 세계관의 진실, 탈출 경고, 고어한 비유를 파편적으로 말한다.
- 기름/바늘/실/뼈/껍데기 등의 이미지로 상태와 세계를 표현한다.

## 문체 규칙
- 이 데이터는 의식이 돌아왔을 때의 '원형 발화'를 학습한다.
  (말 끊김, 단어 부식 등 파편화 효과는 후처리에서 별도 적용한다.)
- 쉰 목소리 어조: 힘없고 떨리는 문장. 짧고 직접적.
- 문장 길이: 10~50자. 짧은 문장을 선호.
- 고어한 비유 허용: "기름이 다 탔어", "바늘이 온다", "뼈가 기억해"
- 따뜻한 위로, 안심시키는 표현, 명료한 설명 금지.
- 문법 붕괴나 말줄임표 남용 금지 (원형 발화이므로).
- 간결하고 직접적으로. 장황하지 않게.

## 출력 형식
반드시 아래 JSON 형식으로만 응답하라. 다른 텍스트를 포함하지 마라.
{"pairs": [{"input": "플레이어의 대사/행동", "output": "마가렛의 발화"}, ...]}"""


AXIS_PROMPTS: Dict[str, dict] = {
    "lore_revelation": {
        "name": "세계관 설명",
        "description": """플레이어가 생기를 나눠주거나 새엄마의 동기/진실을 물어볼 때.
마가렛은 새엄마에 대한 파편적 진실을 기름/바늘/실 비유로 말한다.
output은 10~50자, 쉰 목소리 어조. 따뜻하거나 안심시키는 말 금지.""",
        "examples": [
            {"input": "(생기를 나눠줬다.)", "output": "그녀는 딸을 잃었어. 오래전에. 바느질로 돌아오려 했는데."},
            {"input": "새엄마가 왜 이렇게까지 하는 건지 알 수 있어?", "output": "기름이 떨어지면 불꽃도 꺼져. 그녀는 그걸 알아. 그래서 빼앗는 거야."},
            {"input": "이 집에서 무슨 일이 벌어지고 있는 거야?", "output": "실로 묶으면 영혼이 남아. 그게 그녀가 원하는 거야."},
        ],
    },
    "escape_warning": {
        "name": "탈출 경고",
        "description": """플레이어가 탈출하려 하거나 도망 계획을 말할 때.
마가렛은 빨리 나가라는 절박하고 짧은 경고를 한다.
output은 5~25자, 급박하고 힘없는 어조. 이유 설명 최소화.""",
        "examples": [
            {"input": "나갈 거야.", "output": "나가. 빨리 나가."},
            {"input": "오늘 밤 도망칠 생각이야.", "output": "바늘이 오면 늦어. 지금 가."},
            {"input": "탈출구를 찾아야 해.", "output": "기름이 다 타기 전에 나가야 해."},
        ],
    },
    "horror_image": {
        "name": "고어한 비유",
        "description": """플레이어가 마가렛의 몸 상태나 이 집의 현상을 물어볼 때.
마가렛은 자신의 상태를 기름/바늘/실/뼈/껍데기 이미지로 고어하게 묘사한다.
output은 15~50자, 담담하고 섬뜩하게. 고통스럽다는 직접 호소 금지.""",
        "examples": [
            {"input": "할머니 몸은 어때?", "output": "기름이 다 빠진 램프야. 불꽃만 남았어."},
            {"input": "수술이 뭔지 알아?", "output": "바늘이 뼈 사이를 다녀. 느껴지지 않는 게 더 무서운 거야."},
            {"input": "실이 뭘 의미하는 거야?", "output": "실이 영혼을 꿰매면 기억이 없어져."},
        ],
    },
    "past_memory": {
        "name": "과거 기억",
        "description": """플레이어가 과거나 이 집의 역사, 이전 가족을 물어볼 때.
마가렛은 희미하고 단편적인 기억 조각을 말한다. 완전한 설명 금지.
output은 10~45자, 기억이 흐릿한 듯 불완전하게.""",
        "examples": [
            {"input": "이 집이 원래 어떤 집이었는지 알아?", "output": "꽃이 있었어. 정원에. 지금은 없어."},
            {"input": "새엄마가 예전에는 어떤 사람이었어?", "output": "그녀가 달라진 건 그날 이후야. 딸이 사라진 날."},
            {"input": "예전에 행복한 때가 있었어?", "output": "웃음소리가 있었어. 예전에는."},
        ],
    },
    "plea": {
        "name": "도움 요청",
        "description": """플레이어가 마가렛에게 손을 내밀거나 가까이 다가올 때.
마가렛은 자신을 위한 도움이 아니라 플레이어가 도망치길 간청한다.
output은 10~35자, 힘없지만 간절하게. 자신은 이미 늦었다는 체념 포함 가능.""",
        "examples": [
            {"input": "(손을 잡았다.)", "output": "가줘. 제발 가줘."},
            {"input": "할머니 옆에 앉아 눈을 마주쳤다.", "output": "나는 이미 늦었어. 너는 아니야."},
            {"input": "(생기를 나눠주며) 옆에 있을게.", "output": "내 걱정 말고 나가. 시간이 없어."},
        ],
    },
}

print(f"정의된 의미 축: {list(AXIS_PROMPTS.keys())}")

## 3. 토큰 / 비용 사전 추정

In [None]:
def _build_user_prompt(axis: str, count: int) -> str:
    info = AXIS_PROMPTS[axis]
    examples_str = "\n".join(
        f'  {{"input": "{ex["input"]}", "output": "{ex["output"]}"}}'
        for ex in info["examples"]
    )
    return f"""의미 축: {info['name']} ({axis})

## 상황 설명
{info['description']}

## 예시
{examples_str}

## 요청
위 의미 축에 맞는 input-output 쌍을 {count}개 생성하라.

input(플레이어 대사/행동) 길이 분포:
- 짧은 (1~10자): 30%
- 중간 (10~40자): 55%
- 긴 (40~80자): 15%

규칙:
- 예시와 동일하거나 거의 유사한 문장은 금지. 새로운 표현을 만들어라.
- input은 플레이어의 자연스러운 구어체(경어 또는 행동 묘사).
- output은 마가렛 캐릭터 설정을 정확히 따르되, 매번 다른 표현을 사용.
- JSON 형식으로만 응답. 다른 텍스트 금지."""


def estimate_tokens_and_cost(num_samples: int = 400, num_sets: int = 2, batch_size: int = 20) -> dict:
    enc = tiktoken.encoding_for_model(MODEL)
    system_tokens = len(enc.encode(SYSTEM_PROMPT))
    user_tokens_list = [len(enc.encode(_build_user_prompt(ax, batch_size))) for ax in AXIS_PROMPTS]
    avg_user_tokens = sum(user_tokens_list) / len(user_tokens_list)
    batches_per_set = (num_samples + batch_size - 1) // batch_size
    total_batches = batches_per_set * num_sets
    est_output_per_batch = 120 * batch_size + 50  # 마가렛은 중간 길이 출력
    total_input = total_batches * (system_tokens + avg_user_tokens + 10)
    total_output = total_batches * est_output_per_batch
    input_cost = total_input * 0.15 / 1_000_000
    output_cost = total_output * 0.60 / 1_000_000
    total_cost = input_cost + output_cost
    return {
        "system_tokens": system_tokens,
        "avg_user_tokens": int(avg_user_tokens),
        "total_batches": total_batches,
        "total_input_tokens": int(total_input),
        "total_output_tokens": int(total_output),
        "total_cost_usd": total_cost,
        "total_cost_krw": total_cost * 1450,
    }


est = estimate_tokens_and_cost()
print("=" * 60)
print("  토큰 / 비용 사전 추정 (gpt-4o-mini)")
print("=" * 60)
print(f"  시스템 프롬프트: {est['system_tokens']} 토큰")
print(f"  유저 프롬프트 (평균): {est['avg_user_tokens']} 토큰")
print(f"  총 배치 수: {est['total_batches']}")
print(f"  총 input 토큰: ~{est['total_input_tokens']:,}")
print(f"  총 output 토큰: ~{est['total_output_tokens']:,}")
print(f"  예상 비용: ${est['total_cost_usd']:.3f} (약 {est['total_cost_krw']:.0f}원)")

## 4. API 호출 함수

In [None]:
def generate_batch(
    axis: str,
    count: int = 20,
    temperature: float = 0.9,
    max_retries: int = 3,
) -> List[dict]:
    """주어진 의미 축에서 count개의 input-output 쌍을 생성한다."""
    user_prompt = _build_user_prompt(axis, count)

    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=MODEL,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=temperature,
                response_format={"type": "json_object"},
            )

            content = response.choices[0].message.content
            parsed = json.loads(content)

            if isinstance(parsed, dict) and "pairs" in parsed:
                pairs = parsed["pairs"]
            elif isinstance(parsed, list):
                pairs = parsed
            else:
                for key in parsed:
                    if isinstance(parsed[key], list):
                        pairs = parsed[key]
                        break
                else:
                    raise ValueError(f"예상치 못한 응답 구조: {list(parsed.keys())}")

            valid_pairs = [
                {"input": p["input"], "output": p["output"], "_axis": axis}
                for p in pairs
                if isinstance(p, dict)
                and "input" in p and "output" in p
                and len(p["input"]) > 0 and len(p["output"]) > 0
            ]

            if len(valid_pairs) < count * 0.5:
                raise ValueError(f"유효 쌍 부족: {len(valid_pairs)}/{count}")

            return valid_pairs

        except Exception as e:
            print(f"    [재시도 {attempt + 1}/{max_retries}] {type(e).__name__}: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                print(f"    [실패] {axis} 축 배치 생성 실패")
                return []


# 테스트: lore_revelation 5개
test_pairs = generate_batch("lore_revelation", count=5)
print(f"테스트 생성: {len(test_pairs)}개")
for p in test_pairs:
    print(f"  플레이어: {p['input']}")
    print(f"  마가렛:   {p['output']}")
    print()

## 5. 대비 쌍 생성 함수

In [None]:
CONTRAST_AXIS_PAIRS = [
    ("lore_revelation", "escape_warning"),  # 진실 요구 ↔ 탈출 행동
    ("past_memory",     "horror_image"),    # 과거 질문 ↔ 몸 상태 질문
    ("plea",            "escape_warning"),  # 손잡기 ↔ 탈출 준비
    ("lore_revelation", "past_memory"),     # 세계관 ↔ 과거 기억
    ("horror_image",    "plea"),            # 몸 상태 접근 ↔ 도움 요청
]


def generate_contrast_batch(
    axis_a: str,
    axis_b: str,
    count: int = 5,
    temperature: float = 0.9,
    max_retries: int = 3,
) -> List[dict]:
    info_a = AXIS_PROMPTS[axis_a]
    info_b = AXIS_PROMPTS[axis_b]

    user_prompt = f"""의미 대비 쌍 생성 요청.

## 축 A: {info_a['name']} ({axis_a})
{info_a['description']}

## 축 B: {info_b['name']} ({axis_b})
{info_b['description']}

## 요청
{count}개의 대비 쌍을 생성하라. 각 쌍은 비슷한 상황에서 플레이어가 다른 행동을 하고,
마가렛은 각 축의 귀결 방향에 맞게 응답한다.

JSON 형식:
{{"pairs": [
  {{"a_input": "축A 플레이어 대사", "a_output": "축A 마가렛 발화", "b_input": "축B 플레이어 대사", "b_output": "축B 마가렛 발화"}},
  ...
]}}"""

    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=MODEL,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=temperature,
                response_format={"type": "json_object"},
            )

            parsed = json.loads(response.choices[0].message.content)
            raw_pairs = parsed.get("pairs", parsed) if isinstance(parsed, dict) else parsed
            if not isinstance(raw_pairs, list):
                for key in parsed:
                    if isinstance(parsed[key], list):
                        raw_pairs = parsed[key]
                        break

            results = []
            for p in raw_pairs:
                if all(k in p for k in ["a_input", "a_output", "b_input", "b_output"]):
                    results.append({"input": p["a_input"], "output": p["a_output"], "_axis": axis_a})
                    results.append({"input": p["b_input"], "output": p["b_output"], "_axis": axis_b})
            return results

        except Exception as e:
            print(f"    [재시도 {attempt + 1}/{max_retries}] {type(e).__name__}: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                return []


print("대비 쌍 생성 함수 정의 완료.")

## 6. 데이터 생성 엔진

In [None]:
def generate_dataset(
    num_samples: int = 400,
    batch_size: int = 20,
    temperature: float = 0.9,
    contrast_per_pair: int = 5,
) -> List[dict]:
    data = []
    axes = list(AXIS_PROMPTS.keys())

    # 1단계: 의미 대비 쌍
    print("[1/2] 의미 대비 쌍 생성...")
    for axis_a, axis_b in CONTRAST_AXIS_PAIRS:
        print(f"  대비: {axis_a} ↔ {axis_b}")
        pairs = generate_contrast_batch(axis_a, axis_b, count=contrast_per_pair, temperature=temperature)
        data.extend(pairs)
        print(f"    → {len(pairs)}개 생성")
        time.sleep(0.5)

    print(f"  대비 쌍 합계: {len(data)}개")

    # 2단계: 축별 균등 분배
    print(f"\n[2/2] 축별 일반 생성...")
    remaining = num_samples - len(data)
    per_axis = remaining // len(axes)
    leftover = remaining % len(axes)

    for i, axis in enumerate(axes):
        target = per_axis + (1 if i < leftover else 0)
        generated = 0
        batch_num = 0

        while generated < target:
            batch_count = min(batch_size, target - generated)
            batch_num += 1
            print(f"  {AXIS_PROMPTS[axis]['name']} 배치 {batch_num}: {batch_count}개 요청...", end=" ")
            pairs = generate_batch(axis, count=batch_count, temperature=temperature)
            data.extend(pairs)
            generated += len(pairs)
            print(f"→ {len(pairs)}개 생성 (누적 {generated}/{target})")
            time.sleep(0.5)

    random.shuffle(data)
    print(f"\n총 생성: {len(data)}개 (목표: {num_samples})")
    return data


print("데이터 생성 함수 정의 완료.")

## 7. 검증 함수

In [None]:
def validate_dataset(data: List[dict]) -> dict:
    stats = {
        "total": len(data),
        "axis_dist": Counter(),
        "input_lengths": [],
        "output_lengths": [],
        "duplicates": 0,
    }
    seen_inputs = set()
    for item in data:
        stats["axis_dist"][item["_axis"]] += 1
        stats["input_lengths"].append(len(item["input"]))
        stats["output_lengths"].append(len(item["output"]))
        if item["input"] in seen_inputs:
            stats["duplicates"] += 1
        seen_inputs.add(item["input"])
    return stats


def print_stats(stats: dict, set_name: str = "Set") -> None:
    print(f"\n{'=' * 60}")
    print(f"  {set_name} 통계")
    print(f"{'=' * 60}")
    print(f"  총 샘플 수: {stats['total']}")
    print(f"  중복 input: {stats['duplicates']}")
    print(f"\n  [의미 축 분포]")
    for axis, count in sorted(stats["axis_dist"].items()):
        pct = count / stats["total"] * 100
        print(f"    {axis:25s}: {count:4d} ({pct:.1f}%)")
    avg_in = sum(stats["input_lengths"]) / len(stats["input_lengths"])
    avg_out = sum(stats["output_lengths"]) / len(stats["output_lengths"])
    print(f"\n  [평균 길이]")
    print(f"    input  평균: {avg_in:.1f}자")
    print(f"    output 평균: {avg_out:.1f}자")


print("검증 함수 정의 완료.")

## 8. 세트 1 생성 (400개)

In [None]:
set1 = generate_dataset(num_samples=400, temperature=0.9)

stats1 = validate_dataset(set1)
print_stats(stats1, "세트 1")

print(f"\n{'=' * 60}")
print("  세트 1 샘플 (축별 1개씩)")
print(f"{'=' * 60}")
shown_axes = set()
for item in set1:
    if item["_axis"] not in shown_axes:
        shown_axes.add(item["_axis"])
        print(f"\n  [{item['_axis']}]")
        print(f"  플레이어: {item['input']}")
        print(f"  마가렛:   {item['output']}")
    if len(shown_axes) == len(AXIS_PROMPTS):
        break

## 9. 세트 2 생성 (400개)

In [None]:
set2 = generate_dataset(num_samples=400, temperature=1.0)

stats2 = validate_dataset(set2)
print_stats(stats2, "세트 2")

## 10. JSONL 저장

In [None]:
def save_jsonl(data: List[dict], output_path: str) -> None:
    output_file = Path(output_path)
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        for item in data:
            clean = {"input": item["input"], "output": item["output"]}
            f.write(json.dumps(clean, ensure_ascii=False) + "\n")
    print(f"저장 완료: {output_path} ({len(data)}개)")


OUTPUT_DIR = Path("../data/grandmother")

save_jsonl(set1, str(OUTPUT_DIR / "grandmother_dialogue_00.jsonl"))
save_jsonl(set2, str(OUTPUT_DIR / "grandmother_dialogue_01.jsonl"))

combined = set1 + set2
random.shuffle(combined)
save_jsonl(combined, str(OUTPUT_DIR / "grandmother_dialogue_combined.jsonl"))
print(f"\n합본 저장 완료: {len(combined)}개")

## 11. 최종 검증

In [None]:
for fname in ["grandmother_dialogue_00.jsonl", "grandmother_dialogue_01.jsonl", "grandmother_dialogue_combined.jsonl"]:
    fpath = OUTPUT_DIR / fname
    with open(fpath, "r", encoding="utf-8") as f:
        lines = f.readlines()
    print(f"{fname}: {len(lines)}줄")
    for i, line in enumerate(lines):
        try:
            obj = json.loads(line)
            assert "input" in obj and "output" in obj
            assert len(obj["output"]) > 0
        except Exception as e:
            print(f"  ERROR at line {i}: {e}")
            break
    else:
        print(f"  -> 파싱 검증 통과")

print("\n=== 의미 축별 최종 샘플 ===")
axis_names_kr = {
    "lore_revelation": "세계관 설명",
    "escape_warning":  "탈출 경고",
    "horror_image":    "고어한 비유",
    "past_memory":     "과거 기억",
    "plea":            "도움 요청",
}
for axis in AXIS_PROMPTS.keys():
    axis_items = [item for item in combined if item["_axis"] == axis]
    samples = random.sample(axis_items, min(3, len(axis_items)))
    print(f"\n--- {axis_names_kr.get(axis, axis)} ---")
    for s in samples:
        print(f"  플레이어: {s['input']}")
        print(f"  마가렛:   {s['output']}")
        print()