# Qwen3(4B) 메뉴 조합 점수화 노트북 (Unsloth 스타일)

이 노트북은 **Qwen3-4B-Instruct**를 사용해 아래 4개 카테고리에서 1개씩 뽑은 메뉴 조합의 점수를 계산합니다.

- Appetizer
- Main Dish
- Drink
- Dessert

> Colab에서 바로 실행 가능한 형태로 구성했습니다.


In [None]:
%%capture
# Install Unsloth and its dependencies; the %%capture cell magic above hides
# the pip output.
import os, re
# The environment is treated as Colab when any environment variable name
# contains "COLAB_".
if "COLAB_" not in "".join(os.environ.keys()):
    !pip install unsloth  # Do this in local & cloud setups
else:
    # Take the "major.minor" prefix of the installed torch version and map it
    # to an xformers release; unknown versions fall back to 0.0.34.
    import torch; v = re.match(r'[\d]{1,}\.[\d]{1,}', str(torch.__version__)).group(0)
    xformers = 'xformers==' + {'2.10':'0.0.34','2.9':'0.0.33.post1','2.8':'0.0.32.post2'}.get(v, "0.0.34")
    !pip install sentencepiece protobuf "datasets==4.3.0" "huggingface_hub>=0.34.0" hf_transfer
    # --no-deps keeps pip from pulling in (and possibly replacing) the
    # dependencies of these packages.
    !pip install --no-deps unsloth_zoo bitsandbytes accelerate {xformers} peft trl triton unsloth
# Pinned versions installed in both branches.
!pip install transformers==4.56.2
!pip install --no-deps trl==0.22.2

In [None]:
import json
import random
import itertools
from typing import Dict, List

from unsloth import FastLanguageModel
import torch


In [None]:
# 1) User-defined dataset: candidate items for each of the four courses.
DATASET = {
    "appetizer": [
        "salad",
        "corn soup",
        "miso soup",
        "house bread",
        "cheese",
        "cracker",
        "Scotch Egg",
        "mashed potato",
        "nachos",
        "pasta",
    ],
    "main_dish": [
        "ramen",
        "pizza",
        "fried chicken",
        "sandwich",
        "T-bone steak",
        "sushi",
        "taco",
        "grilled tofu",
        "fish and chips",
        "paella",
    ],
    "drink": [
        "coca-cola",
        "red wine",
        "white wine",
        "sake",
        "green tea",
        "orange juice",
        "coke zero",
        "modelo beer",
        "coffee",
        "water",
    ],
    "dessert": [
        "orange",
        "grape",
        "pudding",
        "ice cream",
        "tart",
        "cheesecake",
        "macaron",
        "dango",
        "muffin",
        "churro",
    ],
}

# Show how many items each category holds.
print({category: len(items) for category, items in DATASET.items()})

# Count the full menus (one item per category) by enumerating the Cartesian
# product of the four category lists.
print(
    "총 가능한 조합 수:",
    len(
        list(
            itertools.product(
                DATASET["appetizer"],
                DATASET["main_dish"],
                DATASET["drink"],
                DATASET["dessert"],
            )
        )
    ),
)


In [None]:
# 2) Load Qwen3 (Unsloth template)
# The candidates are tried in order and the first one that loads is kept.
# NOTE(review): confirm these repository ids exist on the Hugging Face Hub; a
# wrong id only shows up as a "[skip]" line printed by the loop below.
max_seq_length = 2048
candidate_models = [
    "unsloth/Qwen3-4B-Instruct-bnb-4bit",
    "unsloth/Qwen3-4B-unsloth-bnb-4bit",
    "Qwen/Qwen3-4B-Instruct",
]

last_error = None
model = None
tokenizer = None
model_name = None

for candidate in candidate_models:
    try:
        model, tokenizer = FastLanguageModel.from_pretrained(
            model_name = candidate,
            max_seq_length = max_seq_length,
            dtype = None,      # auto-select
            load_in_4bit = True,
        )
        model_name = candidate
        break
    except Exception as e:
        # Deliberate best-effort fallback: whatever goes wrong with this
        # candidate, remember the error and move on to the next one.
        last_error = e
        print(f"[skip] {candidate}: {e}")

if model is None:
    # Every candidate failed; chain the most recent failure as the cause.
    # The trailing space keeps the two sentence halves from running together
    # when the adjacent literals are concatenated.
    raise RuntimeError(
        "Qwen3 모델 로드 실패. candidate_models의 이름을 확인하거나, "
        "허깅페이스 접근 권한/네트워크 상태를 확인하세요."
    ) from last_error

# Switch the loaded model to Unsloth's inference mode before generating.
FastLanguageModel.for_inference(model)
print("Loaded:", model_name)


In [None]:
from pathlib import Path

# System message placed first in every scoring conversation. It fixes the
# answer layout: the four chosen items, five pairwise balance sections scored
# out of 20, a total out of 100, and an overall evaluation.
SYSTEM_PROMPT = """You are a professional menu-pairing evaluator.
Follow the style learned from the provided reference examples.
Given exactly one appetizer, one main dish, one drink, and one dessert,
write the response in this exact structure:

Appetizer: <item>
Main Dish: <item>
Drink: <item>
Dessert: <item>

Appetizer–Main Dish Balance (x/20): <1-2 sentence rationale>

Main Dish–Drink Balance (x/20): <1-2 sentence rationale>

Main Dish–Dessert Balance (x/20): <1-2 sentence rationale>

Appetizer–Dessert Balance (x/20): <1-2 sentence rationale>

Drink–Dessert Balance (x/20): <1-2 sentence rationale>

Total Score: <0-100>/100

Overall Evaluation: <concise summary>

Rules:
- Output only the final answer.
- No <think> tags.
- Keep section order and labels exactly the same.
- Use English.
"""

# Local paths probed, in this order, for the reference examples file.
REFERENCE_FILE_CANDIDATES = [
    Path("Food combination.txt"),
    Path("/content/Food combination.txt"),
    Path("/content/food-recommendation/Food combination.txt"),
]

# Download locations tried, in this order, when no local copy exists.
REFERENCE_FILE_URLS = [
    "https://raw.githubusercontent.com/KyleIm/food-recommendation/main/Food%20combination.txt",
]


def _load_reference_text() -> str:
    """Return the text of the reference examples file.

    Each path in ``REFERENCE_FILE_CANDIDATES`` is probed in order and the first
    one that exists is read. When none exists, each URL in
    ``REFERENCE_FILE_URLS`` is downloaded in turn; the first payload that
    passes a basic content check is cached on disk (best effort) and returned.

    Returns:
        The file content decoded as UTF-8.

    Raises:
        FileNotFoundError: If no local file exists and no URL yields a usable
            payload.
    """
    import urllib.request

    for candidate in REFERENCE_FILE_CANDIDATES:
        if candidate.exists():
            print(f"Using reference template file: {candidate}")
            return candidate.read_text(encoding="utf-8")

    # No local copy: fall back to downloading the file.
    for url in REFERENCE_FILE_URLS:
        print(f"Reference file not found locally. Trying download: {url}")
        try:
            with urllib.request.urlopen(url, timeout=15) as resp:
                text = resp.read().decode("utf-8")
        except Exception as e:
            # Deliberate best-effort: any failure to fetch or decode this URL
            # just moves on to the next one.
            print(f"[skip] download failed: {e}")
            continue

        # Basic content check so an unrelated payload is not accepted.
        if "example 1)" not in text.lower() and "Appetizer:" not in text:
            print(f"[skip] unexpected content from: {url}")
            continue

        # Cache the download where the local probe will find it next time:
        # /content when that directory exists, otherwise the working directory.
        # A failed write must not discard a successful download.
        content_dir = Path("/content")
        cache_dir = content_dir if content_dir.is_dir() else Path.cwd()
        out = cache_dir / "Food combination.txt"
        try:
            out.write_text(text, encoding="utf-8")
            print(f"Downloaded reference template file: {out}")
        except OSError as e:
            print(f"[warn] could not cache reference file at {out}: {e}")
        return text

    raise FileNotFoundError(
        "Food combination.txt not found locally and download failed. "
        "Upload the file to /content or current directory."
    )


def parse_reference_examples(raw_text: str):
    """Split the reference file into few-shot (user, assistant) pairs.

    The text is cut at every ``example <n>)`` marker (case-insensitive). A
    block is kept only if it contains the four consecutive header lines
    ``Appetizer:``, ``Main Dish:``, ``Drink:`` and ``Dessert:``.

    Args:
        raw_text: Full content of the reference examples file.

    Returns:
        A list of dicts. ``"user"`` holds the evaluation request rebuilt from
        the four header values and ``"assistant"`` holds the whole stripped
        example block.

    Raises:
        ValueError: If no block contains the four header lines.
    """
    import re

    blocks = [
        b.strip()
        for b in re.split(r"\bexample\s+\d+\)\s*", raw_text, flags=re.IGNORECASE)
        if b.strip()
    ]
    # Blocks are stripped, so the Dessert line can be the last line of a block
    # with no newline after it; accept end-of-text there as well.
    field_re = re.compile(
        r"Appetizer:\s*(?P<app>.+?)\n"
        r"Main Dish:\s*(?P<main>.+?)\n"
        r"Drink:\s*(?P<drink>.+?)\n"
        r"Dessert:\s*(?P<dessert>.+?)(?:\n|$)",
        re.DOTALL,
    )

    parsed = []
    for block in blocks:
        m = field_re.search(block)
        if not m:
            # Text without the four header lines is not an example.
            continue
        user = (
            "Evaluate this menu combination with the required format:\n"
            f"Appetizer: {m.group('app').strip()}\n"
            f"Main Dish: {m.group('main').strip()}\n"
            f"Drink: {m.group('drink').strip()}\n"
            f"Dessert: {m.group('dessert').strip()}"
        )
        parsed.append({"user": user, "assistant": block})

    if not parsed:
        raise ValueError("No valid examples parsed from Food combination.txt")
    return parsed


# Load and parse the reference examples once when this cell runs; the prompt
# builders below read REFERENCE_EXAMPLES as a module-level list.
REFERENCE_EXAMPLES = parse_reference_examples(_load_reference_text())
print(f"Loaded reference examples: {len(REFERENCE_EXAMPLES)}")


def build_user_prompt(combo: Dict[str, str]) -> str:
    """Render the evaluation request for one menu combination.

    Args:
        combo: Mapping with the keys ``appetizer``, ``main_dish``, ``drink``
            and ``dessert``.

    Returns:
        The instruction line followed by one ``Label: item`` line per course.
    """
    header = "Evaluate this menu combination with the required format:\n"
    appetizer_line = f"Appetizer: {combo['appetizer']}\n"
    main_line = f"Main Dish: {combo['main_dish']}\n"
    drink_line = f"Drink: {combo['drink']}\n"
    dessert_line = f"Dessert: {combo['dessert']}"
    return header + appetizer_line + main_line + drink_line + dessert_line


def build_messages_with_references(combo: Dict[str, str], n_shots: int = 3):
    """Assemble the chat messages used to score one menu combination.

    Args:
        combo: Menu combination to evaluate.
        n_shots: Number of leading reference examples to include as few-shot
            turns. Values below 1 are raised to 1, so at least one example is
            always requested.

    Returns:
        A list of chat messages: the system prompt, the few-shot user/assistant
        turns, and finally the user request for ``combo``.
    """
    # Take the first examples as few-shot turns to pin the template style.
    shot_count = max(1, n_shots)
    few_shot_turns = [
        {"role": role, "content": example[role]}
        for example in REFERENCE_EXAMPLES[:shot_count]
        for role in ("user", "assistant")
    ]
    return [
        {"role": "system", "content": SYSTEM_PROMPT},
        *few_shot_turns,
        {"role": "user", "content": build_user_prompt(combo)},
    ]


def strip_think(text: str) -> str:
    """Remove ``<think>`` reasoning from model output.

    Besides complete ``<think>...</think>`` blocks, two unbalanced cases are
    handled: a closing tag without an opener (everything up to it is dropped)
    and an opener that is never closed (everything from it onward is dropped).
    Tags are matched case-insensitively.

    Args:
        text: Raw decoded model output; may be empty or ``None``.

    Returns:
        The remaining text with surrounding whitespace stripped, or ``""`` when
        ``text`` is empty.
    """
    import re

    if not text:
        return ""

    # 1) Complete blocks.
    cleaned = re.sub(r"<think>[\s\S]*?</think>", "", text, flags=re.IGNORECASE)
    # 2) Orphan closing tag: its opener is not part of this text, so whatever
    #    precedes it is still reasoning.
    cleaned = re.sub(r"^[\s\S]*</think>", "", cleaned, flags=re.IGNORECASE)
    # 3) Unclosed opening tag: the reasoning runs to the end of the text.
    cleaned = re.sub(r"<think>[\s\S]*$", "", cleaned, flags=re.IGNORECASE)
    return cleaned.strip()


def extract_formatted_answer(text: str) -> str:
    """Cut the templated answer out of raw model output.

    After ``strip_think`` is applied, the span from the first ``Appetizer:``
    to the end of the line on which the ``Overall Evaluation:`` text sits is
    returned (labels matched case-insensitively).

    Args:
        text: Raw decoded model output.

    Returns:
        The matched span, stripped; or the whole think-stripped text when the
        pattern does not match.
    """
    import re

    visible = strip_think(text)
    # Keep everything from "Appetizer" through "Overall Evaluation".
    found = re.search(
        r"(Appetizer:[\s\S]*?Overall Evaluation:\s*.+)",
        visible,
        flags=re.IGNORECASE,
    )
    if found is None:
        return visible
    return found.group(1).strip()


def extract_total_score(text: str):
    """Read the ``Total Score: <n>/100`` value from a model answer.

    Args:
        text: Model output; it is first reduced with
            ``extract_formatted_answer``.

    Returns:
        The score as an int limited to the range 0-100, or ``None`` when no
        ``Total Score`` line of that form is found.
    """
    import re

    answer = extract_formatted_answer(text)
    hit = re.search(
        r"Total Score:\s*(\d{1,3})\s*/\s*100",
        answer,
        flags=re.IGNORECASE,
    )
    if hit is None:
        return None

    score = int(hit.group(1))
    # Up to three digits are accepted, so cap the value at the scale's bounds.
    return min(max(score, 0), 100)


def repair_to_template(combo: Dict[str, str], raw_text: str, max_new_tokens: int = 512):
    """Ask the model to rewrite a malformed draft into the required template.

    Args:
        combo: Menu combination the draft is about.
        raw_text: The model's earlier output; ``<think>`` content is removed
            before it is shown to the model again.
        max_new_tokens: Generation budget for the rewritten answer.

    Returns:
        The rewritten answer after ``extract_formatted_answer``. Nothing here
        checks that the result actually follows the template.
    """
    reference = REFERENCE_EXAMPLES[0]
    draft = strip_think(raw_text)

    repair_messages = [
        {
            "role": "system",
            "content": (
                "Rewrite the following draft into the exact required template. "
                "Return only final answer with the fixed labels and section order."
            ),
        },
        # One worked example. Its request and answer come from the same
        # reference entry, so the example never pairs the target menu with an
        # answer written for a different menu.
        {"role": "user", "content": reference["user"]},
        {"role": "assistant", "content": reference["assistant"]},
        # The actual task: the target menu followed by the draft to fix.
        {
            "role": "user",
            "content": f"{build_user_prompt(combo)}\n\nDraft to rewrite:\n{draft}",
        },
    ]

    repair_inputs = tokenizer.apply_chat_template(
        repair_messages,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt",
    ).to(model.device)

    # A single prompt is tokenized without padding, so every position is a
    # real token. Deriving the mask from pad_token_id could hide real tokens
    # that share that id, and gave no mask at all when no pad token is set.
    attention_mask = torch.ones_like(repair_inputs)

    with torch.inference_mode():
        # Greedy decoding: do_sample=False. No temperature is passed because
        # it only applies when sampling.
        repair_outputs = model.generate(
            input_ids=repair_inputs,
            attention_mask=attention_mask,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            repetition_penalty=1.05,
        )

    # Decode only the newly generated tokens, not the echoed prompt.
    repaired = tokenizer.decode(repair_outputs[0][repair_inputs.shape[-1]:], skip_special_tokens=True)
    return extract_formatted_answer(repaired)


def evaluate_combo_with_qwen3(combo: Dict[str, str], max_new_tokens: int = 700, n_shots: int = 3):
    """Score one menu combination with the loaded model.

    Args:
        combo: Menu combination to evaluate.
        max_new_tokens: Generation budget for the answer.
        n_shots: Number of reference examples passed to
            ``build_messages_with_references``.

    Returns:
        A dict with ``"total_score"`` (int, or ``None`` when no score could be
        extracted) and ``"response"`` (the extracted answer text). When the
        first answer has no score or no ``Overall Evaluation:`` label, both
        values come from a single ``repair_to_template`` attempt instead.
    """
    messages = build_messages_with_references(combo, n_shots=n_shots)

    inputs = tokenizer.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt",
    ).to(model.device)

    # A single prompt is tokenized without padding, so every position is a
    # real token. Deriving the mask from pad_token_id could hide real tokens
    # that share that id, and gave no mask at all when no pad token is set.
    attention_mask = torch.ones_like(inputs)

    with torch.inference_mode():
        # Greedy decoding: do_sample=False. No temperature is passed because
        # it only applies when sampling.
        outputs = model.generate(
            input_ids=inputs,
            attention_mask=attention_mask,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            repetition_penalty=1.05,
        )

    # Decode only the newly generated tokens, not the echoed prompt.
    generated = tokenizer.decode(outputs[0][inputs.shape[-1]:], skip_special_tokens=True)
    response = extract_formatted_answer(generated)
    total_score = extract_total_score(response)

    # If the template is broken, make one repair attempt.
    if total_score is None or "Overall Evaluation:" not in response:
        response = repair_to_template(combo, generated)
        total_score = extract_total_score(response)

    return {"total_score": total_score, "response": response}




In [None]:
# 3) Draw one random item per category and check the formatted output

def sample_combo(seed: int = None) -> Dict[str, str]:
    """Pick one random item from each category of ``DATASET``.

    Args:
        seed: When given, the global ``random`` generator is re-seeded with it
            before drawing. When ``None``, the generator's current state is
            used as is.

    Returns:
        A dict with the keys ``appetizer``, ``main_dish``, ``drink`` and
        ``dessert``, drawn in that order.
    """
    if seed is not None:
        random.seed(seed)

    categories = ("appetizer", "main_dish", "drink", "dessert")
    return {category: random.choice(DATASET[category]) for category in categories}

# Score one reproducibly sampled menu and show the selection, the model's
# answer, and the extracted score.
combo = sample_combo(seed=42)
result = evaluate_combo_with_qwen3(combo)

print("선택된 조합:", combo, sep="\n")
print("\n모델 출력:", result["response"], sep="\n")
print("\n추출된 Total Score:", result["total_score"])



In [None]:
# (Optional) Evaluate three random combinations and sort them by Total Score

def evaluate_three_random(seed: int = 0) -> List[Dict]:
    """Evaluate three randomly drawn menus and rank them by total score.

    Args:
        seed: Value used to seed the global ``random`` generator before the
            three draws.

    Returns:
        Three dicts with the keys ``combo``, ``total_score`` and ``response``,
        ordered from highest to lowest score. Entries whose score is ``None``
        are ranked as if they had scored -1.
    """
    random.seed(seed)

    def _score_one() -> Dict:
        # Draw a menu, evaluate it, and flatten the outcome into one row.
        drawn = sample_combo()
        outcome = evaluate_combo_with_qwen3(drawn)
        return {
            "combo": drawn,
            "total_score": outcome["total_score"],
            "response": outcome["response"],
        }

    def _rank_key(row: Dict) -> int:
        score = row["total_score"]
        return -1 if score is None else score

    return sorted((_score_one() for _ in range(3)), key=_rank_key, reverse=True)

# Print each ranked entry: a header line, the full answer, then a blank line.
ranked = evaluate_three_random(seed=7)
for i, row in enumerate(ranked, 1):
    header = f"#{i} | total_score={row['total_score']} | {row['combo']}"
    print(header, row['response'], "", sep="\n")

