# Environment Setup and Dependencies
Install and configure the required packages (vizdoom, stable-baselines3, etc.). Set up logging and the directory structure. Define configuration constants and helper functions.

In [None]:
# Install Dependencies
# Lines starting with `!` are notebook shell escapes, so this cell has to run
# inside an IPython/Jupyter kernel (the /content paths suggest Google Colab);
# it is not valid as a plain Python script.

!apt-get update
!apt-get install -y build-essential zlib1g-dev libsdl2-dev libjpeg-dev \
    nasm tar libbz2-dev libgtk2.0-dev cmake git libfluidsynth-dev libgme-dev \
    libopenal-dev timidity libwildmidi-dev unzip ffmpeg

!pip install vizdoom
!pip install stable-baselines3[extra]

# Set up logging and directory structure

import os
import logging

# Define local paths for scenario and storage
# Everything lives under /content/scenarios; models, logs and TensorBoard
# output are sub-folders of LOCAL_STORAGE_PATH.
LOCAL_SCENARIO_PATH = "/content/scenarios/deathmatch.cfg"
LOCAL_STORAGE_PATH = "/content/scenarios/training_data"
LOCAL_MODEL_PATH = "/content/scenarios/training_data/models"
LOCAL_LOG_PATH = "/content/scenarios/training_data/logs"
LOCAL_TENSORBOARD_PATH = "/content/scenarios/training_data/tensorboard"
LOCAL_WAD_PATH = "/content/scenarios/freedoom2.wad"

# Create local directories
# makedirs creates missing parents, so this also creates /content/scenarios,
# the folder the two downloads below write into.
os.makedirs(LOCAL_STORAGE_PATH, exist_ok=True)
os.makedirs(LOCAL_MODEL_PATH, exist_ok=True)
os.makedirs(LOCAL_LOG_PATH, exist_ok=True)
os.makedirs(LOCAL_TENSORBOARD_PATH, exist_ok=True)
print("Created local directories.")

# Configure logging
# Root logger writes INFO and above to <LOCAL_LOG_PATH>/setup.log.
logging.basicConfig(filename=os.path.join(LOCAL_LOG_PATH, 'setup.log'), level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Directories created and logging configured.")

# Download freedoom2.wad if it doesn't exist
# NOTE(review): the "Downloaded ..." message is logged unconditionally after
# the shell command, whether or not wget succeeded. `wget -O` may also leave an
# empty file behind on failure, which the existence check would then accept on
# the next run - confirm, and consider verifying the download.
if not os.path.exists(LOCAL_WAD_PATH):
    !wget https://github.com/freedoom/freedoom/releases/download/v0.13.0/freedoom2.wad -O $LOCAL_WAD_PATH
    logging.info(f"Downloaded freedoom2.wad to {LOCAL_WAD_PATH}")
else:
    logging.info(f"Using existing freedoom2.wad at {LOCAL_WAD_PATH}")

# Download deathmatch.cfg if it doesn't exist
# NOTE(review): only the .cfg file is fetched. If deathmatch.cfg refers to a
# scenario WAD (doom_scenario_path), that file is not downloaded by this cell -
# confirm it is available next to the config before the environment is created.
if not os.path.exists(LOCAL_SCENARIO_PATH):
    !wget https://raw.githubusercontent.com/mwydmuch/ViZDoom/master/scenarios/deathmatch.cfg -P /content/scenarios/
    logging.info(f"Downloaded deathmatch.cfg to {LOCAL_SCENARIO_PATH}")
else:
    logging.info(f"Using existing deathmatch.cfg at {LOCAL_SCENARIO_PATH}")
# Define configuration constants and helper functions

def get_user_input(prompt, type_=None, min_=None, max_=None, range_=None):
    """Prompt on stdin until the user enters a value that passes validation.

    Args:
        prompt: Text passed to ``input`` on every attempt.
        type_: Optional callable (e.g. ``int`` or ``float``) used to convert
            the raw string. A ``ValueError`` from it triggers a re-prompt.
        min_: Optional inclusive lower bound.
        max_: Optional inclusive upper bound.
        range_: Optional container of allowed values (a ``range`` or any
            iterable that supports ``in``).

    Returns:
        The converted value (or the raw string when ``type_`` is None) that
        satisfied every supplied constraint.

    Raises:
        ValueError: If both bounds are given and ``max_ < min_``.
    """
    if min_ is not None and max_ is not None and max_ < min_:
        raise ValueError("min_ must be less than or equal to max_.")
    has_bounds = min_ is not None or max_ is not None
    while True:
        val = input(prompt)
        if type_ is not None:
            try:
                val = type_(val)
            except ValueError:
                print(f"Input must be of type {type_.__name__}.")
                continue
        # NaN is the only value that is not equal to itself. It also compares
        # False against both bounds, so without this check "nan" would slip
        # through the min/max validation and be returned as a valid number.
        if has_bounds and val != val:
            print("Input must be a number, not NaN.")
        elif min_ is not None and val < min_:
            print(f"Input must be greater than or equal to {min_}.")
        elif max_ is not None and val > max_:
            print(f"Input must be less than or equal to {max_}.")
        elif range_ is not None and val not in range_:
            if isinstance(range_, range):
                template = f"Input must be between {range_.start} and {range_.stop-1}."
            else:
                template = f"Input must be {', '.join(map(str, range_))}."
            print(template)
        else:
            return val

logging.info("Helper functions defined.")

# Custom ViZDoom Environment Class
Implement a custom ViZDoom environment class that inherits from gym.Env. Handle the observation/action spaces, reward shaping, state management, and the game lifecycle. Include unit tests.

In [None]:
# Custom ViZDoom Environment Class

from vizdoom import *
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import os
from google.colab import drive
import torch
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv

# Mount Google Drive and make sure the persistent output folders exist
drive.mount('/content/drive')
BASE_PATH = "/content/drive/MyDrive/ViZDoom-PPO"
for _subdir in ("models", "logs"):
    # exist_ok keeps re-running the cell harmless
    os.makedirs(f"{BASE_PATH}/{_subdir}", exist_ok=True)

class VizdoomEnv(gym.Env):
    """Gymnasium wrapper around a ViZDoom game with shaped rewards.

    Observations are single-channel uint8 frames of shape
    ``(height, width, 1)``. Actions are indices into the scenario's available
    buttons; the chosen button is pressed alone for ``frame_skip`` tics.

    Args:
        scenario_path: Path of the ViZDoom ``.cfg`` file to load.
        frame_skip: Number of tics each action is repeated for.
    """

    # Actor class names counted as hostile when an object does not expose an
    # ``is_enemy()`` method of its own.
    # NOTE(review): these are the stock Doom monster class names - confirm
    # they match what the loaded scenario actually spawns.
    ENEMY_NAMES = frozenset({
        "Zombieman", "ShotgunGuy", "ChaingunGuy", "DoomImp", "Demon",
        "Spectre", "LostSoul", "Cacodemon", "HellKnight", "BaronOfHell",
        "Arachnotron", "PainElemental", "Revenant", "Fatso", "Archvile",
        "Cyberdemon", "SpiderMastermind", "WolfensteinSS",
    })

    def __init__(self, scenario_path, frame_skip=4):
        super().__init__()
        self.game = DoomGame()
        self.game.load_config(scenario_path)
        self.game.set_doom_game_path(LOCAL_WAD_PATH)
        self.game.set_window_visible(False)
        self.game.set_mode(Mode.PLAYER)
        self.game.set_screen_format(ScreenFormat.GRAY8)
        self.game.set_screen_resolution(ScreenResolution.RES_640X480)
        # The distance shaping term reads state.objects, which is only filled
        # in when objects info is switched on before init().
        self.game.set_objects_info_enabled(True)
        self.game.init()

        self.frame_skip = frame_skip
        self.action_space = spaces.Discrete(self.game.get_available_buttons_size())
        self.observation_space = spaces.Box(
            low=0,
            high=255,
            shape=(self.game.get_screen_height(), self.game.get_screen_width(), 1),
            dtype=np.uint8,
        )

        self.previous_game_variables = None
        # Defined up front so _shape_reward never has to probe for it.
        self.min_dist_prev = float('inf')

    def _observation(self, state):
        """Return the frame of ``state`` as ``(H, W, 1)``, or zeros if there is none."""
        if state is None:
            # Already has the channel axis; expanding it again would produce a
            # (H, W, 1, 1) array that violates observation_space.
            return np.zeros(self.observation_space.shape, dtype=np.uint8)
        return np.expand_dims(state.screen_buffer, axis=-1)

    def step(self, action):
        """Press button ``action`` for ``frame_skip`` tics.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``reward`` is the game reward plus the shaping bonus, ``truncated``
            is always False and ``info`` is empty.
        """
        buttons = np.zeros(self.game.get_available_buttons_size())
        buttons[action] = 1

        reward = self.game.make_action(buttons.tolist(), self.frame_skip)
        done = self.game.is_episode_finished()

        # No game state exists once the episode has finished.
        state = None if done else self.game.get_state()
        observation = self._observation(state)

        shaped_reward = float(reward + self._shape_reward())

        return observation, shaped_reward, done, False, {}

    def _shape_reward(self):
        """Return the shaping bonus for the transition that just happened.

        Compares the current game variables with the ones stored on the
        previous call and returns 0 when either side is missing (first step of
        an episode, or after the episode ended).
        """
        state = self.game.get_state()
        current_game_vars = state.game_variables if state is not None else None

        if current_game_vars is None or self.previous_game_variables is None:
            self.previous_game_variables = current_game_vars
            return 0.0

        previous = self.previous_game_variables
        # NOTE(review): indices 0/1/2 are assumed to be kill count, health and
        # armor - confirm against available_game_variables in the scenario cfg.
        reward = 0.0
        reward += (current_game_vars[0] - previous[0]) * 100.0  # index 0 increased
        reward -= (previous[2] - current_game_vars[2]) * 0.1    # index 2 decreased
        reward -= (previous[1] - current_game_vars[1])          # index 1 decreased
        reward += 0.1                                           # flat per-step bonus

        # Small bonus for closing in on the nearest enemy, small penalty for
        # backing away from one that was within 500 map units.
        min_dist_now = self._get_closest_enemy_distance(state)
        if min_dist_now < self.min_dist_prev and min_dist_now < 500:
            reward += 0.05
        elif min_dist_now > self.min_dist_prev and self.min_dist_prev < 500:
            reward -= 0.05
        self.min_dist_prev = min_dist_now

        self.previous_game_variables = current_game_vars

        return reward

    def _is_enemy(self, obj):
        """Return True if ``obj`` should count as a hostile actor."""
        probe = getattr(obj, "is_enemy", None)
        if callable(probe):
            return bool(probe())
        # NOTE(review): the object may not provide is_enemy(); fall back to
        # matching its class name against ENEMY_NAMES.
        return getattr(obj, "name", None) in self.ENEMY_NAMES

    def _get_closest_enemy_distance(self, state=None):
        """Return the 2-D distance to the nearest enemy, or ``inf`` if none is known.

        Args:
            state: Game state to inspect; fetched from the game when omitted.
        """
        if state is None:
            state = self.game.get_state()
        if state is None or state.objects is None:
            return float('inf')

        # Ask the engine for the player position directly instead of relying on
        # fixed offsets into game_variables, whose layout depends on the cfg.
        px = self.game.get_game_variable(GameVariable.POSITION_X)
        py = self.game.get_game_variable(GameVariable.POSITION_Y)

        return min(
            (
                ((px - obj.position_x) ** 2 + (py - obj.position_y) ** 2) ** 0.5
                for obj in state.objects
                if self._is_enemy(obj)
            ),
            default=float('inf'),
        )

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``."""
        super().reset(seed=seed)
        self.game.new_episode()
        self.previous_game_variables = None
        self.min_dist_prev = float('inf')
        return self._observation(self.game.get_state()), {}

    def close(self):
        """Shut down the underlying ViZDoom instance."""
        self.game.close()

# Unit tests for VizdoomEnv

import unittest

class TestVizdoomEnv(unittest.TestCase):
    """Smoke tests for VizdoomEnv; every test starts a real ViZDoom game."""

    def setUp(self):
        self.env = VizdoomEnv(LOCAL_SCENARIO_PATH)

    def tearDown(self):
        # Each setUp launches a game instance; close it so instances do not
        # pile up across tests. test_close has already closed its own.
        if self.env.game.is_running():
            self.env.close()

    def test_initialization(self):
        self.assertIsInstance(self.env, gym.Env)
        self.assertIsNotNone(self.env.game)
        self.assertEqual(self.env.frame_skip, 4)

    def test_step(self):
        # The Gymnasium API expects reset() before the first step().
        self.env.reset()
        state, reward, done, _, _ = self.env.step(0)
        self.assertEqual(state.shape, self.env.observation_space.shape)
        self.assertIsInstance(reward, float)
        self.assertIsInstance(done, bool)

    def test_reset(self):
        state, _ = self.env.reset()
        self.assertEqual(state.shape, self.env.observation_space.shape)

    def test_close(self):
        self.env.close()
        self.assertFalse(self.env.game.is_running())

if __name__ == '__main__':
    # argv=[''] and exit=False keep unittest from parsing the notebook
    # kernel's arguments and from raising SystemExit when it finishes.
    unittest.main(argv=[''], exit=False)

# Build a one-environment vectorized wrapper around the Doom scenario
def _make_env():
    return VizdoomEnv(LOCAL_SCENARIO_PATH)

env = DummyVecEnv([_make_env])

# PPO settings for this quick run, logged to TensorBoard on Drive
_ppo_settings = dict(
    verbose=1,
    tensorboard_log=f"{BASE_PATH}/logs",
    learning_rate=2.5e-4,
    n_steps=128,
)
model = PPO("CnnPolicy", env, **_ppo_settings)

# Train for a fixed step budget, then persist the result to Drive
TIMESTEPS = 100000
model.learn(total_timesteps=TIMESTEPS)
model.save(f"{BASE_PATH}/models/doom_ppo")

# PPO Agent Configuration
Configure PPO agent hyperparameters, neural network architecture, and training settings. Implement input validation and error handling for agent setup.

In [None]:
# PPO Agent Configuration

from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.monitor import Monitor
import torch as th

# Define PPO hyperparameters with input validation
def get_ppo_hyperparameters():
    """Interactively collect PPO hyperparameters, validating each one.

    Returns:
        dict: Keyword arguments for the ``PPO`` constructor
        (``learning_rate``, ``n_steps``, ``batch_size``, ``n_epochs``,
        ``gamma``, ``gae_lambda``, ``clip_range``, ``ent_coef``, ``vf_coef``,
        ``max_grad_norm``).
    """
    learning_rate = get_user_input("Enter the learning rate (e.g., 0.0003): ", type_=float, min_=1e-6, max_=1e-1)
    # n_steps and batch_size must both exceed 1: with a single environment and
    # advantage normalization (the default) PPO refuses a rollout buffer or a
    # minibatch of size 1, so reject those values here rather than at
    # construction time.
    n_steps = get_user_input("Enter the number of steps to run for each environment per update (e.g., 2048): ", type_=int, min_=2)
    batch_size = get_user_input("Enter the batch size (e.g., 64): ", type_=int, min_=2)
    n_epochs = get_user_input("Enter the number of epochs (e.g., 10): ", type_=int, min_=1)
    gamma = get_user_input("Enter the discount factor (e.g., 0.99): ", type_=float, min_=0.0, max_=1.0)
    gae_lambda = get_user_input("Enter the GAE lambda (e.g., 0.95): ", type_=float, min_=0.0, max_=1.0)
    clip_range = get_user_input("Enter the clip range (e.g., 0.2): ", type_=float, min_=0.0, max_=1.0)
    ent_coef = get_user_input("Enter the entropy coefficient (e.g., 0.01): ", type_=float, min_=0.0, max_=1.0)
    vf_coef = get_user_input("Enter the value function coefficient (e.g., 0.5): ", type_=float, min_=0.0, max_=1.0)
    max_grad_norm = get_user_input("Enter the maximum gradient norm (e.g., 0.5): ", type_=float, min_=0.0, max_=10.0)
    return {
        "learning_rate": learning_rate,
        "n_steps": n_steps,
        "batch_size": batch_size,
        "n_epochs": n_epochs,
        "gamma": gamma,
        "gae_lambda": gae_lambda,
        "clip_range": clip_range,
        "ent_coef": ent_coef,
        "vf_coef": vf_coef,
        "max_grad_norm": max_grad_norm,
    }

# Get PPO hyperparameters from user
ppo_hyperparameters = get_ppo_hyperparameters()
logging.info(f"PPO hyperparameters: {ppo_hyperparameters}")

# Total environment steps to train for. This is kept separate from n_steps
# (the rollout length per update): using n_steps as the budget would stop
# training after a single rollout and the checkpoint callback would never fire.
TOTAL_TIMESTEPS = max(100000, ppo_hyperparameters["n_steps"])

# Define neural network architecture
# Separate 64-64 heads for policy and value; a plain dict is the current form,
# the list-wrapped dict is deprecated.
policy_kwargs = dict(
    activation_fn=th.nn.ReLU,
    net_arch=dict(pi=[64, 64], vf=[64, 64]),
)

# Create and configure PPO agent
env = None
try:
    env = VizdoomEnv(LOCAL_SCENARIO_PATH)
    env = Monitor(env, LOCAL_LOG_PATH)
    model = PPO(
        "CnnPolicy",
        env,
        verbose=1,
        tensorboard_log=LOCAL_TENSORBOARD_PATH,
        policy_kwargs=policy_kwargs,
        **ppo_hyperparameters
    )
    logging.info("PPO agent created successfully.")
except Exception as e:
    logging.error(f"Error creating PPO agent: {e}")
    print(f"Error creating PPO agent: {e}")
    # Do not leave a game instance running if the agent could not be built.
    if env is not None:
        env.close()
    # Re-raise instead of calling exit(): in a notebook exit() takes the whole
    # kernel down and hides the traceback.
    raise

# Define checkpoint callback
checkpoint_callback = CheckpointCallback(
    save_freq=max(10000, ppo_hyperparameters["n_steps"] // 10),
    save_path=LOCAL_MODEL_PATH,
    name_prefix="ppo_vizdoom"
)

# Train the PPO agent
try:
    model.learn(total_timesteps=TOTAL_TIMESTEPS, callback=checkpoint_callback)
    logging.info("PPO agent training completed.")
except Exception as e:
    logging.error(f"Error during PPO agent training: {e}")
    print(f"Error during PPO agent training: {e}")
    env.close()
    raise

# Save the final model
final_model_path = os.path.join(LOCAL_MODEL_PATH, "ppo_vizdoom_final")
try:
    model.save(final_model_path)
    logging.info(f"Final PPO model saved to: {final_model_path}")
except Exception as e:
    # A failed save is reported but does not abort the cell: the environment
    # still has to be closed below.
    logging.error(f"Error saving final PPO model: {e}")
    print(f"Error saving final PPO model: {e}")

# Close the environment
env.close()
logging.info("Environment closed.")

# Training Pipeline
Build the training loop with checkpointing, monitoring, and logging. Implement early stopping and model saving. Add performance optimization and progress tracking.

In [None]:
# Training Pipeline

import os
import logging
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
from stable_baselines3.common.monitor import Monitor
import torch as th

# Define PPO hyperparameters with input validation
def get_ppo_hyperparameters():
    """Interactively collect PPO hyperparameters, validating each one.

    Returns:
        dict: Keyword arguments for the ``PPO`` constructor
        (``learning_rate``, ``n_steps``, ``batch_size``, ``n_epochs``,
        ``gamma``, ``gae_lambda``, ``clip_range``, ``ent_coef``, ``vf_coef``,
        ``max_grad_norm``).
    """
    learning_rate = get_user_input("Enter the learning rate (e.g., 0.0003): ", type_=float, min_=1e-6, max_=1e-1)
    # n_steps and batch_size must both exceed 1: with a single environment and
    # advantage normalization (the default) PPO refuses a rollout buffer or a
    # minibatch of size 1, so reject those values here rather than at
    # construction time.
    n_steps = get_user_input("Enter the number of steps to run for each environment per update (e.g., 2048): ", type_=int, min_=2)
    batch_size = get_user_input("Enter the batch size (e.g., 64): ", type_=int, min_=2)
    n_epochs = get_user_input("Enter the number of epochs (e.g., 10): ", type_=int, min_=1)
    gamma = get_user_input("Enter the discount factor (e.g., 0.99): ", type_=float, min_=0.0, max_=1.0)
    gae_lambda = get_user_input("Enter the GAE lambda (e.g., 0.95): ", type_=float, min_=0.0, max_=1.0)
    clip_range = get_user_input("Enter the clip range (e.g., 0.2): ", type_=float, min_=0.0, max_=1.0)
    ent_coef = get_user_input("Enter the entropy coefficient (e.g., 0.01): ", type_=float, min_=0.0, max_=1.0)
    vf_coef = get_user_input("Enter the value function coefficient (e.g., 0.5): ", type_=float, min_=0.0, max_=1.0)
    max_grad_norm = get_user_input("Enter the maximum gradient norm (e.g., 0.5): ", type_=float, min_=0.0, max_=10.0)
    return {
        "learning_rate": learning_rate,
        "n_steps": n_steps,
        "batch_size": batch_size,
        "n_epochs": n_epochs,
        "gamma": gamma,
        "gae_lambda": gae_lambda,
        "clip_range": clip_range,
        "ent_coef": ent_coef,
        "vf_coef": vf_coef,
        "max_grad_norm": max_grad_norm,
    }

# Get PPO hyperparameters from user
ppo_hyperparameters = get_ppo_hyperparameters()
logging.info(f"PPO hyperparameters: {ppo_hyperparameters}")

# Total environment steps to train for. This is kept separate from n_steps
# (the rollout length per update): using n_steps as the budget would stop
# training after a single rollout, before any checkpoint or evaluation ran.
TOTAL_TIMESTEPS = max(100000, ppo_hyperparameters["n_steps"])

# Define neural network architecture
# Separate 64-64 heads for policy and value; a plain dict is the current form,
# the list-wrapped dict is deprecated.
policy_kwargs = dict(
    activation_fn=th.nn.ReLU,
    net_arch=dict(pi=[64, 64], vf=[64, 64]),
)


def _close_all(*environments):
    """Close every environment that was actually created."""
    for environment in environments:
        if environment is not None:
            environment.close()


# Create and configure PPO agent
env = None
eval_env = None
try:
    env = VizdoomEnv(LOCAL_SCENARIO_PATH)
    env = Monitor(env, LOCAL_LOG_PATH)
    # Evaluation gets its own game instance: running evaluation episodes on
    # the training environment would reset it in the middle of a rollout.
    eval_env = Monitor(VizdoomEnv(LOCAL_SCENARIO_PATH))
    model = PPO(
        "CnnPolicy",
        env,
        verbose=1,
        tensorboard_log=LOCAL_TENSORBOARD_PATH,
        policy_kwargs=policy_kwargs,
        **ppo_hyperparameters
    )
    logging.info("PPO agent created successfully.")
except Exception as e:
    logging.error(f"Error creating PPO agent: {e}")
    print(f"Error creating PPO agent: {e}")
    _close_all(env, eval_env)
    # Re-raise instead of calling exit(): in a notebook exit() takes the whole
    # kernel down and hides the traceback.
    raise

# Define checkpoint callback
checkpoint_callback = CheckpointCallback(
    save_freq=max(10000, ppo_hyperparameters["n_steps"] // 10),
    save_path=LOCAL_MODEL_PATH,
    name_prefix="ppo_vizdoom"
)

# Define evaluation callback
# Evaluates periodically and keeps the best model under LOCAL_MODEL_PATH.
# NOTE(review): no stop condition is attached, so this does not end training
# early by itself.
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path=LOCAL_MODEL_PATH,
    log_path=LOCAL_LOG_PATH,
    eval_freq=max(10000, ppo_hyperparameters["n_steps"] // 10),
    deterministic=True,
    render=False
)

# Train the PPO agent
try:
    model.learn(total_timesteps=TOTAL_TIMESTEPS, callback=[checkpoint_callback, eval_callback])
    logging.info("PPO agent training completed.")
except Exception as e:
    logging.error(f"Error during PPO agent training: {e}")
    print(f"Error during PPO agent training: {e}")
    _close_all(env, eval_env)
    raise

# Save the final model
final_model_path = os.path.join(LOCAL_MODEL_PATH, "ppo_vizdoom_final")
try:
    model.save(final_model_path)
    logging.info(f"Final PPO model saved to: {final_model_path}")
except Exception as e:
    # A failed save is reported but does not abort the cell: the environments
    # still have to be closed below.
    logging.error(f"Error saving final PPO model: {e}")
    print(f"Error saving final PPO model: {e}")

# Close the environment
_close_all(env, eval_env)
logging.info("Environment closed.")

# Model Management
Implement model saving/loading, version control and backup. Add model evaluation metrics and visualization. Include security validation for model files.

In [None]:
# Model Management

import os
import logging
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
from stable_baselines3.common.monitor import Monitor
import torch as th

# Function to save the model with version control
def save_model_with_version(model, save_path, version):
    """Save ``model`` under ``<save_path>_v<version>``.

    A failure is logged and printed rather than raised; the function returns
    None either way.

    Args:
        model: Object exposing ``save(path)``.
        save_path: Base path without the version suffix.
        version: Version tag appended after ``_v``.
    """
    target = "{}_v{}".format(save_path, version)
    try:
        model.save(target)
    except Exception as err:
        failure = f"Error saving model to {target}: {err}"
        logging.error(failure)
        print(failure)
    else:
        logging.info(f"Model saved to: {target}")

# Function to load the model with security validation
def load_model_with_validation(model_path):
    """Load a PPO model after basic checks on the file.

    The path is accepted with or without the ``.zip`` suffix that ``save``
    appends, so the path handed to ``save`` can be passed back unchanged.

    Args:
        model_path: Path of the saved model, with or without ``.zip``.

    Returns:
        The loaded ``PPO`` model, or None if the file is missing, is not a zip
        archive, or fails to load. Every failure is logged and printed.
    """
    import zipfile  # only needed for the archive check below

    candidates = [model_path]
    if not str(model_path).endswith(".zip"):
        candidates.append(f"{model_path}.zip")
    resolved = next((path for path in candidates if os.path.isfile(path)), None)

    if resolved is None:
        logging.error(f"Model path does not exist: {model_path}")
        print(f"Model path does not exist: {model_path}")
        return None

    # Structural sanity check only - it does not make an untrusted file safe.
    if not zipfile.is_zipfile(resolved):
        logging.error(f"Model file is not a valid zip archive: {resolved}")
        print(f"Model file is not a valid zip archive: {resolved}")
        return None

    try:
        # SECURITY NOTE(review): loading a saved model deserializes data stored
        # in the archive; only load files from sources you trust.
        model = PPO.load(resolved)
    except Exception as e:
        logging.error(f"Error loading model from {model_path}: {e}")
        print(f"Error loading model from {model_path}: {e}")
        return None
    logging.info(f"Model loaded from: {model_path}")
    return model

# Function to evaluate the model
def evaluate_model(model, env, num_episodes=10):
    """Run ``model`` deterministically and return its mean episode reward.

    Args:
        model: Object exposing ``predict(obs, deterministic=True)`` that
            returns ``(action, state)``.
        env: Gymnasium-style environment: ``reset()`` returns ``(obs, info)``
            and ``step(action)`` returns
            ``(obs, reward, terminated, truncated, info)``.
        num_episodes: Number of episodes to average over (at least 1).

    Returns:
        float: Mean of the per-episode reward sums.

    Raises:
        ValueError: If ``num_episodes`` is less than 1.
    """
    if num_episodes < 1:
        raise ValueError("num_episodes must be at least 1.")

    episode_rewards = []
    for _ in range(num_episodes):
        # reset() returns (obs, info); feeding the whole tuple to predict()
        # would break the policy's observation handling.
        obs, _info = env.reset()
        total_reward = 0.0
        done = False
        while not done:
            action, _states = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, _info = env.step(action)
            total_reward += float(reward)
            # An episode is over on either flag; ignoring truncation would
            # keep stepping an environment that has already stopped.
            done = bool(terminated or truncated)
        episode_rewards.append(total_reward)
    avg_reward = sum(episode_rewards) / num_episodes
    logging.info(f"Average reward over {num_episodes} episodes: {avg_reward}")
    return avg_reward

# Function to visualize evaluation metrics
def visualize_evaluation_metrics(rewards):
    """Plot a sequence of reward values against their index.

    Args:
        rewards: Sequence of numeric rewards, one per episode or evaluation.
    """
    import matplotlib.pyplot as plt

    # Draw a marker at every point: a one-element series (as passed by the
    # example usage) would otherwise render as an invisible zero-length line.
    plt.plot(rewards, marker="o")
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.title('Model Evaluation Metrics')
    plt.show()

# Example usage
if __name__ == "__main__":
    # Define paths and version
    model_save_path = os.path.join(LOCAL_MODEL_PATH, "ppo_vizdoom")
    model_version = 1

    # Save the model with version control
    save_model_with_version(model, model_save_path, model_version)

    # Load the model with validation
    # save() appends ".zip" to a suffix-less path, so point the loader at the
    # file that actually exists on disk.
    loaded_model = load_model_with_validation(f"{model_save_path}_v{model_version}.zip")

    # Evaluate the model
    if loaded_model is not None:
        # The training cell closes its environment when it finishes, so
        # evaluation needs a fresh game instance of its own.
        eval_env = VizdoomEnv(LOCAL_SCENARIO_PATH)
        try:
            avg_reward = evaluate_model(loaded_model, eval_env)
        finally:
            eval_env.close()
        print(f"Average reward: {avg_reward}")

        # Visualize evaluation metrics
        visualize_evaluation_metrics([avg_reward])

# Testing & Validation
Write unit tests for the environment, agent, and training components. Add integration tests and example usage. Implement validation of trained models.

In [None]:
# Testing & Validation

import unittest
from stable_baselines3 import PPO
import numpy as np

class TestVizdoomEnv(unittest.TestCase):
    """Smoke tests for VizdoomEnv; every test starts a real ViZDoom game."""

    def setUp(self):
        self.env = VizdoomEnv(LOCAL_SCENARIO_PATH)

    def tearDown(self):
        # Each setUp launches a game instance; close it so instances do not
        # pile up across tests. test_close has already closed its own.
        if self.env.game.is_running():
            self.env.close()

    def test_initialization(self):
        self.assertIsInstance(self.env, gym.Env)
        self.assertIsNotNone(self.env.game)
        self.assertEqual(self.env.frame_skip, 4)

    def test_step(self):
        # The Gymnasium API expects reset() before the first step().
        self.env.reset()
        state, reward, done, _, _ = self.env.step(0)
        self.assertEqual(state.shape, self.env.observation_space.shape)
        self.assertIsInstance(reward, float)
        self.assertIsInstance(done, bool)

    def test_reset(self):
        state, _ = self.env.reset()
        self.assertEqual(state.shape, self.env.observation_space.shape)

    def test_close(self):
        self.env.close()
        self.assertFalse(self.env.game.is_running())

class TestPPOAgent(unittest.TestCase):
    """Checks that a PPO agent can be built, trained briefly, saved and reloaded."""

    def setUp(self):
        self.env = VizdoomEnv(LOCAL_SCENARIO_PATH)
        self.env = Monitor(self.env, LOCAL_LOG_PATH)
        self.model = PPO("CnnPolicy", self.env, verbose=1, tensorboard_log=LOCAL_TENSORBOARD_PATH)

    def tearDown(self):
        # Every setUp starts a game instance; release it after each test.
        self.env.close()

    def test_initialization(self):
        self.assertIsInstance(self.model, PPO)

    def test_training(self):
        try:
            self.model.learn(total_timesteps=100)
        except Exception as e:
            self.fail(f"Training failed with exception: {e}")
        # Assert something observable instead of a constant True: the agent
        # must have consumed at least the requested number of steps.
        self.assertGreaterEqual(self.model.num_timesteps, 100)

    def test_save_load(self):
        model_path = os.path.join(LOCAL_MODEL_PATH, "test_model")
        self.model.save(model_path)
        loaded_model = PPO.load(model_path)
        self.assertIsInstance(loaded_model, PPO)

class TestModelManagement(unittest.TestCase):
    """Tests for the versioned save, validated load and evaluation helpers."""

    def setUp(self):
        self.env = VizdoomEnv(LOCAL_SCENARIO_PATH)
        self.env = Monitor(self.env, LOCAL_LOG_PATH)
        self.model = PPO("CnnPolicy", self.env, verbose=1, tensorboard_log=LOCAL_TENSORBOARD_PATH)

    def tearDown(self):
        # Every setUp starts a game instance; release it after each test.
        self.env.close()

    def test_save_model_with_version(self):
        model_save_path = os.path.join(LOCAL_MODEL_PATH, "ppo_vizdoom")
        model_version = 1
        save_model_with_version(self.model, model_save_path, model_version)
        # save() appends ".zip" to a suffix-less path, so that is the file to
        # look for on disk.
        self.assertTrue(os.path.exists(f"{model_save_path}_v{model_version}.zip"))

    def test_load_model_with_validation(self):
        model_save_path = os.path.join(LOCAL_MODEL_PATH, "ppo_vizdoom_v1")
        self.model.save(model_save_path)
        # Pass the on-disk file name (with ".zip") so the loader's existence
        # check sees the file that save() actually wrote.
        loaded_model = load_model_with_validation(f"{model_save_path}.zip")
        self.assertIsInstance(loaded_model, PPO)

    def test_evaluate_model(self):
        avg_reward = evaluate_model(self.model, self.env, num_episodes=5)
        self.assertIsInstance(avg_reward, float)

if __name__ == '__main__':
    # argv=[''] and exit=False keep unittest from parsing the notebook
    # kernel's arguments and from raising SystemExit when it finishes.
    unittest.main(argv=[''], exit=False)