This file is essentially a copy of the `basic_doom` scenario file, with minor modifications to load a different scenario config that exposes more game variables. We will also need to tune the training parameters, adapt the `env` class, and use a different reward structure. <br>

We will be tackling the Deadly Corridor scenario in this file. We want the doom skill set to max difficulty so the agent learns to defend itself, and we will go over reward shaping to incentivize it properly. The goal is to reach the armour at the end of the corridor and kill all 6 demons along the way. We also have additional buttons available to us in this scenario. This is the hardest task for the agent to learn. To shape the reward, additional game variables were added to the config: `DAMAGE_TAKEN`, `HITCOUNT` and `SELECTED_WEAPON_AMMO`, listed _without_ any comma separation. We will also have 5 copies of the config file in which only the difficulty value changes, so the agent can progressively improve as the difficulty increases. This process is called curriculum learning.

NOTE: In my case I ended up with a pretty bad model, but I didn't vary any hyperparameters between checkpoints. Keep that in mind when trying it out for yourself.

In [None]:
# Imports 
from vizdoom import *  # Game env
import random  # Random action sampling
import time  # For sleeping+
import numpy as np  # Identity matrix and more

# We're going to need to define some shapes to get our frame data working with gym
# The way we do this is through Spaces
# The types we're going to use are Box and Discrete
# Box is an array of any shape
# Discrete(n) is a set of n integer choices (0 to n-1), which we use as action indices
from gym import Env  # OpenAI gym base class
from gym.spaces import Box, Discrete  # Spaces for gym
import cv2  # OpenCV for image processing
import matplotlib.pyplot as plt

%matplotlib inline

# Importing dependencies for training
import os  # For file nav
# Callback class for RL
from stable_baselines3.common.callbacks import BaseCallback
# We can use this to check our environment's format
from stable_baselines3.common import env_checker
# PPO for training
from stable_baselines3 import PPO
# Eval policy for testing models
from stable_baselines3.common.evaluation import evaluate_policy
# Monitor wrapper
from stable_baselines3.common.monitor import Monitor

In [None]:
# One-hot action table: row i presses only button i
actions = np.eye(7, dtype=np.uint8)
actions

# Rows follow the button order of the scenario config:
#   index 0 -> move left
#   index 1 -> move right
#   index 2 -> shoot
# Indices 3-6 map to the remaining buttons listed in the .cfg
# (not shown in this file -- check its available_buttons entry)

In [None]:
# Example of a Discrete space: sampling yields one integer index
Discrete(n=7).sample()  # Returns an integer from 0 to 6
# These integers serve as our action indices

In [None]:
# For example, pick a random row of the one-hot action table:
actions[Discrete(n=7).sample()]

In [None]:
# Example of a Box space: sampling yields a random array of the given shape
Box(shape=(10, 10), dtype=np.uint8, low=0, high=10).sample()
# We will use a Box for our frame data.
# The environment defined in this file declares its Box with the resized
# frame shape (100, 160, 1) and values in 0..255.

In [None]:
# Create ViZDoom OpenAI Gym Environment
class VizDoomGym(Env):  # inherit from the Env base class
    """Gym environment wrapping a ViZDoom game loaded from a scenario config.

    Observations are single-channel ``uint8`` frames of shape (100, 160, 1).
    Actions are integer indices into a 7x7 one-hot button matrix. The reward
    returned by ``step`` is the engine reward plus shaping terms computed
    from the per-step change in damage taken, hit count and weapon ammo.
    """

    # One-hot button vectors: row i presses only button i.
    _ACTIONS = np.identity(7, dtype=np.uint8)
    # Second argument passed to make_action for every action (frame skip).
    _FRAME_SKIP = 4
    # Size of the resized observation frame.
    _FRAME_HEIGHT = 100
    _FRAME_WIDTH = 160

    # Initialization method
    def __init__(self, render=False, config="git_doom/ViZDoom/scenarios/deadly_corridor_1.cfg"):  # Render false lets us train faster
        """Load ``config``, start the game and declare the gym spaces.

        Args:
            render: When truthy the game window is shown; otherwise hidden.
            config: Path of the scenario config file to load. It is a
                parameter so curriculum learning can swap difficulty levels.
        """
        # Inherit from env
        super().__init__()
        # Game setup
        self.game = DoomGame()
        # Config now loaded as param for curriculum learning
        self.game.load_config(config)
        self.game.set_window_visible(bool(render))
        self.game.init()  # Start the game after setting the params

        # Define the action and observation space
        self.observation_space = Box(
            low=0,
            high=255,
            shape=(self._FRAME_HEIGHT, self._FRAME_WIDTH, 1),
            dtype=np.uint8,
        )  # Resized shapes
        self.action_space = Discrete(len(self._ACTIONS))

        # Game variables: HEALTH DAMAGE_TAKEN HITCOUNT SELECTED_WEAPON_AMMO
        # We want the CHANGE in these values, so the last seen value of each
        # is remembered between steps (reset() re-seeds them per episode).
        self.damage_taken = 0
        self.hitcount = 0
        self.selected_weapon_ammo = 52

    # Perform an action/take a step in the environment
    def step(self, action):
        """Apply the action with index ``action`` and return the transition.

        Returns:
            A ``(state, reward, done, info)`` tuple. ``state`` is the
            processed frame, or an all-zero frame when the game has no state
            left to report. ``info`` is ``{"info": ammo}`` (0 in the
            stateless case).
        """
        movement_reward = self.game.make_action(self._ACTIONS[action], self._FRAME_SKIP)

        # Fetch the state once so the frame and the variables come from the
        # same snapshot; it is None when there is nothing left to report.
        game_state = self.game.get_state()
        if game_state is not None:
            state = self.grayscale(game_state.screen_buffer)

            # Reward shaping
            # assumes the config exposes exactly these four variables in this
            # order -- the .cfg is not part of this file, TODO confirm
            _health, damage_taken, hitcount, ammo = game_state.game_variables

            # Calculate delta (change) against the previous step
            damage_taken_delta = self.damage_taken - damage_taken  # <= 0 when hurt
            hitcount_delta = hitcount - self.hitcount
            ammo_delta = ammo - self.selected_weapon_ammo  # <= 0 when shooting
            self.damage_taken = damage_taken
            self.hitcount = hitcount
            self.selected_weapon_ammo = ammo

            # Reward weights
            reward = (
                movement_reward
                + damage_taken_delta * 10
                + hitcount_delta * 200
                + ammo_delta * 5
            )
            info = ammo
        else:
            # No state available: return a blank frame that matches the
            # declared observation dtype (np.zeros alone would be float64).
            state = np.zeros(self.observation_space.shape, dtype=self.observation_space.dtype)
            # Keep the reward the engine gave for this last action instead of
            # silently replacing it with 0; no shaping terms can be computed.
            reward = movement_reward
            info = 0  # Dummy descriptor

        done = self.game.is_episode_finished()
        return state, reward, done, {"info": info}

    # Gray Scale and resize the game frame with cv2
    def grayscale(self, observation):  # Effectively passing our game frame
        """Convert a game frame to a resized single-channel observation.

        The first axis of ``observation`` is moved to the end before the
        colour conversion (presumably the buffer is channel-first -- depends
        on the screen format set in the config), then the frame is resized
        and reshaped to (100, 160, 1).
        """
        channels_last = np.moveaxis(observation, 0, -1)
        gray = cv2.cvtColor(channels_last, cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height)
        resized = cv2.resize(
            gray,
            (self._FRAME_WIDTH, self._FRAME_HEIGHT),
            interpolation=cv2.INTER_CUBIC,
        )  # Make smaller
        return np.reshape(resized, (self._FRAME_HEIGHT, self._FRAME_WIDTH, 1))

    # Start a new game
    def reset(self):
        """Start a new episode and return its first processed frame.

        The delta trackers are re-seeded from the first state's game
        variables so the first step of the episode is not scored against
        values left over from the previous episode.
        """
        self.game.new_episode()
        game_state = self.game.get_state()
        (
            _health,
            self.damage_taken,
            self.hitcount,
            self.selected_weapon_ammo,
        ) = game_state.game_variables
        return self.grayscale(game_state.screen_buffer)

    # Close the environment/game
    def close(self):
        """Shut down the underlying game instance."""
        self.game.close()

    # Pre-Defined in VizDoom, staying pass for now
    # Usually where we define how to render the game or environment
    def render(self, mode="human"):
        """No-op; ``mode`` is accepted only so callers may pass it."""
        pass

In [None]:
# Create environment
env = VizDoomGym(render=True)

# Close the game even if one of the checks below raises
try:
    # Testing
    state = env.reset()
    print(state.shape)

    print(f'Observation space shape: {env.observation_space.sample().shape}')
    print(f'Action space: {env.action_space.sample()}')

    print(env.step(2))

    # The processed frame has a single channel, so expand it from gray to RGB
    # for display (COLOR_BGR2RGB expects a 3- or 4-channel input)
    plt.imshow(cv2.cvtColor(state, cv2.COLOR_GRAY2RGB))

    # Check if the environment is set up correctly with SB3's checker
    env_checker.check_env(env)  # Will throw error if not
    print("Environment is in correct format")
finally:
    env.close()

In [None]:
# Standard Training and Logging Callback, refer to SB3 docs
class TrainAndLoggingCallback(BaseCallback):
    """Save the model being trained every ``check_freq`` callback steps.

    Args:
        check_freq: Save whenever ``n_calls`` is a multiple of this value.
        save_path: Directory for the saved models; ``None`` disables saving.
        verbose: Verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the save directory if one was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint when due.

        Returns:
            Always ``True`` so training continues; the callback contract
            treats a false result as a request to stop.
        """
        # Same None guard as _init_callback, so a disabled path does not
        # reach os.path.join
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, f"best_model_{self.n_calls}")
            self.model.save(model_path)
        return True

In [None]:
# Output locations: training logs and saved model checkpoints
LOG_DIR_BASIC = './logs/log_deadly'
CHECKPOINT_DIR_BASIC = './train/train_deadly'

In [None]:
# Checkpoint callback: saves the model into CHECKPOINT_DIR_BASIC
# every 10000 callback steps
callback = TrainAndLoggingCallback(
    save_path=CHECKPOINT_DIR_BASIC,
    check_freq=10000,
)

In [None]:
# PPO (from Stable Baselines 3) is the algorithm used to train the model.
# The training environment keeps the default render=False, so no window is shown.
train_env = VizDoomGym(config="git_doom/ViZDoom/scenarios/deadly_corridor_1.cfg")
print("Creating training environment...")

# Build the PPO agent
model = PPO(
    policy='CnnPolicy',  # CNN policy because observations are images
    env=train_env,  # Environment the agent trains on
    learning_rate=0.00001,  # Optimizer step size
    n_steps=8192,  # Steps collected per rollout before each policy update
    gamma=0.95,  # Discount factor - 95%
    gae_lambda=0.9,  # Advantage-estimation smoothing parameter, 90%
    clip_range=0.1,  # PPO clipping parameter - limits the change per update to 10%
    tensorboard_log=LOG_DIR_BASIC,  # Where the training logs are written
    verbose=1,  # Verbosity level
)

In [None]:
# Training loop (curriculum stage 1)
# try/finally makes sure the game is shut down even when training fails or
# the cell is interrupted part-way through
try:
    model.learn(
        total_timesteps=400000,  # Number of steps to train
        callback=callback,  # Callback to save the model
    )
finally:
    train_env.close()  # Cleanup

In [None]:
# Curriculum learning step 2: continue training the same model on the next config
train_env_2 = VizDoomGym(config="git_doom/ViZDoom/scenarios/deadly_corridor_2.cfg")
# try/finally makes sure the game is shut down even when training fails or
# the cell is interrupted part-way through
try:
    model.set_env(train_env_2)
    model.learn(
        total_timesteps=100000,  # Number of steps to train
        callback=callback,  # Callback to save the model
    )
finally:
    train_env_2.close()  # Cleanup

In [None]:
# Curriculum learning step 3: continue training the same model on the next config
train_env_3 = VizDoomGym(config="git_doom/ViZDoom/scenarios/deadly_corridor_3.cfg")
# try/finally makes sure the game is shut down even when training fails or
# the cell is interrupted part-way through
try:
    model.set_env(train_env_3)
    model.learn(
        total_timesteps=100000,  # Number of steps to train
        callback=callback,  # Callback to save the model
    )
finally:
    train_env_3.close()  # Cleanup

In [None]:
# Curriculum learning step 4: continue training the same model on the next config
train_env_4 = VizDoomGym(config="git_doom/ViZDoom/scenarios/deadly_corridor_4.cfg")
# try/finally makes sure the game is shut down even when training fails or
# the cell is interrupted part-way through
try:
    model.set_env(train_env_4)
    model.learn(
        total_timesteps=100000,  # Number of steps to train
        callback=callback,  # Callback to save the model
    )
finally:
    train_env_4.close()  # Cleanup

In [None]:
# Curriculum learning step 5: continue training the same model on the last config
train_env_5 = VizDoomGym(config="git_doom/ViZDoom/scenarios/deadly_corridor_5.cfg")
# try/finally makes sure the game is shut down even when training fails or
# the cell is interrupted part-way through
try:
    model.set_env(train_env_5)
    model.learn(
        total_timesteps=100000,  # Number of steps to train
        callback=callback,  # Callback to save the model
    )
finally:
    train_env_5.close()  # Cleanup

In [None]:
# Now we test the model, fully trained

# Load a saved checkpoint; the file name must match one written by the callback
model = PPO.load("./train/train_deadly/best_model_610000")  # Can also use model.load

# Test and evaluate model stats

# Create a test environment (rendered so the games can be watched)
test_env = VizDoomGym(render=True, config="git_doom/ViZDoom/scenarios/deadly_corridor_3.cfg")
# Wrap with SB3 Monitor class
test_env = Monitor(test_env, allow_early_resets=False)  # I don't want logs for this, you may set it to a log dir

# try/finally makes sure the game window is closed even if evaluation fails
try:
    # Evaluate mean reward over 10 games
    print("Ripping and Tearing...")
    mean_reward, _ = evaluate_policy(model, test_env, n_eval_episodes=10)
    print("It is done")
    print(f"Mean reward: {mean_reward}")
finally:
    test_env.close()

In [None]:
# Use a loop to slow things down for visual clarity without evaluate_policy
test_env = VizDoomGym(render=True, config="git_doom/ViZDoom/scenarios/deadly_corridor_3.cfg")
test_env = Monitor(test_env, allow_early_resets=False)

# try/finally makes sure the game window is closed even if playback is
# interrupted or an episode raises
try:
    for episode in range(5):  # Play 5 games
        obs = test_env.reset()  # Store the current frame in observation
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs)  # Get the action based on the observation
            obs, reward, done, info = test_env.step(action)  # Get info from step
            time.sleep(0.06)  # Have a small pause
            total_reward += reward  # Update the reward
        print(f"Episode {episode} reward: {total_reward}")
        time.sleep(2)  # Between each episode
finally:
    test_env.close()