In [None]:
from openai import OpenAI
from openai import APIConnectionError
from openai import RateLimitError
from openai import APIStatusError
from tenacity import retry, wait, wait_fixed

import replicate
from dotenv import load_dotenv
import os
import pandas as pd
load_dotenv()

In [None]:
# Load every MMLU split from its single-shard parquet file and stack them
# into one DataFrame with a fresh 0..N-1 index.
class_name = "all"
types = ["auxiliary_train", "dev", "test", "validation"]
shard_template = "mmlu-test/{}/{}-00000-of-00001.parquet"

dfa = []
for type_name in types:
    shard_path = shard_template.format(class_name, type_name)
    split_frame = pd.read_parquet(shard_path, engine="fastparquet")
    dfa.append(split_frame)

df = pd.concat(dfa, ignore_index=True)

In [None]:
# Prompt constants and the shared OpenAI client used by the evaluation agent.
# MODEL = "gpt-3.5-turbo"
# Simple sanity-check prompt; not referenced by any other code visible in this file.
default_system_message = """What is the color of the sky? Answer in one word"""

# Earlier draft of the seed-model prompt, kept commented out for reference:
# seed_model_system_message = """You are an evaluator agent. You have access to a large dataset of multiple choice questions \
                        # from the MMLU dataset but you won't interact with the dataset directly. Instead you will generate \
                        # a random number between 0 to 1000. The question at this number will be presented to a test LLM and its response \
                        # given to you. The test model will be presented with the question and a list of choices. The test model is supposed \
                        # to return an index. You have to check that the index is valid that is, corresponds to a choice in the list. Moreover,\
                        # you have to check if the LLM correctly answered the question by comparing with the correct answer index. Based \
                        # on the result, you will either output the string is_correct or is_wrong"""

# System prompt that teaches the seed model the JSON "function call" protocol
# parsed by Agent.execute_fncall: fn == 0 -> random integer (needs start/end),
# fn == 1 -> sample a question from the dataset (no arguments).
# The trailing backslashes are line continuations inside the string literal, so
# those lines are joined without a newline.
# NOTE(review): the prompt text contains the typo "paramters"; left untouched
# because changing the prompt may change the seed model's behaviour.
seed_model_system_message = """
You have access to a number of functions which you can call by generating a json formatted python string where \
the fn field is a mapping from integer to the function to be called. Rest of the fields are paramters to the function if any. \
You can generate a random number between start and end values by setting fn to 0 and setting appropriate values for start and end. \
You can also sample from a dataset by setting fn to 1. This function does not require any arguments. \
You must only give the command, no need to explain. Strictly only the json formatted string!
"""

# System prompt for the grading step (Agent.check_if_correct): the seed model
# is shown the test model's raw answer ("Model:") and the reference answer
# ("Ref:") and must reply with "Correct" or "No".
# NOTE(review): the second sentence is missing an "if" ("Correct you can
# detect"), and the "No.\" continuation has no space before the backslash, so
# the rendered prompt reads "No.The input will be as follows:". Confirm whether
# the wording should be fixed.
seed_model_check_system_message = """
You will be given two strings. The first one may or may not contain a number. The second string will be a number. \
Output the string Correct you can detect the number in the first string otherwise return the string No.\
The input will be as follows:
Model: <STRING>
Ref: <STRING>
For example:
Model: The correct answer is 4
Ref: 4
Your output should be Correct
"""

# System prompt for the model under test: answer a multiple-choice question
# with a single zero-based option index and nothing else.
test_model_system_message = """
You will be asked a multiple choice question. You have to output the correct option number and nothing else. Just one valid integer. \
The format of the question will be as follows:
Question: <QUESTION>
Choices: 0. <CHOICE 1> 1. <CHOICE 2> and so on. 
Make sure that you ONLY output one valid integer and this is zero-based indexing!
"""

# Module-level OpenAI client, used by Agent.get_response for the OpenAI path.
# Presumably picks up its credentials from the environment populated by
# load_dotenv() above -- TODO confirm.
oai = OpenAI()

In [None]:
# Ad-hoc check of the seed model: ask it to emit the JSON command for a random
# number between 0 and 1000, and join the pieces returned by replicate.run into
# a single string (shown as the cell's output).
"".join(replicate.run("meta/meta-llama-3-8b-instruct", input={
    "prompt": "Call the appropriate function to generate a number between 0 and 1000",
    "system_prompt": seed_model_system_message
}))

In [None]:

# A custom agent that evaluates a given TEST_MODEL on randomly selected multiple-choice questions from the MMLU dataset.
# The SEED_MODEL is a very basic LLM that orchestrates the evaluation process.
import random
import json
import sys

class Agent:
    """Evaluate a test LLM on multiple-choice questions sampled from a dataset.

    A "seed" model (called through ``replicate``) orchestrates the run: it
    emits a small JSON command that ``execute_fncall`` turns into a sampled
    question, and it later grades the test model's raw answer against the
    reference answer.

    Attributes:
        history: Messages sent to the OpenAI chat endpoint; one entry is
            appended per question.
        temperature: Sampling temperature passed to the OpenAI endpoint.
        dataset: Object exposing ``.shape[0]`` and ``.loc[index]`` whose rows
            provide ``'question'``, ``'choices'`` and ``'answer'``
            (a pandas DataFrame in this file).
        seed_model: Replicate model id of the orchestrating model.
        test_model: Id of the model being evaluated.
    """

    def __init__(self, dataset, seed_model="meta/meta-llama-3-8b-instruct", test_model="gpt-3.5-turbo", temperature=1):
        print("Starting agent")
        self.history = []
        self.temperature = temperature
        self.dataset = dataset
        self.seed_model = seed_model
        self.test_model = test_model

    def set_test_model(self, test_model):
        """Replace the model under test."""
        self.test_model = test_model

    def get_test_model(self):
        """Return the id of the model under test."""
        return self.test_model

    # TODO: add retry/backoff around the API call (the rate-limit branch below
    # only reports the problem).
    def get_response(self, question, choices):
        """Ask the test model one multiple-choice question.

        Args:
            question: The question text.
            choices: Iterable of answer strings; they are numbered from 0.

        Returns:
            For ``gpt-3.5-turbo``: a ``{"role": "assistant", "content": ...}``
            dict. For Replicate models whose id starts with ``meta`` or
            ``mistral``: the joined output string. ``None`` when an OpenAI API
            error was reported or the model id is not supported.
        """
        choice_str = " ".join([str(idx) + ". " + choice + " " for idx, choice in enumerate(choices)])
        question_str = question + " " + choice_str + " What is the correct answer?"
        # NOTE(review): the question is stored with role "system" and the
        # history keeps growing across questions, so every OpenAI request also
        # carries all earlier questions -- confirm whether this is intended.
        self.history.append({"role": "system", "content": question_str})

        try:
            if self.test_model == "gpt-3.5-turbo":
                response = oai.chat.completions.create(
                    model=self.test_model, messages=self.history, temperature=self.temperature
                )
                return {"role": "assistant", "content": response.choices[0].message.content}
            elif self.test_model.startswith(("meta", "mistral")):
                return "".join(replicate.run(self.test_model, input={
                    "prompt": question_str,
                    "system_prompt": test_model_system_message
                }))
        except APIConnectionError as e:
            print("The server could not be reached")
            print(e.__cause__)  # an underlying Exception, likely raised within httpx.
        except RateLimitError as e:
            print("A 429 status code was received; we should back off a bit.")
        except APIStatusError as e:
            print("Another non-200-range status code was received")
            print(e.status_code)
            print(e.response)
        # Reached after a reported API error or for an unsupported model id.
        return None

    def _gen_random(self, start, end):
        """Return a random integer N with ``start <= N <= end`` (both inclusive)."""
        return random.randint(start, end)

    def _get_question_from_mmlu(self, index):
        """Return the dataset row stored under label ``index``."""
        return self.dataset.loc[index]

    def get_question_set(self):
        """Have the seed model issue a sampling command and execute it.

        Returns:
            Whatever ``execute_fncall`` returns for the seed model's output:
            a dataset row when the model asked for a sample, an int when it
            asked for a random number, or ``None`` when the command was
            rejected.
        """
        command = "".join(replicate.run(
            self.seed_model,
            input={
                "prompt": "Call the appropriate helper function to sample a random question from the dataset",
                "system_prompt": seed_model_system_message,
            }))
        return self.execute_fncall(command)

    def check_if_correct(self, response, question_set) -> bool:
        """Let the seed model grade the test model's answer.

        Args:
            response: The test model's answer, either a plain string or the
                ``{"role": ..., "content": ...}`` dict produced by the OpenAI
                path of ``get_response``.
            question_set: Mapping with the reference answer under ``'answer'``.

        Returns:
            True when the seed model's reply starts with ``Correct``.

        Raises:
            TypeError: If no answer text is available (e.g. ``response`` is
                ``None`` because the test model call failed).
        """
        # The OpenAI path returns a message dict; unwrap it so both back ends
        # are graded the same way.
        if isinstance(response, dict):
            response = response.get("content")
        if not isinstance(response, str):
            raise TypeError(
                "test model response must be a string, got {}".format(type(response).__name__)
            )

        prompt = "Model: " + response + "\nRef: " + str(question_set['answer'])
        res = replicate.run(
            self.seed_model,
            input={
                "prompt": prompt,
                "system_prompt": seed_model_check_system_message,
            })
        # The output arrives in pieces (every other call site joins it), so
        # join before comparing instead of looking only at the first piece.
        verdict = "".join(res).strip()
        return verdict.startswith("Correct")

    def evaluate_model(self, n_questions: int) -> dict:
        """Run ``n_questions`` evaluation rounds and summarise the outcome.

        A round counts as an error when the seed model yields no question or a
        ``TypeError`` occurs while asking/grading; any other exception
        propagates to the caller.

        Returns:
            Dict with ``n_questions``, ``n_correct``, ``n_wrong``, ``n_errors``
            and ``accuracy`` (correct / non-error rounds, 0 if there are none).
        """
        n_correct = 0
        n_wrong = 0
        n_errors = 0
        for _ in range(n_questions):
            try:
                question_set = self.get_question_set()
                if question_set is None:
                    # execute_fncall already reported why the command was rejected.
                    n_errors += 1
                    continue
                response = self.get_response(question_set['question'], question_set['choices'])
                is_correct = self.check_if_correct(response, question_set)
            except TypeError as te:
                print("Expected integer response from test model, got something else", te)
                n_errors += 1
                continue

            if is_correct:
                n_correct += 1
            else:
                n_wrong += 1

        n_answered = n_questions - n_errors
        return {
            "n_questions": n_questions,
            "n_correct": n_correct,
            "n_wrong": n_wrong,
            "n_errors": n_errors,
            "accuracy": n_correct / n_answered if n_answered > 0 else 0,
        }

    def execute_fncall(self, response):
        """Parse the seed model's JSON command and run the requested helper.

        Args:
            response: JSON text with an integer ``fn`` field; ``fn == 0`` also
                needs ``start`` and ``end``.

        Returns:
            A random int for ``fn == 0``, a dataset row for ``fn == 1``, or
            ``None`` when the text is not valid JSON or names an unknown
            function (the problem is printed).

        Raises:
            Exception: Any other failure (e.g. a missing field) is reported
                and re-raised.
        """
        try:
            res = json.loads(response)
            if res['fn'] == 0:
                # call random number
                return self._gen_random(res['start'], res['end'])
            elif res['fn'] == 1:
                # call get question; randint is inclusive at both ends, so the
                # upper bound must be the last valid row, not the row count.
                return self._get_question_from_mmlu(self._gen_random(0, self.dataset.shape[0] - 1))
            else:
                raise ValueError("Non existent function called {}".format(res['fn']))
        except ValueError as e:
            # Also covers malformed JSON: json.JSONDecodeError subclasses ValueError.
            print("Non existent function called", e)
        except Exception:
            print("Some unknown error occured")
            raise  # handle later by logging to error file
        return None

def main():
    """Evaluate the configured test model on five sampled questions and print the summary."""
    # Swap the test_model id here to evaluate a different model.
    evaluator = Agent(df, test_model="meta/llama-2-13b")
    summary = evaluator.evaluate_model(5)
    print(summary)

# Run the evaluation only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()