# Part 3: Level generation

In [13]:
# Install Flatland
%cd /content
!git clone https://gitlab.aicrowd.com/flatland/flatland.git/ --branch 223_UpdateEditor_55_notebooks
%cd flatland
!pip install -e .

/content
fatal: destination path 'flatland' already exists and is not an empty directory.
/content/flatland
Obtaining file:///content/flatland
Installing collected packages: flatland-rl
  Found existing installation: flatland-rl 2.1.10
    Can't uninstall 'flatland-rl'. No files were found to uninstall.
  Running setup.py develop for flatland-rl
Successfully installed flatland-rl


In [0]:
import PIL
from flatland.utils.rendertools import RenderTool

def render_env(env):
    env_renderer = RenderTool(env, gl="PILSVG")
    env_renderer.render_env()

    image = env_renderer.get_image()
    pil_image = PIL.Image.fromarray(image)
    display(pil_image)
    return image

Random rail network
---

The difficulty of a railway network depends on the **dimensions** (width x height) and the **number of agents** in the network. By varying the number of start and goal connections (nr_start_goal) and the number of extra railway elements added (nr_extra) the number of alternative paths of each agents can be modified. The more possible paths an agent has to reach its target the easier the task becomes. Here we don’t specify any observation builder but rather use the standard tree observation. If you would like to use a custom obervation please follow the instructions in the next tutorial. Feel free to vary these parameters to see how your own agent holds up on different setting. The evalutation set of railway configurations will cover the whole spectrum from easy to complex tasks.



In [0]:
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
import numpy as np

number_of_agents = 1

# More realistic networks can be built using sparse_rail_generator
sparse_env = RailEnv(
    width=50,
    height=50,
    rail_generator=sparse_rail_generator(
        max_num_cities=5,  # Number of cities (= train stations)
        grid_mode=False,  # Distribute the cities evenly in a grid
        max_rails_between_cities=2,  # Max number of rails connecting to a city
        max_rails_in_city=5  # Number of parallel tracks in cities
    ),
    obs_builder_object=SingleAgentNavigationObs(),
    number_of_agents=number_of_agents
)

observation, info = sparse_env.reset()

In [0]:
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.envs.observations import TreeObsForRailEnv
from flatland.core.grid.grid4_utils import get_new_position
from typing import List

# Build observations which indicate the shortest path to the target
class SingleAgentNavigationObs(ObservationBuilder):
    """
    We build a representation vector with 3 binary components, indicating which of the 3 available directions
    for each agent (Left, Forward, Right) lead to the shortest path to its target.
    E.g., if taking the Left branch (if available) is the shortest route to the agent's target, the observation vector
    will be [1, 0, 0].
    """

    def __init__(self):
        super().__init__()

    def reset(self):
        pass

    def get(self, handle: int = 0) -> List[int]:
        agent = self.env.agents[handle]

        if agent.position:
            possible_transitions = self.env.rail.get_transitions(*agent.position, agent.direction)
        else:
            possible_transitions = self.env.rail.get_transitions(*agent.initial_position, agent.direction)

        num_transitions = np.count_nonzero(possible_transitions)

        # Start from the current orientation, and see which transitions are available;
        # organize them as [left, forward, right], relative to the current orientation
        # If only one transition is possible, the forward branch is aligned with it.
        if num_transitions == 1:
            observation = [0, 1, 0]
        else:
            min_distances = []
            for direction in [(agent.direction + i) % 4 for i in range(-1, 2)]:
                if possible_transitions[direction]:
                    new_position = get_new_position(agent.position, direction)
                    min_distances.append(
                        self.env.distance_map.get()[handle, new_position[0], new_position[1], direction])
                else:
                    min_distances.append(np.inf)

            observation = [0, 0, 0]
            observation[np.argmin(min_distances)] = 1

        return observation

In [49]:
action_to_direction = {0: 'no-op', 1: 'left', 2: 'forward', 3: 'right', 4: 'halt'}

print("Directions of shortest paths")
for agent_handle in observation:
    for idx, shortest in enumerate(observation[agent_handle]):
        if shortest:
            action = np.argmax(observation[0]) + 1
            print('- Agent {}: {}'.format(agent_handle, action_to_direction[action]))

Directions of shortest paths
- Agent 0: forward


In [50]:
from IPython.display import clear_output

obs, info = sparse_env.reset()

# Move in a direction that is on a shortest path
# This results in an optimal policy if and only if there is a single agent!
for step in range(150):
    obs, all_rewards, done, _ = sparse_env.step({0: action})
    action = np.argmax(obs[0]) + 1

    clear_output(wait=True)
    render_env(sparse_env)

    print("Timestep: {}".format(step))
    print("Action: {} ({})".format(action, action_to_direction[action]))
    print("Rewards: {}".format(all_rewards))
    print("Done: {}".format(done))

    if done['__all__']:
        print("All done!")
        break

KeyboardInterrupt: ignored