# TESTING ALL AGENTS

In [4]:
from vizdoom import *
import vizdoom as vzd
import random
import time
import numpy as np
import os
import shutil

In [5]:
import gymnasium as gym
from gymnasium import Env
from gymnasium.spaces import Discrete, Box
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
from stable_baselines3 import DQN, PPO
from stable_baselines3.common import env_checker
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.env_util import make_vec_env
from matplotlib import pyplot as plt
import torchvision
import torchaudio
from stable_baselines3.common.callbacks import CheckpointCallback

In [6]:
class VizDoomGym_1(Env):
    """Gymnasium wrapper around the ViZDoom scenario in ``./scenarios/basic.cfg``.

    Observations are single-channel ``uint8`` frames of shape (100, 160, 1).
    The action space is ``Discrete(3)``; each action index is sent to the game
    as the matching row of a 3x3 identity matrix (a one-hot button vector).
    """

    def __init__(self, render=False):
        """Load the scenario config and start the game.

        Args:
            render: Show the game window when truthy, hide it otherwise.
        """
        super().__init__()
        self.game = DoomGame()
        self.game.load_config('./scenarios/basic.cfg')

        # Render frame logic
        self.game.set_window_visible(bool(render))
        self.game.init()

        # Create the action space and observation space
        self.observation_space = Box(low=0, high=255, shape=(100, 160, 1), dtype=np.uint8)
        self.action_space = Discrete(3)
        # One-hot button vectors, built once instead of on every step.
        self.actions = np.identity(3)

    def step(self, action):
        """Apply one action for 4 tics and return the Gymnasium 5-tuple.

        Args:
            action: Index into the action space. Per the original notes the
                three buttons are move left, move right and attack
                (NOTE(review): the order comes from the scenario cfg - confirm).

        Returns:
            ``(state, reward, terminated, truncated, info)``. ``state`` is the
            preprocessed frame, or an all-zero ``uint8`` frame when no game
            state is available. ``info["info"]`` holds the first game variable
            (ammo, according to the original notes) or 0.
        """
        reward = self.game.make_action(self.actions[action], 4)
        terminated = self.game.is_episode_finished()
        # A timeout of 0 means "no timeout" in ViZDoom, so it must never be
        # reported as a truncation.
        timeout = self.game.get_episode_timeout()
        truncated = timeout > 0 and self.game.get_episode_time() >= timeout

        # Defaults match the declared observation space dtype.
        state = np.zeros(self.observation_space.shape, dtype=np.uint8)
        info = {"info": 0}

        # Logical `not`, not bitwise `~`: on Python bools `~True` is -2 and
        # `~False` is -1, both truthy, so the old guard never filtered anything.
        if not (terminated or truncated):
            game_state = self.game.get_state()
            if game_state is not None:
                state = self.grayscale(game_state.screen_buffer)
                info = {"info": game_state.game_variables[0]}

        return state, reward, terminated, truncated, info

    def render(self):
        """No-op; this wrapper performs no rendering of its own."""
        pass

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Forwarded to ``Env.reset`` so ``self.np_random`` is seeded.
            options: Accepted for API compatibility; unused.
        """
        super().reset(seed=seed)
        self.game.new_episode()
        state = self.game.get_state().screen_buffer
        return self.grayscale(state), {}

    def grayscale(self, observation):
        """Convert a screen buffer to a (100, 160, 1) grayscale frame.

        Args:
            observation: Screen buffer whose first axis is moved to the end
                before colour conversion (assumes a channel-first colour
                image - TODO confirm against the scenario's screen format).
        """
        gray = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height), hence (160, 100) for a 100x160 frame.
        resize = cv2.resize(gray, (160, 100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize, (100, 160, 1))
        return state

    def close(self):
        """Close the underlying ViZDoom game."""
        self.game.close()

In [7]:
class VizDoomGym_2(Env):
    """Gymnasium wrapper around ``./scenarios/defend_the_center.cfg``.

    Observations are single-channel ``uint8`` frames of shape (100, 160, 1).
    The action space is ``Discrete(3)``; each action index is sent to the game
    as the matching row of a 3x3 identity matrix (a one-hot button vector).
    """

    def __init__(self, render=False):
        """Load the scenario config and start the game.

        Args:
            render: Show the game window when truthy, hide it otherwise.
        """
        super().__init__()
        self.game = DoomGame()
        self.game.load_config('./scenarios/defend_the_center.cfg')

        # Render frame logic
        self.game.set_window_visible(bool(render))
        self.game.init()

        # Create the action space and observation space
        self.observation_space = Box(low=0, high=255, shape=(100, 160, 1), dtype=np.uint8)
        self.action_space = Discrete(3)
        # One-hot button vectors, built once instead of on every step.
        self.actions = np.identity(3)

    def step(self, action):
        """Apply one action for 4 tics and return the Gymnasium 5-tuple.

        Args:
            action: Index into the action space. Per the original notes the
                three buttons are turn left, turn right and attack
                (NOTE(review): the order comes from the scenario cfg - confirm).

        Returns:
            ``(state, reward, terminated, truncated, info)``. ``state`` is the
            preprocessed frame, or an all-zero ``uint8`` frame when no game
            state is available. ``info["info"]`` holds the first game variable
            (ammo, according to the original notes) or 0.
        """
        reward = self.game.make_action(self.actions[action], 4)
        terminated = self.game.is_episode_finished()
        # A timeout of 0 means "no timeout" in ViZDoom, so it must never be
        # reported as a truncation.
        timeout = self.game.get_episode_timeout()
        truncated = timeout > 0 and self.game.get_episode_time() >= timeout

        # Defaults match the declared observation space dtype.
        state = np.zeros(self.observation_space.shape, dtype=np.uint8)
        info = {"info": 0}

        # Logical `not`, not bitwise `~`: on Python bools `~True` is -2 and
        # `~False` is -1, both truthy, so the old guard never filtered anything.
        if not (terminated or truncated):
            game_state = self.game.get_state()
            if game_state is not None:
                state = self.grayscale(game_state.screen_buffer)
                info = {"info": game_state.game_variables[0]}

        return state, reward, terminated, truncated, info

    def render(self):
        """No-op; this wrapper performs no rendering of its own."""
        pass

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Forwarded to ``Env.reset`` so ``self.np_random`` is seeded.
            options: Accepted for API compatibility; unused.
        """
        super().reset(seed=seed)
        self.game.new_episode()
        state = self.game.get_state().screen_buffer
        return self.grayscale(state), {}

    def grayscale(self, observation):
        """Convert a screen buffer to a (100, 160, 1) grayscale frame.

        Args:
            observation: Screen buffer whose first axis is moved to the end
                before colour conversion (assumes a channel-first colour
                image - TODO confirm against the scenario's screen format).
        """
        gray = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height), hence (160, 100) for a 100x160 frame.
        resize = cv2.resize(gray, (160, 100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize, (100, 160, 1))
        return state

    def close(self):
        """Close the underlying ViZDoom game."""
        self.game.close()

In [8]:
class VizDoomGym_3(Env):
    """Gymnasium wrapper around a ViZDoom deadly-corridor scenario.

    Observations are single-channel ``uint8`` frames of shape (100, 160, 1).
    The action space is ``Discrete(7)``; each action index is sent to the game
    as the matching row of a 7x7 identity matrix. The scenario reward is
    augmented by ``custom_reward``.
    """

    # Positions inside ``game_variables``. Indices 0-3 follow the labels in the
    # original comments; 4 and 5 are inferred from the original local names
    # (NOTE(review): confirm all of them against the scenario cfg).
    _AMMO_INDEX = 2          # SELECTED_WEAPON_AMMO
    _DAMAGE_TAKEN_INDEX = 4  # presumably DAMAGE_TAKEN
    _DAMAGE_DEALT_INDEX = 5  # presumably DAMAGECOUNT

    def __init__(self, render=False, config='./scenarios/deadly_corridor-skill-5.cfg'):
        """Load the scenario config and start the game.

        Args:
            render: Show the game window when truthy, hide it otherwise.
            config: Path of the ViZDoom scenario config to load.
        """
        super().__init__()
        self.game = vzd.DoomGame()
        self.game.load_config(config)

        # Render frame logic
        self.game.set_window_visible(bool(render))
        self.game.init()

        # Create the action space and observation space
        self.observation_space = Box(low=0, high=255, shape=(100, 160, 1), dtype=np.uint8)
        self.action_space = Discrete(7)  # 7 possible actions
        self.actions = np.identity(7, dtype=np.float32)

    def custom_reward(self, prev_state, current_state):
        """Return the shaping reward for the transition between two states.

        The reward is ``60 * (decrease in variable 4) + 200 * (increase in
        variable 5) + 50 * (change in variable 2)``. Going by the original
        names this penalises damage taken, rewards damage dealt, and
        penalises spent ammo.

        Args:
            prev_state: Game state captured before the action.
            current_state: Game state captured after the action.
        """
        before = prev_state.game_variables
        after = current_state.game_variables

        ammo_delta = after[self._AMMO_INDEX] - before[self._AMMO_INDEX]
        damage_dealt_delta = after[self._DAMAGE_DEALT_INDEX] - before[self._DAMAGE_DEALT_INDEX]
        # Negated so that taking damage lowers the reward.
        damage_taken_delta = -after[self._DAMAGE_TAKEN_INDEX] + before[self._DAMAGE_TAKEN_INDEX]

        return damage_taken_delta * 60 + damage_dealt_delta * 200 + ammo_delta * 50

    def step(self, action):
        """Apply one action for 4 tics and return the Gymnasium 5-tuple.

        Returns:
            ``(state, reward, terminated, truncated, info)``. ``reward`` is
            the scenario reward plus ``custom_reward`` when both the pre- and
            post-action states exist. ``state`` is the preprocessed frame, or
            an all-zero ``uint8`` frame when no game state is available.
            ``info["ammo"]`` is the selected-weapon ammo, or 0.
        """
        prev_state = self.game.get_state()  # State before the action
        reward = self.game.make_action(self.actions[action], 4)  # Scenario reward
        current_state = self.game.get_state()  # State after the action, if any

        # Compute custom reward
        if prev_state is not None and current_state is not None:
            reward += self.custom_reward(prev_state, current_state)

        terminated = self.game.is_episode_finished()
        # A timeout of 0 means "no timeout" in ViZDoom, so it must never be
        # reported as a truncation.
        timeout = self.game.get_episode_timeout()
        truncated = timeout > 0 and self.game.get_episode_time() >= timeout

        state = np.zeros(self.observation_space.shape, dtype=np.uint8)  # Default blank state
        info = {"ammo": 0}  # Default info

        # Reuse the post-action state instead of querying the game a third time.
        if not (terminated or truncated) and current_state is not None:
            state = self.grayscale(current_state.screen_buffer)
            # Previously read index 0, which the original comments label HEALTH.
            info = {"ammo": current_state.game_variables[self._AMMO_INDEX]}

        return state, reward, terminated, truncated, info

    def reset(self, seed=None, options=None):
        """Restart the game and return ``(observation, info)``.

        Args:
            seed: Forwarded to ``Env.reset`` so ``self.np_random`` is seeded.
            options: Accepted for API compatibility; unused.
        """
        super().reset(seed=seed)
        self.game.new_episode()
        state = self.game.get_state().screen_buffer
        return self.grayscale(state), {}

    def grayscale(self, observation):
        """Convert a screen buffer to a (100, 160, 1) grayscale frame.

        Args:
            observation: Screen buffer whose first axis is moved to the end
                before colour conversion (assumes a channel-first colour
                image - TODO confirm against the scenario's screen format).
        """
        gray = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height), hence (160, 100) for a 100x160 frame.
        resize = cv2.resize(gray, (160, 100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize, (100, 160, 1))
        return state

    def close(self):
        """Close the game."""
        self.game.close()

### Testing the basic map

In [9]:
# Roll out the saved PPO checkpoint on the basic scenario and print the
# undiscounted return of each episode.
model_path = "./train/train_basic/best_model_90000.zip"
model = PPO.load(model_path)

env = VizDoomGym_1(render=True)
num_episodes = 10

try:
    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            # Pause between steps (presumably so the window can be watched).
            time.sleep(0.20)
            total_reward += reward
            done = terminated or truncated
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
        time.sleep(2)
finally:
    # Close the environment even if prediction or stepping raises, so the
    # ViZDoom instance is not left running.
    env.close()


Episode 1: Total Reward = 95.0
Episode 2: Total Reward = 95.0
Episode 3: Total Reward = 79.0
Episode 4: Total Reward = 95.0
Episode 5: Total Reward = 95.0
Episode 6: Total Reward = 95.0
Episode 7: Total Reward = 71.0
Episode 8: Total Reward = 87.0
Episode 9: Total Reward = 95.0
Episode 10: Total Reward = 95.0


### Testing the defend the center map (PPO)

In [10]:
# Roll out the saved PPO checkpoint on the defend-the-center scenario and
# print the undiscounted return of each episode.
model_path = "./train/train_defend_the_center/best_model_90000.zip"
model = PPO.load(model_path)

env = VizDoomGym_2(render=True)
num_episodes = 3

try:
    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            # Pause between steps (presumably so the window can be watched).
            time.sleep(0.20)
            total_reward += reward
            done = terminated or truncated
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
        time.sleep(2)
finally:
    # Close the environment even if prediction or stepping raises, so the
    # ViZDoom instance is not left running.
    env.close()


Episode 1: Total Reward = 13.0
Episode 2: Total Reward = 11.0
Episode 3: Total Reward = 11.0


### Testing the defend the center map (DQN)

In [11]:
# Roll out the saved DQN checkpoint on the defend-the-center scenario and
# print the undiscounted return of each episode.
model_path = "./train/train_defend_the_center/best_model_160000.zip"
model = DQN.load(model_path)

env = VizDoomGym_2(render=True)
num_episodes = 3

try:
    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            # Pause between steps (presumably so the window can be watched).
            time.sleep(0.20)
            total_reward += reward
            done = terminated or truncated
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
        time.sleep(2)
finally:
    # Close the environment even if prediction or stepping raises, so the
    # ViZDoom instance is not left running.
    env.close()


Episode 1: Total Reward = 10.0
Episode 2: Total Reward = 8.0
Episode 3: Total Reward = 9.0


### Testing the deadly corridor skill 1 with skill 5 enemies

In [12]:
# Roll out a saved PPO checkpoint on the deadly-corridor environment (default
# config) and print the shaped return of each episode.
model_path = "./train/train_Deadly_Corridor_COMP_4/best_model_90000.zip"
model = PPO.load(model_path)

env = VizDoomGym_3(render=True)
num_episodes = 4

try:
    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            # Pause between steps (presumably so the window can be watched).
            time.sleep(0.10)
            total_reward += reward
            done = terminated or truncated
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
        time.sleep(2)
finally:
    # Close the environment even if prediction or stepping raises, so the
    # ViZDoom instance is not left running.
    env.close()


Episode 1: Total Reward = 9420.643325805664
Episode 2: Total Reward = -1200.8211669921875
Episode 3: Total Reward = 1771.3028411865234
Episode 4: Total Reward = 2662.356903076172


### Testing the deadly corridor skill 2 with skill 5 enemies

In [13]:
# Roll out a saved PPO checkpoint on the deadly-corridor environment (default
# config) and print the shaped return of each episode.
model_path = "./train/train_Deadly_Corridor_COMP_4/best_model_80000.zip"
model = PPO.load(model_path)

env = VizDoomGym_3(render=True)
num_episodes = 4

try:
    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            # Pause between steps (presumably so the window can be watched).
            time.sleep(0.10)
            total_reward += reward
            done = terminated or truncated
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
        time.sleep(2)
finally:
    # Close the environment even if prediction or stepping raises, so the
    # ViZDoom instance is not left running.
    env.close()


Episode 1: Total Reward = 4520.613662719727
Episode 2: Total Reward = 4724.812942504883
Episode 3: Total Reward = 2774.8912811279297
Episode 4: Total Reward = 1013.5520629882812


### Testing the deadly corridor skill 3 with skill 5 enemies

In [14]:
# Roll out a saved PPO checkpoint on the deadly-corridor environment (default
# config) and print the shaped return of each episode.
model_path = "./train/train_Deadly_Corridor_COMP_5_S_3/best_model_50000.zip"
model = PPO.load(model_path)

env = VizDoomGym_3(render=True)
num_episodes = 4

try:
    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            # Pause between steps (presumably so the window can be watched).
            time.sleep(0.10)
            total_reward += reward
            done = terminated or truncated
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
        time.sleep(2)
finally:
    # Close the environment even if prediction or stepping raises, so the
    # ViZDoom instance is not left running.
    env.close()


Episode 1: Total Reward = 706.9576263427734
Episode 2: Total Reward = 11340.188201904297
Episode 3: Total Reward = 5020.561706542969
Episode 4: Total Reward = 7140.637786865234


### Testing the deadly corridor skill 4 with skill 5 enemies

In [15]:
# Roll out a saved PPO checkpoint on the deadly-corridor environment (default
# config) and print the shaped return of each episode.
model_path = "./train/train_Deadly_Corridor_COMP_5_S_4/best_model_100000.zip"
model = PPO.load(model_path)

env = VizDoomGym_3(render=True)
num_episodes = 4

try:
    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            # Pause between steps (presumably so the window can be watched).
            time.sleep(0.10)
            total_reward += reward
            done = terminated or truncated
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
        time.sleep(2)
finally:
    # Close the environment even if prediction or stepping raises, so the
    # ViZDoom instance is not left running.
    env.close()


Episode 1: Total Reward = 8958.967880249023
Episode 2: Total Reward = 10676.050201416016
Episode 3: Total Reward = 14169.777465820312
Episode 4: Total Reward = 10167.97006225586


### Testing the deadly corridor skill 5 with skill 5 enemies

In [16]:
# Roll out a saved PPO checkpoint on the deadly-corridor environment (default
# config) and print the shaped return of each episode.
model_path = "./train/train_Deadly_Corridor_COMP_5_S_5_2/best_model_100000.zip"
model = PPO.load(model_path)

env = VizDoomGym_3(render=True)
num_episodes = 4

try:
    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _ = model.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            # Pause between steps (presumably so the window can be watched).
            time.sleep(0.10)
            total_reward += reward
            done = terminated or truncated
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")
        time.sleep(2)
finally:
    # Close the environment even if prediction or stepping raises, so the
    # ViZDoom instance is not left running.
    env.close()


Episode 1: Total Reward = 12372.182601928711
Episode 2: Total Reward = 10717.602310180664
Episode 3: Total Reward = 11516.45361328125
Episode 4: Total Reward = 8931.114868164062
