In [None]:
!pip install datasets

In [None]:
from datasets import load_dataset

dataset = load_dataset("wikitext", "wikitext-2-raw-v1")

# Concatenate every line of a split with single spaces and keep only the first
# one million characters of the result.
train: str = " ".join(dataset["train"]["text"])[:1_000_000]
test: str = " ".join(dataset["test"]["text"])[:1_000_000]

In [None]:
from itertools import chain, pairwise
from collections import Counter
from tqdm import tqdm

class Tokenizer:
    # The lookup list contains *byte groups*, represented as a tuples of ints.
    # The token ID for a byte group is its index in the list.
    vocab: list[tuple[int, ...]]

    def __init__(self, training_seq: str, vocab_size: int) -> None:
        # Initialize a lookup with single-byte groups
        self.vocab = [(i,) for i in range(pow(2, 8))]
        byte_seq = list(bytes(training_seq, "utf-8"))
        for i in tqdm(range(pow(2, 8), vocab_size)):
            """
            TODO: iteratively add the most common token pairs to the vocabulary.
            Advice: try using Counter and pairwise from the python std lib.
            """
            pairs = pairwise(byte_seq)
            counts = Counter(pairs)

            if not counts:
                break

            common, _ = counts.most_common(1)[0]

            if all(0 <= b < 256 for b in common):
                self.vocab.append(common)
            #else:
            #    print(f"Skipping invalid pair: {common}")

            hu = []
            i = 0
            while i < len(byte_seq):
                if i < len(byte_seq) - 1 and (byte_seq[i], byte_seq[i + 1]) == common:
                    hu.append(len(self.vocab) - 1)
                    i += 2
                else:
                    hu.append(byte_seq[i])
                    i += 1
            byte_seq = hu

    def tokenize(self, seq: str) -> list[int]:
        """
        TODO: convert a byte sequence into a token sequence by greedily adding
        the longest token that matches the rest of the sequence, e.g.,
        vocab = [a, aa, b]
        sequence = aaab
        token_seq = [1, 0, 2] NOT [0, 1, 2].
        """
        byte_seq = list(bytes(seq, "utf-8"))
        token_seq = []

        i = 0
        while i < len(byte_seq):
            for length in range(len(self.vocab), 0, -1):
                token = tuple(byte_seq[i:i + length])
                if token in self.vocab:
                    token_seq.append(self.vocab.index(token))
                    i += length
                    break
            else:
                raise ValueError("No matching token found")
        return token_seq

    def detokenize(self, token_seq: list[int]) -> str:
        # TODO: convert a token sequence into a byte sequence.
        byte_seq = list(chain.from_iterable(self.vocab[token] for token in token_seq))
        return bytes(byte_seq).decode("utf-8")

# Look at raw byte-bigram statistics for the first 10,000 training characters,
# then train a 500-token tokenizer on the same slice.
train_data = train[:10000]
bigrams = list(pairwise(bytes(train_data, "utf-8")))
bigram_counts = Counter(bigrams)
print("Most common bigrams in the training data:")
print(bigram_counts.most_common(10))
tokenizer = Tokenizer(train_data, vocab_size=500)

print("Some of our new tokens:")
for token in tokenizer.vocab[-10:]:
    # A learned byte group can begin or end in the middle of a multi-byte
    # UTF-8 character; decode leniently so this display loop cannot raise.
    print(repr(bytes(token).decode("utf-8", errors="replace")))

In [None]:
# Do not edit this code cell
test_data = test[:10000]
train_bytes_len = len(bytes(train_data, "utf-8"))
train_token_len = len(tokenizer.tokenize(train_data))
print(f"Compressed train set to {train_token_len / train_bytes_len * 100:.0f}% original size")
test_bytes_len = len(bytes(test_data, "utf-8"))
test_token_len = len(tokenizer.tokenize(test_data))
print(f"Compressed test set to {test_token_len / test_bytes_len * 100:.0f}% original size")

assert train_data == tokenizer.detokenize(tokenizer.tokenize(train_data))
assert test_data == tokenizer.detokenize(tokenizer.tokenize(test_data))

In [None]:
from itertools import islice

langs = ["fr", "es", "zh", "ar"]
corp = {}

# Build one text per language from the first 1000 streamed OSCAR documents.
# NOTE(review): trust_remote_code=True allows the dataset repository's loading
# script to run locally - confirm the source is trusted before re-running.
for lang in langs:
    dataset = load_dataset("oscar", f"unshuffled_deduplicated_{lang}", streaming=True, trust_remote_code=True)
    first_docs = islice(dataset["train"], 1000)
    corp[lang] = " ".join(doc["text"] for doc in first_docs)

# Report how well the tokenizer trained above compresses the first 10,000
# characters of each language.
for lang, corpus in corp.items():
    excerpt = corpus[:10000]
    token_count = len(tokenizer.tokenize(excerpt))
    byte_count = len(bytes(excerpt, "utf-8"))

    print(f" {lang} = {token_count / byte_count * 100:.0f}% original size")
    # Tokenize and detokenize a 200-character prefix (results are kept for
    # manual inspection only).
    sample = tokenizer.tokenize(excerpt[:200])
    detokenized_sample = tokenizer.detokenize(sample)


In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from typing import Optional

class LM():
    """Causal language model wrapper with hand-written decoding strategies.

    Loads a Hugging Face tokenizer/model pair and offers greedy, temperature,
    nucleus (top-p) and locally typical decoding, plus helpers that list the
    candidate tokens each strategy considers at the first decoding step.
    Every method processes a single prompt (batch size 1).
    """

    def __init__(self, model_name: str = "openai-community/gpt2-medium"):
        """Load the tokenizer and model for ``model_name`` and switch the model to eval mode."""
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name)
        self.model.eval()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _encode(self, prompt: str):
        """Tokenize ``prompt`` into a (1, seq_len) tensor of token IDs."""
        return self.tokenizer(prompt, return_tensors="pt").input_ids

    def _next_token_logits(self, input_ids, temperature: float = 1.0):
        """Return the temperature-scaled, 1-D logits for the token following ``input_ids``.

        Raises:
            ValueError: If ``temperature`` is not strictly positive.
        """
        if temperature <= 0:
            raise ValueError("temperature must be > 0; use greedy_decoding for deterministic output")
        # Inference only: skip autograd bookkeeping.
        with torch.no_grad():
            logits = self.model(input_ids=input_ids).logits
        return logits[0, -1, :] / temperature

    def _decode_loop(self, prompt: str, max_length: int, temperature: float, pick_next) -> str:
        """Generate up to ``max_length`` new tokens, choosing each with ``pick_next(logits)``.

        Generation stops early once the tokenizer's EOS token is produced.
        Returns the decoded prompt plus continuation.
        """
        out = self._encode(prompt)
        for _ in range(max_length):
            next_id = int(pick_next(self._next_token_logits(out, temperature)))
            next_token = torch.tensor([[next_id]], dtype=out.dtype, device=out.device)
            out = torch.cat([out, next_token], dim=-1)
            if next_id == self.tokenizer.eos_token_id:
                break
        return self.tokenizer.decode(out[0], skip_special_tokens=True)

    @staticmethod
    def _nucleus(probs, p: float):
        """Return (ids, probs) of the smallest top-probability set whose mass reaches ``p``.

        Tokens are ordered by descending probability. The token that crosses
        the threshold is included, and at least one token is always kept.
        """
        sorted_probs, sorted_ids = torch.sort(probs, descending=True)
        # Probability mass strictly before each token in sorted order.
        mass_before = torch.cumsum(sorted_probs, dim=-1) - sorted_probs
        keep = mass_before < p
        keep[0] = True
        return sorted_ids[keep], sorted_probs[keep]

    @staticmethod
    def _typical_set(logits, mass: float):
        """Return (ids, probs) of the locally typical set holding probability ``mass``.

        Tokens are ranked by how close their surprisal (-log p) is to the
        entropy of the distribution; the closest tokens are kept until their
        cumulative probability reaches ``mass``. At least one token is kept.
        """
        log_probs = torch.log_softmax(logits, dim=-1)
        probs = log_probs.exp()
        # Treat 0 * log(0) as 0 so the entropy never becomes NaN.
        plogp = torch.where(probs > 0, probs * log_probs, torch.zeros_like(probs))
        entropy = -plogp.sum()
        deviation = (-log_probs - entropy).abs()
        order = torch.argsort(deviation)
        ordered_probs = probs[order]
        mass_before = torch.cumsum(ordered_probs, dim=-1) - ordered_probs
        keep = mass_before < mass
        keep[0] = True
        return order[keep], ordered_probs[keep]

    def _token_report(self, ids, probs, drop_blank: bool = False,
                      skip_undecodable: bool = False) -> list[tuple[str, float]]:
        """Decode ``ids`` one by one and pair each decoded string with its probability.

        ``drop_blank`` omits tokens that decode to whitespace only;
        ``skip_undecodable`` omits tokens whose decoding raises UnicodeDecodeError.
        """
        report = []
        for token_id, prob in zip(ids, probs):
            try:
                text = self.tokenizer.decode([int(token_id)])
            except UnicodeDecodeError:
                if skip_undecodable:
                    continue
                raise
            if drop_blank and not text.strip():
                continue
            report.append((text, prob.item()))
        return report

    # ------------------------------------------------------------------
    # Decoding strategies
    # ------------------------------------------------------------------
    def greedy_decoding(self, prompt: str, max_length: int = 64) -> str:
        """Greedy decoding: append the highest-probability token at every step."""
        return self._decode_loop(prompt, max_length, 1.0, torch.argmax)

    def temperature_sampling(self, prompt: str, temperature: float = 1.0, max_length: int = 64) -> str:
        """Sample every token from the full softmax distribution of the temperature-scaled logits."""
        def pick(logits):
            return torch.multinomial(torch.softmax(logits, dim=-1), num_samples=1)

        return self._decode_loop(prompt, max_length, temperature, pick)

    def nucleus_sampling(self, prompt: str, p: float = 0.9, max_length: int = 64, temperature: float = 1.0) -> str:
        """Nucleus (top-p) sampling.

        Each token is sampled from the smallest set of most likely tokens
        whose cumulative probability reaches ``p``, after temperature scaling.
        """
        def pick(logits):
            ids, probs = self._nucleus(torch.softmax(logits, dim=-1), p)
            # multinomial normalises its weights, so no explicit renormalisation.
            return ids[torch.multinomial(probs, num_samples=1)]

        return self._decode_loop(prompt, max_length, temperature, pick)

    def typical_sampling(self, prompt: str, t: float = 0.9, max_length: int = 64, temperature: float = 1.0) -> str:
        """Locally typical sampling.

        Each token is sampled from the tokens whose surprisal is closest to
        the entropy of the next-token distribution, keeping just enough of
        them to cover probability mass ``t``.
        """
        def pick(logits):
            ids, probs = self._typical_set(logits, t)
            return ids[torch.multinomial(probs, num_samples=1)]

        return self._decode_loop(prompt, max_length, temperature, pick)

    def generate(self,
                 prompt: str,
                 temperature: float = 1.0,
                 p: Optional[float] = None) -> str:
        """Route to the decoding strategy implied by the arguments.

        * ``temperature <= 0`` -> greedy decoding (the zero-temperature limit
          of sampling; it also avoids dividing the logits by zero),
        * ``p`` given          -> nucleus sampling at that temperature,
        * otherwise            -> plain temperature sampling.
        """
        max_length = 64
        if temperature <= 0:
            return self.greedy_decoding(prompt, max_length=max_length)
        if p is not None:
            return self.nucleus_sampling(prompt, p=p, temperature=temperature, max_length=max_length)
        return self.temperature_sampling(prompt, temperature=temperature, max_length=max_length)

    # ------------------------------------------------------------------
    # First-step candidate listings
    # ------------------------------------------------------------------
    def greedyTop10(self, prompt: str) -> list[tuple[str, float]]:
        """Return the (up to) 10 most probable next tokens with their probabilities."""
        probs = torch.softmax(self._next_token_logits(self._encode(prompt)), dim=-1)
        top_probs, top_ids = torch.topk(probs, min(10, probs.numel()))
        return self._token_report(top_ids, top_probs, skip_undecodable=True)

    def typicalTop10(self, prompt: str, t: float = 0.9, temperature: float = 1.0) -> list[tuple[str, float]]:
        """Return up to 10 tokens of the typical set (mass ``t``) at the first decoding step.

        Tokens are listed most typical first, i.e. by increasing distance
        between their surprisal and the entropy of the distribution.
        """
        logits = self._next_token_logits(self._encode(prompt), temperature)
        ids, probs = self._typical_set(logits, t)
        return self._token_report(ids[:10], probs[:10])

    def tempTop10(self, prompt: str, temperature: float = 1.0) -> list[tuple[str, float]]:
        """Return the top-10 next tokens after temperature scaling, omitting whitespace-only tokens."""
        probs = torch.softmax(self._next_token_logits(self._encode(prompt), temperature), dim=-1)
        top_probs, top_ids = torch.topk(probs, min(10, probs.numel()))
        return self._token_report(top_ids, top_probs, drop_blank=True)

    def nucleusTop10(self, prompt: str, p: float = 0.9, temperature: float = 1.0) -> list[tuple[str, float]]:
        """Return up to 10 of the most probable tokens inside the top-``p`` nucleus, omitting whitespace-only tokens."""
        probs = torch.softmax(self._next_token_logits(self._encode(prompt), temperature), dim=-1)
        ids, kept_probs = self._nucleus(probs, p)
        return self._token_report(ids[:10], kept_probs[:10], drop_blank=True)


lm = LM()
prompt = "Once upon a time in a land far far away,"

# One (label, thunk) pair per strategy; each thunk is evaluated right before
# its line is printed, so output appears in the same order as the model calls.
first_step_reports = (
    ("greedy:", lambda: lm.greedyTop10(prompt)),
    ("temperature (t=1.0):", lambda: lm.tempTop10(prompt)),
    ("nucleus (p=0.9):", lambda: lm.nucleusTop10(prompt)),
    ("typical sampling:", lambda: lm.typicalTop10(prompt, t=0.9, temperature=1.0)),
)
for label, compute in first_step_reports:
    print(label, compute())

In [None]:
# Build an LM wrapper for the GPT-2 medium checkpoint (the same name LM uses as
# its default `model_name`).
GPT2LM = LM("openai-community/gpt2-medium")

In [None]:
temperatures = [0.3, 0.5, 0.7, 0.9, 1.1]

prompt = "Once upon a time in a land far far away,"
lm = LM()

# Generate one continuation per temperature, keyed by the temperature used.
results = {
    temp: {"Output": lm.generate(prompt, temperature=temp)}
    for temp in temperatures
}

# Print each result followed by blank lines as a visual separator.
for temp, data in results.items():
    print(f"Temperature: {temp}", f"Generated Output: {data['Output']}", "\n\n", sep="\n")

In [None]:
vals = [0.97, 0.95, 0.9, 0.8, 0.7]
prompt = "Once upon a time in a land far far away,"
lm = LM()

# One nucleus-sampled continuation per top-p value, at temperature 1.0.
for p in vals:
    output = lm.generate(prompt, p=p, temperature=1.0)
    print(f"p={p}", output, "\n\n", sep="\n")

In [None]:

prompt = [
    "The capital of the United States is"
]

lm = LM()
temperature = 1
p = 0.8

# For every prompt, print a header and then the continuation produced by each
# decoding strategy, in the order greedy, temperature, nucleus, typical.
for text in prompt:
    strategies = (
        ("Greedy:\n", lambda: lm.greedy_decoding(text)),
        ("temp:", lambda: lm.temperature_sampling(text, temperature=temperature)),
        ("\nNucleus:\n", lambda: lm.nucleus_sampling(text, p=p, temperature=temperature)),
        ("Typical", lambda: lm.typical_sampling(text, t=0.9, temperature=temperature)),
    )
    for header, run in strategies:
        print(header)
        print(run())
    print("\n\n")


In [None]:
from datasets import load_dataset

# Load the 'main' configuration of GSM8K; the cells below read its 'train' and
# 'test' splits. Note that this rebinds the `dataset` name used earlier.
dataset = load_dataset("gsm8k", 'main')

In [None]:
# Number of examples in the train and test splits.
len(dataset['train']), len(dataset['test'])

In [None]:
# An example instance of this dataset: the test-split row at index 6
# (later cells read its 'question' and 'answer' fields).

dataset['test'][6]

In [None]:
# All imports for this question
from google.colab import userdata
import google.generativeai as genai
from datasets import Dataset
import random
from typing import Callable, List, Any

In [None]:
# Read the Gemini API key from Colab's `userdata` store (key name
# "GEMINI_API_KEY") instead of hard-coding it in the notebook.
GOOGLE_API_KEY = userdata.get("GEMINI_API_KEY")

# Register the key with the google.generativeai client.
genai.configure(api_key=GOOGLE_API_KEY)

In [None]:
# Test if your setup is working, do not change the model name
# Sends one prompt and prints the text of the reply. `model` is also the
# instance passed to the evaluation pipeline further down.
model = genai.GenerativeModel("gemini-1.0-pro")
response = model.generate_content("What is Natural Language Processing? Explain it to a five year old.")
print(response.text)

### Processing the GSM8K Dataset

In [None]:
def process_gsm8k_answers(dataset: Dataset) -> Dataset:
    """
    Add a 'processed_answer' field holding only the final answer of each sample.

    The final answer is taken to be whatever follows the last '###' in the
    sample's 'answer' text, stripped of surrounding whitespace. Because the
    split is on three hashes, an answer ending in '#### 18' yields '# 18'.
    When the text contains no '###' the whole stripped answer is used.

    Args:
    dataset (Dataset): Huggingface Dataset object for GSM8K.

    Returns:
    Dataset: The result of mapping the extraction over `dataset`.
    """

    def extract_answer(sample):
        # split(...)[-1] is the entire text when the separator is absent, so a
        # single expression covers both the "marker" and "no marker" cases.
        tail = sample['answer'].split('###')[-1]
        return {'processed_answer': tail.strip()}

    return dataset.map(extract_answer)

In [None]:
#TEST

# Process the test split and show the first row, which now also carries the
# 'processed_answer' field added by process_gsm8k_answers.
gsm8k_test_processed = process_gsm8k_answers(dataset['test'])
print("Processed Test Dataset Example:")
print(gsm8k_test_processed[0])

In [None]:
def prompt_generation_zero_shot(problem: str) -> str:
    """
    Build a zero-shot prompt that asks for the final answer only.

    Args:
    problem (str): The math word problem to embed in the prompt.

    Returns:
    str: The generated prompt.
    """
    template = "answer this math problem, I just want only the answer: \n\n Problem: {problem}\nAnswer:"
    return template.format(problem=problem)


In [None]:
def prompt_generation_zero_shot_cot(problem: str) -> str:
    """
    Build a zero-shot Chain of Thought (CoT) prompt that asks the model to
    reason through the steps before answering.

    Args:
    problem (str): The math word problem to embed in the prompt.

    Returns:
    str: The generated prompt.
    """
    template = (
        "answer the following math problem. First, think through the steps logically and then give an answer:"
        "\n\nProblem: {problem}\n\n Explanation and answer:"
    )
    return template.format(problem=problem)

In [None]:
def prompt_generation_5_shot(problem: str, training_set: Dataset) -> str:
    """
    5-shot prompt generation for GSM8K problems. Randomly selects 5 examples from the training set.

    Args:
    problem (str): The problem the model should answer.
    training_set (Dataset): Indexable collection of rows (anything with
        `len()` and integer `[]` access) that carry 'question' and
        'processed_answer' fields.

    Returns:
    str: The generated prompt with 5 in-context_examples.

    Raises:
    ValueError: If `training_set` holds fewer than 5 rows.
    """
    # Sample row *indices* and fetch only those five rows, instead of copying
    # the whole training set into a list on every call. For a given random
    # state this selects the same rows as sampling from the full list.
    picked = random.sample(range(len(training_set)), 5)
    examples = [training_set[index] for index in picked]
    examps = "\n\n".join(
        f"Problem: {ex['question']}\nAnswer: {ex['processed_answer']}" for ex in examples
    )
    return f"{examps}\n\n Problem: {problem}\nAnswer:"


In [None]:
def prompt_generation_5_shot_wrapper(problem: str) -> str:
    """Single-argument adapter: build a 5-shot prompt from the processed GSM8K training split.

    The training split of the module-level `dataset` is processed anew on every call.
    """
    return prompt_generation_5_shot(problem, process_gsm8k_answers(dataset['train']))

In [None]:
def prompt_generation_5_shot_cot(problem: str, training_set: Dataset) -> str:
    """
    5-shot Chain of Thought (CoT) prompt generation. Randomly selects 5 examples
    from the training set and includes reasoning steps.

    Args:
    problem (str): The problem the model should answer.
    training_set (Dataset): Indexable collection of rows (anything with
        `len()` and integer `[]` access, e.g. a list of dicts) that carry
        'question', 'answer' and 'processed_answer' fields.

    Returns:
    str: The generated prompt with 5 CoT in-context examples.

    Raises:
    ValueError: If `training_set` holds fewer than 5 rows.
    """
    # random.sample only accepts real sequences, so sample row indices and look
    # the rows up; this works for lists and for other indexable containers
    # alike, and picks the same rows as sampling a list directly would.
    picked = random.sample(range(len(training_set)), 5)
    examples = [training_set[index] for index in picked]
    examps = "\n\n".join(
        f"Problem: {ex['question']}\nReasoning: {ex['answer']}\nFinal Answer: {ex['processed_answer']}"
        for ex in examples
    )

    return f"{examps}\n\n Problem: {problem}\n Reasoning and answer:"

In [None]:
def prompt_generation_5_shot_cot_wrapper(problem: str) -> str:
    """Single-argument adapter: build a 5-shot CoT prompt from the processed GSM8K training split.

    The training split of the module-level `dataset` is processed and copied
    into a list on every call.
    """
    return prompt_generation_5_shot_cot(problem, list(process_gsm8k_answers(dataset['train'])))

In [None]:
# Feel free to change the method definition

def my_prompt(problem: str) -> str:
    """
    Custom prompt: ask for a step-by-step solution followed by a self-check
    of the result.

    Args:
    problem (str): The math word problem to embed in the prompt.

    Returns:
    str: The generated prompt
    """
    template = (
        "solve the following math problem step by step. once you reach the answer, "
        "verify if its correct by checking your solution."
        "\n\nProblem: {problem}\nSteps:\nAnswer:"
    )
    return template.format(problem=problem)

In [None]:
import re
def answer_processing(prediction: str, prompt_function: Any) -> str:
    """
    Processes the model's generated output to extract the final answer.

    The text searched is, in order of preference: whatever follows the last
    "Answer:" marker, whatever follows the last "The answer is" marker, or the
    last line of the prediction. Exactly one number is taken from that text:
    the first one when a marker was found (the number right after the marker),
    otherwise the last one on the final line. Thousands separators are
    removed, a leading minus sign is kept, and a fractional part consisting
    only of zeros is dropped ("18.00" -> "18").

    Args:
    prediction (str): Raw text generated by the model.
    prompt_function (Any): The prompt builder that produced the prompt.
        Accepted for interface compatibility; it is not used.

    Returns:
    str: The processed numerical answer formatted as "# <number>", or "# 0"
    when no number is found.
    """
    marker_found = True
    if "Answer:" in prediction:
        answer = prediction.split("Answer:")[-1]
    elif "The answer is" in prediction:
        answer = prediction.split("The answer is")[-1]
    else:
        marker_found = False
        answer = prediction.strip().split('\n')[-1]

    # "1,250" -> "1250": drop commas that sit between digit groups.
    answer = re.sub(r"(?<=\d),(?=\d{3}\b)", "", answer)
    # A number is an optional minus sign, digits, and an optional decimal part.
    # The look-behind stops "16-3" from being read as 16 and -3.
    numbers = re.findall(r"(?<![\d.])-?\d+(?:\.\d+)?", answer)
    if not numbers:
        return "# 0"

    number = numbers[0] if marker_found else numbers[-1]
    if "." in number:
        # "18.00" -> "18", "3.50" -> "3.5"
        number = number.rstrip("0").rstrip(".")
    return f"# {number}"

In [None]:
# Do not change, method to calculate accuracy from predictions and ground truth labels

def evaluate_accuracy(predictions: List[str], ground_truths: List[str]) -> float:
    """
    Percentage of predictions that are exactly equal to their ground truth.

    Pairs are formed positionally; if the lists differ in length the extra
    items of the longer one are ignored, while the denominator is always
    len(predictions).

    Returns:
    float: Accuracy in percent (0-100).

    Raises:
    ZeroDivisionError: If `predictions` is empty.
    """
    correct = 0
    total = len(predictions)

    # Exact string comparison: formatting differences count as mismatches.
    for pred, true in zip(predictions, ground_truths):
        if pred == true:
            correct += 1

    accuracy = correct / total
    return accuracy * 100

In [None]:
def pipeline_generate(
    model_instance: Any,
    test_set: Dataset,
    prompt_function: Callable[[str], str],
    process_answer_function: Callable[[str, Any], str],
    evaluation_function: Callable[[List[str], List[str]], float],
    self_consistency: int,
) -> float:
    """
    Prompt the model on every test problem and score the extracted answers.

    Args:
    model_instance (Any): The Google Gemini model instance.
    test_set (Dataset): The GSM8K test set to evaluate on; each row provides
        'question' and 'processed_answer'.
    prompt_function (Callable): Function to generate prompts for the test set.
    process_answer_function (Callable): Function to process the model's generated
        answers; called as `process_answer_function(text, prompt_function)`.
    evaluation_function (Callable): Function to evaluate model's answers against the ground truth.
    self_consistency: Number of samples to run self-consistency approach on.
    If negative, 0 or 1, this implies regular prompting

    With self-consistency, a fresh prompt is built for every sample and the
    most frequent processed answer wins; ties go to the answer that was
    produced first, so results do not depend on set iteration order.

    Returns:
    float: The accuracy of the model on the test set (0.0 for an empty test set).
    """
    # Imported locally so the function does not rely on an import made in
    # another notebook cell.
    from collections import Counter

    def sample_answer(problem: str) -> str:
        """Prompt the model once and return the processed answer."""
        prompt = prompt_function(problem)
        response = model_instance.generate_content(prompt)
        try:
            text = response.text
        except ValueError:
            # NOTE(review): google-generativeai documents that `.text` raises
            # ValueError when the response carries no text (e.g. it was
            # blocked). Treat that as an empty reply so one refusal does not
            # abort the whole evaluation - confirm this is the desired policy.
            text = ""
        return process_answer_function(text, prompt_function)

    # Regular prompting is simply self-consistency with a single sample.
    num_samples = max(1, self_consistency)

    predictions = []
    grounds = []
    for sample in test_set:
        problem = sample["question"]
        ground = sample["processed_answer"]

        votes = Counter(sample_answer(problem) for _ in range(num_samples))
        # most_common keeps first-seen order among equally frequent answers.
        predictions.append(votes.most_common(1)[0][0])
        grounds.append(ground)

    if not predictions:
        # Nothing to score; avoid a division by zero in the evaluator.
        return 0.0
    return evaluation_function(predictions, grounds)

In [None]:
# Process the training split and copy it into a plain list, then show the
# first entry as a sanity check.
training_set = list(process_gsm8k_answers(dataset['train']))
print("Sample from the training set:", training_set[0])

In [None]:
# Rebuild the processed test split so this cell does not depend on the earlier test cell.
gsm8k_test_processed = process_gsm8k_answers(dataset['test'])

# The following line is just to test your systems, comment this line out to report results on the entire test set in 3.3
# (it keeps only the first 5 rows of the processed test split)
gsm8k_test_processed = Dataset.from_dict(gsm8k_test_processed[:5])

# Run model generation with zero-shot prompt generation
# NOTE: the prompt function selected below is the zero-shot *CoT* variant, and
# self_consistency=5 means five model calls per problem with a majority vote.
accuracy = pipeline_generate(
    model_instance=model,
    test_set=gsm8k_test_processed,
    prompt_function=prompt_generation_zero_shot_cot, # Change this to test different prompt methods
    process_answer_function=answer_processing,
    evaluation_function=evaluate_accuracy,
    self_consistency=5,
)

print(f"Accuracy: {accuracy}%")