## Section 1: Basic Decoding Algorithms

In [None]:
pip install transformers datasets evaluate markdownify

In [None]:
"""set device and random seeds"""

######################################################
#  The following helper functions are given to you.
######################################################

from tqdm.notebook import tqdm
import torch
import torch.nn.functional as F

# Use the GPU when PyTorch reports CUDA as available; otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'device: {device}')

def set_seed(seed=19260817):
    """Seed PyTorch's random number generators and pin cuDNN to deterministic mode.

    inputs:
    - seed: int, passed to torch.manual_seed() and torch.cuda.manual_seed_all()
    """
    for seed_fn in (torch.manual_seed, torch.cuda.manual_seed_all):
        seed_fn(seed)

    # Disable cuDNN autotuning and request deterministic kernels.
    cudnn = torch.backends.cudnn
    cudnn.deterministic, cudnn.benchmark = True, False

set_seed()

In [None]:
"""load datasets"""

######################################################
#  The following helper code is given to you.
######################################################

from datasets import load_dataset

# Fetch the 'Ximing/ROCStories' dataset and bind its three splits to module-level names.
dataset = load_dataset('Ximing/ROCStories')
train_data, dev_data, test_data = dataset['train'], dataset['validation'], dataset['test']

# Show the first training example as a quick look at the record layout.
print(train_data[0])

In [None]:
"""prepare evaluation"""

######################################################
#  The following helper code is given to you.
######################################################

from evaluate import load
from transformers import RobertaForSequenceClassification, RobertaTokenizer

# Perplexity metric loaded through the `evaluate` package; used by compute_perplexity().
perplexity_scorer = load("perplexity", module_type="metric")
# RoBERTa CoLA classifier and its tokenizer; used by compute_fluency(). The model is moved to `device`.
cola_model_name = "textattack/roberta-base-CoLA"
cola_tokenizer = RobertaTokenizer.from_pretrained(cola_model_name)
cola_model = RobertaForSequenceClassification.from_pretrained(cola_model_name).to(device)

def batchify(data, batch_size):
    """Lazily split an iterable into consecutive lists of at most `batch_size` items.

    inputs:
    - data: any iterable
    - batch_size: int, must be positive (checked when iteration starts)
    outputs:
    - generator of lists; every list has `batch_size` items except possibly the last
    """
    assert batch_size > 0

    chunk = []
    for element in data:
        # A full chunk is handed out just before the next element is stored.
        if len(chunk) >= batch_size:
            yield chunk
            chunk = []
        chunk.append(element)

    # Whatever is left over forms the final, possibly shorter, chunk.
    if chunk:
        yield chunk

In [None]:
"""set up evaluation metric"""

######################################################
#  The following helper code is given to you.
######################################################

def compute_perplexity(texts, model='gpt2', batch_size=8):
    """Return the mean perplexity of `texts` as reported by the module-level `perplexity_scorer`.

    inputs:
    - texts: list of str to score
    - model: str, forwarded to the scorer as `model_id`
    - batch_size: int, forwarded to the scorer
    outputs:
    - the scorer's 'mean_perplexity' entry
    """
    result = perplexity_scorer.compute(
        model_id=model,
        predictions=texts,
        batch_size=batch_size,
        add_start_token=True,
    )
    return result['mean_perplexity']


def compute_fluency(texts, batch_size=8):
  '''
  Average class-1 probability assigned by the module-level CoLA classifier.

  inputs:
  - texts: list of str (must be non-empty, otherwise the final division fails)
  - batch_size: int, number of texts classified per forward pass
  outputs:
  - float, mean over all texts of softmax(logits)[:, 1]
    (class index 1 is presumably the "acceptable" label -- confirm against the model card)
  '''
  scores = []
  for b_texts in batchify(texts, batch_size):
    # Tokenize only the current batch. Passing the full `texts` list here would
    # re-score the whole corpus once per batch (quadratic work, possible OOM).
    inputs = cola_tokenizer(b_texts, padding=True, truncation=True, return_tensors="pt").to(device)
    with torch.no_grad():
      logits = cola_model(**inputs).logits
    probs = logits.softmax(dim=-1)
    scores.extend(probs[:, 1].tolist())
  return sum(scores) / len(scores)


def compute_diversity(texts):
    """Compute distinct-1/2/3 ratios (unique n-grams / total n-grams) over all texts.

    Texts are split on single spaces; n-grams never span two texts.

    inputs:
    - texts: iterable of str
    outputs:
    - tuple of three floats: (distinct-1, distinct-2, distinct-3).
      A ratio is 0.0 when no n-gram of that order exists (e.g. only one-word
      texts), instead of raising ZeroDivisionError.
    """
    orders = (1, 2, 3)
    unique_ngrams = {n: set() for n in orders}
    total_ngrams = {n: 0 for n in orders}

    for gen in texts:
        words = gen.split(' ')
        for n in orders:
            count = max(len(words) - n + 1, 0)
            total_ngrams[n] += count
            # Same '_'-joined key format as before, so ratios are unchanged.
            unique_ngrams[n].update('_'.join(words[i:i + n]) for i in range(count))

    return tuple(
        len(unique_ngrams[n]) / total_ngrams[n] if total_ngrams[n] else 0.0
        for n in orders
    )


def evaluate(generations, experiment):
  '''
  Score a list of generations and print a short report.

  inputs:
  - generations: list of str; empty strings are dropped before scoring
  - experiment: label printed as the first line of the report
  '''
  non_empty = [text for text in generations if text != '']

  perplexity = compute_perplexity(non_empty)
  fluency = compute_fluency(non_empty)
  distinct_1, distinct_2, distinct_3 = compute_diversity(non_empty)

  print(
    experiment,
    f'perplexity = {perplexity:.2f}',
    f'fluency = {fluency:.2f}',
    f'diversity = {distinct_1:.2f}, {distinct_2:.2f}, {distinct_3:.2f}',
    sep='\n',
  )
  print()

# Smoke test: run all three metrics on a few hand-written sentences.
debug_sents = ["This restaurant is awesome", "My dog is cute and I love it.", "Today is sunny."]
evaluate(debug_sents, 'debugging run')

In [None]:
"""load model and tokenizer"""

######################################################
#  The following helper code is given to you.
######################################################

from transformers import GPT2LMHeadModel, GPT2Tokenizer

model_name = 'gpt2'
# The end-of-text string doubles as the pad token; decode() tokenizes prompts with padding=True.
tokenizer = GPT2Tokenizer.from_pretrained(model_name, pad_token="<|endoftext|>")
model = GPT2LMHeadModel.from_pretrained(model_name).to(device)
# Put the model in evaluation mode; this notebook only runs inference with it.
model.eval()

In this section, you will implement a few basic decoding algorithms:
1. Greedy decoding
2. Vanilla sampling
3. Temperature sampling
4. Top-k sampling
5. Top-p sampling

We have provided a wrapper function `decode()` that takes care of batching, controlling max length, and handling the EOS token.
You will be asked to implement the core function of each method: *given the pre-softmax logits of the next token, decide what the next token is.*

**The wrapper calls the core function of each decoding algorithm, which you will implement in the subsections below.**

In [None]:
"""decode main wrapper function"""

######################################################
#  The following helper code is given to you.
######################################################

def decode(prompts, max_len, method, **kwargs):
  '''
  Generate continuations for a batch of prompts with the module-level `model` and `tokenizer`.

  inputs:
  - prompts: list of str
  - max_len: int, maximum number of decoding steps (new tokens per sequence)
  - method: str, one of 'greedy', 'sample', 'temperature', 'topk', 'topp'
  - kwargs: optional `t` (temperature, default 0.8), `k` (top-k, default 20),
    `p` (top-p, default 0.7); only the one matching `method` is read
  outputs:
  - response_text: list of str, the decoded continuation of each prompt (prompt tokens excluded)
  raises:
  - ValueError if `method` is not one of the supported names
  '''
  encodings_dict = tokenizer(prompts, return_tensors="pt", padding=True)
  input_ids = encodings_dict['input_ids'].to(device)
  attention_mask = encodings_dict['attention_mask'].to(device)

  model_kwargs = {'attention_mask': attention_mask, "use_cache": False}
  batch_size, input_seq_len = input_ids.shape

  # 1 while a sequence is still generating, 0 once it has produced EOS.
  unfinished_sequences = torch.ones(batch_size, dtype=torch.long, device=device)

  for step in range(max_len):
    model_inputs = model.prepare_inputs_for_generation(input_ids, **model_kwargs)
    cache_position = torch.arange(0, input_ids.shape[1], dtype=torch.long, device=input_ids.device)
    model_kwargs["cache_position"] = cache_position
    with torch.no_grad():
      outputs = model(**model_inputs, return_dict=True, output_attentions=False, output_hidden_states=False)

    if step == 0:
      # Prompts may be padded, so read each row's logits at its last attended position
      # (assumes padding sits to the right of the prompt -- TODO confirm tokenizer padding side).
      last_non_masked_idx = torch.sum(attention_mask, dim=1) - 1
      next_token_logits = outputs.logits[range(batch_size), last_non_masked_idx, :]
    else:
      next_token_logits = outputs.logits[:, -1, :]

    if method == 'greedy':
      next_tokens = greedy(next_token_logits)
    elif method == 'sample':
      next_tokens = sample(next_token_logits)
    elif method == 'temperature':
      next_tokens = temperature(next_token_logits, t=kwargs.get('t', 0.8))
    elif method == 'topk':
      next_tokens = topk(next_token_logits, k=kwargs.get('k', 20))
    elif method == 'topp':
      next_tokens = topp(next_token_logits, p=kwargs.get('p', 0.7))
    else:
      # Without this branch an unknown name surfaced later as an UnboundLocalError on `next_tokens`.
      raise ValueError(f"Unknown decoding method: {method!r}")

    # finished sentences should have their next token be a padding token
    next_tokens = next_tokens * unfinished_sequences + tokenizer.pad_token_id * (1 - unfinished_sequences)

    input_ids = torch.cat([input_ids, next_tokens[:, None]], dim=-1)
    model_kwargs = model._update_model_kwargs_for_generation(outputs, model_kwargs, is_encoder_decoder=model.config.is_encoder_decoder)

    # if eos_token was found in one sentence, set sentence to finished
    unfinished_sequences = unfinished_sequences.mul((next_tokens != tokenizer.eos_token_id).long())

    # Stop early once every sequence in the batch has finished.
    if unfinished_sequences.max() == 0:
      break

  # Keep only the newly generated tokens.
  response_ids = input_ids[:, input_seq_len:]
  response_text = [tokenizer.decode(output, skip_special_tokens=True, clean_up_tokenization_spaces=True) for output in response_ids]

  return response_text

In [None]:
"""debug helper code"""

######################################################
#  The following helper code is given to you.
######################################################

# For debugging, we duplicate a single prompt 10 times so that we obtain 10 generations for the same prompt
dev_prompts = [dev_data[0]['prompt']] * 10

def print_generations(prompts, generations):
  '''
  Print one "[prompt] ==> [generation]" line per prompt/generation pair.

  Each string is wrapped in a one-element list so it is shown in repr form.
  '''
  for pair in zip(prompts, generations):
    prompt, generation = pair
    print('{} ==> {}'.format([prompt], [generation]))

### 1.1 Greedy Decoding

In [None]:
def greedy(next_token_logits):
  '''
  Greedy decoding step: pick the highest-scoring token of every row.

  inputs:
  - next_token_logits: Tensor(size = (B, V), dtype = float)
  outputs:
  - next_tokens: Tensor(size = (B), dtype = long)
  '''
  return next_token_logits.argmax(dim=1)


# Decode the debug prompts greedily for up to 20 new tokens and show the results.
generations = decode(dev_prompts, max_len=20, method='greedy')
print_generations(dev_prompts, generations)

### 1.2 Vanilla Sampling and Temperature Sampling

In [None]:
def sample(next_token_logits):
  '''
  Vanilla sampling step: draw one token per row from softmax(logits).

  inputs:
  - next_token_logits: Tensor(size = (B, V), dtype = float)
  outputs:
  - next_tokens: Tensor(size = (B), dtype = long)
  '''
  # (B, V) probabilities over the vocabulary.
  probs = next_token_logits.softmax(dim=1)

  # multinomial() returns shape (B, 1); keep the single drawn column.
  drawn = torch.multinomial(probs, num_samples=1)
  return drawn[:, 0]


# Re-seed before sampling so that re-running this cell reproduces the same generations.
set_seed()
generations = decode(dev_prompts, max_len=20, method='sample')
print_generations(dev_prompts, generations)

In [None]:
def temperature(next_token_logits, t):
  '''
  Temperature sampling step: draw one token per row from softmax(logits / t).

  inputs:
  - next_token_logits: Tensor(size = (B, V), dtype = float)
  - t: float, temperature; must be >= 0. t == 0 is treated as its limit, greedy decoding.
  outputs:
  - next_tokens: Tensor(size = (B), dtype = long)
  raises:
  - ValueError if t is negative
  '''
  if t < 0:
    raise ValueError(f"temperature must be non-negative, got {t}")
  if t == 0:
    # Dividing by zero would give inf/nan logits; the t -> 0 limit is the argmax.
    return torch.argmax(next_token_logits, dim=1)

  # Scaling the logits by 1/t sharpens (t < 1) or flattens (t > 1) the distribution.
  probs = torch.softmax(next_token_logits / t, dim=1)
  next_tokens = torch.multinomial(probs, num_samples=1).squeeze(1)

  return next_tokens


# Re-seed before sampling so that re-running this cell reproduces the same generations.
set_seed()
generations = decode(dev_prompts, max_len=20, method='temperature', t=0.8)
print_generations(dev_prompts, generations)

### 1.3 Top-k Sampling

In [None]:
def topk(next_token_logits, k):
  '''
  Top-k sampling step: sample from the distribution restricted to the k highest logits of each row.

  The input tensor is left unmodified.

  inputs:
  - next_token_logits: Tensor(size = (B, V), dtype = float)
  - k: int, >= 1; values larger than V are clamped to V (i.e. vanilla sampling)
  outputs:
  - next_tokens: Tensor(size = (B), dtype = long)
  raises:
  - ValueError if k < 1
  '''
  if k < 1:
    raise ValueError(f"k must be at least 1, got {k}")
  k = min(k, next_token_logits.size(1))

  # Per-row threshold: the k-th largest logit, kept as a (B, 1) column for broadcasting.
  kth_logit = torch.topk(next_token_logits, k, dim=1).values[:, -1:]

  # masked_fill returns a new tensor, so the caller's logits are not overwritten.
  # Logits tied with the threshold are kept, so ties can leave more than k candidates.
  filtered_logits = next_token_logits.masked_fill(next_token_logits < kth_logit, float('-inf'))

  probs = torch.softmax(filtered_logits, dim=1)
  next_tokens = torch.multinomial(probs, num_samples=1).squeeze(1)

  return next_tokens


# Re-seed before sampling so that re-running this cell reproduces the same generations.
set_seed()
generations = decode(dev_prompts, max_len=20, method='topk', k=20)
print_generations(dev_prompts, generations)

### 1.4 Top-p Sampling

In [None]:
def topp(next_token_logits, p):
  '''
  Top-p (nucleus) sampling step: sample from the smallest set of highest-probability
  tokens whose cumulative probability exceeds p. The most likely token is always kept.

  The input tensor is left unmodified.

  inputs:
  - next_token_logits: Tensor(size = (B, V), dtype = float)
  - p: float
  outputs:
  - next_tokens: Tensor(size = (B), dtype = long)
  '''
  # Cumulative probability mass over tokens sorted from most to least likely.
  sorted_logits, sorted_indices = torch.sort(next_token_logits, dim=1, descending=True)
  cum_probs = torch.cumsum(torch.softmax(sorted_logits, dim=1), dim=1)

  # A token is dropped when the mass *before* it already exceeds p. Shifting the
  # mask one position to the right keeps the token that crosses the threshold,
  # and the all-False first column keeps the top token.
  exceeds = cum_probs > p
  sorted_remove = torch.cat([torch.zeros_like(exceeds[:, :1]), exceeds[:, :-1]], dim=1)

  # Map the mask from sorted order back to vocabulary order.
  remove = torch.zeros_like(sorted_remove).scatter(dim=1, index=sorted_indices, src=sorted_remove)

  # masked_fill returns a new tensor, so the caller's logits are not overwritten.
  filtered_logits = next_token_logits.masked_fill(remove, float('-inf'))

  probs = torch.softmax(filtered_logits, dim=1)
  next_tokens = torch.multinomial(probs, num_samples=1).squeeze(1)

  return next_tokens


# Re-seed before sampling so that re-running this cell reproduces the same generations.
set_seed()
generations = decode(dev_prompts, max_len=20, method='topp', p=0.7)
print_generations(dev_prompts, generations)

### 1.5: Evaluation

Run the following cell to obtain the evaluation results, which you should include in your writeup.
Also don't forget to answer the questions.

In [None]:
# Evaluate on the first 10 test prompts only.
prompts = [item['prompt'] for item in test_data][:10]
GENERATIONS_PER_PROMPT = 10
MAX_LEN = 100

# No extra keyword arguments are passed, so decode() uses its defaults (t=0.8, k=20, p=0.7).
# NOTE(review): set_seed() is not called in this cell, so the sampled results depend on
# the RNG state left behind by previously executed cells.
for experiment in ['greedy', 'sample', 'temperature', 'topk', 'topp']:
  generations = []
  for prompt in tqdm(prompts):
    # Each prompt is repeated so that one batch yields GENERATIONS_PER_PROMPT generations.
    generations += decode([prompt] * GENERATIONS_PER_PROMPT, max_len=MAX_LEN, method=experiment)
  evaluate(generations, experiment)

### Discussion
- Q1.1: In greedy decoding, what do you observe when generating 10 times from the test prompt?
    - All 10 generations are exactly identical. This is because the model consistently selects the token with the highest probability at each step without introducing any randomness.

- Q1.2: In vanilla sampling, what do you observe when generating 10 times from the test prompt?
    - The 10 generations are highly diverse, with each output being different in content, structure and even coherence. It seems to me that most of the continuations bear little meaningful connection to the given prompt.

- Q1.3: In temperature sampling, play around with the value of temperature $t$. Which value of $t$ makes it equivalent to greedy decoding? Which value of $t$ makes it equivalent to vanilla sampling?
    - When $t\to 0$, the probability distribution becomes extremely sharp and concentrates nearly all of the probability mass on the highest logit. In this case, sampling almost always selects the top token, which behaves like **greedy decoding**.
    - When $t=1$, the raw probabilities are not scaled, which makes temperature sampling behave identically to **vanilla sampling.**

- Q1.4: In top-$p$ sampling, play around with the value of $p$. Which value of $p$ makes it equivalent to greedy decoding? Which value of $p$ makes it equivalent to vanilla sampling?
    - When $p\to 0$, only the single most probable token is kept (we can observe from the code that ``sorted_indices_to_remove[:, 0] = 0`` ensures that the token with the highest probability is never removed), which makes top-p sampling equivalent to **greedy decoding**.
    - When $p=1$, the full probability mass is retained, thus the sampling behaves equivalently to **vanilla sampling**.

- Q1.5: In top-k sampling, play around with the value of k. Which value of k makes it equivalent to greedy decoding? Which value of $k$ makes it equivalent to vanilla sampling?
    - When $k=1$, the model always selects the token with the highest probability. There is no randomness left and it behaves exactly like **greedy decoding**.
    - When $k=|V|=50257$, the model retains all tokens for sampling. This is equivalent to **vanilla sampling**.

- Q1.6: Report the evaluation metrics (perplexity, fluency, diversity) of all 5 decoding methods. Which methods have the best and worst perplexity? Fluency? Diversity?

I organized the evaluation metrics into a table.

|Method|Perplexity|Fluency|Diversity (1,2,3-gram)|
|---|---|---|---|
|Greedy|2.08|0.78|0.01, 0.02, 0.03|
|Sample|70.58|0.34|0.43, 0.90, 0.99|
|Temperature|16.61|0.68|0.33, 0.79, 0.95|
|Top-k|12.24|0.74|0.26, 0.75, 0.96|
|Top-p|12.40|0.72|0.29, 0.76, 0.96|


* **Perplexity**: Greedy decoding has the best perplexity (2.08), and vanilla sampling has the worst (70.58)
* **Fluency**: Greedy decoding has the best fluency (0.78), and vanilla sampling has the worst (0.34)
* **Diversity**: Vanilla sampling has the best diversity (0.43, 0.90, 0.99), and greedy decoding has the worst (0.01, 0.02, 0.03)

**Greedy decoding** achieves the best perplexity and fluency but produces almost no diversity. **Vanilla sampling** offers the highest diversity but suffers from poor perplexity and fluency.

**Temperature**, **top-k**, and **top-p sampling** strike a better balance, with top-k slightly outperforming the others in terms of fluency and perplexity while maintaining good diversity.

## Section 2: Applying Large Language Models to Few Shot Math Reasoning

In [None]:
pip install -q vllm bitblas # restart runtime session after install requirements

In [None]:
import os
os.environ["HF_HOME"] = "/content/.cache/huggingface" # set the cache directory to personal disk to avoid frequent downloads

In [None]:
from vllm import LLM, SamplingParams
# Checkpoint served by vLLM for Sections 2 and 3.
model_id = "Qwen/Qwen2.5-7B-Instruct-GPTQ-Int4"
# Build the vLLM engine with GPTQ quantization and eager execution enforced.
llm = LLM(model=model_id, enforce_eager=True, quantization="gptq")

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

from openai import OpenAI
from transformers import AutoTokenizer

class VLLMClient:
    """Callable text-completion wrapper around the module-level vLLM engine `llm`."""

    def __init__(self, model_id, **kwargs):
        """Remember `model_id`; any extra keyword arguments are accepted but not used."""
        self.model_id = model_id

    def __call__(self, prompt: str, **kwargs):
        """Generate a completion for `prompt` and return its text.

        Optional keyword arguments:
        - temperature: sampling temperature (default 0.2)
        - max_tokens: maximum number of generated tokens (default 256)
        - stop: list of stop strings (default ["###"], the separator used by the
          few-shot prompts); pass e.g. stop=[] when "###" may legitimately appear
          in the output, such as markdown headings in chat answers

        Returns the text of the first output of the first request.
        """
        sampling_params = SamplingParams(
            temperature=kwargs.get("temperature", 0.2),
            max_tokens=kwargs.get("max_tokens", 256),
            stop=kwargs.get("stop", ["###"]),
        )
        response = llm.generate(prompts=prompt, sampling_params=sampling_params)
        return response[0].outputs[0].text
# NOTE(review): this rebinds the global name `model`, which Section 1's decode() reads as the
# GPT-2 model; re-run the "load model and tokenizer" cell before re-running Section 1 cells.
model = VLLMClient(model_id)
# Smoke test: request a short completion.
model("San Francisco is a", max_tokens=42)

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

GSM_EXAMPLARS = [
    {
        "question": "There are 15 trees in the grove. Grove workers will plant trees in the grove today. After they are done, there will be 21 trees. How many trees did the grove workers plant today?",
        "cot_answer": "There are 15 trees originally. Then there were 21 trees after some more were planted. So there must have been 21 - 15 = 6. So the answer is 6.",
        "short_answer": "6"
    },
    {
        "question": "If there are 3 cars in the parking lot and 2 more cars arrive, how many cars are in the parking lot?",
        "cot_answer": "There are originally 3 cars. 2 more cars arrive. 3 + 2 = 5. So the answer is 5.",
        "short_answer": "5"
    },
    {
        "question": "Leah had 32 chocolates and her sister had 42. If they ate 35, how many pieces do they have left in total?",
        "cot_answer": "Originally, Leah had 32 chocolates. Her sister had 42. So in total they had 32 + 42 = 74. After eating 35, they had 74 - 35 = 39. So the answer is 39.",
        "short_answer": "39"
    },
    {
        "question": "Jason had 20 lollipops. He gave Denny some lollipops. Now Jason has 12 lollipops. How many lollipops did Jason give to Denny?",
        "cot_answer": "Jason started with 20 lollipops. Then he had 12 after giving some to Denny. So he gave Denny 20 - 12 = 8. So the answer is 8.",
        "short_answer": "8"
    },
    {
        "question": "Shawn has five toys. For Christmas, he got two toys each from his mom and dad. How many toys does he have now?",
        "cot_answer": "Shawn started with 5 toys. If he got 2 toys each from his mom and dad, then that is 4 more toys. 5 + 4 = 9. So the answer is 9.",
        "short_answer": "9"
    },
    {
        "question": "There were nine computers in the server room. Five more computers were installed each day, from monday to thursday. How many computers are now in the server room?",
        "cot_answer": "There were originally 9 computers. For each of 4 days, 5 more computers were added. So 5 * 4 = 20 computers were added. 9 + 20 is 29. So the answer is 29.",
        "short_answer": "29"
    },
    {
        "question": "Michael had 58 golf balls. On tuesday, he lost 23 golf balls. On wednesday, he lost 2 more. How many golf balls did he have at the end of wednesday?",
        "cot_answer": "Michael started with 58 golf balls. After losing 23 on tuesday, he had 58 - 23 = 35. After losing 2 more, he had 35 - 2 = 33 golf balls. So the answer is 33.",
        "short_answer": "33"
    },
    {
        "question": "Olivia has $23. She bought five bagels for $3 each. How much money does she have left?",
        "cot_answer": "Olivia had 23 dollars. 5 bagels for 3 dollars each will be 5 x 3 = 15 dollars. So she has 23 - 15 dollars left. 23 - 15 is 8. So the answer is 8.",
        "short_answer": "8"
    }
]

In [None]:
!mkdir -p data
!wget -q -O data/gsm8k.jsonl https://raw.githubusercontent.com/ranpox/comp3361-spring2025/refs/heads/main/assignments/A3/data/gsm8k.jsonl
!wget -q -O data/simpleqa.jsonl https://raw.githubusercontent.com/ranpox/comp3361-spring2025/refs/heads/main/assignments/A3/data/simpleqa.jsonl
!wget -q -O data/math.jsonl https://raw.githubusercontent.com/ranpox/comp3361-spring2025/refs/heads/main/assignments/A3/data/math.jsonl
!wget -q -O data/gaia.jsonl https://raw.githubusercontent.com/ranpox/comp3361-spring2025/refs/heads/main/assignments/A3/data/gaia.jsonl

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

import json

def load_eval_data(task):
    """Load the evaluation examples of `task` from `data/{task}.jsonl`.

    The file is read as UTF-8 with one JSON object per line; blank lines are
    skipped so that a trailing newline or an empty line does not break parsing.

    Returns a list with one parsed object per non-blank line.
    """
    with open(f"data/{task}.jsonl", "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

tasks = ["gsm8k", "simpleqa", "math", "gaia"]

# Show the first record of every task file to illustrate its fields.
for task in tasks:
    print(f"Example of {task} dataset:")
    print(json.dumps(load_eval_data(task)[0], indent=4))

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

import os
import datetime
import threading
from pathlib import Path
from tqdm.notebook import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

APPEND_ANSWER_LOCK = threading.Lock()


def append_answer(entry: dict, jsonl_file: str) -> None:
    """Append `entry` as one JSON line to `jsonl_file`, creating parent directories if needed.

    The write happens while holding the module-level APPEND_ANSWER_LOCK.
    """
    target = Path(jsonl_file)
    target.parent.mkdir(parents=True, exist_ok=True)

    with APPEND_ANSWER_LOCK:
        with open(target, "a", encoding="utf-8") as fp:
            fp.write(json.dumps(entry) + "\n")

    # Sanity check that the file is present after the write.
    assert os.path.exists(target), "File not found!"

def answer_single_question(example, agent, answers_file, action_type):
    """Run `agent` on one example and append the annotated result to `answers_file`.

    The question gets a source-specific instruction appended for "SimpleQA" and
    "MATH" examples. The stored record contains the model id (read from the
    module-level `model`), both versions of the question, the stringified agent
    answer, the reference answer, the source, and start/end timestamps.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"

    augmented_question = example["question"]
    source = example["source"]
    if source == "SimpleQA":
        augmented_question = augmented_question + " Answer with only the final number."
    elif source == "MATH":
        augmented_question = augmented_question + " Write code, not latex."

    started_at = datetime.datetime.now().strftime(timestamp_format)
    answer = str(agent.run(augmented_question))
    finished_at = datetime.datetime.now().strftime(timestamp_format)

    # Key order is kept stable because it determines the layout of the JSON line.
    record = dict(
        model_id=model.model_id,
        agent_action_type=action_type,
        question=augmented_question,
        original_question=example["question"],
        answer=answer,
        true_answer=example["true_answer"],
        source=source,
        start_time=started_at,
        end_time=finished_at,
    )
    append_answer(record, answers_file)

def answer_questions(
    task,
    agent,
    action_type: str = "vanilla",
    answers_file: str = None,
    parallel_workers: int = 4,
):
    """Answer every not-yet-answered example of `task` and append the results to `answers_file`.

    Questions whose text already appears as "original_question" in `answers_file`
    are skipped, so an interrupted run can be resumed.

    Args:
        task: name of the evaluation set, passed to load_eval_data().
        agent: object with a `run(question)` method.
        action_type: label stored with every answer.
        answers_file: path of the JSONL output file (required).
        parallel_workers: accepted for API compatibility but currently unused;
            examples are processed sequentially.

    Raises:
        ValueError: if `answers_file` is not provided.
    """
    if answers_file is None:
        # The default None previously failed later with an opaque TypeError.
        raise ValueError("answers_file must be provided")

    eval_data = load_eval_data(task)

    print(f"Starting processing and writing output to '{answers_file}'")

    # A set gives constant-time membership checks when filtering below.
    answered_questions = set()
    if os.path.exists(answers_file):
        with open(answers_file, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    answered_questions.add(json.loads(line)["original_question"])

    examples_todo = [example for example in eval_data if example["question"] not in answered_questions]

    for example in tqdm(examples_todo):
        answer_single_question(example, agent, answers_file, action_type)

In [None]:
!wget -q -O eval_utils.py https://raw.githubusercontent.com/ranpox/comp3361-spring2025/refs/heads/main/assignments/A3/evaluate.py

In [None]:
from eval_utils import score_answers, extract_numbers

class FewShotReasoner:
    """Few-shot prompting with short (answer-only) exemplars from GSM_EXAMPLARS."""

    def __init__(self, model, n_shots):
        self.model = model
        self.n_shots = n_shots

    def build_input(self, task):
        """Return the prompt: instruction line, the first `n_shots` exemplars, then the new question."""
        shots = [
            f"Question: {example['question']}\nAnswer: {example['short_answer']}\n###\n"
            for example in GSM_EXAMPLARS[:self.n_shots]
        ]
        return "Answer the following questions.\n" + "".join(shots) + f"Question: {task}\nAnswer: "

    def run(self, task):
        """Query the model (at most 64 new tokens) and return extract_numbers() of its stripped output."""
        raw_output = self.model(self.build_input(task), max_tokens=64)
        return extract_numbers(raw_output.strip())



def run_gsm8k_fewshot_reasoner(task="gsm8k", model_id=model_id, action_type="vanilla", n_shots=8):
    """Evaluate FewShotReasoner on `task` and print the scores.

    Args:
        task: evaluation set name.
        model_id: used only to build the output file name.
        action_type: label stored with every answer and used in the file name.
        n_shots: number of exemplars placed in the prompt (default 8).
    """
    reasoner_answers_file = f"output/{model_id.replace('/', '__')}__{action_type}__{task}.jsonl"
    chat_agent = FewShotReasoner(model, n_shots)
    answer_questions(task, chat_agent, action_type, reasoner_answers_file)
    df = score_answers([reasoner_answers_file])
    print(df)

run_gsm8k_fewshot_reasoner()

In [None]:
from eval_utils import extract_numbers

class FewShotCoTReasoner:
    """Few-shot prompting with chain-of-thought exemplars from GSM_EXAMPLARS."""

    def __init__(self, model, n_shots):
        self.model = model
        self.n_shots = n_shots

    def run(self, task):
        """Query the model and return the last item extract_numbers() finds in its output.

        Returns an empty string when nothing is extracted, so that one malformed
        generation is recorded as a wrong answer instead of aborting the whole
        evaluation run with an IndexError.
        """
        prompt = self.build_input(task)
        cot_output = self.model(prompt)
        numbers = extract_numbers(cot_output.strip())
        try:
            return numbers[-1]
        except IndexError:
            return ""

    def build_input(self, task):
        """Return the prompt: instruction line, the first `n_shots` worked exemplars, then the new question."""
        prompt = "Answer the following grade-school math word problems step-by-step. Show intermediate calculations, and be careful with percentages, time units, discounts, and compound conditions. \n"
        for example in GSM_EXAMPLARS[:self.n_shots]:
            prompt += f"Question: {example['question']}\nAnswer: {example['cot_answer']}\n###\n"
        prompt += f"Question: {task}\nAnswer: "
        return prompt


def run_gsm8k_fewshot_cot_reasoner(task="gsm8k", model_id=model_id, action_type="vanilla", n_shots=8):
    """Evaluate FewShotCoTReasoner on `task` and print the scores.

    Args:
        task: evaluation set name.
        model_id: used only to build the output file name.
        action_type: label stored with every answer and used in the file name.
        n_shots: number of exemplars placed in the prompt (default 8).
    """
    reasoner_answers_file = f"output/{model_id.replace('/', '__')}__{action_type}__{task}__cot.jsonl"
    chat_agent = FewShotCoTReasoner(model, n_shots)
    answer_questions(task, chat_agent, action_type, reasoner_answers_file)
    df = score_answers([reasoner_answers_file])
    print(df)

run_gsm8k_fewshot_cot_reasoner()

## Section 3: Building and Evaluating LLM Agents

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

from transformers import AutoTokenizer
class ChatAgent:
    """Single-turn agent: wraps the task in the model's chat template and returns the completion."""

    def __init__(self, model):
        self.model = model
        # Tokenizer is loaded only to render the chat template for `model.model_id`.
        self.tokenizer = AutoTokenizer.from_pretrained(model.model_id)

    def build_input(self, task):
        """Render `task` as a single user message, ending with the generation prompt."""
        conversation = [{"role": "user", "content": task}]
        return self.tokenizer.apply_chat_template(
            conversation,
            tokenize=False,
            add_generation_prompt=True,
        )

    def run(self, task):
        """Return the model's raw completion for `task`."""
        return self.model(prompt=self.build_input(task))

action_type = "vanilla"
# The agent does not depend on the task, so it is built once instead of once per loop iteration.
chat_agent = ChatAgent(model)
chat_agent_answers_files = {}
for task in ["simpleqa", "math", "gaia"]:
    chat_agent_answers_file = f"output/{model_id.replace('/', '__')}__{action_type}__{task}.jsonl"
    # NOTE(review): answer_questions() accepts `parallel_workers` but processes examples sequentially.
    answer_questions(task, chat_agent, action_type, chat_agent_answers_file, parallel_workers=8)
    chat_agent_answers_files[task] = chat_agent_answers_file
# Pass a list, matching the other score_answers() calls in this notebook.
score_answers(list(chat_agent_answers_files.values()))

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

from io import StringIO
import sys
import inspect
from typing import Any, Dict, Union, Optional, Tuple, List

class Tool:
    """Base class for tools an agent can call.

    Subclasses set the class attributes below and implement `forward`.
    """

    # Identifier used in the tool schema.
    name: str
    # Natural-language description placed in the tool schema.
    description: str
    # Maps each argument name to its schema entry (e.g. "type" and "description").
    inputs: Dict[str, Dict[str, Union[str, type, bool]]]
    # Name of the type returned by `forward`.
    output_type: str

    def __init__(self, *args, **kwargs):
        self.is_initialized = False

    def forward(self, *args, **kwargs):
        """Run the tool. Must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in the base class. (Previously the
            exception object was returned instead of raised, so calling an
            unimplemented tool silently "succeeded".)
        """
        raise NotImplementedError("Write this method in your subclass of `Tool`.")

    def __call__(self, *args, **kwargs):
        """Calling the tool forwards all arguments to `forward` and returns its result."""
        outputs = self.forward(*args, **kwargs)
        return outputs

    def to_dict(self) -> dict:
        """Return a function-calling style schema for this tool; every input is marked required."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": {
                    "type": "object",
                    "properties": self.inputs,
                    "required": list(self.inputs.keys()),
                },
            },
            "strict": True
        }


In [None]:
######################################################
#  The following helper code is given to you.
######################################################

from typing import Any

class FinalAnswerTool(Tool):
    """Tool whose call simply hands the given answer back unchanged."""

    name = "final_answer"
    description = "Use this tool to return your final answer to the user. Only use when you have high confidence in your answer."
    inputs = {
        "answer": {
            "type": "any",
            "description": "The final answer to the problem",
        },
    }
    output_type = "any"

    def forward(self, answer: Any) -> Any:
        """Return `answer` as-is."""
        return answer

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

final_answer_tool = FinalAnswerTool()
final_answer_tool("Hello, world!")

# Tool schemas handed to the chat template below.
tools = [
    final_answer_tool.to_dict()
]

messages = [
    {"role": "user", "content": "What year was the municipality of Ramiriqu\u00ed, Boyac\u00e1, Colombia, founded?  Answer with only the final number."}
]
# NOTE(review): this rebinds the global name `tokenizer`, which Section 1's decode() reads as
# the GPT-2 tokenizer; re-run the "load model and tokenizer" cell before re-running Section 1.
tokenizer = AutoTokenizer.from_pretrained(model.model_id)
print("="*10 + "Input Prompt" + "="*10 + "\n")
# Render the conversation together with the tool schemas into one prompt string.
input_prompt = tokenizer.apply_chat_template(messages, tools=tools, tokenize=False, add_generation_prompt=True)
print("\n\n" + input_prompt)
print("="*10 + "Response" + "="*10 + "\n")
response = model(input_prompt)
print("\n\n" + response)

In [None]:
class GoogleSearchTool(Tool):
    """Web search tool backed by the Serper API (https://serper.dev/).

    The API key is read from the SERPER_API_KEY environment variable at construction time.
    """

    name = "web_search"
    description = """Search the web for current or factual information using a query. Use this when you do not know the answer or need updated information."""
    inputs = {
        "query": {"type": "string", "description": "The search query to perform."},
    }
    output_type = "string"

    def __init__(self, provider: str = "serper", timeout: float = 10.0, max_results: int = 3):
        """
        Args:
            provider: search backend; only "serper" is supported.
            timeout: seconds to wait for the HTTP request before giving up.
            max_results: number of organic results included in the output.

        Raises:
            ValueError: if `provider` is not "serper".
            EnvironmentError: if SERPER_API_KEY is not set.
        """
        super().__init__()
        self.provider = provider
        if provider != "serper":
            raise ValueError(f"Unsupported provider: {provider}")

        self.timeout = timeout
        self.max_results = max_results

        self.api_key = os.getenv("SERPER_API_KEY")
        if not self.api_key:
            raise EnvironmentError("SERPER_API_KEY environment variable is not set.")

    def forward(self, query: str) -> str:
        """Search for `query` and return the top organic results as one "||"-separated string.

        Raises:
            RuntimeError: if the API answers with a non-200 status code.
        """
        import requests

        url = "https://google.serper.dev/search"
        headers = {
            "X-API-KEY": self.api_key,
            "Content-Type": "application/json"
        }
        payload = {
            "q": query
        }

        # A timeout keeps a stalled connection from hanging the agent loop indefinitely.
        response = requests.post(url, headers=headers, json=payload, timeout=self.timeout)

        if response.status_code != 200:
            raise RuntimeError(f"Serper API request failed: {response.status_code} - {response.text}")

        data = response.json()
        # Only the "organic" entries of the response are used.
        results = data.get("organic", [])

        if not results:
            return "No search results found."

        formatted_results = []
        for i, result in enumerate(results[:self.max_results], 1):
            snippet = result.get("snippet", "")
            link = result.get("link", "")
            formatted_results.append(f" {i}. Link. {link} About. {snippet}")

        return "||".join(formatted_results)

In [None]:
import os
from getpass import getpass

# Never store the API key in the notebook: read it from the environment, or prompt
# for it without echoing so that it does not end up in the saved file or its outputs.
if not os.getenv("SERPER_API_KEY"):
    os.environ["SERPER_API_KEY"] = getpass("Enter your Serper API key: ")

google_search_tool = GoogleSearchTool()
google_search_tool("Space Exploration Technologies Corp.")

In [None]:
pip install markdownify

In [None]:
class VisitWebpageTool(Tool):
    """Tool that downloads a web page and returns its content converted to markdown."""

    name = "visit_webpage"
    description = (
        "Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages."
    )
    inputs = {
        "url": {
            "type": "string",
            "description": "The url of the webpage to visit.",
        }
    }
    output_type = "string"

    def __init__(self, max_output_length: int = 10000):
        """Store the maximum number of characters `forward` may return."""
        super().__init__()
        self.max_output_length = max_output_length

    def forward(self, url: str) -> str:
        """Fetch `url` and return its markdown rendering, truncated to `max_output_length`.

        Network and conversion failures are reported as a message string
        instead of being raised.

        Raises:
            ImportError: If `markdownify` or `requests` cannot be imported.
        """
        try:
            import re
            import requests
            from markdownify import markdownify
            from requests.exceptions import RequestException
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify` and `requests` to run this tool."
            ) from e

        try:
            page = requests.get(url, timeout=10)
            page.raise_for_status()

            markdown_text = markdownify(page.text).strip()

            # Runs of three or more newlines are collapsed into one blank line.
            collapsed = re.sub(r'\n{3,}', '\n\n', markdown_text)

            limit = self.max_output_length
            return collapsed[:limit]

        except RequestException as err:
            # Timeout is a RequestException subclass, so it is singled out here.
            if isinstance(err, requests.exceptions.Timeout):
                return "The request timed out. Please try again later or check the URL."
            return f"Error fetching the webpage: {str(err)}"
        except Exception as err:
            return f"An unexpected error occurred: {str(err)}"

In [None]:
# Try the tool on a live page; as the last expression of the cell, the returned
# string (markdown or an error message) is shown as the cell output.
visit_page_tool = VisitWebpageTool()
visit_page_tool("https://www.spacex.com/")

In [None]:
import re
import json
from typing import List, Dict, Any, Callable, Optional

# System message placed first in the conversation by ToolCallAgent.run.
# NOTE(review): rule 1 ("Always call a tool") and rule 3 ("Only call a tool if you
# really need it") pull in opposite directions — confirm which behaviour is intended.
system_prompt = """
You are an expert assistant who can solve any task using tool calls. You will be given a task to solve as best you can.
To do so, you have been given access to some tools. 

You need to think step-by-step. Explain your reasoning before using a tool.

Here are the rules you should always follow to solve your task:
1. Always call a tool when solving a task. Wrap the tool call inside <tool_call>...</tool_call>.
2. Use actual values in tool calls—don’t use variable names.
3. Only call a tool if you really need it. If you can answer without it, do that instead.
4. Never repeat a tool call with the same input.
5. After doing a web search, only visit the URLs you get from the search. Don’t make up or guess any links."""


class ToolCallAgent:
    """
    A minimal agent that uses a language model to interact with tools
    based on the ToolCallingAgent pattern.

    Each step renders the conversation with the tokenizer's chat template,
    asks the model for a reply, extracts every <tool_call>{json}</tool_call>
    block from that reply and executes the named tools. The loop stops when
    the final-answer tool is called or when `max_steps` is exhausted.
    """
    def __init__(
        self,
        model: Callable,
        tools: List["Tool"],
        max_steps: int = 5,
        final_answer_tool_name: str = "final_answer",
        print_log: bool = True
    ):
        """
        Args:
            model: Callable mapping a prompt string to the generated text. It must
                expose a `model_id` attribute, which is used to load the tokenizer.
            tools: Tools the agent may call; each provides `name` and `to_dict()`.
            max_steps: Maximum number of model calls per `run`.
            final_answer_tool_name: Name of the tool whose result ends the run.
            print_log: When False, progress messages are discarded.

        Raises:
            ValueError: If no tool named `final_answer_tool_name` was supplied.
        """
        self.model = model
        self.tokenizer = AutoTokenizer.from_pretrained(model.model_id)
        self.tools_definitions = [tool.to_dict() for tool in tools]
        self.tool_executors = {tool.name: tool for tool in tools}
        self.max_steps = max_steps
        self.final_answer_tool_name = final_answer_tool_name

        # Parenthesised on purpose: the conditional chooses between two loggers,
        # rather than being evaluated inside a single lambda on every call.
        self.log_func = (lambda msg: print(msg)) if print_log else (lambda _msg: None)

        # Validate that the final answer tool has an executor
        if self.final_answer_tool_name not in self.tool_executors:
            raise ValueError(f"Executor for final answer tool '{self.final_answer_tool_name}' not found in tool_executors.")

    @staticmethod
    def _find_json_object_end(text: str, brace_start: int) -> int:
        """Return the index of the `}` closing the object opened at `brace_start`, or -1.

        Braces inside double-quoted JSON strings (including strings containing
        escaped quotes) are ignored, so code passed as a string argument cannot
        unbalance the count.
        """
        depth = 0
        in_string = False
        escaped = False
        for pos in range(brace_start, len(text)):
            ch = text[pos]
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return pos
        return -1

    @staticmethod
    def fix_closing_tags(text: str) -> str:
        """Append a missing `</tool_call>` after every `<tool_call>{...}` JSON object.

        Workaround for model replies that open a tool call but omit the closing
        tag. Text outside tool calls is copied through unchanged, and a tool call
        whose JSON object is never closed is left exactly as it was.
        """
        open_tag = "<tool_call>"
        close_tag = "</tool_call>"
        result = []
        i = 0
        while i < len(text):
            start_tag = text.find(open_tag + "{", i)
            if start_tag == -1:
                result.append(text[i:])
                break

            result.append(text[i:start_tag])
            brace_start = start_tag + len(open_tag)
            end = ToolCallAgent._find_json_object_end(text, brace_start)
            if end == -1:
                # Unbalanced object: nothing sensible to close, keep the tail as is.
                result.append(text[start_tag:])
                break

            result.append(text[start_tag:end + 1])

            remainder = text[end + 1:]
            if remainder.lstrip().startswith(close_tag):
                # Tag already present (possibly after whitespace): copy it through.
                close_end = remainder.find(close_tag) + len(close_tag)
                result.append(remainder[:close_end])
                i = end + 1 + close_end
            else:
                result.append(close_tag)
                i = end + 1

        return ''.join(result)

    def _execute_tool(self, tool_name: str, parsed_args: Any) -> Any:
        """Call the named tool with `parsed_args` as keyword arguments.

        Returns:
            The tool's own return value, or an "[ERROR] ..." string when the tool
            is unknown or raises, so the failure can be shown to the model as an
            observation.
        """
        executor = self.tool_executors.get(tool_name)
        if executor is None:
            available = ", ".join(sorted(self.tool_executors))
            message = f"[ERROR] Unknown tool '{tool_name}'. Available tools: {available}"
            print(message)
            return message
        try:
            tool_result = executor(**parsed_args)
            self.log_func(f"[INFO] Tool '{tool_name}' executed.")
            # The final_answer tool itself returns the value, we don't stringify it here yet
            return tool_result
        except Exception as e:
            print(f"[ERROR] Error executing tool '{tool_name}' with args {parsed_args}: {e}")
            # Return the error message as the observation
            return f"[ERROR] Error during execution of tool '{tool_name}': {e}"

    def _parse_tool_calls(self, content: str):
        """Return the decoded JSON payload of every <tool_call>...</tool_call> block.

        Blocks whose payload is not valid JSON are logged and skipped.
        """
        matches = re.findall(r'<tool_call>(.*?)</tool_call>', content, re.DOTALL)
        tool_calls = []
        for match in matches:
            try:
                tool_call_data = json.loads(match.strip())
                tool_calls.append(tool_call_data)
            except json.JSONDecodeError as e:
                self.log_func(f"[ERROR] JSON decode failed on tool call: {e}")
        return tool_calls

    def run(self, task: str) -> Any:
        """
        Runs the agent loop starting with the initial messages.

        Args:
            task: The initial user prompt.

        Returns:
            The final answer extracted from the final_answer tool call,
            or None if max_steps is reached without one.
        """

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": task}
        ]

        self.log_func(f"[INFO] Starting agent run with max_steps={self.max_steps}")
        self.log_func("-" * 30)

        for step in range(self.max_steps):
            self.log_func(f"----------- Step {step + 1} -----------")

            # For guidance on handling and executing function calls, please refer to [this documentation](https://docs.sglang.ai/backend/function_calling.html).
            current_prompt = self.tokenizer.apply_chat_template(
                messages,
                tools=self.tools_definitions,
                tokenize=False,
                add_generation_prompt=True
            )
            # Use the model given to this agent, not a notebook-level global `model`.
            response = self.fix_closing_tags(self.model(current_prompt))
            self.log_func(f"[INFO] Model response at step {step + 1}: {response}")
            messages.append({"role": "assistant", "content": response})

            tool_calls = self._parse_tool_calls(response)
            if len(tool_calls) == 0:
                self.log_func("[INFO] No tool calls this round.")
                self.log_func("-" * 30)
                continue

            for tool_call in tool_calls:
                try:
                    tool_name = tool_call["name"]
                    parsed_args = tool_call.get("arguments", {})

                    result = self._execute_tool(tool_name, parsed_args)
                    # The observation is fed back as an assistant-role message.
                    messages.append({
                        "role": "assistant",
                        "content": f"Tool {tool_name} has obtained the results {result}"
                    })

                    if tool_name == self.final_answer_tool_name:
                        self.log_func(f"[INFO] Final answer received: {result}")
                        return result
                except Exception as e:
                    # Malformed calls (e.g. no "name" key) are logged and skipped.
                    self.log_func(f"[ERROR] Failed to execute tool call: {e}")

            self.log_func("-" * 30) # Separator for next step

        self.log_func("[INFO] Maximum steps reached without a final answer.")
        return None

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

def run_simpleqa_tool_calling_agent(task="simpleqa", model_id=model_id, action_type="tool_calling"):
    """Run a ToolCallAgent with search, page-visit and final-answer tools, then print the scores.

    The answers file path is built from `model_id`, `action_type` and `task`
    and is passed first to `answer_questions` and then to `score_answers`.
    """
    safe_model_name = model_id.replace('/', '__')
    answers_path = f"output/{safe_model_name}__{action_type}__{task}.jsonl"

    agent_tools = [GoogleSearchTool(), VisitWebpageTool(), FinalAnswerTool()]
    agent = ToolCallAgent(model=model, tools=agent_tools, max_steps=10)

    answer_questions(task, agent, action_type, answers_path)
    print(score_answers([answers_path]))

run_simpleqa_tool_calling_agent()

In [None]:
!wget -q -O local_python_executor.py https://raw.githubusercontent.com/ranpox/comp3361-spring2025/refs/heads/main/assignments/A3/local_python_executor.py

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

from local_python_executor import (
    BASE_BUILTIN_MODULES,
    BASE_PYTHON_TOOLS,
    evaluate_python_code,
)


class PythonInterpreterTool(Tool):
    """Tool that runs a Python code string through `evaluate_python_code`."""

    name = "python_interpreter"
    description = "This is a tool for evaluating python code. It can be used to perform calculations. To generate valid Python code, first draft your code in Markdown using ```python```, then write it as a one-line string in the `code` field. Make sure it is valid JSON using double quotes and escaped newlines."
    inputs = {
        "code": {
            "type": "string",
            "description": "The python code to run in interpreter",
        }
    }
    output_type = "string"

    def __init__(self, *args, authorized_imports=None, **kwargs):
        """Build the import allow-list and a per-instance `inputs` description.

        Args:
            authorized_imports: Optional extra module names allowed in addition
                to `BASE_BUILTIN_MODULES`.
        """
        builtin_modules = set(BASE_BUILTIN_MODULES)
        if authorized_imports is not None:
            self.authorized_imports = list(builtin_modules | set(authorized_imports))
        else:
            self.authorized_imports = list(builtin_modules)

        # Instance-level `inputs` replaces the class attribute so the description
        # can list the modules this particular instance allows.
        code_description = (
            "The code snippet to evaluate. All variables used in this snippet must be defined in this same snippet, "
            f"else you will get an error. This code can only import the following python libraries: {self.authorized_imports}."
        )
        self.inputs = {
            "code": {
                "type": "string",
                "description": code_description,
            }
        }
        self.base_python_tools = BASE_PYTHON_TOOLS
        self.python_evaluator = evaluate_python_code
        super().__init__(*args, **kwargs)

    def forward(self, code: str) -> str:
        """Evaluate `code` with a fresh state and return captured prints plus the result."""
        interpreter_state = {}
        # NOTE(review): the evaluator's return value is stringified as-is; confirm
        # whether it returns a bare result or a (result, is_final_answer) pair.
        evaluation = self.python_evaluator(
            code,
            state=interpreter_state,
            static_tools=self.base_python_tools,
            authorized_imports=self.authorized_imports,
        )
        output = str(evaluation)
        printed = str(interpreter_state['_print_outputs'])
        return f"Stdout:\n{printed}\nOutput: {output}"

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

from eval_utils import score_answers

def run_math_tool_calling_agent(task="math", model_id=model_id, action_type="tool_calling"):
    """Run a ToolCallAgent with a Python interpreter and final-answer tool, then print the scores.

    The answers file path is built from `model_id`, `action_type` and `task`
    and is passed first to `answer_questions` and then to `score_answers`.
    """
    safe_model_name = model_id.replace('/', '__')
    answers_path = f"output/{safe_model_name}__{action_type}__{task}.jsonl"

    agent_tools = [PythonInterpreterTool(authorized_imports=["numpy", "sympy"]), FinalAnswerTool()]
    agent = ToolCallAgent(model=model, tools=agent_tools, max_steps=10)

    answer_questions(task, agent, action_type, answers_path)
    print(score_answers([answers_path]))

run_math_tool_calling_agent()

In [None]:
######################################################
#  The following helper code is given to you.
######################################################

from eval_utils import score_answers

def run_gaia_tool_calling_agent(task="gaia", model_id=model_id, action_type="tool_calling"):
    """Run a ToolCallAgent with search, page-visit, interpreter and final-answer tools, then print the scores.

    The answers file path is built from `model_id`, `action_type` and `task`
    and is passed first to `answer_questions` and then to `score_answers`.
    """
    safe_model_name = model_id.replace('/', '__')
    answers_path = f"output/{safe_model_name}__{action_type}__{task}.jsonl"

    agent_tools = [
        GoogleSearchTool(),
        VisitWebpageTool(),
        PythonInterpreterTool(authorized_imports=["numpy", "sympy"]),
        FinalAnswerTool(),
    ]
    agent = ToolCallAgent(model=model, tools=agent_tools, max_steps=10)

    answer_questions(task, agent, action_type, answers_path)
    print(score_answers([answers_path]))

run_gaia_tool_calling_agent()

| Task        | Action Type | Performance      |
|-------------|-----|-----------------|
| SimpleQA       | vanilla  |  |
| SimpleQA       | tool calling  |  |
| MATH       | vanilla  |  |
| MATH       | tool calling  |  |
| GAIA       | vanilla  |  |
| GAIA       | tool calling  |  |