In [1]:
# Human specific
import inspect

# This prompt is ignored if the environment_name already corresponds to an existing environment.
specific = inspect.cleandoc("""Two people, Alice and Bob, are in different rooms. 
                            Alice is in Room 1, and Bob is in Room 2.
                            The rooms are connected, so either can move between them.
                            They both want to meet each other.
                            Find a sequence of actions that allows Alice and Bob to end up in the same room and meet.""")
environment_name = "MeetingPlanning"
format = "json"

In [3]:
# Generate the json representation of the environment
from pathlib import Path

from src.llm_plan.planner import Planner
from src.llm_plan.llm import ChatGPT
from src.llm_plan.config import ENVIRONMENTS_JSON_PATH

model = ChatGPT("gpt-4o")
planner = Planner()

plan_path = Path(f"{environment_name}.{format}")
full_path = ENVIRONMENTS_JSON_PATH / plan_path

# Skip if the plan already exists
if not full_path.exists():
    planner.generate_representation(model, specific, environment_name, format=format)
else:
    print(f"{full_path} already exists. Skipping generation.")
    print("[Warning]: The prompt `specific` will be ignored!")

environments/static/MeetingPlanning.json already exists. Skipping generation.


In [4]:
# Generate the environment plan
from src.llm_plan.environment import Environment

env = Environment(f"./environments/static/{environment_name}.json")
print("Plan:\n", env.plan)

[['traveler.pddl', 'james.pddl'], ['orchestrator.pddl']]

In [None]:
# Plan
responses = planner.plan(model, env)

In [None]:
# Extract the pddl problem and domain from the LLM response
from src.llm_plan.parser import PDDLParser

pddl_parser = PDDLParser()

# TODO: fix the vocabulary entry, it shouldn't be "pddl_scheduler" etc.
final_plan = responses["pddl_orchestrator"]
domain, problem = pddl_parser.parse(final_plan, from_file=False)

In [None]:
# Obtain the plan with the solver
import subprocess

from pathlib import Path

from src.llm_plan.config import SOLVER_BINARY, SOLVER_ARGS

BASE_FOLDER = Path(f"./tmp/{env.name}")
BASE_FOLDER.mkdir(parents=True, exist_ok=True)

with open(BASE_FOLDER / "problem.pddl", "w") as f:
    f.write(problem)

with open(BASE_FOLDER / "domain.pddl", "w") as f:
    f.write(domain)
    
# Launch the solver
command = [
    SOLVER_BINARY,
    *SOLVER_ARGS,
    BASE_FOLDER / "sas_plan",
    BASE_FOLDER / "domain.pddl",
    BASE_FOLDER / "problem.pddl",
]

with open(BASE_FOLDER / "logs.txt", "w") as logfile:
    subprocess.run(command, stdout=logfile, stderr=subprocess.STDOUT)

In [None]:
# Validate the plan with uVAL
import subprocess

from src.llm_plan.config import UNIVERSAL_VALIDATOR_BIN

command = f"{UNIVERSAL_VALIDATOR_BIN} -cv \
{BASE_FOLDER / 'domain.pddl'} \
{BASE_FOLDER / 'problem.pddl'} \
{BASE_FOLDER / 'sas_plan'}"

out = subprocess.run(command, shell=True, capture_output=True, text=True)

# This part won't be printed at test time
if out.stderr:
    pddl_error = out.stderr
    print(
        f"The validation found a problem with the plan: {out.stderr}"
    )
else:
    pddl_error = "No error found."
    print("The plan is valid.")
    print("[stdout]", out.stdout)

In [None]:
# Let's try to refine the plan syntax
from src.llm_plan.hypervisor import (
    HypervisorDeepThinkPDDL,
    HypervisorSyntaxPDDL,
    HypervisorFastDownwardAdapter
    )

num_attempts = 30

for i in range(num_attempts):
    
    # --- General validator of PDDL plans ---
    prompt_args = {
        "specification": env.config_data,
        "pddl_domain": domain,
        "pddl_problem": problem,
    }

    deep_think = HypervisorDeepThinkPDDL(llm=model, prompt_args=prompt_args)

    r = deep_think.run()
    domain, problem = pddl_parser.parse(r, from_file=False)
    
    # --- Fast Downward adapter ---
    prompt_args = {
        "specification": env.config_data,
        "pddl_domain": domain,
        "pddl_problem": problem,
    }
    
    hypervisor_fd = HypervisorFastDownwardAdapter(llm=model, prompt_args=prompt_args)
    
    r = hypervisor_fd.run()
    domain, problem = pddl_parser.parse(r, from_file=False)
    
    
    # Obtain the new plan with the solver
    with open(BASE_FOLDER / f"refined_problem_{i}.pddl", "w") as f:
        f.write(problem)

    with open(BASE_FOLDER / f"refined_domain_{i}.pddl", "w") as f:
        f.write(domain)
        
    # Launch the solver
    command = [
        SOLVER_BINARY,
        *SOLVER_ARGS,
        BASE_FOLDER / f"refined_sas_plan_{i}",
        BASE_FOLDER / f"refined_domain.pddl_{i}",
        BASE_FOLDER / f"refined_problem.pddl_{i}",
    ]

    with open(BASE_FOLDER / "refined_logs.txt", "w") as logfile:
        subprocess.run(command, stdout=logfile, stderr=subprocess.STDOUT)
        
    command = f"{UNIVERSAL_VALIDATOR_BIN} -cv \
    {BASE_FOLDER / f'refined_domain_{i}.pddl'} \
    {BASE_FOLDER / f'refined_problem_{i}.pddl'} \
    {BASE_FOLDER / f'refined_sas_plan_{i}'}"

    out = subprocess.run(command, shell=True, capture_output=True, text=True)

    # This part won't be printed at test time
    if out.stderr:
        pddl_error = out.stderr
        print(
            f"The validation found a problem with the plan: {out.stderr}"
        )
    else:
        pddl_error = "No error was found."
        print("The plan is valid.")
        print("[stdout]", out.stdout)
    
    # --- Syntax validator ---
    print(f"Validation attempt {i+1}/{num_attempts}")
    prompt_args = {
        "specification": env.config_data,
        "pddl_domain": domain,
        "pddl_problem": problem,
        "syntax_errors": pddl_error,
    }

    hypervisor_pddl = HypervisorSyntaxPDDL(llm=model, prompt_args=prompt_args)

    r = hypervisor_pddl.run()
    domain, problem = pddl_parser.parse(r, from_file=False)

In [None]:
# Final plan run and validation
with open(BASE_FOLDER / f"refined_problem_{i}.pddl", "w") as f:
    f.write(problem)

with open(BASE_FOLDER / f"refined_domain_{i}.pddl", "w") as f:
    f.write(domain)
    
# Launch the solver
command = [
    SOLVER_BINARY,
    *SOLVER_ARGS,
    BASE_FOLDER / f"refined_sas_plan_{i}",
    BASE_FOLDER / f"refined_domain_{i}.pddl",
    BASE_FOLDER / f"refined_problem_{i}.pddl",
]

with open(BASE_FOLDER / "refined_logs.txt", "w") as logfile:
    subprocess.run(command, stdout=logfile, stderr=subprocess.STDOUT)
    
command = f"{UNIVERSAL_VALIDATOR_BIN} -cv \
{BASE_FOLDER / f'refined_domain_{i}.pddl'} \
{BASE_FOLDER / f'refined_problem_{i}.pddl'} \
{BASE_FOLDER / f'refined_sas_plan_{i}'}"

out = subprocess.run(command, shell=True, capture_output=True, text=True)

# This part won't be printed at test time
if out.stderr:
    pddl_error = out.stderr
    print(
        f"The validation found a problem with the plan: {out.stderr}"
    )
else:
    pddl_error = "No error found."
    print("The plan is valid.")
    print("[stdout]", out.stdout)

In [None]:
# Report the natural language plan
from src.llm_plan.hypervisor import HypervisorNaturalLanguage

with open(BASE_FOLDER / f"refined_problem_{i}.pddl", "r") as f:
    problem = f.read()

with open(BASE_FOLDER / f"refined_domain_{i}.pddl", "r") as f:
    domain = f.read()
    
with open(BASE_FOLDER / f"refined_sas_plan_{i}", "r") as f:
    plan = f.read()
    
prompt_args = {
    "specification": env.config_data,
    "pddl_domain": domain,
    "pddl_problem": problem,
    "pddl_plan": plan,
}

hypervisor_to_nl = HypervisorNaturalLanguage(llm=model, prompt_args=prompt_args)

natural_plan = hypervisor_to_nl.run()

with open(BASE_FOLDER / "refined_nl_plan.txt", "w") as f:
    f.write(natural_plan)