In [1]:
import os

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.path import Path

In [2]:
def plot_environment(env):
    """
    Show an environment grid as an image.

    Parameters
    ----------
    env: np.array
        2D grid to display; it is rendered with matplotlib's 'binary' colormap
    """
    colormap = 'binary'
    plt.imshow(env, cmap=colormap)
    plt.show()

def get_circle(radius, center, env_size):
    """
    Get a grid layer that contains a filled circle.

    Parameters
    ----------
    radius: int
        radius of the circle in pixels

    center: tuple
        (y, x) coordinate of the circle's center, i.e. (row, column).
        The center may lie outside of the grid.

    env_size: tuple
        size of the environment as (rows, columns). (e.g. (64,64))

    Returns
    -------
    layer: np.array
        float array of shape env_size with 1s inside the circle and 0s elsewhere
    """
    n_rows, n_cols = env_size
    # Convert to Python floats so that small integer types (e.g. np.int8 points
    # coming from a trajectory) cannot overflow when the distance is squared.
    center_y, center_x = float(center[0]), float(center[1])

    # Open grids broadcast to (n_rows, n_cols); rows and columns use their own
    # extent, so non-square environments are handled correctly.
    rows, cols = np.ogrid[:n_rows, :n_cols]
    inside = (rows - center_y) ** 2 + (cols - center_x) ** 2 <= float(radius) ** 2

    return inside.astype(float)

def get_star(center, env_size, num_points, outer_radius, inner_radius):
    """
    Get a grid mask that contains a filled star.

    Parameters
    ----------
    center: tuple
        (y, x) coordinate of the star's center, i.e. (row, column).
        This is the same convention get_circle uses.

    env_size: tuple
        size of the environment as (rows, columns). (e.g. (64,64))

    num_points: int
        number of tips of the star

    outer_radius: int
        distance of the tips to the center

    inner_radius: int
        distance of the notches between two tips to the center

    Returns
    -------
    star_array: np.array
        boolean array of shape env_size that is True inside the star
    """
    n_rows, n_cols = env_size
    center_y, center_x = center

    # The polygon alternates between tip (outer radius) and notch (inner radius).
    # The first vertex is at -pi/2, i.e. the first tip points towards row 0.
    vertex_ids = np.arange(num_points * 2)
    angles = np.pi / num_points * vertex_ids - np.pi / 2
    radii = np.where(vertex_ids % 2 == 0, outer_radius, inner_radius)
    # Path works in (x, y) order, so the column coordinate comes first.
    vertices = np.column_stack((center_x + radii * np.cos(angles),
                                center_y + radii * np.sin(angles)))
    star_path = Path(vertices)

    # Test every pixel of the (possibly non-square) grid against the polygon.
    y, x = np.mgrid[0:n_rows, 0:n_cols]
    coords = np.vstack((x.ravel(), y.ravel())).T

    mask = star_path.contains_points(coords)
    star_array = mask.reshape((n_rows, n_cols))

    return star_array

In [3]:
def get_circular_trajectory(start, radius, speed, clock_wise = True):
    """
    Get a circular trajectory for an object.

    The circle's center is placed `radius` before `start` along the first
    coordinate, so `start` is the point at angle 0 of the circle. The
    trajectory covers one full revolution and both its first and its last
    point equal `start`.

    Parameters
    ----------
    start: tuple
        start coordinate of the trajectory

    radius: int
        radius of the circular trajectory

    speed: int
        speed of the object; the revolution is sampled with int(100/speed) points

    clock_wise: bool
        determines whether the object moves clockwise or counter clockwise
        (counter clockwise traverses the same points in reversed order)

    Returns
    -------
    trajectory: list
        list of np.array points on the circular trajectory

    Raises
    ------
    ValueError
        if speed is not positive
    """
    if speed <= 0:
        raise ValueError("speed must be positive")

    center = np.asarray(start) - np.array([radius, 0])

    # start sits at angle 0 by construction; deriving the angle with arccos
    # would produce NaN whenever the first start coordinate is 0
    angles = np.linspace(0.0, 2 * np.pi, int(100 / speed))
    if not clock_wise:
        angles = angles[::-1]

    offsets = radius * np.column_stack((np.cos(angles), np.sin(angles)))
    # a platform integer instead of int8 keeps coordinates outside of
    # [-128, 127] from wrapping around
    points = np.around(center + offsets).astype(int)

    return [point for point in points]


def get_linear_trajectory(start, end, speed):
    """
    Get a linear trajectory for an object.

    Parameters
    ----------
    start: tuple
        start coordinate of the trajectory (not part of the returned list)

    end: tuple
        end coordinate of the trajectory (always the last returned point)

    speed: int
        speed of the object, i.e. the distance between two consecutive points

    Returns
    -------
    trajectory: list
        list of np.array points on the linear trajectory from start to end

    Raises
    ------
    ValueError
        if speed is not positive
    """
    if speed <= 0:
        raise ValueError("speed must be positive")

    start = np.array(start)
    end = np.array(end)
    vector = end - start
    # number of steps of length `speed` that are needed to get from start to end
    n_steps = np.linalg.norm(vector) / speed

    trajectory = []
    # n_steps is 0 when start == end; skip the division to avoid NaN steps
    if n_steps > 0:
        step = vector / n_steps
        for i in range(1, int(n_steps)):
            # a platform integer instead of int8 keeps coordinates outside of
            # [-128, 127] from wrapping around
            trajectory.append(np.around(start + i * step).astype(int))

    trajectory.append(end)
    return trajectory

In [4]:
def get_virtual_point(env_size, radius, virtual_points):
    """
    Get point outside of the grid with the distance of the radius to the border.
    This point is then used as either the end or the start point of a trajectory.

    Parameters
    ----------
    env_size: tuple
        size of the environment as (rows, columns). (e.g. (64,64))

    radius: int
        radius of the object, that moves on the trajectory

    virtual_points: np.array
        array of (y, x) coordinates of a virtual border around the environment of width 1

    Returns
    -------
    point: tuple
        random (y, x) point outside of the environment with a distance of the radius to the border
    """
    n_rows, n_cols = env_size
    point_y, point_x = virtual_points[np.random.randint(len(virtual_points))]

    # ensure that the whole object disappears, before reaching its destination:
    # push the point further out on every side on which it touches the border
    if point_y == -1:
        point_y = point_y - radius
    elif point_y == n_rows:
        point_y = point_y + radius

    # the right border is determined by the number of columns, not of rows
    if point_x == -1:
        point_x = point_x - radius
    elif point_x == n_cols:
        point_x = point_x + radius

    return point_y, point_x
    
def _sample_object_meta(circle_prob):
    """Draw a random object: returns (object_meta_data, radius), where the meta data is the radius of a circle or [num_points, radius, inner_radius] of a star."""
    # object is a circle with probability of circle_prob
    if np.random.random() < circle_prob:
        radius = np.random.choice(np.arange(10))
        return radius, radius
    # object is a star with probability of 1 - circle_prob
    num_points = np.random.choice(np.arange(3, 7))
    radius = np.random.choice(np.arange(5, 8))
    inner_radius = np.random.choice(np.arange(1, 5))
    return [num_points, radius, inner_radius], radius


def _render_object(object_meta_data, center, env_size):
    """Draw the object described by object_meta_data (list ==> star, otherwise circle radius) at center."""
    if isinstance(object_meta_data, list):
        num_points, radius, inner_radius = object_meta_data
        return get_star(center, env_size, num_points, radius, inner_radius)
    return get_circle(object_meta_data, center, env_size)


def random_object_dynamic_env(env_size, n_objects, n_frames = 1, circle_prob = 0.5):
    """
    Generates a random dynamic grid environment.

    Every object is a circle or a star at a random position. If n_frames > 1,
    each object moves with a probability of 50%: on a linear trajectory to a
    point outside of the grid (70% of the moving objects) or on an endless
    circular trajectory (30%). An object that finished its linear trajectory
    is replaced by a new random object that crosses the grid on a linear
    trajectory, so the number of objects is conserved.

    Parameters
    ----------
    env_size: tuple
        Size of the environment as (rows, columns). (e.g. (64,64))

    n_objects: int
        Number of objects in the environment.

    n_frames: int
        Number of frames of the dynamic environment. (e.g. n_frames = 1 ==> static environment)

    circle_prob: float
        Probability that the random generated object is a circle

    Returns
    -------
    frame_list: list
        List of arrays, which represent the frames of the dynamic environment, where 0s are free space and 1s obstacles.
    """
    n_rows, n_cols = env_size
    frame_list = []
    # object id ==> [mask, object_meta_data, trajectory, trajectory_type, speed]
    object_dict = dict()

    # points around the environment: a virtual environment that is 1 pixel larger
    # on each side; shifting by -1 maps its border to the coordinates -1 and n_rows / n_cols
    virtual_environment = np.ones((n_rows + 2, n_cols + 2))
    virtual_environment[1:n_rows + 1, 1:n_cols + 1] = 0
    virtual_points = np.argwhere(virtual_environment == 1) - 1

    frame = np.zeros(env_size)
    # initialization of random objects
    for object_id in range(n_objects):
        center = np.random.choice(np.arange(n_rows)), np.random.choice(np.arange(n_cols))
        object_meta_data, radius = _sample_object_meta(circle_prob)
        mask = _render_object(object_meta_data, center, env_size)

        trajectory, trajectory_type, speed = None, None, None
        # object is moving with a probability of 50% (never in a static environment)
        if np.random.random() > 0.5 and n_frames > 1:
            speed = np.random.choice(np.arange(0.25, 1, 0.25))
            # object is moving on a linear trajectory with a probability of 70%
            if np.random.random() > 0.3:
                end = get_virtual_point(env_size, radius, virtual_points)
                trajectory = get_linear_trajectory(center, end, speed)
                trajectory_type = 'linear'
            # object is moving on a circular trajectory with a probability of 30%
            else:
                trajectory_radius = np.random.choice(np.arange(n_rows / 2, n_rows))
                # the circular trajectory is clockwise with a probability of 50%
                clock_wise = np.random.random() > 0.5
                trajectory = get_circular_trajectory(center, trajectory_radius, speed, clock_wise)
                trajectory_type = 'circular'

        object_dict[object_id] = [mask, object_meta_data, trajectory, trajectory_type, speed]
        frame += mask

    frame[frame > 0] = 1 # environment only consists of 0s and 1s
    frame_list.append(frame)

    for _ in range(1, n_frames):
        frame = np.zeros(env_size)
        for object_id, object_data in object_dict.items():
            mask, object_meta_data, trajectory, trajectory_type, speed = object_data

            # static object: reuse the mask of the first frame
            if trajectory is None:
                frame += mask

            elif len(trajectory) == 0:
                # empty trajectory means the object reached its end point outside of the environment on a virtual point
                # ==> generate new moving object to conserve the number of objects
                # only objects on a linear trajectory reach their end point, so the trajectory of the new object has to be linear again
                object_meta_data, radius = _sample_object_meta(circle_prob)
                start = get_virtual_point(env_size, radius, virtual_points)
                end = get_virtual_point(env_size, radius, virtual_points)
                # the new object keeps the speed of the object it replaces
                trajectory = get_linear_trajectory(start, end, speed)
                # moving objects are redrawn from their meta data, so no mask is stored
                object_dict[object_id] = [None, object_meta_data, trajectory, 'linear', speed]

            # the object is placed on its next point based on its trajectory
            else:
                next_point = trajectory.pop(0)
                frame += _render_object(object_meta_data, next_point, env_size)
                if trajectory_type == 'circular':
                    # we re-append the point so that the circular trajectory is endless in theory
                    trajectory.append(next_point)

        frame[frame > 0] = 1 # ensure that also overlapping obstacles are represented as 1
        frame_list.append(frame)

    return frame_list

In [5]:
# frame_list = random_object_dynamic_env((64,64), 30, 300)

In [6]:
# from PIL import Image
# import numpy as np
# import matplotlib.pyplot as plt
# from tqdm import tqdm

# frames = []

# for frame in frame_list:
#     plt.imshow(frame, cmap='binary')
#     plt.axis('off')


#     plt.savefig("temp.png", bbox_inches='tight', pad_inches=0)
#     frames.append(Image.open("temp.png"))

# # save as GIF
# frames[0].save(
#     "maze_animation.gif",
#     save_all=True,
#     append_images=frames[1:],
#     duration=100,
#     loop=0
# )

In [7]:
# generate frames for the reconstruction loss training

def generate_training_data(env_size, n_objects, n_envs, n_frames = 1, circle_prob = 0.5):
    """
    Generate random environments and save all of their frames to one .npy file.

    The frames of all environments are stored consecutively, so the saved
    array holds n_envs * n_frames frames. The file is written to the
    'environments' directory, which is created if it does not exist yet.

    Parameters
    ----------
    env_size: tuple
        Size of the environment. (e.g. (64,64))

    n_objects: int
        Number of objects in each environment.

    n_envs: int
        Number of environments to generate.

    n_frames: int
        Number of frames per environment.

    circle_prob: float
        Probability that a random generated object is a circle
    """
    frames = []
    for _ in range(n_envs):
        # extend instead of append(*...): an environment may consist of several frames
        frames.extend(random_object_dynamic_env(env_size, n_objects, n_frames, circle_prob))

    os.makedirs('environments', exist_ok=True)
    np.save(f'environments/environments_{env_size[0]}_{n_objects}_{n_envs}_{n_frames}_test.npy', frames)


In [8]:
# static dataset: 1000 single-frame 64x64 environments with 12 objects each (circle_prob = 1 ==> circles only)
generate_training_data(
    env_size=(64, 64),
    n_objects=12,
    n_envs=1000,
    n_frames=1,
    circle_prob=1,
)