# Controlling Generative Text Models
## Using `StoppingCriteria` from the HuggingFace `transformers` library

In [None]:
!pip3 -q install ipykernel jupyter
!pip3 -q install torch torchvision torchaudio transformers
!pip3 -q install packaging ninja
!pip -q install accelerate
!pip3 -q install protobuf
!pip3 -q install sentencepiece
!pip3 -q install bitsandbytes
!pip3 -q install scipy
!MAX_JOBS=4 pip -q install flash-attn --no-build-isolation

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers import BitsAndBytesConfig
from transformers import LlamaTokenizer, LlamaForCausalLM, MistralForCausalLM
import bitsandbytes, flash_attn

In [None]:
MODEL_CHECKPOINT = "teknium/OpenHermes-2.5-Mistral-7B"

# Preferred accelerator: CUDA, then Apple MPS, then CPU.
# `torch.backends.mps.is_available()` is used because `torch.mps.is_available`
# is not present in older torch releases and would raise AttributeError there.
if torch.cuda.is_available():
    DEVICE = "cuda"
elif torch.backends.mps.is_available():
    DEVICE = "mps"
else:
    DEVICE = "cpu"

tokenizer = LlamaTokenizer.from_pretrained(
    MODEL_CHECKPOINT,
    trust_remote_code=True,
)

# Load the weights 4-bit quantized. The quantization settings are passed as a
# BitsAndBytesConfig instead of the deprecated bare `load_in_4bit` /
# `load_in_8bit` keyword arguments. Flash-attention stays off, as in the
# original (`use_flash_attention_2=False` is the default, so the deprecated
# flag is simply omitted).
model = MistralForCausalLM.from_pretrained(
    MODEL_CHECKPOINT,
    torch_dtype=torch.float16,
    device_map="auto",  # alternative: {'': 'cuda:0'}
    quantization_config=BitsAndBytesConfig(load_in_4bit=True),
    low_cpu_mem_usage=True,
)

In [None]:
# Two single-turn ChatML conversations. Both share one system message and end
# on an open assistant turn for the model to complete.
CHAT_SYSTEM_MESSAGE = "You are a sentient, superintelligent artificial general intelligence, here to teach and assist me."

USER_QUESTIONS = (
    "How would you explain the concept of democracy to a 10-year-old?",
    "If a train travels at 60 miles per hour and has to cover a distance of 120 miles, how long will it take to reach its destination?",
)

prompts = [
    "<|im_start|>system\n"
    f"{CHAT_SYSTEM_MESSAGE}<|im_end|>\n"
    "<|im_start|>user\n"
    f"{question}<|im_end|>\n"
    "<|im_start|>assistant"
    for question in USER_QUESTIONS
]

In [None]:
# Generate one sampled completion per prompt and print only the new text.
for chat in prompts:
    print(chat)
    # Move the inputs to the device the model was loaded on instead of
    # assuming CUDA is present.
    input_ids = tokenizer(chat, return_tensors="pt").input_ids.to(model.device)
    generated_ids = model.generate(
        input_ids,
        max_new_tokens=750,
        temperature=0.8,
        repetition_penalty=1.1,
        do_sample=True,
        eos_token_id=tokenizer.eos_token_id,
    )
    # Drop the prompt tokens so only the continuation is decoded.
    # The keyword is `clean_up_tokenization_spaces` (plural); the misspelled
    # singular form used before was silently ignored.
    response = tokenizer.decode(
        generated_ids[0][input_ids.shape[-1]:],
        skip_special_tokens=True,
        clean_up_tokenization_spaces=True,
    )
    print(response)

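## ReAct agent

The cells below prompt the model with a ReAct-style (Thought / Action / Observation) system prompt that exposes a single `convert_time` tool.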


In [None]:
# ReAct-style system prompt exposing a single `convert_time` tool.
# This is a plain string that is inserted verbatim into the chat prompt (it is
# never passed through str.format), so the example JSON blob uses single braces:
# doubled `{{` / `}}` would reach the model literally and show it invalid JSON.
SYSTEM_PROMPT = """Answer the following questions as best you can. You have access to the following tools:

convert_time: A function to convert a time string with format H:MM:SS to seconds

The way you use the tools is by specifying a json blob.
Specifically, this json should have a `action` key (with the name of the tool to use) and a `action_input` key (with the input to the tool going here).

The only values that should be in the "action" field are:
convert_time: A function to convert a time string with format H:MM:SS to seconds, args: {"time": {"type": "string"}}

The $JSON_BLOB should only contain a SINGLE action and MUST be formatted as markdown, do NOT return a list of multiple actions. Here is an example of a valid $JSON_BLOB:

```
{
  "action": $TOOL_NAME,
  "action_input": $INPUT
}
```
Make sure to have the $INPUT in the right format for the tool you are using, and do not put variable names as input if you can find the right values.

ALWAYS use the following format:

Question: the input question you must answer
Thought: you should always think about one action to take. Only one action at a time in this format:
Action:
```
$JSON_BLOB
```
Observation: the result of the action
... (this Thought/Action/Observation can repeat N times, you should take several steps when needed. The $JSON_BLOB must be formatted as markdown and only use a SINGLE action at a time.)

You must always end your output with the following format:

Thought: I now know the final answer
Final Answer: the final answer to the original input question

Now begin! Reminder to ALWAYS use the exact characters `Final Answer:` when you provide a definitive answer. """

In [None]:
# Assemble the ChatML conversation for the ReAct question.
# Every tag starts at column 0 and the assistant header is followed by a bare
# newline, matching the layout of the earlier `prompts` entries. The previous
# version carried the source indentation into the string, which put stray
# spaces in front of each tag and after the assistant header.
prompt = f"""<|im_start|>system
{SYSTEM_PROMPT.strip()}<|im_end|>
<|im_start|>user
How many seconds are in 1:23:45 ?<|im_end|>
<|im_start|>assistant
"""
print(prompt)

In [None]:
# Tokenize the ReAct prompt and place it on the device the model was loaded on
# (rather than assuming CUDA is present).
input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)
print(input_ids.shape)

In [None]:
# Sample a completion for the ReAct prompt and report the shape of the
# resulting token-id tensor (prompt tokens plus generated tokens).
sampling_options = {
    "max_new_tokens": 750,
    "temperature": 0.8,
    "repetition_penalty": 1.1,
    "do_sample": True,
    "eos_token_id": tokenizer.eos_token_id,
}
generated_ids = model.generate(input_ids, **sampling_options)
print(generated_ids.shape)

In [None]:
# Id of the final token in the first generated sequence
# (presumably inspected to see whether generation ended on the EOS token).
generated_ids[0][-1].item()

In [None]:


# Decode only the continuation (everything after the prompt tokens), keeping
# special tokens visible so the end-of-turn marker can be inspected.
# The keyword is `clean_up_tokenization_spaces` (plural); the misspelled
# singular form used before was silently ignored.
response = tokenizer.decode(
    generated_ids[0][input_ids.shape[-1]:],
    skip_special_tokens=False,
    clean_up_tokenization_spaces=True,
)
print(response)

In [None]:
from transformers import StoppingCriteria, TextStreamer
import re



class RegexStoppingCriteria(StoppingCriteria):
    """Stop generation once the generated continuation matches a regex.

    On every call the continuation (the tokens of the first sequence in the
    batch that follow the prompt) is decoded as a whole and searched with the
    compiled pattern. Decoding the continuation in one piece keeps the
    whitespace between tokens, which can be lost when tokens are decoded one
    at a time and concatenated. It also means no text is carried over from a
    previous `generate` call when the instance is reused.

    Args:
        stop_expression: Regular expression; generation stops as soon as
            `re.search` finds it in the decoded continuation.
        prompt: Prompt text given to the model. It is tokenized once to learn
            how many leading tokens to skip. This assumes the `input_ids`
            passed to `generate` were produced by `tokenizer(prompt)` with the
            same settings. If `None`, the prompt length is inferred on the
            first call as "current length minus one".
        tokenizer: Tokenizer used to tokenize the prompt and decode the
            generated ids.

    Note:
        Only the first sequence of the batch (`input_ids[0]`) is inspected.
    """

    def __init__(self, stop_expression, prompt, tokenizer):
        self.regex = re.compile(stop_expression)
        self.tokenizer = tokenizer
        # Most recently decoded continuation; refreshed on every call.
        self.generated_text = ''
        if prompt is None:
            self._prompt_length = None
        else:
            self._prompt_length = len(tokenizer(prompt)["input_ids"])

    def __call__(self, input_ids, scores, **kwargs):
        """Return True when the decoded continuation matches the pattern."""
        if self._prompt_length is None:
            # The criteria is first consulted after one new token was appended.
            self._prompt_length = max(input_ids.shape[-1] - 1, 0)
        continuation_ids = input_ids[0, self._prompt_length:].tolist()
        self.generated_text = self.tokenizer.decode(
            continuation_ids,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True,
        )
        return bool(self.regex.search(self.generated_text))

    # __len__ and __iter__ let a single instance be passed where a collection
    # of stopping criteria is expected (it behaves like a one-element list).
    def __len__(self):
        return 1

    def __iter__(self):
        yield self

In [None]:
# Stream a sampled completion for the ReAct prompt and stop as soon as the
# generated text contains "Observation:".
encoded_input = tokenizer(prompt, return_tensors='pt')
# Place the inputs on the model's device instead of calling .cuda()
# unconditionally, which fails on machines without CUDA.
input_ids = encoded_input['input_ids'].to(model.device)
streamer = TextStreamer(tokenizer=tokenizer, skip_prompt=True)
_ = model.generate(
    input_ids,
    streamer=streamer,
    pad_token_id=tokenizer.eos_token_id,
    do_sample=True,
    temperature=0.8,
    max_new_tokens=750,
    repetition_penalty=1.1,
    stopping_criteria=RegexStoppingCriteria(
        "Observation:", prompt, tokenizer=tokenizer),
)