This notebook illustrates the agent creation process for the **LLM 20 Questions** competition. Running this notebook produces a `submission.tar.gz` file. You may submit this file directly from the **Submit to competition** heading to the right. Alternatively, from the notebook viewer, click the *Output* tab, then find and download `submission.tar.gz`. Click **Submit Agent** at the upper-left of the competition homepage to upload your file and make your submission.

In [None]:
%%bash
# Prepare the submission directory tree and vendor the `peft` package into it.
cd /kaggle/working
# pip install -q -U -t /kaggle/working/submission/lib peft
# git clone https://github.com/google/gemma_pytorch.git > /dev/null
# `mkdir -p` also creates the parent directories `submission/` and
# `submission/lib/`, which the `cp` below and the `%%writefile submission/main.py`
# cell both rely on.
mkdir -p /kaggle/working/submission/lib/llama-3/
# mv /kaggle/working/gemma_pytorch/gemma/* /kaggle/working/submission/lib/gemma/
pip install -q peft
# Copy the installed package into submission/lib, the directory main.py puts
# on sys.path. NOTE(review): the source path hard-codes python3.10 — confirm it
# matches the notebook image.
cp -r /opt/conda/lib/python3.10/site-packages/peft /kaggle/working/submission/lib/
# pip install -q -i https://pypi.org/simple/ bitsandbytes

In [1]:
%%writefile submission/main.py
# %%writefile submission/main.py
# Setup
import os
import sys, random
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, StoppingCriteria, StoppingCriteriaList
from collections import Counter
# **IMPORTANT:** The bundled `lib` directory must be first on sys.path so the
# same file works both in notebooks and in the simulations environment.
# The simulation location is used when it exists; otherwise the notebook's
# working copy of the submission is used.
KAGGLE_AGENT_PATH = "/kaggle_simulations/agent/"
sys.path.insert(
    0,
    os.path.join(KAGGLE_AGENT_PATH, 'lib')
    if os.path.exists(KAGGLE_AGENT_PATH)
    else "/kaggle/working/submission/lib",
)

import contextlib
import os
import sys
from pathlib import Path

import torch
# from gemma.config import get_config_for_7b, get_config_for_2b
# from gemma.model import GemmaForCausalLM

# Location of the model weights: under the agent directory when running in the
# simulations environment, under /kaggle/input otherwise.
WEIGHTS_PATH = (
    os.path.join(KAGGLE_AGENT_PATH, "llama-3/transformers/8b-chat-hf/1")
    if os.path.exists(KAGGLE_AGENT_PATH)
    else "/kaggle/input/llama-3/transformers/8b-chat-hf/1"
)

# Prompt Formatting
import itertools
from typing import Iterable

class KeyworkdStoppingCriteria(StoppingCriteria):
    """Stop generation as soon as the newest token is one of ``stops``.

    Args:
        stops: Iterable of token ids that terminate generation. ``None``
            (the default) means "no stop tokens".
    """

    def __init__(self, stops=None):
        super().__init__()
        # A fresh list per instance: the previous ``stops=[]`` default was a
        # single list object shared by every instance created without args.
        self.stops = list(stops) if stops is not None else []

    def __call__(self, input_ids, scores, **kwargs):
        """Return True when the last token of the first sequence is a stop id.

        Only row 0 of ``input_ids`` is inspected. ``**kwargs`` is accepted so
        the method stays callable if the caller forwards extra keyword
        arguments, as the base-class signature allows.
        """
        last_token = input_ids[0, -1]
        return any(stop == last_token for stop in self.stops)

class LlamaFormatter:
    """Accumulate a chat prompt as one string, turn by turn.

    Every turn is rendered as a role header, a blank line, the text and the
    end-of-turn token. All mutating methods return ``self`` so calls can be
    chained. The accumulated prompt is available through ``repr()`` /
    ``str()`` of the formatter.
    """

    _bos_token = '<|begin_of_text|>'
    _start_header_token = '<|start_header_id|>'
    _end_header_token = '<|end_header_id|>'
    _end_token = '<|eot_id|>'

    def __init__(self, system_prompt: str = None, few_shot_examples: Iterable = None):
        """Store the optional system prompt / few-shot turns and reset state.

        Args:
            system_prompt: Text of the system turn, or None to omit it.
            few_shot_examples: Alternating user/assistant texts (user first)
                replayed after the system turn on every reset, or None.
        """
        self._system_prompt = system_prompt
        self._few_shot_examples = few_shot_examples
        # One "{}" placeholder per template, filled by str.format().
        self._turn_system = self._header('system') + "{}" + self._end_token
        self._turn_user = self._header('user') + "{}" + self._end_token
        self._turn_model = self._header('assistant') + "{}" + self._end_token
        self.reset()

    def _header(self, role):
        """Return the opening of a turn for ``role`` (header + blank line)."""
        return self._start_header_token + role + self._end_header_token + "\n\n"

    def __repr__(self):
        return self._state

    def system(self, prompt):
        """Append a complete system turn."""
        self._state = self._state + self._turn_system.format(prompt)
        return self

    def user(self, prompt):
        """Append a complete user turn."""
        self._state = self._state + self._turn_user.format(prompt)
        return self

    def model(self, prompt):
        """Append a complete assistant turn."""
        self._state = self._state + self._turn_model.format(prompt)
        return self

    def start_user_turn(self):
        """Open a user turn without closing it."""
        self._state = self._state + self._header('user')
        return self

    def start_model_turn(self):
        """Open an assistant turn without closing it."""
        self._state = self._state + self._header('assistant')
        return self

    def end_turn(self):
        """Close the currently open turn (end token followed by a newline)."""
        self._state = self._state + self._end_token + "\n"
        return self

    def reset(self):
        """Restart the prompt: BOS token, then system prompt, then few-shots."""
        self._state = self._bos_token
        if self._system_prompt is not None:
            self.system(self._system_prompt)
        if self._few_shot_examples is not None:
            self.apply_turns(self._few_shot_examples, start_agent='user')
        return self

    def apply_turns(self, turns: Iterable, start_agent: str):
        """Append ``turns`` with alternating roles.

        The first turn is an assistant turn when ``start_agent == 'model'``;
        for any other value it is a user turn.
        """
        if start_agent == 'model':
            order = (self.model, self.user)
        else:
            order = (self.user, self.model)
        for position, text in enumerate(turns):
            order[position % 2](text)
        return self


# Agent Definitions
import re


@contextlib.contextmanager
def _set_default_tensor_type(dtype: torch.dtype):
    """Temporarily set the default torch dtype to ``dtype``.

    On exit the default dtype is set back to ``torch.float``. The reset now
    happens in a ``finally`` clause, so it also runs when the body of the
    ``with`` statement raises; previously an exception left ``dtype``
    installed as the process-wide default.
    """
    torch.set_default_dtype(dtype)
    try:
        yield
    finally:
        torch.set_default_dtype(torch.float)


class LlammaAgent:
    """Base class for the 20 Questions agents.

    Loads the tokenizer and a 4-bit quantized causal LM from ``WEIGHTS_PATH``
    and implements the prompt -> generate -> parse pipeline. Subclasses
    provide ``_start_session`` (builds the prompt in ``self.formatter``) and
    ``_parse_response`` (turns the raw text into the value to return).
    """

    # Token ids that delimit turns in the prompt. NOTE(review): these look
    # like <|begin_of_text|>, <|start_header_id|>, <|end_header_id|> and
    # <|eot_id|> of the Llama-3 vocabulary — confirm against the tokenizer.
    _SPECIAL_TOKEN_IDS = (128000, 128006, 128007, 128009)

    def __init__(self, variant='8b-chat-hf', device='cuda:0', system_prompt=None, few_shot_examples=None):
        """Create the formatter and load tokenizer and model.

        Args:
            variant: Stored on the instance; not otherwise used here.
            device: Device string the tokenized prompt is moved to.
            system_prompt: Passed through to ``LlamaFormatter``.
            few_shot_examples: Passed through to ``LlamaFormatter``.
        """
        self._variant = variant
        self._device = torch.device(device)
        self.formatter = LlamaFormatter(system_prompt=system_prompt, few_shot_examples=few_shot_examples)

        print("Initialize model")
        quantization_config = BitsAndBytesConfig(load_in_4bit=True)
        self.tokenizer = AutoTokenizer.from_pretrained(WEIGHTS_PATH)
        self.model = AutoModelForCausalLM.from_pretrained(
            WEIGHTS_PATH,
            device_map='auto',
            quantization_config=quantization_config,
        )

    def __call__(self, obs, **sampler_kwargs):
        """Run one turn: build the prompt, generate, and parse the output.

        Args:
            obs: Observation forwarded to ``_start_session`` and
                ``_parse_response``.
            **sampler_kwargs: Forwarded to ``_call_llm``.

        Returns:
            Whatever the subclass's ``_parse_response`` returns.
        """
        self._start_session(obs)
        prompt = str(self.formatter)
        response = self._call_llm(prompt, **sampler_kwargs)
        return self._parse_response(response, obs)

    def _start_session(self, obs: dict):
        """Populate ``self.formatter`` for ``obs``; implemented by subclasses."""
        raise NotImplementedError

    def _call_llm(self, prompt, max_new_tokens=32, **sampler_kwargs):
        """Generate a continuation of ``prompt`` and return it as text.

        The returned text starts right after the last special token of the
        prompt, so any text that was pre-filled into the open assistant turn
        is included, followed by the newly generated tokens.

        Args:
            prompt: Fully formatted prompt string.
            max_new_tokens: Upper bound on generated tokens.
            **sampler_kwargs: Extra keyword arguments for ``model.generate``.
                When none are given, greedy decoding is requested.
        """
        if not sampler_kwargs:
            # ``**sampler_kwargs`` is always a dict, so the former
            # ``is None`` test never fired and the intended deterministic
            # defaults (temperature 0, top_k 1) were never applied. Greedy
            # decoding expresses that intent without passing a zero
            # temperature to the sampler.
            sampler_kwargs = {'do_sample': False}
        # The formatter already writes the BOS token into the prompt text;
        # ``add_special_tokens=False`` keeps the tokenizer from adding one more.
        inputs = self.tokenizer(
            prompt, return_tensors="pt", add_special_tokens=False
        ).to(self._device)
        stopping_criteria = StoppingCriteriaList(
            [KeyworkdStoppingCriteria(stops=[128009, self.tokenizer.eos_token_id])]
        )
        outputs = self.model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            stopping_criteria=stopping_criteria,
            **sampler_kwargs,
        )
        # Walk backwards from the end of the prompt to the token that follows
        # the last special token. The ``start > 0`` bound stops the scan at
        # the beginning instead of wrapping around to negative indices.
        start = inputs.input_ids.shape[-1]
        while start > 0 and int(outputs[0, start - 1]) not in self._SPECIAL_TOKEN_IDS:
            start -= 1

        return self.tokenizer.decode(outputs[0, start:])

    def _parse_keyword(self, response: str):
        """Extract the text between the first pair of double asterisks.

        The match is lower-cased, anything up to and including the last
        ``:`` is dropped, surrounding whitespace is stripped and a leading
        article (``a`` / ``the``) is removed.

        Returns:
            The cleaned keyword, or ``''`` when there is no ``**...**`` span
            or nothing is left after cleaning.
        """
        match = re.search(r"(?<=\*\*)([^*]+)(?=\*\*)", response)
        if match is None:
            return ''
        keyword = match.group().lower().split(':')[-1].strip()
        words = keyword.split()
        # ``words`` is empty for inputs such as "**answer:**" or "** **";
        # indexing it unconditionally used to raise IndexError.
        if words and words[0] in ('a', 'the'):
            keyword = ' '.join(words[1:])
        return keyword

    def _parse_response(self, response: str, obs: dict):
        """Convert raw model text into the turn's result; see subclasses."""
        raise NotImplementedError


def interleave_unequal(x, y):
    """Interleave two iterables of possibly different length.

    Items are taken alternately from ``x`` and ``y``; once the shorter one is
    exhausted the remaining items of the longer one follow. ``None`` items
    are dropped, whether they come from padding or from the inputs.
    """
    merged = []
    for left, right in itertools.zip_longest(x, y):
        for element in (left, right):
            if element is not None:
                merged.append(element)
    return merged

# Sentence openers that the questioner pre-fills into the assistant turn; the
# model then completes the chosen opener into a full yes-or-no question.
questions = [
    'Does the keyword related to',
    'Would the keyword be included in the broad category of',
    "Does the keyword begins with the letter '",
    'Would the keyword be considered',
    '',     # empty opener: just let the model generate the whole question
]
# One number per entry of `questions`, in the same order.
# NOTE(review): presumably relative sampling weights for the openers — verify
# against the code that picks an opener.
questions_weights = [0.25, 0.3, 0.1, 0.2, 0.15]

class LlammaQuestionerAgent(LlammaAgent):
    """Agent for the questioner role: asks questions and guesses the keyword."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _start_session(self, obs):
        """Build the prompt for an ``ask`` or ``guess`` turn.

        Earlier questions are replayed as assistant turns and the answers as
        user turns. For ``ask`` an opener from ``questions`` is pre-filled
        into the open assistant turn so the model completes it; for ``guess``
        the model is told to wrap its guess in double asterisks.
        """
        self.formatter.reset()
        self.formatter.user("Let's play 20 Questions. You are playing the role of the Questioner. The keyword is a specific places or things.")
        turns = interleave_unequal(obs.questions, obs.answers)
        self.formatter.apply_turns(turns, start_agent='model')
        if obs.turnType == 'ask':
            self.formatter.user("Please ask a yes-or-no question.")
            # Draw the opener according to ``questions_weights``; the weights
            # were defined next to ``questions`` but the previous uniform
            # ``randint`` draw ignored them.
            opener = random.choices(questions, weights=questions_weights, k=1)[0]
            self.formatter.start_model_turn()
            self.formatter._state += opener
        elif obs.turnType == 'guess':
            self.formatter.user("Now guess the keyword based on the previous questions and answers. Surround your guess with double asterisks.")
            self.formatter.start_model_turn()

    def _parse_response(self, response: str, obs: dict):
        """Extract the question or the guess from the raw model text.

        Returns:
            For ``ask``: the text up to and including the first ``?`` on a
            line (asterisks removed), or ``"Is it a place?"`` when there is
            no question mark. For ``guess``: the parsed keyword.

        Raises:
            ValueError: If ``obs.turnType`` is neither ``ask`` nor ``guess``.
        """
        if obs.turnType == 'ask':
            # Raw string: "\?" in a plain literal is an invalid escape
            # sequence and triggers a warning on recent Python versions.
            match = re.search(r".+?\?", response.replace('*', ''))
            if match is None:
                return "Is it a place?"
            return match.group()
        elif obs.turnType == 'guess':
            return self._parse_keyword(response)
        else:
            raise ValueError("Unknown turn type:", obs.turnType)


class LlammaAnswererAgent(LlammaAgent):
    """Agent for the answerer role: replies yes or no about the keyword."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _start_session(self, obs):
        """Build the prompt for an answer turn.

        The keyword and category are stated first, then the dialogue so far
        is replayed (questions as user turns, answers as assistant turns),
        and finally the model is asked for a starred yes/no answer.
        """
        fmt = self.formatter
        fmt.reset()
        fmt.user(f"Let's play 20 Questions. You are playing the role of the Answerer. The keyword is {obs.keyword} in the category {obs.category}.")
        history = interleave_unequal(obs.questions, obs.answers)
        fmt.apply_turns(history, start_agent='user')
        fmt.user(f"The question is about the keyword {obs.keyword} in the category {obs.category}. Give yes-or-no answer and surround your answer with double asterisks, like **yes** or **no**.")
        fmt.start_model_turn()

    def _parse_response(self, response: str, obs: dict):
        """Return ``'yes'`` if the starred answer contains "yes", else ``'no'``."""
        if 'yes' in self._parse_keyword(response):
            return 'yes'
        return 'no'


# Agent Creation
system_prompt = "You are an AI assistant designed to play the 20 Questions game. In this game, the Answerer thinks of a keyword and responds to yes-or-no questions by the Questioner. The keyword is a specific places or things."

# Few-shot dialogue replayed by LlamaFormatter.reset() through
# apply_turns(..., start_agent='user'): entries therefore alternate strictly
# user, assistant, user, ... Even positions are user turns (instructions,
# answers, verdicts); odd positions are assistant turns (questions, guesses).
few_shot_examples = [
    # Example game 1 (a place).
    "Let's play 20 Questions. You are playing the role of the Questioner. Please ask your first question. The keyword is a specific places or things.",
    "Is it a things?", "no",
    "Is it a city?", "yes",
    "Is it in the Europe?", "yes Now guess the keyword.",
    "**London**", "Wrong!",
    "Does it start with the letter P?", "yes Now guess the keyword.",
    "**Paris**",
    # The verdict for game 1 and the opening of game 2 share one user turn.
    # As two separate entries they made game 1 thirteen turns long, which
    # shifted every turn of game 2 onto the opposite role.
    "Correct! Let's play 20 Questions. You are playing the role of the Questioner. Please ask your first question. The keyword is a specific places or things.",
    # Example game 2 (a thing).
    "Is it a things?", "yes",
    "Is it in the room?", "yes",
    "Is it used for study?", "no Now guess the keyword.",
    "**closet**", "Wrong!",
    "Is it related to cleaning up the room?", "yes Now guess the keyword.",
    "**dustbin**", "Correct!",
]


# **IMPORTANT:** Define agent as a global so you only have to load
# the agent you need. Loading both will likely lead to OOM.
# Starts out empty; get_agent() creates the instance on first use and returns
# the same object on every later call.
agent = None


def get_agent(name: str):
    """Return the process-wide agent, creating it on first use.

    The instance is built only while the global ``agent`` is still ``None``:
    ``'questioner'`` creates a ``LlammaQuestionerAgent`` and ``'answerer'`` a
    ``LlammaAnswererAgent``. Once an agent exists it is returned as-is,
    whatever ``name`` is passed.

    Raises:
        AssertionError: If no agent exists and ``name`` is not one of the two
            known roles.
    """
    global agent

    if agent is None:
        shared_kwargs = dict(
            device='cuda:0',
            system_prompt=system_prompt,
            few_shot_examples=few_shot_examples,
        )
        if name == 'questioner':
            agent = LlammaQuestionerAgent(**shared_kwargs)
        elif name == 'answerer':
            agent = LlammaAnswererAgent(**shared_kwargs)

    assert agent is not None, "Agent not initialized."
    return agent


def _majority_response(agent_name, obs, n_samples=5, **sampler_kwargs):
    """Query the agent ``n_samples`` times; return (most common, all samples)."""
    samples = [
        get_agent(agent_name)(obs, **sampler_kwargs) for _ in range(n_samples)
    ]
    return Counter(samples).most_common(1)[0][0], samples


def agent_fn(obs, cfg):
    """Entry point called by the environment for every turn.

    Args:
        obs: Observation with at least ``turnType`` and ``questions``.
        cfg: Environment configuration (unused).

    Returns:
        The question, guess or answer for this turn. ``"yes"`` is returned
        as a fallback when no response was produced or it is at most one
        character long.
    """
    # Start from None so an unrecognized turn type reaches the fallback
    # below instead of failing on an unbound local.
    response = None
    if obs.turnType == "ask":
        if len(obs.questions) == 0:
            # Fixed opening question; avoids loading the model for turn one.
            response = 'Is it a place?'
        else:
            response = get_agent('questioner')(obs)
    elif obs.turnType == "guess":
        # All five samples take part in the vote. The list used to be
        # re-created inside the loop, so only the last sample was counted.
        response, responses = _majority_response(
            'questioner', obs, temperature=0.1, do_sample=True
        )
        print(responses)
    elif obs.turnType == "answer":
        response, _ = _majority_response(
            'answerer', obs, temperature=0.3, do_sample=True
        )

    if response is None or len(response) <= 1:
        return "yes"
    return response

Writing submission/main.py


FileNotFoundError: [Errno 2] No such file or directory: 'submission/main.py'

In [None]:
# Install pigz and pv, the two tools the tar command in the next cell pipes
# the archive through; the installer's standard output is discarded.
!apt install pigz pv > /dev/null

In [None]:
# Build submission.tar.gz from two sources: everything in
# /kaggle/working/submission, plus the llama-3/transformers/8b-chat-hf/1
# directory taken relative to /kaggle/input/. *.gguf files are excluded.
# Compression runs through `pigz --fast`, with the stream piped through pv.
!tar --use-compress-program='pigz --fast --recursive | pv' -cf submission.tar.gz --exclude='*.gguf'  -C /kaggle/working/submission . -C /kaggle/input/ llama-3/transformers/8b-chat-hf/1