In [2]:
import gymnasium as gym
import vizdoom
import numpy as np
import cv2
import stable_baselines3 as sb3
import matplotlib.pyplot as plt
import os
import time

import sb3_contrib

import optuna
import torch

  from .autonotebook import tqdm as notebook_tqdm


SETTINGS

In [3]:
DEFAULT_SCENARIO_PATH = r"vizdoom\scenarios\deadly_corridor_custom_skill1.cfg"

# Height and width of the resized image
# IMAGE_SHAPE = (60, 80)

# Training parameters
TRAINING_TIMESTEPS = int(20e3)
N_STEPS = 128
N_ENVS = 1
FRAME_SKIP = 4

CHECKPOINT_DIR = './train/train_corridor_sens'
LOG_DIR = './logs/log_corridor_sens'

In [4]:
class VizDoomEnv(gym.Env):
    def __init__(self, config_path, render=False):
        super().__init__()

        # Setup the Doom game environment
        self.game = vizdoom.DoomGame()
        self.game.load_config(config_path)
        self.num_actions = 7  # Number of actions in the game (depends on scenario)

        if render:
            self.game.set_window_visible(True)
        else:
            self.game.set_window_visible(False)
        
        self.game.init()

        # Create action space and observation space
        self.action_space = gym.spaces.Discrete(self.num_actions)
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=(100, 160, 1),
            dtype=np.uint8
        )

        # Initialize previous values for shaping reward 
        self.selected_weapon_ammo_available_per_episode = self.game.get_state().game_variables[3]
        self.previous_selected_weapon_ammo = -1

        self.previous_killcount = 0
        self.previous_hits_taken = 0

        # Initialize reward shaping weights
        self.game_reward_weight = 0.01
        self.delta_selected_weapon_ammo_weight = 0.25
        self.delta_hits_taken_weight = -0.5
        self.delta_killcount_weight = 5

    def step(self, action):

        actions = np.eye(self.num_actions, dtype=np.uint8)
        game_reward = self.game.make_action(actions[action], FRAME_SKIP)  # FRAME_SKIP ticks per action
        terminated = self.game.is_episode_finished()

        state = self.game.get_state()
        if state is not None:
            observation = self.simplify_observation(state.screen_buffer)
            health, killcount, hits_taken, selected_weapon_ammo = state.game_variables

            # Calculate deltas for shaping reward
            if self.previous_selected_weapon_ammo == -1:
                self.previous_selected_weapon_ammo = selected_weapon_ammo
            delta_selected_weapon_ammo = selected_weapon_ammo - self.previous_selected_weapon_ammo
            delta_killcount = killcount - self.previous_killcount
            delta_hits_taken = hits_taken - self.previous_hits_taken

            # Update previous values
            self.previous_selected_weapon_ammo = selected_weapon_ammo
            self.previous_killcount = killcount
            self.previous_hits_taken = hits_taken

            # Calculate the shaped reward
            shaped_reward = (
                self.game_reward_weight * game_reward
                + self.delta_selected_weapon_ammo_weight * delta_selected_weapon_ammo
                + self.delta_killcount_weight * delta_killcount
            )

            # print(shaped_reward)

            # print(f"Shaped_reward: {shaped_reward}, Killcount: {0}, Hits Taken: {delta_hits_taken}, Selected Weapon Ammo: {delta_selected_weapon_ammo}")

            info = {"game_reward": self.game_reward_weight * game_reward, "ammo_reward": self.delta_selected_weapon_ammo_weight * delta_selected_weapon_ammo,
                    "kills_reward": self.delta_killcount_weight * delta_killcount}
        else:
            observation = np.zeros(self.observation_space.shape, dtype=np.uint8)
            shaped_reward = 0
            info = {"game_reward": 0, "ammo_reward": 0,
                    "kills_reward": 0}

        return observation, shaped_reward, terminated, False, info
    
    def reset(self, seed=None):
        self.game.new_episode()
        state = self.game.get_state()
        observation = self.simplify_observation(state.screen_buffer)
        # health, killcount, hits_taken, selected_weapon_ammo = state.game_variables
        info = {"info": 0}

        self.previous_selected_weapon_ammo = -1
        self.previous_killcount = 0
        self.previous_hits_taken = 0

        return observation, info
    
    def close(self):
        self.game.close()

    def simplify_observation(self, observation):
        # # Convert the observation to grayscale and resize it
        gray_observation = cv2.cvtColor(np.moveaxis(observation,0,-1), cv2.COLOR_BGR2GRAY)
        cropped_obervation = gray_observation[:100, :]
        resized_observation = cv2.resize(cropped_obervation, None, fx=1, fy=1, interpolation=cv2.INTER_AREA)
        simplified_observation = np.expand_dims(resized_observation, axis=-1)  # Add channel dimension
        
        # COLOR
        # cropped_obervation = np.moveaxis(observation,0,-1)[:100, :, :]
        # simplified_observation = cropped_obervation

        return simplified_observation



In [5]:
%matplotlib qt
# %matplotlib inline
env = VizDoomEnv(DEFAULT_SCENARIO_PATH, render=True)
observation = env.reset()[0]
plt.imshow(cv2.cvtColor(observation, cv2.COLOR_BGR2RGB))
env.close()

In [6]:
plt.close()

In [7]:
from stable_baselines3.common import env_checker
# Check validity of the environment
env = VizDoomEnv(DEFAULT_SCENARIO_PATH)
sb3.common.env_checker.check_env(env)
env.close()

Callback (saving)

In [50]:
log_name = 'RPPO_lr1e-5'

In [51]:
class TrainAndLoggingCallback(sb3.common.callbacks.BaseCallback):

    def __init__(self, check_freq, save_path, log_name="unkown", verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path
        self.log_name = log_name

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, '{}_best_model_{}'.format(self.log_name, self.n_calls))
            self.model.save(model_path)

        return True
    
callback = TrainAndLoggingCallback(check_freq=10000, save_path=CHECKPOINT_DIR, log_name=log_name)

Train Model

In [8]:
# Non rendered environment
envs = sb3.common.env_util.make_vec_env(VizDoomEnv, n_envs=N_ENVS, env_kwargs = {'config_path': DEFAULT_SCENARIO_PATH})

In [52]:
# FOR SENSITIVITY
envs = sb3.common.env_util.make_vec_env(VizDoomEnv, n_envs=N_ENVS, env_kwargs = {'config_path': r"vizdoom\scenarios\deadly_corridor_custom_skill1.cfg"})
agent = sb3_contrib.RecurrentPPO('CnnLstmPolicy', envs, verbose=1, tensorboard_log=LOG_DIR, learning_rate=1e-5)
torch.set_num_threads(1)
agent.learn(total_timesteps=TRAINING_TIMESTEPS, callback=callback, tb_log_name=log_name)
agent.save(os.path.join(CHECKPOINT_DIR, log_name))


Using cuda device
Wrapping the env in a VecTransposeImage.
Logging to ./logs/log_corridor_sens\RPPO_lr1e-5_1
----------------------------
| time/              |     |
|    fps             | 55  |
|    iterations      | 1   |
|    time_elapsed    | 2   |
|    total_timesteps | 128 |
----------------------------
-------------------------------------------
| time/                   |               |
|    fps                  | 27            |
|    iterations           | 2             |
|    time_elapsed         | 9             |
|    total_timesteps      | 256           |
| train/                  |               |
|    approx_kl            | 4.6240166e-07 |
|    clip_fraction        | 0             |
|    clip_range           | 0.2           |
|    entropy_loss         | -1.95         |
|    explained_variance   | 0.000338      |
|    learning_rate        | 1e-05         |
|    loss                 | 1.21          |
|    n_updates            | 10            |
|    policy_gradient_loss | 

In [None]:
# New fance pancy agent?
lr_fun = lambda f: f * 2.5e-4
lr_fun = lambda f: 2.5e-4 * np.exp(3*(f - 1))
cr_fun = lambda f: 0.1    * np.exp(0.1*(f - 1))
# Batch size seems important
agent = sb3_contrib.RecurrentPPO('CnnLstmPolicy', envs, verbose=1, tensorboard_log=LOG_DIR, n_steps=1024, batch_size=12, gae_lambda=0.95, gamma=0.99, n_epochs=10, ent_coef=0.01, learning_rate=lr_fun, clip_range=cr_fun)

Using cuda device
Wrapping the env in a VecTransposeImage.


In [25]:
# Reload model from disc
agent = sb3_contrib.RecurrentPPO.load(r'.\train\train_corridor2\RPPO_SR1_ns1024_bs12_V3skill5_V2_2.zip', env=envs)

Wrapping the env in a VecTransposeImage.


In [None]:
# Model initialization
# model = PPO("CnnPolicy", env, verbose=1, tensorboard_log=LOG_DIR, learning_rate=1e-5, n_steps=2048)
# model = PPO('CnnPolicy', env, verbose=1, tensorboard_log=LOG_DIR, learning_rate=1e-4, n_steps=2048*4)
# agent = sb3.PPO("CnnPolicy", envs, verbose=1, tensorboard_log=LOG_DIR, learning_rate=1e-3, n_steps=N_STEPS)
# model.learning_rate


In [None]:
envs = sb3.common.env_util.make_vec_env(VizDoomEnv, n_envs=N_ENVS, env_kwargs = {'config_path': r"vizdoom\scenarios\deadly_corridor_custom_skill5.cfg"})
agent.set_env(envs)
agent.learning_rate = lambda f: 2.5e-4 * np.exp(3*(f - 1))

Wrapping the env in a VecTransposeImage.


In [20]:
torch.set_num_threads(1)
agent.learn(total_timesteps=TRAINING_TIMESTEPS, callback=callback, tb_log_name=log_name)
agent.save(os.path.join(CHECKPOINT_DIR, log_name))

Logging to ./logs/log_corridor2\RPPO_SR1_ns1024_bs12_V3_10


---------------------------------
| rollout/           |          |
|    ep_len_mean     | 185      |
|    ep_rew_mean     | 17.2     |
| time/              |          |
|    fps             | 89       |
|    iterations      | 1        |
|    time_elapsed    | 11       |
|    total_timesteps | 1024     |
---------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 198        |
|    ep_rew_mean          | 24.5       |
| time/                   |            |
|    fps                  | 42         |
|    iterations           | 2          |
|    time_elapsed         | 47         |
|    total_timesteps      | 2048       |
| train/                  |            |
|    approx_kl            | 0.07193583 |
|    clip_fraction        | 0.739      |
|    clip_range           | 0.0999     |
|    entropy_loss         | -1.61      |
|    explained_variance   | 0.768      |
|    learning_rate        | 0.000242   |
|   

In [None]:
agent.save(os.path.join(CHECKPOINT_DIR, log_name + 'skill5_V2_2abc'))

In [18]:
envs.close()

In [None]:
DEFAULT_HYPERPARAMS = {
    "policy": "CnnLstmPolicy",
}

Test model

In [18]:
# Reload model from disc
agent = sb3.PPO.load(r'.\train\train_corridor2\RPPO_SR1_ns1024_bs12_V3skill1.zip')

In [13]:
env = VizDoomEnv(DEFAULT_SCENARIO_PATH, render=True)

In [17]:
mean_reward, std_reward = sb3.common.evaluation.evaluate_policy(agent, env, n_eval_episodes=10, deterministic=True)
env.close()
mean_reward



np.float64(32.49422091019805)

In [14]:
test_env = VizDoomEnv(r"vizdoom\scenarios\deadly_corridor_custom_skill5.cfg", render=True)
for episode in range(2): 
    obs = test_env.reset()[0]
    done = False
    state = None
    episode_starts = True

    total_reward = 0
    while not done: 
        action, state = agent.predict(obs, state=state, episode_start=episode_starts)
        obs, reward, done, _, info = test_env.step(action)
        episode_starts = done
        time.sleep(0.1)
        total_reward += reward
    print('Total Reward for episode {} is {}'.format(episode, total_reward))
    time.sleep(2)

test_env.close()

Total Reward for episode 0 is 16.024110107421873
Total Reward for episode 1 is 25.815352020263678


In [165]:
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Polygon
import os


def run_eval_and_plots(agent, config_path, map_vertices, output_name, episodes=10, render=False):
    """
    Run VizDoom episodes with the given agent and plot:
    1. Position heatmap with map overlay
    2. Reward components per episode
    3. Action usage frequency histogram
    
    Saves all figures as PDFs in output/final_run folder with the given output_name prefix.
    """
    # Create output directory if it doesn't exist
    os.makedirs("output/final_run", exist_ok=True)

    env = VizDoomEnv(config_path, render=render)
    all_positions = []
    reward_components = {"total_reward": [], "game_reward": [], "ammo_reward": [], "kills_reward": []}
    action_counts = []

    for ep in range(episodes):
        obs, _ = env.reset()
        done = False
        state = None
        episode_start = True

        ep_rewards = {"total_reward": 0, "game_reward": 0, "ammo_reward": 0, "kills_reward": 0}
        ep_actions = np.zeros(env.num_actions, dtype=int)

        while not done:
            action, state = agent.predict(obs, state=state, episode_start=episode_start)
            obs, reward, done, _, info = env.step(action)
            episode_start = done
            ep_actions[action] += 1

            # Record agent position
            if env.game.get_state():
                pos_x = env.game.get_game_variable(vizdoom.GameVariable.POSITION_X)
                pos_y = env.game.get_game_variable(vizdoom.GameVariable.POSITION_Y)
                all_positions.append((pos_x, pos_y))

                # Extract reward components from game_variables
                game_reward, ammo_reward, kills_reward = info["game_reward"], info["ammo_reward"], info["kills_reward"]
                ep_rewards["total_reward"] += reward  # shaped reward
                ep_rewards["game_reward"] += game_reward
                ep_rewards["ammo_reward"] += ammo_reward
                ep_rewards["kills_reward"] += kills_reward

        # store per-episode totals
        for k in reward_components:
            reward_components[k].append(ep_rewards[k])
        action_counts.append(ep_actions)
        

    env.close()

    # swap x,y (since Doom's X=forward, Y=sideways) -> we want Y vertical
    swapped_positions = np.array([(y, x) for (x, y) in np.array(all_positions)])
    swapped_vertices  = np.array([(y, x) for (x, y) in np.array(map_vertices)])
    # swapped_vertices=np.array(map_vertices)

    # compute bounds with 10% margin
    min_x, min_y = swapped_vertices.min(axis=0)
    max_x, max_y = swapped_vertices.max(axis=0)
    dx, dy = max_x - min_x, max_y - min_y
    margin_x, margin_y = 0.1 * dx, 0.1 * dy

    # ---- Plot 1: Position heatmap ----
    plt.figure(figsize=(8,6))
    kde = sns.kdeplot(
        x=swapped_positions[:,0], 
        y=swapped_positions[:,1], 
        cmap="magma", fill=True, thresh=0.05, alpha=0.7, cut=1, cbar=True
    )

    # add plots for walls
    plt.plot(swapped_vertices[:,0], swapped_vertices[:,1], color="black", linewidth=1.5, zorder=2)
    plt.scatter(swapped_vertices[:,0], swapped_vertices[:,1], color="red", s=10, zorder=3, label="Map vertices")

    # Add colorbar label
    cbar = kde.collections[0].colorbar
    cbar.set_label('Relative position density', rotation=270, labelpad=15)

    # axis limits
    plt.xlim(min_x - margin_x, max_x + margin_x)
    plt.ylim(min_y - margin_y, max_y + margin_y)

    # plt.gca().set_aspect("equal", adjustable="box")
    plt.title("Agent Position Heatmap")
    plt.xlabel("X")
    plt.ylabel("Y")
    plt.savefig(f"output/final_run/{output_name}_position_heatmap.pdf", bbox_inches='tight')
    plt.show()

    # ---- Plot 2: Total reward components across all episodes ----
    plt.figure(figsize=(10, 6))

    # Calculate statistics
    avg_rewards = {k: np.mean(v) for k, v in reward_components.items()}
    std_rewards = {k: np.std(v) for k, v in reward_components.items()}

    # Create the bar plot with error bars
    bars = plt.bar(avg_rewards.keys(), avg_rewards.values(), 
                yerr=list(std_rewards.values()),
                color='skyblue',
                capsize=5)  # Adds caps to error bars

    # Add value labels on top (positive) or bottom (negative) of each bar
    for bar, key in zip(bars, avg_rewards.keys()):
        height = bar.get_height()
        sigma  = std_rewards[key]

        if height >= 0:
            # Positive bar → label above
            y_pos = height + sigma + 0.05*max(avg_rewards.values())
            va = 'bottom'
        else:
            # Negative bar → label below
            y_pos = height - sigma - 0.05*abs(min(avg_rewards.values()))
            va = 'top'

        plt.text(
            bar.get_x() + bar.get_width()/2., 
            y_pos,
            f'{height:.1f} ± {sigma:.1f}',
            ha='center', va=va
        )

    # Dynamically extend ylim to fit both positive and negative labels
    max_height = max(h + std_rewards[k] for h, k in zip(avg_rewards.values(), avg_rewards.keys()) if h >= 0)
    min_height = min(h - std_rewards[k] for h, k in zip(avg_rewards.values(), avg_rewards.keys()) if h < 0)

    plt.ylim(bottom=min_height * 1.15, top=max_height * 1.15)


    plt.title("Average Reward Components Across All Episodes\n(mean ± std)", pad=20)
    plt.ylabel("Average Value", labelpad=10)
    plt.xticks(rotation=45, ha='right')

    if any(v < 0 for v in avg_rewards.values()):
        plt.gca().spines['bottom'].set_position('zero')

    plt.grid(axis='y', alpha=0.3, linestyle='--')
    plt.tight_layout()
    plt.savefig(f"output/final_run/{output_name}_reward_components.pdf", bbox_inches='tight')
    plt.show()

    
    # ---- Plot 3: Action frequency histogram ----
    action_counts = np.array(action_counts)
    # Mean and std per action across episodes
    mean_actions = action_counts.mean(axis=0)
    std_actions  = action_counts.std(axis=0)

    # Define action names in the same order as the action indices
    action_names = [
        "MOVE_LEFT",
        "MOVE_RIGHT",  # Note: You had a typo in your list (MOVE_RIGHT vs MOVE_RIGHT)
        "ATTACK",      # Note: You had a typo in your list (ATTACK vs ATTACK)
        "MOVE_FORWARD",
        "MOVE_BACKWARD",
        "TURN_LEFT",
        "TURN_RIGHT"
    ]

    plt.figure(figsize=(10, 6))

    bars = plt.bar(action_names, mean_actions, 
                yerr=std_actions, 
                color="lightgreen", 
                capsize=5)

    # Add value labels: mean ± std
    for bar, mu, sigma in zip(bars, mean_actions, std_actions):
        height = bar.get_height()
        plt.text(
        bar.get_x() + bar.get_width()/2., 
        height + sigma + 0.05*max(mean_actions),  # offset above error bar
        f'{mu:.1f} ± {sigma:.1f}',
        ha='center', va='bottom'
    )

    plt.title("Average Action Usage Across All Episodes\n(mean ± std)", pad=20)
    plt.xlabel("Action")
    plt.ylabel("Average Count per Episode")
    plt.xticks(rotation=45, ha='right')
    plt.grid(axis='y', alpha=0.3, linestyle='--')

    # After plotting bars + error bars
    max_height = max(h + s for h, s in zip(mean_actions, std_actions))
    plt.ylim(top=max_height * 1.15)  # 15% headroom


    plt.tight_layout()
    plt.savefig(f"output/final_run/{output_name}_action_frequency.pdf", bbox_inches='tight')
    plt.show()

In [203]:
plt.close()

In [205]:
map_vertices = [
    (-32.0, -32.0),
    (-32.0, 32.0),
    (1344.0, -32.0),
    (1184.0, -32.0),
    (1344.0, 32.0),
    (1088.0, -96.0),
    (96.0, 96.0),
    (448.0, 32.0),
    (544.0, 96.0),
    (352.0, 32.0),
    (864.0, -32.0),
    (448.0, -32.0),
    (352.0, -32.0),
    (768.0, -32.0),
    (224.0, 96.0),
    (672.0, 96.0),
    (1184.0, 32.0),
    (768.0, 32.0),
    (224.0, -96.0),
    (96.0, -96.0),
    (544.0, -96.0),
    (960.0, -96.0),
    (672.0, -96.0),
    (864.0, 32.0),
    (960.0, 96.0),
    (1088.0, 96.0),
]

positive_y = sorted([v for v in map_vertices if v[1] > 0], key=lambda v: v[0])
negative_y = sorted([v for v in map_vertices if v[1] <= 0], key=lambda v: v[0], reverse=True)

sorted_vertices = positive_y + negative_y

# Reload model from disc
agent = sb3_contrib.RecurrentPPO.load(r'.\train\train_corridor2\RPPO_SR1_ns1024_bs12_V3_best_model_750000.zip')
# agent = sb3_contrib.RecurrentPPO('CnnLstmPolicy', env)
run_eval_and_plots(agent, "vizdoom\scenarios\deadly_corridor_custom_skill5.cfg", sorted_vertices, output_name='placeholder', episodes=10, render=False)



In [16]:
test_env.close()