# OpenChat Model
- The model is open source and released under the Apache-2.0 licence.
- The model requires CUDA-capable hardware, so it is run through an inference endpoint hosted on Hugging Face, which in turn runs on AWS.
- Ideally the model would run locally, but for demonstration purposes an API call is used instead. The model itself is around 15 GB in size.

In [10]:
import requests
import random
import re
from datasets import load_dataset
import pandas as pd
import json

In [2]:
# Credential for the inference endpoint; the request headers forward this value
# unchanged as the "Authorization" header.
# NOTE(review): intentionally blank here — presumably filled in before running;
# confirm the exact format the endpoint expects (e.g. whether a "Bearer " prefix is needed).
key = ""

In [3]:
# endpoint and query function

# Address of the hosted inference endpoint that every request is posted to.
API_URL = "https://vrv92f7slj7x8qqc.us-east-1.aws.endpoints.huggingface.cloud"

# Headers attached to every request: the credential plus a JSON content type.
headers = {"Authorization": key, "Content-Type": "application/json"}

def query(payload, timeout=120):
    """Post ``payload`` to the inference endpoint and return the decoded JSON reply.

    Args:
        payload: JSON-serialisable request body. The calls in this notebook
            pass a dict with an "inputs" string and a "parameters" dict.
        timeout: Seconds to wait for the endpoint before giving up.

    Returns:
        The parsed JSON body of the response.

    Raises:
        requests.exceptions.Timeout: If the endpoint does not answer in time.
        requests.exceptions.HTTPError: If the endpoint answers with a 4xx/5xx status.
    """
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # endpoint and the only way out is to interrupt the kernel.
    response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
    # Fail at the call site on HTTP errors instead of handing an error body to
    # callers that expect a list of generations.
    response.raise_for_status()
    return response.json()

Example query

In [None]:
# Send one sample word problem to the endpoint, capping the reply at 150 new tokens.
output = query(
    {
        "inputs": "Kylar went to the store to buy glasses for his new apartment. One glass costs $5, but every second glass costs only 60% of the price. Kylar wants to buy 16 glasses. How much does he need to pay for them?",
        "parameters": {"max_new_tokens": 150},
    }
)

In [None]:
# Display the decoded JSON reply returned by the example query.
print(output)

### Functions

In [18]:
def get_random_test_object(dataset):
    """Pick one random entry from the dataset's "test" split.

    Args:
        dataset: Mapping that may contain a "test" sequence of dict-like entries.

    Returns:
        A ``(question, answer)`` tuple. Missing keys fall back to
        "No question found" / "No answer found". When there is no "test"
        split, or it is empty, ``(None, None)`` is returned.
    """
    # Nothing to sample from: signal that with a pair of Nones.
    if "test" not in dataset or len(dataset["test"]) == 0:
        return None, None

    test_split = dataset["test"]
    entry = test_split[random.randrange(len(test_split))]

    # Entries are expected to carry 'question' and 'answer' keys.
    return (
        entry.get("question", "No question found"),
        entry.get("answer", "No answer found"),
    )
    
def get_multiple_test_objects(dataset, num_objects):
    """Draw ``num_objects`` random question/answer pairs from the dataset.

    Each draw is independent, so the same entry can appear more than once.
    Draws whose question or answer is falsy are dropped, so the result may be
    shorter than ``num_objects``.

    Returns:
        A list of ``{'question': ..., 'answer': ...}`` dicts.
    """
    # Lazy generator: draws happen one at a time, in order, as the list is built.
    draws = (get_random_test_object(dataset) for _ in range(num_objects))
    return [
        {'question': drawn_question, 'answer': drawn_answer}
        for drawn_question, drawn_answer in draws
        if drawn_question and drawn_answer
    ]

In [5]:
def extract_dataset_answer(text):
    """Extract the final numeric answer that follows the '####' marker.

    Handles an optional minus sign, thousands separators and a decimal part,
    so "#### 1,200" yields "1200" rather than being cut off at the comma.

    Args:
        text: Reference solution text containing a '#### <number>' line.

    Returns:
        The number as a string with thousands separators removed, or
        "Answer not found" when no '####' marker followed by a number exists.
    """
    match = re.search(r'####\s*(-?\d[\d,]*(?:\.\d+)?)', text)
    if match is None:
        return "Answer not found"
    # Strip grouping commas so "1,200" and "1200" compare equal downstream.
    return match.group(1).replace(",", "")
    
def extract_answer(model_response):
    """Extract the number following a '### Answer :' marker in the model output.

    Handles an optional minus sign, thousands separators and a decimal part,
    and tolerates varying whitespace around 'Answer' and the colon.

    Args:
        model_response: Sequence whose first element is a dict with a
            'generated_text' string. Any other shape raises on lookup.

    Returns:
        The number as a string with thousands separators removed, or
        "Answer not found" when the marker is absent.
    """
    text = model_response[0]['generated_text']

    # Accept '### Answer : 42', '### Answer: 42', '###Answer :42', ...
    match = re.search(r'###\s*Answer\s*:\s*(-?\d[\d,]*(?:\.\d+)?)', text)
    if match is None:
        return "Answer not found"
    # Strip grouping commas so "1,200" and "1200" compare equal downstream.
    return match.group(1).replace(",", "")

In [6]:
def check_answer(answer, output):
    """Return 1 when ``answer`` appears in ``output`` as a stand-alone value, else 0.

    The answer is matched literally (regex metacharacters are escaped). It
    must not be part of a longer number, so "18" is found in "is 18." or a
    bare "18" but not in "180", "18,000" or "2.18".

    Args:
        answer: Expected answer text.
        output: Text to search.

    Raises:
        TypeError: If either argument is not a ``str``.
    """
    if not (isinstance(answer, str) and isinstance(output, str)):
        raise TypeError("Must be of type str")
    # Lookarounds reject digits (and digit separators) on either side instead
    # of demanding leading whitespace, so an answer at the very start of the
    # output still counts while prefixes of longer numbers do not.
    pattern = r"(?<![\d.,])" + re.escape(answer) + r"(?![.,]?\d)"
    return 1 if re.search(pattern, output) else 0

In [7]:
def get_random_mutation_txt(txt_file_path):
    """Return one randomly chosen non-blank line from a UTF-8 text file.

    Args:
        txt_file_path: Path of the file holding one mutation prompt per line.

    Returns:
        A whitespace-stripped line picked at random, or the string
        "No mutation prompts found." when the file has no non-blank lines.
    """
    with open(txt_file_path, 'r', encoding='utf-8') as handle:
        # Trim each line and keep only those that still have content.
        candidates = [stripped for stripped in map(str.strip, handle) if stripped]

    if not candidates:
        return "No mutation prompts found."
    return random.choice(candidates)

In [8]:
def apply_mutation(prompt, mutation_prompt):
    """Ask the model to rewrite ``prompt`` according to ``mutation_prompt``.

    The two texts are joined as "<mutation_prompt>:<prompt>" and sent to the
    endpoint with a 150-token generation cap.

    Returns:
        Whatever ``query`` returns for that request (the decoded JSON reply).
    """
    request_body = {
        "inputs": f"{mutation_prompt}:{prompt}",
        "parameters": {"max_new_tokens": 150},
    }
    return query(request_body)

Good prompts are stored so they can be shown at the end; bad prompts are kept so they can be improved through mutation or crossover.

### Genetic Algorithm

In [25]:
class GeneticAlgorithm:
    """Classify sampled dataset questions as good or bad prompts and mutate the bad ones.

    A population of question/answer pairs is sampled once at construction.
    Each iteration sends every question to the model and compares the
    extracted model answer with the extracted reference answer. Questions
    answered correctly go to ``good_prompts``; the rest go to ``bad_prompts``
    and are passed through a randomly chosen mutation prompt.

    Attributes are documented inline in ``__init__``.
    """

    # Sentinel the answer-extraction helpers return when no number is found.
    _NOT_FOUND = "Answer not found"
    # Seconds to wait for the endpoint before a request is abandoned.
    REQUEST_TIMEOUT = 120

    def __init__(self, dataset, mutation_file, population_size=20):
        """Store the inputs and sample the initial population.

        Args:
            dataset: Dataset handed to ``get_multiple_test_objects`` for sampling.
            mutation_file: Path of the text file with one mutation prompt per line.
            population_size: Number of question/answer pairs to draw.
        """
        self.dataset = dataset
        self.mutation_file = mutation_file
        # List of {'question': ..., 'answer': ...} dicts.
        self.initial_population = self.generate_initial_population(population_size)
        # Questions answered correctly; grows across iterations (never reset).
        self.good_prompts = []
        # Questions answered incorrectly in the most recent iteration.
        self.bad_prompts = []
        # Endpoint replies obtained by mutating bad prompts; grows across iterations.
        self.mutated_bad_prompts = []

    @staticmethod
    def query(payload):
        """Post ``payload`` to the inference endpoint and return the decoded JSON reply.

        Declared static because it needs no instance state; it can be called
        on the class or on an instance.

        Raises:
            requests.exceptions.Timeout: If the endpoint does not answer in time.
            requests.exceptions.HTTPError: If the endpoint answers with a 4xx/5xx status.
        """
        # An explicit timeout keeps a stalled endpoint from blocking the run forever.
        response = requests.post(
            API_URL,
            headers=headers,
            json=payload,
            timeout=GeneticAlgorithm.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def generate_initial_population(self, num_of_population):
        """Sample up to ``num_of_population`` question/answer dicts from the dataset."""
        return get_multiple_test_objects(self.dataset, num_of_population)

    def check_answer(self, answer, output):
        """Return 1 when ``answer`` appears in ``output`` as a stand-alone value, else 0.

        The answer is matched literally and must not be part of a longer
        number, so "18" matches "18" or "is 18." but not "180".

        Raises:
            TypeError: If either argument is not a ``str``.
        """
        if not (isinstance(answer, str) and isinstance(output, str)):
            raise TypeError("Must be of type str")
        # Lookarounds instead of a mandatory leading whitespace: the extracted
        # model answer is a bare number, which a leading "\s" can never match.
        pattern = r"(?<![\d.,])" + re.escape(answer) + r"(?![.,]?\d)"
        return 1 if re.search(pattern, output) else 0

    def apply_mutation(self, prompt):
        """Prefix ``prompt`` with a random mutation prompt and return the endpoint's reply."""
        mutation_prompt = get_random_mutation_txt(self.mutation_file)
        return self.query({
            "inputs": f"{mutation_prompt}:{prompt}",
            "parameters": {"max_new_tokens": 180},
        })

    def run_iteration(self):
        """Evaluate every question once, then mutate the ones answered incorrectly."""
        self.bad_prompts = []

        for pair in self.initial_population:
            # Population entries are dicts; unpacking them as tuples would
            # yield the key names instead of the stored texts.
            question = pair['question']
            dataset_answer = extract_dataset_answer(pair['answer'])
            response = self.query({
                "inputs": question,
                "parameters": {"max_new_tokens": 180},
            })
            answer = extract_answer(response)

            # Two failed extractions must not be treated as a matching answer.
            both_found = self._NOT_FOUND not in (dataset_answer, answer)
            if both_found and self.check_answer(dataset_answer, answer):
                self.good_prompts.append(question)
            else:
                self.bad_prompts.append(question)

        # Mutate only after all questions are classified, so each bad prompt
        # is mutated exactly once per iteration.
        for bad_prompt in self.bad_prompts:
            self.mutated_bad_prompts.append(self.apply_mutation(bad_prompt))

    def run(self, num_iterations):
        """Run ``num_iterations`` iterations and return the accumulated good prompts.

        NOTE(review): every iteration re-evaluates the same initial population,
        so with more than one iteration ``good_prompts`` can contain repeats.
        """
        for _ in range(num_iterations):
            self.run_iteration()
        return self.good_prompts

In [11]:
# Load the "main" configuration of the gsm8k dataset; the sampling helpers read its "test" split.
dataset = load_dataset("gsm8k", "main")

In [26]:
# Build the algorithm; the constructor samples the initial population from the dataset right away.
ga = GeneticAlgorithm(dataset, 'mutation_prompts.txt')

In [28]:
# Run a single iteration and keep the list of prompts that were classified as good.
num_iterations = 1
final_good_prompts = ga.run(num_iterations)

KeyboardInterrupt: 