Fix the system and os paths to the main repository directory.

In [12]:
import os
import sys
# Add the parent directory to sys.path
sys.path.append(os.path.abspath('../../'))
os.chdir(os.path.abspath('../../'))


Requiered packages

In [13]:
import os
import logging

import chromadb
import uuid
from chromadb.utils import embedding_functions
from utils.llm import CustomEmbeddingFunction

from utils.files import load_config
from utils.time import str_to_timestamp
from utils.logging import CustomAdapter
from dotenv import load_dotenv
load_dotenv(override=True)

# Parameters and Scene

Here you are requiered to choose the exact simulation you want to recreate. Simulation should be one from logs saved simulations, it should have inside the ltm_databes.

Then you have to choose the exact step of the simulation. From that step the simulation will be recreated. It means, the exactly state of:
- Map
- Agents
- Long term memory databases
- Short term memory of agents

In [14]:
simulation_file = "2024-02-21--16-57-53"

step_to_start = 100

In [15]:

scene_name = f"scene_{simulation_file}___step{step_to_start}"
#scene_name = f"scene_personalized_name"

## Configure paths
logs_path = "logs"
sim_path = os.path.join(logs_path, simulation_file)

scene_track_file = os.path.join(sim_path, "scene_track.txt")


scenes_path = "data/scenes"
scene_path = os.path.join(scenes_path, scene_name)

# Create the scene directory
os.makedirs(scene_path, exist_ok=True)



### Save scene track information

In [16]:
# Read the scene track file
with open(scene_track_file, "r") as f:
    scene_track = f.readlines()
    initial_track = scene_track[0]
    step_track = scene_track[step_to_start-1]

letter_orientations = { 0: "N",
                        1: "E",
                        2: "S",
                        3: "W" }

# Parse to a dictionary
step_track = eval(step_track)
initial_track = eval(initial_track)

# Write a file called state_map.txt with the state map
def save_map(step_track, scene_path):
    state_map_file = os.path.join(scene_path, "map_to_load.txt")
    with open(state_map_file, "w") as f:
        f.write(str(step_track["current_map"]))

def save_agents_status(step_track, scene_path):
    agents_status = step_track["agents_status"]
    agents_status_file = os.path.join(scene_path, "agents_status.txt")
    with open(agents_status_file, "w") as f:
        f.write(str(agents_status))

def get_apples_to_desappear(current_map, initial_map):
    apples_to_desappear = []
    current_map = current_map.split("\n")
    initial_map = initial_map.split("\n")
    for i in range(len(current_map)):
        for j in range(len(current_map[0])):
            if current_map[i][j] != 'A' and initial_map[i][j] == 'A':
                apples_to_desappear.append([i, j])
    return apples_to_desappear

def write_lua_variables(step_track, initial_track, scene_path):
    variables_file = os.path.join(scene_path, "variables.txt")
    
    agents_status = step_track["agents_status"]
    
    startPositions, startOrientations = [], []
    applesToDesappear = []
    
    for agent, status in agents_status.items():
        startPositions.append(list(status["global_position"]))
        startOrientations.append(letter_orientations[status["orientation"]])

    applesToDesappear = get_apples_to_desappear(step_track['current_map'], initial_track['current_map'])
    
    variables = {"startPositions": startPositions,
                 "startOrientations": startOrientations,
                 "applesToDesappear": applesToDesappear}
    
    with open(variables_file, "w") as f:
        f.write(str(variables))
    
save_map(step_track, scene_path)
save_agents_status(step_track, scene_path)
write_lua_variables(step_track, initial_track, scene_path)
step_track

## Filter and save the memory databases until the step 

In [None]:
data_base_folder = os.path.join(sim_path, "ltm_database")
agents = os.listdir(data_base_folder)
date_format  = load_config()['date_format']

# Create the ltm_database folder on the scene if it does not exist
ltm_scene_db_folder = os.path.join(scene_path, "ltm_database")
os.makedirs(ltm_scene_db_folder, exist_ok=True)

In [None]:
def retrieve_data_for_agent_with_filter(database_source, agent_name, step_track):
    
    """
    Retrieve collections from the database for a specific agent with a filter
    The filter is a dictionary with the metadata to filter.
    We are filtering by "timestamp", and retrieving all the collections that have a timestamp less than the one specified in the filter.
    
    Args:
        database_source: The database source
        agent_name: The agent name
        filter: The filter
    Returns:
        A list with the collections
    """
    db_path = os.path.join(database_source, agent_name, "long_term_memory.db")
    
    chroma_client = chromadb.PersistentClient(path=db_path)
    collection = chroma_client.get_or_create_collection(agent_name)
    
    try:
        timestamp_to_filt = str_to_timestamp(step_track["memory_time"], date_format)
        print(f"Timestamp to filter for agent {agent_name} is {timestamp_to_filt}")
        # Now we filter by timestamp
        filter_timestamp = {'timestamp':{"$lte": timestamp_to_filt}}
        data_filtered = collection.get(where=filter_timestamp)
            
        return data_filtered
    except:
        print(f"Error with agent {agent_name}, no data found for this step {step_to_start} and time {step_track['memory_time']}")
        return []

# Test for Juan
agent_name = "Juan"
data_filtered = retrieve_data_for_agent_with_filter(data_base_folder, agent_name, step_track)
data_filtered

In [None]:
def persist_data_for_agent (database_destination, agent_name, data):
    """
    Persist data in the database for a specific agent
    
    Args:
        database_destination: The database destination
        agent_name: The agent name
        data: The data to persist
    Returns:
        None
    """
    db_path = os.path.join(database_destination, agent_name, "long_term_memory.db")
    
    chroma_client = chromadb.PersistentClient(path=db_path)
    #openai_ef = CustomEmbeddingFunction()
    #chroma_client.delete_collection(agent_name)
        # Delete collection if it already exists
    if agent_name in [c.name for c in chroma_client.list_collections()]:
        chroma_client.delete_collection(agent_name)

    collection = chroma_client.get_or_create_collection(agent_name)
    #for doc, meta, id in zip(data['documents'], data['metadatas'], data['ids']):
    #    collection.add(documents=[doc], metadatas=meta, ids=id) 
    collection.add(documents=data['documents'], metadatas=data['metadatas'], ids=data['ids'])
    
    return

# Persist the data for Juan
agent_name = "Juan"
persist_data_for_agent(ltm_scene_db_folder, agent_name, data_filtered)

# Test for Juan
agent_name = "Juan"
data_saved = retrieve_data_for_agent_with_filter(ltm_scene_db_folder, agent_name, step_track)
len_saved, len_filtered = len(data_saved['ids']), len(data_filtered['ids'])
print(f'data saved is len {len_saved} and should be {len_filtered}')

In [None]:
# Save the data for all the agents
for agent_name in agents:
    data_filtered = retrieve_data_for_agent_with_filter(data_base_folder, agent_name, step_track)
    persist_data_for_agent(ltm_scene_db_folder, agent_name, data_filtered)
    print(f"Data saved for agent {agent_name}")
    
    # Test
    data_saved = retrieve_data_for_agent_with_filter(ltm_scene_db_folder, agent_name, step_track)
    len_saved, len_filtered = len(data_saved['ids']), len(data_filtered['ids'])
    print(f'data saved is len {len_saved} and should be {len_filtered}')


## Persist stm memory of agents

In [None]:
def persist_stm_agents( step_max:str):
    """
    Reads a txt file that contains stm for all agents for diferent steps in ascending order
    and persists the stm for all the agents in the scene folder on a stm_memories.txt 
    file.
    """
    
    stm_file = os.path.join(sim_path, "short_term_memories.txt")
    stm_scene_file = os.path.join(scene_path, "short_term_memories.txt")
    
    with open(stm_file, "r") as f:
        stm_memories = f.readlines()
    
    stm_dicts = []
    for stm in stm_memories:
        stm_dicts.append(eval(stm))
        
    stm_scene = [x for x in stm_dicts if int(x['steps_count']) <= int(step_max)]
    stm_scene = stm_scene[-1]
    with open(stm_scene_file, "w") as f:
        f.write(str(stm_scene))
        
    return stm_scene

persist_stm_agents(step_to_start)

### We save a video of the simulation until the step

In [None]:

## Adapt the visual record
# Get the images from the simulation_folder + "world", then take images that are in the range of the steps, the format is "number.png"
# Create a video with the images and save it in the scene_folder

import cv2
import numpy as np


image_folder = sim_path + "/world/"
video_path = scene_path + "/scene.avi"


images = [img for img in os.listdir(image_folder) if img.endswith(".png") or img.endswith(".jpg")]

images.sort(key=lambda x: int(x.split('.')[0]))  # Asumiendo que el nombre del archivo es el "step" y no tiene puntos adicionales

out = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*'DIVX'), 1/1, (1600, 1200))

for image in images:
    if 0 <= int(image.split(".")[0]) <= step_to_start:
        img_path = image_folder + image
        img = cv2.imread(img_path)
        img_resized = cv2.resize(img, (1600, 1200), interpolation=cv2.INTER_NEAREST)  # Redimensiona la imagen a la resolución deseada
        height, _, _ = img_resized.shape
        font = cv2.FONT_HERSHEY_SIMPLEX
        text = "Step: " + image.split(".")[0]
        cv2.putText(img_resized, text, (12, int(height * 0.035)), font, 1, (255, 255, 255), 2, cv2.LINE_AA)
        out.write(img_resized)
        
    

out.release()