## 1. Getting VizDoom up and running

In [1]:
# import VizDoom for game env
from vizdoom import *
# Import random for action sampling
import random
# Import time for sleeping
import time
# import numpy for identity matrix
import numpy as np
# Import os to deal with filepaths
import os

from matplotlib import pyplot as plt

## 2. Converting it to a Gym Environment

In [2]:
# Import environment base class from OpenAI Gym
from gymnasium import Env
# Import gym spaces
from gymnasium.spaces import Discrete, Box
# Import Opencv for greyscaling observations
import cv2

# Import environment checker
# Discrete(3).sample() returns a number from 0, 1, 2 -> used as index to select action
# Box(low=0, high=10, shape=(10,10)).sample() -> getting 10x10 array with low=0 and high=10
import optuna
from stable_baselines3 import PPO
from stable_baselines3.common import env_checker
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack

LEVEL = 'deadly_corridor'
DOOM_SKILL = 's1'

### Environment configuration for Reward Shaping 
#### Additional game variables needed for this level:
- DAMAGE_TAKEN (-)
- DAMAGECOUNT (+)
- SELECTED_WEAPON_AMMO (-)

In [3]:
# Create VizDoom OpenAI Gym Environment
class VizDoomGym(Env): 
    def __init__(self, render=False, config=f'VizDoom/scenarios/{LEVEL}_{DOOM_SKILL}.cfg'):
        """
        Function called when we start the env.
        """

        # Inherit from Env
        super().__init__()
        
        # Set up game
        self.game = DoomGame()
        self.game.load_config(config)
        

        # Whether we want to render the game 
        if render == False:
            self.game.set_window_visible(False)
        else:
            self.game.set_window_visible(True)

        # Start the game
        self.game.init()
        
        # Create action space and observation space
        self.observation_space = Box(low=0, high=255, shape=(100, 160, 1), dtype=np.uint8)
        self.action_space = Discrete(7)

        # Game variables: HEALTH DAMAGE_TAKEN DAMAGECOUNT SELECTED_WEAPON_AMMO 
        ## We want the change in these variable values, rather than the PiT values
        self.damage_taken = 0
        self.hitcount = 0
        self.ammo = 52

    
    def step(self, action):
        """
        How we take a step in the environment.
        """

        # Specify action and take step
        actions = np.identity(7, dtype=np.uint8)
        # Movement rewards encapsulates predefined reward in the environment config
        movement_reward = self.game.make_action(actions[action], 4) # get action using index -> left, right, shoot

        reward = 0
        # Get all the other stuff we need to return 
        if self.game.get_state():  # if nothing is
            state = self.game.get_state().screen_buffer
            state = self.grayscale(state)  # Apply Grayscale
            # ammo = self.game.get_state().game_variables[0] 

            # Reward shaping
            game_variables = self.game.get_state().game_variables # get current PiT game variables
            health, damage_taken, hitcount, ammo = game_variables # unpack

            # calculate change in damage_taken, hitcount, ammo
            damage_taken_delta = -damage_taken + self.damage_taken # disincentivizng us to take damage
            self.damage_taken = damage_taken
            hitcount_delta = hitcount - self.hitcount # increments by +1: incentivizing more hitcounts (1 hitcount = 1 reward)
            self.hitcount = hitcount
            ammo_delta = ammo - self.ammo # increments by -1: disincentiving us to take shots that miss
                                          # hitcount and ammo will cancel each other out
            self.ammo = ammo

            # Pack everything into reward function (tuned weights)
            reward = movement_reward + damage_taken_delta*10 + hitcount_delta*200 + ammo_delta*5
            
            info = ammo
        # If we dont have anything turned from game.get_state
        else:
            # Return a numpy zero array
            state = np.zeros(self.observation_space.shape)
            # Return info (game variables) as zero
            info = 0

        info = {"info":info}
        done = self.game.is_episode_finished()
        truncated = False  # Assuming it's not truncated, modify if applicable
        
        return state, reward, done, truncated, info

    
    def render(self):
        """
        Define how to render the game environment.
        """
        pass

    
    def reset(self, seed=None):
        """
        Function for defining what happens when we start a new game.
        """
        if seed is not None:
            self.game.set_seed(seed)
            
        self.game.new_episode()
        state = self.game.get_state().screen_buffer  # Apply Grayscale

        return self.grayscale(state), {}

    
    def grayscale(self, observation):
        """
        Function to grayscale the game frame and resize it.
        observation: gameframe
        """
        # Change colour channels 
        gray = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)

        # Reduce image pixel size for faster training
        resize = cv2.resize(gray, (160,100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize,(100, 160,1))
        return state

    def close(self):
        """
        Call to close down the game.
        """
        self.game.close()


In [None]:
env = VizDoomGym(render=True)

Environment checker

In [None]:
env_checker.check_env(env)

In [None]:
env.close()

## 4. Optuna optimisation framework for HPO

In [4]:
LOG_DIR = './logs/log_corridor_hpo'
OPT_DIR = './opt/opt_corridor_hpo'

In [5]:
# Function to return test hyperparameters - define the objective function
def optimise_ppo(trial):
    return {
        'n_steps': trial.suggest_int('n_steps', 2048, 8192),
        'gamma': trial.suggest_float('gamma', 0.8, 0.9999, log=True),
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-4, log=True),
        'clip_range': trial.suggest_float('clip_range', 0.1, 0.4),
        'gae_lambda': trial.suggest_float('gae_lambda', 0.8, 0.99)
    }

In [6]:
# Run a training loop and return mean reward
def optimise_agent(trial):
    try:
        model_params = optimise_ppo(trial)
        model_params['n_steps'] = round(model_params['n_steps']/64) * 64
        
        # Create environment 
        env = VizDoomGym()
        env = Monitor(env, LOG_DIR)
        env = DummyVecEnv([lambda: env])
        env = VecFrameStack(env, 4, channels_order='last')

        # Create algo 
        model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        # model.learn(total_timesteps=300)
        model.learn(total_timesteps=100000)

        # Evaluate model 
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=20)
        env.close()

        SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(SAVE_PATH)

        return mean_reward
        
    except Exception as e:
        return -1000

In [None]:
# Creating the experiment
study = optuna.create_study(direction='maximize')
study.optimize(optimise_agent, n_trials=100, n_jobs=20)
# study.optimize(optimize_agent, n_trials=100, n_jobs=1)

[I 2024-04-02 15:28:26,994] A new study created in memory with name: no-name-96cebd0c-2c3a-4697-9b04-fd779123c4e4
[I 2024-04-04 03:56:56,190] Trial 4 finished with value: -15.986538599999994 and parameters: {'n_steps': 2823, 'gamma': 0.9631059078391256, 'learning_rate': 3.2252073369692905e-05, 'clip_range': 0.23934589909770057, 'gae_lambda': 0.8321502792041914}. Best is trial 4 with value: -15.986538599999994.
[I 2024-04-04 04:31:40,298] Trial 6 finished with value: -40.23874135 and parameters: {'n_steps': 3236, 'gamma': 0.8086597453736779, 'learning_rate': 1.3308147900414879e-05, 'clip_range': 0.16327174237081, 'gae_lambda': 0.909335380740592}. Best is trial 4 with value: -15.986538599999994.
[I 2024-04-04 05:07:42,333] Trial 18 finished with value: 378.8636208500001 and parameters: {'n_steps': 5049, 'gamma': 0.8206512736846115, 'learning_rate': 7.74205514669805e-05, 'clip_range': 0.20701537636628392, 'gae_lambda': 0.8206267863297668}. Best is trial 18 with value: 378.8636208500001.
[

In [None]:
study.best_params

## 5. Setup Callback
Save model at different state of training

In [None]:
# Import os for file nav
import os
# Import callback class from sb3
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)
        return True
                                      

In [None]:
CHECKPOINT_DIR = './train/train_corridor'

In [None]:
# Create instance of callback
callback = TrainAndLoggingCallback(check_freq=10000, save_path=CHECKPOINT_DIR) 
# after every 10000 steps of training the model, weights are saved for the pytorch agent

## 5. Train Model using Curriculum


In [None]:
# Import PPO for training
from stable_baselines3 import PPO

In [None]:
# Non rendered environment
DOOM_SKILL = 's1'
env = VizDoomGym(config=f'VizDoom/scenarios/{LEVEL}_{DOOM_SKILL}.cfg')
env = Monitor(env, LOG_DIR)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, 4, channels_order='last')

In [None]:
model_params = study.best_params
model_params['n_steps'] = 7488
model_params

In [None]:
# n_steps: How many steps/frames the agent is going to take and store in the buffer 
# before run through training of actor and critique
# ideally not too close to end of game (300) but somewhere close
model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_params) 

In [None]:
model.learn(total_timesteps=400000, callback=callback)

#### Load saved best model and apply Curriculum Learning (S2 - S5)

In [None]:
model.load('./train/train_corridor/best_model_400000.zip')

In [None]:
# Non rendered environment for S2
DOOM_SKILL = 's2'
env = VizDoomGym(config=f'VizDoom/scenarios/{LEVEL}_{DOOM_SKILL}.cfg')
model.set_env(env)
model.learn(total_timesteps=400000, callback=callback)

In [None]:
# Non rendered environment for S3
DOOM_SKILL = 's3'
env = VizDoomGym(config=f'VizDoom/scenarios/{LEVEL}_{DOOM_SKILL}.cfg')
model.set_env(env)
model.learn(total_timesteps=400000, callback=callback)

In [None]:
# Non rendered environment for S4
DOOM_SKILL = 's4'
env = VizDoomGym(config=f'VizDoom/scenarios/{LEVEL}_{DOOM_SKILL}.cfg')
model.set_env(env)
model.learn(total_timesteps=400000, callback=callback)

In [None]:
# Non rendered environment for S5
DOOM_SKILL = 's5'
env = VizDoomGym(config=f'VizDoom/scenarios/{LEVEL}_{DOOM_SKILL}.cfg')
model.set_env(env)
model.learn(total_timesteps=400000, callback=callback)

## 5. Test Model


In [None]:
# Import eval policy to test agent
from stable_baselines3.common.evaluation import evaluate_policy

In [None]:
# Reload model from disc
model = PPO.load('./train/train_corridor/best_model_250000')

In [None]:
# Create rendered envrironment
env = VizDoomGym(render=True)

In [None]:
# Evaluate mean reward for 10 games
mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=10)

In [None]:
mean_reward

In [None]:

for episode in range(5):
    total_reward = 0
    obs = env.reset()[0]
    done = False
    while not done:
        action, _ = model.predict(obs) # Use model to predict what action to take
        obs, reward, done, _, info = env.step(action) # take the predicted action
        time.sleep(0.1)
        total_reward += reward
    print('Total Reward for episode {} is {}'.format(episode,total_reward))
    time.sleep(2)

In [None]:
env.close()