In [1]:
import os
import sys
import time
import re
from typing import List, Union, Dict, Any

sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')

!pip install transformers[torch] datasets
!pip install -U bitsandbytes
!pip install -U accelerate
!pip install --upgrade langchain
!pip install langchain-community
!pip install google-search-results


import torch
from transformers import (AutoModelForCausalLM, AutoTokenizer,
                          BitsAndBytesConfig, GenerationConfig, pipeline)
from transformers import DebertaV2Tokenizer, DebertaV2ForSequenceClassification

from langchain import (HuggingFacePipeline, LLMChain)
from langchain.agents import (load_tools, initialize_agent, Tool,
                               AgentExecutor, LLMSingleActionAgent,
                               AgentOutputParser)
from langchain.prompts import StringPromptTemplate
from langchain.schema import AgentAction, AgentFinish
from datasets import load_dataset

sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__

# Never commit the Serper API key to the notebook.  Use the value already present
# in the environment, otherwise ask for it interactively (input is not echoed).
# NOTE(review): a literal key was previously stored on this line; it should be
# treated as leaked and rotated.
if not os.environ.get("SERPER_API_KEY"):
    from getpass import getpass
    os.environ["SERPER_API_KEY"] = getpass("SERPER_API_KEY: ")

In [2]:
# Show the interactive Hugging Face login widget.
# NOTE(review): presumably required because the model repository is gated — confirm.
from huggingface_hub import notebook_login
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [3]:
# Model
MODEL_NAME = "google/gemma-2b-it"

def load_model(model_name):
    """Load a 4-bit quantized causal LM and its tokenizer.

    Args:
        model_name: Hub identifier or path passed to ``from_pretrained``.

    Returns:
        A ``(model, tokenizer)`` tuple.  The tokenizer's pad token is set to
        its EOS token.
    """
    # NF4 4-bit weights with double quantization; compute in bfloat16.
    bnb_settings = {
        "load_in_4bit": True,
        "bnb_4bit_compute_dtype": torch.bfloat16,
        "bnb_4bit_use_double_quant": True,
        "bnb_4bit_quant_type": "nf4",
    }
    bnb_config = BitsAndBytesConfig(**bnb_settings)

    llm_model = AutoModelForCausalLM.from_pretrained(
        model_name,
        device_map="auto",
        torch_dtype=torch.bfloat16,
        quantization_config=bnb_config,
    )

    llm_tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Reuse EOS as the padding token.
    llm_tokenizer.pad_token = llm_tokenizer.eos_token

    return llm_model, llm_tokenizer

# Load the quantized model and tokenizer once; later cells use these module-level names.
model, tokenizer = load_model(MODEL_NAME)
# Switch to inference mode; as the last expression of the cell this also displays the module.
model.eval()

config.json:   0%|          | 0.00/627 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/13.5k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/4.95G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/67.1M [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/137 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/34.2k [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/4.24M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.5M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/636 [00:00<?, ?B/s]

GemmaForCausalLM(
  (model): GemmaModel(
    (embed_tokens): Embedding(256000, 2048, padding_idx=0)
    (layers): ModuleList(
      (0-17): 18 x GemmaDecoderLayer(
        (self_attn): GemmaSdpaAttention(
          (q_proj): Linear4bit(in_features=2048, out_features=2048, bias=False)
          (k_proj): Linear4bit(in_features=2048, out_features=256, bias=False)
          (v_proj): Linear4bit(in_features=2048, out_features=256, bias=False)
          (o_proj): Linear4bit(in_features=2048, out_features=2048, bias=False)
          (rotary_emb): GemmaRotaryEmbedding()
        )
        (mlp): GemmaMLP(
          (gate_proj): Linear4bit(in_features=2048, out_features=16384, bias=False)
          (up_proj): Linear4bit(in_features=2048, out_features=16384, bias=False)
          (down_proj): Linear4bit(in_features=16384, out_features=2048, bias=False)
          (act_fn): PytorchGELUTanh()
        )
        (input_layernorm): GemmaRMSNorm((2048,), eps=1e-06)
        (post_attention_layernorm): G

In [4]:
# Load the "all" configuration of MMLU, then keep only the rows whose subject is
# college_mathematics in every split.
# NOTE(review): the filter scans every split, including the large auxiliary_train
# one; loading only the needed subject/split may be cheaper — verify.
dataset = load_dataset("cais/mmlu", "all")
data = dataset.filter(lambda example: example["subject"] == "college_mathematics")

README.md:   0%|          | 0.00/53.2k [00:00<?, ?B/s]

dataset_infos.json:   0%|          | 0.00/138k [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/3.50M [00:00<?, ?B/s]

validation-00000-of-00001.parquet:   0%|          | 0.00/408k [00:00<?, ?B/s]

dev-00000-of-00001.parquet:   0%|          | 0.00/76.5k [00:00<?, ?B/s]

auxiliary_train-00000-of-00001.parquet:   0%|          | 0.00/47.5M [00:00<?, ?B/s]

Generating test split:   0%|          | 0/14042 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/1531 [00:00<?, ? examples/s]

Generating dev split:   0%|          | 0/285 [00:00<?, ? examples/s]

Generating auxiliary_train split:   0%|          | 0/99842 [00:00<?, ? examples/s]

Filter:   0%|          | 0/14042 [00:00<?, ? examples/s]

Filter:   0%|          | 0/1531 [00:00<?, ? examples/s]

Filter:   0%|          | 0/285 [00:00<?, ? examples/s]

Filter:   0%|          | 0/99842 [00:00<?, ? examples/s]

In [5]:
def generate_response(prompt, max_new_tokens, temperature):
    """Generate a completion for ``prompt`` with the module-level model.

    Args:
        prompt: Text fed to the model.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature.  A positive value samples at that
            temperature; zero/None/negative falls back to greedy decoding
            instead of passing an invalid temperature to ``generate``.

    Returns:
        The decoded first output sequence (special tokens removed, surrounding
        whitespace stripped).  Callers in this notebook split this text on the
        tail of the prompt, so the whole decoded sequence is returned unchanged.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    generation_kwargs = {
        "max_new_tokens": max_new_tokens,
        "pad_token_id": tokenizer.eos_token_id,
    }
    if temperature is not None and temperature > 0:
        generation_kwargs.update(do_sample=True, temperature=temperature)
    else:
        generation_kwargs["do_sample"] = False

    with torch.no_grad():
        # Pass the full encoding (input_ids *and* attention_mask) so generate()
        # does not have to guess the mask; pad and EOS share an id here.
        output = model.generate(**inputs, **generation_kwargs)

    response = tokenizer.decode(output[0], skip_special_tokens=True)
    return response.strip()


def calculate_accuracy(actual_answers, predicted_answers):
    """Return the fraction of positions where prediction equals ground truth.

    Args:
        actual_answers: Sequence of ground-truth answers.
        predicted_answers: Sequence of predictions, aligned by position.

    Returns:
        A float in ``[0, 1]``; ``0.0`` when there are no answers to score.

    Raises:
        ValueError: If the two sequences have different lengths.
    """
    if len(actual_answers) != len(predicted_answers):
        raise ValueError(
            "actual_answers and predicted_answers must have the same length "
            f"({len(actual_answers)} != {len(predicted_answers)})"
        )
    if not actual_answers:
        # Nothing to score; avoid dividing by zero.
        return 0.0
    correct = sum(
        1 for actual, predicted in zip(actual_answers, predicted_answers)
        if actual == predicted
    )
    return correct / len(actual_answers)


def extract_correct_option_with_same_llm(text):
    """Ask the same LLM to name the correct option for ``text``.

    ``text`` is extended with a fixed cue and a single token is generated.
    ``generate_response`` decodes the whole output sequence, so the text it
    returns also contains the prompt; only what follows the last occurrence of
    the cue is returned here.

    Args:
        text: The model response to extract an option from.

    Returns:
        The stripped text after the cue.  If the cue cannot be found in the
        decoded output, the whole decoded output is returned unchanged.
    """
    cue = 'The correct option is Option:'
    prompt = text + cue
    response = generate_response(prompt, 1, 0.2)
    # rpartition yields ('', '', response) when the cue is absent, so the
    # fallback is simply the full response.
    return response.rpartition(cue)[2].strip()


# Zero-shot classification pipeline (BART-large MNLI) used by extract_correct_option_with_bart.
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

def extract_correct_option_with_bart(text):
    """Map a free-form model response to an option letter via zero-shot classification.

    Args:
        text: The response text to classify.

    Returns:
        The third whitespace-separated word of the first label in the
        classifier result, i.e. one of 'A', 'B', 'C' or 'D'.
    """
    hypotheses = [
        "Answer option A is correct and final",
        "Answer option B is correct and final",
        "Answer option C is correct and final",
        "Answer option D is correct and final",
    ]
    ranking = classifier(text, hypotheses)
    # The first entry of 'labels' is taken as the winner (presumably the
    # highest-scoring label — confirm against the pipeline's output ordering).
    winning_label = ranking['labels'][0]
    words = winning_label.split()
    return words[2]


# Checkpoint, tokenizer and sequence-classification model used by
# extract_correct_option_with_debarta.
# NOTE(review): no call to extract_correct_option_with_debarta is visible in this
# notebook, so this load may be unnecessary — confirm.
deberta = "microsoft/deberta-v3-large"
deberta_tokenizer = DebertaV2Tokenizer.from_pretrained(deberta)
deberta_model = DebertaV2ForSequenceClassification.from_pretrained(deberta)

def extract_correct_option_with_debarta(text, options=['A', 'B', 'C', 'D']):
    """Pick one of ``options`` for ``text`` using the module-level DeBERTa model.

    One hypothesis string is built per option, ``text`` is tokenized together
    with those hypotheses, and the entry of ``options`` at the argmax index of
    the logits is returned.

    NOTE(review): ``deberta`` names "microsoft/deberta-v3-large"; presumably
    this checkpoint has no task-tuned classification head — confirm the logits
    are meaningful.
    NOTE(review): ``torch.argmax(..., dim=1).item()`` needs a single-element
    result; verify the logits shape when four hypotheses are passed.
    NOTE(review): ``options`` is a mutable default argument; it is only read
    here, never modified.
    """
    hypothesis_template = "The correct option is {}."
    # One hypothesis sentence per candidate option.
    candidate_labels = [hypothesis_template.format(option) for option in options]
    inputs = deberta_tokenizer(text, candidate_labels, return_tensors="pt", padding=True, truncation=True)
    # Inference only: no gradients needed.
    with torch.no_grad():
        outputs = deberta_model(**inputs)
    predicted_class = torch.argmax(outputs.logits, dim=1).item()
    return options[predicted_class]


config.json:   0%|          | 0.00/1.15k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/52.0 [00:00<?, ?B/s]

spm.model:   0%|          | 0.00/2.46M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/580 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/874M [00:00<?, ?B/s]

In [22]:
def zeroshot_inference(dataset):
    """Run zero-shot multiple-choice inference over ``dataset``.

    For each example a prompt listing the question and its four options is
    sent to ``generate_response``; the text after the prompt's final
    instruction is classified with ``extract_correct_option_with_bart`` and
    mapped to an option index.

    Args:
        dataset: Iterable of examples with ``"question"``, ``"choices"``
            (four entries) and ``"answer"`` keys.

    Returns:
        A 7-tuple ``(accuracy, ground_truth_answers, model_predicted_answers,
        model_responses, extracted_answers, processed_answers,
        total_duration)``.  ``processed_answers`` is always an empty list; it
        is kept so the return shape does not change.  Predictions that could
        not be extracted are recorded as ``'N/A'``.
    """
    answer_marker = "Return correct option in format 'Option X'"
    option_to_index = {'A': 0, 'B': 1, 'C': 2, 'D': 3}

    ground_truth_answers = []
    model_predicted_answers = []
    model_responses = []
    extracted_answers = []
    processed_answers = []

    start_time = time.time()

    for example in dataset:
        question = example["question"]
        choice_a, choice_b, choice_c, choice_d = example["choices"]
        correct_answer = example["answer"]

        prompt = f"MCQ Question: {question}\n"
        prompt += "Four available options:\n"
        prompt += f"Option A. {choice_a}\nOption B. {choice_b}\nOption C. {choice_c}\nOption D. {choice_d}\n"
        prompt += f"{answer_marker}."

        raw_response = generate_response(prompt, max_new_tokens=512, temperature=0.2)
        # The decoded output echoes the prompt; keep what follows the final
        # instruction.  If the marker is missing from the decoded text, fall
        # back to the whole output instead of raising IndexError.
        _, marker_found, continuation = raw_response.partition(answer_marker)
        response = continuation.strip() if marker_found else raw_response.strip()

        # Best effort: any classifier failure counts as "no answer".
        try:
            extracted_answer = extract_correct_option_with_bart(response)
        except Exception:
            extracted_answer = 'N/A'

        extracted_answers.append(extracted_answer)

        # Unknown labels (e.g. 'N/A') are passed through and never match a
        # ground-truth index.
        predicted_answer = option_to_index.get(extracted_answer, extracted_answer)

        ground_truth_answers.append(correct_answer)
        model_predicted_answers.append(predicted_answer)
        model_responses.append(response)

    total_duration = time.time() - start_time

    # An empty dataset has nothing to score.
    if ground_truth_answers:
        accuracy = calculate_accuracy(ground_truth_answers, model_predicted_answers)
    else:
        accuracy = 0.0

    return accuracy, ground_truth_answers, model_predicted_answers, model_responses, extracted_answers, processed_answers, total_duration

# Evaluate the zero-shot prompt on the first 30 test examples and keep every
# returned list as a module-level name.
zero_inference_accuracy, zero_actual_answers, zero_predicted_answers, zero_responses, zero_extracted_answers, zero_processed_answers, zero_inference_duration = zeroshot_inference(data['test'].select(range(30)))
# Compact summary shown as the cell output.
zero_inference_results = {
    'zero_accuracy': zero_inference_accuracy,
    'zero_duration': zero_inference_duration,
    'zero_actual_answers': zero_actual_answers,
    'zero_predicted_answers': zero_predicted_answers,
}
zero_inference_results

{'zero_accuracy': 0.3,
 'zero_duration': 287.76387786865234,
 'zero_actual_answers': [1,
  3,
  3,
  0,
  2,
  3,
  2,
  2,
  2,
  0,
  3,
  1,
  0,
  1,
  3,
  3,
  1,
  2,
  3,
  0,
  3,
  3,
  1,
  0,
  1,
  3,
  3,
  0,
  0,
  1],
 'zero_predicted_answers': [0,
  1,
  0,
  3,
  0,
  0,
  0,
  2,
  2,
  3,
  0,
  0,
  2,
  0,
  0,
  3,
  3,
  0,
  3,
  0,
  3,
  2,
  2,
  0,
  0,
  3,
  0,
  2,
  0,
  2]}

In [16]:
def chainshot_inference(dataset):
    """Run chain-of-thought multiple-choice inference over ``dataset``.

    Same flow as the zero-shot variant, but the prompt additionally asks the
    model to think step by step before returning the option.

    Args:
        dataset: Iterable of examples with ``"question"``, ``"choices"``
            (four entries) and ``"answer"`` keys.

    Returns:
        A 7-tuple ``(accuracy, ground_truth_answers, model_predicted_answers,
        model_responses, extracted_answers, processed_answers,
        total_duration)``.  ``processed_answers`` is always an empty list; it
        is kept so the return shape does not change.  Predictions that could
        not be extracted are recorded as ``'N/A'``.
    """
    answer_marker = "Return correct option in format 'Option X'"
    option_to_index = {'A': 0, 'B': 1, 'C': 2, 'D': 3}

    ground_truth_answers = []
    model_predicted_answers = []
    model_responses = []
    extracted_answers = []
    processed_answers = []

    start_time = time.time()

    for example in dataset:
        question = example["question"]
        choice_a, choice_b, choice_c, choice_d = example["choices"]
        correct_answer = example["answer"]

        prompt = f"MCQ Question: {question}\n"
        prompt += "Four available options:\n"
        prompt += f"Option A. {choice_a}\nOption B. {choice_b}\nOption C. {choice_c}\nOption D. {choice_d}\n"
        # The trailing newline keeps the two instructions from running together
        # ("...question.Return correct option...").
        prompt += "Think step by step, it is a math question.\n"
        prompt += f"{answer_marker}."

        raw_response = generate_response(prompt, max_new_tokens=512, temperature=0.2)
        # The decoded output echoes the prompt; keep what follows the final
        # instruction.  If the marker is missing from the decoded text, fall
        # back to the whole output instead of raising IndexError.
        _, marker_found, continuation = raw_response.partition(answer_marker)
        response = continuation.strip() if marker_found else raw_response.strip()

        # Best effort: any classifier failure counts as "no answer".
        try:
            extracted_answer = extract_correct_option_with_bart(response)
        except Exception:
            extracted_answer = 'N/A'

        extracted_answers.append(extracted_answer)

        # Unknown labels (e.g. 'N/A') are passed through and never match a
        # ground-truth index.
        predicted_answer = option_to_index.get(extracted_answer, extracted_answer)

        ground_truth_answers.append(correct_answer)
        model_predicted_answers.append(predicted_answer)
        model_responses.append(response)

    total_duration = time.time() - start_time

    # An empty dataset has nothing to score.
    if ground_truth_answers:
        accuracy = calculate_accuracy(ground_truth_answers, model_predicted_answers)
    else:
        accuracy = 0.0

    return accuracy, ground_truth_answers, model_predicted_answers, model_responses, extracted_answers, processed_answers, total_duration

# Evaluate the step-by-step prompt on the same first 30 test examples and keep
# every returned list as a module-level name.
chain_accuracy, chain_actual_answers, chain_predicted_answers, chain_responses, chain_extracted_answers, chain_processed_answers, chain_duration = chainshot_inference(data['test'].select(range(30)))
# Compact summary shown as the cell output.
chain_results = {
    'chain_accuracy': chain_accuracy,
    'chain_duration': chain_duration,
    'chain_actual_answers': chain_actual_answers,
    'chain_predicted_answers': chain_predicted_answers,
}
chain_results

{'chain_accuracy': 0.3333333333333333,
 'chain_duration': 402.1555163860321,
 'chain_actual_answers': [1,
  3,
  3,
  0,
  2,
  3,
  2,
  2,
  2,
  0,
  3,
  1,
  0,
  1,
  3,
  3,
  1,
  2,
  3,
  0,
  3,
  3,
  1,
  0,
  1,
  3,
  3,
  0,
  0,
  1],
 'chain_predicted_answers': [0,
  2,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  1,
  0,
  1,
  3,
  1,
  0,
  0,
  3,
  0,
  3,
  0,
  3,
  2,
  3,
  0,
  0,
  3,
  0,
  0,
  0,
  2]}

In [17]:
# Wrap the already-loaded model/tokenizer in a text-generation pipeline and expose
# it to LangChain.
# NOTE(review): torch_dtype/device_map are passed although `model` is already an
# instantiated, device-mapped model — presumably redundant; confirm.
# NOTE(review): the agent outputs displayed later in this notebook contain the full
# prompt, i.e. the generated text appears to echo its input; downstream parsing
# depends on that — verify before changing pipeline options.
llm_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=512,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
llm = HuggingFacePipeline(pipeline=llm_pipeline)

class CustomPromptTemplate(StringPromptTemplate):
    """Prompt template that renders tool descriptions and the agent scratchpad."""

    # Raw template text; formatted with str.format in ``format``.
    template: str
    # Tools whose names and descriptions are injected into the prompt.
    tools: List[Tool]
    input_variables: List[str] = ["input", "intermediate_steps"]

    def format(self, **kwargs) -> str:
        """Render the template.

        ``intermediate_steps`` (a list of ``(action, observation)`` pairs,
        default empty) is removed from ``kwargs`` and turned into the
        ``agent_scratchpad`` text; ``tools`` and ``tool_names`` are filled in
        from ``self.tools``.  All remaining keyword arguments are passed to
        the template unchanged.
        """
        steps = kwargs.pop("intermediate_steps", [])

        scratchpad_chunks = []
        for step_action, step_observation in steps:
            scratchpad_chunks.append(
                f"{step_action.log}\nObservation: {step_observation}\nThought: "
            )

        tool_lines = [f"{entry.name}: {entry.description}" for entry in self.tools]
        tool_names = [entry.name for entry in self.tools]

        kwargs.update(
            agent_scratchpad="".join(scratchpad_chunks),
            tools="\n".join(tool_lines),
            tool_names=", ".join(tool_names),
        )
        return self.template.format(**kwargs)

class CustomOutputParser(AgentOutputParser):
    """Turn raw LLM text into either a tool call or a final answer."""

    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        """Parse ``llm_output``.

        Returns an ``AgentFinish`` carrying the whole text when it contains
        the final-answer phrase or when no ``Action:`` / ``Action Input:``
        pair is found; otherwise an ``AgentAction`` for the matched tool.

        NOTE(review): the prompt template defined in this notebook itself
        contains the final-answer phrase, so if ``llm_output`` includes the
        prompt text this first check always succeeds — confirm that is
        intended.
        """
        def finished() -> AgentFinish:
            # The full text is used both as the output value and as the log.
            return AgentFinish(return_values={"output": llm_output}, log=llm_output)

        if "Option corresponding to correct answer is:" in llm_output:
            return finished()

        action_pattern = r"Action: (.*?)\nAction Input:[\s]*(.*)"
        found = re.search(action_pattern, llm_output, re.DOTALL)
        if found is None:
            return finished()

        tool_name = found.group(1).strip()
        tool_argument = found.group(2).strip()
        return AgentAction(tool=tool_name, tool_input=tool_argument, log=llm_output)

# Tools advertised to the agent; their names/descriptions are rendered into the prompt.
tools = [
    # "llm-math" forwards its input straight to the wrapped LLM object.
    Tool.from_function(
        name="llm-math",
        description="Mathematical operations using an LLM.",
        func=llm
    ),
    # NOTE(review): this func returns its input unchanged, so no search is
    # performed here despite the name/description — confirm whether a real
    # Serper wrapper was intended.
    Tool.from_function(
        name="google-serper",
        description="Search using Google Serper.",
        func=lambda x: x
    )
]

# Agent prompt: {tools}, {tool_names} and {agent_scratchpad} are filled in by
# CustomPromptTemplate.format; {input} is the question and appears twice (once in
# the format description, once as the actual question).
prompt = CustomPromptTemplate(
    template="""Answer the following question using these tools:

{tools}

Use this format:
Question: {input}
Thought: Think what to do next
Action: Pick an action [{tool_names}]
Action Input: Provide the input
Observation: Result of action
... (repeat as needed)
Thought: I know the final answer
Option corresponding to correct answer is: The option

Question: {input}
{agent_scratchpad}""",
    tools=tools,
    input_variables=["input", "intermediate_steps"]
)

# Chain that formats the agent prompt and calls the LLM.
llm_chain = LLMChain(llm=llm, prompt=prompt)

# Single-action agent: generation is stopped at "\nObservation:" and the text is
# handed to CustomOutputParser; only the tool names listed above are allowed.
agent = LLMSingleActionAgent(
    llm_chain=llm_chain,
    output_parser=CustomOutputParser(),
    stop=["\nObservation:"],
    allowed_tools=[tool.name for tool in tools]
)

# Executor limited to a single iteration.
# NOTE(review): with max_iterations=1 there is presumably no room for a follow-up
# step after a tool call — confirm this limit is intended.
agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=1,
)

In [18]:
def react_inference(dataset):
    """Run the agent-based multiple-choice inference over ``dataset``.

    Each question is sent through ``agent_executor``; the part of the agent
    output that follows the echoed question and its final instruction is
    classified with ``extract_correct_option_with_bart`` and mapped to an
    option index.

    Args:
        dataset: Iterable of examples with ``"question"``, ``"choices"``
            (four entries) and ``"answer"`` keys.

    Returns:
        A 7-tuple ``(accuracy, ground_truth_answers, model_predicted_answers,
        agent_outputs, extracted_answers, processed_answers,
        total_duration)``.  ``processed_answers`` is always an empty list; it
        is kept so the return shape does not change.  Predictions that could
        not be extracted are recorded as ``'N/A'``.
    """
    answer_marker = "Return correct option in format 'Option X'."
    option_to_index = {'A': 0, 'B': 1, 'C': 2, 'D': 3}

    ground_truth_answers = []
    model_predicted_answers = []
    extracted_answers = []
    agent_outputs = []
    processed_answers = []

    start_time = time.time()

    for example in dataset:
        question = example["question"]
        choice_a, choice_b, choice_c, choice_d = example["choices"]
        correct_answer = example["answer"]

        prompt = f"MCQ Question: {question}\n"
        prompt += "Four available options:\n"
        prompt += f"Option A. {choice_a}\nOption B. {choice_b}\nOption C. {choice_c}\nOption D. {choice_d}\n"
        prompt += answer_marker

        agent_output = agent_executor.run(prompt)
        agent_outputs.append(agent_output)

        # Best effort: a missing section, an empty section or a classifier
        # failure all count as "no answer".
        extracted_answer = 'N/A'
        try:
            # The slicing relies on the agent output containing the rendered
            # prompt: take the text after the second "Question: " block and
            # after its final instruction.
            after_question = agent_output.strip().split('\n\nQuestion: ')[1].strip()
            text = after_question.split(answer_marker)[1].strip()
            if text:
                extracted_answer = extract_correct_option_with_bart(text)
        except Exception:
            extracted_answer = 'N/A'

        extracted_answers.append(extracted_answer)

        # Unknown labels (e.g. 'N/A') are passed through and never match a
        # ground-truth index.
        predicted_answer = option_to_index.get(extracted_answer, extracted_answer)

        ground_truth_answers.append(correct_answer)
        model_predicted_answers.append(predicted_answer)

    total_duration = time.time() - start_time

    # An empty dataset has nothing to score.
    if ground_truth_answers:
        accuracy = calculate_accuracy(ground_truth_answers, model_predicted_answers)
    else:
        accuracy = 0.0

    return accuracy, ground_truth_answers, model_predicted_answers, agent_outputs, extracted_answers, processed_answers, total_duration


# Evaluate the agent on the same first 30 test examples and keep every returned
# list as a module-level name (react_agent_outputs is inspected in a later cell).
react_accuracy, react_actual_answers, react_predicted_answers, react_agent_outputs, react_extracted_answers, react_processed_answers, react_duration = react_inference(data['test'].select(range(30)))
# Compact summary shown as the cell output.
react_results = {
    'react_accuracy': react_accuracy,
    'react_duration': react_duration,
    'react_actual_answers': react_actual_answers,
    'react_predicted_answers': react_predicted_answers,
}
react_results

{'react_accuracy': 0.2,
 'react_duration': 257.25002002716064,
 'react_actual_answers': [1,
  3,
  3,
  0,
  2,
  3,
  2,
  2,
  2,
  0,
  3,
  1,
  0,
  1,
  3,
  3,
  1,
  2,
  3,
  0,
  3,
  3,
  1,
  0,
  1,
  3,
  3,
  0,
  0,
  1],
 'react_predicted_answers': [0,
  1,
  0,
  1,
  0,
  0,
  0,
  1,
  2,
  0,
  0,
  0,
  2,
  1,
  0,
  0,
  0,
  1,
  0,
  0,
  1,
  2,
  0,
  0,
  0,
  0,
  0,
  2,
  0,
  2]}

In [23]:
react_agent_outputs

["Answer the following question using these tools:\n\nllm-math: Mathematical operations using an LLM.\ngoogle-serper: Search using Google Serper.\n\nUse this format:\nQuestion: MCQ Question: Let k be the number of real solutions of the equation e^x + x - 2 = 0 in the interval [0, 1], and let n be the number of real solutions that are not in [0, 1]. Which of the following is true?\nFour available options:\nOption A. k = 0 and n = 1\nOption B. k = 1 and n = 0\nOption C. k = n = 1\nOption D. k > 1\nReturn correct option in format 'Option X'.\nThought: Think what to do next\nAction: Pick an action [llm-math, google-serper]\nAction Input: Provide the input\nObservation: Result of action\n... (repeat as needed)\nThought: I know the final answer\nOption corresponding to correct answer is: The option\n\nQuestion: MCQ Question: Let k be the number of real solutions of the equation e^x + x - 2 = 0 in the interval [0, 1], and let n be the number of real solutions that are not in [0, 1]. Which of 