# CS 162 HW4

<a href="https://colab.research.google.com/github/PlusLabNLP/cs162-hw4-w25/blob/main/HW4.ipynb"><img alt="Colab Demo" src="https://img.shields.io/badge/​-Open%20in%20Colab-blue?logo=googlecolab&logoColor=F9AB00&style=flat"></a>

In this assignment, you'll work with a recent Large Language Model [Gemma 2 2B](https://arxiv.org/pdf/2408.00118). You'll learn how to use the model and its tokenizer, generate text using greedy decoding, top-p sampling, and top-k sampling, and evaluate the model’s basic arithmetic capabilities on a simple dataset.

* Identify the TODO blocks and implement the necessary code in those sections.
*  To speed up processing, use a GPU by selecting "Runtime" > "Change runtime type" > "GPU" in Colab.

# 0 HuggingFace

In [None]:
# TODO: Generate a new Access Token (Read) at https://huggingface.co/settings/tokens
# and enter it below to enable access to Gemma models, which are gated.
# Ensure you have requested access to Gemma models at
# https://huggingface.co/google/gemma-2-2b and received approval before proceeding.

from huggingface_hub import notebook_login
notebook_login('')

# 1 Model Setup

In [2]:
# For auto grading: Do not modify
import torch

def grade_model_setup(model):
    """
    Autograder check 1.1: report the model's dtype, device and memory use.

    Prints the dtype of the first parameter, the device that parameter lives
    on ("CPU" when `get_device()` reports -1, otherwise the CUDA device name)
    and the value of `model.get_memory_footprint()`.

    Raises:
        Exception: if the first parameter is not stored as torch.bfloat16.
    """
    probe = next(model.parameters())
    param_dtype = probe.dtype
    print(f"Model parameter dtype: {param_dtype}")

    # get_device() yields -1 for tensors that are not on a CUDA device.
    gpu_index = probe.get_device()
    if gpu_index == -1:
        device_label = "CPU"
    else:
        device_label = torch.cuda.get_device_name(gpu_index)
    print(f"Model is currently on device: {device_label}")

    print("Memory:", model.get_memory_footprint())

    if param_dtype != torch.bfloat16:
        raise Exception(f"Failed 1.1: dtype is {param_dtype} instead of torch.bfloat16")
    print("<<Passed 1.1>>")

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM


def model_and_tokenizer_setup(model_name: str):
    """
    Load a causal language model and its tokenizer for inference.

    Args:
        model_name: Hugging Face model identifier passed to `from_pretrained`.

    Returns:
        (model, tokenizer): the model in eval mode, moved to 'cuda' and
        checked by `grade_model_setup`, together with its tokenizer.
    """
    ##################################################
    # TODO: Please load the model and tokenizer, which will
    # be later used for inference. To have an optimized
    # version of the model, load it in bfloat16 using torch_dtype.
    # use AutoTokenizer and AutoModelForCausalLM.
    # Hint: https://huggingface.co/google/gemma-2-2b#running-the-model-on-a-single--multi-gpu
    load_options = {"torch_dtype": torch.bfloat16, "device_map": "auto"}
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name, **load_options)
    # End of TODO.
    ##################################################
    # Inference only: switch to eval mode before moving the model to the GPU.
    model.eval()
    model.to('cuda')
    grade_model_setup(model)
    return model, tokenizer

model, tokenizer = model_and_tokenizer_setup(model_name="google/gemma-2-2b")

In [None]:
# Do not modify
def generate_next_token(model, tokenizer, tokenized_input, verbose=True):
    """
    Generate the probability distribution over vocabulary
    for the next token after the tokenized_input using the model.

    Args:
        model: Object exposing `generate(**inputs, ...)`; called with
            max_new_tokens=1, output_scores=True, return_dict_in_generate=True.
        tokenizer: Unused here; kept so every decoding helper shares a signature.
        tokenized_input: Mapping of keyword arguments unpacked into `generate`.
        verbose: When True, print the input and a preview of the probabilities.

    Returns:
        Softmax-normalized next-token probabilities. For a batch of size 1 this
        is a 1D tensor over the vocabulary; for larger batches the leading
        batch dimension is kept and every row is normalized independently.
    """
    outputs = model.generate(**tokenized_input, max_new_tokens=1, output_scores=True, return_dict_in_generate=True)
    # Only one step is generated, so the first entry of "scores" is the step we want.
    step_scores = outputs["scores"][0]
    # Drop only a size-1 batch dimension and normalize over the vocabulary (last)
    # axis, so a batch larger than 1 is never normalized across examples.
    next_token_scores = torch.softmax(step_scores.squeeze(0), dim=-1)
    if verbose:
        print(f"tokenized_input: {tokenized_input}")
        print(f"Shape of next_token_scores: {next_token_scores.shape}\nnext_token_scores[:5]: {next_token_scores[:5]}")
    return next_token_scores

input_text = "Go Bruins"
tokenized_input = tokenizer(input_text, return_tensors="pt").to("cuda")
next_token_scores = generate_next_token(model, tokenizer, tokenized_input, verbose=True)

In transformer-based models like those from Hugging Face's transformers library, tokenized inputs are typically in batch format.

* input_ids: This represents the tokenized numerical IDs of the input text. Since models process inputs in batches, tokenized_input['input_ids'] is a 2D tensor of shape (batch_size × sequence_length), so tokenized_input['input_ids'][0] extracts the first example in the batch.

* attention_mask: This tells the model which tokens to attend to (1) and which to ignore (0, usually for padding). It has the same shape as input_ids and ensures that padding tokens don’t affect the model’s output.

* Since models process inputs as batches, even when working with a single sentence, the data is structured as a batch of size 1.

* next_token_scores: A 1D tensor (torch.Size([256000])) containing the probabilities for each possible next token in the vocabulary after applying the softmax function. The size of 256,000 corresponds to the vocabulary size, meaning the model is considering 256,000 possible tokens for the next step in the sequence.

In [None]:
def decode_token_ids(tokenizer, token_ids: list) -> str:
    """
    Convert token ids back into text.

    Args:
        tokenizer: Object exposing `decode(token_ids)`.
        token_ids: The token ids to decode.

    Returns:
        Whatever `tokenizer.decode` produces for `token_ids`.
    """
    ##################################################
    # TODO: Please decode a list of token_ids to a string using the tokenizer.decode.
    text = tokenizer.decode(token_ids)
    # End of TODO.
    ##################################################
    return text


decoded_tokens = decode_token_ids(tokenizer, tokenized_input['input_ids'][0])
print(f"Decoded output: {decoded_tokens}")

In [None]:
# For auto grading: Do not modify
def grade_tokenizer_setup():
    """
    Autograder check 1.2: decode a fixed id sequence with the global tokenizer.

    Raises:
        Exception: if the decoded text differs from the expected sentence.
    """
    expected = "<bos>Best university in LA is UCLA"
    text = decode_token_ids(tokenizer, [2, 6196, 13435, 575, 6461, 603, 78672])
    print(text)
    if text != expected:
        raise Exception(f"Failed 1.2: decoded text is '{text}' instead of '{expected}'")
    print("<<Passed 1.2>>")

grade_tokenizer_setup()

# 2 Decoding Strategies
In this section, you should implement different decoding strategies for text generation:
1. Greedy Decoding: Always selects the highest probability token at each step.
2. Top-P Sampling (Nucleus Sampling): Selects from a subset of the vocabulary containing the top-P cumulative probability mass.
3. Top-K Sampling: Selects from the top-K most probable tokens at each step.

In [None]:
from tqdm.auto import tqdm

def append_next_token(tokenized_input, next_token_id):
    """
    Appends the next token to the tokenized input sequence.

    This function is used to incrementally build input sequences for autoregressive decoding.
    It takes a tokenized input (the result of running a tokenizer on text) and a next_token_id
    (the ID of the next token predicted by a model) and appends this token ID to the input sequence.
    It also updates the attention mask accordingly.

    The new entries are created on the same device as the tensors they extend, so the
    function works for inputs on CPU as well as on any GPU.

    Args:
        tokenized_input: A dictionary containing:
            - "input_ids": A tensor of token IDs representing the input sequence.
            - "attention_mask": A tensor indicating which tokens should be attended to (1 for real tokens, 0 for padding).
        next_token_id: The ID of the next token to append, predicted by a model.
            May be a Python int or a single-element tensor.

    Returns:
        Updated tokenized input with the new token ID and attention mask.
        The same mapping object is updated in place and returned.
    """
    input_ids = tokenized_input["input_ids"]
    attention_mask = tokenized_input["attention_mask"]

    # int() accepts both plain ints and single-element tensors.
    token_column = torch.tensor([[int(next_token_id)]], dtype=torch.int64, device=input_ids.device)
    # The appended token is a real token, so it is always attended to.
    mask_column = torch.ones((1, 1), dtype=torch.int64, device=attention_mask.device)

    tokenized_input["input_ids"] = torch.cat([input_ids, token_column], dim=-1)
    tokenized_input["attention_mask"] = torch.cat([attention_mask, mask_column], dim=-1)
    return tokenized_input


def greedy_decoding(model, tokenizer, input_text, max_length, disable_tqdm=True):
    """
    Generates text using greedy decoding by always selecting the most probable next token.

    Generation stops after `max_length` new tokens, or earlier if the model
    produces `tokenizer.eos_token_id` (the <eos> token is kept in the output).

    Args:
        model: The language model used for text generation.
        tokenizer: The tokenizer corresponding to the model.
        input_text: The initial text prompt to generate from.
        max_length: The maximum number of new tokens to generate.
        disable_tqdm: When True, the progress bar is hidden.

    Returns:
        sequence_str: The whole text as a string (input and generated).
    """
    # Put the prompt on the same device as the model's parameters.
    device = next(model.parameters()).device
    sequence = tokenizer(input_text, return_tensors="pt").to(device)
    for _ in tqdm(range(max_length), disable=disable_tqdm):
        next_token_scores = generate_next_token(model, tokenizer, sequence, verbose=False)
        ##################################################
        # TODO: Implement greedy decoding using the next_token_scores.
        # Hint: generate tokens one by one until max_length is reached or the model generates <eos> (tokenizer.eos_token_id).
        # Hint: "sequence" stores the tokenized input text and is updated with newly generated tokens containing the whole input and new generated tokens.
        # Hint: Use append_next_token function above to update "sequence" with the new token you generate.
        # Hint: Use torch.argmax to get the most probable token.
        next_token_id = int(torch.argmax(next_token_scores, dim=-1))
        sequence = append_next_token(sequence, next_token_id)
        if next_token_id == tokenizer.eos_token_id:
            break
        # End of TODO.
        ##################################################

    sequence_str = None
    ##################################################
    # TODO: Use decode_token_ids to decode the sequence to text.
    # The batch holds a single example, so decode row 0.
    sequence_str = decode_token_ids(tokenizer, sequence["input_ids"][0])
    # End of TODO.
    ##################################################
    return sequence_str

def top_p_sampling(model, tokenizer, input_text, max_length, top_p, seed, disable_tqdm=True):
    """
    Generates text using top-p (nucleus) sampling by sampling from the smallest subset
    of tokens whose cumulative probability mass reaches `top_p`.

    The most probable token is always part of the nucleus, so `top_p <= 0`
    degenerates to greedy decoding and `top_p >= 1` samples from the full
    vocabulary. Generation stops after `max_length` new tokens, or earlier if
    `tokenizer.eos_token_id` is sampled (the <eos> token is kept in the output).

    Args:
        model: The language model used for text generation.
        tokenizer: The tokenizer corresponding to the model.
        input_text: The initial text prompt to generate from.
        max_length: The maximum number of new tokens to generate.
        top_p: The nucleus sampling threshold.
        seed: Seed passed to `torch.random.manual_seed` for reproducible sampling.
        disable_tqdm: When True, the progress bar is hidden.

    Returns:
        sequence_str: The whole text as a string (input and generated).
    """
    torch.random.manual_seed(seed)
    sequence_str = None
    ##################################################
    # TODO: Implement top-p sampling using the generate_next_token function.
    # Hint: Pay attention to greedy_decoding function and how it generates tokens.
    # Hint: Use torch argsort, cumsum, and multinomial to sample from the top-p tokens.
    device = next(model.parameters()).device
    sequence = tokenizer(input_text, return_tensors="pt").to(device)
    for _ in tqdm(range(max_length), disable=disable_tqdm):
        # Work in float32: a cumulative sum in a low-precision dtype is too coarse.
        probs = generate_next_token(model, tokenizer, sequence, verbose=False).float()

        order = torch.argsort(probs, descending=True)
        sorted_probs = probs[order]
        cumulative = torch.cumsum(sorted_probs, dim=-1)

        # Keep a token while the mass accumulated *before* it is still below
        # top_p; this includes the token that crosses the threshold.
        keep = (cumulative - sorted_probs) < top_p
        keep[0] = True  # never end up with an empty nucleus (e.g. top_p == 0)

        nucleus_probs = sorted_probs[keep]
        nucleus_ids = order[keep]
        choice = torch.multinomial(nucleus_probs / nucleus_probs.sum(), num_samples=1)
        next_token_id = int(nucleus_ids[choice].item())

        sequence = append_next_token(sequence, next_token_id)
        if next_token_id == tokenizer.eos_token_id:
            break

    sequence_str = decode_token_ids(tokenizer, sequence["input_ids"][0])
    # End of TODO.
    ##################################################
    return sequence_str

def top_k_sampling(model, tokenizer, input_text, max_length, top_k, seed, disable_tqdm=True):
    """
    Generates text using top-k sampling by selecting from the top-K most probable tokens.

    `top_k` is clamped to the range [1, vocabulary size], so `top_k=1` is
    equivalent to greedy decoding. Generation stops after `max_length` new
    tokens, or earlier if `tokenizer.eos_token_id` is sampled (the <eos>
    token is kept in the output).

    Args:
        model: The language model used for text generation.
        tokenizer: The tokenizer corresponding to the model.
        input_text: The initial text prompt to generate from.
        max_length: The maximum number of new tokens to generate.
        top_k: The number of top candidates to sample from.
        seed: Seed passed to `torch.random.manual_seed` for reproducible sampling.
        disable_tqdm: When True, the progress bar is hidden.

    Returns:
        sequence_str: The whole text as a string (input and generated).
    """
    torch.random.manual_seed(seed)
    sequence_str = None
    ##################################################
    # TODO: Implement top-k sampling using the generate_next_token function.
    # Hint: Pay attention to greedy_decoding function and how it generates tokens.
    # Hint: Use torch topk and multinomial to sample from the top-k tokens.
    device = next(model.parameters()).device
    sequence = tokenizer(input_text, return_tensors="pt").to(device)
    for _ in tqdm(range(max_length), disable=disable_tqdm):
        # float32 keeps the renormalization and sampling numerically well-behaved.
        probs = generate_next_token(model, tokenizer, sequence, verbose=False).float()

        k = max(1, min(int(top_k), probs.shape[-1]))
        candidate_probs, candidate_ids = torch.topk(probs, k)

        # Renormalize over the k candidates and draw one of them.
        choice = torch.multinomial(candidate_probs / candidate_probs.sum(), num_samples=1)
        next_token_id = int(candidate_ids[choice].item())

        sequence = append_next_token(sequence, next_token_id)
        if next_token_id == tokenizer.eos_token_id:
            break

    sequence_str = decode_token_ids(tokenizer, sequence["input_ids"][0])
    # End of TODO.
    ##################################################
    return sequence_str

print("greedy_decoding:", greedy_decoding(model, tokenizer, input_text="Best university in LA is ", max_length=45, disable_tqdm=False))
print("top_p_sampling:", top_p_sampling(model, tokenizer, input_text="Best university in LA is ", max_length=20, top_p=0.8, seed=0, disable_tqdm=False))
print("top_k_sampling:", top_k_sampling(model, tokenizer, input_text="Best university in LA is ", max_length=20, top_k=200, seed=0, disable_tqdm=False))

In [None]:
# For auto grading: Do not modify

def grade_greedy_decoding():
    """Autograder check 2.1: compare a 4-token greedy continuation with the expected text."""
    expected = "<bos>Best university in LA is <strong>UCLA</strong>."
    generated = greedy_decoding(model, tokenizer, input_text="Best university in LA is ", max_length=4)
    if generated != expected:
        print(f"###Failed 2.1###: generated text is '{generated}' instead of '{expected}'")
        return
    print("<<Passed 2.1>>")

def grade_top_p_sampling():
    """Autograder check 2.2.1: with top_p=0.0 the 4-token output must match the expected text."""
    expected = "<bos>Best university in LA is <strong>UCLA</strong>."
    generated = top_p_sampling(model, tokenizer, input_text="Best university in LA is ", max_length=4, top_p=0.0, seed=3)
    if generated != expected:
        print(f"###Failed 2.2.1###: generated text is '{generated}' instead of '{expected}'")
        return
    print("<<Passed 2.2.1>>")

def grade_top_k_sampling():
    """Autograder check 2.3.1: with top_k=1 the 4-token output must match the expected text."""
    expected = "<bos>Best university in LA is <strong>UCLA</strong>."
    generated = top_k_sampling(model, tokenizer, input_text="Best university in LA is ", max_length=4, top_k=1, seed=3)
    if generated != expected:
        print(f"###Failed 2.3.1###: generated text is '{generated}' instead of '{expected}'")
        return
    print("<<Passed 2.3.1>>")

grade_greedy_decoding()
grade_top_p_sampling()
grade_top_k_sampling()

In [None]:
NUM_RUNS = 3
MAX_LENGTH = 20
TOP_P = 0.8
TOP_K = 10
INPUT_TEXT = "My new AI assistant just claimed it invented "
##################################################
# Please explore different decoding strategies for language generation by running three algorithms
# —greedy decoding, top-p (nucleus) sampling, and top-k sampling—multiple times and printing their outputs.
# Specifically, you should use the functions `greedy_decoding`, `top_p_sampling`, and `top_k_sampling` to generate text.
# For each algorithm, run it using ***different seeds*** (you can use i from the for loop) and observe how the outputs vary across runs.
# Since greedy decoding is deterministic, its outputs should remain consistent, while top-p and top-k sampling introduce randomness, leading to different generations.
# Set the values for top-p and top-k as TOP_P and TOP_K.
# Use the `max_length` of MAX_LENGTH for all generations to ensure consistency in output length.
# Your goal is to understand these behaviors by observing the diversity in the generated outputs.
# When submitting your notebook, make sure to include the outputs of your runs for manual grading. (Do not clear outputs)

# GREEDY DECODING
print("######### Greedy Decoding: #########")
for i in range(NUM_RUNS):
    ##################################################
    # TODO: Please run greedy decoding and print the output.
    # Greedy decoding takes no seed; every run is expected to print the same text.
    greedy_output = greedy_decoding(model, tokenizer, input_text=INPUT_TEXT, max_length=MAX_LENGTH)
    print(f"Run {i}: {greedy_output}")
    # End of TODO.
    ##################################################

print("######### Top-p Sampling: #########")
for i in range(NUM_RUNS):
    ##################################################
    # TODO: Please run top-p sampling and print the output.
    # The loop index doubles as the seed so each run samples differently.
    top_p_output = top_p_sampling(model, tokenizer, input_text=INPUT_TEXT, max_length=MAX_LENGTH, top_p=TOP_P, seed=i)
    print(f"Run {i} (seed={i}): {top_p_output}")
    # End of TODO.
    ##################################################

print("######### Top-k Sampling: #########")
for i in range(NUM_RUNS):
    ##################################################
    # TODO: Please run top-k sampling and print the output.
    # The loop index doubles as the seed so each run samples differently.
    top_k_output = top_k_sampling(model, tokenizer, input_text=INPUT_TEXT, max_length=MAX_LENGTH, top_k=TOP_K, seed=i)
    print(f"Run {i} (seed={i}): {top_k_output}")
    # End of TODO.
    ##################################################

# 3 Load Arithmetic Dataset

In this section, we will implement a data loader using torch.utils.data.Dataset to load a file containing arithmetic problems and corresponding answers.

In [None]:
import pandas as pd
from torch.utils.data import Dataset, DataLoader


class ArithmeticDataset(Dataset):
    """
    Dataset of arithmetic problems and their answers read from a csv file.

    The rows can optionally be restricted to one operation type and/or one
    operand digit count.
    """
    def __init__(self, file_path_csv, operation=None, num_digits=None):
        """
        Read the csv file and apply the optional filters.

        Args:
        - file_path_csv: Path to the csv file containing the arithmetic problems and their answers.
        - operation: Optional filter for the arithmetic operation. Can be 'add', 'sub', 'mul', or 'div'.
        - num_digits: Optional filter for the number of digits in the operands.
        """
        self.operation = operation
        self.num_digits = num_digits
        frame = pd.read_csv(file_path_csv)

        ##################################################
        # TODO: Please complete the implementation of __init__
        # filter the dataframe (self.df) as per the operation type and num_digits given to you.
        # (Do not filter operation or num_digits if they are None.)
        # Each filter is an exact match on the column of the same name.
        for column, wanted in (("operation", operation), ("num_digits", num_digits)):
            if wanted is not None:
                frame = frame[frame[column] == wanted]
        self.df = frame
        # End of TODO.
        ##################################################

    def __len__(self):
        """Number of rows left after filtering."""
        return len(self.df)

    def __getitem__(self, idx):
        """
        Fetch one (problem, answer) pair by position.

        Args:
        - idx: Positional index of the row to retrieve.

        Returns:
        - A tuple holding the row's 'problem' and 'answer' values.
        """
        ##################################################
        # TODO: Please complete the implementation of __getitem__
        # to return the problem and answer at the given index.
        # Hint: Use the df.iloc to access the index.
        # iloc is positional, so it is unaffected by the labels left over from filtering.
        record = self.df.iloc[idx]
        # End of TODO.
        ##################################################
        return record['problem'], record['answer']

dataset = ArithmeticDataset(file_path_csv="arithmetic_problems.csv")
dataset.df

In [None]:
# For auto grading: Do not modify
def grade_dataset():
    """
    Autograder check 3.1: validate the 2-digit addition subset of the dataset.

    Raises:
        Exception: if the subset does not have 14 rows or item 10 is not the expected pair.
    """
    subset = ArithmeticDataset(file_path_csv="arithmetic_problems.csv", operation="add", num_digits=2)
    size = len(subset)
    if size != 14:
        raise Exception(f"Failed 3.1: dataset length is {size} instead of 14")
    sample = subset[10]
    if sample != ('62 + 31 = ', 93):
        raise Exception(f"Failed 3.1: dataset[10] is {sample} instead of ('62 + 31 = ', 93)")
    print("<<Passed 3.1>>")

grade_dataset()

# 4 Implement Evaluation Metric

In [None]:
from sklearn.metrics import accuracy_score, f1_score

def evaluate_standard(gt_answers, pred_answers):
    """
    Compute the accuracy of predictions against ground-truth answers.

    Args:
        gt_answers: Ground-truth labels.
        pred_answers: Predicted labels, in the same order as `gt_answers`.

    Returns:
        The fraction of positions where prediction and ground truth match,
        as computed by scikit-learn's `accuracy_score`.
    """

    accuracy = 0

    ##################################################
    # TODO: Please finish the standard evaluation metrics.
    # You need to compute the accuracy for the
    # predictions and ground truth answers.
    # Please use the scikit-learn APIs.

    accuracy = accuracy_score(gt_answers, pred_answers)

    # End of TODO.
    ##################################################

    return accuracy

In [None]:
# For auto grading: Do not modify
def grade_evaluation_metric():
    """
    Autograder check 4.1: four of five predictions match, so accuracy must be 0.8.

    Raises:
        Exception: if `evaluate_standard` returns anything other than 0.8.
    """
    acc = evaluate_standard([1, 2, 3, 4, 5], [1, 2, 3, 4, 2])
    if acc != 0.8:
        raise Exception(f"Failed 4.1: accuracy is {acc} instead of 0.8")
    print("<<Passed 4.1>>")

grade_evaluation_metric()

# 5 Evaluate the Model

In [None]:
def evaluate_arithmetic(model, tokenizer, dataset_path_csv, decoding_strategy, top_p=None, top_k=None):
    """
    Evaluate the model on the arithmetic dataset using the given decoding strategy.

    For every (operation, num_digits) combination the matching problems are
    loaded, the model continues each problem for as many tokens as the true
    answer has characters, and the text after "=" is recorded as the prediction.

    Args:
    - model: The language model used for text generation.
    - tokenizer: The tokenizer corresponding to the model.
    - dataset_path_csv: The path to the csv file containing the arithmetic problems and their answers.
    - decoding_strategy: The decoding strategy to use for text generation. (Can be "greedy", "top_p", "top_k")
    - top_p: The nucleus (top-p) sampling threshold. (Only used if decoding_strategy is "top_p")
    - top_k: The number of top candidates to sample from. (Only used if decoding_strategy is "top_k")

    Returns:
    - answers: A pandas DataFrame containing the evaluation results.

    Raises:
    - ValueError: if decoding_strategy is unknown, or the parameter required
      by the chosen strategy (top_p / top_k) is None.
    """
    # Fail fast on bad arguments instead of part-way through the evaluation.
    if decoding_strategy not in ("greedy", "top_p", "top_k"):
        raise ValueError(f"Unknown decoding_strategy: {decoding_strategy!r}")
    if decoding_strategy == "top_p" and top_p is None:
        raise ValueError("top_p must be provided when decoding_strategy is 'top_p'")
    if decoding_strategy == "top_k" and top_k is None:
        raise ValueError("top_k must be provided when decoding_strategy is 'top_k'")

    answers = []
    for operation in tqdm(["add", "sub", "mul", "div"]):
        for num_digits in [1, 2, 3, 4]:
            ##################################################
            # TODO: Please load the dataset for the given operation and num_digits.
            # Hint: Use the ArithmeticDataset class you implemented.
            # Hint: Use the dataset_path_csv, operation, and num_digits arguments.
            dataset = ArithmeticDataset(file_path_csv=dataset_path_csv, operation=operation, num_digits=num_digits)
            # End of TODO.
            ##################################################

            for problem, answer in DataLoader(dataset, batch_size=1):
                pred_sequence = None
                ##################################################
                # TODO: Generate the prediction for the given problem based on the decoding_strategy.
                # Hint: Use the greedy_decoding, top_k_sampling, and top_p_sampling functions.
                # Hint: You can use if statements to call the appropriate function based on the decoding_strategy.
                # IMPORTANT: For max_length, use the length of the answer (How many digits are in the answer).
                # Hint: answer is a tensor, so you can access the value itself using answer.item() method and then count its number of digits.
                # For seed, use 0.
                prompt = problem[0]  # batch_size=1, so the batch holds a single problem string
                # Same string form as "true_answer" below, so a leading sign counts as one character.
                max_length = len(str(answer.item()))
                if decoding_strategy == "greedy":
                    pred_sequence = greedy_decoding(model, tokenizer, prompt, max_length)
                elif decoding_strategy == "top_p":
                    pred_sequence = top_p_sampling(model, tokenizer, prompt, max_length, top_p=top_p, seed=0)
                else:
                    pred_sequence = top_k_sampling(model, tokenizer, prompt, max_length, top_k=top_k, seed=0)
                # End of TODO.
                ##################################################

                # Extracting final answer from the sequence (What comes after "=")
                pred_answer = pred_sequence.split("=")[1].strip()
                answers.append({
                    "operation": operation,
                    "num_digits": num_digits,
                    "problem": problem[0],
                    "sequence": pred_sequence,
                    "decoding_strategy": decoding_strategy,
                    "true_answer": str(answer.item()),
                    "pred_answer": pred_answer
                })
    return pd.DataFrame(answers)

answers = evaluate_arithmetic(model, tokenizer, "arithmetic_problems.csv", "greedy")
answers

In [None]:
# See the incorrect predictions
answers[answers["true_answer"] != answers["pred_answer"]]

In [None]:
accuracy = None
##################################################
# TODO: Evaluate the predictions using the evaluate_standard function.
# Hint: Use the "true_answer" and "pred_answer" columns from the answers dataframe.
# Both columns hold strings, so matching is an exact string comparison.
accuracy = evaluate_standard(answers["true_answer"], answers["pred_answer"])
# End of TODO.
##################################################
accuracy

In [None]:
# For auto grading: Do not modify
def grade_acc():
    """
    Autograder check 5.1: the global `accuracy` must lie strictly between 0.75 and 1.

    Raises:
        Exception: if `accuracy` is outside that open interval.
    """
    if not (0.75 < accuracy < 1):
        raise Exception(f"Failed 5.1: accuracy is {accuracy} instead of 0.75<acc<1")
    print("<<Passed 5.1>>")

grade_acc()

In [None]:
# For auto grading: Do not modify
def grade_operation_and_digits():
    """
    Autograder check 5.2: compare the three decoding strategies on the arithmetic dataset.

    Runs `evaluate_arithmetic` with greedy decoding, top-p (0.9) and top-k (20),
    computes the accuracy per (operation, num_digits) group for each strategy as a
    percentage, appends a 'mean' row, and checks each mean against a fixed range.

    Returns:
        A pandas Styler over the per-group accuracy table, with a colour gradient
        on the three accuracy columns.
    """
    dfs = {}
    dfs["greedy"] = evaluate_arithmetic(model, tokenizer, "arithmetic_problems.csv", "greedy")
    dfs["top_p"] = evaluate_arithmetic(model, tokenizer, "arithmetic_problems.csv", "top_p", top_p=0.9)
    dfs["top_k"] = evaluate_arithmetic(model, tokenizer, "arithmetic_problems.csv", "top_k", top_k=20)
    for k in dfs.keys():
        # Exact string match between true and predicted answer, averaged per group.
        dfs[k]["acc"] = dfs[k]["true_answer"] == dfs[k]["pred_answer"]
        dfs[k] = dfs[k].groupby(["operation", "num_digits"])["acc"].mean().reset_index()
    # The column assignments below align on the row index of the grouped frames,
    # which is why they happen before the sort and reset that follow.
    df = dfs["greedy"].copy()[["operation", "num_digits"]]
    df["acc_greedy"] = dfs["greedy"]["acc"] * 100
    df["acc_top_p"] = dfs["top_p"]["acc"] * 100
    df["acc_top_k"] = dfs["top_k"]["acc"] * 100
    df = df.sort_values(["num_digits", "operation"]).reset_index(drop=True)
    # Extra row labelled 'mean' with the column means of the three accuracy columns.
    df.loc['mean'] = df[["acc_greedy", "acc_top_p", "acc_top_k"]].mean()

    def check_acc(name, value, valid_range):
        # Passes only when value lies strictly inside valid_range.
        if valid_range[0] < value < valid_range[1]:
            print(f"<<Passed 5.2.{name}>>")
        else:
            # NOTE(review): the failure is printed, not raised, so all three checks
            # always run — presumably intentional; confirm.
            print(Exception(f"Failed 5.2.{name}: {value} is not in the range {valid_range}"))

    check_acc("acc_greedy", df.loc['mean', 'acc_greedy'], (75, 100))
    check_acc("acc_top_p", df.loc['mean', 'acc_top_p'], (50, 70))
    check_acc("acc_top_k", df.loc['mean', 'acc_top_k'], (50, 70))
    return df.style.background_gradient(cmap='RdYlGn', subset=["acc_greedy", "acc_top_p", "acc_top_k"]).format(precision=2)

grade_operation_and_digits()