## Keep average distance

The agents' goal is to position themselves at a predefined target distance from each other.

challenges:
- dealing with a continuous-space environment
- each agent has only limited vision

In [20]:
from typing import Set
from ray.rllib.env.multi_agent_env import MultiAgentEnv
import random as rnd
from gymnasium.spaces import Discrete, Box, Dict, Tuple
from gymnasium.spaces.utils import flatten, flatten_space
import numpy as np
from IPython.display import clear_output
import math
from ipycanvas import Canvas, hold_canvas


class EnvironmentConfiguration:
    """Plain holder for the settings passed to the environment constructor.

    The values are stored exactly as given; nothing is validated here.

    Args:
        n_agents: number of agents in the environment.
        target_distance: distance the agents are expected to keep.
        speed: scale factor applied to an agent's movement.
        max_steps: optional step limit for an episode (``None`` = no limit).
    """

    def __init__(self, n_agents, target_distance, speed, max_steps=None):
        # Population size and the distance the agents should settle at.
        self.n_agents, self.target_distance = n_agents, target_distance
        # Episode length limit and movement scale.
        self.max_steps, self.speed = max_steps, speed

class KeepTheDistance(MultiAgentEnv):
    """Two-agent environment in which agents try to hold a target distance.

    Each agent observes the 2-D offset between itself and the other agent and
    receives, at every step, minus the absolute difference between their
    Euclidean distance and ``target_distance``.

    An action is a flat 3-vector ``(dir_x, dir_y, speed_fraction)``: the
    first two components give a movement direction (normalised internally),
    the third scales the configured ``speed``.
    """

    # Drawing widget; created lazily on the first call to render().
    canvas = None

    def __init__(self, config: EnvironmentConfiguration):
        """Copy the settings from ``config`` and build the spaces.

        Raises:
            AssertionError: if ``config.n_agents`` is not 2.
        """
        assert config.n_agents == 2  # just base case implemented
        super().__init__()

        self.n_agents = config.n_agents
        self.target_distance = config.target_distance
        self.max_steps = config.max_steps
        self.speed = config.speed

        self.agents_ids = ['agent-' + str(i) for i in range(self.n_agents)]
        # One random display colour per agent (three randint draws each).
        self.agent_colors = {
            agent: self.rgb_to_hex(*(rnd.randint(0, 255) for _ in range(3)))
            for agent in self.agents_ids
        }
        # NOTE: these assignments replace the same-named methods on this
        # instance with the concrete spaces they return; afterwards
        # `env.observation_space` / `env.action_space` are space objects,
        # not callables.
        self.observation_space = self.observation_space('agent-0')
        self.action_space = self.action_space("")

    def unflatten_observation_space(self, agent):
        """Return the structured (Dict) observation space of ``agent``."""
        distance_vector = Box(low=-np.inf, high=np.inf, shape=(2, 1), dtype=np.float32)
        return Dict({"nbr-1": distance_vector})

    def observation_space(self, agent):
        """Return the flattened observation space of ``agent``."""
        return flatten_space(self.unflatten_observation_space(agent))

    def action_space(self, agent):
        """Return the flattened action space: direction (2) + speed (1)."""
        direction = Box(low=-1.0, high=1.0, shape=(2, 1), dtype=np.float32)
        speed = Box(0.0, 1.0, dtype=np.float32)
        return flatten_space(Tuple([direction, speed]))

    def __get_random_point(self, max_x, max_y, min_x=0, min_y=0):
        """Return a random integer point with min <= coordinate <= max - 1."""
        return (rnd.randint(min_x, max_x - 1), rnd.randint(min_y, max_y - 1))

    def __get_observation(self, agent):
        """Return the flattened observation (offset to the other agent)."""
        nbr = self.__get_other_agents(agent)
        obs = {"nbr-1": self.__compute_distance_vector(agent, nbr[0])}
        return flatten(self.unflatten_observation_space(agent), obs)

    def rgb_to_hex(self, r, g, b):
        """Format three 0-255 channel values as a ``#rrggbb`` string."""
        return f'#{r:02x}{g:02x}{b:02x}'

    def __compute_distance_vector(self, agent1, agent2):
        """Return the offset ``pos(agent1) - pos(agent2)`` as an (x, y) tuple."""
        agent1_pos = self.agents_pos[agent1]
        agent2_pos = self.agents_pos[agent2]
        return (agent1_pos[0] - agent2_pos[0], agent1_pos[1] - agent2_pos[1])

    def __compute_norm(self, vector):
        """Return the Euclidean length of a 2-D vector."""
        return math.hypot(vector[0], vector[1])

    def __compute_distance(self, distance_vector):
        """Return the Euclidean distance encoded by an offset vector."""
        return self.__compute_norm(distance_vector)

    def __compute_unit_vector(self, vector):
        """Return ``vector`` scaled to unit length.

        A zero vector has no direction; it is returned as ``[0.0, 0.0]`` so
        that a (0, 0) direction action means "do not move" instead of
        producing a division by zero.
        """
        norm = self.__compute_norm(vector)
        if norm == 0:
            return [0.0, 0.0]
        return [vector[0] / norm, vector[1] / norm]

    def __get_local_reward(self, agent):
        """Return minus the absolute error between actual and target distance."""
        vectors = [self.__compute_distance_vector(agent, self.__get_other_agents(agent)[0])]
        errors = [abs(self.__compute_distance(v) - self.target_distance) for v in vectors]
        return -np.array(errors).sum()

    def __get_global_reward(self):
        """Return the shared reward term (currently always 0)."""
        return 0

    def __get_other_agents(self, agent):
        """Return the ids of every agent except ``agent``."""
        return [other for other in self.agents_ids if other != agent]

    def __update_agent_position(self, agent, action):
        """Move ``agent`` along ``action[:2]`` by ``action[2] * self.speed``."""
        unit_movement = self.__compute_unit_vector([action[0], action[1]])
        # The speed lives on the instance; no `self.config` is ever stored.
        step_length = action[2] * self.speed
        x, y = self.agents_pos[agent]
        self.agents_pos[agent] = (x + unit_movement[0] * step_length,
                                  y + unit_movement[1] * step_length)

    def reset(self, seed=None, options=None):
        """Start a new episode with random agent positions.

        ``seed`` and ``options`` are accepted but not used.

        Returns:
            A tuple ``(observations, infos)`` where ``observations`` maps
            each agent id to its flattened observation and ``infos`` is an
            empty dict.
        """
        self.steps = 0
        self.agents_pos = {agent: self.__get_random_point(max_x=100, max_y=100)
                           for agent in self.agents_ids}
        return {agent: self.__get_observation(agent) for agent in self.agents_ids}, {}

    def step(self, actions):
        """Apply ``actions`` (agent id -> action vector) and advance one step.

        Returns:
            ``(observations, rewards, terminated, truncated, infos)`` dicts
            keyed by the agents present in ``actions``. ``terminated`` and
            ``truncated`` also carry the ``'__all__'`` key;
            ``terminated['__all__']`` becomes True once ``max_steps`` (if
            set) has been reached.
        """
        self.steps += 1
        observations, rewards, terminated, truncated, infos = {}, {}, {}, {}, {}

        # Move everybody first so that all observations and rewards are
        # computed from the same, fully updated state.
        for agent, action in actions.items():
            self.__update_agent_position(agent, action)

        for agent in actions:
            observations[agent] = self.__get_observation(agent)
            rewards[agent] = self.__get_local_reward(agent) + self.__get_global_reward()
            terminated[agent] = False
            truncated[agent] = False
            infos[agent] = {}

        truncated['__all__'] = False
        # `>=` so the episode still ends if the counter ever passes the limit.
        terminated['__all__'] = (self.max_steps is not None
                                 and self.steps >= self.max_steps)

        return observations, rewards, terminated, truncated, infos

    def render(self):
        """Draw every agent as a filled circle on an ipycanvas ``Canvas``.

        The canvas is created and displayed on the first call and reused
        afterwards.
        """
        # Imported here so rendering does not rely on the `display` global
        # that notebooks inject.
        from IPython.display import display

        width, height = 100, 100
        if self.canvas is None:
            self.canvas = Canvas()
            display(self.canvas)

        with hold_canvas():
            agent_size = 3
            # World rectangle that is mapped onto the width x height frame.
            top_left = (0.0, 0.0)
            bottom_right = (100.0, 100.0)
            self.canvas.clear()

            for agent in self.agents_ids:
                raw_pos = self.agents_pos[agent]
                frame_x = (raw_pos[0] - top_left[0]) / (bottom_right[0] - top_left[0]) * width
                frame_y = (raw_pos[1] - top_left[1]) / (bottom_right[1] - top_left[1]) * height

                self.canvas.fill_style = self.agent_colors[agent]
                self.canvas.fill_circle(frame_x, frame_y, agent_size)

    def get_agent_ids(self) -> Set[str]:
        """Return the set of agent ids handled by this environment."""
        return set(self.agents_ids)

In [22]:
import time




# Demo rollout: build a two-agent environment, then render ten steps in which
# every agent applies the same fixed action.
env_config = EnvironmentConfiguration(
    n_agents=2,
    target_distance=5,
    max_steps=100,
    speed=2,
)
env = KeepTheDistance(env_config)
obs, _ = env.reset()
env.render()

for _ in range(10):
    # Direction (1, 1) with the speed component set to 1.0, for each agent.
    actions = {}
    for agent in obs:
        actions[agent] = np.array([1.0, 1.0, 1.0], np.float32)
    # Alternative: sample random actions instead.
    #actions = {agent: env.action_space.sample() for agent in obs.keys()}
    obs, *_ = env.step(actions)
    env.render()
    time.sleep(0.5)  # slow the loop down so the motion is visible


Canvas()

In [27]:
from tkinter import Tk, Canvas, PhotoImage, mainloop
# Scratch cell: draw a few primitives on a tkinter canvas.
# NOTE(review): `Canvas` here is tkinter's (imported just above), which
# rebinds the ipycanvas `Canvas` imported at the top of the file -- confirm
# that nothing run after this cell still expects the ipycanvas class.
root = Tk()
root.title("Canvas Pixel Painting")
width, height = 500, 500

# Create a canvas widget
canvas = Canvas(root, width=width, height=height)
canvas.pack()

# Red circle of diameter w centred on (x, y).
w = 10
x, y = (200, 100)
canvas.create_oval(
    x - w / 2, y - w / 2,
    x + w / 2, y + w / 2,
    outline="black",
    fill="red",
)

# Wipe everything drawn so far, then draw two degenerate rectangles whose
# two corners coincide.
canvas.delete("all")
for x, y in ((300, 100), (400, 100)):
    canvas.create_rectangle((x, y, x, y))
#canvas.put("red", (100, 200))
root.mainloop()