In [None]:
!pip install vizdoom
!cd github & git clone https://github.com/mwydmuch/ViZDoom

In [None]:
from vizdoom import *
import random
import time
import numpy as np

In [None]:
# Random-agent smoke test: play a few episodes of the basic scenario with
# uniformly random actions and print the rewards.
game = DoomGame()
# The cloned repository directory is "ViZDoom" (capital Z); the exact case
# matters on case-sensitive filesystems.
game.load_config('github/ViZDoom/scenarios/basic.cfg')
game.init()

# One row per action: each row switches on exactly one of the three buttons.
actions = np.identity(3, dtype=np.uint8)

episodes = 10

for episode in range(episodes):
    # Must be *called*: a bare `game.new_episode` is a no-op expression, so
    # every episode after the first would find the game already finished.
    game.new_episode()

    while not game.is_episode_finished():
        state = game.get_state()
        img = state.screen_buffer
        info = state.game_variables

        # 4 is frame skip, 4 frames are processed before reward is returned
        reward = game.make_action(random.choice(actions), 4)

        print('reward:', reward)
        time.sleep(0.02)
    print('Result:', game.get_total_reward())
    time.sleep(2)


In [None]:
# Shut down the DoomGame instance created for the random-agent demo.
game.close()

In [None]:
# Converting Doom to a gym environment

!pip install gym

from gym import Env
from gym.spaces import Discrete, Box
import cv2

In [None]:
class VizDoom(Env):
    """Gym environment wrapping the ViZDoom ``basic`` scenario.

    Observations are single-channel ``uint8`` frames of shape (100, 160, 1).
    Actions are the integers 0-2; each one selects a row of a 3x3 identity
    matrix that is handed to ``DoomGame.make_action``.
    """

    def __init__(self, render=False):
        """Load the scenario config and start the game.

        Args:
            render: if truthy the game window is shown, otherwise it is hidden.
        """
        super().__init__()

        self.game = DoomGame()
        # The cloned repository directory is "ViZDoom" (capital Z); the exact
        # case matters on case-sensitive filesystems.
        self.game.load_config('github/ViZDoom/scenarios/basic.cfg')
        self.game.set_window_visible(bool(render))
        self.game.init()

        self.observation_space = Box(low=0, high=255, shape=(100,160,1), dtype=np.uint8)
        self.action_space = Discrete(3)

        # Row i is the button vector for discrete action i; built once here
        # instead of on every step.
        self._actions = np.identity(self.action_space.n)

    def step(self, action):
        """Apply one action and return ``(state, reward, done, info)``.

        Args:
            action: index into the action table (0, 1 or 2).

        Returns:
            state: processed frame, or an all-zero frame when the game has no
                current state (e.g. the episode just ended).
            reward: value returned by ``DoomGame.make_action``.
            done: result of ``DoomGame.is_episode_finished``.
            info: ``{'info': value}`` where value is the first game variable
                (presumably ammo for this scenario; confirm against the
                config), or 0 when there is no state.
        """
        reward = self.game.make_action(self._actions[action])

        # Fetch the state once; there is no state once the episode has ended.
        game_state = self.game.get_state()
        if game_state:
            state = self.grayscale(game_state.screen_buffer)
            info = game_state.game_variables[0]
        else:
            # Match the declared observation dtype (uint8); plain np.zeros
            # would yield float64 and violate the observation space.
            state = np.zeros(self.observation_space.shape,
                             dtype=self.observation_space.dtype)
            info = 0
        info = {'info': info}
        done = self.game.is_episode_finished()

        return state, reward, done, info

    def render(self, mode='human'):
        """No-op; the window is controlled by the ``render`` constructor flag.

        ``mode`` is accepted only for compatibility with the Gym ``render``
        signature and is ignored.
        """
        pass

    def reset(self):
        """Start a new episode and return its first processed frame."""
        self.game.new_episode()
        state = self.game.get_state().screen_buffer
        return self.grayscale(state)

    def grayscale(self, observation):
        """Convert a raw screen buffer to a (100, 160, 1) grayscale frame.

        Args:
            observation: raw screen buffer; assumed to be channel-first
                (C, H, W), since axis 0 is moved to the end before conversion.
                TODO confirm against the scenario's screen format.
        """
        gray = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height), hence (160, 100) for a 100x160 image.
        resize = cv2.resize(gray, (160,100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize, (100,160,1))
        return state

    def close(self):
        """Shut down the underlying DoomGame instance."""
        self.game.close()

In [None]:
# Create a rendering instance of the environment; `env` is reused by later cells.
env = VizDoom(render=True)

# Validate that the environment class passes the stable_baselines3 env checker.
# NOTE(review): stable-baselines3 is only pip-installed in a later cell, so on a
# fresh kernel this import relies on the package already being present.

from stable_baselines3.common import env_checker
env_checker.check_env(env)

In [None]:
!pip install matplotlib
from matplotlib import pyplot as plt

# Grab a fresh processed observation to display; without this, `state` is
# whatever an earlier cell left behind (not an image array).
state = env.reset()
# The observation is single-channel, so expand gray -> RGB for imshow
# (COLOR_BGR2RGB would reject a 1-channel input). Also fixes the
# `cvtColot` typo.
plt.imshow(cv2.cvtColor(state, cv2.COLOR_GRAY2RGB))

In [None]:
!pip install torch==1.10.1+cu113 torchvision==0.11.2+cu113 torchaudio===0.10.1+cu113 -f https://download.pytorch.org/whl/cu113/torch_stable.html
!pip install stable-baselines3[extra]

import os 
from stable_baselines3.common.callbacks import BaseCallback


class TrainAndLoggingCallback(BaseCallback):
    """Callback that checkpoints the model every ``check_freq`` calls.

    Checkpoints are written to ``save_path`` as ``best_model_<n_calls>``.
    When ``save_path`` is None no directory is created and nothing is saved.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        """
        Args:
            check_freq: save a checkpoint whenever ``n_calls`` is a multiple
                of this value.
            save_path: directory for checkpoints, or None to disable saving.
            verbose: forwarded to ``BaseCallback``.
        """
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint on every ``check_freq``-th call; always returns True."""
        # Mirror the None guard in _init_callback: with no save path there is
        # nowhere to write, and os.path.join(None, ...) would raise TypeError.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
# Where model checkpoints and tensorboard logs are written.
CHECKPOINT_DIR = './train/train_basic'
LOG_DIR = './logs/log_basic'

# Checkpoint the model once every 100k callback calls.
callback = TrainAndLoggingCallback(
    save_path=CHECKPOINT_DIR,
    check_freq=100_000,
)


In [None]:
# Training the model

from stable_baselines3 import PPO

# Train on a non-rendering environment instance.
env = VizDoom()
# `verbose` was misspelled as `vebose`, which makes the PPO constructor raise
# TypeError (unexpected keyword argument).
model = PPO('CnnPolicy', env, tensorboard_log=LOG_DIR, verbose=1,
            learning_rate=0.0001, n_steps=2048)
model.learn(total_timesteps=1000000, callback=callback)

In [None]:
# Evaluating and Testing model

from stable_baselines3.common.evaluation import evaluate_policy

# Release the environment created by an earlier cell before rebinding `env`;
# otherwise its DoomGame instance is left running.
env.close()

# Load a checkpoint written by TrainAndLoggingCallback. With check_freq=100000
# and total_timesteps=1000000 the last one is expected to be
# best_model_1000000; adjust the step count if training was run with
# different settings.
model = PPO.load(os.path.join(CHECKPOINT_DIR, 'best_model_1000000'))
env = VizDoom(render=True)
mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=100)
print('Mean_reward', mean_reward)

In [None]:
# Watch the loaded policy play: run 100 episodes and report each episode's return.
for episode in range(100):
    obs, done, total_reward = env.reset(), False, 0

    while not done:
        action, _ = model.predict(obs)
        # Feed the new observation straight back into the next prediction.
        obs, reward, done, info = env.step(action)
        total_reward += reward

    print(' Total reward for episode {} is {}'.format(episode, total_reward))
    # Short pause between episodes.
    time.sleep(2)