# Import

In [1]:
# Import vizdoom for game env
from vizdoom import * 
# Import random for action sampling
import random
# Import time for sleeping
import time 
# Import numpy for identity matrix
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# Import environment base class from OpenAI Gym
from gym import Env
# Import gym spaces 
from gym.spaces import Discrete, Box
# Import opencv 
import cv2
# import ppo for training
from stable_baselines3 import PPO

# Train

In [2]:
# Create Vizdoom OpenAI Gym Environment
class VizDoomGym(Env):
    """Gym environment wrapping a ViZDoom ``DoomGame``.

    Observations are single-channel frames of shape ``(100, 160, 1)`` produced
    by :meth:`grayscale`; actions are indices into a 7-way discrete space that
    are one-hot encoded before being handed to the game.

    Uses the legacy Gym API: ``reset()`` returns only the observation and
    ``step()`` returns ``(obs, reward, done, info)``.

    Args:
        render: When truthy the game window is shown, otherwise it is hidden.
        config: Path of the ViZDoom scenario ``.cfg`` file to load.
    """

    # Starting value for the ammo tracker (used by both __init__ and reset).
    INITIAL_AMMO = 52

    def __init__(self, render=False, config='ViZDoom/scenarios/deadly_corridor_s1.cfg'):
        super().__init__()

        # Set up the game; window visibility must be chosen before init().
        self.game = DoomGame()
        self.game.load_config(config)
        self.game.set_window_visible(bool(render))
        self.game.init()

        # Observation and action spaces.
        self.observation_space = Box(low=0, high=255, shape=(100, 160, 1), dtype=np.uint8)
        self.action_space = Discrete(7)
        # Row i is the one-hot button vector for action i. Built once here
        # instead of on every step, and sized from the action space so the two
        # cannot drift apart.
        self._one_hot_actions = np.identity(self.action_space.n)

        self._reset_trackers()

    def _reset_trackers(self):
        """Reset the per-episode bookkeeping to its start-of-episode values."""
        self.damage_taken = 0
        self.hitcount = 0
        self.ammo = self.INITIAL_AMMO
        self.ep_length = 0

    def step(self, action):
        """Apply ``action`` for one step.

        Args:
            action: Index of the action to perform (``0 <= action < 7``).

        Returns:
            Tuple ``(state, reward, done, info)``. ``state`` is the processed
            frame, or an all-zero ``uint8`` frame when the game has no state
            left. ``reward`` is the value returned by ``make_action``, or ``0``
            when the game has no state left. ``info`` is
            ``{"ep_length": <steps counted in this episode>}``.
        """
        movement_reward = self.game.make_action(self._one_hot_actions[action])

        # Fetch the state once; it is None when the game has no current state.
        game_state = self.game.get_state()
        if game_state is not None:
            self.ep_length += 1
            state = self.grayscale(game_state.screen_buffer)

            # The unpacking expects exactly five game variables in this order;
            # presumably this mirrors the scenario .cfg -- verify when
            # switching configs.
            _health, damage_taken, hitcount, ammo, _dead = game_state.game_variables

            # Reward shaping is currently disabled: only the engine reward is
            # returned. The trackers are still kept up to date so that delta
            # terms (e.g. movement_reward + damage_taken_delta*10
            # + hitcount_delta*200 + ammo_delta*5) can be reintroduced.
            self.damage_taken = damage_taken
            self.hitcount = hitcount
            self.ammo = ammo

            reward = movement_reward
        else:
            # Match the declared observation dtype (plain np.zeros is float64).
            state = np.zeros(self.observation_space.shape, dtype=self.observation_space.dtype)
            # NOTE(review): the reward returned by make_action on this step is
            # discarded. Kept as-is because changing it alters the reward scale
            # existing checkpoints and thresholds were tuned against -- confirm
            # whether this is intended.
            reward = 0

        # Build info before the counter is cleared so the final length survives.
        info = {"ep_length": self.ep_length}
        done = self.game.is_episode_finished()
        if done:
            self.ep_length = 0

        return state, reward, done, info

    def render(self, mode='human'):
        """No-op; window visibility is configured once in ``__init__``."""
        pass

    def reset(self):
        """Start a new episode and return its first processed frame."""
        self.game.new_episode()
        self._reset_trackers()
        return self.grayscale(self.game.get_state().screen_buffer)

    def grayscale(self, observation):
        """Convert a raw screen buffer into a ``(100, 160, 1)`` grayscale frame.

        Axis 0 of ``observation`` is moved to the end before colour conversion
        (presumably the buffer is channel-first -- confirm against the
        scenario's screen format), then the image is resized to 160x100.
        """
        gray = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)
        resize = cv2.resize(gray, (160, 100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize, (100, 160, 1))
        return state

    def close(self):
        """Close the underlying game instance."""
        self.game.close()

    def get_game_variables(self):
        """Return the current game variables, or ``None`` if there is no state.

        Returning ``None`` instead of raising ``AttributeError`` lets callers
        poll this safely after the episode has finished.
        """
        game_state = self.game.get_state()
        if game_state is None:
            return None
        return game_state.game_variables

# Log

In [None]:
# Import os for file nav
import os 
# Import callback class from sb3
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model every ``check_freq`` calls.

    Args:
        check_freq: Save whenever ``n_calls`` is a multiple of this value.
        save_path: Directory to write checkpoints into. ``None`` disables
            saving entirely.
        verbose: Verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint on every ``check_freq``-th call; always return True."""
        # _init_callback accepts save_path=None, so honour it here as well
        # instead of failing inside os.path.join(None, ...).
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True

In [None]:
# Directory handed to the checkpoint callback as its save path.
CHECKPOINT_DIR = './train/train_corridor'
# Log directory; not referenced by any cell visible in this notebook
# (presumably meant for training logs -- confirm before removing).
LOG_DIR = './logs/log_corridor'

In [None]:
# Checkpoint callback: check_freq=10000 is the save interval in callback calls,
# and snapshots are written under CHECKPOINT_DIR.
callback = TrainAndLoggingCallback(check_freq=10000, save_path=CHECKPOINT_DIR)

# Evaluate 

In [3]:
# Import eval policy to test agent
from stable_baselines3.common.evaluation import evaluate_policy

In [4]:
import sys
# True only for Python 3.8 or later within the 3.x series.
newer_python_version = (3, 8) <= sys.version_info[:2] < (4, 0)
# print("here1")
# model_path = './models/train/corridor_final_models/train_corridor_6.4/best_model_320000.zip'
# print("here2")
# env = VizDoomGym(render=True, config='ViZDoom/scenarios/deadly_corridor_s2.cfg')  #creating rendered environment
# print("here3")
# if newer_python_version:
#     custom_objects = {
#         "learning_rate": 0.0,
#         "lr_schedule": lambda _: 0.0,
#         "clip_range": lambda _: 100.0,
#     }
# print("here4")
# model = PPO.load(model_path, env=env, custom_objects=custom_objects) #put the checkpoint that you want to evaluate here

In [5]:
# Checkpoint to evaluate; the path is relative to the working directory.
model_path = './models/train/corridor_final_models/train_corridor_4.5/best_model_560000.zip'
# Evaluation environment on the s2 corridor scenario.
env = VizDoomGym(render=True, config='ViZDoom/scenarios/deadly_corridor_s2.cfg')  # render=True makes the game window visible



In [6]:
# Overrides applied while loading the checkpoint. On Python >= 3.8 the saved
# learning-rate / clip-range schedules are replaced with constants
# (presumably to work around checkpoints pickled on an older Python --
# confirm). These values are only suitable for evaluation, not for resuming
# training. On other versions nothing is overridden.
custom_objects = None
if newer_python_version:
    custom_objects = {
        "learning_rate": 0.0,
        "lr_schedule": lambda _: 0.0,
        "clip_range": lambda _: 100.0,
    }
print("here4")
# The overrides were previously built but never passed to the loader.
model = PPO.load(model_path, env=env, custom_objects=custom_objects)  # put the checkpoint that you want to evaluate here
print("here5")

here4
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
here5


	Missing key(s) in state_dict: "pi_features_extractor.cnn.0.weight", "pi_features_extractor.cnn.0.bias", "pi_features_extractor.cnn.2.weight", "pi_features_extractor.cnn.2.bias", "pi_features_extractor.cnn.4.weight", "pi_features_extractor.cnn.4.bias", "pi_features_extractor.linear.0.weight", "pi_features_extractor.linear.0.bias", "vf_features_extractor.cnn.0.weight", "vf_features_extractor.cnn.0.bias", "vf_features_extractor.cnn.2.weight", "vf_features_extractor.cnn.2.bias", "vf_features_extractor.cnn.4.weight", "vf_features_extractor.cnn.4.bias", "vf_features_extractor.linear.0.weight", "vf_features_extractor.linear.0.bias".  
  "You are probably loading a model saved with SB3 < 1.7.0, "


In [None]:
# Evaluate mean reward for 10 games.
# evaluate_policy returns a pair; only the first element (the mean) is kept.
mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=10)



In [None]:
# Roll out the loaded policy for a number of episodes, record each episode's
# total reward and length, label it Won/Lost, then scatter-plot the results.
N_EVAL_EPISODES = 100
# An episode counts as won when its total reward exceeds this value.
WIN_REWARD_THRESHOLD = 2000

plot_reward = []
plot_ep_len = []
plot_group = []
win_eps = []
for episode in range(N_EVAL_EPISODES):
    time.sleep(1)  # pause between episodes so the rendered window can be followed
    obs = env.reset()
    done = False
    total_reward = 0
    ep_length_final = 0
    reward_list = []
    # Reset per episode so a stale value from the previous episode (or an
    # undefined name on the very first one) is never unpacked below.
    game_variables = None

    while not done:
        action, _states = model.predict(obs)
        obs, reward, done, info = env.step(action)
        time.sleep(0.1)  # slow playback down to a watchable speed
        reward_list.append(reward)
        total_reward += reward

        # The env zeroes its counter on the final step, so keep the last
        # non-zero value as the episode length.
        if info["ep_length"] != 0:
            ep_length_final = info["ep_length"]

        # Game variables can only be read while the episode is still running.
        if not done:
            game_variables = env.get_game_variables()

    # Last readings of the episode, if any step produced them.
    if game_variables is not None:
        health, damage_taken, hitcount, ammo, dead = game_variables

    plot_reward.append(total_reward)
    plot_ep_len.append(ep_length_final)

    if total_reward > WIN_REWARD_THRESHOLD:
        win_eps.append(ep_length_final)
        plot_group.append('Won')
        print('')
        print('WON!!. Episode {}: Episode Length = {}, Reward = {}'.format(episode, ep_length_final, total_reward))
        print('')
    else:
        plot_group.append('Lost')
        print('')
        print('LOST!!. Game {}: Episode Length = {}, Reward = {}'.format(episode, ep_length_final, total_reward))
        print('')

df = pd.DataFrame({'plot_reward': plot_reward, 'plot_ep_len': plot_ep_len, 'plot_group': plot_group})

# Group on the column itself rather than on the external list.
groups = df.groupby('plot_group')
for name, group in groups:
    print(group)
    plt.plot(group['plot_ep_len'], group['plot_reward'], marker='o', linestyle='', markersize=7, label=name)

plt.xlabel('Episode length (steps)')
plt.ylabel('Total reward')
plt.legend()

In [None]:
# Shut the environment down once evaluation is finished.
env.close()