### Initialize Required Classes, Methods, and Functions

Required installations: Pygame, Gym, TensorFlow, SciPy, NumPy, Pandas, Matplotlib

In [None]:
import pygame
import gym
from gym import spaces
import numpy as np
import random
import time
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers

# This block defines the Gym environment and the A2C agent classes.

class GymEnvironment(gym.Env):
    """Pygame-rendered game exposed through a Gym-style interface.

    The player (white square) moves in 5-pixel steps. Enemies (red squares)
    and a power-up (green square) are spawned on wall-clock timers. Touching
    an enemy halves the score and ends the episode; collecting the power-up
    doubles the score and makes enemies faster and more frequent. An episode
    also ends once ``max_time`` seconds have passed since ``reset``.

    Observations are screenshots of the Pygame display, transposed to
    ``(screen_height, screen_width, 3)``.

    Note: ``step`` takes an extra ``episode`` argument that controls enemy
    spawning and behaviour, so its signature differs from the standard
    ``gym.Env.step(action)``.
    """

    def __init__(self):
        """Create the Pygame window and set every game variable to its default."""
        super(GymEnvironment, self).__init__()

        # Define the screen dimensions
        self.screen_width = 400
        self.screen_height = 300

        # Define the action space (up, down, left, right, none)
        self.action_space = spaces.Discrete(5)

        # Define the observation space
        self.observation_space = spaces.Box(low=0, high=255, shape=(self.screen_height, self.screen_width, 3), dtype=np.uint8)

        # Initialize Pygame
        pygame.init()

        # Create the game screen
        self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
        pygame.display.set_caption("Gym Environment")

        # Initialize game state variables
        self.player_size = 10
        self.player_x = self.screen_width // 2
        self.player_y = self.screen_height // 2

        # Enemy related variables
        self.enemies = []
        self.enemy_spawn_interval = 3.0  # In seconds
        self.enemy_speed = 3  # Pixels moved per enemy update
        self.enemy_lifetime = 20.0  # In seconds
        self.max_time = 20.0  # Maximum time allowed for the game (in seconds)
        self.start_time = 0.0  # Keep track of the starting time
        self.last_enemy_spawn_time = time.time()

        # Scoring related variables
        self.score = 0
        self.last_score_time = time.time()

        # Powerup related variables
        self.powerup_active = False
        self.powerup_spawn_interval = 3.0  # In seconds
        self.last_powerup_spawn_time = time.time()
        self.powerup_x = 0
        self.powerup_y = 0

    def reset(self):
        """Restore the start-of-episode state and return the first observation.

        Returns:
            Screenshot of the freshly drawn initial frame.
        """
        self.player_x = self.screen_width // 2
        self.player_y = self.screen_height // 2
        self.enemies = []
        self.last_enemy_spawn_time = time.time()
        self.enemy_spawn_interval = 3.0  # In seconds
        self.enemy_speed = 3  # Pixels moved per enemy update
        self.score = 0
        self.last_score_time = time.time()
        self.powerup_active = False
        self.last_powerup_spawn_time = time.time()
        self.start_time = time.time()

        # Redraw before capturing: without this the screenshot would still show
        # whatever was on the display before the reset.
        self._render_frame()
        return self._get_observation()

    def _get_observation(self):
        """Return the current display contents as a (height, width, 3) array."""
        screen_surface = pygame.display.get_surface()
        observation = pygame.surfarray.array3d(screen_surface)
        # array3d is indexed (x, y, channel); swap the first two axes so the
        # result matches the shape declared in observation_space.
        observation = np.transpose(observation, (1, 0, 2))
        return observation

    def _render_frame(self):
        """Draw the player, enemies and active power-up, then flip the display."""
        # Service the event queue once per frame so the window system does not
        # treat the window as unresponsive.
        pygame.event.pump()

        self.screen.fill((0, 0, 0))
        pygame.draw.rect(self.screen, (255, 255, 255), (self.player_x, self.player_y, self.player_size, self.player_size))
        for enemy_x, enemy_y, enemy_size, _ in self.enemies:
            pygame.draw.rect(self.screen, (255, 0, 0), (enemy_x, enemy_y, enemy_size, enemy_size))
        if self.powerup_active:
            pygame.draw.rect(self.screen, (0, 255, 0), (self.powerup_x, self.powerup_y, 10, 10))
        pygame.display.flip()

    def _spawn_enemy(self):
        """Add one enemy of random size at a random on-screen position."""
        enemy_size = random.uniform(0.5, 1.5) * self.player_size
        enemy_x = random.randint(0, self.screen_width - 1)
        enemy_y = random.randint(0, self.screen_height - 1)
        enemy_spawn_time = time.time()
        self.enemies.append((enemy_x, enemy_y, enemy_size, enemy_spawn_time))

    def _move_enemies(self, episode):
        """Advance every enemy by one update.

        When ``episode > 24`` enemies step toward the player along the axis
        with the larger gap; otherwise they take a random cardinal step that is
        clamped to the screen.

        Args:
            episode: Episode index supplied by the caller of ``step``.
        """
        speed = self.enemy_speed
        for i, (enemy_x, enemy_y, enemy_size, enemy_spawn_time) in enumerate(self.enemies):
            if episode > 24:
                distance_to_player = np.sqrt((self.player_x - enemy_x)**2 + (self.player_y - enemy_y)**2)
                # 1000 px is larger than the 400x300 screen diagonal, so this
                # range check currently always passes.
                if distance_to_player < 1000:
                    dx = self.player_x - enemy_x
                    dy = self.player_y - enemy_y

                    # Move one step towards the player in a cardinal direction
                    if abs(dx) >= abs(dy):
                        self.enemies[i] = (enemy_x + speed * np.sign(dx), enemy_y, enemy_size, enemy_spawn_time)
                    else:
                        self.enemies[i] = (enemy_x, enemy_y + speed * np.sign(dy), enemy_size, enemy_spawn_time)
            else:
                direction = random.choice(['up', 'down', 'left', 'right'])
                if direction == 'up':
                    self.enemies[i] = (enemy_x, max(0, enemy_y - speed), enemy_size, enemy_spawn_time)
                elif direction == 'down':
                    self.enemies[i] = (enemy_x, min(self.screen_height - 1, enemy_y + speed), enemy_size, enemy_spawn_time)
                elif direction == 'left':
                    self.enemies[i] = (max(0, enemy_x - speed), enemy_y, enemy_size, enemy_spawn_time)
                elif direction == 'right':
                    self.enemies[i] = (min(self.screen_width - 1, enemy_x + speed), enemy_y, enemy_size, enemy_spawn_time)

    def _is_colliding(self, x1, y1, size1, x2, y2, size2, buffer=5):
        """Return True when two squares overlap, each side padded by ``buffer`` pixels."""
        return (x1 + size1 + buffer > x2 and x1 < x2 + size2 + buffer and
                y1 + size1 + buffer > y2 and y1 < y2 + size2 + buffer)

    def _pick_up_powerup(self):
        """Apply the power-up effect if the player is touching an active power-up."""
        if self.powerup_active and self._is_colliding(self.player_x, self.player_y, self.player_size, self.powerup_x, self.powerup_y, 10):
            self.score *= 2
            self.powerup_active = False

            # Increase enemy speed and decrease enemy spawn interval
            self.enemy_speed *= 1.5
            self.enemy_spawn_interval = max(0.01, self.enemy_spawn_interval - 0.5)

    def _spawn_powerup(self):
        """Place the power-up at a random on-screen position and activate it."""
        self.powerup_x = random.randint(0, self.screen_width - 1)
        self.powerup_y = random.randint(0, self.screen_height - 1)
        self.powerup_active = True

    def _remove_old_enemies(self):
        """Drop enemies that have existed longer than ``enemy_lifetime`` seconds."""
        current_time = time.time()
        self.enemies = [(x, y, size, spawn_time) for x, y, size, spawn_time in self.enemies
                        if current_time - spawn_time <= self.enemy_lifetime]

    def _calculate_reward_boost(self):
        """Return a bonus in [0, 2] that grows as the player nears an active power-up.

        Returns 0.0 when no power-up is active.
        """
        if self.powerup_active:
            distance_to_powerup = np.sqrt((self.player_x - self.powerup_x)**2 + (self.player_y - self.powerup_y)**2)
            max_distance = np.sqrt((self.screen_width - 1)**2 + (self.screen_height - 1)**2)  # Max distance on the screen
            normalized_distance = distance_to_powerup / max_distance
            return 2.0 - 2*normalized_distance  # The closer to the powerup, the higher the reward boost
        return 0.0

    def step(self, action, episode):
        """Apply one action and advance the game by one update.

        Args:
            action: 0 = up, 1 = down, 2 = left, 3 = right, anything else = stay.
            episode: Episode index. Enemies spawn only when ``episode > 9`` and
                chase the player only when ``episode > 24``.

        Returns:
            Tuple ``(observation, reward, done, info)``. ``reward`` is the
            running score, not a per-step increment. ``info`` always holds
            ``'score'`` and, when the time limit ended the episode, also
            ``'time_elapsed'``.
        """
        # Execute the specified action from the allowed 5
        if action == 0:  # Up
            self.player_y = max(0, self.player_y - 5)
        elif action == 1:  # Down
            self.player_y = min(self.screen_height - self.player_size, self.player_y + 5)
        elif action == 2:  # Left
            self.player_x = max(0, self.player_x - 5)
        elif action == 3:  # Right
            self.player_x = min(self.screen_width - self.player_size, self.player_x + 5)

        # One point per step plus the power-up proximity bonus
        self.score = self.score + 1 + self._calculate_reward_boost()

        current_time = time.time()

        # Spawn new enemies periodically once the episode count exceeds 9
        if episode > 9 and current_time - self.last_enemy_spawn_time > self.enemy_spawn_interval:
            self._spawn_enemy()
            self.last_enemy_spawn_time = current_time

        # Spawn powerup periodically
        if not self.powerup_active and current_time - self.last_powerup_spawn_time > self.powerup_spawn_interval:
            self._spawn_powerup()
            self.last_powerup_spawn_time = current_time

        self._move_enemies(episode)
        self._pick_up_powerup()
        self._remove_old_enemies()

        # Draw before evaluating the terminal conditions so that every return
        # path hands back a screenshot of the state that was just produced.
        self._render_frame()
        observation = self._get_observation()

        # Check for collisions between the player and enemies
        for enemy_x, enemy_y, enemy_size, _ in self.enemies:
            if self._is_colliding(self.player_x, self.player_y, self.player_size, enemy_x, enemy_y, enemy_size):
                self.score -= round(self.score / 2, 0)
                return observation, self.score, True, {'score': self.score}

        # Check if the game is done due to reaching the time limit
        elapsed = current_time - self.start_time
        if elapsed >= self.max_time:
            return observation, self.score, True, {'score': self.score, 'time_elapsed': elapsed}

        return observation, self.score, False, {'score': self.score}

    def render(self, mode='human'):
        """Refresh the Pygame display when ``mode`` is ``'human'``."""
        if mode == 'human':
            pygame.display.update()

    def close(self):
        """Shut down Pygame, which closes the window."""
        pygame.quit()

class A2CAgent:
    """Advantage actor-critic agent with separate convolutional actor and critic.

    The actor outputs a softmax distribution over actions; the critic outputs
    a single state-value estimate. Each network has its own Adam optimizer.
    """

    def __init__(self, state_shape, action_size, epsilon=0.1, learning_rate_actor=0.001, learning_rate_critic=0.005, gamma=0.99):
        """Build both networks and their optimizers.

        Args:
            state_shape: Shape of a single observation passed to the networks.
            action_size: Number of discrete actions.
            epsilon: Probability of taking a uniformly random action.
            learning_rate_actor: Adam learning rate for the actor.
            learning_rate_critic: Adam learning rate for the critic.
            gamma: Discount factor.
        """
        # Initialize the actor and critic neural networks
        self.actor = self.build_actor_network(state_shape, action_size)
        self.critic = self.build_critic_network(state_shape)

        # Define the optimizers
        self.optimizer_actor = optimizers.Adam(learning_rate=learning_rate_actor)
        self.optimizer_critic = optimizers.Adam(learning_rate=learning_rate_critic)

        # Set the discount factor (gamma)
        self.gamma = gamma

        # Set the epsilon factor
        self.epsilon = epsilon

    def build_actor_network(self, state_shape, action_size):
        """Return the policy network: conv -> pool -> dense -> softmax over actions."""
        model = models.Sequential()
        model.add(layers.Conv2D(16, (3, 3), activation='relu', input_shape=state_shape))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Flatten())
        model.add(layers.Dense(32, activation='relu'))
        model.add(layers.Dense(action_size, activation='softmax'))
        return model

    def build_critic_network(self, state_shape):
        """Return the value network: conv -> pool -> dense -> one linear output."""
        model = models.Sequential()
        model.add(layers.Conv2D(16, (3, 3), activation='relu', input_shape=state_shape))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Flatten())
        model.add(layers.Dense(32, activation='relu'))
        model.add(layers.Dense(1, activation='linear'))
        return model

    @staticmethod
    def _moves_player(action, env):
        """Return True if ``action`` would change the player's position in ``env``."""
        player_x, player_y = env.player_x, env.player_y
        if action == 0:  # Up
            player_y = max(0, player_y - 5)
        elif action == 1:  # Down
            player_y = min(env.screen_height - env.player_size, player_y + 5)
        elif action == 2:  # Left
            player_x = max(0, player_x - 5)
        elif action == 3:  # Right
            player_x = min(env.screen_width - env.player_size, player_x + 5)
        return (player_x, player_y) != (env.player_x, env.player_y)

    def get_action(self, state, env):
        """Choose an action for ``state``.

        With probability ``epsilon`` a uniformly random action is returned.
        Otherwise the action is sampled from the actor's distribution restricted
        to actions that would actually move the player, so the learned policy
        (previously computed but ignored) drives the choice.

        Args:
            state: A single observation (no batch dimension).
            env: Environment; its player position and screen bounds are read.

        Returns:
            Index of the selected action.
        """
        state = np.expand_dims(state, axis=0)
        action_probs = self.actor.predict(state, verbose=0)[0]
        n_actions = len(action_probs)

        # Exploration branch
        if np.random.rand() < self.epsilon:
            return np.random.choice(n_actions)

        # Keep only actions that change the player's position
        valid_actions = [a for a in range(n_actions) if self._moves_player(a, env)]
        if not valid_actions:
            return np.random.choice(n_actions)

        # Renormalise the policy over the valid actions and sample from it
        valid_probs = np.asarray(action_probs, dtype=np.float64)[valid_actions]
        total = valid_probs.sum()
        if not np.isfinite(total) or total <= 0.0:
            # Degenerate policy output; fall back to a uniform pick
            return np.random.choice(valid_actions)
        return np.random.choice(valid_actions, p=valid_probs / total)

    def compute_advantages(self, rewards, values, dones):
        """Compute discounted advantages by accumulating TD errors backwards.

        Args:
            rewards: Sequence of ``n`` rewards.
            values: Sequence of ``n + 1`` value estimates; the last entry is the
                bootstrap value for the state after the final step.
            dones: Sequence of ``n`` terminal flags.

        Returns:
            Float array of ``n`` advantages.
        """
        n = len(rewards)
        # Explicit float dtype: zeros_like(rewards) would inherit an integer
        # dtype from integer rewards and silently truncate the results.
        advantages = np.zeros(n, dtype=np.float64)

        last_advantage = 0.0
        for t in reversed(range(n)):
            # A terminal step neither bootstraps nor carries advantage backwards
            not_done = 1 - int(dones[t])
            delta = rewards[t] + self.gamma * values[t + 1] * not_done - values[t]
            last_advantage = delta + self.gamma * last_advantage * not_done
            advantages[t] = last_advantage

        return advantages

    def train(self, states, actions, advantages, targets):
        """Run one gradient step on the actor and on the critic.

        Args:
            states: Batch of observations.
            actions: Action index taken for each observation.
            advantages: Advantage estimate for each taken action.
            targets: Value target for each observation.
        """
        advantages = tf.reshape(tf.cast(advantages, tf.float32), [-1])
        targets = tf.reshape(tf.cast(targets, tf.float32), [-1])

        with tf.GradientTape() as tape_actor, tf.GradientTape() as tape_critic:
            # Actor loss (policy gradient)
            action_probs = self.actor(states, training=True)
            action_mask = tf.one_hot(actions, depth=action_probs.shape[1])
            chosen_probs = tf.reduce_sum(action_probs * action_mask, axis=1)
            # Clip away exact zeros so the log stays finite
            log_probs = tf.math.log(tf.clip_by_value(chosen_probs, 1e-8, 1.0))
            actor_loss = -tf.reduce_mean(log_probs * advantages)

            # Critic loss (mean squared error). The critic output has shape
            # (N, 1); drop the last axis so subtracting the (N,) targets is
            # element-wise instead of broadcasting to an (N, N) matrix.
            values = tf.squeeze(self.critic(states, training=True), axis=-1)
            critic_loss = tf.reduce_mean(tf.square(targets - values))

        # Update actor and critic weights
        gradients_actor = tape_actor.gradient(actor_loss, self.actor.trainable_variables)
        gradients_critic = tape_critic.gradient(critic_loss, self.critic.trainable_variables)
        self.optimizer_actor.apply_gradients(zip(gradients_actor, self.actor.trainable_variables))
        self.optimizer_critic.apply_gradients(zip(gradients_critic, self.critic.trainable_variables))

    def save_model(self, actor_file, critic_file, total_episodes):
        """Save both networks' weights.

        ``_episodes_<total_episodes>.h5`` is appended to each given file name.
        """
        # Create unique filenames for actor and critic model weights
        actor_file = f'{actor_file}_episodes_{total_episodes}.h5'
        critic_file = f'{critic_file}_episodes_{total_episodes}.h5'

        # Save the actor model weights
        self.actor.save_weights(actor_file)

        # Save the critic model weights
        self.critic.save_weights(critic_file)

    def load_model(self, actor_file, critic_file):
        """Load actor and critic weights from the given files (names used as-is)."""
        # Load the actor model weights
        self.actor.load_weights(actor_file)

        # Load the critic model weights
        self.critic.load_weights(critic_file)

        
# The function below tests fully trained models.
def test_agent(env, agent, total_episodes=100,
               actor_file='actor_model_weights.h5_episodes_100.h5',
               critic_file='critic_model_weights.h5_episodes_100.h5',
               difficulty_episode=100):
    """Play ``total_episodes`` games with saved weights and collect the rewards.

    Args:
        env: Environment providing ``reset()`` and ``step(action, episode)``.
        agent: Agent providing ``load_model`` and ``get_action``.
        total_episodes: Number of games to play.
        actor_file: Path of the saved actor weights.
        critic_file: Path of the saved critic weights.
        difficulty_episode: Value passed to ``env.step`` as its ``episode``
            argument on every step.

    Returns:
        List with the summed per-step reward of each game.
    """
    # Load the trained agent's model weights
    agent.load_model(actor_file, critic_file)

    results = []

    for episode in range(total_episodes):
        state = env.reset()
        done = False
        total_reward = 0

        while not done:
            action = agent.get_action(state, env)
            state, reward, done, _ = env.step(action, difficulty_episode)
            total_reward += reward

        results.append(total_reward)
        print(f"Test Episode {episode + 1}/{total_episodes}, Total Reward: {total_reward}")

    return results


### Train a New Agent

In [None]:
# Initialize the environment and the A2C agent
env = GymEnvironment()

# try/finally guarantees the Pygame window is closed even if training raises.
try:
    state_shape = env.observation_space.shape
    action_size = env.action_space.n
    agent = A2CAgent(state_shape, action_size)

    # Training loop
    total_episodes = 100  # This can be changed based on training requirements
    for episode in range(total_episodes):
        state = env.reset()
        done = False

        # Trajectory buffers for the current episode
        states, actions, rewards, dones, values = [], [], [], [], []
        total_reward = 0

        while not done:
            action = agent.get_action(state, env)
            next_state, reward, done, _ = env.step(action, episode)

            # Store the transition together with the critic's estimate for the
            # state the action was taken in
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            dones.append(done)
            values.append(agent.critic.predict(np.expand_dims(state, axis=0), verbose=0)[0][0])

            state = next_state
            total_reward += reward

        # Compute the value targets and advantages. The bootstrap value is 0
        # when the episode ended in a terminal state.
        next_value = 0 if done else agent.critic.predict(np.expand_dims(state, axis=0), verbose=0)[0][0]
        advantages = agent.compute_advantages(rewards, values + [next_value], dones)
        value_targets = rewards + agent.gamma * np.array(values[1:] + [next_value]) * (1 - np.array(dones))

        # Convert data to numpy arrays
        states = np.array(states)
        actions = np.array(actions)
        advantages = np.array(advantages)
        value_targets = np.array(value_targets)

        # Train the agent
        agent.train(states, actions, advantages, value_targets)

        # Print the episode results
        print(f"Episode {episode + 1}/{total_episodes}, Total Reward: {total_reward}")

    # Save the trained agent
    actor_file = 'actor_model_weights.h5'
    critic_file = 'critic_model_weights.h5'
    agent.save_model(actor_file, critic_file, total_episodes)
finally:
    # Close the environment
    env.close()

### Test a Trained Agent

In [None]:
# Initialize the environment and the A2C agent
env = GymEnvironment()

# try/finally guarantees the Pygame window is closed even if testing raises,
# for example when the saved weight files cannot be found.
try:
    state_shape = env.observation_space.shape
    action_size = env.action_space.n
    agent = A2CAgent(state_shape, action_size)

    # Test the trained agent in 100 games and record the results
    test_results = test_agent(env, agent, total_episodes=100)
finally:
    # Close the environment
    env.close()

### Run an Untrained Agent
Note: because of the way the game is built, the scores produced here are not directly comparable to those of trained agents.

In [None]:
if __name__ == "__main__":
    env = GymEnvironment()
    game_score = []
    total_reward = 0

    # try/finally guarantees the Pygame window is closed even if a step raises.
    try:
        # Start the first game explicitly. Without this the episode timer still
        # holds its constructor value of 0.0, so the very first step would hit
        # the time limit and end the game immediately.
        env.reset()

        # Play 100 games with uniformly random actions
        while len(game_score) < 100:
            env.render()
            action = env.action_space.sample()
            observation, reward, done, info = env.step(action, 100)
            total_reward += reward
            if done:
                game_score.append(total_reward)
                total_reward = 0
                env.reset()
    finally:
        env.close()

### Roll Results into a DataFrame
Note: This will require training a variety of different models. CSVs generated from training will be provided.

In [None]:
import pandas as pd

# Load the test results of each trained model (file names can be changed as needed)
df10 = pd.read_csv('10_episodes.csv')
df20 = pd.read_csv('20_episodes.csv')
df50 = pd.read_csv('50_episodes.csv')
df100 = pd.read_csv('100_episodes.csv')

# Place the four result sets side by side and drop the saved index columns
all_results = pd.concat([df10, df20, df50, df100], axis=1)
all_results = all_results.drop('Unnamed: 0', axis=1)
all_results.columns = ['10 Episodes', '20 Episodes', '50 Episodes', '100 Episodes']  # Can change column names as needed

# Number the games from 1 using the actual row count rather than assuming 100
# rows, and insert the numbers as a plain array.
df_eps = pd.DataFrame([i + 1 for i in range(len(all_results))])
all_results.insert(0, 'Episode', df_eps[0].to_numpy())

### Line Plot of Results

In [None]:
import matplotlib.pyplot as plt

# Draw one reward curve per training set, in the same order as the result columns
for series_name in ('10 Episodes', '20 Episodes', '50 Episodes', '100 Episodes'):
    plt.plot(all_results['Episode'], all_results[series_name], label=series_name)

plt.xlabel('Episode')
plt.ylabel('Total Episode Rewards')
plt.title('Trained Agent Rewards over 100 Episodes Played')
plt.legend()
plt.show()

### Bar Plot of Means of Results

In [None]:
# Column labels double as the bar labels
xlbl = ['10 Episodes', '20 Episodes', '50 Episodes', '100 Episodes']

# Mean total reward of each training set, one row per label
means = pd.DataFrame([np.mean(all_results[label]) for label in xlbl])

plt.bar(xlbl, means[0])
plt.ylabel('Mean Total Reward')
plt.title('Mean Rewards over 100 Episodes for Each Training Set')
plt.show()

### Two-tailed T-tests Across All Columns

In [None]:
import scipy.stats as stats

def perform_ttest(column1, column2):
    """Run an independent two-sample t-test, print the p-value and return the result.

    Args:
        column1: First sample.
        column2: Second sample.

    Returns:
        Tuple ``(statistic, p_value)`` as produced by ``scipy.stats.ttest_ind``.
    """
    stat, p = stats.ttest_ind(column1, column2)
    print(f"T-test result - p-value: {p}")
    # Hand the numbers back as well so callers can use them programmatically
    return stat, p

# Compare every unordered pair of result columns; column 0 ('Episode') is skipped
result_columns = list(all_results.columns[1:])
for position, first_name in enumerate(result_columns):
    for second_name in result_columns[position + 1:]:
        print(f"Performing t-test between '{first_name}' and '{second_name}':")
        perform_ttest(all_results[first_name], all_results[second_name])