In [22]:
import gymnasium as gym
from pogema import GridConfig
from stable_baselines3 import DQN, A2C
from stable_baselines3.common.evaluation import evaluate_policy

%load_ext autoreload
%autoreload 2
%matplotlib inline

grid_config = GridConfig(
    size=8,
    density=0.3,
    num_agents=1,
    max_episode_steps=30,
    seed = 42
)

env = gym.make("Pogema-v0",grid_config=grid_config)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### A2C

In [9]:
a2c_model = A2C(
    "MlpPolicy",
    env,
    verbose=1,
    gamma=0.99,
    learning_rate=0.0007,
    seed=42
)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [10]:
# Evaluate untrained agent using separate env

val_env = gym.make("Pogema-v0",grid_config=grid_config)

val_a2c_model = A2C(
    "MlpPolicy",
    val_env,
    verbose=1,
    gamma=0.99,
    learning_rate=0.0007,
    seed=42
)

mean_reward, std_reward = evaluate_policy(
    val_a2c_model,
    val_a2c_model.get_env(),
    deterministic=True,
    n_eval_episodes=20,
)

print(f"mean_reward: {mean_reward:.2f} +/- {std_reward:.2f}")

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
mean_reward: 0.00 +/- 0.00


In [11]:
# Train agent and save it
a2c_model.learn(total_timesteps=int(1.2e5))
a2c_model.save("saved/a2c_baseline")

------------------------------------
| rollout/              |          |
|    ep_len_mean        | 28.6     |
|    ep_rew_mean        | 0.118    |
| time/                 |          |
|    fps                | 1398     |
|    iterations         | 100      |
|    time_elapsed       | 0        |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -1.53    |
|    explained_variance | -162     |
|    learning_rate      | 0.0007   |
|    n_updates          | 99       |
|    policy_loss        | -0.063   |
|    value_loss         | 0.0098   |
------------------------------------
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 24.3     |
|    ep_rew_mean        | 0.439    |
| time/                 |          |
|    fps                | 1397     |
|    iterations         | 200      |
|    time_elapsed       | 0        |
|    total_timesteps    | 1000     |
| train/                |          |
|

### Load trained agent and evaluate it

In [14]:
a2c_model = A2C.load("saved/a2c_baseline")

env.reset()

mean_reward, std_reward = evaluate_policy(a2c_model, env, deterministic=True, n_eval_episodes=20)
print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

mean_reward:1.00 +/- 0.00


### Video Recording

In [23]:
# USING SEED 42 

from IPython.display import SVG, display
from pogema.animation import AnimationMonitor, AnimationConfig

env = AnimationMonitor(env)

def evaluate_success_rate(model, env, num_episodes=100):
    success_count = 0
    step_array = []
    for i in range(num_episodes):
        obs = env.reset()

        # Check if observation is a tuple and extract the first element if true.
        if isinstance(obs, tuple):
            obs = obs[0]
        max_step = 100
        steps_taken = 0
        done = truncated = False
        while not done and max_step > 0:
            action, _ = model.predict(obs)
            next_obs, reward, done, truncated, info = env.step(action)
            print(action,max_step,success_count,done)
            max_step -= 1
            steps_taken += 1
            # Check if next_obs is a tuple and extract the first element if true.
            if isinstance(next_obs, tuple):
                next_obs = next_obs[0]
            obs = next_obs

            # Check if agent was successful in that episode.
            if done:
                success_count += 1
                step_array.append(steps_taken)
                env.save_animation(f"render{i}.svg", AnimationConfig(egocentric_idx=0))
                break

    success_rate = success_count / num_episodes
    return success_rate, step_array

success_rate,step_array = evaluate_success_rate(a2c_model, env)
print(f"Agent Success Rate: {success_rate * 100:.2f}%")
print(f"steps to termination : {step_array}")

1 100 0 False
4 99 0 False
4 98 0 False
4 97 0 True
1 100 1 False
4 99 1 False
4 98 1 False
4 97 1 True
1 100 2 False
4 99 2 False
4 98 2 False
4 97 2 True
1 100 3 False
4 99 3 False
4 98 3 False
4 97 3 True
1 100 4 False
4 99 4 False
4 98 4 False
4 97 4 True
1 100 5 False
4 99 5 False
4 98 5 False
4 97 5 True
1 100 6 False
4 99 6 False
4 98 6 False
4 97 6 True
1 100 7 False
4 99 7 False
4 98 7 False
4 97 7 True
1 100 8 False
4 99 8 False
4 98 8 False
4 97 8 True
1 100 9 False
4 99 9 False
4 98 9 False
4 97 9 True
1 100 10 False
4 99 10 False
4 98 10 False
4 97 10 True
1 100 11 False
4 99 11 False
4 98 11 False
4 97 11 True
1 100 12 False
4 99 12 False
4 98 12 False
4 97 12 True
1 100 13 False
4 99 13 False
4 98 13 False
4 97 13 True
1 100 14 False
4 99 14 False
4 98 14 False
4 97 14 True
1 100 15 False
4 99 15 False
4 98 15 False
4 97 15 True
1 100 16 False
4 99 16 False
4 98 16 False
4 97 16 True
1 100 17 False
4 99 17 False
4 98 17 False
4 97 17 True
1 100 18 False
4 99 18 False
4 9

In [21]:
# RANDOM SEED
from IPython.display import SVG, display
from pogema.animation import AnimationMonitor, AnimationConfig

env = AnimationMonitor(env)

def evaluate_success_rate(model, env, num_episodes=100):
    success_count = 0
    step_array = []
    for i in range(num_episodes):
        obs = env.reset()

        # Check if observation is a tuple and extract the first element if true.
        if isinstance(obs, tuple):
            obs = obs[0]
        max_step = 100
        steps_taken = 0
        done = truncated = False
        while not done and max_step > 0:
            action, _ = model.predict(obs)
            next_obs, reward, done, truncated, info = env.step(action)
            print(action,max_step,success_count,done)
            max_step -= 1
            steps_taken += 1
            # Check if next_obs is a tuple and extract the first element if true.
            if isinstance(next_obs, tuple):
                next_obs = next_obs[0]
            obs = next_obs

            # Check if agent was successful in that episode.
            if done:
                success_count += 1
                step_array.append(steps_taken)
                env.save_animation(f"render{i}.svg", AnimationConfig(egocentric_idx=0))
                break

    success_rate = success_count / num_episodes
    return success_rate, step_array

success_rate,step_array = evaluate_success_rate(a2c_model, env)
print(f"Agent Success Rate: {success_rate * 100:.2f}%")
print(f"steps to termination : {step_array}")

4 100 0 False
4 99 0 False
4 98 0 False
4 97 0 False
4 96 0 False
4 95 0 False
4 94 0 False
4 93 0 False
4 92 0 False
4 91 0 False
4 90 0 False
4 89 0 False
4 88 0 False
4 87 0 False
4 86 0 False
4 85 0 False
4 84 0 False
4 83 0 False
4 82 0 False
4 81 0 False
4 80 0 False
4 79 0 False
4 78 0 False
4 77 0 False
4 76 0 False
4 75 0 False
4 74 0 False
4 73 0 False
4 72 0 False
4 71 0 False
4 70 0 False
4 69 0 False
4 68 0 False
4 67 0 False
4 66 0 False
4 65 0 False
4 64 0 False
4 63 0 False
4 62 0 False
4 61 0 False
4 60 0 False
4 59 0 False
4 58 0 False
4 57 0 False
4 56 0 False
4 55 0 False
4 54 0 False
4 53 0 False
4 52 0 False
4 51 0 False
4 50 0 False
4 49 0 False
4 48 0 False
4 47 0 False
4 46 0 False
4 45 0 False
4 44 0 False
4 43 0 False
4 42 0 False
4 41 0 False
4 40 0 False
4 39 0 False
4 38 0 False
4 37 0 False
4 36 0 False
4 35 0 False
4 34 0 False
4 33 0 False
4 32 0 False
4 31 0 False
4 30 0 False
4 29 0 False
4 28 0 False
4 27 0 False
4 26 0 False
4 25 0 False
4 24 0 Fals