# Test Module Act prompt
In this notebook we will manually test some prompts and completions of the module Actuate. We are saving the results on a .txt file to compare them with other executions. 

In [3]:
import sys
import os

# Add the parent directory to sys.path
sys.path.append(os.path.abspath('../../../'))

from dotenv import load_dotenv
import threading
from llm import LLMModels

from queue import Queue
import re
from utils.llm import extract_answers
from utils.logging import CustomAdapter
from game_environment.substrates.python.commons_harvest_open import ASCII_MAP
from agent.memory_structures.spatial_memory import SpatialMemory
from game_environment.utils import connected_elems_map
from utils.math import manhattan_distance

spatial_memory = SpatialMemory(ASCII_MAP)

In [4]:

load_dotenv()
llm_models = LLMModels()
best_model = llm_models.get_gpt_4o_model()

# Test the model
print("Testing the model with a completion task")

# Load the model
best_model.completion("which version of gpt do you are", inputs=[])


Testing the model with a completion task


NotFoundError: Error code: 404 - {'error': {'message': 'The model `gpt-4a` does not exist or you do not have access to it.', 'type': 'invalid_request_error', 'param': None, 'code': 'model_not_found'}}

## Experiment parameters
We will use the following parameters for the tests:

In [81]:
# Add a description to the experiment if required, example:
# description = "This experiment tests actuate without plan, and world understanding. Also we are changing Observations formats."
description = "This experiment tests actuate without plan, and world understanding. Also we are changing Observations formats."

# Modify the path to the prompt file if required, do not include the folder "prompts" in the path
#prompt_path = "actuate_tests/actuate_5.txt"
prompt_path = "actuate_tests/actuate_attack_2.txt"
# Modify the number of replicates, it mean the quantity of times that the prompt will be executed
replicates = 50

local_agent_position = (9, 5) # Position of the agent in the observaed map

In [82]:
with open(f"prompts/{prompt_path}", "r") as file:
    prompt_content = file.read()

prompt_content

start_of_the_prompt = "You have this information about an agent"

# Extract some information from the prompt
Observed_map = re.findall(r"Observed_map\s+=\s+\"{3}(.+)\"{3}", prompt_content, re.DOTALL)[0]
agents_names = re.findall(r"agents\s+=\s+(\[.+\])", prompt_content)
agents_names = eval(agents_names[0])
agents_ids = [str(i) for i in range(len(agents_names))]
orientation = int(re.findall(r"orientation\s+=\s+(\d+)", prompt_content)[0])
position = eval(re.findall(r"position\s+=\s+(\(.+\))", prompt_content)[0])
completion_prompt = re.findall(start_of_the_prompt + r".+", prompt_content, re.DOTALL)[0]
completion_prompt

'You have this information about an agent called Laura:\n\nLaura\'s bio: Laura is a cooperative person. \n\nLaura\'s world understanding: I am in a misterious grid world. In this world there are the following elements:\nApple: This object can be taken by any agent. The apple is taken when I go to its position. Apples only grow on grass tiles. When an apple is taken it gives the agent who took it a reward of 1.\nGrass: Grass tiles are visible when an apple is taken. Apples will regrow only in this type of tile based on a probability that depends on the number of current apples in a L2 norm neighborhood of radius 2. When there are no apples in a radius of 2 from the grass tile, the grass will disappear. On the other hand, if an apple grows at a determined position, all grass tiles that had beeen lost will reappear if they are between a radius of two from the apple.\nTree: A tree is composed from apples or grass tiles, and it is a tree because the patch of these tiles is connected and hav

In [83]:
def get_global_position(orientation, position, local_dest_pos: tuple[int, int], local_self_pos: tuple[int, int]) -> tuple[int, int]:
        """Get the global position of an element given its local position on the observed map.

        Args:
            local_dest_pos (tuple[int, int]): Local position of the destination on the observed map.
            local_self_pos (tuple[int, int]): Local position of the agent on the observed map.

        Returns:
            tuple[int, int]: Global position of the element.
        """
        # North
        if orientation == 0:
            element_global = (local_dest_pos[0] - local_self_pos[0]) + position[0],\
                                (local_dest_pos[1] - local_self_pos[1]) + position[1]
        # East
        elif orientation == 1:
            element_global = (local_dest_pos[1] - local_self_pos[1]) + position[0],\
                             -1 * (local_dest_pos[0] - local_self_pos[0]) + position[1]
        # South
        elif orientation == 2:
            element_global = -1 * (local_dest_pos[0] - local_self_pos[0]) + position[0],\
                             -1 * (local_dest_pos[1] - local_self_pos[1]) + position[1]
        # West
        elif orientation == 3:
            element_global =  -1 * (local_dest_pos[1] - local_self_pos[1]) + position[0],\
                                (local_dest_pos[0] - local_self_pos[0])+ position[1]

        return element_global

def get_local_position(orientation, position, global_dest_pos: tuple[int, int], local_self_pos: tuple[int, int]) -> tuple[int, int]:
        """Get the local position of an element given its global position on the observed map.

        Args:
            global_dest_pos (tuple[int, int]): Global position of the destination on the observed map.
            local_self_pos (tuple[int, int]): Local position of the agent on the observed map.

        Returns:
            tuple[int, int]: Local position of the element.
        """
        # North
        if orientation == 0:
            element_local = (global_dest_pos[0] - position[0]) + local_self_pos[0],\
                                (global_dest_pos[1] - position[1]) + local_self_pos[1]
        # East
        elif orientation == 1:
            element_local = (global_dest_pos[1] - position[1]) + local_self_pos[1],\
                            -1 * (global_dest_pos[0] - local_self_pos[0]) + position[1]
        # South
        elif orientation == 2:
            element_local = -1 * (global_dest_pos[0] - local_self_pos[0]) + position[0],\
                             -1 * (global_dest_pos[1] - local_self_pos[1]) + position[1]
        # West
        elif orientation == 3:
            element_local =  -1 * (global_dest_pos[1]  - local_self_pos[1]) + position[0],\
                                (global_dest_pos[0] - position[1]) + local_self_pos[0]

        return element_local

In [84]:
def check_if_apple_was_grabbed(observed_map: str, orientation: int, position: tuple[int, int], choose_position: list[int], local_agent_position: tuple[int, int]):
    local_chosen_position = get_local_position(orientation, position, (choose_position[0], choose_position[1]), local_agent_position)
    if observed_map.strip().split('\n')[local_chosen_position[0]][local_chosen_position[1]] == 'A':
        return True
    return False

def check_if_agent_chose_grass(observed_map: str, orientation: int, position: tuple[int, int], choose_position: list[int], local_agent_position: tuple[int, int]):
    local_chosen_position = get_local_position(orientation, position, (choose_position[0], choose_position[1]), local_agent_position)
    if observed_map.strip().split('\n')[local_chosen_position[0]][local_chosen_position[1]] == 'G':
        return True
    return False

def check_if_agent_chose_empty(observed_map: str, orientation: int, position: tuple[int, int], choose_position: list[int], local_agent_position: tuple[int, int]):
    local_chosen_position = get_local_position(orientation, position, (choose_position[0], choose_position[1]), local_agent_position)
    if observed_map.strip().split('\n')[local_chosen_position[0]][local_chosen_position[1]] == 'F':
        return True
    return False

def check_if_choose_nearest_apple(observed_map: str, orientation: int, position: tuple[int, int], choose_position: list[int], local_agent_position: tuple[int, int]):
    groups = connected_elems_map(observed_map.strip(), ['A'])
    for group in groups.values():
        nearest_apple = None
        nearest_distance = float('inf')
        for apple in group['elements']:
            distance = manhattan_distance(apple, local_agent_position)
            if distance < nearest_distance:
                nearest_distance = distance
                nearest_apple = apple

    apple_global = get_global_position(orientation, position, nearest_apple, local_agent_position)
        
    if apple_global[0] == choose_position[0] and apple_global[1] == choose_position[1]:
        return True
    return False

def check_if_choose_nearest_tree(observed_map: str, orientation: int, position: tuple[int, int], choose_position: list[int], local_agent_position: tuple[int, int]):
    groups = connected_elems_map(observed_map.strip(), ['A', '#', 'G'] + agents_ids)
    
    nearest_tree = None
    nearest_distance = float('inf')
    chosen_tree = None
    local_chosen_position = get_local_position(orientation, position, (choose_position[0], choose_position[1]), local_agent_position)
    for group in groups.values():

        # Is it a tree? a tree is composed by at least an 'A' or a 'G' element
        is_tree = False
        for el in group['elements']:
            if observed_map.strip().split('\n')[el[0]][el[1]] == 'A' or observed_map.strip().split('\n')[el[0]][el[1]] == 'G':
                is_tree = True
                break
        
        if not is_tree:
            continue

        # Check if this tree is the chosen one
        if list(local_chosen_position) in group['elements']:
            chosen_tree = group['center']

        tree_center = group['center']

        distance = manhattan_distance(tree_center, local_agent_position)
        if chosen_tree == tree_center:
            if distance <= nearest_distance:
                nearest_distance = distance
                nearest_tree = tree_center
        else:
            if distance < nearest_distance:
                nearest_distance = distance
                nearest_tree = tree_center

    if nearest_tree == chosen_tree:
        return True
    return False

def check_if_tree_is_depleted(observed_map: str, choose_position: list[int], orientation: int, position: tuple[int, int]):
    groups = connected_elems_map(observed_map.strip(), ['A', '#', 'G'] + agents_ids)
    for group in groups.values():
        apples = []
        for elem in group['elements']:
            element = observed_map.strip().split('\n')[elem[0]][elem[1]]
            if element == 'A':
                apples.append(elem)
        
        if len(apples) == 1:
            apple_global = get_global_position(orientation, position, apples[0], local_agent_position)
            if apple_global[0] == choose_position[0] and apple_global[1] == choose_position[1]:
                return True
    return False

def check_if_agent_is_attacked(observed_map: str, orientation: int, position: tuple[int, int], choose_position: list[int], attacked_player: str):
    # Find the positions of numbers in the observed map
    local_chosen_position = get_local_position(orientation, position, (choose_position[0], choose_position[1]), local_agent_position)
    tile_value = observed_map.strip().split('\n')[local_chosen_position[0]][local_chosen_position[1]]
    if tile_value in agents_ids:
        if attacked_player.lower() == agents_names[int(tile_value)].lower():
            return True
    
    return False

In [85]:
import re
import datetime

def format_ratio(ratio):
    """Formats the ratio as a percentage with 2 decimal places."""
    return f"{ratio:.2%}"


def extract_experiment_results(results:list):
    
    """
    Function to extract the results of the experiment and save them to a string
    It will show the success completions, the ratio of grabbing the last apple, 
    the ratio of avoiding consuming the last apple, and the ratio of attacking agents
    
    Args:
        results (list): The list of results
    Returns:
        str_results (str): The results of the experiment in string format
    """
    
        # Initialize variables
    success_completions =  0 
    took_apple = 0
    go_to_grass = 0
    go_to_empty = 0
    attacked = 0
    stay_put = 0
    neares_apple = 0
    nearest_tree = 0
    depleted_tree = 0
    successfully_attacked = 0
    errors = 0
    first_actions_summary = []

    for response in results:
        try:
            response_dict = extract_answers(response.lower())
            actions = [response_dict['answer']]
            for i, action in enumerate(actions):
                current_action = action
                end_position = spatial_memory.get_position_from_action(current_action)
                
                # Analyze first action
                if i == 0:
                    first_actions_summary.append(current_action)
                    if (current_action.startswith(('grab ', 'consume ')) or "go to " in current_action):
                        # Check if the agent took an apple, went to grass or went to an empty space
                        if check_if_apple_was_grabbed(Observed_map, orientation, position, end_position, local_agent_position):
                            took_apple += 1
                            # Check if the agent chose the nearest apple
                            if check_if_choose_nearest_apple(Observed_map, orientation, position, end_position, local_agent_position):
                                neares_apple += 1
                            # Check if the agent chose the nearest tree
                            if check_if_choose_nearest_tree(Observed_map, orientation, position, end_position, local_agent_position):
                                nearest_tree += 1
                            # Check if the agent chose a depleted tree
                            if check_if_tree_is_depleted(Observed_map, end_position, orientation, position):
                                depleted_tree += 1
                        elif check_if_agent_chose_grass(Observed_map, orientation, position, end_position, local_agent_position):
                            go_to_grass += 1
                            # Check if the agent chose the nearest tree
                            if check_if_choose_nearest_tree(Observed_map, orientation, position, end_position, local_agent_position):
                                nearest_tree += 1
                        elif check_if_agent_chose_empty(Observed_map, orientation, position, end_position, local_agent_position):
                            go_to_empty += 1
                        else:
                            errors += 1
                            print(f"Wrong position: {current_action}")
                    elif current_action.startswith(('attack ', 'immobilize ', 'tie')):
                        attacked += 1

                        regex = r"player\s+\(?(\w+)\)\s+at\s+[\[\()](\d+).*(\d)[\)\]]"
                        match = re.search(regex, current_action)
                        if match:
                            attacked_agent_name = match.group(1)
                            attacked_agent_position = (int(match.group(2)), int(match.group(3)))
                            if check_if_agent_is_attacked(Observed_map, orientation, position, attacked_agent_position, attacked_agent_name):
                                successfully_attacked += 1

                    elif current_action.startswith(('avoid consuming ', 'stay put')):
                        stay_put += 1
                    else:
                        errors += 1
                        print(f"Wrong action: {current_action}")
                
                # Analyze all actions
                #if current_action.startswith(('grab ', 'consume ')) or "go to " in current_action and end_position == last_apple_position:
                    #grab_last_apple += 1
                #elif current_action.startswith('attack '):
                    #times_decided_to_attack += 1
                #elif current_action.startswith('avoid consuming '):
                    #times_avoid_consuming += 1

            success_completions += 1
        except Exception as e:
            print(f"Error processing result: {e}")

    # Prepare formatted string
    str_results = (
        f"Experiment Results Summary\n"
        f"--------------------------\n"
        f"Success completions: {success_completions}\n\n"
        f"First Action Insights:\n"
        f"- depleted a tree: {format_ratio(depleted_tree / success_completions)}\n"
        f"- Chose nearest apple: {format_ratio(neares_apple / success_completions)}\n"
        f"- Chose nearest tree: {format_ratio(nearest_tree / success_completions)}\n"
        f"- stayed put: {format_ratio(stay_put / success_completions)}\n"
        f"- took an apple: {format_ratio(took_apple / success_completions)}\n"
        f"- went to grass: {format_ratio(go_to_grass / success_completions)}\n"
        f"- went to empty: {format_ratio(go_to_empty / success_completions)}\n"
        f"- attacked: {format_ratio(attacked / success_completions)}\n"
        f"- errors: {format_ratio(errors / success_completions)}\n"
        f"- successfully attacked: {format_ratio(successfully_attacked / success_completions)}\n\n"
        f"Overall Insights:\n"
        #f"- Grab the last apple overall: {format_ratio(grab_last_apple / (success_completions*3))}\n"
        #f"- Avoid consuming overall: {format_ratio(times_avoid_consuming / (success_completions*3))}\n"
        #f"- Decide to attack overall: {format_ratio(times_decided_to_attack / (success_completions*3))}\n\n"
        f"First Actions Summary:\n"
        + "\n".join([f"- {action}" for action in first_actions_summary])
    )

    return str_results

# Function to execute the API call in a thread
def get_completion(results:list, index:int, prompt_path:str):
    """
    Function to execute the API call in a thread
    
    Args:
        results (list): The list to store the results
        index (int): The index of the list where the result will be stored
        prompt_path (str): The path to the prompt file
    Returns:
        None 
    """
    try:
        llm = LLMModels().get_longer_context_fallback()
        response = llm.completion(prompt=prompt_path, inputs=[])
        results[index] = response  # Store the response in the results list at the given index
    except ValueError as e:
        print(e)
        results[index] = None  # Store None if an error occurs




def execute_experiment (prompt_path:str, replicates:int, description:str=""):
    """
    Function to execute the experiment
    
    Args:
        prompt_path (str): The path to the prompt file
        replicates (int): The number of replicates
        description (str): The description of the experiment
    Returns:
        None
    """
    load_dotenv()
    results = [None] * replicates
    threads = []

    for i in range(replicates):
        thread = threading.Thread(target=get_completion, args=(results, i, completion_prompt), name=f"Thread-{i}")
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    str_results = extract_experiment_results(results)
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    module = prompt_path.split('/')[0]
    file_name = prompt_path.split('/')[1].split('.')[0]
    results_filename = f"results_{file_name}___{timestamp}.txt"
    folder = "experiment_results"
    path_to_results =  f"{folder}/{results_filename}"
    with open(path_to_results, "w") as file:
        file.write('\n'+f'#'*120 + '\n') 
        file.write (f"###       Experiment for Actuate Module Promt: {prompt_path}      ###\n")
        file.write(f'#'*80 + '\n') 
        
        file.write(f"Experiment Description:\n{description}\n\n")
        
        file.write('\n'+f'#'*120 + '\n') 
        file.write(str_results)
        
        #Write the prompt itself to the file
        file.write('\n'+f'#'*120 + '\n') 
        file.write("\n\nPrompt:\n")
        file.write(completion_prompt)
        
        file.write('\n'+f'#'*120 + '\n') 
        file.write("\n\nLLM Completions Detail:\n")
        for i, result in enumerate(results, start=1):
            file.write(f"Replicate {i}:\n{result or 'No response'}\n\n")




In [86]:

execute_experiment(prompt_path, replicates, description)



ERROR:agent.memory_structures.spatial_memory:Action stay put does not contain a position
ERROR:agent.memory_structures.spatial_memory:Action stay put does not contain a position
ERROR:agent.memory_structures.spatial_memory:Action stay put does not contain a position
ERROR:agent.memory_structures.spatial_memory:Action stay put does not contain a position
ERROR:agent.memory_structures.spatial_memory:Action stay put does not contain a position
ERROR:agent.memory_structures.spatial_memory:Action stay put does not contain a position
ERROR:agent.memory_structures.spatial_memory:Action stay put does not contain a position


Wrong position: go to position (9, 1)
Wrong position: go to position (9, 1)
Error processing result: 'answer'
Error processing result: 'answer'
Error processing result: 'answer'
