In [1]:
from __future__ import print_function
from random import randint
from itertools import product, starmap
# product is essentially combinations without repetitions
# starmap is an iterator that applies a function over a set of tuples (generated by product)

import numpy as np

In [2]:
GRID = np.zeros((10,10), dtype=str)
GRID

array([['', '', '', '', '', '', '', '', '', ''],
       ['', '', '', '', '', '', '', '', '', ''],
       ['', '', '', '', '', '', '', '', '', ''],
       ['', '', '', '', '', '', '', '', '', ''],
       ['', '', '', '', '', '', '', '', '', ''],
       ['', '', '', '', '', '', '', '', '', ''],
       ['', '', '', '', '', '', '', '', '', ''],
       ['', '', '', '', '', '', '', '', '', ''],
       ['', '', '', '', '', '', '', '', '', ''],
       ['', '', '', '', '', '', '', '', '', '']], dtype='|S1')

In [5]:
GRID.shape

(10, 10)

In [14]:
TREE_SYM = "T"
DRONE_SYM = "D"
SUBJ_SYM = "S"
GOAL_SYM = "G"
# these two also include an index
PED_SYM = "P"  
WALL_SYM = "W"

print("All the obstacles add a weight to each tile")

In [3]:
def mhd(tileA, tileB):
    """Computes the Manhattan distance between two (x,y) tiles"""
    return abs(tileA[0] - tileB[0]) + abs(tileA[1] - tileB[1])

def place_goal(shape, min_dist=5):
    """Places the Goal and Subject tile in a grid so that they are at a minimum distance"""
    grid = np.zeros(shape, dtype=str)
    max_x, max_y = [p - 1 for p in shape]
    subj_pos, goal_pos = (0,0), (0,0)
    
    while mhd(subj_pos, goal_pos) < min_dist:
        subj_pos = (randint(0, max_x), randint(0, max_y))
        goal_pos = (randint(0, max_x), randint(0, max_y))
        
    grid[subj_pos] = "S"
    grid[goal_pos] = "G"
    
    return (grid, subj_pos, goal_pos)

In [4]:
g1, subj_pos, goal_pos = place_goal((5,5), 6)
g1

array([['', '', '', '', ''],
       ['', '', '', '', 'G'],
       ['', '', '', '', ''],
       ['', '', '', '', ''],
       ['', 'S', '', '', '']], dtype='|S1')

In [5]:
def neighbors(shape, pos, rad=1):
    """Gets the set of surrounding tiles given a tile and its set radius"""
    max_X, max_Y = shape
    
    jumps = [0] + [-x for x in range(1, rad+1)] + [x for x in range(1, rad+1)]
    area = list(starmap(lambda a,b: (pos[0] + a, pos[1] + b), product(jumps, jumps)))
    
    return [p for p in area[1:] if (p[0] in range(max_X) and p[1] in range(max_Y))]

def place_drone(grid, subj_pos, dist=1):
    """Places the drone at a certain distance from the subject"""
    block = neighbors(grid.shape, subj_pos, dist)
    dr_pos = block[randint(0, len(block) - 1)]
    grid[dr_pos] = "D"
    return (grid, dr_pos)

In [6]:
g1, drone_pos = place_drone(g1, subj_pos, 1)
g1

array([['', '', '', '', ''],
       ['', '', '', '', 'G'],
       ['', '', '', '', ''],
       ['', '', '', '', ''],
       ['', 'S', 'D', '', '']], dtype='|S1')

In [19]:
X = np.where(g1 == '')
X

(array([0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4]),
 array([0, 1, 2, 3, 4, 0, 1, 2, 3, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 3, 4]))

In [29]:
def dist(p1, p2):
    return abs(p1[0] - p2[0]) + abs(p1[1] - p2[1])

def group_by_adj(ls):
    """Keep only adjacent cells"""
    groups = []
    for el in ls:
        grp = [x for x in ls if dist(x, el) <= 1]
        if len(grp) > 1:  # it also contains the element itself (sorted)
            groups.append(grp)

rows, cols = np.where(g1 == '')
cells = zip(rows, cols)
max_idx = max(max(rows), max(cols))

paths = []

for i in range(max_idx):
    # parse col i
    col_paths = group_by_adj([(x,y) for (x,y) in cells if y==i])
    # parse row i
    row_paths = group_by_adj([(x,y) for (x,y) in cells if x==i])
    paths.append(col_paths)
    paths.append(row_paths)
    
paths

[None, None, None, None, None, None, None, None]

In [126]:
def grid_range(init_pos, end_pos):
    """
    Computes the range of the pedestrian path or wall width.
    :param init_pos:
    :param end_pos:
    :returns: A tuple (line, (init_x, init_y), (end_x, end_y)) where 'line'
              is a tuple (X, -1) or (-1, X) being 'X' the common coordinate
    """
    if init_pos[0] == end_pos[0]:
        # line'd be a row
        line, init, end = init_pos[0], init_pos[1], end_pos[1]
    elif init_pos[1] == end_pos[1]:
        # line'd be a col
        line, init, end = init_pos[1], init_pos[0], end_pos[0]
    return line, init, end

def choose_paths(empties, max_length):
    """
    Returns a path tuple whose length is at maximum, max_length.
    :returns: A tuple (line, init, end)
    """
    paths = [x for x in ___ if max_length >= len(x)]
    chosen = paths[randint(0, len(paths) - 1)]
    
    if chosen[0][0] == chosen[-1][0]:
        line = (chosen[0][0], -1)
    else:
        line = (-1, chosen[0][1])
    returns (line, chosen[0], chosen[-1])
    
def place_peds(grid, no_peds, subj_pos, drone_pos, goal_pos):
    """
    Place a limited number of pedestrians in the grid. Subject, Drone and Goal poses cannot
    be overwritten by the pedestrians
    """
    empties = np.argwhere(grid == '')
    
    peds = dict()
    for p in range(no_peds):
        nm = "P{}".format(p)
        peds[nm]["pos"] = pos = choose_path(empties, max_length=3) # (line, init, end)
        peds[nm]["loop"] = bool(randint(0,1))
        
        # Writes "PX" in the init and start of the ped path
        grid[pos[1]] = grid[pos[2]] = nm
        # Writes "·" in the points between
        if line[0] == -1:
            grid[line[1], init[0]:end[0]] = "·"
        else:
            grid[line[0], init[1]:end[1]] = "·"
        
        
        

In [None]:
def place_objs(grid, probs, max_objs):
    """
    Place a limited number of objects, depending on the probabilities of each object type.
    :param grid:
    :param probs: A dictionary holding probabilities for Trees,Walls,Doors
    :param max_objs:
    """
    trees, doors, walls = dict(), dict(), dict()
    
    for tn in range(int(max_objs * probs["tree"])):
        empties = np.argwhere(grid == '')
        nm = "T{}".format(tn)
        # places the tree in an empty cell
        trees[nm]["pos"] = empties[randint(0, len(empties) - 1)]
        grid[trees[nm]["pos"]] = "T"
        
    for dn in range(int(max_objs * probs["door"])):
        empties = np.argwhere(grid == '')
        nm = "D{}".format(dn)
        # places the door in an empty cell with a random direction (only two: North/South and West/East)
        doors[nm]["pos"] = empties[randint(0, len(empties) - 1)]
        doors[nm]["dir"] = ["N", "W"][randint(0,1)]
        grid[doors[nm]["pos"]] = "D"
        
    for wn in range(int(max_objs * probs["wall"])):
        empties = np.argwhere(grid == '')
        nm = "W{}".format(tn)
        walls[nm]["pos"] = chosen_path(empties, max_length=2)  # returns (line, init, end)
        
        # Writes "WX" in the whole wall path
        grid[pos[1]] = grid[pos[2]] = nm
        if line[0] == -1:
            grid[line[1], init[0]:end[0]] = "W"  
            # important: check pathfinder if the char (W) is changed
        else:
            grid[line[0], init[1]:end[1]] = "W"
    

In [None]:
obj_probs = {"tree": 0.4, "door": 0.0, "wall": 0.2}
place_objs(g1, obj_probs, 5)

---
## A* pathfinder

  + The only thing the path finder cannot do is to jump walls or cross doors in the wrong direction.
  + It cannot cross the Drone cell `[D]` neither.
  + It can collide with pedestrian paths and avoid trees by rounding them.
  + It must start at the Subject cell `[S]` and end in the Goal cell `[G]`.
  + **We can only move horizontally or vertically one cell at a time**.
  
A future goal would be to modify the speed and stop duration for one or more points in the path. **This could also be placed randomly!**

In [23]:
class PathNode:
    def __init__(self, parent=None, position=None, weight=0):
        self.parent = parent
        self.pos = position
        self.weight = weight

        self.g = 0
        self.h = 0
        self.f = 0

    def __eq__(self, other):
        return self.position == other.position

In [None]:
class PathFinder:
    def __init__(self, grid, init, end):
        """
        :param grid:
        :param init:
        :param end:
        """
        self.grid = grid
        self.init = init
        self.end = end
        self.path = None
        
        self.a_star()
        
    @staticmethod
    def neighbors(shape, curr_node, rad=1):
        """Gets the set of surrounding tiles given a tile and its set radius"""
        max_X, max_Y = shape

        jumps = [0] + [-x for x in range(1, rad+1)] + [x for x in range(1, rad+1)]
        area = list(starmap(lambda a,b: (curr_node.pos[0] + a, curr_node.pos[1] + b), product(jumps, jumps)))

        within_range = (p for p in area[1:] if (p[0] in range(max_X) and p[1] in range(max_Y)))
        cell_types = (self.check_type(cell) for cell in within_range)
        return [PathNode(curr_node, pos, weight) for (pos, walkable, weight) in cell_types if walkable]
        

    def check_type(self, cell):
        """
        Gets information about the cell in (x,y). Walls and start pose are not
        traversable.
        
        :param cell: A tuple (x,y)
        :returns: A tuple (is_traversable, weight)
        """
        
        if "P" in self.grid[cell]:
            cell_info = (cell, True, 1)
        else:
            cell_info = {
                "T": (cell, True, 1.2),
                "D": (cell, True, 1.2),
                "S": (cell, False, None),
                "G": (cell, True, 1),
                "": (cell, True, 1),
                "·": (cell, True, 1),
                "W": (cell, False, None)
            }.get(self.grid[cell])
        
        return cell_info
    
    def get_path_from(self, node):
        """Computes the upward path from a leaf node."""
        path = []
        curr_node = node
        while curr_node is not None:
            path.append(curr_node)
            curr_node = curr_node.parent
            
        return path.reverse()
    
    def a_star(self):
        """Runs the A* search for the grid setup"""
        node0 = PathNode(None, self.init)
        nodeX = PathNode(None, self.end)
        
        opened, closed = [node0], []
        
        while len(opened) > 0:
            # Get the node with smallest F cost
            costs = map(lambda p: p.f, opened)
            curr_node, curr_idx = [(n,i) for i, (n, f) in enumerate(zip(opened, costs)) if f == min(costs)][0]
            
            # Switch it to 'closed'
            closed.append(curr_node)
            opened.pop(curr_idx)
            
            if curr_node == nodeX:
                # The path has been found!
                self.path = self.get_path_from(curr_node,)
                print("A path has been found!\n{}".format(self.path))
                break
                
            # Looks at the neighbor cells who are within grid range and are walkable (no walls).
            # Also cast each into a node
            block = PathFinder.walkable_node_neighbors(self.grid.shape, curr_node, rad=1)
            
            # If the cell is not already closed and is walkable, proceed
            for node in block:
                if node not in close:
                    node.g = curr_node.g + 1
                    # Euclidean distance used as metric (+ cell type weight)
                    node.h = sum([(a - b)**2 for (a,b) in zip(node.pos, curr_node.pos)]) + node.weight
                    node.f = node.g + node.h
                    
                    # If the node is in the "open" list and its new weight
                    # is more than the prev one, discard it
                    # here's the opposed case: there's no node like that
                    if [nopen for nopen in opened if (nopen == node and node.g > nopen.g)] == []:                            
                        opened.append(node)
                        

In [25]:
A = [1]*5
B = map(lambda x: x + 1, [3]*5)

In [None]:
path_finder = PathFinder(g1, subj_pos, goal_pos)
print(path_finder.path)

---
## Apply it to Sphinx!

Now we have two results and we want to apply them to Sphinx. What we've to do is:

  1. Create a `.path` file for the subject using the Pathfinder list of points.
  2. Create a `.world` file given the world's objects' and goal poses.
  3. Create one or multiple `.path` files for the pedestrians' paths.
  4. Upon executing Sphinx, set the drone pose. Upon taking off, the height can vary (above head, waist-height, knee-height)

In [22]:
NamedTemporaryFile(dir=mkdtemp(prefix="sphinx_gen"), suffix=".path").name

'/tmp/sphinx_genSDJFjc/tmpDekHfE.path'

In [11]:
from tempfile import NamedTemporaryFile, mkdtemp
from templates import PATH_TEMPLATE_START, PATH_TEMPLATE_END, WORLD_TEMPLATE_START, WORLD_TEMPLATE_END

class RandomSphinxWorld:
    
    def __init__(self, world_conf, subject_path, peds_paths):
        """
        Creates Sphinx-related world configuration files from the generated world setup.
        
        :param world_conf: A dict with keys (...)
        :param subject_path: A list of (x,y) coordinates
        :param peds_paths: A set of lists of (x,y) coordinates
        """
        fdir = mkdtemp(prefix="sphinx_gen")
        
        self.subject_path = RandomSphinxWorld.create_path(fdir, subject_path)
        self.ped_paths = dict()
        
        for (pname, ppath) in peds.items():
            # ppath is a dict with keys ("path", "loop"). No delay_start nor stop_duration nor speed.
            self.ped_paths[pname] = RandomSphinxWorld.create_path(fdir, ppath)
            
        self.world_path = RandomSphinxWorld.create_world(fdir, world_conf)
        
    @staticmethod
    def create_path(fdir, path):
        ff = NamedTemporaryFile(dir=fdir, suffix=".path")
        
        while False:
            ff.write(".")

    @staticmethod
    def create_world(fdir, conf):
        ff = NamedTemporaryFile(dir=fdir, suffix=".path")
        
        while False:
            ff.write(".")


In [None]:
obj_probs = {"tree": 0.4, "door": 0.0, "wall": 0.2}

my_world = RandomWorld(grid=(5,5), goal_dist=4, drone_dist=1, no_peds=1, probs=obj_probs)
my_path = PathFinder(my_world)

# RandomSphinx.remove_last()  # removes all the generated files in the previous run

generator = RandomSphinxWorld(my_world, my_path, my_world.ped_paths)
# world_fpath = generator.world_fpath
# subj_fpath = generator.subj_fpath
# peds_fpaths = generator.peds_fpaths
# "/tmp/worldpath.txt" is set to generator.world_fpath

heights = {
    "above": 1.8,
    "mid": 1.0,
    "below": 0.6
}

generator.init(drone_height="above")  # wait() until it finishes

generator.close_log()  # closes the .csv log