In [1]:
from vizdoom import * 

import os
import sys
import time 
import random

import cv2
import numpy as np

from gym import Env, wrappers
from gym.spaces import Discrete, Box

from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3 import DQN, PPO

In [2]:
sys.path.insert(0, '../external_lib/ViZDoom/')

###### Right now this is being used to train the model on the health gathering scenario with the config files(action space and game variables) changed to accommodate curriculum learning in deathmatch.
###### action suppression is by modified reward (i.e high negative reward on unfavorable action)

In [3]:
CONFIG = '../resources/cfg/simpler_basic.cfg'

CHECKPOINT_DIR = '../resources/checkpoints/basic_1.0_ppo'
LOG_DIR = '../resources/checkpointslog_basic'

In [4]:
# Create Vizdoom OpenAI Gym Environment

class VizDoomGym(Env): 

    def __init__(self, render=False, config = CONFIG): 
        super().__init__()

        self.game = DoomGame()
        self.game.load_config(config)
        
        if render == False: 
            self.game.set_window_visible(False)
        else:
            self.game.set_window_visible(True)
        
        self.game.init()
        
        # Create the action space and observation space
        self.observation_space = Box(low=0, high=255, shape=(100,160,1), dtype=np.uint8) 
        self.action_space = Discrete(3)
        
        # Game variables: KILLCOUNT HEALTH ARMOR SELECTED_WEAPON SELECTED_WEAPON_AMMO
        self.damage_taken = 0
        self.killcount = 0
        self.ammo = 0
        self.ep_length = 0
        
        
    def step(self, action):
        actions = np.identity(3)
        movement_reward = self.game.make_action(actions[action], 3) 
        
        reward = 0 
        # Get all the other stuff we need to retun 
        if self.game.get_state(): 
            self.ep_length = self.ep_length+1
            state = self.game.get_state().screen_buffer
            state = self.grayscale(state)
            
            # Reward shaping
            game_variables = self.game.get_state().game_variables
            killcount, health, armour, selected_weapon, ammo = game_variables  #ammo of selected weapon  #damage_taken is to armor
            
            # Calculate reward deltas
            damage_taken_delta = -health + self.damage_taken
            self.damage_taken = health
            
            reward = movement_reward*1.00 - damage_taken_delta*1.0      #current reward function
            info = health
        else: 
            state = np.zeros(self.observation_space.shape)
            info = 0 
        
        info = {"info": info}
        done = self.game.is_episode_finished()
        
        if done:
            self.ep_length = 0
        
        return state, reward, done, info 
    
    # Define how to render the game or environment 
    def render(): 
        pass
    
    # What happens when we start a new game 
    def reset(self): 
        self.game.new_episode()
        state = self.game.get_state().screen_buffer
        self.damage_taken = 0
        self.hitcount = 0
        self.ammo = 0 ## CHANGED
        self.ep_length = 0
        return self.grayscale(state)
    
    # Grayscale the game frame and resize it 
    def grayscale(self, observation):
        gray = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)
        resize = cv2.resize(gray, (160,100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize, (100,160,1))
        return state
    
    # Call to close down the game
    def close(self): 
        self.game.close()
    
    def get_game_variables(self):
        return self.game.get_state().game_variables

In [5]:
class TrainAndLoggingCallback(BaseCallback):

    def __init__(self, check_freq, save_path, verbose=1):
        super(TrainAndLoggingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        if self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

## Game

In [6]:
env = VizDoomGym(config=CONFIG, render=False)
model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1, learning_rate=0.00001, n_steps=8192, 
            clip_range=.1, gamma=.95, gae_lambda=.9)


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


## Training

In [7]:
callback = TrainAndLoggingCallback(check_freq=50000, save_path=CHECKPOINT_DIR)
model.set_env(env)
model.learn(total_timesteps=800000, callback=callback)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


error: OpenCV(4.6.0) /Users/runner/work/opencv-python/opencv-python/opencv/modules/imgproc/src/color.simd_helpers.hpp:92: error: (-2:Unspecified error) in function 'cv::impl::(anonymous namespace)::CvtHelper<cv::impl::(anonymous namespace)::Set<3, 4, -1>, cv::impl::(anonymous namespace)::Set<1, -1, -1>, cv::impl::(anonymous namespace)::Set<0, 2, 5>, cv::impl::(anonymous namespace)::NONE>::CvtHelper(cv::InputArray, cv::OutputArray, int) [VScn = cv::impl::(anonymous namespace)::Set<3, 4, -1>, VDcn = cv::impl::(anonymous namespace)::Set<1, -1, -1>, VDepth = cv::impl::(anonymous namespace)::Set<0, 2, 5>, sizePolicy = cv::impl::(anonymous namespace)::NONE]'
> Invalid number of channels in input image:
>     'VScn::contains(scn)'
> where
>     'scn' is 1


In [14]:
# Evaluate mean reward for 10 games
mean_reward, ep_length = evaluate_policy(model, env, n_eval_episodes=100, return_episode_rewards = True)
for i in range(100):
    print('Episode {}: Episode Length = {}, Reward = {}'.format(i, ep_length[i], mean_reward[i]))


KeyboardInterrupt



In [7]:
for episode in range(100): 
    
    obs = env.reset()
    done = False
    total_reward = 0
    
    while not done: 
        action, _ = model.predict(obs)
        obs, reward, done, info = env.step(action)
        time.sleep(0.10)
        total_reward += reward
        
    print('Total Reward for episode {} is {}'.format(total_reward, episode))
    
    time.sleep(2)

TypeError: make_action(): incompatible function arguments. The following argument types are supported:
    1. (self: vizdoom.vizdoom.DoomGame, arg0: list) -> float
    2. (self: vizdoom.vizdoom.DoomGame, arg0: list, arg1: int) -> float

Invoked with: <vizdoom.vizdoom.DoomGame object at 0x107f01770>, array([0., 0., 0., 1.]), 4