In [None]:
# LLM 20 Questions Starter Notebook
> https://www.kaggle.com/code/ryanholbrook/llm-20-questions-starter-notebook

In [None]:
# 필요한 라이브러리와 GEMMA 모델을 설치합니다.

%%bash
cd /kaggle/working
pip install -q -U -t /kaggle/working/submission/lib immutabledict sentencepiece
git clone https://github.com/google/gemma_pytorch.git > /dev/null
mkdir /kaggle/working/submission/lib/gemma/
mv /kaggle/working/gemma_pytorch/gemma/* /kaggle/working/submission/lib/gemma/

In [None]:
%%writefile submission/main.py
# Setup
# 설정: 필요한 라이브러리를 임포트하고, 시스템 경로를 설정합니다.
import os
import sys

# **IMPORTANT:** Set up your system path like this to make your code work
# both in notebooks and in the simulations environment.
# **중요:** 코드가 노트북과 시뮬레이션 환경 양쪽에서 모두 작동하도록 시스템 경로를 이렇게 설정합니다.
KAGGLE_AGENT_PATH = "/kaggle_simulations/agent/"
if os.path.exists(KAGGLE_AGENT_PATH):
    sys.path.insert(0, os.path.join(KAGGLE_AGENT_PATH, 'lib'))
else:
    sys.path.insert(0, "/kaggle/working/submission/lib")

import contextlib
import os
import sys
from pathlib import Path

import torch
from gemma.config import get_config_for_7b, get_config_for_2b
from gemma.model import GemmaForCausalLM

# 모델 가중치의 경로를 설정합니다. 시뮬레이션 환경에서 실행 중이면 KAGGLE_AGENT_PATH를 사용하고,
# 그렇지 않으면 Kaggle 데이터셋 경로를 사용합니다.
if os.path.exists(KAGGLE_AGENT_PATH):
    WEIGHTS_PATH = os.path.join(KAGGLE_AGENT_PATH, "gemma/pytorch/7b-it-quant/2")
else:
    WEIGHTS_PATH = "/kaggle/input/gemma/pytorch/7b-it-quant/2"

# Prompt Formatting
import itertools  # itertools는 반복 가능한 데이터 구조를 효율적으로 순회하거나 조작하기 위해 사용됩니다.
from typing import Iterable # Iterable 타입은 함수나 메서드가 반복 가능한 객체를 인자로 받을 수 있음을 명시하기 위해 사용됩니다.

# GemmaFormatter 클래스: 사용자와 모델의 대화를 포매팅합니다.
class GemmaFormatter:
    _start_token = '<start_of_turn>' # 대화 시작을 나타내는 토큰
    _end_token = '<end_of_turn>'  # 대화 종료를 나타내는 토큰

    def __init__(self, system_prompt: str = None, few_shot_examples: Iterable = None):
        # 생성자: 시스템 프롬프트와 예시 대화를 초기화합니다.
        self._system_prompt = system_prompt
        self._few_shot_examples = few_shot_examples
        self._turn_user = f"{self._start_token}user\n{{}}{self._end_token}\n"
        self._turn_model = f"{self._start_token}model\n{{}}{self._end_token}\n"
        self.reset()

    def __repr__(self):
        # 객체를 문자열로 표현할 때 사용됩니다.
        return self._state

    def user(self, prompt):
        # 사용자의 대화를 상태에 추가합니다.
        self._state += self._turn_user.format(prompt)
        return self

    def model(self, prompt):
        # 모델의 대화를 상태에 추가합니다.
        self._state += self._turn_model.format(prompt)
        return self

    def start_user_turn(self):
        # 사용자의 턴 시작을 상태에 추가합니다.
        self._state += f"{self._start_token}user\n"
        return self

    def start_model_turn(self):
        # 모델의 턴 시작을 상태에 추가합니다.
        self._state += f"{self._start_token}model\n"
        return self

    def end_turn(self):
        # 턴 종료를 상태에 추가합니다.
        self._state += f"{self._end_token}\n"
        return self

    def reset(self):
        # 상태를 초기화합니다. 이는 새로운 게임이나 대화 세션을 시작할 때 사용됩니다.
        self._state = ""
        # 시스템 프롬프트가 설정되어 있다면, 사용자의 첫 번째 턴으로 추가합니다.
        if self._system_prompt is not None:
            self.user(self._system_prompt)
        # 몇 가지 예시 대화가 제공되었다면, 이를 초기 대화로 설정합니다.
        if self._few_shot_examples is not None:
            self.apply_turns(self._few_shot_examples, start_agent='user')
        return self # 초기화된 상태를 반환합니다.

    def apply_turns(self, turns: Iterable, start_agent: str):
        # start_agent에 따라 대화 순서를 결정합니다. 'model'이면 모델부터 시작합니다.
        formatters = [self.model, self.user] if start_agent == 'model' else [self.user, self.model]
        # formatters 리스트를 순환하며, 사용자와 모델의 턴을 번갈아 가면서 진행합니다.
        formatters = itertools.cycle(formatters)
        # turns 리스트에 있는 각 대화(turn)에 대해, 해당하는 formatter 함수를 호출합니다.
        for fmt, turn in zip(formatters, turns):
            fmt(turn)  # 대화를 현재 상태에 추가합니다.
        return self  # 메서드 체이닝을 위해 객체 자신을 반환합니다.


# Agent Definitions
import re


@contextlib.contextmanager
def _set_default_tensor_type(dtype: torch.dtype):
    """Set the default torch dtype to the given dtype."""
    """
    주어진 dtype으로 PyTorch의 기본 텐서 데이터 타입을 임시로 설정하는 컨텍스트 매니저입니다.
    이 함수는 with 문 내에서 사용될 때, 지정된 dtype으로 기본 텐서 타입을 설정하고,
    with 블록이 종료되면 기본 텐서 타입을 torch.float으로 재설정합니다.
    
    Args:
    dtype (torch.dtype): 설정하고자 하는 PyTorch 텐서의 데이터 타입.
    
    Yields:
    None: 이 컨텍스트 매니저는 특정 값을 생성하지 않습니다.
    """
    torch.set_default_dtype(dtype)
    yield
    torch.set_default_dtype(torch.float)


class GemmaAgent:
    def __init__(self, variant='7b-it-quant', device='cuda:0', system_prompt=None, few_shot_examples=None):
        # 생성자: 모델 변형, 장치, 시스템 프롬프트, 예시 대화를 초기화합니다.
        self._variant = variant
        self._device = torch.device(device)
        self.formatter = GemmaFormatter(system_prompt=system_prompt, few_shot_examples=few_shot_examples)

        print("Initializing model")
        # 모델 설정을 로드합니다. '2b' 변형이면 get_config_for_2b를, 아니면 get_config_for_7b를 사용합니다.
        model_config = get_config_for_2b() if "2b" in variant else get_config_for_7b()
        model_config.tokenizer = os.path.join(WEIGHTS_PATH, "tokenizer.model")
        model_config.quant = "quant" in variant

        # 주어진 dtype으로 모델을 초기화합니다.
        with _set_default_tensor_type(model_config.get_dtype()):
            model = GemmaForCausalLM(model_config)
            ckpt_path = os.path.join(WEIGHTS_PATH , f'gemma-{variant}.ckpt')
            model.load_weights(ckpt_path)
            self.model = model.to(self._device).eval()

    def __call__(self, obs, *args):
        # 관찰(obs)을 기반으로 세션을 시작하고 LLM을 호출하여 응답을 생성합니다.
        self._start_session(obs)
        prompt = str(self.formatter)
        response = self._call_llm(prompt)
        response = self._parse_response(response, obs)
        print(f"{response=}")
        return response

    def _start_session(self, obs: dict):
        # 세션 시작 메서드: 구현되지 않았습니다.
        raise NotImplementedError

    def _call_llm(self, prompt, max_new_tokens=32, **sampler_kwargs):
        # LLM을 호출하여 주어진 프롬프트에 대한 응답을 생성합니다.
        if sampler_kwargs is None:
            sampler_kwargs = {
                'temperature': 0.01,
                'top_p': 0.1,
                'top_k': 1,
        }
        response = self.model.generate(
            prompt,
            device=self._device,
            output_len=max_new_tokens,
            **sampler_kwargs,
        )
        return response

    def _parse_keyword(self, response: str):
        # 응답에서 키워드를 파싱합니다.
        match = re.search(r"(?<=\*\*)([^*]+)(?=\*\*)", response)
        if match is None:
            keyword = ''
        else:
            keyword = match.group().lower()
        return keyword

    def _parse_response(self, response: str, obs: dict):
        # 응답을 파싱하는 메서드: 구현되지 않음
        raise NotImplementedError


def interleave_unequal(x, y):
    # 두 리스트 x, y를 받아 서로 다른 길이의 리스트를 교차로 결합합니다.
    # itertools.zip_longest를 사용하여 더 긴 리스트의 끝까지 요소를 포함시키고, None이 아닌 요소만 반환합니다.
    return [
        item for pair in itertools.zip_longest(x, y) for item in pair if item is not None
    ]


class GemmaQuestionerAgent(GemmaAgent):
    def __init__(self, *args, **kwargs):
        # GemmaAgent 클래스를 상속받아 초기화합니다.
        super().__init__(*args, **kwargs)

    def _start_session(self, obs):
        # 세션을 시작하는 메서드입니다. 포매터를 리셋하고, 사용자에게 20 Questions 게임의 역할을 알립니다.
        self.formatter.reset()
        self.formatter.user("Let's play 20 Questions. You are playing the role of the Questioner.")
        # 질문과 답변을 교차로 결합하여 대화 순서를 생성합니다.
        turns = interleave_unequal(obs.questions, obs.answers)
        self.formatter.apply_turns(turns, start_agent='model')
        # 사용자의 턴 타입에 따라 다음 단계의 안내를 합니다.
        if obs.turnType == 'ask':
            self.formatter.user("Please ask a yes-or-no question.")
        elif obs.turnType == 'guess':
            self.formatter.user("Now guess the keyword. Surround your guess with double asterisks.")
        self.formatter.start_model_turn()

    def _parse_response(self, response: str, obs: dict):
        # 모델의 응답을 파싱하여 질문 또는 추측을 반환합니다.
        if obs.turnType == 'ask':
            # 응답에서 질문을 추출합니다. '*'가 제거된 응답에서 첫 번째 '?'까지를 질문으로 간주합니다.
            match = re.search(".+?\?", response.replace('*', ''))
            if match is None:
                question = "Is it a person?"
            else:
                question = match.group()
            return question
        elif obs.turnType == 'guess':
            # 응답에서 추측된 키워드를 추출합니다.
            guess = self._parse_keyword(response)
            return guess
        else:
            # 알 수 없는 턴 타입에 대한 예외 처리입니다.
            raise ValueError("Unknown turn type:", obs.turnType)


class GemmaAnswererAgent(GemmaAgent):
    def __init__(self, *args, **kwargs):
        # GemmaAgent 클래스를 상속받아 초기화합니다.
        super().__init__(*args, **kwargs)

    def _start_session(self, obs):
        # 세션을 시작하는 메서드입니다. 포매터를 리셋하고, 답변자 역할과 키워드 정보를 사용자에게 알립니다.
        self.formatter.reset()
        self.formatter.user(f"Let's play 20 Questions. You are playing the role of the Answerer. The keyword is {obs.keyword} in the category {obs.category}.")
        # 질문과 답변을 교차로 결합하여 대화 순서를 생성합니다.
        turns = interleave_unequal(obs.questions, obs.answers)
        self.formatter.apply_turns(turns, start_agent='user')
        # 사용자에게 키워드에 대한 예/아니오 답변을 요청하고, 답변을 별표로 감싸도록 안내합니다.
        self.formatter.user(f"The question is about the keyword {obs.keyword} in the category {obs.category}. Give yes-or-no answer and surround your answer with double asterisks, like **yes** or **no**.")
        self.formatter.start_model_turn()

    def _parse_response(self, response: str, obs: dict):
        # 모델의 응답에서 키워드를 파싱하여 'yes' 또는 'no'로 응답합니다.
        answer = self._parse_keyword(response)
        return 'yes' if 'yes' in answer else 'no'


# Agent Creation
system_prompt = "You are an AI assistant designed to play the 20 Questions game. In this game, the Answerer thinks of a keyword and responds to yes-or-no questions by the Questioner. The keyword is a specific person, place, or thing."

few_shot_examples = [
    "Let's play 20 Questions. You are playing the role of the Questioner. Please ask your first question.",
    "Is it a person?", "**no**",
    "Is is a place?", "**yes**",
    "Is it a country?", "**yes** Now guess the keyword.",
    "**France**", "Correct!",
]


# **IMPORTANT:** Define agent as a global so you only have to load
# the agent you need. Loading both will likely lead to OOM.
# **중요:** 에이전트를 전역 변수로 정의하여 필요한 에이전트만 로드합니다.
# 두 에이전트를 모두 로드하면 OOM(Out of Memory)이 발생할 수 있습니다.
agent = None


def get_agent(name: str):
    global agent
    
    # 'questioner' 이름이 주어지고 에이전트가 아직 초기화되지 않았다면, 질문자 에이전트를 생성합니다.
    if agent is None and name == 'questioner':
        agent = GemmaQuestionerAgent(
            device='cuda:0',
            system_prompt=system_prompt,
            few_shot_examples=few_shot_examples,
        )
    # 'answerer' 이름이 주어지고 에이전트가 아직 초기화되지 않았다면, 답변자 에이전트를 생성합니다.
    elif agent is None and name == 'answerer':
        agent = GemmaAnswererAgent(
            device='cuda:0',
            system_prompt=system_prompt, 
            few_shot_examples=few_shot_examples,
        )
    # 에이전트가 정상적으로 초기화되었는지 확인합니다.
    assert agent is not None, "Agent not initialized."

    return agent


def agent_fn(obs, cfg):
    # 관찰된 턴 타입에 따라 적절한 에이전트를 호출합니다.
    if obs.turnType == "ask":
        response = get_agent('questioner')(obs)
    elif obs.turnType == "guess":
        response = get_agent('questioner')(obs)
    elif obs.turnType == "answer":
        response = get_agent('answerer')(obs)
    # 응답이 None이거나 길이가 1 이하인 경우 기본적으로 "yes"를 반환합니다.
    if response is None or len(response) <= 1:
        return "yes"
    else:
        return response

In [2]:
# pigz와 pv 패키지를 설치합니다. 설치 과정에서 발생하는 모든 출력은 /dev/null로 리다이렉트되어 화면에 표시되지 않습니다.

!apt install pigz pv > /dev/null

The operation couldn’t be completed. Unable to locate a Java Runtime that supports apt.
Please visit http://www.java.com for information on installing Java.



In [None]:
# pigz와 pv를 사용하여 압축 프로그램을 지정하고, submission.tar.gz 파일을 생성합니다.
# -C 옵션으로 작업 디렉토리를 /kaggle/working/submission으로 변경한 후 현재 디렉토리(.)의 모든 파일을 추가합니다.
# 이후, -C 옵션으로 작업 디렉토리를 /kaggle/input으로 변경하고, gemma/pytorch/7b-it-quant/2 디렉토리를 추가합니다.
# 이 명령은 submission.tar.gz 파일 내에 두 위치의 파일들을 포함시키며, 압축 과정에서 발생하는 모든 출력은 pv를 통해 시각적으로 표시됩니다.

!tar --use-compress-program='pigz --fast --recursive | pv' -cf submission.tar.gz -C /kaggle/working/submission . -C /kaggle/input/ gemma/pytorch/7b-it-quant/2