# OLP_LLM: Pipeline for generating FOON graphs via LLM Prompting

## Step 1 :- Generate a FOON via LLM Prompting

1. Initialize libraries needed for FOON (``FOON_graph_analyser.py``) as well as OpenAI api.

2. Perform 2-stage prompting for recipe prototype.
    - In the first stage, we ask the LLM for a *high-level recipe* (list of instructions) and a *list of objects* needed for completing the recipe.
    - In the second stage, we ask the LLM for a breakdown of *state changes* that happen for each step of the recipe; specifically, we ask for the *preconditions* and *effects* of each action, which is similar to how a functional unit in FOON has *input* and *output object nodes*.

### Initialize library

In [1]:
from olp_library import *
from random import randint, choice, shuffle

In [146]:
import utamp.driver as driver
from utamp.generate import randomize_blocks

run_utamp = True

if run_utamp:
    # -- load the scene only once:
    # utamp = driver.UTAMP(scene_file_name='./utamp/scenes/panda_blocks_tiles-5.ttt')

    fpath, tally = randomize_blocks('./utamp/scenes/panda_blocks_tiles_prototype.ttt')
    utamp = driver.UTAMP(scene_file_name=fpath)

    print("objects presently in scene:", utamp.objects_in_sim)

objects presently in scene: ['red_tile', 'blue_tile', 'yellow_tile', 'green_tile', 'red_block_1', 'red_block_2', 'red_block_3', 'blue_block_1', 'blue_block_2', 'yellow_block_1', 'yellow_block_2', 'yellow_block_3', 'green_block_1', 'green_block_2', 'green_block_3', 'green_block_4', 'green_block_5']


### Language to  OLP

##### Select a Task

In [3]:
blue_phrase = f"{tally['blue']} blue block{'s' if tally['blue'] > 1 else ''}"
green_phrase = f"{tally['green']} green block{'s' if tally['green'] > 1 else ''}"
red_phrase = f"{tally['red']} red block{'s' if tally['red'] > 1 else ''}"
yellow_phrase = f"{tally['yellow']} yellow block{'s' if tally['yellow'] > 1 else ''}"

print(blue_phrase)
print(green_phrase)
print(red_phrase)
print(yellow_phrase)

block_phrases = [blue_phrase, green_phrase, red_phrase, yellow_phrase]
shuffle(block_phrases)

# -- randomly select tiles of focus for generating a task:
tile_phrases = []
for C, _ in tally.items():
	if bool(randint(0, 1)):
		tile_phrases.append(f"a {C} tile")

if not bool(tile_phrases):
	# -- at least one colour will be randomly selected:
    random_colour = choice(list(tally.keys()))
    tile_phrases = [f"a {random_colour} tile"]

tile_phrase = ""
for C in tile_phrases:
    tile_phrase += ("" if tile_phrases.index(C) < len(tile_phrases) - 1  or len(tile_phrases) == 1 else "and ") + C + (", " if tile_phrases.index(C) < len(tile_phrases) - 2 else " " if tile_phrases.index(C) < len(tile_phrases) - 1 else "")

print(tile_phrase)

user_tasks = [
    f"I have {block_phrases[0]}, {block_phrases[1]}, {block_phrases[2]}, and {block_phrases[3]}. There is {tile_phrase} on the table. How can I stack all blocks on their corresponding tiles?",
    # f"I have {yellow_phrase}, {blue_phrase}, {green_phrase}, and {red_phrase} on a table. How can I stack the blocks in alphabetical order of colour?",
    # f"I have {yellow_phrase}, {blue_phrase}, {green_phrase}, and {red_phrase} on a table. How can I stack the blocks in reverse alphabetical order of colour?",
]

1 blue block
1 green block
1 red block
2 yellow blocks
a green tile


#### Perform Planning with LLM Prompting

In [4]:
openai_models = ["gpt-3.5-turbo", "gpt-4"]

# NOTE: all incontext examples will be stored within a JSON file:
# -- Q: can we randomly sample from the set of incontext examples?
# -- Q: should we also select an example "closest" to the provided task?
# incontext_file = "incontext_examples.txt"
incontext_file = "incontext_examples.json"

LLM_to_PDDL = True

user_task = choice(user_tasks)
selected_LLM = openai_models[-1]

llm_output = generate_OLP(user_task,
						  incontext_file,
						  selected_LLM,
						  verbose=False)

print('Objects used in plan:', llm_output['RelevantObjects'])
print('Terminal Steps:', llm_output['TerminalSteps'])

json.dump(llm_output, open('raw_OLP.json', 'w'), indent=4)

Objects used in plan: ['green block', 'green tile']
Terminal Steps: [1]


In [5]:
print("TASK:",  user_task)
print(f"***********************\n    High level plan\n***********************\n{llm_output['PlanSketch']}\n")

TASK: I have 2 yellow blocks, 1 red block, 1 green block, and 1 blue block. There is a green tile on the table. How can I stack all blocks on their corresponding tiles?
***********************
    High level plan
***********************
High-level Plan:
1. Pick and place the green block on the green tile.

Rationale: Since we only have a green tile, we will only stack the green block. We ignore the yellow, red, and blue blocks since there are no corresponding tiles.

unique_objects: ["green block", "green tile"]



In [6]:
print("***********************\n   Object level plan\n***********************\n")
print(llm_output['OLP'], sep="\n")
print(f"\n***********************\n   Plan objects\n***********************\n{llm_output['RelevantObjects']}\n")

***********************
   Object level plan
***********************

{'Query': 'How can I stack all blocks on their corresponding tiles?', 'CompleteObjectSet': ['green block', 'green tile'], 'Instructions': [{'Step': 1, 'Instruction': 'Pick and place the green block on the green tile.', 'RelatedObjects': ['green block', 'green tile'], 'Action': 'Pick and Place', 'State': {'green block': {'Precondition': ['on table'], 'Effect': ['on green tile']}, 'green tile': {'Precondition': ['on table'], 'Effect': ['on table', 'contains green block']}}}]}

***********************
   Plan objects
***********************
['green block', 'green tile']



#### Sample FOON creation

In [7]:
# plan_step, plan_objects = llm_output['OLP'],

if '.json' in str(incontext_file).lower():
    sample_unit = create_functionalUnit_ver2(llm_output, index=0)
else:
    sample_unit = create_functionalUnit_ver1(llm_output['OLP'][0]['Instructions'],llm_output['RelevantObjects'])

sample_unit.print_functions[2](version=1)

Creating functional unit for Step 1: Pick and place the green block on the green tile.
-> related objects: ['green block', 'green tile']
Current object is : green block
	 green block state changes: {'Precondition': ['on table'], 'Effect': ['on green tile']}
		 Precondition : on table || related objects: table
		 Effect : on green tile || related objects: green tile
Current object is : green tile
	 green tile state changes: {'Precondition': ['on table'], 'Effect': ['on table', 'contains green block']}
		 Precondition : on table || related objects: table
		 Effect : on table || related objects: table
		 Effect : contains green block || related objects: None
O	green block
S	on	 [table]
O	green tile
S	on	 [table]
M	Pick and Place	<Assumed>
O	green block
S	on	 [green tile]
O	green tile
S	contains	{green block}
S	on	 [table]


#### Creating FOON units

In [8]:
FOON_prototype = []

for x in range(len(llm_output['OLP']['Instructions'])):

    new_unit = None

    if '.json' in str(incontext_file).lower():
        new_unit = create_functionalUnit_ver2(llm_output, index=x)

    # else:
    #     print(step['Step'])
    #     # -- now we will create functional units that follow the FOON format:
    #     new_unit = create_functionalUnit_ver1(step, plan_objects)

    # NOTE: in order to define a macro-problem, we need to properly identify all goal nodes;
    #       we will do this with the help of the LLM:
    if 'TerminalSteps' in llm_output and (x+1) in llm_output['TerminalSteps']:
        # -- set output objects as goal nodes for the functional units deemed as terminal steps:
        print(f'Functional unit #{x} has terminal goals!')
        for N in range(new_unit.getNumberOfOutputs()):
            new_unit.getOutputNodes()[N].setAsGoal()

    # elif x == (len(llm_output['OLP']['Instructions']) - 1):
    #     # -- by default, treat the last functional unit as the terminal unit:
    #     for N in range(new_unit.getNumberOfOutputs()):
    #         new_unit.getOutputNodes()[N].setAsGoal()

    # -- add the functional unit to the FOON prototype:

    if not new_unit.isEmpty():
        # -- we should only add a new functional unit if it is not empty, meaning it must have the following:
        #    1. >=1 input node and >= 1 output node
        #    2. a valid motion node
        FOON_prototype.append(new_unit)
        FOON_prototype[-1].print_functions[-1]()
    else:
        print('NOTE: the following functional unit has an error, so skipping it:')
        new_unit.print_functions[-1]()

    print()


Creating functional unit for Step 1: Pick and place the green block on the green tile.
-> related objects: ['green block', 'green tile']
Current object is : green block
	 green block state changes: {'Precondition': ['on table'], 'Effect': ['on green tile']}
		 Precondition : on table || related objects: table
		 Effect : on green tile || related objects: green tile
Current object is : green tile
	 green tile state changes: {'Precondition': ['on table'], 'Effect': ['on table', 'contains green block']}
		 Precondition : on table || related objects: table
		 Effect : on table || related objects: table
		 Effect : contains green block || related objects: None
Functional unit #0 has terminal goals!
O	green block
S	on	 [table]
O	green tile
S	on	 [table]
M	Pick and Place	<Assumed>
O	green block
S	on	 [green tile]
O	green tile
S	contains	{green block}
S	on	 [table]



#### Testing PDDL Operator Generation via LLM prompting

In [9]:
if LLM_to_PDDL:

    pddl_sys_prompt_file = "llm_prompts/pddl_system_prompt.txt"

    pddl_system_prompt = open(pddl_sys_prompt_file, 'r').read()

    message = [{"role": "system", "content": pddl_system_prompt}]

    # for x in range(len(OLP_results['OLP']['Instructions'])):
    pddl_user_prompt = f"Generate PDDL for :\n{llm_output['OLP']['Instructions']}"

    message.extend([{"role": "user", "content": pddl_user_prompt}])

    _, response = prompt_LLM(given_prompt=message,
                             model_name=openai_models[-1], )
    print(response)


(:action pick_and_place_green_block_on_green_tile
 :parameters ()
 :precondition (and (on green_block table) (on green_tile table))
 :effect (and (on green_block green_tile) (not (on green_block table)) (in green_tile green_block))
)


### Writing FOON to a Text File

In [10]:
temp_file_name = 'prototype.txt'
task_file_name = f"{str.lower(re.compile('[^a-zA-Z]').sub('_', llm_output['OLP']['Query'])[:-1])}.txt"
print(task_file_name)

how_can_i_stack_all_blocks_on_their_corresponding_tiles.txt


In [11]:
# -- save the prototype FOON graph as a text file, which we will then run with a parser to correct numbering:
if not os.path.exists('./preprocess/'):
    os.makedirs('./preprocess/')

if not os.path.exists('./postprocess/'):
    os.makedirs('./postprocess/')

temp_file = open(f'./preprocess/{temp_file_name}', 'w')
temp_file.write('#\tFOON Prototype\n#\t-- Task Prompt: {0}\n//\n'.format(user_task))
for unit in FOON_prototype:
    unit.print_functions[2]()
    temp_file.write(unit.getFunctionalUnitText())
    print('//')

temp_file.close()

O	green block
S	on	 [table]
O	green tile
S	on	 [table]
M	Pick and Place	<Assumed>
O	green block
S	on	 [green tile]
O	green tile
S	contains	{green block}
S	on	 [table]
//


## Step 2 :- FOON to PDDL
1. Parse FOON file -- this step is important to ensure that all labels are unique and that the generated file follows the FOON syntax.

2. (Optional) Visualize FOON graph

3. Run ``FOON_to_PDDL.py`` script to generate FOON macro-operators

### Parse and clean generated FOON

In [12]:
# -- running parsing module to ensure that FOON labels and IDs are made consistent for further use:
#		(it is important that each object and state type have a *UNIQUE* identifier)
fpa.skip_JSON_conversion = True		# -- we don't need JSON versions of a FOON
fpa.skip_index_check = True			# -- always create a new set of index files

fpa.source_dir = './preprocess/'
fpa.target_dir = './postprocess/'
fpa._run_parser()

-- [FOON_parser] : Initiating parsing procedure!


 -- [FOON_parser] : Commencing parsing...
  -- parsing 'prototype.txt'...

 -- [FOON_parser] : Saving corrected files to './postprocess/'...
  -- Saving 'prototype.txt'...

-- Revising object label senses using WordNet and Concept-Net...


SUMMARY OF CHANGES:
  -- new total of OBJECTS : 3
  -- new total of STATES : 2
  -- new total of MOTIONS : 1

 -- [FOON_parser] : Parsing complete!


In [13]:
with open(f'./postprocess/{temp_file_name}', 'r') as input_file:
    with open(f'./postprocess/{task_file_name}', 'w') as output_file:
        for line in input_file:
            output_file.write(line)

### (Optional) Run FOON visualization tool

In [14]:
# -- after running the parser, take the 'prototype.txt' file and plot it using the following tool: https://davidpaulius.github.io/foon-view/
# fga._startFOONview()

### Generate PDDL files using ```FOON_to_PDDL``` module

In [15]:
import FOON_TAMP as fta
from pathlib import Path

FOON_subgraph_file = f'./postprocess/{task_file_name}'

# -- definition of macro and micro plan file names:
macro_plan_file = os.path.splitext(FOON_subgraph_file)[0] + '_macro.plan'
micro_plan_file = os.path.splitext(FOON_subgraph_file)[0] + '_micro.plan'

fta.micro_domain_file = './FOON_skills.pddl'

fta.flag_perception = False

# -- create a new folder for the generated problem files and their corresponding plans:
fta.micro_problems_dir = './micro_problems-' + Path(FOON_subgraph_file).stem
if not os.path.exists(fta.micro_problems_dir):
    os.makedirs(fta.micro_problems_dir)

ftp = fta.ftp

# -- perform conversion of the FOON subgraph file to PDDL:
ftp.FOON_subgraph_file = FOON_subgraph_file
ftp._convert_to_PDDL('OCP')

## -- parse through the newly created domain file and find all (macro) planning operators:
all_macro_POs = {PO.name : PO for PO in fta._parseDomainPDDL(ftp.FOON_domain_file)}

# -- checking to see if a macro level plan has been found (FD allows exporting to a file):
outcome = fta._findPlan(
    domain_file=ftp.FOON_domain_file,   # NOTE: FOON_to_PDDL object will already contain the names of the
    problem_file=ftp.FOON_problem_file,  #   corresponding macro-level domain and problem files
    output_plan_file=macro_plan_file
)

# TODO: write planning pipeline for other planners?

complete_micro_plan = []

if fta._checkPlannerOutput(output=outcome):
    print("  -- [FOON-TAMP] : A macro-level plan for '" + str(FOON_subgraph_file) + "' was found!")

    # -- now that we have a plan, we are going to parse it for the steps:
    macro_file = open(macro_plan_file, 'r')

    # NOTE: counters for macro- and micro-level steps:
    macro_count, micro_count = 0, 0

    macro_plan_lines = list(macro_file)
    for L in macro_plan_lines:
        if L.startswith('('):
            print('\t\t\t' + str(macro_count) + ' : ' + L.strip())
    print()

    for L in range(len(macro_plan_lines)):

        if macro_plan_lines[L].startswith('('):
            # NOTE: this is where we have identified a macro plan's step; here, we check the contents of its PO definition for:
            #	1. preconditions - this will become a sub-problem file's initial states (as predicates)
            #	2. effects - this will become a sub-problem file's goal states (as predicates)

            macro_count += 1

            macro_PO_name = macro_plan_lines[L][1:-2].strip()

            print(" -- [FOON-TAMP] : Searching for micro-level plan for '" + macro_PO_name + "' macro-PO...")

            # -- try to find this step's matching planning operator definition:
            matching_PO_obj = all_macro_POs[macro_PO_name] if macro_PO_name in all_macro_POs else None

            # -- when we find the equivalent planning operator, then we proceed to treat it as its own problem:
            if matching_PO_obj:
                # -- create sub-problem file (i.e., at the micro-level):
                micro_problem_file = fta._defineMicroProblem(
                    macro_PO_name,
                    preconditions=matching_PO_obj.getPreconditions(),
                    effects=matching_PO_obj.getEffects(),
                )

                micro_domain_file = fta._defineMicroDomain(
                    preconditions=matching_PO_obj.getPreconditions(),
                )

                complete_micro_plan.append('; step ' + str(macro_count) + ' -- (' + macro_PO_name + '):')

                need_to_replan = False

                # -- try to find a sub-problem plan / solution:
                outcome = fta._findPlan(
                    domain_file=micro_domain_file,
                    problem_file=micro_problem_file,
                    output_plan_file=str(fta.micro_problems_dir + '/' + macro_PO_name + '_micro.plan')
                )

                print('\n\t' + 'step ' + str(macro_count) +' -- (' + macro_PO_name + ')')

                if fta._checkPlannerOutput(outcome):

                    print('\t\t -- micro-level plan found as follows:')

                    # -- open the micro problem file, read each line referring to a micro PO, and save to list:
                    micro_file = open(
                        str(fta.micro_problems_dir + '/' + macro_PO_name + '_micro.plan'), 'r')

                    # -- all except for the last line should be valid steps:
                    micro_file_lines = []
                    count = 0

                    for micro_line in micro_file:
                        if micro_line.startswith('('):
                            # -- parse the line and remove trailing newline character:
                            micro_step = micro_line.strip()
                            micro_file_lines.append(micro_step)

                            # -- print entire plan to the command line in format of X.Y,
                            #       where X is the macro-step count and Y is the micro-step count:
                            count += 1
                            print('\t\t\t' + str(macro_count) + '.' + str(count) + ' : ' + micro_step)
                        #endif
                    #endfor
        print()
else:
    print("  -- [FOON-TAMP] : A macro-level plan for '" + str(FOON_subgraph_file) + "' was NOT found!")


  -- Reading file line : 100%|██████████| 12/12 [00:00<?, ?it/s]


< FOON_TAMP: converting FOON graph to PDDL planning problem (last updated: 31st March, 2023)>


 -- [FOON-fga] : Opening FOON file named './postprocess/how_can_i_stack_all_blocks_on_their_corresponding_tiles.txt'...

 -- [FOON-fga] : Building internal dictionaries...
 -- [FOON-fga] : Building output-to-FU dictionaries...
  -- Level 1: Output object map complete!
  -- Level 2: Output object map complete!
  -- Level 3: Output object map complete!

 -- [FOON-fga] : Building object-to-FU dictionaries...
  -- Level 1: Object map complete!
  -- Level 2: Object map complete!
  -- Level 3: Object map complete!

 -- [FOON_to_PDDL] : Creating domain file named './postprocess/how_can_i_stack_all_blocks_on_their_corresponding_tiles_domain.pddl'...

 -- [FOON_to_PDDL] : Creating problem file named './postprocess/how_can_i_stack_all_blocks_on_their_corresponding_tiles_problem.pddl'...
 -- [FOON_retrieval] : Creating file with kitchen items...

python3 C:/Users/david/fast-downward-23.06/fast-downwar




  -- [FOON-TAMP] : A macro-level plan for './postprocess/how_can_i_stack_all_blocks_on_their_corresponding_tiles.txt' was found!
			0 : (pick_and_place_0 )

 -- [FOON-TAMP] : Searching for micro-level plan for 'pick_and_place_0' macro-PO...
python3 C:/Users/david/fast-downward-23.06/fast-downward.py --plan-file ./micro_problems-how_can_i_stack_all_blocks_on_their_corresponding_tiles/pick_and_place_0_micro.plan ./micro_problems-how_can_i_stack_all_blocks_on_their_corresponding_tiles\./FOON_skills.pddl ./micro_problems-how_can_i_stack_all_blocks_on_their_corresponding_tiles/pick_and_place_0_problem.pddl --search astar(lmcut())

	step 1 -- (pick_and_place_0)
		 -- micro-level plan found as follows:
			1.1 : (move robot robot table)
			1.2 : (pick green_block table)
			1.3 : (move robot table green_tile)
			1.4 : (place green_block green_tile)




In [16]:
complete_micro_plan = []

# -- we can also perform a search where we assume we already have a task tree
#		and we execute each functional unit one by one as we see

macro_plan_lines = []
for N in range(len(ftp.fga.FOON_lvl3)):
    PO_name = f'{ftp._reviseObjectLabels(ftp.fga.FOON_lvl3[N].getMotion().getMotionLabel())}_{N}'
    macro_plan_lines.append(PO_name)

macro_count = 0

for L in range(len(macro_plan_lines)):

    macro_count += 1

    macro_PO_name = macro_plan_lines[L]

    print(" -- [FOON-TAMP] : Searching for micro-level plan for '" + macro_PO_name + "' macro-PO...")

    # -- try to find this step's matching planning operator definition:
    matching_PO_obj = all_macro_POs[macro_PO_name] if macro_PO_name in all_macro_POs else None

    # -- when we find the equivalent planning operator, then we proceed to treat it as its own problem:
    if matching_PO_obj:
        # -- create sub-problem file (i.e., at the micro-level):
        micro_problem_file = fta._defineMicroProblem(
            macro_PO_name,
            preconditions=matching_PO_obj.getPreconditions(),
            effects=matching_PO_obj.getEffects(),
        )

        micro_domain_file = fta._defineMicroDomain(
            preconditions=matching_PO_obj.getPreconditions(),
        )

        complete_micro_plan.append('; step ' + str(macro_count) + ' -- (' + macro_PO_name + '):')

        need_to_replan = False

        # -- try to find a sub-problem plan / solution:
        outcome = fta._findPlan(
            domain_file=micro_domain_file,
            problem_file=micro_problem_file,
            output_plan_file=str(fta.micro_problems_dir + '/' + macro_PO_name + '_micro.plan')
        )

        print('\n\t' + 'step ' + str(macro_count) +' -- (' + macro_PO_name + ')')

        if fta._checkPlannerOutput(outcome):

            print('\t\t -- micro-level plan found as follows:')

            # -- open the micro problem file, read each line referring to a micro PO, and save to list:
            micro_file = open(
                str(fta.micro_problems_dir + '/' + macro_PO_name + '_micro.plan'), 'r')

            # -- all except for the last line should be valid steps:
            micro_file_lines = []
            count = 0

            for micro_line in micro_file:
                if micro_line.startswith('('):
                    # -- parse the line and remove trailing newline character:
                    micro_step = micro_line.strip()
                    micro_file_lines.append(micro_step)

                    # -- print entire plan to the command line in format of X.Y,
                    #       where X is the macro-step count and Y is the micro-step count:
                    count += 1
                    print('\t\t\t' + str(macro_count) + '.' + str(count) + ' : ' + micro_step)
                #endif
            #endfor
    print()


 -- [FOON-TAMP] : Searching for micro-level plan for 'pick_and_place_0' macro-PO...
python3 C:/Users/david/fast-downward-23.06/fast-downward.py --plan-file ./micro_problems-how_can_i_stack_all_blocks_on_their_corresponding_tiles/pick_and_place_0_micro.plan ./micro_problems-how_can_i_stack_all_blocks_on_their_corresponding_tiles\./FOON_skills.pddl ./micro_problems-how_can_i_stack_all_blocks_on_their_corresponding_tiles/pick_and_place_0_problem.pddl --search astar(lmcut())
  -- [FOON-TAMP] : Error with planner execution! (planner used: fast-downward)

	step 1 -- (pick_and_place_0)



## Step 3: Connection with UTAMP System

In [17]:
if run_utamp:
    # -- now that we have a plan, we are going to parse it for the steps:
    macro_file = open(macro_plan_file, 'r')

    # NOTE: counters for macro- and micro-level steps:
    macro_count, micro_count = 0, 0

    # macro_plan_lines = list(macro_file)
    # for L in macro_plan_lines:
    #     if L.startswith('('):
    #         print('\t' + str(macro_count) + ' : ' + L.strip())
    #         macro_count += 1

    macro_plan_lines = []
    for N in range(len(ftp.fga.FOON_lvl3)):
        PO_name = f'{ftp._reviseObjectLabels(ftp.fga.FOON_lvl3[N].getMotion().getMotionLabel())}_{N}'
        # print('\t' + str(macro_count) + ' : ' + PO_name)
        macro_plan_lines.append(PO_name)

    macro_count = 0

    total_success = 0

    for L in range(len(macro_plan_lines)):
        # NOTE: this is where we have identified a macro plan's step; here, we check the contents of its PO definition for:
        #	1. preconditions - this will become a sub-problem file's initial states (as predicates)
        #	2. effects - this will become a sub-problem file's goal states (as predicates)

        macro_count += 1

        # macro_PO_name = macro_plan_lines[L][1:-2].strip()
        macro_PO_name = macro_plan_lines[L]

        print(" -- [FOON-TAMP] : Searching for micro-level plan for '" + macro_PO_name + "' macro-PO...")

        # -- try to find this step's matching planning operator definition:
        matching_PO_obj = all_macro_POs[macro_PO_name] if macro_PO_name in all_macro_POs else None

        # -- when we find the equivalent planning operator, then we proceed to treat it as its own problem:
        if matching_PO_obj:
            # -- create sub-problem file (i.e., at the micro-level):
            micro_problem_file = fta._defineMicroProblem(
                macro_PO_name,
                preconditions=matching_PO_obj.getPreconditions(),
                effects=matching_PO_obj.getEffects(),
            )

            complete_micro_plan.append('; step ' + str(macro_count) + ' -- (' + macro_PO_name + '):')
        else:
            continue

        goal_preds, objects_in_OLP = [], []

        # -- parse the goals in the generated micro-problem file:
        micro_problem_lines = open(micro_problem_file, 'r').readlines()
        for M in range(len(micro_problem_lines)):
            if ("(:goal" in micro_problem_lines[M]):
                while True:
                    M += 1

                    if micro_problem_lines[M].startswith("))"):
                        break

                    goal_string = str(micro_problem_lines[M]).replace("\t", "").strip()
                    if goal_string:
                        # -- first, we check if a goal string line is actually referring to a comment or a negation predicate:
                        if goal_string.startswith(";") or goal_string.startswith("(not"):
                            # -- we want to skip those:
                            continue

                        # -- append the goal string to the list of goal predicates:
                        goal_preds.append(goal_string)

                        # -- we also want to parse the goal string to identify all possible expressions for objects at the object level:
                        predicate_args = goal_preds[-1][1:-1].split(" ")
                        objects_in_OLP.extend([predicate_args[x] for x in range(1, len(predicate_args))])
                #endwhile
            #endif
        #endfor

        # print("objects in OLP:", objects_in_OLP)

        assert bool(goal_preds), f"Error: empty list of goals! Check the generated micro-problem file '{micro_problem_file}'"

        # -- prompt LLMs to perform object grounding
        #       (remove any objects that do not require grounding -- these are handled by the task planner system):
        objects_in_OLP = list(set(objects_in_OLP) - set(['hand', 'air', 'table', 'robot']))

        message = [
            {
                "role": "user",
                "content": f"Assign each of these simulation objects to real-world objects. You may only assign each object once."
                    " Give your output as a Python dictionary."
                    f"\nReal-world object: {objects_in_OLP}"
                    f"\nSimulated objects: {utamp.objects_in_sim}"
                    "\nExample: {'object_1': 'sim_object_1', 'object_2': 'sim_object_2'}"
            }
        ]

        _, response = prompt_LLM(message, selected_LLM)
        # print("LLM response:", response)
        regex_matches = re.findall(r'\{.+\}', str(eval(response)))

        if bool(regex_matches):
            # -- this means that the LLM has proposed some object groundings for us:
            obj_grounding = eval(regex_matches.pop())
            print(obj_grounding)

            # -- parse the goal predicates and replace the generic object names with those of the sim objects:
            print("before grounding:", goal_preds)
            for G in range(len(goal_preds)):
                goal_pred_parts = goal_preds[G][1:-1].split(" ")
                for obj in goal_pred_parts[1:]:
                    if obj in obj_grounding:
                        goal_preds[G] = goal_preds[G].replace(obj, obj_grounding[obj])

            print("after grounding:", goal_preds)

        success = driver.main(utamp, goal_preds=goal_preds)

        total_success += int(success)

    # utamp.pause()

    print('\n\n% Successful:', total_success / macro_count * 100.0)

 -- [FOON-TAMP] : Searching for micro-level plan for 'pick_and_place_0' macro-PO...
{'green_block': 'green_block_1', 'green_tile': 'green_tile'}
before grounding: ['(in hand air)', '(on green_tile green_block)', '(under green_block green_tile)', '(on table green_tile)', '(under green_tile table)']
after grounding: ['(in hand air)', '(on green_tile green_block_1)', '(under green_block_1 green_tile)', '(on table green_tile)', '(under green_tile table)']
user 51
green_tile:on,green_block_1;green_block_1:under,green_tile:on,air;
robot:x,0.0:y,0.0:z,0.0:roll,-0.0:pitch,0.0:yaw,-0.0:dx,0.0:dy,0.0:dz,0.0;hand:x,0.6994800000000002:y,0.2499000000000001:z,0.12973999999999997:roll,-3.1362:pitch,0.0005594700000000037:yaw,-1.5647999999999997:dx,0.024275404392629724:dy,0.2044156506692143:dz,0.05569436541288881;worksurface:x,0.5100384449958496:y,1.6720352996602017e-07:z,-0.07891372871402935:roll,0.0:pitch,0.0:yaw,-3.141590299206751:dx,0.6:dy,0.7:dz,0.0;red_tile:x,0.5750384449958343:y,1.88512194410073

# Baseline: LLM-as-Planner 

In [177]:
utamp.stop()

if run_utamp:
    utamp = driver.UTAMP(scene_file_name=fpath)

    print("objects presently in scene:", utamp.objects_in_sim)

    # -- use the LLM as a planner to acquire a task plan:

    # -- we are going to feed the LLM with the prompt of coming up with a plan using the pre-defined skills:
    llm_planner_sys_prompt = open('llm_prompts/llm_planner_system_prompt_2.txt', 'r').read()
    message = [{"role": "system", "content": llm_planner_sys_prompt}]

    llm_planner_user_prompt = f"Generate a PDDL task plan for the following task: {user_task} "\
        f"The current state of the robot's environment is provided as the following dictionary: {utamp.perform_sensing(method=-1)}."

    print(llm_planner_user_prompt)

    message.extend([{"role": "user", "content": llm_planner_user_prompt}])
    print()

    _, response = prompt_LLM(given_prompt=message, model_name=openai_models[-1], verbose=False)
    print(response)

    step_count = 0
    steps = response.split('\n')

    goals_per_step = []
    parsed_steps = []

    while True:
        step_count += 1

        found = False

        for x in range(len(steps)):
            if steps[x].startswith(f"{step_count}."):

                found = True

                step_parts = steps[x].split('(')[1].split(')')[0]
                action, args = step_parts.split(' ')[0], step_parts.split(' ')[1:]

                tokenized_step = step_parts.split(' ')

                parsed_steps.append(tokenized_step)

                if 'place' in action or 'stack' in action:
                    goals_per_step.append([
                        f"(on {args[0]} air)",
                        f"(under {args[0]} {args[1]})",
                        f"(on {args[1]} {args[0]})",
                    ])

        if not found:
            break


  -- Acquiring object properties from CoppeliaSim...: 100%|██████████| 17/17 [00:00<00:00, 136.94it/s]

objects presently in scene: ['red_tile', 'blue_tile', 'yellow_tile', 'green_tile', 'red_block_1', 'red_block_2', 'red_block_3', 'blue_block_1', 'blue_block_2', 'yellow_block_1', 'yellow_block_2', 'yellow_block_3', 'green_block_1', 'green_block_2', 'green_block_3', 'green_block_4', 'green_block_5']
Generate a PDDL task plan for the following task: I have 2 yellow blocks, 1 red block, 1 green block, and 1 blue block. There is a green tile on the table. How can I stack all blocks on their corresponding tiles? The current state of the robot's environment is provided as the following dictionary: {'on': {'red_tile': 'air', 'blue_tile': 'air', 'yellow_tile': 'air', 'green_tile': 'air', 'red_block_1': 'air', 'red_block_2': 'air', 'red_block_3': 'air', 'blue_block_1': 'air', 'blue_block_2': 'air', 'yellow_block_1': 'air', 'yellow_block_2': 'air', 'yellow_block_3': 'air', 'green_block_1': 'air', 'green_block_2': 'air', 'green_block_3': 'air', 'green_block_4': 'air', 'green_block_5': 'air'}, 'und




The task prompt and the dictionary seem to be inconsistent. The task prompt mentions 2 yellow blocks, 1 red block, 1 green block, and 1 blue block, but the dictionary includes more blocks. I will assume that the task prompt is correct and ignore the extra blocks in the dictionary.

Here is the PDDL task plan:

1. (pick yellow_block_1 table)
2. (place yellow_block_1 yellow_tile)
3. (pick yellow_block_2 table)
4. (place yellow_block_2 yellow_block_1)
5. (pick red_block_1 table)
6. (place red_block_1 red_tile)
7. (pick green_block_1 table)
8. (place green_block_1 green_tile)
9. (pick blue_block_1 table)
10. (place blue_block_1 blue_tile)


In [159]:
%%script --no-raise-error false

if run_utamp:
    print(len(goals_per_step))

    total_success = 0
    for x in goals_per_step:
        success = driver.main(utamp, goal_preds=x)
        total_success += int(success)

    print('\n\n% Successful:', total_success / (step_count-1) * 100.0)

5


KeyboardInterrupt: 

In [175]:
if run_utamp:
    utamp.start()

    total_success = 0

    for step in parsed_steps:
        close_gripper, target_object = None, None
        if 'pick' in step[0]:
            close_gripper = 1
        else:
            close_gripper = 0
        target_object = step[-1]

        success = utamp.path_planning(
            target_object=target_object,
            gripper_action=close_gripper)

        total_success += int(success)

    print('\n\n% Successful:', total_success / (step_count-1) * 100.0)

    utamp.stop()

Exception: 316: in sim.getObjectAlias: object does not exist.

In [178]:
# %%script --no-raise-error false
utamp.start()
utamp.path_planning(
	target_object='red_block_2',
	gripper_action=1,
	algorithm='PRMstar')

utamp.path_planning(
	target_object='green_block_2',
	gripper_action=0,
	algorithm='PRMstar')

utamp.path_planning(
	target_object='green_block_3',
	gripper_action=1,
	algorithm='PRMstar')

utamp.path_planning(
	target_object='red_block_2',
	gripper_action=0,
	algorithm='PRMstar')

utamp.path_planning(
	target_object='yellow_block_1',
	gripper_action=1,
	algorithm='PRMstar')

utamp.path_planning(
	target_object='yellow_tile',
	gripper_action=0,
	algorithm='PRMstar')

utamp.stop()