In [1]:
from aces.environement.p3.aces_p3 import ACES_p3
from transformers import HfArgumentParser, TrainingArguments, set_seed, DataCollatorForSeq2Seq
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class AcesArguments:
    """
    Arguments pertaining to which model/config/tokenizer we are going to fine-tune from.DataTrainingArguments
    """

    environement_name : str = field( default = "p3", metadata={"help": "environment name"})
    path_archive : str = field( default = "/home/flowers/work/aces/aces/environement/p3/preprocess_p3_emb_dedup_puzzles.json", metadata={"help": "path to the archive"})
    num_solutions: int = field( default = 10, metadata={"help": "number of solutions to generate to compute the difficulty score"})
    
@dataclass
class QdArguments:
    """
    Arguments pertaining to which model/config/tokenizer we are going to fine-tune from.DataTrainingArguments
    """

    a: str = field(
        default="/home/flowers/work/hf/Qwen2.5-Coder-3B-Instruct",
        metadata={"help": "Path to pretrained model or model identifier from huggingface.co/models"}
    )

@dataclass
class LLMArguments:
    """
    Arguments pertaining to which model/config/tokenizer we are going to fine-tune from.DataTrainingArguments
    """

    model_name_or_path: str = field(
        default="/home/flowers/work/hf/Qwen2.5-0.5B-Instruct",
        metadata={"help": "Path to pretrained model or model identifier from huggingface.co/models"}
    )
    online: Optional[bool] = field(
        default = False,
        metadata={
            "help": "use vllm server if True else use offline vllm"
        },
    )
    base_url: Optional[str] = field(
        default="http://localhost:8000",
        metadata={
            "help": "base url for vllm server"
        },
    )
    api_key: Optional[str] = field(
        default="",
        metadata={
            "help": "api key "
        },
    )
    gpu: Optional[bool] = field(
        default = 1,
        metadata={
            "help": "number of gpus to use (vllm)"
        },
    )
    cfg_generation : Optional[bool] = field(
        default = False,
        metadata={
            "help": "use cfg generation"
        },
    ),
    temperature: Optional[float] = field(
        default = 1.0,
        metadata={
            "help": "temperature"
        },
    )
    max_tokens: Optional[int] = field(
        default = 4000,
        metadata={
            "help": "max tokens"
        },
    )
    max_model_length: Optional[int] = field(
        default = 20000,
        metadata={
            "help": "max context size"
        },
    )

# parser = HfArgumentParser((AcesArguments,QdArguments,LLMArguments))
# model_args, data_args, training_args = parser.parse_args_into_dataclasses()#["--output_dir", "/home/flowers/work/hf/trained/"])
aces_args, qd_args, llm_args = AcesArguments(), QdArguments(), LLMArguments()


In [None]:
import random
from typing import List, Dict
from aces.llm_client import LLMClient
from dataclasses import dataclass, field
import json
from aces.environement.p3.p3_genotype import P3
from aces.environement.p3.prompt_function import get_prompt_label_p3, get_prompt_description_p3, prompt_solve_puzzle_given_f
from aces.environement.p3.skill_list import skill_list
from aces.environement.p3.utils import extract_skill, extract_solution, extract_f
from aces.code_sandbox import evaluate, pass_at_k
import numpy as np
#TODO inherite from base ACES class with common stuff
class ACES_p3:
    def __init__(self, AcesArguments: dataclass, LLMArguments : dataclass):
        # initialize LLM client
        self.llm_args = LLMArguments
        self.skill_list = skill_list

        self.init_llm()
        # initialize environment
        self.aces_args = AcesArguments 
        self.initialize_environment()
        self.archive = []
        self.semantic_descriptors = []

    def init_llm(self,) -> None:
        """init LLM client"""
        print("init LLM client")
        cfg_generation ={"model": self.llm_args.model_name_or_path, "temperature": self.llm_args.temperature,  "max_tokens": self.llm_args.max_tokens}

        self.llm = LLMClient(model = self.llm_args.model_name_or_path, 
                             cfg_generation = cfg_generation,
                             base_url = self.llm_args.base_url, 
                             api_key = self.llm_args.api_key, 
                             online = self.llm_args.online, 
                             gpu = self.llm_args.gpu,
                             max_model_length = self.llm_args.max_model_length)
        print("LLM client initialized")
    
    def initialize_environment(self) -> None:
        with open(self.aces_args.path_archive, 'r') as f:
            self.archives = json.load(f)
        list_p3 = []

        # generate semantic descriptor
        for p in self.archives:
            list_p3.append(P3(program_str = p['program_str']))
        list_p3 = self.generate_semantic_descriptors(list_p3)
        
        # generate dfficulty
        ## generate multiple solutions
        list_p3 = self.generate_multiple_solutions(list_p3)
        ## evaluate python code
        list_p3 = self.evaluate_python_code(list_p3)
        ## generate description
        list_p3 = self.generate_description(list_p3)
        self.archives = list_p3

    def generate_multiple_solutions(self, puzzles: list[P3]) -> List[P3]:
        """Use LLM to generate multiple solutions for a list of puzzle"""
        list_prompt_sol = []
        for p in puzzles:
            list_prompt_sol.append(prompt_solve_puzzle_given_f(p.program_str))
        list_solutions = self.llm.multiple_completion(list_prompt_sol,n = self.aces_args.num_solutions)
        for id_puzzle in range(len(puzzles)):
            problem = puzzles[id_puzzle].program_str 
            n_solutions = [self.process_solutions(solution=sol,problem=problem) for sol in list_solutions[id_puzzle].response]
            puzzles[id_puzzle].all_solution = n_solutions
        # verify solution with python
        return list_solutions
    
    def process_solutions(self, solution: str, problem: str) -> str: 
        """Process solution and return full puzzle (f+g)"""
        puzzle = extract_f(problem) + "\n" + extract_solution(solution)
        puzzle = puzzle.split("\nassert f")
        puzzle = puzzle[0] + "\nassert f(g()) == True\n"

    def evaluate_python_code(self, puzzles: list[P3]) -> List[P3]:
        """Evaluate python code"""
        list_task_id = []
        list_task_id_unique = []
        list_codes_to_test = []
        str_to_add=str(
                    f"\ndef run_eval():\n"
                    f"    return f(g()) == True"
                )
        for id_puz,p in enumerate(puzzles):
            list_task_id_unique.append(id_puz)
            for id_sol in range(len(p.all_solution)):
                list_task_id.append(id_puz)
                list_codes_to_test.append(p.all_solution[id_sol] + str_to_add)


        results = evaluate(list_codes_to_test, list_task_id, entry_point="run_eval")
        # dic_passk = results["pass@k"] # {task_id: pass@k} 
        raw_result = results["raw_result"] 
        for task_id in list_task_id_unique:
            all_solution = []
            all_solution_correct = []
            for id_completion in range(len(raw_result[task_id])):
                all_solution.append(raw_result[task_id][id_completion]["code"].split(str_to_add)[0])
                all_solution_correct.append(raw_result[task_id][id_completion]["correct"])
            
            puzzles[task_id].all_solution = all_solution
            puzzles[task_id].all_solution_correct = all_solution_correct

            number_solution = len(all_solution)
            c = sum(all_solution_correct)
            k=1 # estimation of pass@1
            
            if c==0:
                fitness = -np.inf
            else:
                fitness = pass_at_k(n=number_solution, c=c, k=k)
                list_correct_solution = [all_solution[i] for i in range(len(all_solution)) if all_solution_correct[i]]
                id_rd = random.randint(0,len(list_correct_solution)-1)
                puzzles[task_id].program_str = list_correct_solution[id_rd]
            puzzles[task_id].fitness = fitness

        return puzzles
    

    def generate_semantic_descriptors(self, puzzles: list[P3]) -> list[P3]:
        # Use LLM to evaluate puzzle along N programming skill dimensions
        # get prompt
        list_prompt = []
        for p in puzzles:
            list_prompt.append(get_prompt_label_p3(p.program_str, self.skill_list))
        list_skills = self.llm.multiple_completion(list_prompt)
        for i in range(len(puzzles)):
            skill, explanation_skill = extract_skill(list_skills[i].response[0])
            puzzles[i].emb = skill
            puzzles[i].explanation_emb = explanation_skill
            # puzzle[i].phenotype = skill
        return puzzles
    
    def generate_description(self, puzzles: list[P3]) -> list[P3]:
        # Use LLM to evaluate puzzle along N programming skill dimensions
        # get prompt
        list_prompt = []
        for p in puzzles:
            list_prompt.append(get_prompt_description_p3(p.program_str))
        list_description = self.llm.multiple_completion(list_prompt)
        for i in range(len(puzzles)):
            puzzles[i].description = list_description[i].response[0]
        return puzzles
    
    def explore(self, num_iterations: int):
        for _ in range(num_iterations):
            # Generate novel target in semantic space
            target_descriptors = self.generate_novel_target()
            
            # Generate puzzle matching target
            candidate_puzzle = self.generate_puzzle(target_descriptors)
            
            # Verify feasibility
            if self.evaluate_feasibility(candidate_puzzle):
                actual_descriptors = self.generate_semantic_descriptors(candidate_puzzle)
                self.generated_puzzles.append({
                    'puzzle': candidate_puzzle,
                    'descriptors': actual_descriptors
                })

    
    def generate_novel_target(self) -> List[float]:
        # Generate target that maximizes diversity from existing puzzles
        #TODO: reproduce aces targeted
        if not self.generated_puzzles:
            return [random.random() for _ in range(self.num_dimensions)]
            
        # Find underexplored regions in semantic space
        existing_descriptors = [p['descriptors'] for p in self.generated_puzzles]
        target = self.find_diverse_target(existing_descriptors)
        return target
aces= ACES_p3(aces_args, llm_args)

init LLM client


ERROR:tornado.general:SEND Error: Host unreachable


INFO 12-01 22:09:07 config.py:350] This model supports multiple tasks: {'generate', 'embedding'}. Defaulting to 'generate'.
INFO 12-01 22:09:07 llm_engine.py:249] Initializing an LLM engine (v0.6.4.post1) with config: model='/home/flowers/work/hf/Qwen2.5-0.5B-Instruct', speculative_config=None, tokenizer='/home/flowers/work/hf/Qwen2.5-0.5B-Instruct', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, override_neuron_config=None, tokenizer_revision=None, trust_remote_code=False, dtype=torch.bfloat16, max_seq_len=20000, download_dir=None, load_format=LoadFormat.AUTO, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=None, enforce_eager=False, kv_cache_dtype=auto, quantization_param_path=None, device_config=cuda, decoding_config=DecodingConfig(guided_decoding_backend='outlines'), observability_config=ObservabilityConfig(otlp_traces_endpoint=None, collect_model_forward_time=False, collect_model_execute_time=False), seed=0, served_mo

Loading safetensors checkpoint shards:   0% Completed | 0/1 [00:00<?, ?it/s]


INFO 12-01 22:09:08 model_runner.py:1077] Loading model weights took 0.9276 GB
INFO 12-01 22:09:08 worker.py:232] Memory profiling results: total_gpu_memory=15.70GiB initial_memory_usage=1.31GiB peak_torch_memory=2.35GiB memory_usage_post_profile=1.34GiB non_torch_memory=0.41GiB kv_cache_size=11.37GiB gpu_memory_utilization=0.90
INFO 12-01 22:09:08 gpu_executor.py:113] # GPU blocks: 62111, # CPU blocks: 21845
INFO 12-01 22:09:08 gpu_executor.py:117] Maximum concurrency for 20000 tokens per request: 49.69x
INFO 12-01 22:09:10 model_runner.py:1400] Capturing cudagraphs for decoding. This may lead to unexpected consequences if the model is not static. To run the model in eager mode, set 'enforce_eager=True' or use '--enforce-eager' in the CLI.
INFO 12-01 22:09:10 model_runner.py:1404] If out-of-memory error occurs during cudagraph capture, consider decreasing `gpu_memory_utilization` or switching to eager mode. You can also reduce the `max_num_seqs` as needed to decrease memory usage.
INF

In [19]:
from itertools import combinations
n_skills = 5
skills = list(range(1, n_skills+1))
# Generate all combinations of up to 5 skills
skill_combinations = set()
for r in range(1, 3+1):  # From 1 skill to 5 skills
    skill_combinations.update(combinations(skills, r))
skill_combinations = list(skill_combinations)
skill_combinations

[(2,),
 (3, 4),
 (5,),
 (1, 2, 5),
 (2, 5),
 (1, 3),
 (1, 4, 5),
 (2, 4, 5),
 (4,),
 (1,),
 (4, 5),
 (1, 2, 4),
 (2, 4),
 (1, 2),
 (1, 5),
 (1, 3, 5),
 (2, 3, 5),
 (3,),
 (3, 5),
 (1, 2, 3),
 (1, 4),
 (1, 3, 4),
 (2, 3),
 (3, 4, 5),
 (2, 3, 4)]

In [22]:
idx = np.random.choice(len(skill_combinations),size=1)
out = skill_combinations[idx]
skill_targeted = [1 if i in out else 0 for i in range(n_skills)]


TypeError: only integer scalar arrays can be converted to a scalar index

In [7]:
200/10

20.0

In [17]:
import numpy as np
a[tuple(b)]=1
a[tuple(c)]=2
a[list(a.keys())[1]]

2

In [None]:
idx

In [None]:
a.keys()

In [None]:
from aces.environement.p3.prompt_function import get_programming_puzzles_prompt
from aces.environement.p3.p3_genotype import P3


In [5]:
p3_1 = P3(program_str="puzzle test1", emb=[1,0,1,0,0],fitness=0.5 )
p3_2 = P3(program_str="puzzle test2", emb=[1,0,1,0,0],fitness=0.5 )
list_p3 = [p3_1, p3_2]
skill_targeted=[1,0,1,0,1]

In [7]:
print(get_programming_puzzles_prompt(list_p3,skill_targeted,n_fewshot_ex=2))

Consider Python Programming Puzzles (P3). P3 consists of two functions: a problem function `f` and its corresponding solution `g`. The challenge lies in constructing a SAT problem `f` and a function `g` such that `f(g())` evaluates to `True`

## Main Rules:
- Each puzzle includes two functions: `def f(...)` and `def g(...)`.
- The first argument of `f` is always the output from `g()`.
- Ensure `f` and `g` have matching argument signatures (e.g., `def f(solution, arg1=value1, arg2=value2, ...)` and `def g(arg1=value1, arg2=value2, ...)`). You also need to set the value of argument of f (arg1,arg2,...) and g when you define them.
- Avoid using `f` inside `g`, and `g` inside `f`.
- Include any necessary imports so your code runs smoothly.
- Give a clear Puzzle description that must be brief and diverse compared to the other puzzles.
- Make sure the puzzle is self-contained within these two functions.
- Make sure that that each puzzle have just all required skills (see below)

## P3 Format

In [1]:
import json
path="/home/flowers/work/aces/aces/environement/p3/preprocess_p3_emb_dedup_puzzles.json"
with open(path, 'r') as f:
    data = json.load(f) 

In [None]:
data[0]
# suppose we only have "program_str"

{'program_str': 'def f(n: int) -> bool:\n    return str(n * n).startswith(\'123456789\')\ndef g():\n    return int(int("123456789" + "0" * 9) ** 0.5) + 1\nassert f(g()) == True',
 'emb': [1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
 'explanation_emb': 'This puzzle involves string manipulation to check if the square of a number starts with a specific sequence of digits, and mathematical operations to calculate the square root of a number. The puzzle also requires an understanding of number theory, specifically the concept of square roots.\n\nThe list of skills used is: [0, 1, 16].',
 'description': "Find the solution: n (an integer) that should be squared and its result in string format starts with '123456789'.",
 'quality': 1,
 'fitness': -0.020000000000000018,
 'all_solution': ['def f(n: int) -> bool:\n    return str(n * n).startswith(\'123456789\')\ndef g():\n    return int(int("123456789" + "0" * 9) ** 0.5) + 1\nassert f(g()) == True\nassert f(g()) == True',
  "def 

In [None]:
from aces.code_sandbox import evaluate, pass_at_k
str_to_add=str(
            f"\ndef run_eval():\n"
            f"    return f(g()) == True"
        )

list_codes=["def f(x):\n    return x\ndef g():\n    return True", "def f(x):\n    return x\ndef g():\n    return False",
            "def f(x):\n    return not x\ndef g():\n    return False", "def f(x):\n    return not x\ndef g():\n    return True"]


list_task_id=[0,0,1,1]
for i in range(len(list_codes)):
    list_codes[i] = list_codes[i]+str_to_add

res = evaluate(list_codes, list_task_id,entry_point="run_eval")

24


100%|██████████| 4/4 [00:00<00:00, 394.90it/s]


In [3]:
import ast
def rm_function(code, list_function_name):
        # Parse the solution code to an AST
    solution_ast = ast.parse(code)

    # Filter out functions from the code that are in the list_function_name
    filtered_body = [
        node for node in solution_ast.body
        if not isinstance(node, ast.FunctionDef) or node.name not in list_function_name
    ]
    # Create a new module with the filtered body
    new_solution_ast = ast.Module(body=filtered_body, type_ignores=[])

    # Convert the AST back to source code
    new_solution_code = ast.unparse(new_solution_ast)
    return new_solution_code


a="""
import numpy as np
def lcm(a, b):
    return a * b // gcd(a, b)

def f(n: int) -> bool:
    for i in range(1, n // 2 + 1):
        for j in range(i + 1, (n - i) // 2 + 1):
            k = n - i - j
            if k > j and lcm(k, lcm(i, j)) < 200:
                return True
    return False
from typing import *

def g():
    # We need to find a value of n that satisfies the condition in the problem statement.
    # A suitable n that works is 10 because:
    # f(10) is true because there is a combination of (2, 8) and (1, 9) that satisfies the condition.

    return 10
"""

In [5]:
rm_function(a,["f","g"])

'import numpy as np\n\ndef lcm(a, b):\n    return a * b // gcd(a, b)\nfrom typing import *'

In [17]:
list_task_id=[0,1]
for task_id in list_task_id:
    all_solution = []
    all_solution_correct = []
    for id_completion in range(len(res["raw_result"][task_id])):
        all_solution.append(res["raw_result"][task_id][id_completion]["code"].split(str_to_add)[0])
        all_solution_correct.append(res["raw_result"][task_id][id_completion]["correct"])


In [19]:
all_solution

['def f(x):\n    return not x\ndef g():\n    return False',
 'def f(x):\n    return not x\ndef g():\n    return True']

In [18]:
all_solution_correct

[True, False]

In [21]:
number_solution = len(all_solution)
c = sum(all_solution_correct)
k=1
pass_at_k(n=number_solution, c=c, k=k)

0.5

In [15]:
id_completion = 0
res["raw_result"][task_id][0]["code"].split(str_to_add)[0]


NameError: name 'task_id' is not defined

In [14]:
res["raw_result"][0][0]["correct"]

True

In [None]:
# how to get the name of all the function in a
a="""
def lcm(a, b):
    return a * b // gcd(a, b)

def f(n: int) -> bool:
    for i in range(1, n // 2 + 1):
        for j in range(i + 1, (n - i) // 2 + 1):
            k = n - i - j
            if k > j and lcm(k, lcm(i, j)) < 200:
                return True
    return False
from typing import *

def g():
    # We need to find a value of n that satisfies the condition in the problem statement.
    # A suitable n that works is 10 because:
    # f(10) is true because there is a combination of (2, 8) and (1, 9) that satisfies the condition.

    return 10
"""
import ast

# Parse the code string into an AST
parsed_code = ast.parse(a)

# Extract function names using list comprehension
function_names = [node.name for node in parsed_code.body if isinstance(node, ast.FunctionDef)]

function_names

['lcm', 'f', 'g']

In [None]:
def f(n: int, s='RObAQuYK', t='robaQUYKkuLY') -> bool:
    if n == 0:
        return s.lower() == t.lower()
    if n == 1:
        return s.lower() > t.lower()
    if n == -1:
        return s.lower() < t.lower()
    return False
def g(s = 'RObAQuYK', t = 'robaQUYKkuLY'):
    if s.lower() == t.lower():
        return 0
    if s.lower() > t.lower():
        return 1
    return -1
assert f(g()) == True


AssertionError: 