In [1]:
from numpy.random import randint as rint

In [2]:
def get_random_queue(tot_area=24, res=0.2, people_range=(2, 5)):
    """
    tot_area:     measure of the side of the local window's squared area, in meters
    res:          resolution of discretization, in meters
    people_range: possible number of people from randomization
    """
    n_people = rint(people_range[0], people_range[1]+1)
    area_dim = (1.2, 1+1.5*n_people)
    
    if np.random.rand() <= 0.015:
        return just_goal(tot_area, res, area_dim)

    orientation = np.random.choice(['vertical','orizontal'])
    direction   = np.random.choice(['a_b','d_c'])

    return get_queue(tot_area, res, area_dim, n_people, orientation, direction)

In [None]:
def just_goal(tot_area, res, area_dim):
    """
    """
    tot_area, area_dim = q_discretize(tot_area, res, area_dim)
    l = int(tot_area/2)
    goal = np.array([rint(-l, l), rint(-l, l)])
    people = np.array([])
    obs = np.array([])
    area = np.array([
        goal + [-1,-1],
        goal + [+1,-1],
        goal + [+1,+1],
        goal + [-1,+1]
    ])

    return goal, people, obs, area

In [15]:
def get_queue(tot_area=24, res=0.2, area_dim=(1.2,7), n_people=4, orientation="vertical", direction="a_b"):
    """
    tot_area:     measure of the side of the local window's squared area, in meters
    res:          resolution of discretization, in meters
    area_dim:     area occupied by the queue, (width, length) in meters
                  a good approximation is (2, 1+2*n_people)
    n_people:     number of people in the queue
    orientation:  orientation of the queue, "vertical" or "orizontal"
    direction:    the side of the queue where the goal is positioned, either "a_b" or "d_c"

    For clarity, the four angles of the queue area found by get_q_area() will alway be positioned 
    in the same way:

       d              c               b                              c
        |            |                 |-----------------------------
        |            |                 |
        |            |                 |
        |            |                 |
        |            |                 |
        |            |                 |-----------------------------
        |            |                a                              d
        |            |
        |            |
        |------------|
       a              b
    
    """
    tot_area, area_dim = q_discretize(tot_area, res, area_dim)
    area = get_q_area(tot_area, area_dim, orientation)
    goal = get_q_goal(area, area_dim,orientation, direction)
    people = get_people(area, area_dim, n_people, orientation, direction)
    obs = get_q_obs(area, area_dim, people, orientation, direction)

    return goal, people, obs, area

In [4]:
def q_discretize(tot_area, res, area_dim):
    """
    """
    tot_area = int(tot_area/res)
    area_dim = (np.array(area_dim)/res).astype(np.uint8)
    if area_dim[0]%2 == 0:
        area_dim[0]+=1
    
    return tot_area, tuple(area_dim)

In [5]:
def get_q_area(tot_area, area_dim, orientation):
    """
    """
    l = int(tot_area/2)-1
    n, m = area_dim
    if n%2 == 0: n+=1
    if orientation == "vertical":
        a = np.array([rint(-l, l-n), rint(-l, l-m)])
        b = a + np.array([n-1, 0])
        c = a + np.array([n-1, m-1])
        d = a + np.array([0, m-1])
    elif orientation == "orizontal":
        a = np.array([rint(-l, l-m), rint(-l, l-n)])
        d = a + np.array([m-1, 0])
        c = a + np.array([m-1, n-1])
        b = a + np.array([0, n-1])

    if np.any(a >= l) or np.any(b>=l) or np.any(c>=l) or np.any(d>=l):
        raise SystemError("Queue out of bounds")
        
    return np.array([a, b, c, d], dtype=np.int32)

In [6]:
def get_q_goal(area, area_dim, ori, direc):
    """
    """
    a,b,c,d = area
    n, m = area_dim 
    if ori == "vertical":
        if direc == "a_b":
            goal = [a[0]+(n//2), a[1]+3]
        if direc == "d_c":
            goal = [d[0]+(n//2), d[1]-3]
    elif ori == "orizontal":
        if direc == "a_b":
            goal = [a[0]+3, a[1]+(n//2)]
        if direc == "d_c":
            goal = [d[0]-3, d[1]+(n//2)]
    
    return np.array(goal, dtype=np.int32)

In [7]:
def get_people(area, area_dim, n_people, ori, direc):
    """
    wall_dist: minimum distance of people from lateral walls. If 0, people could spawn into walls
    s:         length of the area allocated for each person in the queue to spawn 
    """
    a,b,c,d = area
    wall_dist = 1
    s = (area_dim[1]-6)//n_people
    people = []
    if ori == "vertical":
        if direc == "a_b":
            k = a[1]+4
            for _ in range(n_people):
                people.append([rint(a[0]+wall_dist, b[0]-wall_dist), rint(k, k+s)])
                k += s
        if direc == "d_c": 
            k = d[1]-4
            for _ in range(n_people):
                people.append([rint(a[0]+wall_dist, b[0]-wall_dist), rint(k-s+1, k+1)])
                k -= s
    elif ori == "orizontal":
        if direc == "a_b":
            k = a[0]+4
            for _ in range(n_people):
                people.append([rint(k, k+s), rint(a[1]+wall_dist, b[1]-wall_dist)])
                k += s
        if direc == "d_c":
            k = d[0]-4
            for _ in range(n_people):
                people.append([rint(k-s+1, k+1), rint(d[1]+wall_dist, c[1]-wall_dist)])
                k -= s
            
    return np.array(people, dtype=np.int32)

In [8]:
def get_q_obs(area, area_dim, people, orientation, direction):
    a,b,c,d = area
    last_person = people[-1]
    if orientation == "vertical" and direction == "a_b":
        l = np.arange(a[1], min(last_person[1]+5, d[1]+1))
        w = np.arange(a[0], b[0]+1)
        a_d = np.vstack((np.ones(len(l), dtype=np.int32) * a[0],l)).T
        b_c = np.vstack((np.ones(len(l), dtype=np.int32) * b[0],l)).T
        a_b = np.vstack((w,np.ones(len(w), dtype=np.int32) * a[1])).T
        a_b1 = np.vstack((w,np.ones(len(w), dtype=np.int32) * (a[1]+1))).T
        a_d1 = np.vstack((np.ones(len(l), dtype=np.int32) * (a[0]+1),l)).T
        a_d2 = np.vstack((np.ones(len(l), dtype=np.int32) * (a[0]+2),l)).T
        a_d3 = np.vstack((np.ones(len(l), dtype=np.int32) * (a[0]+3),l)).T
        b_c1 = np.vstack((np.ones(len(l), dtype=np.int32) * (b[0]-1),l)).T
        b_c2 = np.vstack((np.ones(len(l), dtype=np.int32) * (b[0]-2),l)).T
        b_c3 = np.vstack((np.ones(len(l), dtype=np.int32) * (b[0]-3),l)).T
        return np.concatenate((a_d, b_c, a_b, a_b1))

    elif orientation == "vertical" and direction == "d_c":
        l = np.arange(max(last_person[1]-4, a[1]), d[1]+1)
        w = np.arange(a[0], b[0]+1)
        a_d = np.vstack((np.ones(len(l), dtype=np.int32) * a[0],l)).T
        b_c = np.vstack((np.ones(len(l), dtype=np.int32) * b[0],l)).T
        d_c = np.vstack((w,np.ones(len(w), dtype=np.int32) * d[1])).T
        d_c1 = np.vstack((w,np.ones(len(w), dtype=np.int32) * (d[1]-1))).T
        a_d1 = np.vstack((np.ones(len(l), dtype=np.int32) * (a[0]+1),l)).T
        a_d2 = np.vstack((np.ones(len(l), dtype=np.int32) * (a[0]+2),l)).T
        a_d3 = np.vstack((np.ones(len(l), dtype=np.int32) * (a[0]+3),l)).T
        b_c1 = np.vstack((np.ones(len(l), dtype=np.int32) * (b[0]-1),l)).T
        b_c2 = np.vstack((np.ones(len(l), dtype=np.int32) * (b[0]-2),l)).T
        b_c3 = np.vstack((np.ones(len(l), dtype=np.int32) * (b[0]-3),l)).T
        return np.concatenate((a_d, b_c, d_c, d_c1))

    elif orientation == "orizontal" and direction == "a_b":
        l = np.arange(a[0], min(last_person[0]+5, d[0]+1))
        w = np.arange(a[1], b[1]+1)
        a_d = np.vstack((l,np.ones(len(l), dtype=np.int32) * a[1])).T
        b_c = np.vstack((l,np.ones(len(l), dtype=np.int32) * b[1])).T
        a_b = np.vstack((np.ones(len(w), dtype=np.int32) * a[0],w)).T
        a_b1 = np.vstack((np.ones(len(w), dtype=np.int32) * (a[0]+1),w)).T
        a_d1 = np.vstack((l,np.ones(len(l), dtype=np.int32) * a[1]+1)).T
        a_d2 = np.vstack((l,np.ones(len(l), dtype=np.int32) * a[1]+2)).T
        a_d3 = np.vstack((l,np.ones(len(l), dtype=np.int32) * a[1]+3)).T
        b_c1 = np.vstack((l,np.ones(len(l), dtype=np.int32) * b[1]-1)).T
        b_c2 = np.vstack((l,np.ones(len(l), dtype=np.int32) * b[1]-2)).T
        b_c3 = np.vstack((l,np.ones(len(l), dtype=np.int32) * b[1]-3)).T
        return np.concatenate((a_d, b_c, a_b, a_b1))

    elif orientation == "orizontal" and direction == "d_c":
        l = np.arange(max(last_person[0]-4, a[0]), d[0]+1)
        w = np.arange(a[1], b[1]+1)
        a_d = np.vstack((l,np.ones(len(l), dtype=np.int32) * a[1])).T
        b_c = np.vstack((l,np.ones(len(l), dtype=np.int32) * b[1])).T
        d_c = np.vstack((np.ones(len(w), dtype=np.int32) * d[0],w)).T
        d_c1 = np.vstack((np.ones(len(w), dtype=np.int32) * (d[0]-1),w)).T
        a_d1 = np.vstack((l,np.ones(len(l), dtype=np.int32) * a[1]+1)).T
        a_d2 = np.vstack((l,np.ones(len(l), dtype=np.int32) * a[1]+2)).T
        a_d3 = np.vstack((l,np.ones(len(l), dtype=np.int32) * a[1]+3)).T
        b_c1 = np.vstack((l,np.ones(len(l), dtype=np.int32) * b[1]-1)).T
        b_c2 = np.vstack((l,np.ones(len(l), dtype=np.int32) * b[1]-2)).T
        b_c3 = np.vstack((l,np.ones(len(l), dtype=np.int32) * b[1]-3)).T
        return np.concatenate((a_d, b_c, d_c, d_c1))

In [9]:
def plot_queue(people, obs, goal, tot_area=24, res=0.5):
    l = int(tot_area/res)
    lm = int(l/2)
    img1 = np.ones((l,l,3), dtype=np.uint8)*255
    img2 = np.ones((l,l,3), dtype=np.uint8)*255
    
    for person in people:
        img1[person[1]+lm, person[0]+lm, 0] = 0
        img2[person[1]+lm, person[0]+lm, 0] = 0
    
    for ob in obs:
        img2[ob[1]+lm, ob[0]+lm, 0] = 0
        img2[ob[1]+lm, ob[0]+lm, 1] = 0
        img2[ob[1]+lm, ob[0]+lm, 2] = 0

    img1[goal[1]+lm, goal[0]+lm, 1] = 0
    img2[goal[1]+lm, goal[0]+lm, 1] = 0

    fig, (ax1, ax2) = plt.subplots(2)
    
    fig.set_figheight(15)
    fig.set_figwidth(15)
    
    ax1.imshow(img1, extent=[-lm,lm,-lm,lm], origin="lower")
    ax2.imshow(img2, cmap="Greys", extent=[-lm,lm,-lm,lm], origin="lower")

In [10]:
def test_queue_module1(tot_area=24, res=0.2, area_dim=(1.2,7), n_people=4, orientation="vertical"):
    goal, people, obs, area = get_queue(tot_area, res, area_dim, n_people, orientation)
    plot_queue(people, obs, goal, tot_area, res)

In [11]:
def test_queue_module2(tot_area=24, res=0.2, people_range=(2, 5)):
    goal, people, obs, area = get_random_queue(tot_area, res, people_range)
    plot_queue(people, obs, goal, tot_area, res)