In [24]:
import plotly.express as px
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from matplotlib.colors import to_hex, to_rgba

from pathlib import *
from typing import Union
import random
import sys
import os
import json
import pprint
import re
import openai
from dotenv import load_dotenv
load_dotenv()
import env
import utils

import importlib
importlib.reload(env)
importlib.reload(utils)

# NOTE(review): OPENAI_KEY is read from the environment here but is not referenced
# again in the visible cells — confirm it is still needed.
OPENAI_KEY = os.getenv('OPENAI_KEY')

# 'setup_data' directory resolved against the current working directory.
setup_data_path = Path().resolve() / 'setup_data'

In [2]:
# Synchronous and asynchronous OpenAI clients, both constructed without explicit arguments.
client = openai.OpenAI()
async_client = openai.AsyncOpenAI()

In [3]:
# Define util functions
def cubes(size, pos_x, pos_y, pos_z, color):
    """Build a cube-shaped ``go.Mesh3d`` trace centred on a point.

    Parameters
    ----------
    size : number
        Edge length of the cube.
    pos_x, pos_y, pos_z : number
        Coordinates of the cube centre.
    color : str
        Colour handed straight to ``go.Mesh3d``.

    Returns
    -------
    plotly.graph_objects.Mesh3d
    """
    # Two samples per axis (centre -/+ half an edge) span the 8 cube corners.
    axis_ranges = [
        np.linspace(centre - size / 2, centre + size / 2, 2)
        for centre in (pos_x, pos_y, pos_z)
    ]
    corner_x, corner_y, corner_z = [grid.flatten() for grid in np.meshgrid(*axis_ranges)]

    surface_lighting = {'diffuse': 0.1, 'specular': 2.0, 'roughness': 0.5}
    return go.Mesh3d(
        x=corner_x,
        y=corner_y,
        z=corner_z,
        alphahull=1,
        flatshading=True,
        color=color,
        lighting=surface_lighting,
    )

def clamp_color(val):
    """Limit a colour channel value to the range 0..255.

    Values below 0 become 0, values above 255 become 255, anything else is
    returned unchanged.
    """
    return 0 if val < 0 else (255 if val > 255 else val)

def clamp_op(val):
    """Limit an opacity value to the range 0..1.

    Values below 0 become 0, values above 1 become 1, anything else is
    returned unchanged.
    """
    # guard clauses: handle the two out-of-range cases first
    if val < 0:
        return 0
    if val > 1:
        return 1
    return val


def setup_env(agents, obstacles):
    '''Build a 3D plotly figure showing the obstacles and each agent's start, path and goal.

    Parameters
    ----------
    agents : dict[str, dict]
        Maps agent name to a dict with the keys ``init`` ([x, y, z]),
        ``path`` (list of [x, y, z], may be empty) and ``goal`` ([x, y, z]).

    obstacles : list[dict]
        Each obstacle has the keys ``size`` (cube edge length) and
        ``coords`` ([x, y, z] of the cube centre).

    Returns
    -------
    figure : plotly.graph_objects.Figure
    '''
    fig = go.Figure()

    for obstacle in obstacles:
        x, y, z = obstacle['coords']
        fig.add_trace(cubes(obstacle['size'], x, y, z, 'rgba(25,25,25,0.3)'))

    # Switch to the larger palette when the default one has too few colours.
    palette = px.colors.qualitative.Plotly
    if len(agents) > len(palette):
        palette = px.colors.qualitative.Alphabet

    for i, (agent_key, agent) in enumerate(agents.items()):
        init = agent['init']
        goal = agent['goal']

        # A non-empty path is drawn from init through its waypoints to goal;
        # an empty path produces an empty line trace.
        if agent['path']:
            path = [init] + agent['path'] + [goal]
        else:
            path = agent['path']

        # Wrap around the palette so that more agents than colours reuse
        # colours instead of raising IndexError.
        red, green, blue, alpha = to_rgba(palette[i % len(palette)])
        # to_rgba yields floats in 0..1: only the RGB channels are scaled to
        # 0..255, the alpha channel stays in 0..1 so the -0.1 offset below
        # actually takes effect.
        rgb = [channel * 255 for channel in (red, green, blue)]
        shades = [
            ([channel - 10 for channel in rgb], alpha - 0.1),   # init marker: darker, slightly transparent
            (rgb, alpha),                                        # path line: base colour
            ([channel + 10 for channel in rgb], alpha - 0.1),   # goal marker: lighter, slightly transparent
        ]
        colors = [
            'rgba({})'.format(','.join(
                [str(clamp_color(channel)) for channel in shade] + [str(clamp_op(opacity))]
            ))
            for shade, opacity in shades
        ]

        fig.add_trace(go.Scatter3d(x=[init[0]], y=[init[1]], z=[init[2]], mode='markers', marker=dict(color=colors[0]), name=agent_key))
        fig.add_trace(go.Scatter3d(x=[p[0] for p in path], y=[p[1] for p in path], z=[p[2] for p in path], mode='lines', line=dict(color=colors[1]), showlegend=False))
        fig.add_trace(go.Scatter3d(x=[goal[0]], y=[goal[1]], z=[goal[2]], mode='markers', marker=dict(color=colors[2]), showlegend=False))

    return fig

def generate_new_env(setup_data_path : Path, count = 30, grid_size = 10, num_obstacles = 10, obstacle_size = 1, agent_names = ['Alice', 'Bob', 'Chad', 'Dave'], seed = None):
    '''Generate ``count`` random environments and write them to a new ``Version_<n>`` directory.

    Parameters
    ----------
    setup_data_path : Path
        Root directory for generated versions; created if it does not exist.
    count : int
        Number of ``env_<i>.json`` files to write.
    grid_size : int
        Coordinates are drawn uniformly from 1..grid_size on each axis.
    num_obstacles : int
        Obstacles per environment.
    obstacle_size : number
        Stored as the ``size`` of every obstacle.
    agent_names : list[str]
        One agent with an ``init`` and a ``goal`` cell is created per name.
    seed : int, optional
        Seed for the random generator. When omitted a fresh seed is drawn on
        every call; the seed that was used is recorded in the config.

    Returns
    -------
    dir_path : Path
        The newly created version directory.

    Raises
    ------
    ValueError
        If the grid has too few cells to give every obstacle, init and goal
        its own cell.
    '''
    # A default of random.randrange(...) in the signature would be evaluated
    # only once, at definition time; draw the seed per call instead.
    if seed is None:
        seed = random.randrange(sys.maxsize)
    # Private generator: same sequence as seeding the module-level one, but
    # without disturbing global random state.
    rng = random.Random(seed)

    agent_names = list(agent_names)
    cells_needed = num_obstacles + 2 * len(agent_names)
    if count > 0 and cells_needed > grid_size ** 3:
        # The rejection sampling below would never terminate otherwise.
        raise ValueError(
            f'A {grid_size}^3 grid cannot hold {cells_needed} distinct cells '
            f'({num_obstacles} obstacles plus init/goal for {len(agent_names)} agents)'
        )

    def random_coords():
        return [rng.randint(1, grid_size), rng.randint(1, grid_size), rng.randint(1, grid_size)]

    setup_data_path = Path(setup_data_path)
    setup_data_path.mkdir(parents=True, exist_ok=True)
    # Start from the entry count and skip ahead past any name already in use.
    version = len(os.listdir(setup_data_path))
    while (setup_data_path / f'Version_{version}').exists():
        version += 1
    dir_path = setup_data_path / f'Version_{version}'
    dir_path.mkdir()

    config = {
        'grid_size' : grid_size,
        'num_obstacles' : num_obstacles,
        'obstacle_size' : obstacle_size,
        'agent_names' : agent_names,
        'seed' : seed,
        'count' : count,
    }
    with open(dir_path / 'config.json', 'w') as config_file:
        json.dump(config, config_file, indent=4)

    for i in range(count):
        coords = []      # every cell already taken in this environment
        obstacles = []
        agents = {}
        for _ in range(num_obstacles):
            obs_coord = random_coords()
            while obs_coord in coords:
                obs_coord = random_coords()
            coords.append(obs_coord)
            obstacles.append({'size' : obstacle_size, 'coords' : obs_coord})

        for name in agent_names:
            # Both candidates are drawn before either is checked, so a given
            # seed keeps producing the same layout.
            coord1 = random_coords()
            coord2 = random_coords()
            while coord1 in coords:
                coord1 = random_coords()
            coords.append(coord1)
            while coord2 in coords:
                coord2 = random_coords()
            coords.append(coord2)
            agents[name] = {'init' : coord2, 'path' : [], 'goal' : coord1}

        # Serialise with json so that names containing quotes stay valid JSON.
        with open(dir_path / f'env_{i}.json', 'w') as env_file:
            json.dump({'config' : config, 'obstacles' : obstacles, 'agents' : agents}, env_file, indent=4)

    return dir_path

def load_envs(setup_path : Path, version : Union[int, str]):
    '''Load every environment JSON file of one generated version.

    Parameters
    ----------
    setup_path : Path
        Root directory that contains the ``Version_<n>`` directories.
    version : int or str
        Version index (``0`` -> ``Version_0``) or the full directory name
        (``'Version_0'``).

    Returns
    -------
    envs : list[dict]
        Parsed contents of every ``*.json`` file in the directory except
        ``config.json``. Files named ``env_<i>.json`` come first, ordered by
        their numeric index; any other JSON file follows in name order.
    '''
    if isinstance(version, str) and version.startswith('Version_'):
        dir_name = version
    else:
        dir_name = f'Version_{version}'
    dir_path = Path(setup_path) / dir_name

    def order_key(file_name):
        # Numeric ordering so that env_10 sorts after env_2.
        match = re.fullmatch(r'env_(\d+)\.json', file_name)
        if match:
            return (0, int(match.group(1)), file_name)
        return (1, 0, file_name)

    # os.listdir order is arbitrary and may include non-JSON entries
    # (e.g. .DS_Store); filter and sort for a deterministic result.
    file_names = sorted(
        (name for name in os.listdir(dir_path) if name.endswith('.json') and name != 'config.json'),
        key=order_key,
    )

    envs = []
    for file_name in file_names:
        with open(dir_path / file_name) as file:
            envs.append(json.load(file))

    return envs

def default_prompt(env):
    '''Build the (system prompt, user prompt) pair for one environment.

    Parameters
    ----------
    env : dict
        Must provide ``env['agents']`` (name -> dict with ``init`` and
        ``goal`` coordinates), ``env['obstacles']`` (list of dicts with
        ``coords``) and ``env['config']['grid_size']``.

    Returns
    -------
    system_prompt : str
        Task description and output format; the example plan lists one
        ``NAME <agent> PATH[<path>]`` line per agent in ``env``.
    user_prompt : str
        Grid size, obstacles and one line per agent, ending with
        'Your reasoning and plan is'.
    '''
    agents = env['agents']
    obstacles = env['obstacles']
    grid_size = env['config']['grid_size']

    # One example line per agent actually present, rather than a fixed roster.
    example_plan = '\n'.join(['PLAN'] + [f'NAME {name} PATH[<path>]' for name in agents])

    # Built line by line so that the significant trailing spaces stay visible.
    system_prompt = '\n'.join([
        'Plan paths for agents to navigate a 3D grid to reach their respective goals and avoid collision.',
        'You are given:',
        '1) a list of obstacle coordinates (x, y, z): locations of the obstacle grid cells, agents must avoid them.',
        '2) a list of [([name], [init], [goal]) tuples], [init] and [goal] are 3D coordinates of the initial position and goal position of agent named [name].',
        '3) a previous plan, if any, and why it failed. Analyze this information and re-plan a collision-free path.',
        '',
        'How to plan a <path>:',
        '1) Make sure each path does not touch any obstacle or another agent.',
        '2) Create a set of points for each agent to go from their init coordinates to their goal coordinates.',
        '3) Make sure the coordinates are exactly one step away from each other, in one direction. ',
        'Example of a <path>: [(1,1,1), (1,1,2), (1,1,3),...]',
        '',
        'Output Instruction:',
        'First output PLAN',
        'Then, for each agent, output the coordinates of their path, each agent on a new line. ',
        f"EXAMPLE: '{example_plan}'",
        '    ',
    ])

    obstacle_str = ' '.join(f"({', '.join(map(str, obs['coords']))})" for obs in obstacles)
    agent_lines = [
        f"Agent {name} init: ({', '.join(map(str, spec['init']))}) goal: ({', '.join(map(str, spec['goal']))})"
        for name, spec in agents.items()
    ]
    # Every agent gets its own line, however many there are.
    user_info = '\n'.join([
        f"At the current step: Grid size: {' x '.join([str(grid_size)] * 3)}",
        f'Obstacles: {obstacle_str}',
        *agent_lines,
    ])
    user_prompt_data = [user_info]

    # NOTE(review): has_feedback is hard-wired to False, so the feedback
    # template below is never appended — confirm whether it should be driven
    # by an argument.
    has_feedback = False
    feedback = (
        '\n'
        '    Feedback: this previous plan failed:\n'
        '    [past plan, omitted]\n'
        '    Use this information to try again, update this plan so it has collision-free, strictly one-step-apart paths. '
    )
    if has_feedback:
        user_prompt_data.append(feedback)

    user_prompt = '\n'.join(user_prompt_data + ['Your reasoning and plan is'])

    return system_prompt , user_prompt

def parse_response(response, num_agents=4):
    '''Extract the per-agent paths from a model response.

    The first line containing 'PLAN' marks the start of the plan; the
    ``num_agents`` lines after it are each expected to look like
    ``NAME <agent> PATH[(x, y, z), (x, y, z), ...]``.

    Parameters
    ----------
    response : str
        Full text returned by the model.
    num_agents : int
        Number of lines read after the PLAN line (default 4).

    Returns
    -------
    paths : dict[str, list[list[int]]]
        Agent name -> list of [x, y, z] points, for every line that parsed.
        Lines that could not be parsed contribute no entry.
    feedback : list[list[str]]
        One entry per plan line read: [''] when the line parsed, otherwise a
        single message quoting the offending line.

    When the response contains no 'PLAN' at all, the pair
    ``([], 'PLAN statement not found')`` is returned instead.
    '''
    if 'PLAN' not in response:
        return [], 'PLAN statement not found'

    lines = response.split('\n')
    # Guaranteed to exist: 'PLAN' contains no newline, so some line holds it.
    plan_line = next(i for i, line in enumerate(lines) if 'PLAN' in line)
    plan_vals = lines[plan_line + 1:plan_line + 1 + num_agents]

    line_pattern = r'NAME (\w+) PATH *(\[\((?:\d+, ?\d+, ?\d+)\)(?:, ?\((?:\d+, ?\d+, ?\d+)\))*\])'
    point_pattern = r'\(\d+, ?\d+, ?\d+\)'

    paths = {}
    feedback = []
    for action in plan_vals:
        extr = re.findall(line_pattern, action)
        if not extr:
            # Record the failure but keep going; the unparsed line simply has
            # no entry in the returned mapping.
            feedback.append([f'The following line could not be parsed: {action}'])
            continue
        feedback.append([''])
        name, raw_path = extr[0]
        paths[name] = [
            [int(c.strip()) for c in point.strip('()').split(',')]
            for point in re.findall(point_pattern, raw_path)
        ]

    return paths, feedback
    
def validate_paths(self, paths, feedback):
    '''Check each agent's path against the obstacles and the one-step rule.

    Parameters
    ----------
    self : object
        Must expose ``self.obstacles``, a list of dicts with a ``coords`` key.
    paths : dict[str, list[list[int]]]
        Agent name -> ordered list of [x, y, z] points.
    feedback : list[list[str]]
        Indexed by the agent's position in ``paths``. An agent whose entry is
        empty is skipped; every other entry is replaced in place by the list
        of problems found (empty when the path is clean).
        NOTE(review): assumes ``feedback`` lines up one-to-one with ``paths``
        — confirm against the caller.

    Returns
    -------
    paths, feedback
        The same ``paths`` object and the updated ``feedback`` list.
    '''
    def fmt(point):
        return ','.join(str(v) for v in point)

    obstacles = [val['coords'] for val in self.obstacles]

    for k, (agent_name, path) in enumerate(paths.items()):
        if not feedback[k]:
            continue
        path_feedback = []
        for idx, point in enumerate(path):
            if point in obstacles:
                path_feedback.append(f'Collision detected: {agent_name}: Path point ({fmt(point)}) collided with the following obstacle ({fmt(point)}).')
            if idx + 1 == len(path):
                # last point: nothing to compare it with
                continue
            next_point = path[idx + 1]
            # Manhattan distance between consecutive points: exactly 1 means a
            # single step along a single axis.
            diff = sum(abs(a - b) for a, b in zip(point, next_point))
            if diff == 0:
                path_feedback.append(f'Duplicate points in one path: {agent_name}: Path point: ({fmt(point)}) is the same as ({fmt(next_point)}).')
            if diff > 1:
                path_feedback.append(f'One or more of the coordinates in the following points were not exactly one unit apart: {agent_name}: Path points: [({fmt(point)}),({fmt(next_point)})]')
        feedback[k] = path_feedback
    return paths, feedback

In [11]:
utils.generate_new_env(setup_data_path, count=4)


PosixPath('/Users/mustafakhan/Library/Mobile Documents/com~apple~CloudDocs/My Stuff/Tech Shtuff/Code/OSR/Robotics/robot-collab-test/toy_experiment/setup_data/Version_0')

In [4]:
# NOTE(review): the version is passed as the full directory name 'Version_0'
# rather than a bare index — confirm this is the form utils.load_envs expects.
envs = utils.load_envs(setup_data_path, 'Version_0')

In [9]:
prompts = envs[0].default_prompt(feedback_plan={'feedback' : ['']})

In [22]:
print(prompts[0])

Plan paths for agents to navigate a 3D grid to reach their respective goals and avoid collision.
You are given:
1) a list of obstacle coordinates (x, y, z): locations of the obstacle grid cells, agents must avoid them.
2) a list of [([name], [init], [goal]) tuples], [init] and [goal] are 3D coordinates of the initial position and goal position of agent named [name].
3) a previous plan, if any, and why it failed. Analyze this information and re-plan a collision-free path.

How to plan a <path>:
1) Make sure each path does not touch any obstacle or another agent.
2) Create a set of points for each agent to go from their init coordinates to their goal coordinates.
3) Make sure the coordinates are exactly one step away from each other, in one direction. Note - you may only move in one direction at a time.
Example of a <path>: [(1,1,1), (1,1,2), (1,1,3),...]

Output Instruction:
First output PLAN
Then, for each agent, output the coordinates of their path, each agent on a new line. 
EXAMPLE:

In [12]:
response = '''To plan paths for the agents to navigate the 3D grid and reach their respective goals while avoiding obstacles and each other, we follow the steps below:

1. **Identify Agents and Goals**: 
   - Alice: Init (9, 3, 2) → Goal (6, 1, 3)
   - Bob: Init (2, 6, 4) → Goal (4, 3, 9)
   - Chad: Init (1, 10, 3) → Goal (3, 1, 5)
   - Dave: Init (1, 4, 2) → Goal (6, 8, 9)

2. **Avoid Obstacles**: 
   - List of obstacles must be avoided: 
     (5, 9, 10), (10, 3, 10), (4, 7, 9), (7, 6, 2), 
     (9, 3, 9), (10, 7, 9), (5, 7, 7), (7, 3, 9), 
     (6, 3, 10), (4, 8, 8)

3. **Plan Paths**:
   - Ensure each agent takes one step at a time in one direction (x, y, or z).
   - Paths should avoid collisions with obstacles and other agents at the same step.

4. **Create Paths**:

- **Alice**:
  Start: (9, 3, 2) → Goal: (6, 1, 3)
  Path: Move along the x-axis until x=6, adjust y to 1, adjust z to 3.
  - Path: [(9, 3, 2), (8, 3, 2), (7, 3, 2), (6, 3, 2), (6, 2, 2), (6, 1, 2), (6, 1, 3)]

- **Bob**:
  Start: (2, 6, 4) → Goal: (4, 3, 9)
  Path: Move along the x-axis to x=4, adjust y to 3, adjust z to 9.
  - Path: [(2, 6, 4), (3, 6, 4), (4, 6, 4), (4, 5, 4), (4, 4, 4), (4, 3, 4), (4, 3, 5), (4, 3, 6), (4, 3, 7), (4, 3, 8), (4, 3, 9)]

- **Chad**:
  Start: (1, 10, 3) → Goal: (3, 1, 5)
  Path: Move along x to x=3, adjust y to 1, adjust z to 5.
  - Path: [(1, 10, 3), (2, 10, 3), (3, 10, 3), (3, 9, 3), (3, 8, 3), (3, 7, 3), (3, 6, 3), (3, 5, 3), (3, 4, 3), (3, 3, 3), (3, 2, 3), (3, 1, 3), (3, 1, 4), (3, 1, 5)]

- **Dave**:
  Start: (1, 4, 2) → Goal: (6, 8, 9)
  Path: Gradually adjust x, y, z to reach the goal, avoiding collisions.
  - Path: [(1, 4, 2), (2, 4, 2), (3, 4, 2), (4, 4, 2), (5, 4, 2), (6, 4, 2), (6, 5, 2), (6, 6, 2), (6, 7, 2), (6, 8, 2), (6, 8, 3), (6, 8, 4), (6, 8, 5), (6, 8, 6), (6, 8, 7), (6, 8, 8), (6, 8, 9)]

5. **Ensure non-collision**: 
   - Confirm no paths intersect at the same time except at different z levels.
   - Flexibility can be added to path tweaks if necessary based on shared space or near misses.

**Final Output Plan**:

PLAN
NAME Alice PATH[(9, 3, 2), (8, 3, 2), (7, 3, 2), (6, 3, 2), (6, 2, 2), (6, 1, 2), (6, 1, 3)]
NAME Bob PATH[(2, 6, 4), (3, 6, 4), (4, 6, 4), (4, 5, 4), (4, 4, 4), (4, 3, 4), (4, 3, 5), (4, 3, 6), (4, 3, 7), (4, 3, 8), (4, 3, 9)]
NAME Chad PATH[(1, 10, 3), (2, 10, 3), (3, 10, 3), (3, 9, 3), (3, 8, 3), (3, 7, 3), (3, 6, 3), (3, 5, 3), (3, 4, 3), (3, 3, 3), (3, 2, 3), (3, 1, 3), (3, 1, 4), (3, 1, 5)]
NAME Dave PATH[(1, 4, 2), (2, 4, 2), (3, 4, 2), (4, 4, 2), (5, 4, 2), (6, 4, 2), (6, 5, 2), (6, 6, 2), (6, 7, 2), (6, 8, 2), (6, 8, 3), (6, 8, 4), (6, 8, 5), (6, 8, 6), (6, 8, 7), (6, 8, 8), (6, 8, 9)]'''

In [25]:
envs[0].parse_response(response)[0]

{'Alice': [[9, 3, 2],
  [8, 3, 2],
  [7, 3, 2],
  [6, 3, 2],
  [6, 2, 2],
  [6, 1, 2],
  [6, 1, 3]],
 'Bob': [[2, 6, 4],
  [3, 6, 4],
  [4, 6, 4],
  [4, 5, 4],
  [4, 4, 4],
  [4, 3, 4],
  [4, 3, 5],
  [4, 3, 6],
  [4, 3, 7],
  [4, 3, 8],
  [4, 3, 9]],
 'Chad': [[1, 10, 3],
  [2, 10, 3],
  [3, 10, 3],
  [3, 9, 3],
  [3, 8, 3],
  [3, 7, 3],
  [3, 6, 3],
  [3, 5, 3],
  [3, 4, 3],
  [3, 3, 3],
  [3, 2, 3],
  [3, 1, 3],
  [3, 1, 4],
  [3, 1, 5]],
 'Dave': [[1, 4, 2],
  [2, 4, 2],
  [3, 4, 2],
  [4, 4, 2],
  [5, 4, 2],
  [6, 4, 2],
  [6, 5, 2],
  [6, 6, 2],
  [6, 7, 2],
  [6, 8, 2],
  [6, 8, 3],
  [6, 8, 4],
  [6, 8, 5],
  [6, 8, 6],
  [6, 8, 7],
  [6, 8, 8],
  [6, 8, 9]]}

In [22]:
await utils.run_envs(envs, model='gpt-4o', async_client=async_client, max_tries=5, prompting_method='default')

[False, False, False, False]
<Task pending name='Task-24' coro=<env.run() running at /Users/mustafakhan/Library/Mobile Documents/com~apple~CloudDocs/My Stuff/Tech Shtuff/Code/OSR/Robotics/robot-collab-test/toy_experiment/env.py:215>>
All envs complete!
Querying_0 in Env: 0
Querying_0 in Env: 1
Querying_0 in Env: 2
Querying_0 in Env: 3
Env: 1
## Reasoning:
1. **Understand the Initial Setup**: The grid is 10x10x10. We have defined obstacle coordinates that the agents need to avoid while navigating from their initial positions to their goals. The agents are Alice, Bob, Chad, and Dave, each with a distinct starting and ending position.
   
2. **Ensure No Collisions with Obstacles and Other Agents**: The paths should be carefully planned to avoid any collisions with the obstacles or each other. Each move can only go in one of the x, y, or z directions per step.

3. **Plan Paths Individually**:
   - Alice: Move in the z-axis from 6 to 3, avoiding overlay with other agent plans.
   - Bob: Mov

In [21]:
envs[3].visualize(model='gpt-4o-mini')

In [68]:
await envs[1].run(model='gpt-4o-mini', async_client=async_client, max_tries=8, prompting_method='default')

Querying_0
We need to navigate four agents on a 3D grid while avoiding obstacles and ensuring they do not collide with each other. 

First, let's analyze the obstacles and the agents' movements based on their goals:

### Obstacles:
1. (6, 8, 4)
2. (1, 4, 8)
3. (4, 2, 1)
4. (5, 9, 10)
5. (1, 7, 2)
6. (6, 7, 6)
7. (9, 3, 3)
8. (6, 4, 3)
9. (8, 2, 1)
10. (9, 7, 1)

### Agents:
- **Alice**: Initially at (8, 7, 6) and needs to move to (8, 8, 3).
- **Bob**: Initially at (3, 5, 1) and needs to move to (4, 2, 10).
- **Chad**: Initially at (5, 7, 2) and needs to move to (8, 7, 2).
- **Dave**: Initially at (9, 2, 7) and needs to move to (8, 9, 1).

### Previous Planning Issues:
1. **Alice**: The previous plan likely faced issues due to the presence of obstacle (6, 7, 6) when moving in the y-direction or hitting obstacles in the z-direction.
2. **Bob**: Movement toward the goal (4, 2, 10) might intersect with obstacle (4, 2, 1).
3. **Chad**: Might have been blocked in the x or z direction by obst

In [26]:
envs[2].visualize(model='gpt-4o')

In [55]:
envs[0].visualize()

In [8]:
prompts = envs[0].default_prompt()
response = await async_client.chat.completions.create(messages=[
    {'role' : 'system', 'content' : prompts[0]},
    {'role' : 'user', 'content' : prompts[1]},
], model='gpt-4o-mini', )

In [10]:
response.choices[0].message.content# .count('PLAN')

"To plan the paths for the agents Alice, Bob, Chad, and Dave, I will analyze their initial and goal positions relative to the obstacles. We need to ensure that each path avoids obstacles while moving step by step in 3D space.\n\n1. **Planning Paths Sequentially**:\n   - Each agent starts by moving upward in the z-axis as their goals vary along the z-axis, except for Chad and Dave, who may need to navigate in all three dimensions.\n   - Checking each agent against obstacles will help identify potential movements.\n\n2. **Obstacle Avoidance**:\n   - We need to ensure that the agents' paths don't overlap each other, especially since Alice and Bob have similar z-coordinates initially.\n\n### Analyzing Each Agent:\n- **Agent Alice**:\n    - Start: (7, 4, 4) Goal: (7, 4, 10)\n    - Immediate vertical movement from z=4 to z=10. No horizontal movement is required since the x and y coordinates remain the same.\n    - Path: [(7, 4, 4), (7, 4, 5), (7, 4, 6), (7, 4, 7), (7, 4, 8), (7, 4, 9), (7, 4

In [22]:
path, feedback = envs[0].parse_response(response.choices[0].message.content)

['PLAN', 'NAME Alice PATH[(7, 4, 4), (7, 4, 5), (7, 4, 6), (7, 4, 7), (7, 4, 8), (7, 4, 9), (7, 4, 10)]', 'NAME Bob PATH[(7, 6, 10), (6, 6, 10), (5, 6, 10), (5, 5, 10), (5, 4, 10), (5, 3, 10)]', 'NAME Chad PATH[(1, 7, 3), (2, 7, 3), (3, 7, 3), (4, 7, 3), (5, 6, 3), (6, 5, 3), (7, 4, 3), (8, 3, 4), (9, 2, 5), (10, 1, 6), (10, 1, 7)]', 'NAME Dave PATH[(2, 10, 3), (2, 9, 4), (2, 8, 5), (2, 7, 6), (2, 6, 7), (3, 6, 7)]']
[{'Alice': [[7, 4, 4], [7, 4, 5], [7, 4, 6], [7, 4, 7], [7, 4, 8], [7, 4, 9], [7, 4, 10]]}, {'Bob': [[7, 6, 10], [6, 6, 10], [5, 6, 10], [5, 5, 10], [5, 4, 10], [5, 3, 10]]}, {'Chad': [[1, 7, 3], [2, 7, 3], [3, 7, 3], [4, 7, 3], [5, 6, 3], [6, 5, 3], [7, 4, 3], [8, 3, 4], [9, 2, 5], [10, 1, 6], [10, 1, 7]]}, {'Dave': [[2, 10, 3], [2, 9, 4], [2, 8, 5], [2, 7, 6], [2, 6, 7], [3, 6, 7]]}]


In [24]:
path, feedback = envs[0].validate_paths(path, feedback)

In [28]:
feedback

[['Collision detected: Alice: Path point (7,6,10) collided with Bob : (7,6,10)',
  'One or more of the coordinates in the following points were not exactly one unit apart: Alice: Path points: [(7,6,10),(7,4,5)]'],
 ['Collision detected: Bob: Path point (7,6,10) collided with Alice : (7,6,10)',
  'Collision detected: Bob: Path point (5,5,10) collided with the following obstacle (5,5,10).'],
 ['One or more of the coordinates in the following points were not exactly one unit apart: Chad: Path points: [(4,7,3),(5,6,3)]',
  'One or more of the coordinates in the following points were not exactly one unit apart: Chad: Path points: [(5,6,3),(6,5,3)]',
  'One or more of the coordinates in the following points were not exactly one unit apart: Chad: Path points: [(6,5,3),(7,4,3)]',
  'One or more of the coordinates in the following points were not exactly one unit apart: Chad: Path points: [(7,4,3),(8,3,4)]',
  'One or more of the coordinates in the following points were not exactly one unit apa

In [15]:
agents = envs[0]['agents']
obstacles = envs[0]['obstacles']
# parse_response returns ({agent_name: path}, feedback), or [] in place of the
# mapping when no plan was found; dict() turns that [] into an empty mapping.
parsed_paths, _ = parse_response(response.choices[0].message.content)
for agent_name, agent_path in dict(parsed_paths).items():
    agents[agent_name]['path'] = agent_path