In [4]:
import os
import json
import pprint
import re
from PIL import Image as PILImage
from Prompts import *
import pickle as pkl
import yaml
from utils.config import *
from utils.models import *
import copy
import subprocess
import networkx as nx
from xml.etree import ElementTree as ET

In [13]:
def validate_bt_nodes(tree,node_library):
    for elem in tree.iter():
        if elem.tag not in node_library:
            print(f'{elem.tag} not in node_library')
            return False
        if elem.tag == 'SubTree':
            continue
         
        #check for incorrect nodes
        for k,v in elem.attrib.items():
            if k not in node_library[elem.tag]:
                print(f'{k} not in node_library[{elem.tag}]')
                return False
            
        #check if all attributes are correct
        if len(node_library[elem.tag])!=len(elem.attrib.keys()):
            return False
        for v in node_library[elem.tag]:
            if v not in list(elem.attrib.keys()):
                return False
    return True

node_library = {
 'root':['main_tree_to_execute'],
 'BehaviorTree':['ID'],
 'include':['path'],
 'SetBlackboard':['output_key','value'],
 'Sequence':['name'],
 'Fallback':['name'],
 'SubTree':['ID'],
 'Inverter':[],
 'IsRobotVisible':['agent_id','distance'],
 'IsRobotNearby':['agent_id','distance'],
 'IsGoalReached':['agent_id'],
 'TimeExpiredCondition':['seconds','ts','only_once'],
 'RobotSays':['agent_id','message'],
 'RobotMoved':['agent_id'],
 'IsRobotBlocking':['agent_id','distance'],
 'UpdateGoal':['agent_id'],
 'MakeGesture':['agent_id','message'],
 'RegularNav':['agent_id','time_step'],
 'LookAtRobot':['agent_id'],
 'FollowRobot':['agent_id','time_step'],
 'AvoidRobot':['agent_id','time_step'],
 'GiveWaytoRobot':['agent_id','time_step'],
 'BlockRobot  ':['agent_id','time_step'],
}

In [20]:
tree_str = """<root main_tree_to_execute='GiveWayHumanTree'>
    <include path='BTRegularNav.xml'/>
    <BehaviorTree ID='GiveWayHumanTree'>        
        <Fallback name='VisibilityFallback'>
            <Sequence name='WaitAndGiveWaySequence'>
                <IsRobotVisible agent_id='{id}' distance='4.0'/>
                <MakeGesture agent_id='{id}' message='2'/>
                <GiveWaytoRobot agent_id='{id}' time_step='{dt}'/>
            </Sequence>
            <Sequence name='RegNav'>
                <MakeGesture agent_id='{id}' message='0'/>
                <SetBlackboard output_key='agentid' value='{id}'/>
                <SetBlackboard output_key='timestep' value='{dt}'/>
                <SubTree ID='RegularNavTree' id='agentid' dt='timestep'/>
            </Sequence>
        </Fallback>
    </BehaviorTree>
</root>"""

In [21]:
tree = ET.fromstring(tree_str)

In [22]:
validate_bt_nodes(tree,node_library)

id not in node_library[Fallback]


False

In [12]:
for elem in tree.iter():
   print(elem.tag)
   for k,v in elem.attrib.items():
    print(f"\t {k}:{v}")

root
	 main_tree_to_execute:GiveWayHumanTree
include
	 path:BTRegularNav.xml
BehaviorTree
	 ID:GiveWayHumanTree
Fallback
	 name:VisibilityFallback
Sequence
	 name:WaitAndGiveWaySequence
IsRobotVisible
	 agent_id:{id}
	 distance:4.0
MakeGesture
	 agent_id:{id}
	 message:2
GiveWaytoRobot
	 agent_id:{id}
	 time_step:{dt}
Sequence
	 name:RegNav
MakeGesture
	 agent_id:{id}
	 message:0
SetBlackboard
	 output_key:agentid
	 value:{id}
SetBlackboard
	 output_key:timestep
	 value:{dt}
SubTree
	 ID:RegularNavTree
	 id:agentid
	 dt:timestep


In [3]:
def find_closest_connected_node(g, node_id, node_type):
    # Check if the node_id exists in the graph
    if node_id not in g:
        return []

    start_pos = g.nodes[node_id]['pos']
    # Get all neighbors of the given node
    neighbors = g.neighbors(node_id)

    # Filter neighbors by the specified node_type
    closest_nodes = [n for n in neighbors if g.nodes[n].get('type') == node_type]

    distances = []
    for n in closest_nodes:
        if g.nodes[n]['type'] == node_type:
            other_pos = g.nodes[n]['pos']
            dist = np.linalg.norm(np.array(start_pos) - np.array(other_pos))
            distances.append((dist, n))
    distances = sorted(distances, key=lambda x: x[0])
    closest_nodes = [node_id for dist, node_id in distances]
    return closest_nodes

In [4]:
def getPathLength(graph,node1,node2):
    # return the shortest path using a* between the two nodes
    cost = 0
    try:
        path =  nx.astar_path(graph,node1,node2, weight = "cost")
        n1 = path[0]
        if len(path) > 1:
            for n in path[1:]:
                cost += graph.edges[n1,n]["cost"]
                n1 = n
    except nx.NetworkXNoPath:
        cost = -1
    return cost
    

In [5]:
def add_edge_wts(graph):
    def dist(g,n1,n2):
        (x1,y1) = g.nodes[n1]['pos']
        (x2,y2) = g.nodes[n2]['pos']
        return ((x1-x2)**2+(y1-y2)**2)**0.5        
    nx.set_edge_attributes(graph, {e: dist(graph,e[0],e[1]) for e in graph.edges()}, "cost")
    return graph     

In [7]:
with open(os.path.join('locations',LOCATION,'scene_graph.json'),'r') as f:
    scene_graph = json.load(f)

In [8]:
g = nx.node_link_graph(scene_graph)

In [9]:
find_closest_connected_node(g,'5','INTERSECTION')

['3']

In [13]:
g = add_edge_wts(g)

In [23]:
g.nodes['1A']['type']

'child'

In [17]:
g.edges[('1','2')]

{'cost': 166.14752480852673}

In [11]:
list(g.neighbors('1'))

['1A', '1B', '2']

In [7]:
list(g.edges(['1']))

[('1', '1A'), ('1', '1B'), ('1', '2')]

In [None]:
g