In [361]:
import time
from collections import deque, namedtuple
import gymnasium as gym
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
import random
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Input, Conv2D, MaxPool2D, Flatten, GlobalAvgPool2D
from tensorflow.keras.losses import MSE
from tensorflow.keras.optimizers import Adam

In [None]:
"""
TODO:
    - Install and experiment with tensorflow-GPU and CUDA.
    - Experiment with different network architectures / hyperparameters.
    - Train agent for much longer and many more steps...
    - Train multiple models and compare with matplotlib graphs...
    - Add a way to save the agent's models and load them back into the agent.
"""

In [341]:
# Show the installed TensorFlow version (the saved output of this cell reports 2.13.0).
tf.__version__

'2.13.0'

In [362]:
class MemoryBuffer():
    """
    Fixed-capacity replay memory for the agent. Every environment step is recorded as one "experience"
    tuple; once the capacity is reached the oldest experiences are discarded as new ones arrive.
    """

    def __init__(self, max_mem_size):
        """
        Args:
            max_mem_size: (int) maximum number of experiences kept in the buffer.

        Attributes:
            buffer: deque (bounded by max_mem_size) holding the stored experiences.
            experience: (namedtuple type) record for a single environment step with the fields
                state, action, reward, next_state, done and truncated.
        """

        self.buffer = deque(maxlen = max_mem_size)
        self.experience = namedtuple("Experience", field_names = ["state", "action", "reward", "next_state", "done", "truncated"])

    def store_memory(self,  state, action, reward, next_state, done, truncated):
        """
        Wraps the values of one environment step in an "experience" and appends it to the buffer.
        """

        step = self.experience(
            state = state,
            action = action,
            reward = reward,
            next_state = next_state,
            done = done,
            truncated = truncated,
        )
        self.buffer.append(step)

    def get_experiences(self, batch_size):
        """
        Draws batch_size distinct experiences at random and returns them field by field.

        Args:
            batch_size: (int) number of experiences to draw. random.sample raises a ValueError if this
                is larger than the number of stored experiences.

        Returns:
            states: (numpy array) the stored states of the drawn experiences (210x160x3 game images
                when used by the Space Invaders agent).
            actions: (numpy array) the action taken in each drawn step.
            rewards: (numpy array) the reward received in each drawn step.
            next_states: (numpy array) the state observed after each drawn step.
            dones: (numpy array) the stored "done" flag of each drawn step (episode finished).
            truncateds: (numpy array) the stored "truncated" flag of each drawn step (episode cut short).
        """

        picked = random.sample(self.buffer, batch_size)

        # Build one array per namedtuple field; the field order matches the documented return order.
        columns = []
        for field in self.experience._fields:
            columns.append(np.array([getattr(step, field) for step in picked]))

        return tuple(columns)
        
        
class SpaceInvaderAgent():
    """
    Represents an agent that can learn to play Atari Space Invaders using Deep-Q Learning.

    The agent owns two convolutional networks with the same architecture: "q_network", which is trained,
    and "q_target", which is only used to compute the y targets and is periodically overwritten with the
    weights of "q_network".
    """

    def __init__(self, learn_rate, gamma, num_actions, epsilon, batch_size, max_mem_size, epsilon_decay, num_steps_for_learn = 4, epsilon_end = 0.01, update_target_net = 100):
        """
        Args:
            learn_rate: (float) learning rate passed to the Adam optimizer.
            gamma: (float) discount factor applied to the bootstrapped next-state value.
            num_actions: (int) number of discrete actions; also the size of the networks' output layer.
            epsilon: (float) initial probability of picking a random action.
            batch_size: (int) number of experiences sampled from memory for each gradient step.
            max_mem_size: (int) capacity of the MemoryBuffer.
            epsilon_decay: (float) amount subtracted from epsilon on every call to learn().
            num_steps_for_learn: (int) a gradient step is taken every this many calls to learn().
            epsilon_end: (float) epsilon is no longer decayed once it has reached this value.
            update_target_net: (int) the target network is synced every this many calls to learn().

        Attributes:
            actions: (list of int) the action ids 0..num_actions-1.
            memory: (MemoryBuffer) replay memory holding the agent's experiences.
            learn_step_cntr: (int) number of times learn() has been called.
            q_network: (keras Sequential) network that is trained and used to choose actions.
            q_target: (keras Sequential) network used to compute the y targets.
            optimizer: (keras Adam) optimizer used to update q_network.
        """

        self.actions = [i for i in range(num_actions)]
        self.learn_rate = learn_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_end = epsilon_end
        self.batch_size = batch_size
        self.memory = MemoryBuffer(max_mem_size)
        self.num_steps_for_learn = num_steps_for_learn
        self.learn_step_cntr = 0
        self.update_target_net = update_target_net

        self.q_network = self._build_network(num_actions)
        self.q_target = self._build_network(num_actions)

        # Both networks are randomly initialised; start the target network from the same weights as the
        # Q-Network so the very first y targets are not produced by an unrelated network.
        self.q_target.set_weights(self.q_network.get_weights())

        # The networks are trained with a custom GradientTape loop, so they do not need to be compiled.
        self.optimizer = Adam(learning_rate = self.learn_rate)

    @staticmethod
    def _build_network(num_actions):
        """
        Builds the convolutional network shared by the Q-Network and the target Q-Network.

        Args:
            num_actions: (int) number of units in the linear output layer (one Q-value per action).

        Returns:
            A new keras Sequential model taking 210x160x3 images as input.
        """

        return Sequential([
            Input(shape = (210, 160, 3)),
            Conv2D(filters = 32, kernel_size = 3, activation = 'relu', padding = 'same'),
            MaxPool2D(),
            Conv2D(filters = 64, kernel_size = 3, activation = 'relu', padding = 'same'),
            MaxPool2D(),
            Conv2D(filters = 128, kernel_size = 3, activation = 'relu', padding = 'same'),
            MaxPool2D(),
            Flatten(),
            Dense(64, activation = 'relu'),
            Dense(64, activation = 'relu'),
            Dense(num_actions, activation = 'linear')
            ])

    def store_memory(self, state, action, reward, next_state, done, truncated):
        """
        Adds a single "experience" to the agent's MemoryBuffer.
        """

        self.memory.store_memory(state, action, reward, next_state, done, truncated)

    def choose_action(self, state):
        """
        Chooses an action based on the agent's greedy-epsilon policy. Possible actions include:
        0 (NOOP), 1 (FIRE), 2 (RIGHT), 3 (LEFT), 4 (RIGHTFIRE), 5 (LEFTFIRE)

        Args:
            state: (numpy array) a batch containing a single game image, i.e. shape (1, 210, 160, 3).

        Returns:
            action: (int) one of the agent's possible actions.
        """

        # Pick a random action if generated random number is less than epsilon.
        if np.random.random() < self.epsilon:
            action = np.random.choice(self.actions)

        # Use Q-Network to pick action based on state.
        else:
            q_values = self.q_network(state)
            action = tf.math.argmax(q_values, axis = 1).numpy()[0]

        return action

    def compute_loss(self, states, actions, rewards, next_states, dones, truncateds):
        """
        Calculates the loss for the Deep-Q Network.

        Args:
            states: (numpy array) each element is a 210x160x3 array representing the current game image.
            actions: (numpy array) array holding each action chosen for each corresponding step.
            rewards: (numpy array) rewards gained from each corresponding step.
            next_states: (numpy array) each element is a 210x160x3 array representing the next game image
                based on the action taken.
            dones: (numpy array) true/1 if the corresponding step ended its episode, false/0 otherwise.
            truncateds: (numpy array) true/1 if the corresponding episode was cut short, false/0 otherwise.
                Kept for interface compatibility; it does not affect the targets (see below).

        Returns:
            loss: (scalar TensorFlow Tensor) the Mean-Squared Error between the y targets and the Q(s,a) values.
        """

        # Cast explicitly so the NumPy inputs (float64 rewards, boolean flags) combine cleanly with the
        # float32 network outputs.
        rewards = tf.cast(rewards, tf.float32)
        not_terminal = 1.0 - tf.cast(dones, tf.float32)

        # Determine max-Q(s, a) from next states and calculate y-targets using the target Q-Network.
        # Only a genuinely terminal step stops the bootstrap: a truncated episode was merely cut short,
        # so its next state still has value.
        max_qsa = tf.reduce_max(self.q_target(next_states), axis = -1)
        y_targets = rewards + not_terminal * self.gamma * max_qsa

        # Determine Q-values from chosen actions using the original Q-Network.
        q_values = self.q_network(states)
        batch_indices = tf.range(tf.shape(q_values)[0])
        action_indices = tf.stack([batch_indices, tf.cast(actions, tf.int32)], axis = 1)
        q_values = tf.gather_nd(q_values, action_indices)

        # Calculate the loss from the Mean-Squared Error between the y targets and Q-values.
        loss = MSE(y_targets, q_values)

        return loss

    def learn(self):
        """
        Updates the trainable weights of the Q-Network based on the Mean-Squared Error loss function. Learning
        only takes place every "num_steps_for_learn" steps. Will not learn unless the MemoryBuffer contains
        enough experiences for a full sample batch (batch_size). Also decrements the agent's epsilon attribute
        and syncs the target Q-Network every "update_target_net" steps.

        This method is intentionally NOT wrapped in tf.function: it depends on Python side effects (step
        counter, epsilon, random sampling from the buffer, set_weights), which a traced graph would only
        execute while tracing.
        """

        # Update learn step counter.
        self.learn_step_cntr += 1

        # Decrement epsilon, never letting it drop below epsilon_end.
        if self.epsilon > self.epsilon_end:
            self.epsilon = max(self.epsilon - self.epsilon_decay, self.epsilon_end)

        # Update target Q-Network based on step count. This is checked before the learning gate so the
        # sync period does not silently depend on num_steps_for_learn.
        if self.learn_step_cntr % self.update_target_net == 0:
            self.q_target.set_weights(self.q_network.get_weights())

        # Check if it is time to learn.
        if self.learn_step_cntr % self.num_steps_for_learn != 0:
            return

        # Make sure MemoryBuffer has enough experiences for full batch.
        if len(self.memory.buffer) < self.batch_size:
            return

        # Extract experiences from MemoryBuffer.
        states, actions, rewards, next_states, dones, truncateds = self.memory.get_experiences(self.batch_size)

        # Compute loss with GradientTape.
        with tf.GradientTape() as tape:
            loss = self.compute_loss(states, actions, rewards, next_states, dones, truncateds)

        # Compute the gradients of the loss with respect to the weights.
        gradients = tape.gradient(loss, self.q_network.trainable_variables)

        # Update the weights of the q_network.
        self.optimizer.apply_gradients(zip(gradients, self.q_network.trainable_variables))

    def save_model(self):
        """
        Placeholder: saving the Q-Network is not implemented yet, so this does nothing.
        """
        #TODO: Add way to save q_network.
        return

    def load_model(self):
        """
        Placeholder: loading an existing Q-Network is not implemented yet, so this does nothing.
        """
        #TODO: Add way to load an existing q_network.
        return
        

In [364]:
# Create SpaceInvader gym environment with rgb_array mode.
env = gym.make("ALE/SpaceInvaders-v5", full_action_space = False, render_mode = 'rgb_array')

# Agent and Model Hyperparameters
ALPHA = 1e-3
GAMMA = 0.995
BATCH_SIZE = 64
MAX_MEM_SIZE = 1000
EPSILON_DECAY = 1e-5
NUM_STEPS_FOR_LEARN = 4

n_episodes = 75
max_num_timesteps = 1000 # per episode

agent = SpaceInvaderAgent(learn_rate = ALPHA, gamma = GAMMA, num_actions = 6, epsilon = 1, batch_size = BATCH_SIZE, max_mem_size = MAX_MEM_SIZE, epsilon_decay = EPSILON_DECAY, num_steps_for_learn = NUM_STEPS_FOR_LEARN)

# Per-episode training statistics (score, cumulative step count, epsilon at episode end).
total_point_history = []
num_steps = []
epsilon_history = []

try:
    for i in range(n_episodes):
        start = time.time()
        done = False
        score = 0
        state, info = env.reset()

        for t in range(max_num_timesteps):
            # The networks expect a batch dimension, so wrap the single frame in a batch of one.
            state_q = np.expand_dims(state, axis=0)
            action = agent.choose_action(state_q)
            next_state, reward, done, truncated, info = env.step(action)
            score = score + reward
            agent.store_memory(state, action, reward, next_state, done, truncated)
            state = next_state.copy()
            agent.learn()

            if done or truncated:
                break

        end = time.time()
        duration = end - start
        total_point_history.append(score)
        num_steps.append(agent.learn_step_cntr)
        epsilon_history.append(agent.epsilon)

        print(f"\rEpisode {i + 1} | Total points: {score:.2f} | Duration: {duration:.2f} | Total Steps: {agent.learn_step_cntr} | Current Epsilon: {agent.epsilon:.2f}")
finally:
    # Release the emulator even if training is interrupted or raises.
    env.close()


Episode 1 | Total points: 65.00 | Duration: 135.12 | Total Steps: 365 | Current Epsilon: 1.00
Episode 2 | Total points: 160.00 | Duration: 234.77 | Total Steps: 895 | Current Epsilon: 0.99
Episode 3 | Total points: 110.00 | Duration: 202.99 | Total Steps: 1348 | Current Epsilon: 0.99
Episode 4 | Total points: 40.00 | Duration: 123.90 | Total Steps: 1628 | Current Epsilon: 0.98
Episode 5 | Total points: 50.00 | Duration: 163.71 | Total Steps: 2000 | Current Epsilon: 0.98
Episode 6 | Total points: 120.00 | Duration: 209.32 | Total Steps: 2474 | Current Epsilon: 0.98
Episode 7 | Total points: 270.00 | Duration: 346.56 | Total Steps: 3252 | Current Epsilon: 0.97
Episode 8 | Total points: 170.00 | Duration: 217.62 | Total Steps: 3744 | Current Epsilon: 0.96
Episode 9 | Total points: 455.00 | Duration: 367.23 | Total Steps: 4572 | Current Epsilon: 0.95
Episode 10 | Total points: 65.00 | Duration: 154.39 | Total Steps: 4920 | Current Epsilon: 0.95
Episode 11 | Total points: 180.00 | Duration:

In [365]:
# Print the layer-by-layer summary (output shapes and parameter counts) of the agent's Q-Network.
agent.q_network.summary()

Model: "sequential_222"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_361 (Conv2D)         (None, 210, 160, 32)      896       
                                                                 
 max_pooling2d_360 (MaxPool  (None, 105, 80, 32)       0         
 ing2D)                                                          
                                                                 
 conv2d_362 (Conv2D)         (None, 105, 80, 64)       18496     
                                                                 
 max_pooling2d_361 (MaxPool  (None, 52, 40, 64)        0         
 ing2D)                                                          
                                                                 
 conv2d_363 (Conv2D)         (None, 52, 40, 128)       73856     
                                                                 
 max_pooling2d_362 (MaxPool  (None, 26, 20, 128)    

In [358]:
# Example run using random actions.

# Create SpaceInvader gym environment with human render mode.
env = gym.make("ALE/SpaceInvaders-v5", full_action_space = False, render_mode = 'human')

try:
    # Seed both the action sampler and the environment so the run is repeatable.
    env.action_space.seed(13)
    state, info = env.reset(seed=13)

    for _ in range(500):
        state, reward, done, truncated, info = env.step(env.action_space.sample())
        if done or truncated:
            state, info = env.reset()
finally:
    # Always close the render window, even if the run is interrupted.
    env.close()
 

In [366]:
# Example run using trained agent to choose actions.

# Create SpaceInvader gym environment with human render mode.
env = gym.make("ALE/SpaceInvaders-v5", full_action_space = False, render_mode = 'human')

# choose_action explores with probability agent.epsilon, which is still close to 1 after a short
# training run. Act greedily for the demo so the Q-Network (not random exploration) picks the actions,
# then restore the training value afterwards.
trained_epsilon = agent.epsilon
agent.epsilon = 0

try:
    # Initialize environment.
    state, info = env.reset()

    for _ in range(500):
        state_q = np.expand_dims(state, axis=0)
        action = agent.choose_action(state_q)
        state, reward, done, truncated, info = env.step(action)
        if done or truncated:
            state, info = env.reset()
finally:
    # Restore exploration and always close the render window, even if the run is interrupted.
    agent.epsilon = trained_epsilon
    env.close()


In [354]:
# Scratch check: fill a fresh agent's memory with a few steps and confirm that the loss and its
# gradients can be computed for one sampled batch.
# NOTE: this cell rebinds `env` and `agent`, replacing any trained agent from the cells above.
env = gym.make("ALE/SpaceInvaders-v5", full_action_space = False, render_mode = 'rgb_array')
agent = SpaceInvaderAgent(learn_rate = 1e-3, gamma = 0.99, num_actions = 6, epsilon = 0, batch_size = 30, max_mem_size = 10000, epsilon_decay = -0.01, num_steps_for_learn = 4)

try:
    score = 0

    # The environment must be reset before it can be stepped.
    state, info = env.reset()
    state, reward, terminated, truncated, info = env.step(env.action_space.sample())

    # First stored step: let the (untrained) Q-Network choose the action.
    state_q = np.expand_dims(state, axis=0)
    print(state_q.shape)
    action = agent.choose_action(state_q)
    next_state, reward, terminated, truncated, info = env.step(action)
    agent.store_memory(state, action, reward, next_state, terminated, truncated)

    # 30 more random steps, so the buffer holds 31 experiences for a batch of 30.
    for _ in range(30):
        if terminated or truncated:
            next_state, info = env.reset()
        state = next_state
        action = env.action_space.sample()
        next_state, reward, terminated, truncated, info = env.step(action)
        score = score + reward
        agent.store_memory(state, action, reward, next_state, terminated, truncated)
finally:
    env.close()

states, actions, rewards, next_states, dones, truncateds = agent.memory.get_experiences(agent.batch_size)

with tf.GradientTape() as tape:
    loss = agent.compute_loss(states, actions, rewards, next_states, dones, truncateds)
gradients = tape.gradient(loss, agent.q_network.trainable_variables)


(1, 210, 160, 3)
