In [20]:
import re
import json
import copy
import numpy as np
from typing import Dict, List, Tuple

In [21]:
def action_from_response(
    pg_dict_input: Dict[str, List[str]], original_response_dict: Dict
) -> Tuple[str, Dict[str, List[str]]]:
    """
    Updates the environment state based on the actions in the response.

    Args:
        pg_dict_input (Dict[str, List[str]]): Current state of the playground.
        original_response_dict (Dict): Actions to be executed.

    Returns:
        Tuple[str, Dict[str, List[str]]]: Feedback string and updated playground state.
    """
    system_error_feedback = ""
    pg_dict_original = copy.deepcopy(pg_dict_input)
    transformed_dict = {}
    for key, value in original_response_dict.items():
        coordinates = tuple(map(float, re.findall(r"\d+\.?\d*", key)))

        # match the item and location in the value
        match = re.match(r"move\((.*?),\s(.*?)\)", value)
        if match:
            item, location = match.groups()
            if "square" in location:
                location = tuple(map(float, re.findall(r"\d+\.?\d*", location)))
            transformed_dict[coordinates] = [item, location]

    for key, value in transformed_dict.items():
        # print(f"Key: {key}, Value1: {value[0]}, Value2: {value[1]}")
        if (
            value[0] in pg_dict_original[str(key[0]) + "_" + str(key[1])]
            and type(value[1]) == tuple
            and (
                (
                    np.abs(key[0] - value[1][0]) == 0
                    and np.abs(key[1] - value[1][1]) == 1
                )
                or (
                    np.abs(key[0] - value[1][0]) == 1
                    and np.abs(key[1] - value[1][1]) == 0
                )
            )
        ):
            pg_dict_original[str(key[0]) + "_" + str(key[1])].remove(value[0])
            pg_dict_original[str(value[1][0]) + "_" + str(value[1][1])].append(value[0])
        elif (
            value[0] in pg_dict_original[str(key[0]) + "_" + str(key[1])]
            and type(value[1]) == str
            and value[1] in pg_dict_original[str(key[0]) + "_" + str(key[1])]
            and value[0][:4] == "box_"
            and value[1][:7] == "target_"
            and value[0][4:] == value[1][7:]
        ):
            pg_dict_original[str(key[0]) + "_" + str(key[1])].remove(value[0])
            pg_dict_original[str(key[0]) + "_" + str(key[1])].remove(value[1])
        else:
            # print(f"Error, Iteration Num: {iteration_num}, Key: {key}, Value1: {value[0]}, Value2: {value[1]}")
            system_error_feedback += f"Your assigned task for {key[0]}_{key[1]} is not in the doable action list; "

    return system_error_feedback, pg_dict_original

In [12]:
pg_state = {"0.5_0.5": ["box_red"], 
            "0.5_1.5": ["target_blue", "target_red", "box_green", "target_green", "target_purple", "target_orange", "box_orange", "target_orange"], 
            "1.5_0.5": ["box_blue", "box_purple"], 
            "1.5_1.5": ["box_orange"]}
action = {'Agent[0.5, 0.5]': 'move(box_red, square[0.5, 1.5])', 
          'Agent[1.5, 0.5]': 'move(box_blue, square[0.5, 0.5])'}

In [23]:
action_from_response(
    pg_dict_input=pg_state, original_response_dict=action
)

('',
 {'0.5_0.5': ['box_blue'],
  '0.5_1.5': ['target_blue',
   'target_red',
   'box_green',
   'target_green',
   'target_purple',
   'target_orange',
   'box_orange',
   'target_orange',
   'box_red'],
  '1.5_0.5': ['box_purple'],
  '1.5_1.5': ['box_orange']})

In [1]:
pg_dict = [
    {'0.5_0.5': ['box_blue', 'target_blue', 'target_green', 'target_purple'], 
     '0.5_1.5': ['box_red', 'target_orange'], 
     '1.5_0.5': ['box_blue', 'target_blue', 'target_red', 'box_red', 'target_red', 'box_green', 'box_orange', 'box_orange'], 
     '1.5_1.5': ['target_blue', 'box_blue', 'box_purple', 'target_orange']}, 
    ...
]

In [22]:
def better_state_repres(pg_dict):
    new_pg_dict = {}

    for key, value in pg_dict.items():
        new_pg_dict[f'{key[:3]}, {key[-3:]}'] = value

    return new_pg_dict

In [23]:
better_state_repres(pg_dict[0])

{'0.5, 0.5': ['box_blue', 'target_blue', 'target_green', 'target_purple'],
 '0.5, 1.5': ['box_red', 'target_orange'],
 '1.5, 0.5': ['box_blue',
  'target_blue',
  'target_red',
  'box_red',
  'target_red',
  'box_green',
  'box_orange',
  'box_orange'],
 '1.5, 1.5': ['target_blue', 'box_blue', 'box_purple', 'target_orange']}

In [2]:
def is_valid_json(response: str) -> bool:
    """
    Checks if a response string is in valid JSON format.

    Args:
        response (str): The response string to validate.

    Returns:
        bool: True if the response is valid JSON, False otherwise.
    """
    try:
        json.loads(response)
        return True
    except ValueError:
        return False

In [3]:
is_judge = True


1. Bracket Check - find bracket
3. JSON Check

In [102]:
error_response = ['asdasd{"Agent[0.5, 0.5]":"move(box_blue, square[0.5, 1.5])", "agent[0.5, 1.5]":"move(box_blue, target_blue)"}asdsada', 
                  'asdasd{\'Agent[0.5, 0.5]\':\'move(box_blue, square[0.5, 1.5])", "Agent[0.5, 1.5]":"move(box_blue, target_blue)"}asdsada',
                  'asdasd{Agent[0.5, 0.5]:move(box_blue, square[0.5, 1.5]), Agent[0.5, 1.5]:move(box_blue, target_blue)"}asdsada', 
                  'Agent[0.5, 0.5]": ["move(box_blue, square[0.5, 1.5])"], "Agent[0.5, 1.5]": ["move(box_blue, target_blue)"]', 
                  '']

In [8]:
raw_response='''RAW RESPONSE: aaaasdsasdasda
sadsdasd
adsads
EXECUTE
{''s[[]]][Agent[0.5, 0.5]}]"asdasd:asda"move(box_blue, square[0.5, 1.5])", "agent[0.5, 1.5]":"move(box_blue, target_blue)"}
EXECUTE
asdsadaa''asda\
Justification:....
'''

def process_raw_response(response: str) -> dict:
    """
    Processes a raw response string containing agent locations and actions, 
    extracts relevant information, and converts it into dictionary format for loading in json.

    Args:
        raw_response (str): The string is expected to include an "EXECUTE" keyword followed by 
            JSON-like content describing agent locations and actions 
            (e.g "EXECUTE
            {"Agent[0.5, 0.5]":"move(box_blue, square[0.5, 1.5])")

    Returns:
        dict: A dictionary where keys are agent identifiers (e.g., "Agent[0.5, 0.5]") 
              and values are actions (e.g., "move(box_blue, square[0.5, 1.5])").

    Raises:
        ValueError: If the input string cannot be parsed.
    """
    
    #response = raw_response[re.search(r'EXECUTE', raw_response).end():]
    #response = response[:re.search(r'EXECUTE', response).start()]
    #response = raw_response

    try:
        pattern_1 = r'agent\[(\d+\.\d+), (\d+\.\d+)\]'
        agent_loc = re.findall(pattern_1, response, re.IGNORECASE)

        pattern_2 = r'move(\(\w+, (?:square.\d+\.\d+, \d+\.\d+.|\w+)\))'
        agent_action = re.findall(pattern_2, response, re.IGNORECASE)

        # print(agent_loc)
        # print(agent_action)
    except:
        print(f'ERROR IN PARSING THE RESPONSE: {response}')

    num_agent = len(agent_loc)
    action = []
    print(len(agent_loc), len(agent_action))

    for i in range(num_agent):
        
        action.append(f'"Agent[{agent_loc[i][0]}, {agent_loc[i][1]}]": "move{agent_action[i]}"')

    json_str = '{' + ', '.join(action) + '}'
    
    response_dict = json.loads(json_str)

    return response_dict
    
    
ex = '{"Agent[0.5, 0.5]": "move(box_purple, square[1.5, 1.5])", "Agent[0.5, 1.5]": "move(box_green, target_orange)", "Agent[1.5, 0.5]": "move(box_green, target_orange)", "Agent[1.5, 1.5]": "move(box_blue, target_blue)"}'
process_raw_response(ex)
# response_dict = process_raw_response(raw_response)
# response_dict


4 4


{'Agent[0.5, 0.5]': 'move(box_purple, square[1.5, 1.5])',
 'Agent[0.5, 1.5]': 'move(box_green, target_orange)',
 'Agent[1.5, 0.5]': 'move(box_green, target_orange)',
 'Agent[1.5, 1.5]': 'move(box_blue, target_blue)'}

In [187]:
a = '''
A1
{"Agent[1.5, 0.5]": "move(box_purple, square[0.5, 1.5])", "Agent[1.5, 1.5]": "move(box_blue, target_orange)"}
A2
{"Agent[1.5, 0.5]": "move(box_purple, square[0.5, 1.5])", "Agent[1.5, 1.5]": "move(box_blue, )"}

Final Action:
{"Agent[1.5, 0.5]": "move(box_purple, square[0.5, 1.5])", "Agent[1.5, 1.5]": "move(, target_orange)"}
'''

match = re.search(r'\{.*?\}', a, re.DOTALL)
if match:
    possible_action = re.findall(r'\{.*?\}', a, re.DOTALL)
    print(possible_action)

['{"Agent[1.5, 0.5]": "move(box_purple, square[0.5, 1.5])", "Agent[1.5, 1.5]": "move(box_blue, target_orange)"}', '{"Agent[1.5, 0.5]": "move(box_purple, square[0.5, 1.5])", "Agent[1.5, 1.5]": "move(box_blue, )"}', '{"Agent[1.5, 0.5]": "move(box_purple, square[0.5, 1.5])", "Agent[1.5, 1.5]": "move(, target_orange)"}']


In [212]:
a = '{"Agent[0.5, 0.5]": "move(box_blue, target_blue)", "Agent[0.5, 1.5]": "move(box_blue, square[0.5, 1.5])", "Agent[1.5, 0.5]": "move(box_red, target_red)", "Agent[1.5, 1.5]": "move(box_orange, target_orange)"}'

re.findall(r'move\((.*?),\s(.*?)\)', a)[0][0].strip('\'')

'box_blue'

In [196]:
re.findall(r'move(\(\w+, (?:square.\d+\.\d+, \d+\.\d+.|\w+)\))', a)[0]

'(box_blue, target_blue)'

In [225]:
for response in error_response[:1]:
    try:
        pattern_1 = r'agent\[(\d+\.\d+), (\d+\.\d+)\]'
        agent_loc = re.findall(pattern_1, response, re.IGNORECASE)

        pattern_2 = r'move\((.*?),\s(.*?)\)' #r'move(\(\w+, (?:square.\d+\.\d+, \d+\.\d+.|\w+)\))'
        agent_action = re.findall(pattern_2, response, re.IGNORECASE)

        print(agent_loc)
        print(agent_action)
    except:
        print(f'ERROR IN PARSING THE RESPONSE: {response}')

    num_agent = len(agent_loc)

    action = []

    for i in range(num_agent):
        
        action.append(f'"Agent[{agent_loc[i][0]}, {agent_loc[i][1]}]": "move({agent_action[i][0]}, {agent_action[i][1]})"')

    json_str = '{' + ', '.join(action) + '}'
    
    original_response_dict = json.loads(json_str)
    print(original_response_dict)

[('0.5', '0.5'), ('0.5', '1.5')]
[('box_blue', 'square[0.5, 1.5]'), ('box_blue', 'target_blue')]
{'Agent[0.5, 0.5]': 'move(box_blue, square[0.5, 1.5])', 'Agent[0.5, 1.5]': 'move(box_blue, target_blue)'}


In [219]:
original_response_dict

{'Agent[0.5, 0.5]': "move('box_blue', 'square[0.5, 1.5]')",
 'Agent[0.5, 1.5]': "move('box_blue', 'target_blue')"}

{'Agent[0.5, 0.5]': 'move(box_blue, square[0.5, 1.5])', 'Agent[0.5, 1.5]': 'move(box_blue, target_blue)'}
{'Agent[0.5, 0.5]': 'move(box_blue, square[0.5, 1.5])', 'Agent[0.5, 1.5]': 'move(box_blue, target_blue)'}
{'Agent[0.5, 0.5]': 'move(box_blue, square[0.5, 1.5])', 'Agent[0.5, 1.5]': 'move(box_blue, target_blue)'}
{'Agent[0.5, 0.5]': 'move(box_blue, square[0.5, 1.5])', 'Agent[0.5, 1.5]': 'move(box_blue, target_blue)'}

In [114]:
response = error_response[0]

pattern = r'agent\[(\d+\.\d+), (\d+\.\d+)\]'

agent_locs = re.findall(pattern, response, re.IGNORECASE)

# bracket CHECK
# match = re.search(r'{.*}', response, re.DOTALL)

#original_response_dict = match.group()

agent_locs

#re.original_response_dict
# # JSON CHECK
# original_response_dict = json.loads(original_response_dict)
# original_response_dict


[('0.5', '0.5'), ('0.5', '1.5')]

In [74]:
response = error_response[0]

pattern = 'move\(\w+, (?:square.\d+\.\d+, \d+\.\d+.|\w+)\)'

action = re.findall(pattern, response)

action

  pattern = 'move\(\w+, (?:square.\d+\.\d+, \d+\.\d+.|\w+)\)'


['move(box_blue, square[0.5, 1.5])', 'move(box_blue, target_blue)']

In [None]:
#pg_dict_original = copy.deepcopy(pg_dict_input)
transformed_dict = {}


for key, value in original_response_dict.items():
    coordinates = tuple(map(float, re.findall(r"\d+\.?\d*", key)))

    # match the item and location in the value
    match = re.match(r"move\((.*?),\s(.*?)\)", value)
    if match:
        item, location = match.groups()

        if "square" in location:
            location = tuple(map(float, re.findall(r"\d+\.?\d*", location)))

        transformed_dict[coordinates] = [item, location]

feedback = ""
for key, value in transformed_dict.items():
    # print(f"Key: {key}, Value1: {value[0]}, Value2: {value[1]}")
    if (
        value[0] in pg_dict_original[str(key[0]) + "_" + str(key[1])]
        and type(value[1]) == tuple
        and (
            (
                np.abs(key[0] - value[1][0]) == 0
                and np.abs(key[1] - value[1][1]) == 1
            )
            or (
                np.abs(key[0] - value[1][0]) == 1
                and np.abs(key[1] - value[1][1]) == 0
            )
        )
    ):
        pass
    elif (
        value[0] in pg_dict_original[str(key[0]) + "_" + str(key[1])]
        and type(value[1]) == str
        and value[1] in pg_dict_original[str(key[0]) + "_" + str(key[1])]
        and value[0][:4] == "box_"
        and value[1][:7] == "target_"
        and value[0][4:] == value[1][7:]
    ):
        pass
    else:
        if is_judge:
            feedback += f"You are the judge and your assigned task for {key[0]}_{key[1]} is not in the doable action list, so choose the alternative action of the central planner;"
        else:
            # print(f"Error, Iteration Num: {iteration_num}, Key: {key}, Value1: {value[0]}, Value2: {value[1]}")
            feedback += f"Your assigned task for {key[0]}_{key[1]} is not in the doable action list; "