In [1]:
import subprocess
import json
import re
from openai import OpenAI
from typing import Tuple, Iterator, List
from pathlib import Path

In [2]:
api_key_path = r"../openai_key.txt"
model = "gpt-4o"

with open(api_key_path, 'r') as f:
    key = f.read()
client = OpenAI(api_key=key)


In [3]:

ff_loc = "/home/exale/FF-v2.3/ff"

def run_ff(domain: str, problem: str, ff_loc: str) -> str:
    cmd = ["wsl", ff_loc, "-o", domain, "-f", problem]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"FF failed:\n{result.stderr}")
    return result.stdout

In [24]:
#domain = "./blocksworld/blocksworld_domain.pddl"
#problem = "./blocksworld/blocksworld_problems_pddl/instance-10.pddl"
domain = "./temp/domain.pddl"
problem = "./temp/problem.pddl"
print(run_ff(domain, problem, ff_loc))


ff: parsing domain file
domain 'BLOCK-WORLD' defined
 ... done.
ff: parsing problem file
problem 'BLOCK-WORLD-PROBLEM' defined
 ... done.



Cueing down from goal distance:    2 into depth [1]
                                   1            [1]
                                   0            

Cueing down from goal distance:    2 into depth [1]
                                   1            [1]
                                   0            

Cueing down from goal distance:    2 into depth [1]
                                   1            [1]
                                   0            

Cueing down from goal distance:    2 into depth [1]
                                   1            [1]
                                   0            

Cueing down from goal distance:    2 into depth [1]
                                   1            [1]
                                   0            

Cueing down from goal distance:    2 into depth [1]
                                   1

In [None]:
domain = "./blocksworld/blocksworld_domain.pddl"
problem = "./blocksworld/blocksworld_problems_pddl/instance-1.pddl"
print(run_ff(domain, problem, ff_loc))


ff: parsing domain file
domain 'BLOCKSWORLD-4OPS' defined
 ... done.
ff: parsing problem file
problem 'BW-GENERALIZATION-4' defined
 ... done.



Cueing down from goal distance:    2 into depth [1]
                                   1            [1]
                                   0            

Cueing down from goal distance:    2 into depth [1]
                                   1            [1]
                                   0            

Cueing down from goal distance:    2 into depth [1]
                                   1            [1]
                                   0            

Cueing down from goal distance:    2 into depth [1]
                                   1            [1]
                                   0            

Cueing down from goal distance:    2 into depth [1]
                                   1            [1]
                                   0            

Cueing down from goal distance:    2 into depth [1]
                               

In [4]:
def query_iterator(json_path: str) -> Iterator[Tuple[int, str]]:
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    for entry in data:
        yield entry["id"], entry["nl_query"]

In [None]:
lim = 10
correct = 0
gt_domain = "./blocksworld/blocksworld_domain.pddl"
task_query = {"pddl": "\nGenerate the contents for a domain.pddl and a problem.pddl file that captures this task.", 
              "plan": "\nCome up with a sequence of steps to complete this task."}
with open("./blocksworld/blocksworld_domain_nl.txt", 'r') as f:
    domain_nl = f.read()
json_path = "./blocksworld/blocksworld_problems_nl.json"
for idx, p_query in query_iterator(json_path):
    query = domain_nl + '\n\n' + p_query + task_query["plan"]
    response = client.chat.completions.create(
    model=model,
    messages=[
        {
            "role": "user",
            "content": query
        }
        ]
    )
    resp_text = response.choices[0].message.content
    domain_out = f"./temp/domain{idx}.pddl"
    problem_out = f"./temp/problem{idx}.pddl"
    extract_d_p(resp_text, domain_out, problem_out)

    gt_problem = f"./blocksworld/blocksworld_problems_pddl/instance-{idx}.pddl"
    gt_ff_result = run_ff(gt_domain, gt_problem, ff_loc)
    lm_ff_result = run_ff(domain_out, problem_out, ff_loc)

    gt_ff_plan = extract_plan(gt_ff_result)
    lm_ff_plan = extract_plan(lm_ff_result)

    if plan_matches(gt_ff_plan, lm_ff_plan):
        correct += 1
    if idx == lim:
        break


I am playing with a set of blocks where I need to arrange the blocks into stacks. Here are the actions I can do

Pick up a block
Unstack a block from on top of another block
Put down a block
Stack a block on top of another block

I have the following restrictions on my actions:
I can only pick up or unstack one block at a time.
I can only pick up or unstack a block if my hand is empty.
I can only pick up a block if the block is on the table and the block is clear. A block is clear if the block has no other blocks on top of it and if the block is not picked up.
I can only unstack a block from on top of another block if the block I am unstacking was really on top of the other block.
I can only unstack a block from on top of another block if the block I am unstacking is clear.
Once I pick up or unstack a block, I am holding the block.
I can only put down a block that I am holding.
I can only stack a block on top of another block if I am holding the block being stacked.
I can only stack a 

In [27]:
response = client.chat.completions.create(
    model=model,
    messages=[
        {
            "role": "user",
            "content": query
        }
    ]
)
resp_text = response.choices[0].message.content


In [28]:
print(resp_text)

To accomplish the task, you will need to follow a specific sequence of actions adhering to the given rules and goal conditions. Here's a step-by-step plan to achieve the desired arrangement of blocks:

1. **Pick up block B** (it is clear and on the table).
2. **Stack block B on top of block H** (block H is clear and on the table).
3. **Pick up block D** (it is clear and on the table).
4. **Stack block D on top of block B** (block B is now at the top of the stack).
5. **Pick up block G** (it is clear and on the table).
6. **Stack block G on top of block D** (block D is now at the top of the stack).
7. **Pick up block I** (it is clear and on the table).
8. **Stack block I on top of block G** (block G is now at the top of the stack).
9. **Pick up block F** (it is clear and on the table).
10. **Stack block F on top of block I** (block I is now at the top of the stack).
11. **Pick up block J** (it is clear and on the table).
12. **Stack block J on top of block F** (block F is now at the top

In [5]:
def _extract(text: str, anchor: str) -> str:
    start = text.find(anchor)
    if start == -1:
        raise ValueError(f"Anchor not found: {anchor!r}")
    depth = 0
    end = None
    for i, c in enumerate(text[start:], start):
        if c == '(':
            depth += 1
        elif c == ')':
            depth -= 1
            if depth == 0:
                end = i
                break
    if end is None:
        raise ValueError(f"Unbalanced parentheses for anchor {anchor!r}")
    return text[start:end+1]

In [6]:
def extract_d_p(source: str, domain_out: str, problem_out: str) -> None:
    domain_text = _extract(source, "(define (domain")
    problem_text = _extract(source, "(define (problem")

    Path(domain_out).write_text(domain_text, encoding='utf-8')
    Path(problem_out).write_text(problem_text, encoding='utf-8')

In [7]:
def extract_plan(ff_output: str) -> str:
    # Regex matches either "step    0: …" or lines like "    1: …"
    step_re = re.compile(r'^\s*(?:step\s*)?(\d+:\s*\S.*)$', re.IGNORECASE)
    lines = ff_output.splitlines()

    steps = []
    found_plan_header = False

    for line in lines:
        if not found_plan_header:
            if 'ff: found legal plan as follows' in line.lower():
                found_plan_header = True
            continue

        if line.strip().lower().startswith('time spent'):
            break

        m = step_re.match(line)
        if m:
            steps.append(m.group(1).strip())
    return "\n".join(steps)

In [None]:
def plan_matches()

In [None]:
domain_out = "./temp/domain.pddl"
problem_out = "./temp/problem.pddl"
extract_d_p(resp_text, domain_out, problem_out)