In [1]:
from vizdoom import DoomGame  
import random
import time
import numpy as np
import gym
from gym import Env
from gym.spaces import Discrete, Box
import cv2
from matplotlib import pyplot as plt
import os
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3 import PPO

In [2]:
###################
##   CONSTANTS   ##
###################
# Number of discrete actions and number of episodes; EPISODES_NUM is only
# referenced by the commented-out random-play cell further down.
ACTION_NUM = 3
EPISODES_NUM = 10

# Path prefixes; the selected scenario's name is appended to each of them.
AGENT_MODEL_PATH_PREFIX = './agents/agent_for_'
TENSORBOARD_LOG_PATH_PREFIX = './logs/logs_for_'

# Index into `configurations` selecting the scenario used for paths below.
CURRENT_CONFIGURATION_INDEX = 0

# One-hot button vectors, one row per action.
actions = np.identity(ACTION_NUM, dtype=np.uint8)

# Scenario descriptors consumed by VizDoomGym (config file + action count).
configurations = [
    {
        'name': 'basic',
        'configFilePath': 'VizDoom/scenarios/basic.cfg',
        'actionNumber': 3,
    },
    {
        'name': 'defend_the_center',
        'configFilePath': 'VizDoom/scenarios/defend_the_center.cfg',
        'actionNumber': 3,
    },
]


In [3]:
class VizDoomGym(Env):
  """Gym environment wrapping a ViZDoom scenario.

  Observations are single-channel uint8 frames of shape (100, 160, 1).
  Actions are integers in [0, actionNumber); each index is translated to a
  one-hot button vector before being sent to the game.
  """

  def __init__(self, envConfig, render=False):
    """Load the scenario, start the game and declare the Gym spaces.

    Args:
      envConfig: dict with the keys "configFilePath" (scenario .cfg path)
        and "actionNumber" (size of the discrete action space).
      render: whether the ViZDoom window is visible.
    """
    super().__init__()
    self.game = DoomGame()
    self.game.load_config(envConfig["configFilePath"])

    # Window visibility has to be configured before init().
    self.game.set_window_visible(render)

    print(self.game)

    self.game.init()

    self.action_number = envConfig["actionNumber"]
    # One-hot action table, built once instead of on every step() call.
    self.actions = np.identity(self.action_number, dtype=np.uint8)

    self.action_space = Discrete(self.action_number)
    self.observation_space = Box(0, 255, [100, 160, 1], np.uint8)

  def close(self):
    """Shut down the underlying DoomGame instance."""
    self.game.close()

  def step(self, action):
    """Apply one action and return (observation, reward, done, info).

    Args:
      action: index of the action to perform.

    Returns:
      A tuple (observation, reward, done, info) where info is a dict with
      the key "ammo". When the game reports no state, the observation is an
      all-zero frame and "ammo" is 0.
    """
    # The second argument (5) is passed to make_action as the tic count.
    reward = self.game.make_action(self.actions[action], 5)

    done = self.game.is_episode_finished()
    state = self.game.get_state()

    if not state:
      # Keep the placeholder frame consistent with observation_space
      # (uint8) instead of numpy's float64 default.
      blank = np.zeros(self.observation_space.shape, dtype=self.observation_space.dtype)
      return blank, reward, done, {'ammo': 0}

    img = self.grayscale(state.screen_buffer)
    # NOTE(review): assumes the scenario config exposes ammo as the first
    # game variable - confirm against the .cfg files.
    ammo = state.game_variables[0]
    misc = {"ammo": ammo}
    return img, reward, done, misc

  def reset(self):
    """Start a new episode and return its first observation."""
    self.game.new_episode()
    state = self.game.get_state()
    return self.grayscale(state.screen_buffer)

  def render(self, mode="human"):
    """No-op; window visibility is set through the constructor's `render` flag."""
    pass

  def grayscale(self, observation):
    """Convert a screen buffer into a (100, 160, 1) grayscale frame.

    Args:
      observation: image array whose first axis holds the colour channels;
        that axis is moved to the end before the BGR-to-gray conversion.

    Returns:
      The frame resized to 160x100 (width x height) and reshaped to
      (100, 160, 1).
    """
    grayscaled = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)
    # The third positional parameter of cv2.resize is `dst`, so the
    # interpolation flag must be passed by keyword to take effect.
    resized = cv2.resize(grayscaled, (160, 100), interpolation=cv2.INTER_CUBIC)
    return np.reshape(resized, (100, 160, 1))
  

In [None]:

# env = VizDoomGym(configurations[CURRENT_CONFIGURATION_INDEX], render=True)


# for episode in range(EPISODES_NUM):
#   env.reset()
#   done = False
#   while not done:
#     _, _, done, _ = env.step(random.randrange(ACTION_NUM))
#     time.sleep(0.02)
#   time.sleep(5)

# env.close()

In [None]:
class AgentCallback(BaseCallback):
  """Training callback that periodically saves the model being trained.

  Every `check_freq` callback calls the model is written to
  `<save_path>/model_<n_calls>`.
  """

  def __init__(self, check_freq, save_path, verbose=1):
    """Store the checkpoint settings.

    Args:
      check_freq: number of callback calls between two checkpoints.
      save_path: directory that receives the checkpoints.
      verbose: verbosity level forwarded to BaseCallback.
    """
    super().__init__(verbose)
    self.check_freq = check_freq
    self.save_path = save_path

  def _init_callback(self):
    """Create the checkpoint directory.

    The hook must be named with a single leading underscore: a double
    underscore gets name-mangled by Python and would never be invoked by
    the base class.
    """
    if self.save_path is not None:
      os.makedirs(self.save_path, exist_ok=True)

  def _on_step(self):
    """Save a checkpoint every `check_freq` calls.

    Returns:
      Always True.
    """
    if self.n_calls % self.check_freq == 0:
      model_path = os.path.join(self.save_path, f"model_{self.n_calls}")
      self.model.save(model_path)
    return True
  
  
# Checkpoint every 10000 callback calls into the directory of the selected scenario.
agentCallback = AgentCallback(
    check_freq=10000,
    save_path=f"{AGENT_MODEL_PATH_PREFIX}{configurations[CURRENT_CONFIGURATION_INDEX]['name']}",
)

In [None]:

# Train a PPO agent on the selected scenario without opening a game window.
# VizDoomGym takes the scenario config dict as its first argument; it has no
# `actionNumber` keyword.
env = VizDoomGym(configurations[CURRENT_CONFIGURATION_INDEX], render=False)
try:
    model = PPO(
        'CnnPolicy',
        env,
        tensorboard_log=f"{TENSORBOARD_LOG_PATH_PREFIX}{configurations[CURRENT_CONFIGURATION_INDEX]['name']}",
        verbose=1,
        learning_rate=0.0001,
        n_steps=2048,
    )
    print("Model created")
    model.learn(total_timesteps=100000, callback=agentCallback)
    print("Model trained")
finally:
    # Release the game instance so it is not leaked when `env` is rebound
    # to a rendering environment later in the notebook.
    env.close()

In [4]:
# Rendering environment for evaluation; uses the same scenario index as the
# model path so the environment and the loaded agent cannot drift apart.
env = VizDoomGym(configurations[CURRENT_CONFIGURATION_INDEX], render=True)

<vizdoom.vizdoom.DoomGame object at 0x7fa1ecbb5b70>


In [5]:
from stable_baselines3.common.evaluation import evaluate_policy

# Load the saved agent for the selected scenario.
# NOTE(review): AgentCallback in this notebook writes checkpoints named
# model_<n_calls>; "model_best" is presumably a checkpoint that was copied or
# renamed by hand - confirm it exists before running.
model = PPO.load(
    f"{AGENT_MODEL_PATH_PREFIX}{configurations[CURRENT_CONFIGURATION_INDEX]['name']}/model_best"
)

In [6]:
# Evaluate the loaded agent over 100 episodes; the call is the cell's last
# expression, so its return value is shown as the cell output.
evaluate_policy(model, env, n_eval_episodes=100)



In [8]:
from IPython.display import clear_output

# Replay 10 episodes with the loaded agent, showing each observed frame inline
# and reporting the reward collected per episode.
for episode in range(10):
    obs = env.reset()
    done = False
    total_reward = 0
    while not done:
        action, _ = model.predict(obs)
        obs, reward, done, info = env.step(action)
        # Accumulate the per-step reward; the counter was previously
        # initialised but never updated.
        total_reward += reward
        # wait=True defers clearing until the next frame is ready, which
        # avoids flicker between frames.
        clear_output(wait=True)
        # Drop the trailing channel axis and draw the frame in grayscale.
        plt.imshow(np.squeeze(obs, axis=-1), cmap='gray')
        plt.show()
        time.sleep(0.2)
    print(f"Episode {episode}: total reward {total_reward}")
    time.sleep(1)

KeyboardInterrupt: 

In [None]:
# Shut down the game instance held by the evaluation environment.
env.close()