In [64]:
from __future__ import annotations

import json
import warnings
from collections import deque

import toolpy as tp
from toolpy.integrations import groq

In [65]:
# Groq-backed LLM interface for the LLAMA3_70B model; n_retry=5 is handed to the interface
# (presumably the number of retries on failed requests — confirm against toolpy).
groq_interface = groq.GroqInterface(model=groq.GroqModel.LLAMA3_70B, n_retry=5)

# Register the interface under the name "llama3-70b" and flag it as the default model.
registry = tp.llm.LLMRegistry()
registry.registry(model_name="llama3-70b", interface=groq_interface, default=True)

In [152]:
class ObjectStateUpdater(tp.BasicTool):
    """LLM tool that asks the model for an object's new state after an action.

    The prompt is filled with the object's name, place, description and current
    state plus the actor and the action; the model is instructed to reply with
    a JSON object shaped like ``{'new_state': 'str'}``.
    """

    _description = "Updates a object state"

    # System prompt: fixes the JSON reply schema.
    _system_message = (
        "You are a object state updater that outputs in JSON.\n"
        "The JSON object must use the schema: {'new_state':'str'}\n"
        "\n"
        "Please use a valid JSON format.\n"
    )

    # User prompt template; the placeholders match the keys of _input_description.
    _base_prompt = (
        "Update the following object state:\n"
        "\n"
        "Object name: {name}\n"
        "Object place: {place}\n"
        "Object description: {description}\n"
        "Object current state: {state}\n"
        "Actor: {actor}\n"
        "Action on the object: {action}\n"
        "\n"
        "The new state must describe only the object, not where it is or what is around it.\n"
    )

    _input_description = {
        "name": "object name",
        "place": "place where the object current are",
        "description": "object description",
        "state": "object current state",
        "actor": "actor acting on the object",
        "action": "action on the object",
    }

    _return_description = {"new_state": "new_state for the object"}

    def __init__(self, model_name: str | None = None) -> None:
        """Configure the underlying ``tp.BasicTool`` with JSON mode enabled.

        Args:
            model_name: forwarded unchanged to ``tp.BasicTool``.
        """
        super().__init__(
            self._description,
            self._input_description,
            self._base_prompt,
            self._return_description,
            self._system_message,
            model_name,
            json_mode=True,
        )


In [134]:
class ActionTargetSelector(tp.BasicTool):
    """LLM tool that lists the objects affected by an actor's action.

    The model receives the serialized environment tree, the actor, the actor's
    place and the action, and is instructed to reply with a JSON object shaped
    like ``{'objects': ['str', ...]}``.
    """

    _description = "Selects the objects that an action is changing"

    # System prompt: fixes the JSON reply schema.
    _system_message = (
        "You are object selector that outputs in JSON.\n"
        "The JSON object must use the schema: {'objects':['str', 'str', ...]}\n"
        "\n"
        "Please use a valid JSON format.\n"
    )

    # User prompt template. The whitespace-only second line and the trailing
    # space after "state." are part of the original prompt text and are kept.
    _base_prompt = (
        "{environment_tree}\n"
        "    \n"
        "Considering the environment hierarchy above, select the objects that the following action is affecting, if any. The actor can affect itself state. \n"
        "\n"
        "Actor: {actor}\n"
        "Place: {place}\n"
        "Action: {action}\n"
    )

    _input_description = {
        "environment_tree": "tree of elements in the environment",
        "actor": "actor that is doing the action",
        "place": "place where the actor is",
        "action": "action that is beeing made in the environment",
    }

    _return_description = {"objects": "list of affected objects"}

    def __init__(self, model_name: str | None = None) -> None:
        """Configure the underlying ``tp.BasicTool`` with JSON mode enabled.

        Args:
            model_name: forwarded unchanged to ``tp.BasicTool``.
        """
        super().__init__(
            self._description,
            self._input_description,
            self._base_prompt,
            self._return_description,
            self._system_message,
            model_name,
            json_mode=True,
        )


In [56]:
def get_object_room(environment_tree, object_name):
    """Find the place whose object list contains an object named ``object_name``.

    The tree is walked depth-first with an explicit stack. Dict nodes are
    treated as places containing named sub-places; list nodes are treated as
    the objects stored in a place. Because the stack is LIFO, sibling places
    are visited last-to-first, so when several places hold an object with the
    same name the later sibling wins.

    Args:
        environment_tree: mapping of place name -> sub-tree (dict of
            sub-places or list of object dicts).
        object_name: value of the ``"name"`` key to look for.

    Returns:
        ``{place_name: object_list}`` for the place that holds the object
        (``object_list`` is the very list stored in the tree, not a copy),
        or ``None`` when no object with that name exists. A top-level entry
        that contains ``"object"`` and has a matching ``"name"`` is returned
        as-is.
    """
    pending = deque()
    for entry_name in environment_tree:
        entry = environment_tree[entry_name]
        # NOTE(review): top-level entries containing "object" are treated as
        # objects rather than places — presumably a marker key; confirm.
        if "object" in entry:
            if entry["name"] == object_name:
                return entry
        else:
            pending.append((entry_name, entry))

    while pending:
        place_name, sub_tree = pending.pop()

        if isinstance(sub_tree, list):
            for item in sub_tree:
                if isinstance(item, dict) and "name" in item and item["name"] == object_name:
                    return {place_name: sub_tree}
        elif isinstance(sub_tree, dict):
            for child_name, child in sub_tree.items():
                pending.append((child_name, child))
        # Any other value (str, number, None, ...) is a leaf with nothing to
        # search; descending into it used to raise TypeError.

    return None

In [129]:
def get_object(environment_tree, object_name):
    """Find the object dict named ``object_name`` anywhere in the tree.

    The tree is walked depth-first with an explicit stack. Dict nodes are
    treated as places containing named sub-places; list nodes are treated as
    the objects stored in a place. Because the stack is LIFO, sibling places
    are visited last-to-first, so when several places hold an object with the
    same name the one in the later sibling wins.

    Args:
        environment_tree: mapping of place name -> sub-tree (dict of
            sub-places or list of object dicts).
        object_name: value of the ``"name"`` key to look for.

    Returns:
        The matching object dict itself (not a copy, so callers can mutate
        it in place), or ``None`` when no object with that name exists.
    """
    pending = deque()
    for entry_name in environment_tree:
        entry = environment_tree[entry_name]
        # NOTE(review): top-level entries containing "object" are treated as
        # objects rather than places — presumably a marker key; confirm.
        if "object" in entry:
            if entry["name"] == object_name:
                return entry
        else:
            pending.append((entry_name, entry))

    while pending:
        _place_name, sub_tree = pending.pop()

        if isinstance(sub_tree, list):
            for item in sub_tree:
                if isinstance(item, dict) and "name" in item and item["name"] == object_name:
                    return item
        elif isinstance(sub_tree, dict):
            for child_name, child in sub_tree.items():
                pending.append((child_name, child))
        # Any other value (str, number, None, ...) is a leaf with nothing to
        # search; descending into it used to raise TypeError.

    return None

In [58]:
import warnings

class ObjectMover(tp.Tool):
    """Tool that moves a named object from one place of the environment tree to another.

    The tree passed in the query is mutated in place: the object dict is
    removed from the origin place's list and appended to the target place's
    list. Every path returns the same ``(result, return_description)`` pair;
    when the move cannot be performed a warning is emitted and the tree is
    returned untouched.
    """
    _description = "Moves a object from one room to another"
    _input_description = {'object':'name of the object beeing moved',
                          'original_place':'original place of the object',
                          'target_place':'new place of the object',
                          'environment_tree':'tree of the environment'}

    _return_description = {'environment_tree':'new tree of the environment'}

    def __init__(self) -> None:
        super().__init__(self._description, self._input_description)

    @staticmethod
    def _find_places(environment_tree, original_place_name, target_place_name):
        """Return the ``(origin, target)`` sub-trees found by name; ``None`` marks a miss."""
        stack = deque()
        for room_name in environment_tree:
            # Top-level entries containing "object" are skipped as non-places.
            if "object" in environment_tree[room_name]:
                continue
            stack.append((room_name, environment_tree[room_name]))

        origin_place = None
        target_place = None
        while stack and (origin_place is None or target_place is None):
            room_name, sub_tree = stack.pop()

            if room_name == original_place_name:
                origin_place = sub_tree
            if room_name == target_place_name:
                target_place = sub_tree

            # Only dict nodes have named children to descend into; lists hold
            # objects and any other value is a leaf.
            if isinstance(sub_tree, dict):
                for subroom_name, child in sub_tree.items():
                    stack.append((subroom_name, child))

        return origin_place, target_place

    def _execute(self, query: dict[str, str] | None, context: str | None=None):
        """Move ``query['object']`` from ``query['original_place']`` to ``query['target_place']``.

        Args:
            query: dict with the keys listed in ``_input_description``;
                ``environment_tree`` is the tree itself (mutated in place).
            context: unused.

        Returns:
            ``({'environment_tree': tree}, _return_description)``. If the
            places are equal, a place is missing or is not an object list, or
            the object is not in the origin place, the tree is unchanged.
        """
        object_name = query['object']
        original_place_name = query['original_place']
        target_place_name = query['target_place']
        environment_tree = query['environment_tree']

        # The tree is mutated in place, so the same pair serves both the
        # success path and every abort path.
        result = ({'environment_tree':environment_tree}, self._return_description)

        if original_place_name == target_place_name:
            return result

        origin_place, target_place = self._find_places(
            environment_tree, original_place_name, target_place_name)

        # Validate both places BEFORE touching the tree, so a bad target can
        # never leave the object removed from the origin but not re-inserted.
        for place_name, place in ((original_place_name, origin_place),
                                  (target_place_name, target_place)):
            if place is None:
                warnings.warn(f"Place {place_name} not found. Aborting change.")
                return result
            if not isinstance(place, list):
                warnings.warn(f"Place {place_name} does not hold a list of objects. Aborting change.")
                return result

        index = next(
            (position for position, candidate in enumerate(origin_place)
             if isinstance(candidate, dict)
             and "name" in candidate and candidate["name"] == object_name),
            None)

        if index is None:
            warnings.warn(f"Origin object {object_name} not found in {original_place_name}. Aborting change.")
            return result

        target_place.append(origin_place.pop(index))

        return result


In [213]:
class DetectObjectMovement(tp.BasicTool):
    """LLM tool that reports which objects of the environment are being moved.

    The model receives the serialized environment tree and is instructed to
    reply with a JSON object holding an ``'objects'`` list (empty when
    nothing moves).
    """

    _description = 'Detects if some object in moving in the environment'

    # System prompt: fixes the JSON reply schema. The 'target_place' entry
    # previously read "'target_place', 'str'" (comma instead of colon), which
    # made the example schema malformed.
    # NOTE(review): the key 'movement duration' contains a space, unlike
    # 'movement_duration' in MovementActionDetector; left as-is because
    # consumers of this tool's output are not visible here.
    _system_message = (
        "You are object movement detector that outputs in JSON.\n"
        "The JSON object must use the schema: {'objects':[{'name':'str', 'reasoning':'str', 'origin_place':'str', 'target_place':'str', 'movement duration':'float'}]}\n"
        "\n"
        "Where reasoning is the reason why the object is beeing moved.\n"
        "\n"
        "Please use a valid JSON format.\n"
    )

    # User prompt template. The whitespace-only second line and the trailing
    # space on the last line are part of the original prompt text and are kept.
    _base_prompt = (
        "{environment_tree}\n"
        "    \n"
        "Considering the environment hierarchy above, detects if any object in moving to another place.\n"
        "\n"
        "The state or action of the object must clearly give a reason for it being moved. DO NOT ASSUME any motives and movements that are not in the provided environment.\n"
        "\n"
        "If no objects are being moved, return the 'objects' list as empty. \n"
    )

    _input_description = {'environment_tree':'tree of elements in the environment'}

    _return_description = {'objects':'objects beeing moved, with name, origin place, target place and movement duration'}

    def __init__(self, model_name:str | None = None) -> None:
        """Configure the underlying ``tp.BasicTool`` with JSON mode enabled.

        Args:
            model_name: forwarded unchanged to ``tp.BasicTool``.
        """
        super().__init__(self._description, self._input_description, self._base_prompt,
                         self._return_description, self._system_message,  model_name,
                         json_mode=True)


In [253]:
class ObservationGenerator(tp.BasicTool):
    """LLM tool that writes what an agent perceives in the environment.

    The model receives the serialized environment tree plus the agent's name,
    state, place and action, and is instructed to reply with a JSON object
    shaped like ``{'observation': 'str'}``.
    """

    _description = "Generates the observation of a agent"

    # System prompt: fixes the JSON reply schema.
    _system_message = (
        "You are observation generator that outputs in JSON.\n"
        "The JSON object must use the schema: {'observation':'str'}\n"
        "\n"
        "Please use a valid JSON format.\n"
    )

    # User prompt template. The whitespace-only second line is part of the
    # original prompt text and is kept.
    _base_prompt = (
        "{environment_tree}\n"
        "    \n"
        "Considering the environment hierarchy above, generates the observation for the following agent:\n"
        "\n"
        "Agent name: {agent}\n"
        "Agent state: {state}\n"
        "Agent place: {place}\n"
        "Agent action: {action}\n"
        "\n"
        "The observation must describe what the agent can see, hear and feel sensorially. Use the third person singular.\n"
    )

    _input_description = {
        "environment_tree": "tree of elements in the environment",
        "agent": "actor that is doing the action",
        "state": "current agent state",
        "place": "place where the agent is",
        "action": "action that the agent is doing",
    }

    _return_description = {"observation": "agent observation"}

    def __init__(self, model_name: str | None = None) -> None:
        """Configure the underlying ``tp.BasicTool`` with JSON mode enabled.

        Args:
            model_name: forwarded unchanged to ``tp.BasicTool``.
        """
        super().__init__(
            self._description,
            self._input_description,
            self._base_prompt,
            self._return_description,
            self._system_message,
            model_name,
            json_mode=True,
        )


In [271]:
# Initial environment tree: house name -> room name -> list of object dicts.
# Every object dict carries "name", "type", "description" and "state".
# NOTE(review): "desk" (Bedroom, Home Office) and "bookshelf" (Living Room, Home Office)
# appear twice; get_object / get_object_room look objects up by name only, so they can
# reach just one of each pair.
alex_house = {
    "Alex's House": {
        "Living Room": [
            {"name": "sofa", 'type':'inanimate_object', 
             "description": "A comfortable, modern sofa perfect for relaxing or hosting friends.",
             "state":"idle and clean"},
            {"name": "coffee table", 'type':'inanimate_object', 
             "description": "A sleek coffee table with a few programming books and the latest tech magazines.",
             "state":"idle and clean"},
            {"name": "television", 'type':'inanimate_object', 
             "description": "A large smart TV, often used to watch tech talks or unwind with a show.",
             "state":"off and clean"},
            {"name": "bookshelf", 'type':'inanimate_object', 
             "description": "A bookshelf filled with a mix of classic literature and technical manuals.",
             "state":"idle and clean"},
            {"name": "rug", 'type':'inanimate_object', 
             "description": "A soft rug that adds a touch of warmth to the living room.",
             "state":"idle and clean"}
        ],
        "Kitchen": [
            {"name": "refrigerator", 'type':'inanimate_object', 
             "description": "A high-tech refrigerator that keeps Alex's meals and snacks fresh.",
             "state":"on and clean"},
            {"name": "stove", 'type':'inanimate_object', 
             "description": "A modern stove where Alex occasionally experiments with new recipes.",
             "state":"off and clean"},
            {"name": "microwave", 'type':'inanimate_object', 
             "description": "A quick and efficient microwave for heating up meals.",
             "state":"off and clean"},
            {"name": "kitchen sink", 'type':'inanimate_object', 
             "description": "A stainless steel sink",
             "state":"idle and clean"},
            {"name": "dishwasher", 'type':'inanimate_object', 
             "description": "A quiet dishwasher that handles the post-dinner cleanup.",
             "state":"off and clean"},
            {"name": "dining table", 'type':'inanimate_object', 
             "description": "A minimalist dining table where Alex enjoys breakfast and dinner.",
             "state":"idle and clean"},
            {"name": "chairs", 'type':'inanimate_object', 
             "description": "Comfortable chairs around the dining table, perfect for meals or working on a laptop.",
             "state":"idle and clean"}
        ],
        "Bedroom": [
            {"name": "bed", 'type':'inanimate_object', 
             "description": "A cozy bed where Alex gets his much-needed rest.",
             "state":"idle and clean"},
            {"name": "dresser", 'type':'inanimate_object', 
             "description": "A dresser with neatly organized clothes, reflecting Alex's organized nature.",
             "state":"idle and clean"},
            {"name": "nightstand", 'type':'inanimate_object', 
             "description": "A nightstand with a lamp and a few personal items.",
             "state":"idle and clean"},
            {"name": "closet", 'type':'inanimate_object', 
             "description": "A spacious closet with casual and work clothes.",
             "state":"idle and clean"},
            {"name": "desk", 'type':'inanimate_object', 
             "description": "A small desk with a notebook and pens, used for jotting down late-night ideas.",
             "state":"idle and clean"},
            {"name": "lamp", 'type':'inanimate_object', 
             "description": "A bedside lamp for reading before bed.",
             "state":"off and clean"}
        ],
        "Bathroom": [
            {"name": "shower", 'type':'inanimate_object', 
             "description": "A modern shower with various settings for a refreshing start to the day.",
             "state":"off and clean"},
            {"name": "bathroom sink", 'type':'inanimate_object', 
             "description": "A clean sink with essential toiletries.",
             "state":"idle and clean"},
            {"name": "toilet", 'type':'inanimate_object', 
             "description": "A simple, modern toilet.",
             "state":"idle and clean"},
            {"name": "mirror", 'type':'inanimate_object', 
             "description": "A mirror above the sink, used for daily grooming.",
             "state":"idle and clean"},
            {"name": "cabinet", 'type':'inanimate_object', 
             "description": "A cabinet filled with towels and personal care items.",
             "state":"idle and clean"}
        ],
        "Home Office": [
            {"name": "desk", 'type':'inanimate_object', 
             "description": "A large desk with multiple monitors and a laptop, where Alex spends most of his working hours.",
             "state":"idle and clean"},
            {"name": "computer", 'type':'inanimate_object', 
             "description": "A powerful computer equipped with the latest software development tools.",
             "state":"off and clean"},
            {"name": "office chair", 'type':'inanimate_object', 
             "description": "An ergonomic office chair to support long coding sessions.",
             "state":"idle and clean"},
            {"name": "bookshelf", 'type':'inanimate_object', 
             "description": "A bookshelf filled with programming books and reference materials.",
             "state":"idle and clean"},
            {"name": "printer", 'type':'inanimate_object',
              "description": "A reliable printer for printing documents and designs.",
              "state":"off and clean"}
        ]
    }
}

# The agent is stored as one more object dict (type "agent") inside a room's list.
alex = {"name":"Alex", "type":"agent", "description":"human", 'state':'idle, standing in the middle of the room.'}

# Alex starts in the Bedroom; this appends to the Bedroom list of alex_house in place.
alex_house["Alex's House"]["Bedroom"].append(alex)

# Names of the agents that exist in the tree.
agent_names = ["Alex"]



In [259]:
class MovementActionDetector(tp.BasicTool):
    """LLM tool that decides whether an agent's action is a movement between places.

    The model receives the serialized environment tree, the agent and the
    action, and is instructed to reply with a JSON object holding
    ``reasoning``, ``is_movement``, ``from``, ``to`` and ``movement_duration``
    (seconds).
    """

    _description = "Detects if a action is a movement."

    # System prompt: fixes the JSON reply schema.
    _system_message = (
        "You are movement action detector that outputs in JSON.\n"
        "The JSON object must use the schema: {'reasoning':'str', 'is_movement':'bool', 'from':'str','to':'str','movement_duration':'float '}\n"
        "\n"
        "Where reasoning is the reason why the object is beeing moved. The movement_duration must be in seconds.\n"
        "\n"
        "Please use a valid JSON format.\n"
    )

    # User prompt template. The trailing spaces on the first and third lines
    # are part of the original prompt text and are kept.
    _base_prompt = (
        "{environment_tree}   \n"
        "\n"
        "Considering the above environment tree, detect if the following action is a movement of the agent. \n"
        "The action must clearly give a reason for it being a movement. The from and to places must be in the environment tree.\n"
        "\n"
        "DO NOT ASSUME any motives and movements.\n"
        "\n"
        "Action agent: {agent}\n"
        "Action: {action}\n"
    )

    _input_description = {
        "agent": "who is doing the action",
        "environment_tree": "tree of elements in the environment",
        "action": "agent action",
    }

    _return_description = {
        "reasoning": "why this action is a movement",
        "is_movement": "bool indicating if the action is a movement",
        "from": "from where the object is coming",
        "to": "to where the object is going",
        "movement_duration": "duration of the movement in seconds",
    }

    def __init__(self, model_name: str | None = None) -> None:
        """Configure the underlying ``tp.BasicTool`` with JSON mode enabled.

        Args:
            model_name: forwarded unchanged to ``tp.BasicTool``.
        """
        super().__init__(
            self._description,
            self._input_description,
            self._base_prompt,
            self._return_description,
            self._system_message,
            model_name,
            json_mode=True,
        )


In [263]:
class ActionDurationEstimator(tp.BasicTool):
    """LLM tool that estimates how long an agent's action takes.

    The model receives the serialized environment tree, the agent and the
    action, and is instructed to reply with a JSON object holding
    ``reasoning`` and ``action_duration`` (seconds).
    """

    _description = "Estimates the duration of a action."

    # System prompt: fixes the JSON reply schema.
    _system_message = (
        "You are action duration estimator that outputs in JSON.\n"
        "The JSON object must use the schema: {'reasoning':'str', 'action_duration':'float '}\n"
        "\n"
        "Where reasoning is the reason why the action takes this time. The action_duration must be in seconds.\n"
        "\n"
        "Please use a valid JSON format.\n"
    )

    # User prompt template. The trailing spaces on the first and third lines
    # are part of the original prompt text and are kept.
    _base_prompt = (
        "{environment_tree}   \n"
        "\n"
        "Considering the above environment tree, estimate the following action duration. \n"
        "\n"
        "DO NOT ASSUME ANYTHING THAT IS NOT IN THE ENVIRONMENT OR AGENT ACTION.\n"
        "\n"
        "Action agent: {agent}\n"
        "Action: {action}\n"
    )

    _input_description = {
        "agent": "who is doing the action",
        "environment_tree": "tree of elements in the environment",
        "action": "agent action",
    }

    _return_description = {
        "reasoning": "why the action takes this time",
        "action_duration": "duration of the action in seconds",
    }

    def __init__(self, model_name: str | None = None) -> None:
        """Configure the underlying ``tp.BasicTool`` with JSON mode enabled.

        Args:
            model_name: forwarded unchanged to ``tp.BasicTool``.
        """
        super().__init__(
            self._description,
            self._input_description,
            self._base_prompt,
            self._return_description,
            self._system_message,
            model_name,
            json_mode=True,
        )


In [272]:
class Environment:
    """Text environment whose state is a nested tree and whose dynamics come from LLM tools.

    Each ``step`` applies the agents' actions to the objects they affect,
    moves agents whose action is a movement, and then produces one
    observation per known agent.
    """

    def __init__(self, initial_environment_tree, agent_names:list[str], model_name:str|None = None):
        """Store the tree and the agent names and build the helper tools.

        Args:
            initial_environment_tree: environment tree; kept by reference and
                mutated by ``step`` (object states and agent positions).
            agent_names: agents that receive an observation on every step;
                each must be findable in the tree by name.
            model_name: passed to every LLM-backed tool.
        """
        self._environment_tree = initial_environment_tree
        self._agent_names = agent_names

        self._action_target_selector = ActionTargetSelector(model_name)
        self._object_state_updater = ObjectStateUpdater(model_name)
        self._movement_detector = MovementActionDetector(model_name)
        self._object_mover = ObjectMover()
        self._observation_generator = ObservationGenerator(model_name)
        self._action_duration_estimator = ActionDurationEstimator(model_name)

    def step(self, agents_action:dict[str,str]) -> dict[str,dict[str,str]]:
        """Advance the environment by one round of agent actions.

        Args:
            agents_action: mapping of agent name -> action text. Agents of
                ``agent_names`` missing from it are observed with
                ``'No action'``.

        Returns:
            Mapping of agent name -> dict with ``"state"``, ``"place"`` and
            ``"observation"``, plus ``"duration"`` for agents that acted.

        Raises:
            AttributeError: if an acting or observed agent is not found in
                the tree (the room lookup returns ``None``).
        """
        #Actions
        for agent, action in agents_action.items():
            # Re-serialize per agent so each selection sees earlier agents' updates.
            environment = json.dumps(self._environment_tree)
            place = next(iter(get_object_room(self._environment_tree, agent).keys()))

            query = {'environment_tree':environment,
                    'actor':agent,
                    'place':place,
                    'action':action}
            target_result, _ = self._action_target_selector(query)

            for object_name in target_result["objects"]:
                object_room = get_object_room(self._environment_tree, object_name)
                target_object = get_object(self._environment_tree, object_name)

                # The names come from the LLM and may not exist in the tree;
                # skip those instead of crashing on a None lookup.
                if object_room is None or target_object is None:
                    warnings.warn(f"Object {object_name} not found in the environment. Skipping state update.")
                    continue

                query = {'name':object_name,
                        'place':next(iter(object_room.keys())),
                        'description':target_object["description"],
                        'state':target_object["state"],
                        'actor':agent,
                        'action':action}

                state_update, _ = self._object_state_updater(query)

                target_object["state"] = state_update["new_state"]

        # Snapshot after all state updates; every movement query below uses it.
        environment = json.dumps(self._environment_tree)

        #Start observations
        observations = {agent: {} for agent in self._agent_names}

        #Movements
        for agent, action in agents_action.items():
            detection_query = {'agent':agent,"environment_tree":environment, 'action':action}
            detected_movement, _ = self._movement_detector(detection_query)

            if detected_movement['is_movement']:
                move_query = {'object':agent,
                              'original_place':detected_movement["from"],
                              'target_place':detected_movement["to"],
                              'environment_tree':self._environment_tree}

                # Tolerate a mover that signals an aborted move with None
                # rather than a (result, description) pair.
                move_result = self._object_mover(move_query)
                if move_result is not None:
                    new_env, _ = move_result
                    self._environment_tree = new_env["environment_tree"]

                observations[agent]["duration"] = detected_movement["movement_duration"]
            else:
                # The estimator takes the same inputs as the movement detector.
                action_duration_result, _ = self._action_duration_estimator(detection_query)
                observations[agent]["duration"]= action_duration_result["action_duration"]

        environment = json.dumps(self._environment_tree)

        #States and places
        for agent in self._agent_names:
            agent_room = get_object_room(self._environment_tree, agent)
            agent_object = get_object(agent_room, agent)

            observations[agent]["state"] = agent_object["state"]
            observations[agent]["place"] = next(iter(agent_room.keys()))

        #Observations
        for agent in self._agent_names:
            action = agents_action[agent] if agent in agents_action else 'No action'

            query = {'environment_tree':environment,
                    'agent':agent,
                    'state':observations[agent]["state"],
                    'place':observations[agent]["place"],
                    'action':action}

            observation_result, _ = self._observation_generator(query)

            observations[agent]["observation"] = observation_result["observation"]

        return observations

In [273]:
# Build the environment over the alex_house tree (passed by reference, not copied);
# model_name is left at its default of None.
agent_names = ["Alex"]
env = Environment(alex_house, agent_names)

In [274]:
# Scripted actions for Alex, one per environment step.
actions = ["turn on the lamp", "go to the kitchen"]
# Results of env.step, in execution order.
observations = []

# NOTE(review): `step` is never used and `obsevation` looks like a typo of `observation`;
# both are left untouched because they are notebook-level names.
for step, action in enumerate(actions):
    agent_actions = {"Alex":action}

    obsevation = env.step(agent_actions)
    observations.append(obsevation)

In [275]:
# Display the per-step observations collected by the loop above.
observations

[{'Alex': {'duration': 1.0,
   'state': 'idle, standing in the middle of the room.',
   'place': 'Bedroom',
   'observation': "Alex sees the dresser, nightstand, closet, and desk in the bedroom, and notices the soft rug under his feet. He hears nothing unusual. He feels the gentle glow of the lamp on his skin as he turns it on, and senses the room's ambient temperature and humidity."}},
 {'Alex': {'duration': 10.0,
   'state': 'walking',
   'place': 'Kitchen',
   'observation': 'Alex sees the refrigerator, stove, microwave, kitchen sink, dishwasher, and dining table in the kitchen. He hears the hum of the refrigerator. He feels the soft rug under his feet as he walks into the room.'}}]

# Not used

In [None]:
class ActorSelector(tp.BasicTool):
    """LLM tool that picks a non-agent object that should perform an action.

    The model receives the serialized environment tree and is instructed to
    reply with a JSON object holding ``thought``, ``actor_name``,
    ``actor_place`` and ``actor_action`` (all empty strings when no object
    acts).
    """

    _description = 'Selects a actor to do an action'

    # System prompt: fixes the JSON reply schema. The 'actor_action' key
    # previously lacked its closing quote ("'actor_action:'str'"), which made
    # the example schema malformed.
    _system_message = (
        "You are an actor selector that outputs in JSON.\n"
        "The JSON object must use the schema: {'thought':'str', 'actor_name':'str', 'actor_place':'str', 'actor_action':'str'}\n"
        "\n"
        "Where:\n"
        "thought: why this object will do a action.\n"
        "actor_name: what object will act\n"
        "actor_place: where the object is\n"
        "actor_action: what the object will do\n"
        "\n"
        "Please use a valid JSON format.\n"
    )

    # User prompt template. The whitespace-only second line is part of the
    # original prompt text and is kept.
    _base_prompt = (
        "{environment_tree}\n"
        "    \n"
        "Considering the environment hierarchy above, selects a object to do a action (change your own state or act on surrounding objects).\n"
        "If no object will perform action, return all elements as '' ('actor_name', 'actor_place', ...).\n"
        "Don't select objects of type 'agent'.\n"
        "The object must have some reason in its state or nearby objects to perform the action.\n"
        "Strictly consider only current objects and states. Do not consider actions or motives outside the environment at the current moment.\n"
    )

    _input_description = {'environment_tree':'tree of elements in the environment'}

    # NOTE(review): 'thought' is part of the reply schema but is not described
    # here; left as-is because how tp.BasicTool uses this dict is not visible.
    _return_description = {'actor_name':'actor that will do a action',
                           'actor_place':'place where the actor is',
                           'actor_action':'action of the actor'}

    def __init__(self, model_name:str | None = None) -> None:
        """Configure the underlying ``tp.BasicTool`` with JSON mode enabled.

        Args:
            model_name: forwarded unchanged to ``tp.BasicTool``.
        """
        super().__init__(self._description, self._input_description, self._base_prompt,
                         self._return_description, self._system_message,  model_name,
                         json_mode=True)


In [None]:
# Ask the ActorSelector which object should act, given the whole house serialized as JSON.
actor_selector = ActorSelector()
environment = json.dumps(alex_house)
query = {'environment_tree':environment}

# The tool call returns a pair; only its first element is kept.
actor_result, _ = actor_selector(query)

In [None]:
# Display the selector's reply.
actor_result

{'thought': 'The coffee table is idle and clean, and the programming books on it are gathering dust. It needs to be dusted.',
 'actor_name': 'dining table',
 'actor_place': 'Kitchen',
 'actor_action': 'dusted the coffee table'}