# Import

In [2]:
# Agent
import random
import numpy as np
from collections import deque
from tensorflow.keras import models, layers, optimizers, activations, losses

# Utils

# Training and Evaluating
import os
import gym
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

# DQN Agent

In [3]:
class DQNAgent():
    """
    Represents a Deep Q-Network(DQN) agent
    """
    def __init__(self, state_size, action_size, gamma = 0.95,epsilon=0.5, epsilon_min = 0.01, epsilon_decay = 0.98, learning_rate = 0.001, buffer_size = 4098):
        """
        Creates a Deep Q-Networks (DQN) agent.

        :param state_size: number of dimensions of the feature vector of the state.
        :type state_size: int.
        :param action_size: number of actions.
        :type action_size: int.
        :param gamma: discount factor.
        :type gamma: float.
        :param epsilon: epsilon used in epsilon-greedy policy.
        :type epsilon: float.
        :param epsilon_min: minimum epsilon used in epsilon-greedy policy.
        :type epsilon_min: float.
        :param epsilon_decay: decay of epsilon per episode.
        :type epsilon_decay: float.
        :param learning_rate: learning rate of the action-value neural network.
        :type learning_rate: float.
        :param buffer_size: size of the experience replay buffer.
        :type buffer_size: int.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.replay_buffer = deque(maxlen=buffer_size)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.learning_rate = learning_rate
        self.model = self.make_model()
        
    def make_model(self):
        """
        Makes the action-value neural network model using Keras.

        :return: action-value neural network.
        :rtype: Keras' model.
        """
        
        # Choose Best Model to Problem
        model = models.Sequential()
        # TODO Input_Shape
        model.add(layers.Conv2D(32,(8,8),strides=(4,4),activation='relu',input_shape = (state_size[0],state_size[1],state_size[2])))
        model.add(layers.Conv2D(64,(4,4),strides=(2,2),activation='relu'))
        model.add(layers.Conv2D(64,(3,3),activation = 'relu'))
        model.add(layers.Flatten())
        model.add(layers.Dense(512, activation = 'relu'))
        model.add(layers.Dense(256, activation = 'relu'))
        model.add(layers.Dense(self.action_size, activation='linear'))
        model.compile(loss=losses.mse,
                      optimizer=optimizers.Adam(lr=self.learning_rate))
        model.summary()
        return model
    
    def act(self, state):
        """
        Chooses an action using an epsilon-greedy policy.

        :param state: current state.
        :type state: NumPy array with dimension (1, 2).
        :return: chosen action.
        :rtype: int.
        """
        action_value = self.model.predict(state)
        gr_action = np.argmax(action_value)
        p = np.random.rand()
        if p >= self.epsilon:
            return gr_action
        else:
            num_actions = action_value.shape[1]
            action = np.random.randint(num_actions)
            return action
    
    def append_experience(self, state, action, reward, next_state, done):
        """
        Appends a new experience to the replay buffer (and forget an old one if the buffer is full).

        :param state: state.
        :type state: NumPy array with dimension (1, 2).
        :param action: action.
        :type action: int.
        :param reward: reward.
        :type reward: float.
        :param next_state: next state.
        :type next_state: NumPy array with dimension (1, 2).
        :param done: if the simulation is over after this experience.
        :type done: bool.
        """
        self.replay_buffer.append((state, action, reward, next_state, done))

    def replay(self, batch_size):
        """
        Learns from memorized experience.

        :param batch_size: size of the minibatch taken from the replay buffer.
        :type batch_size: int.
        :return: loss computed during the neural network training.
        :rtype: float.
        """
        minibatch = random.sample(self.replay_buffer, batch_size)
        states, targets = [], []
        for state, action, reward, next_state, done in minibatch:
            target = self.model.predict(state)
            if not done:
                target[0][action] = reward + self.gamma * np.max(self.model.predict(next_state)[0])
            else:
                target[0][action] = reward
            # Filtering out states and targets for training
            states.append(state[0])
            targets.append(target[0])
        history = self.model.fit(np.array(states), np.array(targets), epochs=1, verbose=0)
        # Keeping track of loss
        loss = history.history['loss'][0]
        return loss
    
    def load(self, name):
        """
        Loads the neural network's weights from disk.

        :param name: model's name.
        :type name: str.
        """
        self.model.load_weights(name)
    
    def save(self, name):
        """
        Saves the neural network's weights to disk.

        :param name: model's name.
        :type name: str.
        """
        self.model.save_weights(name)
    
    def update_epsilon(self):
        """
        Updates the epsilon used for epsilon-greedy action selection.
        """
        self.epsilon *= self.epsilon_decay
        if self.epsilon < self.epsilon_min:
            self.epsilon = self.epsilon_min

# Utils

# Training DQN

In [None]:
# TODO Define Number os Episodes
NUM_EPISODES = 300
RENDER = False
fig_format = 'eps'

# Needs to be commented to enable training with GPU
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

tf.compat.v1.disable_eager_execution()


env = gym.make("ALE/SpaceInvaders-v5")

if RENDER:
    ENV = gym.make("ALE/SpaceInvaders-v5", render_mode = 'human')
state_size = env.observation_space.shape
action_size = env.action_space.n

agent = DQNAgent(state_size, action_size)

if os.path.exists('space_invaders.h5'):
    print('Loading previus learning session.')
    agent.load("space_invaders.h5")
else:
    print('Previus session not found')
done = False

# TODO Define batch_size
batch = 32
return_history = []

In [None]:
# Testing Env
env = gym.make("ALE/SpaceInvaders-v5", render_mode='human')
eps = 5
for ep in range(1, eps+1):
    state = env.reset()
    done = False
    score = 0
    while not done:
        action = random.choice([0,1,2,3,4,5])
        n_state, reward, done, info = env.step(action)
        score += reward
    print("Episode:{} Score:{}".format(ep, score))
env.close()


Episode:1 Score:285.0


# Evaluating DQN

In [4]:
def plot_point(point_list, style):
    x = []
    y = []
    for point in point_list:
        x.append(point[0])
        y.append(point[1])
    plt.plot(x,y,style)


NUM_EPISODES = 5 # Number of episodes used for evaluation
fig_format = 'eps'
# Needs to be commented to enable training with GPU
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

tf.compat.v1.disable_eager_execution()


env = gym.make("ALE/SpaceInvaders-v5")
state_size = env.observation_space.shape
action_size = env.action_space.n

# DQN Agent with Greedy Policy(For Evaluation)
agent = DQNAgent(state_size, action_size, epsilon=0.0, epsilon_min=0.0)


# Checking if weights from previous learning session exists
if os.path.exists('space_invaders.h5'):
    print('Loading previus learning session.')
    agent.load("space_invaders.h5")
else:
    print('Unable to evaluate without traininig')
    exit(-1)
return_history = []
score_history = []

for episodes in range(1, NUM_EPISODES + 1):
    state = env.reset()
    # TODO TRANSFORM STATE
    score= 0.0
    cumulative_reward = 0.0
    while not done:
        action = agent.act(state)
        
        n_state, reward, done, info = env.step(action)
        
        #TODO Transform n_state
        
        score += reward
        # TODO Implementing reward engineering
        
        state = n_state
        cumulative_reward = agent.gamma * cumulative_reward + reward
    print("episode: {}/{}, score: {:.6}, reward: {:.6}, epsilon: {:.3}"
                  .format(episodes, NUM_EPISODES, score, cumulative_reward, agent.epsilon))
    return_history.append(cumulative_reward)
    score_history.append(score)

print('Mean score: ', np.mean(score_history))
print('Mean return: ', np.mean(return_history))

# Plots score history
plt.plot(score_history, 'b')
plt.xlabel('Episode')
plt.ylabel('Score')
plt.savefig('dqn_evaluation_score.' + fig_format, fig_format=fig_format)

# Plots return history
plt.plot(return_history, 'b')
plt.xlabel('Episode')
plt.ylabel('Return')
plt.savefig('dqn_evaluation_reward.' + fig_format, fig_format=fig_format)

  logger.warn(


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 51, 39, 32)        6176      
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 24, 18, 64)        32832     
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 22, 16, 64)        36928     
_________________________________________________________________
flatten (Flatten)            (None, 22528)             0         
_________________________________________________________________
dense (Dense)                (None, 512)               11534848  
_________________________________________________________________
dense_1 (Dense)              (None, 256)               131328    
_________________________________________________________________
dense_2 (Dense)              (None, 6)                 1

NameError: name 'done' is not defined