Using a GA to learn defend_the_center.cfg

Import everything

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
#from vizdoom_env import Defend_the_Center_VZG #Will just define in the notebook because its more organized
import random
from torch.utils.tensorboard import SummaryWriter
from vizdoom import * #Import all of vizdoom
import time #To make the program sleep (wait), so we can actually see what's happening
from gymnasium import Env #Import OpenAI Gym's Env class
from gymnasium.spaces import Discrete, Box #Import OpenAI Gym's Discrete and Box spaces
import cv2 #OpenCV for image processing, used for modifying the DOOM environment to make it run faster 
from stable_baselines3.common.callbacks import BaseCallback #Import the BaseCallback class from stable_baselines3 to learn from the environment
from stable_baselines3.common import env_checker #Import the env_checker class from stable_baselines3 to check the environment
import os #To create directories for saving models
import sys #To backtrack to root

original_sys_path = sys.path.copy() #Come back to this path later after we navigate to the parent directory
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))  #Add the parent directory to the path so we can import the pathfinder module
from pathfinder import doomfinder, create_new_best_generation_directory, gamefinder #Import functions from the pathfinder module
sys.path = original_sys_path #Set the path back to the original path


Define enviornment

In [None]:
class Defend_the_Center_VZG(Env): #Used for defend_the_center config
    def __init__(self, config_path, render=False): #Constructor
        
        #The naming convention will be "First map this was used on" + "_VZG" (VizDoomGym), 
        #but each config/map this env is used for will be listed below

        #Maps/Config: defend_the_center

        super(Defend_the_Center_VZG, self).__init__() #Inherit from Env class

        #Args: 
            #config_path (str): The path to the configuration file
            #render (bool): Whether to render the environment or not, false by default

        #Setup game
        self.game = vizdoom.DoomGame() #Create a DoomGame object
        self.game.set_doom_game_path(gamefinder('freedoom2.wad')) #Set the path to the game
        self.game.load_config(config_path) #Load the configuration file from file path, ex: doomfinder("basic.cfg")

        #Set window visibility
        if render == False:
            self.game.set_window_visible(False)
        else:
            self.game.set_window_visible(True)

        self.game.init() #Start the game

        #Setup action and observation space
        self.observation_space = Box(low=0, high=255, shape=(100, 160, 1), dtype=np.uint8) #Observation space, 100x160x1 image
        self.action_space = Discrete(3) #Action space, 3 actions

        #Get game variables
        game_variables = self.game.get_state().game_variables
        ammo, health, killcount = game_variables        
        self.ammo = ammo #Get the ammo count, initialize to the current ammo
        self.health = health #Initialize health to starting health (probably 100)
        self.killcount = killcount #Initialize killcount to 0


    def step(self, action, limit = 1000): #Take a step in the environment 
        #Args:
            #action (int): The action to take
            #limit (int): Unimplemented "limit" for the episode, most likely will be a time limit
        #Returns:
            #observation (np.array): The screen buffer of the environment
            #reward (float): The reward for the action taken
            #terminated (bool) Whether the episode is finished or not (by reaching the goal)
            #truncated (bool): Whether the episode has reached some terminal state without reaching the goal (ie: running out of time)
            #info (dict): Additional information about the environment

        #Specify actions and take a step
        actions = np.identity(3) #Create an identity matrix with 3 rows (3 actions), MOVE_LEFT, MOVE_RIGHT, ATTACK, these are the actions we can take in the environment
        movement_reward = self.game.make_action(actions[action], 4) #Reward for taking a random action, second parameter is frame skip (skip 4 frames before taking the next action), the reason we do this is because it saves us time while being easy to see what is happening 
        reward = movement_reward #Initialize reward to movement reward
        truncated = False #Not implemented yet, so set to False. The idea is that if step passes some sort of limit, like a time limit, then the episode is truncated.
        info = {} #Initialize info to an empty dictionary

        if self.game.get_state(): #If the game is not finished
            observation = self.game.get_state().screen_buffer #Get the screen buffer
            observation = self.greyscale(observation) #Convert the image to greyscale

            game_variables = self.game.get_state().game_variables #Get the game variables
            ammo, health, killcount = game_variables #Unpack the game variables

            
            #Calculate reward deltas
            #ammo_delta = ammo - self.ammo #Current ammo - old ammo = ammo used
            ammo = self.ammo 
            #health_delta = health - self.health  #Current health - old health = damage taken
            health = self.health
            #killcount_delta = killcount - self.killcount #Current killcount - old killcount = enemies killed
            killcount = self.killcount

            #reward = movement_reward*2 + ammo_delta*0.0384615385 + health_delta*0.01 #Calculate the reward, we get 2 pts for each enemy we kill, if we lose all heath our score is subtracted by 1, if we lose all ammo our score is subtracted by 1
            #reward = movement_reward*2 + ammo_delta*0.0384615385 + health_delta*0 #Ignore health delta because it might just be punishing the model too much
            #reward = movement_reward*2 + ammo_delta*0.01 + health_delta*0 #Lower the amount the model is punished for wasting ammo
            #reward = movement_reward*2 #Move punishing ammo wasted over to the fitness function
            reward = movement_reward #Ignore reward shaping
            
            info = {"ammo": ammo, "health": health, "killcount": killcount} #Add ammo and health to the info dictionary
        else:
            observation = np.zeros(self.observation_space.shape) #Return a blank screen

        terminated = self.game.is_episode_finished() #Check if the episode is finished

        return observation, reward, terminated, truncated, info

    def render(self, render_in_greyscale=False): #Render the environment for a frame
        #Args:
            #render_in_greyscale (bool): Whether to render the environment in greyscale or not
        
        if self.game.get_state() and render_in_greyscale:  #Only render if there's a valid game state
            observation = self.game.get_state().screen_buffer
            greyscale_obs = self.greyscale(observation)  #Convert to greyscale
            #Render using OpenCV to visualize
            cv2.imshow("VizDoom Environment", greyscale_obs.squeeze())  #Remove extra dimension and display
            cv2.waitKey(1)  #Wait 1ms between frames to allow for rendering
        elif self.game.get_state():  #Only render if there's a valid game state
            observation = self.game.get_state().screen_buffer
            #Render using OpenCV to visualize
            cv2.imshow("VizDoom Environment", observation.squeeze())  #Remove extra dimension and display
            cv2.waitKey(1)  #Wait 1ms between frames to allow for rendering
        else:
            print("No game state to render.")

            
    def reset(self, seed=None): #Reset the environment when we start a new game
        #Args:
            #seed (int): The seed for the random number generator
        #Returns:
            #(observation, info) (tuple)
                #observation (np.array): The screen buffer of the environment
                #info (dict): Additional information about the environment
            
        super().reset(seed=seed) #Implement seeding
        
        self.game.new_episode() #Start a new episode
        state = self.game.get_state().screen_buffer #Get the screen buffer
        observation = self.greyscale(state) #Convert the image to greyscale
        
        #Gather any additional environment-specific info (like ammo, etc.)
        if self.game.get_state():
            game_variables = self.game.get_state().game_variables #Get the game variables
            ammo, health, killcount = game_variables #Unpack the game variables
            info = {"ammo": ammo, "health": health, "killcount": killcount} #Add to the info dictionary
        else:
            info = {} #No gamestate means no info can be gathered
        
        return (observation, info) #Tuple of observation and info

    def greyscale(self, observation=None): #Convert the enivornment to greyscale and resize it
        #Args:
            #observation (np.array): The image of the environment (the current game frame)
        #Returns:
            #grey_return (np.array): The resized greyscale image of the environment
        
        if observation is None and self.game.get_state(): #If no observation is passed
            observation = self.game.get_state().screen_buffer #Get the screen buffer 

        grey = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY) #Convert the image to greyscale
        resize = cv2.resize(grey, (160, 100), interpolation=cv2.INTER_CUBIC) #Resize the image to 160x100
        state = np.reshape(resize, (100, 160, 1)) #Reshape the image to 100x160x1
        
        return state
    
    def get_state(self): 
        #Returns:
            #state (np.array): The current state of the environment
        return self.game.get_state()

    def close(self): #Close the environment
        self.game.close()

Initalize Agent

In [3]:
#Define a simple neural network for action selection
class DoomAgent(nn.Module):
    def __init__(self):
        super(DoomAgent, self).__init__()
        self.fc1 = nn.Linear(160 * 100, 128)
        self.fc2 = nn.Linear(128, 64)  #Additional hidden layer
        self.fc3 = nn.Linear(64, 3)  #Output layer for 3 actions

    def forward(self, x):
        x = torch.flatten(x, 1)  #Flatten input
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))  #Pass through second hidden layer
        x = torch.softmax(self.fc3(x), dim=1)
        return x

Initialize Population

In [6]:
def initialize_population(pop_size):
    return [DoomAgent() for _ in range(pop_size)]

Define Fitness Function


In [None]:
def fitness(agent, env, episodes=3):
    total_reward = 0
    total_ammo_used = 0  #Track ammo usage for the fitness penalty
    total_steps = 0  #Track total steps taken

    for _ in range(episodes):
        obs, info = env.reset()
        current_ammo = info.get("ammo", 0)  #Get starting ammo if available
        ammo_wasted = 0  #Track ammo wasted for the fitness penalty
        current_health = info.get("health", 100)  #Get starting health if available
        current_killcount = info.get("killcount", 0)  #Get starting killcount if available
        done = False

        while not done:
            obs_tensor = torch.from_numpy(obs).float().unsqueeze(0)
            action_probs = agent(obs_tensor)
            action = torch.argmax(action_probs).item()
            obs, reward, terminated, truncated, info = env.step(action)

            #Accumulate rewards
            total_reward += reward
            
            #Acumlate total steps
            total_steps += 1


            #Calculate ammo wasted during this step
            new_ammo = info.get("ammo", current_ammo) #Get the new ammo count   
            ammo_used = current_ammo - new_ammo #Calculate ammo used
            if ammo_used > current_killcount: #If we used more ammo than enemies killed
                ammo_wasted = ammo_used - current_killcount #Track the wasted ammo
            total_ammo_used += ammo_used #Accumulate total ammo used
            current_ammo = new_ammo #Update the current ammo
            current_killcount = info.get("killcount", current_killcount) #Update the killcount

            done = terminated or truncated

        total_reward -= ammo_wasted  #Penalize for wasted ammo, each enemy killed is worth a point, and each bullet missed loses us a point
    
    avg_reward = total_reward / episodes
    avg_steps = total_steps / episodes

    return avg_reward, avg_steps

Selection, Crossover, and Mutation

In [None]:
def select_parents(population, fitnesses, num_parents=5):
    #Sort by fitness
    sorted_population = [x for _, x in sorted(zip(fitnesses, population), key=lambda item: item[0], reverse=True)]
    return sorted_population[:num_parents]


def crossover(parent1, parent2):
    child = DoomAgent()
    for param_child, param1, param2 in zip(child.parameters(), parent1.parameters(), parent2.parameters()):
        mask = torch.rand_like(param1) > 0.5
        param_child.data = torch.where(mask, param1.data, param2.data)
    return child

def mutate(agent, mutation_rate=0.01):
    for param in agent.parameters():
        if random.random() < mutation_rate:
            param.data += torch.randn_like(param) * mutation_rate

Define functions that create the directories that the logs will be saved in

In [None]:
def create_run_directory(base_dir="runs/vizdoom_ga_defend_the_center"):
    os.makedirs(base_dir, exist_ok=True)
    existing_runs = [int(d.split('_')[-1]) for d in os.listdir(base_dir) if d.split('_')[-1].isdigit()]
    run_number = max(existing_runs, default=0) + 1
    run_dir = os.path.join(base_dir, f"run_{run_number}")
    os.makedirs(run_dir, exist_ok=True)
    
    #Create log directory within the run directory
    log_dir = os.path.join(run_dir, "log")
    os.makedirs(log_dir, exist_ok=True)
    
    return run_dir, log_dir

Define best agents loader

In [None]:
def load_best_agents(model_dir, num_agents=5):
    """Load a specified number of best agents from a previous run's model directory."""
    best_agents = []
    for i in range(num_agents):
        agent = DoomAgent()
        checkpoint_path = os.path.join(model_dir, f"best_agent_gen_{i}.pth")
        agent.load_state_dict(torch.load(checkpoint_path, weights_only=False)) 
        best_agents.append(agent)
    return best_agents

Define Genetic Algorithm

In [None]:
def run_ga(env, generations=20, pop_size=10, num_parents=5, mutation_rate=0.01, initial_population=None):
    #Create new run directories for logs and models
    run_dir, log_dir = create_run_directory()
    writer = SummaryWriter(log_dir)
    
    #Directory to save models for this run
    model_dir = os.path.join(run_dir, "saved_models")
    os.makedirs(model_dir, exist_ok=True)
    
    #Start with the provided initial population or initialize a new one
    population = initial_population if initial_population else initialize_population(pop_size)

    for generation in range(generations):
        fitnesses = []
        episode_lengths = []
        episode_rewards = []

        for agent in population:
            #Use the fitness function to evaluate the agent's fitness
            agent_fitness, total_steps = fitness(agent, env)
            fitnesses.append(agent_fitness)
            episode_rewards.append(agent_fitness)
            episode_lengths.append(total_steps) 
        
        #Calculate metrics
        best_fitness = max(fitnesses)
        avg_fitness = np.mean(fitnesses)
        ep_len_mean = np.mean(episode_lengths)
        ep_rew_mean = np.mean(episode_rewards)

        #Diversity metric (Euclidean distance between agents)
        diversity = 0
        for i in range(len(population)):
            for j in range(i + 1, len(population)):
                diversity += torch.norm(torch.cat([p.flatten() for p in population[i].parameters()]) -
                                        torch.cat([p.flatten() for p in population[j].parameters()])).item()
        diversity /= (len(population) * (len(population) - 1) / 2)  #Normalize diversity calculation

        #Log metrics to TensorBoard
        writer.add_scalar('Best Fitness', best_fitness, generation)
        writer.add_scalar('Average Fitness', avg_fitness, generation)
        writer.add_scalar('Diversity', diversity, generation)
        writer.add_scalar('Episode Length Mean', ep_len_mean, generation)
        writer.add_scalar('Episode Reward Mean', ep_rew_mean, generation)

        print(f'Generation {generation}: Best Fitness = {best_fitness}, Avg Fitness = {avg_fitness}, Diversity = {diversity}')

        #Save the best model of this generation
        best_agent_idx = fitnesses.index(best_fitness)
        best_agent = population[best_agent_idx]
        torch.save(best_agent.state_dict(), f"{model_dir}/best_agent_gen_{generation}.pth")

        #Selection and reproduction
        parents = select_parents(population, fitnesses, num_parents)
        next_population = parents[:]
        while len(next_population) < pop_size:
            parent1, parent2 = random.sample(parents, 2)
            child = crossover(parent1, parent2)
            mutate(child, mutation_rate)
            next_population.append(child)

        population = next_population

    writer.close()
    return population


Train with the ViZDoom Environment

In [None]:
#Initialize Doom environment
env = Defend_the_Center_VZG(doomfinder('defend_the_center_modified.cfg'), render=False)

env_checker.check_env(env) #Check the environment to see if its valid

#Run Genetic Algorithm
trained_agents = run_ga(env, generations=1000, pop_size=30, num_parents=5, mutation_rate=0.01)

Train with the ViZDoom enviornment based off a previous best agent

In [None]:
#Initialize Doom environment
#env = Defend_the_Center_VZG(doomfinder('defend_the_center_modified.cfg'), render=False)

#initial_population = load_best_agents("runs/vizdoom_ga_defend_the_center/run_7/saved_models", num_agents=999)

#Run Genetic Algorithm
#trained_agents = run_ga(env, generations=1000, pop_size=30, num_parents=5, mutation_rate=0.01, initial_population=initial_population)


Test best agent

In [4]:
#Initialize the agent
agent = DoomAgent()

#Load the saved model weights into the agent
agent.load_state_dict(torch.load("runs/vizdoom_ga_defend_the_center/run_9/saved_models/best_agent_gen_999.pth"))

#Set the agent to evaluation mode
agent.eval()

#Initialize the environment
env = Defend_the_Center_VZG(doomfinder('defend_the_center_modified.cfg'), render=True)

for episode in range(5):
    observation, _ = env.reset()  #Reset the environment and get only the observation
    done = False  #Set done to false
    total_reward = 0  #Set total reward to 0

    while not done:  #While the game isn't done
        #Convert the observation to a tensor and pass it through the agent
        obs_tensor = torch.from_numpy(observation).float().unsqueeze(0)
        with torch.no_grad():  # Disable gradient computation
            action_probs = agent(obs_tensor)  # Use agent instead of best_agent
            action = torch.argmax(action_probs).item()
        
        #Take a step in the environment
        observation, reward, done, _, _ = env.step(action)
        total_reward += reward  #Add the reward to the total reward
        time.sleep(0.05)  #Sleep for 0.05 seconds

    print(f'Episode: {episode}, Total Reward: {total_reward}')  #Print the episode and total reward
    time.sleep(2)  #Sleep for 2 seconds between episodes


  agent.load_state_dict(torch.load("runs/vizdoom_ga_defend_the_center/run_9/saved_models/best_agent_gen_999.pth"))


c:\Users\johnn\OneDrive\Desktop\Coding Projects\DOOM-bot\Evolutionary Learning\Maps and Configs
Episode: 0, Total Reward: 0.0
Episode: 1, Total Reward: 1.0
Episode: 2, Total Reward: 2.0
Episode: 3, Total Reward: 0.0
Episode: 4, Total Reward: 0.0
