In [1]:
import numpy as np
import math
import random
from mayavi import mlab

float_dtype = np.float32
int_dtype = np.int32

In [2]:
def Bresenham3D(x1, y1, z1, x2, y2, z2):
    ListOfPoints = [[x1, y1, z1]]
    dx = abs(x2 - x1)
    dy = abs(y2 - y1)
    dz = abs(z2 - z1)
    if x2 > x1:
        xs = 1
    else:
        xs = -1
    if y2 > y1:
        ys = 1
    else:
        ys = -1
    if z2 > z1:
        zs = 1
    else:
        zs = -1

    # Driving axis is X-axis
    if dx >= dy and dx >= dz:
        p1 = 2 * dy - dx
        p2 = 2 * dz - dx
        while x1 != x2:
            x1 += xs
            if p1 >= 0:
                y1 += ys
                p1 -= 2 * dx
            if p2 >= 0:
                z1 += zs
                p2 -= 2 * dx
            p1 += 2 * dy
            p2 += 2 * dz
            ListOfPoints.append([x1, y1, z1])

    # Driving axis is Y-axis
    elif dy >= dx and dy >= dz:
        p1 = 2 * dx - dy
        p2 = 2 * dz - dy
        while y1 != y2:
            y1 += ys
            if p1 >= 0:
                x1 += xs
                p1 -= 2 * dy
            if p2 >= 0:
                z1 += zs
                p2 -= 2 * dy
            p1 += 2 * dx
            p2 += 2 * dz
            ListOfPoints.append([x1, y1, z1])

    # Driving axis is Z-axis
    else:
        p1 = 2 * dy - dz
        p2 = 2 * dx - dz
        while z1 != z2:
            z1 += zs
            if p1 >= 0:
                y1 += ys
                p1 -= 2 * dz
            if p2 >= 0:
                x1 += xs
                p2 -= 2 * dz
            p1 += 2 * dy
            p2 += 2 * dx
            ListOfPoints.append([x1, y1, z1])
    return ListOfPoints

In [8]:
x = int_dtype(5.0)
isinstance(x, float)

False

In [4]:
def get_blocks_id_list(data):
    # usually expects dict or list input
    if isinstance(data, dict):
        coords_source = data.values()
    else:
        coords_source = data
    # print("generate blocks id list:\n", data)
    loc_list = np.unique(
        [
            int_dtype(coords - coords % 1 ) for coords in coords_source
        ],
        axis=0
    )
    # print("generate blocks id list, return:\n", loc_list)
    return loc_list


In [6]:
get_blocks_id_list([[0, 0.5, 0]])

TypeError: unsupported operand type(s) for %: 'list' and 'int'

In [59]:
def spawn_goal(distance, agent_type, drone_centroid, direction=(0, 1, 0)):
    """
    spawn the goal location, given drone formation centroid and distance.
    :param distance: a float.
    :param agent_type: accessed via self. for retrieving goal agent id.
    :param drone_centroid: access via self.
    :param direction: a unit vector from drones to goal.
    :return: a dict with key being goal agent id, value being its coord.
    """
    # TODO: move drone centroid, direction to env attributes
    assert agent_type[len(agent_type) - 1] == 0
    goal_id = len(agent_type) - 1
    g_dict = {}
    dis_vec = np.array(direction) * distance
    place_radius = 0.4 # should be accessed via self, a hyperparameter of env.
    displacement = np.random.random(3) * place_radius
    g_dict[goal_id] = drone_centroid + dis_vec + displacement
    return g_dict

In [5]:
def spawn_drones(num_drones, agent_type, centroid):
    """
    spawn given number of drones around a centroid. hyperparameters
    include drone spacing and spawn rule.
    :param num_drones:
    :param agent_type:
    :param centroid: a float np array for centroid coords.
    :return: a dict with keys being drone agent ids, values being their coords.
    """
    # TODO: move centroid, spacing and rule to env attributes
    spacing = float_dtype(0.3)
    even_spawn_rule= np.array(
        [
            [-1, 0, 0], [1, 0, 0],
            [0, 0, -1], [0, 0, 1],
            [-1, 0, 1], [1, 0, -1],
            [1, 0, 1], [-1, 0, -1]
        ], dtype=int_dtype
    )
    spawn_cycle = len(even_spawn_rule)

    drone_ids = [i for i in range(len(agent_type)) if agent_type[i] == 1]
    assert len(drone_ids)==num_drones

    d_dict = {}

    if num_drones % 2:
        d_dict[drone_ids[0]] = centroid
        spawn_num = num_drones - 1
    else:
        spawn_num = num_drones

    for _id in range(spawn_num):
        update = even_spawn_rule[_id % spawn_cycle] * spacing * \
                 math.ceil((_id + 1) / spawn_cycle)
        d_dict[drone_ids[_id]] = centroid + update


    return d_dict

In [15]:
even_spawn_rule= np.array(
        [
            [-1, 0, 0], [1, 0, 0],
            [0, 0, -1], [0, 0, 1],
            [-1, 0, 1], [1, 0, -1],
            [1, 0, 1], [-1, 0, -1]
        ], dtype=int_dtype
    )

even_spawn_rule.shape[0]

8

In [107]:
def spawn_obstacles(num_obstacles, agent_type, g_dict, d_dict):
    # TODO: access num, agent_type from env attribute, and remove it from arguments
    # TODO: access env_size, env_max_height, env_min_height, place radius from env attributes
    env_size = 10
    env_max_height = 20
    env_min_height = 5 # no spawning of obstacles below this height
    place_radius = 0.4

    stagger = np.array([0.5, 0.5, 0.5], dtype=float_dtype)
    displacement = (np.random.rand(num_obstacles, 3) * place_radius).astype(float_dtype)
    o_id = [i for i in range(len(agent_type)) if agent_type[i] == 0.5]
    assert len(o_id) == num_obstacles

    # check the occupied blocks
    g_blk = get_blocks_id_list(g_dict)
    d_blk = get_blocks_id_list(d_dict)
    occupied_blk = np.concatenate((g_blk, d_blk))
    # print(occupied_blk.shape)
    # print(occupied_blk)

    empty_blk = np.array([
        np.array([x, y, z]).astype(int_dtype)
        for x in range(env_size)
        for y in range(env_size)
        for z in range(env_min_height, env_max_height) if not
        np.any(
            np.all(
                np.array([x, y, z]).astype(int_dtype) == occupied_blk, axis=1
            )
        )
    ])

    np.random.shuffle(empty_blk)
    # print(len(empty_blk))
    # print(empty_blk)
    o_dict = {
        o_id[_id]:np.array(
            empty_blk[_id] + stagger + displacement[_id], dtype=float_dtype)
                for _id in range(num_obstacles)
    }
    return o_dict

In [99]:
def spawn_and_generate_city(
        size,
        max_height,
        difficulty_level,
        is_directly_obs,
        distance,
        direction,
        drones_centroid_coord,
        agent_type,
        num_drones,
        num_obstacles,
        sec_num=2
    ):

    assert distance < 1
    assert difficulty_level < 10
    assert difficulty_level > 1


    h_m_list = np.linspace(0.3, 0.7, 10)
    h_s_list = np.full(10, 5.5)
    idx = np.random.choice(10, 4, replace=False)
    h_m = int_dtype(h_m_list[idx] * max_height)
    h_s = int_dtype(h_s_list[idx])
    f_p_s = difficulty_level / 10
    sec_size = int_dtype(size / sec_num)
    b_num = int_dtype(f_p_s * sec_size**2)
    h_min = np.zeros(b_num, dtype=int_dtype)
    h_max = np.full(b_num, max_height, dtype=int_dtype)

    coords2block = np.array([-0.5, -0.5, -0.5], dtype=float_dtype)
    # spawn the drones, goals, and obstacles
    distance = distance * size
    d_dict = spawn_drones(num_drones,agent_type,drones_centroid_coord)
    g_dict = spawn_goal(distance,agent_type, drones_centroid_coord, direction)
    o_dict = spawn_obstacles(num_obstacles,agent_type,g_dict,d_dict)

    # debugging info
    # print("d_dict:\n", d_dict)
    # print("o_dict:\n", o_dict)
    # print("g_dict:\n", g_dict)
    # convert float dict to blocks ids
    d_block = get_blocks_id_list(d_dict)
    g_block = get_blocks_id_list(g_dict)
    o_block = get_blocks_id_list(o_dict)

    # get the L.O.S
    d_centroid_block = int_dtype(drones_centroid_coord + coords2block)
    gx, gy, gz = g_block[0]
    print(d_centroid_block)
    dx, dy, dz = d_centroid_block
    lineOfSight = Bresenham3D(gx, gy, gz, dx, dy, dz)

    occ_blk_list = np.concatenate(
        (d_block, g_block, o_block)
    )

    buf = np.zeros((4, sec_size, sec_size), dtype=int_dtype)
    for i in range(4):
        b_h = np.clip(
            np.random.normal(h_m[i], h_s[i], b_num), h_min, h_max
        )
        b_h_pad = np.concatenate(
            (b_h, np.zeros(sec_size*sec_size - b_num))
        )
        np.random.shuffle(b_h_pad)
        blk_h = np.reshape(b_h_pad, (sec_size, sec_size))
        buf[i] = blk_h

    sec_0_1 = np.hstack((buf[0], buf[1]))
    sec_2_3 = np.hstack((buf[2], buf[3]))
    c_b_m = np.vstack((sec_0_1, sec_2_3)).astype(int_dtype)

    envelope = np.full((size, size), max_height, dtype=int_dtype)
    for (x, y, z) in occ_blk_list:
        envelope[x, y] = z - 1

    if is_directly_obs:
        for (x, y, z) in lineOfSight:
            envelope[x, y] = z - 1
    else:
        for (x, y, _) in lineOfSight:
            # if they already satisfy requirement, no need to touch them
            c_b_m[x, y] = np.where(c_b_m[x, y] > 4, c_b_m[x, y], 5)

    c_b_m = np.where(c_b_m < envelope, c_b_m, envelope)

    return c_b_m, d_dict, o_dict, g_dict

In [96]:
def mayavi_render(size, data, p_dicts):
    """
    render city, as well as drones, goal, and obstacles with different colors.
    :param size: size of the city base map...
    :param data: city base map/array, of shape(size, size).
    :param p_: three dicts
    """
    # set the background color
    mlab.figure(fgcolor=(0, 0, 0), bgcolor=(1, 1, 1))

    x, y = np.mgrid[.5:size-.5:size*1j, .5:size-.5:size*1j]
    mlab.barchart(x, y, data, colormap='inferno')
    mlab.vectorbar(title="Buildings height", nb_labels=10)

    d_dict, g_dict, o_dict = p_dicts
    d_ = np.array([coord for coord in d_dict.values()])
    o_ = np.array([coord for coord in o_dict.values()])
    g_ = np.array([coord for coord in g_dict.values()])

    x1, y1, z1 = np.transpose(d_)
    mlab.points3d(
        x1, y1, z1, color=(1, 0, 0), scale_factor=0.15
    )

    x2, y2, z2 = np.transpose(g_)
    mlab.points3d(
        x2, y2, z2, color=(1, 0, 1), scale_factor=0.15
    )

    x3, y3, z3 = np.transpose(o_)
    mlab.points3d(
        x3, y3, z3, color=(1, 1, 0), scale_factor=0.15
    )
    mlab.show()



### Integrated testing

In [11]:
num_drones = 8
num_goals = 4
num_obstacles = 10
num_agents = 19

drone_set = np.random.choice(
    np.arange(num_agents - 1), num_drones, replace=False
)
agent_type = {}
drones = {}
obstacles = {}
goals = {}

for agent_id in range(num_agents - num_goals):
    if agent_id in set(drone_set):
        agent_type[agent_id] = 1 # drones
        drones[agent_id] = True
    else:
        agent_type[agent_id] = 1 / 2 # obstacles
        obstacles[agent_id] = True

# we append the goal agent id and info at last
for goal_id in range(num_agents - num_goals, num_agents):
    agent_type[goal_id] = 0 # goal
    goals[goal_id] = True

In [12]:
print(f"agent_type: {agent_type}\n")
print(f"drones set: {drone_set}\n")
print(f"drones: {drones}\n")
print(f"obstacles: {obstacles}\n")
print(f"goals:{goals}\n")

agent_type: {0: 0.5, 1: 0.5, 2: 0.5, 3: 1, 4: 1, 5: 0.5, 6: 0.5, 7: 1, 8: 0.5, 9: 1, 10: 1, 11: 0.5, 12: 0.5, 13: 1, 14: 1, 15: 0, 16: 0, 17: 0, 18: 0}

drones set: [10  7  3 15  9  4 14 13]

drones: {3: True, 4: True, 7: True, 9: True, 10: True, 13: True, 14: True}

obstacles: {0: True, 1: True, 2: True, 5: True, 6: True, 8: True, 11: True, 12: True}

goals:{15: True, 16: True, 17: True, 18: True}



In [13]:
g_dict = {
    g_id:np.zeros(3, dtype=float_dtype)
    for g_id in range(num_agents) if agent_type[g_id]==0
}

In [14]:
print(g_dict)

{15: array([0., 0., 0.], dtype=float32), 16: array([0., 0., 0.], dtype=float32), 17: array([0., 0., 0.], dtype=float32), 18: array([0., 0., 0.], dtype=float32)}


In [110]:
data, d, o, g = spawn_and_generate_city(20, 18, 5, True, 0.5, [0, 1, 0], [9.5, 0.5, 9.5], agent_type, 8, 10)

[9 0 9]


In [111]:
d

{0: array([9.19999999, 0.5       , 9.5       ]),
 2: array([9.80000001, 0.5       , 9.5       ]),
 3: array([9.5       , 0.5       , 9.19999999]),
 5: array([9.5       , 0.5       , 9.80000001]),
 6: array([9.19999999, 0.5       , 9.80000001]),
 9: array([9.80000001, 0.5       , 9.19999999]),
 10: array([9.80000001, 0.5       , 9.80000001]),
 11: array([9.19999999, 0.5       , 9.19999999])}

In [112]:
mayavi_render(20, data, (d, o, g))