In [26]:
import re
import json
import sympy as sp
import requests
from typing import List, Dict, Any
from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers import pipeline, PreTrainedModel, PreTrainedTokenizerFast
from torch.optim import Adam
import torch
from torch.distributions import Categorical
from dotenv import load_dotenv
from openai import OpenAI
import openai
import os

In [2]:
load_dotenv()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

LLM_MODEL = "google/gemma-2-2b-it" 
REWARD_MODEL = "google/gemma-2-2b-it"
MAX_HOTPOT_STEPS = 5
MAX_GSM8K_STEPS = 10

tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL)
model = AutoModelForCausalLM.from_pretrained(LLM_MODEL).to(device)
# query_pipe = pipeline("text-generation", model=query_model, tokenizer=query_tokenizer, device=device)

reward_tokenizer = AutoTokenizer.from_pretrained(REWARD_MODEL)
reward_model = AutoModelForCausalLM.from_pretrained(REWARD_MODEL)

# Should be dictionary of form {tool_name: tool_function}
tools_list = []

Loading checkpoint shards: 100%|██████████| 2/2 [00:02<00:00,  1.23s/it]
Loading checkpoint shards: 100%|██████████| 2/2 [00:05<00:00,  2.88s/it]


In [18]:
def trajectory_history_prompt(trajectory):
    traj = "Chat History:\n"
    for i in range(len(trajectory)):
        traj += f"Attempt {i + 1}:\n{trajectory[i]}\n"
    return f'''The entire history of your previous outputs when generating this problem, its test cases, and solution, is presented below.\n{traj}'''

def mutation_prompt(problem):
    print(problem)
    return f'''Please mutate this question to increase the difficulty of it. The new question should be sufficiently different and not be a paraphrased version of the original question. Do not provide any hints, solutions or outputs. Only one new instruction is allowed. Additionally, generate rationales for what changes you have made, and how these make the problem harder or more correct. Ensure that you include the <|New Question Begin|>, <|New Question End|>, <|Rationales Begin|>, <|Rationales End|> tags as depicted.
Question: {problem}

### Prompt Template
<|New Question Begin|>
[New Question]
<|New Question End|>
<|Rationales Begin|>
[Rationales for Thinking]
<|Rationales End|>
'''

def test_prompt(problem, trajectory):
    return f'''Please generate unit tests that can be used to verify a solution to this problem in the format within the specified section below. The entire chat history of your previous attempts to generate questions and unit tests is presented below in the "Chat History" section below. Please place the problem in the appropriate tags in the template; you may modify it to more appropriately match edge-cases in the tests, but do not change the core idea of the problem. Ensure your code is within code blocks. For the tests, use pytest style by defining individual test functions (without classes) and using assert statements. Your tests should be implementation independent and assume the solution function is called "solution".
Question: {problem}

### Prompt Template
<|Question Begin|>
[New Question]
<|Question End|>
<|Rationales Begin|>
[Rationales for Thinking]
<|Rationales End|>
<|Test Begin|>
[Unit Test Code in Python]
<|Test End|>
{trajectory_history_prompt(trajectory)}
'''

def sol_prompt(problem, trajectory):
    return f'''Please generate a solution to the given question in the specified section below. The entire chat history of your previous attempts to generate questions, unit tests, and solutions is presented below in the "Chat History" section below. Please place the problem in the appropriate tags in the template; you may modify it to more appropriately match edge-cases in the tests, but do not change the core idea of the problem. Ensure your code is within code blocks. For the tests, use pytest style by defining individual test functions (without classes) and using assert statements. You may modify the tests to better fit the problem and solution, but do not reduce coverage or any edge-cases. Your solution should be a function called "solution". Your tests will be ran against your solution in a code execution environment and the result will be given to you in the next query 
Question: {problem}

### Prompt Template
<|Question Begin|>
[New Question]
<|Question End|>
<|Rationales Begin|>
[Rationales for Thinking]
<|Rationales End|>
<|Test Begin|>
[Unit Test Code in Python]
<|Test End|>
<|Solution Begin|>
[Solution Code in Python]
<|Solution End|>
{trajectory_history_prompt(trajectory)}
'''

In [None]:
# Function to get AI response
def get_openai_response(prompt, model="o3-mini", log_file="./api_log_swirl.txt"):
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        msg = response.choices[0].message.content.strip()

        if log_file != None:
            with open(log_file, "a") as f:
                f.write("PROMPT:\n\n" + prompt + "\n\n" + "RESPONSE\n\n" + msg + "\n\n")

        return msg
    except Exception as e:
        return f"Error: {e}"

def query_model(prompt, log_file="./api_log_swirl.txt"):
    input_ids = tokenizer(prompt, return_tensors="pt").to(device)
    output = tokenizer.decode(model.generate(**input_ids, max_new_tokens=256)[0])
    if log_file != None:
            with open(log_file, "a") as f:
                f.write("PROMPT:\n\n" + prompt + "\n\n" + "RESPONSE\n\n" + output + "\n\n")
    return output

def parse_tag(text, tag):
    pattern = r"<\|" + tag + r" Begin\|>(.*?)<\|" + tag + r" End\|>"
    return re.findall(pattern, text, re.DOTALL)


In [32]:
def generate(seed_problem):
    response = query_model(mutation_prompt(seed_problem))
    question = parse_tag(response, "Question")
    rationales = parse_tag(response, "Rationales")

    trajectory = [response]

    for i in range(2):
        response = query_model(test_prompt(question, trajectory))
        question = parse_tag(response, "Question")
        rationales = parse_tag(response, "Rationales")
        tests = parse_tag(response, "Tests")
        trajectory.append(response)

    for i in range(2):
        response = query_model(sol_prompt(question, trajectory))
        question = parse_tag(response, "Question")
        rationales = parse_tag(response, "Rationales")
        tests = parse_tag(response, "Tests")
        solution = parse_tag(response, "Solution")
        trajectory.append(response)
    
    print(response, question, solution)

print(client.models.list())
generate("Write a function to find the most common elements and their counts of a specified text, excluding any elements that appear in the provided list of excluded words.")
    

SyncPage[Model](data=[Model(id='gpt-4o-audio-preview-2024-12-17', created=1734034239, object='model', owned_by='system'), Model(id='dall-e-3', created=1698785189, object='model', owned_by='system'), Model(id='dall-e-2', created=1698798177, object='model', owned_by='system'), Model(id='gpt-4o-audio-preview-2024-10-01', created=1727389042, object='model', owned_by='system'), Model(id='o3-mini-2025-01-31', created=1738010200, object='model', owned_by='system'), Model(id='o3', created=1744225308, object='model', owned_by='system'), Model(id='o3-mini', created=1737146383, object='model', owned_by='system'), Model(id='o3-2025-04-16', created=1744133301, object='model', owned_by='system'), Model(id='o4-mini', created=1744225351, object='model', owned_by='system'), Model(id='gpt-4.1-nano', created=1744321707, object='model', owned_by='system'), Model(id='gpt-4.1-nano-2025-04-14', created=1744321025, object='model', owned_by='system'), Model(id='gpt-4o-realtime-preview-2024-10-01', created=1727

In [None]:
 # This is just a start, hasnt been tested at all
 DISCOUNT_FACTOR = 0.99
 
 def compute_rewards(self, seed_question, trajectory) -> float:
    '''Compute the rewards for every state in the trajectory'''
        ''' TODO: I think a good initial test would include
        - Penalize the model for changing the question after the initial mutation (could probably detect this with BeRT embedding cosine simliarity)
        - Penalize the model for changing tests after test generation stage
        - Penalize the model for failing tests during solution generation (these should all be binary probably) 
        '''
        pass

    def train_on_trajectory(self, seed_question, trajectory: List[Dict[str, Any]]):
        log_probs = []
        rewards = compute_rewards(seed_question, trajectory)
        for step in trajectory:
            inputs = query_tokenizer(step, return_tensors='pt').to(self.model.device)
            outputs = self.model(**inputs)
            logits = outputs.logits[:, -1, :]
            dist = Categorical(logits=logits)

        # Compute cumulative reward
        cumulative = 0
        returns = []
        for r in reversed(rewards):
            cumulative = r + DISCOUNT_FACTOR * cumulative
            returns.insert(0, cumulative)
        returns = torch.tensor(returns)
        log_probs = torch.stack(log_probs)

        loss = -(log_probs * returns).sum()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()