### Libraries

All classes and functions that can be imported into the training or evaluation scripts.

In [1]:
import tensorflow as tf

In [2]:
# GPU-only setup: turn on memory growth for every visible GPU.
# On a CPU-only machine the device list is empty and the loop does nothing.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as err:
    # Best effort: report the problem and carry on with TensorFlow's defaults.
    print(err)

In [3]:
import gym
import numpy as np
import time
import random
import cv2
import matplotlib.pyplot as plt
from IPython.display import clear_output

In [4]:
from keras.initializers import VarianceScaling
from keras.layers import (Conv2D, Dense, Flatten, Input, Lambda)
from keras.models import Model
from keras.optimizers import Adam

Using TensorFlow backend.


In [1]:
class Agent(object):
    """Double-DQN agent with epsilon-greedy exploration.

    The agent owns two networks with the same architecture: a policy network
    that is trained on every call to ``learn`` and a target network whose
    weights are only refreshed through ``update_target_network``.

    Args:
        replay_buffer: object exposing ``add_experience(action, state, reward,
            new_state, terminal)`` and ``sample_minibatch(batch_size)``.
        policy_dqn: model exposing ``predict``, ``fit``, ``get_weights`` and
            ``save``; used for action selection and trained in ``learn``.
        target_dqn: model exposing ``predict`` and ``set_weights``; used to
            evaluate the value of the next state.
        n_actions: number of discrete actions to choose from.
        train_batch_size: number of experiences sampled per training step.
        input_shape: accepted for interface compatibility; not used here.
        eps_start: initial exploration rate.
        eps_min: lower bound for the exploration rate.
        eps_decay: multiplicative decay applied by ``dec_epsilon``.
        discount_factor: gamma in the Bellman equation.
        debug: stored on the instance; currently no debug output is emitted.
    """

    def __init__(self, replay_buffer, policy_dqn, target_dqn, n_actions, train_batch_size,
                 input_shape, eps_start=1.0, eps_min=0.1, eps_decay=0.996,
                 discount_factor=0.99, debug=False):
        self.replay_buffer = replay_buffer
        self.policy_dqn = policy_dqn
        self.target_dqn = target_dqn
        self.eps_cur = eps_start
        self.eps_min = eps_min
        self.eps_decay = eps_decay
        self.n_actions = n_actions
        self.batch_size = train_batch_size
        self.discount_factor = discount_factor
        self.debug = debug

        # copy weights from policy network into target
        self.update_target_network()

    def choose_action(self, state):
        """Return an action index chosen epsilon-greedily for ``state``.

        With probability ``eps_cur`` a uniformly random action is returned
        (exploration); otherwise the action with the highest Q-value predicted
        by the policy network is returned (exploitation).
        """
        if np.random.random() < self.eps_cur:
            # explore: pick any action uniformly at random
            return np.random.choice(self.n_actions)

        # exploit: the network expects a batch, so add a leading batch axis
        q_values = self.policy_dqn.predict(state[np.newaxis, :])
        return np.argmax(q_values[0])

    def add_experience(self, action, state, reward, new_state, terminal):
        """Store one transition in the replay buffer."""
        self.replay_buffer.add_experience(action, state, reward, new_state, terminal)

    def learn(self, frame_number):
        """Run one Double-DQN training step on a sampled minibatch.

        Does nothing until ``frame_number`` has reached the batch size.

        Target for the action actually taken:
            Q(s, a) = r + gamma * Q_target(s', argmax_a' Q_policy(s', a')) * flag
        where ``flag`` is the fifth array returned by the replay buffer. It is
        used as a multiplicative mask on the future value (the ReplayBuffer in
        this file stores ``1 - terminal``, i.e. 0 for terminal transitions).
        """
        # only learn if we've reached a minimum number of processed frames
        if frame_number < self.batch_size:
            return

        actions, states, rewards, new_states, terminal_flags = \
            self.replay_buffer.sample_minibatch(self.batch_size)

        # Policy network selects the best action for the new states (s') ...
        arg_q_max = self.policy_dqn.predict(new_states).argmax(axis=1)
        # ... and the target network evaluates that action (Double DQN).
        future_target_q_vals = self.target_dqn.predict(new_states)

        # Start from the policy's current estimates so that only the entries
        # of the taken actions contribute to the loss. A real copy is taken:
        # slicing with [:] would only create a view of the prediction.
        q_targets = self.policy_dqn.predict(states).copy()

        batch_index = np.arange(self.batch_size, dtype=np.int32)
        double_q = future_target_q_vals[batch_index, arg_q_max]

        # Bellman update for the taken actions, using the sampled rewards
        q_targets[batch_index, actions] = rewards + \
            self.discount_factor * double_q * terminal_flags

        # fit policy network: X = current states, y = updated Q-targets
        self.policy_dqn.fit(states, q_targets, verbose=0)

    def dec_epsilon(self):
        """Decay the exploration rate multiplicatively, never below ``eps_min``."""
        self.eps_cur = max(self.eps_min, self.eps_cur * self.eps_decay)

    def update_target_network(self):
        """Copy the policy network's weights into the target network."""
        self.target_dqn.set_weights(self.policy_dqn.get_weights())

    def save_model(self, to_file):
        """Save the policy network to ``to_file``."""
        self.policy_dqn.save(to_file)

    def load_model(self, from_file):
        """Replace the policy network with the model stored in ``from_file``."""
        # Imported locally under an alias: the file only imports ``Model`` from
        # keras.models, and this method shares its name with the Keras loader.
        from keras.models import load_model as keras_load_model

        self.policy_dqn = keras_load_model(from_file)
        # NOTE(review): the target network is only synced once exploration has
        # bottomed out; otherwise it keeps its previous weights. Confirm this
        # is intended rather than syncing unconditionally.
        if self.eps_cur <= self.eps_min:
            self.update_target_network()

In [9]:
class GameWrapper(object):
    """Thin convenience layer around a gym environment.

    Keeps the most recent observation in ``self.state`` so callers only have
    to deal with the reward and the done flag.
    """

    def __init__(self, env_name, debug=False):
        # work on the unwrapped environment returned by gym.make
        environment = gym.make(env_name).unwrapped
        self.env = environment
        self.n_actions = environment.action_space.n
        self.n_obsevations = environment.observation_space.shape
        self.state = None
        self.debug = debug

    def reset(self):
        """Reset the environment and remember what ``env.reset()`` returned."""
        initial = self.env.reset()
        self.state = initial

    def step(self, action):
        """
        Apply ``action`` to the environment, store the new observation in
        ``self.state`` and return the ``(reward, done)`` pair.
        """
        observation, reward, done, _ = self.env.step(action)
        self.state = observation
        return reward, done

    def close(self):
        """Close the underlying environment."""
        self.env.close()

In [7]:
class ReplayBuffer(object):
    """Fixed-size ring buffer of transitions for experience replay.

    Once ``buffer_size`` experiences have been stored, new experiences
    overwrite the oldest ones.

    Args:
        buffer_size: maximum number of experiences kept.
        n_actions: accepted for interface compatibility; not used here.
        input_shape: shape of a single state; states are stored in arrays of
            shape ``(buffer_size, *input_shape)``.
        debug: when true, print a message on every add and every sample.
    """

    def __init__(self, buffer_size, n_actions, input_shape, debug=False):
        self.buffer_size = buffer_size
        self.experience_counter = 0  # total number of experiences ever added
        self.actions_memory = np.zeros((buffer_size), dtype=np.int32)
        self.states_memory = np.zeros((buffer_size, *input_shape))
        self.new_states_memory = np.zeros((buffer_size, *input_shape))
        self.rewards_memory = np.zeros(buffer_size, dtype=np.float32)
        # holds 1 - terminal, i.e. 0 for terminal transitions and 1 otherwise
        self.terminal_memory = np.zeros(buffer_size, dtype=np.uint8)
        self.debug = debug

    def add_experience(self, action, state, reward, new_state, terminal):
        """Store one transition, overwriting the oldest one when full."""
        # the modulo wraps the write position around the ring buffer
        idx = self.experience_counter % self.buffer_size
        self.actions_memory[idx] = action
        self.states_memory[idx] = state
        self.rewards_memory[idx] = reward
        self.new_states_memory[idx] = new_state
        # stored inverted so it can be used directly as a multiplicative mask
        self.terminal_memory[idx] = 1 - int(terminal)
        self.experience_counter += 1
        if self.debug:
            print(f'{self.experience_counter} experience(s) added to replay buffer')

    def sample_minibatch(self, batch_size):
        """Return ``batch_size`` stored experiences drawn uniformly at random.

        Sampling is done with replacement and only over slots that have
        actually been filled.

        Returns:
            Tuple ``(actions, states, rewards, new_states, terminal_flags)``
            of arrays whose first dimension is ``batch_size``.

        Raises:
            ValueError: if experiences are requested from an empty buffer.
        """
        # number of slots that currently hold real experiences
        n_take = min(self.experience_counter, self.buffer_size)
        if n_take == 0 and batch_size != 0:
            raise ValueError('cannot sample a minibatch from an empty replay buffer')
        idx = np.random.choice(n_take, batch_size)
        if self.debug:
            print(f'Minibatch of {batch_size} experiences sampled from {n_take} stored')
        return (self.actions_memory[idx], self.states_memory[idx], self.rewards_memory[idx],
                self.new_states_memory[idx], self.terminal_memory[idx])

In [8]:
def build_nn(input_shape, n_actions, lr=0.001, debug=False):
    """Build and compile a fully connected Q-network.

    The network has two hidden ReLU layers of 256 units each and a linear
    output layer with one unit per action. It is compiled with the Adam
    optimizer (constructed with ``lr``) and mean-squared-error loss.
    ``debug`` is accepted but not used.
    """
    hidden_units = (256, 256)

    inputs = Input(shape=input_shape)
    features = inputs
    for units in hidden_units:
        features = Dense(units, activation='relu')(features)
    # linear is the default activation; spelled out to make the intent explicit
    q_values = Dense(n_actions, activation='linear')(features)

    network = Model(inputs, q_values)
    network.compile(Adam(lr), loss='mse')
    return network