#### Setting up game configurations

In [1]:
from vizdoom import *
import random # for random actions
import time
import numpy as np

# Open AI Gym dependencies
import gymnasium as gym
from gymnasium import Env
from gymnasium.spaces import Discrete, Box # for random actions and nxm for random observation space (frames)
import cv2

# Stable Baselines3 dependencies
import os
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common import env_checker
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy

# Optimizer
import optuna 

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
  from .autonotebook import tqdm as notebook_tqdm


### Converting it to a Gym Environment

In [2]:
# Game configuration : Take Cover configuration file
config = 'github_vizdoom_repo/ViZDoom/scenarios/take_cover_lev1.cfg'

In [None]:
# Creating Vizdoom OpenAI Gym Environment
class VizDoomGym(Env):
    def __init__(self, render_mode = False, config = config): # By default, rendering is disabled
        # Inheriting from the Env class
        super().__init__()
        # Setup game
        self.game = DoomGame()
        self.game.load_config(config) 

        # Rendering mode : if unabled, the game will not be displayed but the training will be faster
        if render_mode == False:
            self.game.set_window_visible(False)
        else:
            self.game.set_window_visible(True)

        # Start the game
        self.game.init()
        
        # In order to get the game frame size, run a dummy demo and get the screen buffer shape  with game.get_state().screen_buffer.shape
        self.observation_space = Box(low=0, high=255, shape=(100, 160, 1), dtype = np.uint8)
        # Action space
        self.action_space = Discrete(2) # left, right
        self.current_step = 0

        # Strategy-based reward:
        # With only the movement reward the agent can only learn once it dies when shooted by a fireball
        # In order to make it learn while it plays, we add a small reward for dodging the fireballs
        self.damage_taken = 0
        self.hits_taken = 0


    # Defining how to make a step in the env
    def step(self, action):
        actions = np.identity(2, dtype=np.uint8) # Possible actions [left, right]
        movement_reward = self.game.make_action(actions[action], 4) # Defyining the frame skip parameter to 4

        # Check if the frames are over
        reward = 0
        if self.game.get_state():
            state = self.game.get_state().screen_buffer
            obs = self.grayscale(state)

            # Performe reward shaping
            game_variables = self.game.get_state().game_variables
            health, damage_taken, hits_taken  = game_variables

            # Reward function
            damage_taken_delta = - damage_taken + self.damage_taken
            self.damage_taken = damage_taken
            hits_taken_delta = - hits_taken + self.hits_taken
            self.hits_taken = hits_taken

            terminated = self.game.is_episode_finished()
            truncated = self.current_step >= 2100  # Max steps

            # Combine the basic reward of the game with the shaped reward 
            reward = movement_reward + damage_taken_delta*2 + hits_taken_delta*50

            info = {"health": health}
        else: # Default zeros observation
            obs = np.zeros(self.observation_space.shape, dtype=np.uint8)
            terminated = True
            truncated = False
            info = {} 

        self.current_step += 1
        return obs, reward, terminated, truncated, info # Changed parameters order according to Gymnasium API
    
    def render():
        pass
    
    # What appens when starting a new episode
    def reset(self, seed = None, options = None):
        super().reset(seed=seed)
        if seed is not None:
            random.seed(seed)
            np.random.seed(seed)

        self.game.new_episode()
        state = self.game.get_state().screen_buffer
        obs = self.grayscale(state)
        info = {"health": self.game.get_state().game_variables[0],
                "damage_take" : self.game.get_state().game_variables[1],
                "hits_talen" : self.game.get_state().game_variables[2]}
        self.current_step = 0

        return obs, info

    # Grayscale and resize the frames in order to reduce the observation space
    def grayscale(self, observation):
        gray = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY) # moveaxis moves the first element (0) to last position (-1)
        resize = cv2.resize(gray, (160, 100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize, (100,160, 1))
        return state

    def close(self):
        self.game.close()

In [4]:
# Check pass to see if the environment works
env_checker.check_env(env=VizDoomGym())

### Setting up Callbacks

In [5]:
class TrainAndLoggingCallback(BaseCallback):
    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}.zip'.format(self.n_calls))
            self.model.save(model_path)
            
        return True

In [6]:
CHECKPOINT_DIR = './train/train_TakeCover_RewardShaping'
OPT_DIR = './opt/opt_TakeCover_RewardShaping'
LOG_DIR = './logs/log_TakeCover_RewardShaping'

In [7]:
callback = TrainAndLoggingCallback(check_freq=10000, save_path = CHECKPOINT_DIR)

### Hyperparameters Tuning with Optuna

In [None]:
# Objective function to test hyperparameters
def optimize_ppo(trial):
    return {
        'n_steps': trial.suggest_int('n_steps', 2048, 8192, step = 64), # number of frames used in a single training step (must be multiple of 64)
        'gamma': trial.suggest_float('gamma', 0.8, 0.9999, log = True), # discount factor
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-4, log = True),
        'clip_range': trial.suggest_float('clip_range', 0.1, 0.4),
        'gae_lambda': trial.suggest_float('gae_lambda', 0.8, 0.99)
    }

In [None]:
# Run a training loop and return the mean reward
def optimize_agent(trial):
    try:
        model_params = optimize_ppo(trial) # set of hyperparameters to test

        # Create new env
        env = VizDoomGym(config=config)
        env = Monitor(env, LOG_DIR)
        env = DummyVecEnv([lambda: env])
        env = VecFrameStack(env, 4, channels_order='last')

        # Set up, train and evaluatethe PPO model
        model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, **model_params)
        model.learn(total_timesteps=100000)
        
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=5)
        env.close()

        # Save the best model 
        SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(SAVE_PATH)

        return mean_reward


    except Exception as e:
        print(e)
        return -1000 # Return a very low reward if training fails

In [None]:
# Begin an optuna study
study = optuna.create_study(direction='maximize')
study.optimize(optimize_agent, n_trials=10, n_jobs=-1, show_progress_bar = True)

In [None]:
# Print the best hyperparameters
print("Best hyperparameters combo: ", study.best_params)

# Saving best study results into csv file
with open('./best_hyperparameters_TakeCover_RewardShaping.csv', 'w') as f:
    for key, value in study.best_params.items():
        f.write(f"{key},{value}\n")

### Fine-Tuning the best Agent

In [None]:
env = VizDoomGym(config=config)

In [None]:
# instantiate PPO model
model = PPO('CnnPolicy', # policy type -> CnnPolicy since we are working on image frames
            env,
            tensorboard_log=LOG_DIR,
            verbose=1,
            learning_rate=study.best_params['learning_rate'],
            n_steps=study.best_params['n_steps'], 
            clip_range=study.best_params['clip_range'],
            gamma= study.best_params['gamma'],
            gae_lambda=study.best_params['gae_lambda']
)

In [None]:
model.learn(total_timesteps = 250000, callback=callback)

### Curriculum Learning

##### Level 2

In [None]:
# Load the best model in the previous environment (Level 1)
model.load("TO DO") 

In [None]:
# Load level 2 configuration
env = VizDoomGym(config='github_vizdoom_repo/ViZDoom/scenarios/take_cover_lev2.cfg')
model.set_env(env)

# Retrain the model 
model.learn(total_timesteps = 250000, callback=callback)

##### Level 3

In [None]:
# Load best level 2 model
model.load("TO DO") 

# Load level 3 configuration
env = VizDoomGym(config='github_vizdoom_repo/ViZDoom/scenarios/take_cover_lev3.cfg')
model.set_env(env)

# Retrain the model 
model.learn(total_timesteps = 250000, callback=callback)

##### Level 4

In [None]:
# Load best level 3 model
model.load("TO DO") 

# Load level 4 configuration
env = VizDoomGym(config='github_vizdoom_repo/ViZDoom/scenarios/take_cover_lev4.cfg')
model.set_env(env)

# Retrain the model 
model.learn(total_timesteps = 250000, callback=callback)

### Testing the trained agent on real-time game

In [None]:
# Reload the best model from disk
BEST_MODEL_DIR = "TO DO"
model = PPO.load(BEST_MODEL_DIR) 

In [12]:
env = VizDoomGym(render_mode= True, config=config) # rendered-env

In [13]:
mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=50)
print(f"Mean reward over 50 episodes: {mean_reward}")



Mean reward over 50 episodes: 385.84


In [14]:
env.close()



*** Fatal Error ***
Address not mapped to object (signal 11)
Address: 0x7f3df31a5f28

Generating vizdoom-crash.log and killing process 64373, please wait... sh: 1: gdb: not found
