<a href="https://colab.research.google.com/github/bwowejk/AGI-research/blob/main/Draft_1(ARC).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#pip installs
# Initial setup
%%capture
!pip install -U transformers accelerate bitsandbytes
!pip install arckit


In [None]:
#Imports
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import torch
import arckit
import json
from pathlib import Path
import re
import textwrap


In [None]:
"""
Load ARC tasks and output prompts with tasks

"""

class ARCTasksHelper():
  def __init__(self):
    self.train_set = None
    self.test_set = None


  #loading arc tasks
  def _load_tasks(self):
    self.train_set, self.test_set = arckit.load_data("arcagi")
    print("ARC tasks loaded successfully")

  #construct a prompt to feed to the model
  def _build_arc_prompt_from_arckit(self, task):
    """
    Build an ARC prompt from an arckit.Task object,
    using its .to_dict() representation.
    """

    # Use the dictionary form (matches official ARC JSON)
    data = task.to_dict()

    train_pairs = [(pair["input"], pair["output"]) for pair in data["train"]]
    test_inputs = [pair["input"] for pair in data["test"]]

    header = """
You are an AI assistant specialized in solving Abstract Reasoning Corpus (ARC-AGI) tasks by
generating Python code.
Your goal is to analyze input-output grid pairs. The outputs were produced by applying a
transformation rule to the inputs. Implement the transformation rules as a Python function.
You should only write the implemented transformation in code.

You must write code in triple backticks (```python and then ```). You must write a function
called `transform` which takes a single argument, the input grid as `list[list[int]]`, and
returns the transformed grid (also as `list[list[int]]`).
You should make sure that you implement a version of the transformation which works in general
(at least for all given input-output pairs and test input pairs).
The number in the input grid can be mapped to the following colors: 0:Black; 1:Blue; 2:Red; 3:
Green; 4:Yellow; 5:Grey; 6:Pink; 7:Orange; 8:Purple; 9:Brown

Now, solve the following ARC-AGI task:
    """

    def format_grid(grid):
        rows = ["[" + " ".join(str(x) for x in row) + "]" for row in grid]
        return f"[" + "\n".join(rows) + "]"

    parts = []

    # TRAIN
    for i, (inp_grid, out_grid) in enumerate(train_pairs, start=1):
        parts.append(
f"""
## Input {i} (grid shape: {len(inp_grid)} by {len(inp_grid[0])}):
{format_grid(inp_grid)}

## Output {i} (grid shape: {len(out_grid)} by {len(out_grid[0])}):
{format_grid(out_grid)}
"""
        )

    # TEST
    for j, inp_grid in enumerate(test_inputs, start=1):
        parts.append(
f"""
## Test Input {j} (grid shape: {len(inp_grid)} by {len(inp_grid[0])}):
{format_grid(inp_grid)}
"""
        )

    return header + "\n".join(parts)


  #generate prompts for tasks
  def generate_task_dataset_prompts(self):

    self._load_tasks()
    train_prompts = []
    test_prompts = []

    for task in self.train_set:
      prompt = self._build_arc_prompt_from_arckit(task)
      train_prompts.append(prompt)

    for task in self.test_set:
      prompt = self._build_arc_prompt_from_arckit(task)
      test_prompts.append(prompt)

    print("\nPrompts successfully generated")
    return train_prompts, test_prompts


  #AFTER GETTING MODEL RESPONSE FUNCTIONS

  #obtain the function in backticks from the text as string


  def extract_python_function(self, text):
      """
      Extracts the first Python function from text (from 'def' to 'return').
      Returns a single function string or None if not found.
      """
      # Match code blocks with various language tags
      pattern = r"```(?:python|py|notebook-python)?\s*(.*?)```"
      blocks = re.findall(pattern, text, flags=re.DOTALL)

      # If code blocks found, use them; otherwise use raw text
      code_text = "\n\n".join(blocks) if blocks else text

      # Extract first function from 'def' to 'return' statement
      func_pattern = r"(def\s+\w+\s*\([^)]*\):.*?return\s+[^\n]+)"
      match = re.search(func_pattern, code_text, flags=re.DOTALL)

      if match:
          return match.group(1).strip()

      # Fallback: if no return statement, try to get function with indentation logic
      func_pattern_no_return = r"(def\s+\w+\s*\([^)]*\):(?:.*?)(?=\ndef\s+\w+|\Z))"
      match = re.search(func_pattern_no_return, code_text, flags=re.DOTALL)

      if match:
          return match.group(1).strip()

      return None

  #obtain the functions in backticks from the text as string
  def extract_python_functions(self, text):
    """
    Extracts individual Python functions from text.
    Returns a list of function strings (from 'def' to 'return').
    """
    # First, extract code from markdown code blocks if present
    code_block_pattern = r"```(?:python|py|notebook-python)?\s*(.*?)```"
    code_blocks = re.findall(code_block_pattern, text, flags=re.DOTALL)

    # If code blocks found, use them; otherwise use the raw text
    code_text = "\n\n".join(code_blocks) if code_blocks else text

    # Pattern to match function definitions from 'def' to 'return'
    # This captures the function name, parameters, body, and the return statement
    func_pattern = r"(def\s+\w+\s*\([^)]*\):.*?return\s+[^\n]+)"

    functions = re.findall(func_pattern, code_text, flags=re.DOTALL)

    return [func.strip() for func in functions]


  #evaluate the python in string on a grid
  def run_candidate_func(self, candidate_str, input_grid):
    namespace = {}
    try:
        # Convert string to callable function
        exec(candidate_str, namespace)
        candidate_func = namespace.get("transform")

        if candidate_func is None:
            # Function not defined
            return None

        # Execute function
        output = candidate_func(input_grid)
        return output
    except Exception as e:
        # Any error is treated as failure
        return None


  #get scores for a given function
  def score_candidate_on_train_pair(self, candidate_str, train_pairs):

    total_cells = 0
    matched_cells = 0

    for input_grid, expected_output in train_pairs:
        output = self.run_candidate_func(candidate_str, input_grid)
        if output is None:
            # Function failed to execute, score 0 for this example
            continue

        # Count matches per cell
        for row_out, row_expected in zip(output, expected_output):
            for val_out, val_expected in zip(row_out, row_expected):
                total_cells += 1
                if val_out == val_expected:
                    matched_cells += 1

    if total_cells == 0:
        return 0.0

    return matched_cells / total_cells

    #evaluate a single candidate function on all train and test grids, return dict
  def evaluate_candidate_on_task(self, task, candidate_str):
      """
      Returns a dict with:
      {
          "code": candidate_str,
          "evaluation": [outputs for train grids..., outputs for test grids...]
      }
      """
      data = task.to_dict()
      train_pairs = [(pair["input"], pair["output"]) for pair in data["train"]]
      test_inputs = [pair["input"] for pair in data["test"]]

      evaluation_results = []

      # Evaluate on train grids
      for input_grid, _ in train_pairs:
          output = self.run_candidate_func(candidate_str, input_grid)
          evaluation_results.append(output if output is not None else "ERROR")

      # Evaluate on test grids
      for input_grid in test_inputs:
          output = self.run_candidate_func(candidate_str, input_grid)
          evaluation_results.append(output if output is not None else "ERROR")

      return {"code": candidate_str, "evaluation": evaluation_results}



  #build refinement prompt
  def build_arc_repair_prompt_from_arckit(self, task, candidate: dict):
      """
      Build an ARC repair-style prompt from an arckit.Task object
      and a single candidate implementation.

      Parameters
      ----------
      task : arckit.Task
      candidate : dict
          {
              "code": str,
              "evaluation": list[str]   # ordered: train → test
          }
      """

      data = task.to_dict()

      train_pairs = [(pair["input"], pair["output"]) for pair in data["train"]]
      test_inputs = [pair["input"] for pair in data["test"]]

      header = """
  You are an AI assistant specialized in solving Abstract Reasoning Corpus (ARC-AGI) tasks by
  repairing Python code implementations.
  Your goal is to analyze input-output grid pairs. The outputs were produced by applying a
  transformation rule to the inputs.
  You will be given a python function `transform` that was supposed to implement the
  transformation rule, but it is not working correctly for all inputs.
  Your role is to fix this `transform` function.

  Your solution should be:
  - Accurate: Correctly fix the transformation for all given inputs
  - Comprehensive: Handles all possible input scenarios
  - Well-structured: Uses clear, readable, and efficient code

  The number in the input grid can be mapped to the following colors:
  0:Black; 1:Blue; 2:Red; 3:Green; 4:Yellow; 5:Grey;
  6:Pink; 7:Orange; 8:Purple; 9:Brown

  Now, repair the following ARC-AGI task implementation:
  """

      def format_grid(grid):
          rows = ["[" + " ".join(str(x) for x in row) + "]" for row in grid]
          return "[" + "\n".join(rows) + "]"

      parts = []

      for i, (inp_grid, out_grid) in enumerate(train_pairs, start=1):
          parts.append(
  f"""
  ## Input {i} (grid shape: {len(inp_grid)} by {len(inp_grid[0])}):
  {format_grid(inp_grid)}

  ## Output {i} (grid shape: {len(out_grid)} by {len(out_grid[0])}):
  {format_grid(out_grid)}
  """
          )

      for j, inp_grid in enumerate(test_inputs, start=1):
          parts.append(
  f"""
  ## Test Input {j} (grid shape: {len(inp_grid)} by {len(inp_grid[0])}):
  {format_grid(inp_grid)}
  """
          )

      evaluation_blocks = ""
      for k, result in enumerate(candidate["evaluation"], start=1):
          evaluation_blocks += f"""
  ## Evaluation Result {k}
  {result}
  """

      candidate_section = f"""
  Previous implementation:
  ```python
  {candidate["code"].strip()}
  ```

  This implementation produced the following results:
  {evaluation_blocks}

  The above implementation produced incorrect results.

  Fix the transform function so that it produces correct outputs for all training examples
  and generalizes to the test inputs.

  You must return ONLY the corrected Python code in triple backticks.
  """

      return header + "\n".join(parts) + candidate_section


    #build crossover prompt from two python function strings
  def build_crossover_prompt(self, func_a_str, func_b_str):
    """
    Returns a prompt string for performing GP crossover between two Python functions.
    func_a_str, func_b_str: Python functions as strings
    """
    prompt = f"""

You are performing genetic programming crossover on Python code.

You are given two Python functions.

STEP 1 — STANDARDISATION (MANDATORY):
- Rename variables so BOTH functions use:
  - n for grid size
  - output for the result grid
- Do NOT introduce new variables.
- Only rename existing variables.
- The standardized versions MUST be different from at least one original.

STEP 2 — CROSSOVER (MANDATORY):
- Create exactly TWO child functions.
- Each child MUST:
  - Contain at least ONE line originally from Function A
  - Contain at least ONE line originally from Function B
- Returning either parent unchanged is FORBIDDEN.
- Renaming alone does NOT count as crossover.

RULES:
- Use ONLY lines that appear in the standardized functions.
- Do NOT invent new lines.
- Do NOT introduce new variables.
- Do NOT explain anything.
- Each child must be enclosed in back ticks
      Output Child 1:
      ```python
      # Your first child function here
      ```

      Output Child 2:
      ```python
      # Your second child function here
      ```
- Output ONLY valid Python code.
- All children MUST use the function name: transform

Function A:
```python
{func_a_str}
```

Function B:
```python
{func_b_str}
```

"""
    return prompt



  def build_mutation_prompt(self, func_str):
    """
    Returns a prompt string for performing GP mutation on a Python function.
    func_str: Python function as a string
    """
    prompt = f"""
You are a genetic programming mutation operator.

You are given a Python function that transforms a grid (2D list of integers).

Generate EXACTLY 3 mutated child functions by applying ONE small structural mutation per child.

RULES (MANDATORY):

1. All children must keep the function signature EXACTLY:
   def transform(grid):

2. Keep the grid size definition line unchanged (e.g., n = len(grid)).

3. Preserve output dimensions and type.

4. Use ONLY the following primitives:
   - Variables: grid, output, i, j, n
   - Integers: 0, 1
   - Operators: +, -, *
   - Indexing: grid[...][...], output[...][...]
   - Loops: for i in range(n), for j in range(n)
   - Assignment: output[a][b] = grid[c][d]

5. Do NOT introduce:
   - New variables
   - New functions
   - Conditionals
   - Function calls
   - Additional loops

6. Each child must differ from the original by EXACTLY ONE line of logic.

7. Each child must differ from the other children.

8. Remove any dead or redundant code.

OUTPUT FORMAT:
Each mutant must be enclosed in separate code blocks:
```python
# Mutant 1
```
```python
# Mutant 2
```
```python
# Mutant 3
```

Original function:
```python
{func_str}
```

"""
    return prompt



In [None]:
task_helper = ARCTasksHelper()
train_set_prompts, test_set_prompts = task_helper.generate_task_dataset_prompts()

In [None]:
print(test_set_prompts[0])

In [None]:
"""
Outer loop
- Take in prompts to
      1. Either generate initial population
      2. Or Refine solution and generate better population
-
"""
from abc import ABC, abstractmethod

class Model(ABC):

  #base LLM, owns model and tokenizer
  # Class-level shared model and tokenizer
  model = None
  tokenizer = None

  def __init__(self, model_name):
      if Model.model is None or Model.tokenizer is None:
        self.model_name = model_name
        self._load_model()
      else:
            print("Using shared model instance")

  def _load_model(self):

    tokenizer = AutoTokenizer.from_pretrained(self.model_name)

    #4-bit quantization
    bnb_config_4bit = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,
        )


    model_4bit = AutoModelForCausalLM.from_pretrained(
        self.model_name,
        quantization_config=bnb_config_4bit,
        low_cpu_mem_usage=True,
        device_map="auto",
    )

    self.tokenizer = tokenizer
    self.model = model_4bit

    print("Model Loaded Succeffully")


  #Get model response
  @abstractmethod
  def get_model_response(self, prompt):
    pass

  #extract python function can also be done here

  def extract_python_function(self, text):
      """
      Extracts the first Python function from text (from 'def' to 'return').
      Returns a single function string or None if not found.
      """
      # Match code blocks with various language tags
      pattern = r"```(?:python|py|notebook-python)?\s*(.*?)```"
      blocks = re.findall(pattern, text, flags=re.DOTALL)

      # If code blocks found, use them; otherwise use raw text
      code_text = "\n\n".join(blocks) if blocks else text

      # Extract first function from 'def' to 'return' statement
      func_pattern = r"(def\s+\w+\s*\([^)]*\):.*?return\s+[^\n]+)"
      match = re.search(func_pattern, code_text, flags=re.DOTALL)

      if match:
          return match.group(1).strip()

      # Fallback: if no return statement, try to get function with indentation logic
      func_pattern_no_return = r"(def\s+\w+\s*\([^)]*\):(?:.*?)(?=\ndef\s+\w+|\Z))"
      match = re.search(func_pattern_no_return, code_text, flags=re.DOTALL)

      if match:
          return match.group(1).strip()

      return None





In [None]:
# Scientist, (separated from engineer to make it easier to adopt different models for the diff purposes is need be)
class Scientist(Model):

  #the input prompt, can either be for seeding, or for refining
  def get_model_response(self, prompt):
    messages = [
        {"role": "system", "content": "You are an AI assistant specialized in solving Abstract Reasoning Corpus (ARC-AGI) tasks by reasoning and generating Python code."},
        {"role": "user", "content": prompt}
    ]
    text = self.tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)


    generated_ids = self.model.generate(
        **model_inputs,
        max_new_tokens=1024,
        do_sample = True,
        temperature=1.0,   # SOAR setting
        min_p=0.05,
    )
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

    return response

    #create a population, by sampling the llm n times
  def get_population(self, pop_size, prompt):
    population = []

    for _ in range(pop_size):
        candidate = self.get_model_response(prompt)
        extracted_candidate = self.extract_python_function(candidate)

        if extracted_candidate is not None:
            population.append(extracted_candidate)

    return population




In [None]:
# Engineer
class Engineer(Model):

  #the input prompt, can either be for seeding, or for refining
  def get_model_response(self, prompt, type = None):    #type can either be mutator or cross-over

    # 1. Define Messages based on type

    if type == "cross-over":
      messages = [
          {"role": "system", "content": "You are a genetic programming crossover operator. You recombine Python code by copying mandatory fragments from multiple parents"},
          {"role": "user", "content": prompt}
      ]



    elif type == "mutator":
      messages = [
          {"role": "system", "content": "You are an AI assistant specialized in creating structural variations of Python functions without knowing their purpose by reasoning and generating Python code"},
          {"role": "user", "content": prompt}
      ]



    else:
      print ("Type not recognized !!")
      return ""

    text = self.tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )

    model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)

    if type == 'cross-over':
      generated_ids = self.model.generate(
        **model_inputs,
        max_new_tokens=1024,
        do_sample = False,
      )

    elif type == 'mutator':
      generated_ids = self.model.generate(
          **model_inputs,
          max_new_tokens=1024,
          do_sample = True,
          top_p=0.9,
          temperature=1.2,
          #repetition_penalty=1.0
        )



    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

    return response




## Evolutionary Loop

In [None]:
import random

class EvolutionaryOptimizer:
    def __init__(self, task_helper, scientist, engineer):
        self.helper = task_helper
        self.scientist = scientist
        self.engineer = engineer

        # GP Hyperparameters
        self.POPULATION_SIZE = 10
        self.MAX_GENERATIONS = 5
        self.ELITISM_COUNT = 2       # Keep the top 2 best codes
        self.CROSSOVER_RATE = 0.6    # 60% probability
        self.MUTATION_RATE = 0.4     # 40% probability

    def evaluate_population(self, population, task):
        """
        Runs every candidate against the task's training pairs.
        Returns sorted list of (score, function_code).
        """
        data = task.to_dict()
        train_pairs = [(pair["input"], pair["output"]) for pair in data["train"]]

        scored_population = []
        for code in population:
            # Uses your score_candidate_on_train_pair method
            score = self.helper.score_candidate_on_train_pair(code, train_pairs)
            scored_population.append((score, code))

        # Sort descending (Higher score is better)
        scored_population.sort(key=lambda x: x[0], reverse=True)
        return scored_population

    def tournament_selection(self, scored_population, k=3):
        """Picks a parent using tournament selection."""
        # Sample k individuals
        tournament = random.sample(scored_population, min(k, len(scored_population)))
        # Return the code of the one with the highest score
        return max(tournament, key=lambda x: x[0])[1]

    def evolve(self, task, seed_population):
        """
        Main Evolutionary Loop.
        """
        print(f"\n--- Starting Evolution ---")
        population = seed_population
        best_solution = None
        best_score = -1.0

        for gen in range(self.MAX_GENERATIONS):
            # 1. Evaluate
            scored_pop = self.evaluate_population(population, task)

            # Statistics
            gen_best_score, gen_best_code = scored_pop[0]
            print(f"Generation {gen+1}: Best Score = {gen_best_score:.2f}")

            # Check for Perfect Solution
            if gen_best_score >= 1.0:
                print(">>> Perfect Solution Found!")
                return gen_best_code

            # Update global best
            if gen_best_score > best_score:
                best_score = gen_best_score
                best_solution = gen_best_code

            # 2. Elitism (Carry over best individuals)
            next_generation = [code for _, code in scored_pop[:self.ELITISM_COUNT]]

            # 3. Breeding
            while len(next_generation) < self.POPULATION_SIZE:

                # Roll dice for Crossover vs Mutation
                if random.random() < self.CROSSOVER_RATE:
                    # --- CROSSOVER ---
                    parent_a = self.tournament_selection(scored_pop)
                    parent_b = self.tournament_selection(scored_pop)

                    # Try to get a different parent B if possible
                    if parent_a == parent_b and len(scored_pop) > 1:
                        parent_b = self.tournament_selection(scored_pop)

                    prompt = self.helper.build_crossover_prompt(parent_a, parent_b)

                    # Engineer generates 2 children
                    response = self.engineer.get_model_response(prompt, type="cross-over")

                    # Extract using the NEW helper method that supports multiple functions
                    children = self.helper.extract_python_functions(response)

                    if children:
                        # Randomly sample one child for diversity
                        selected_child = random.choice(children)
                        next_generation.append(selected_child)
                    else:
                        # Fallback: keep parent if generation failed
                        next_generation.append(parent_a)

                else:
                    # --- MUTATION ---
                    parent = self.tournament_selection(scored_pop)
                    prompt = self.helper.build_mutation_prompt(parent)

                    # Engineer generates mutants
                    response = self.engineer.get_model_response(prompt, type="mutator")

                    # Extract mutants
                    mutants = self.helper.extract_python_functions(response)

                    if mutants:
                        # Randomly sample one child for diversity
                        selected_mutant = random.choice(mutants)
                        next_generation.append(selected_mutant)
                    else:
                        next_generation.append(parent)

            # Ensure population size stays constant (truncate if crossover added too many)
            population = next_generation[:self.POPULATION_SIZE]

        print(f"Evolution finished. Best Score: {best_score}")
        return best_solution


In [None]:
# 1. Initialize
helper = ARCTasksHelper()

# Load the base model once
base_model_name = "Qwen/Qwen2.5-Coder-7B-Instruct"
base_model = Scientist(base_model_name)  # use Scientist just to load the model, Now base_model.model and base_model.tokenizer are available

# Create Scientist and Engineer pointing to the same model/tokenizer
scientist = Scientist.__new__(Scientist)  # skip __init__
scientist.model = base_model.model
scientist.tokenizer = base_model.tokenizer

engineer = Engineer.__new__(Engineer)      # skip __init__
engineer.model = base_model.model
engineer.tokenizer = base_model.tokenizer
optimizer = EvolutionaryOptimizer(helper, scientist, engineer)





In [None]:
# 2. Load Prompts
train_prompts, test_prompts = helper.generate_task_dataset_prompts()

# 3. Select Task (e.g., index 0)
task_idx = 0
task_obj = helper.train_set[task_idx]
seed_prompt = train_prompts[task_idx]

print(f"Solving Task {task_idx}...")

# 4. Phase 1: Scientist Seeding
print("--- Phase 1: Seeding ---")
seed_population = scientist.get_population(pop_size=5, prompt=seed_prompt)

if seed_population:
    # 5. Phase 2: Evolution
    best_code = optimizer.evolve(task_obj, seed_population)

    print("\n=== Final Evolved Solution ===")
    print(best_code)
else:
    print("Scientist failed to generate seeds.")