Steps
1. Get the Pacman environment
2. Make the DQN model
3. Train the model with an epsilon-greedy strategy, then save the model and its weights
4. Test the model on the test case


# Step 1 Get the Pacman environment

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import random
import time
import pyglet

In [None]:
from mini_pacman import PacmanGame

# Create the mini-Pacman environment used by every later cell.
env = PacmanGame(
    field_shape=(10, 10),
    nmonsters=2,
    ndiamonds=3,
    nwalls=4,
    monster_vision_range=1,
)
# Optional inspection helpers (left disabled):
#   env.print_field()  # will print a picture in text symbols
#   env.render()       # creates graphical rendering of the field

In [None]:
from tabulate import tabulate
# Reference table: numeric action code -> move it stands for.
print(
    tabulate(
        [
            [1, "Down-Left"],
            [2, "Down"],
            [3, "Down-Right"],
            [4, "Left"],
            [5, "No Move"],
            [6, "Right"],
            [7, "Up-Left"],
            [8, "Up"],
            [9, "Up-Right"],
        ],
        headers=["Action Code", "Move"],
        tablefmt="orgtbl",
    )
)

# Step 2 Make the DQN model


In [None]:
import random
import gc
import time
import numpy as np

from keras.models import Sequential, clone_model
from keras.layers import Dense, InputLayer
from keras.optimizers import Adam
from keras.callbacks import CSVLogger, TensorBoard
import keras.backend as K

Create a function that builds a DQN with three hidden layers of 8 units each. The input has the shape of the environment's observation vector, and the output layer has one unit per available action.

In [None]:
def create_dqn_model(input_shape, nb_actions):
    """Build the fully connected Q-network (not compiled here).

    Args:
        input_shape: Shape tuple handed to the first Dense layer as ``input_shape``.
        nb_actions: Number of units in the linear output layer (one Q-value each).

    Returns:
        A ``Sequential`` model: three ReLU hidden layers of 8 units followed by
        a linear output layer with ``nb_actions`` units.
    """
    layers = [
        Dense(units=8, input_shape=input_shape, activation="relu"),
        Dense(units=8, activation="relu"),
        Dense(units=8, activation="relu"),
        Dense(nb_actions, activation="linear"),
    ]
    return Sequential(layers)

First define a helper that flattens an observation into a state vector. Then compile the online network with the Adam optimizer and mean squared error (MSE) loss, and clone it to create the target network, initialised with the same weights as the online network.

In [None]:
def get_state(obs):
    """Flatten an observation dict into a flat list of coordinates.

    Args:
        obs: Mapping with a ``"player"`` pair and iterables of pairs under
            ``"monsters"``, ``"diamonds"`` and ``"walls"``.

    Returns:
        ``[player_x, player_y, ...]`` followed by the coordinates of every
        monster, then every diamond, then every wall, in the order given.
    """
    player_x, player_y = obs["player"]
    state = [player_x, player_y]
    for key in ("monsters", "diamonds", "walls"):
        for cx, cy in obs[key]:
            state.extend((cx, cy))
    return state

In [None]:
# Reset the environment and turn the first observation into a NumPy vector;
# its shape is used as the input shape of the network.
obs = env.reset()
array_obs = np.array(get_state(obs))

input_shape = array_obs.shape
nb_actions = 9  # number of network outputs; the action table lists codes 1..9
print("input_shape: ", input_shape)
print("nb_actions: ", nb_actions)

# Online network: the one that is trained (Adam optimizer, MSE loss).
online_network = create_dqn_model(input_shape, nb_actions)
online_network.compile(optimizer=Adam(), loss="mse")
# Target network: same architecture, starting from a copy of the online weights.
target_network = clone_model(online_network)
target_network.set_weights(online_network.get_weights())

In [None]:
from IPython.display import SVG
from keras.utils.vis_utils import model_to_dot
# Model.summary() prints the layer table itself and returns None, so it is
# called directly; wrapping it in print() would append a stray "None".
online_network.summary()

# Render the network graph as an inline SVG using the "dot" layout program.
SVG(model_to_dot(online_network).create(prog="dot", format="svg"))

In [None]:
from keras.utils import plot_model
# Write a diagram of the online network (with shapes and layer names) to a PNG file.
plot_model(
    online_network,
    to_file="online_network.png",
    show_shapes=True,
    show_layer_names=True,
)

In [None]:
from collections import deque
# Experience-replay buffer: a bounded deque that starts empty and drops its
# oldest entries once more than `replay_memory_maxlen` items are stored.
replay_memory_maxlen = 1_000_000
replay_memory = deque(maxlen=replay_memory_maxlen)

In [None]:
def epsilon_greedy(q_values, epsilon, n_outputs):
    """Pick an action code with an epsilon-greedy policy.

    Args:
        q_values: Indexable of Q-values; index ``i`` holds the value of
            action code ``i + 1``.
        epsilon: Probability of exploring (choosing a random allowed action).
        n_outputs: Despite the name, the collection of currently allowed
            action codes (1-based), not a count.

    Returns:
        With probability ``epsilon`` a uniformly random element of
        ``n_outputs``; otherwise the allowed action with the highest Q-value.
        If ``n_outputs`` is empty in the greedy case, falls back to the
        best action over all Q-values.
    """
    if random.random() < epsilon:
        # Explore: random pick from the list of possible actions.
        return random.choice(n_outputs)

    allowed = list(n_outputs)
    if not allowed:
        # Nothing to restrict to: unrestricted argmax (action codes are 1-based).
        return int(np.argmax(q_values)) + 1
    # Exploit: restrict the argmax to allowed actions so that the greedy
    # branch can never return a move that is not currently possible.
    return max(allowed, key=lambda action: q_values[action - 1])

In [None]:
n_steps = 100_000 # total number of training updates; the training loop runs while step < n_steps
warmup = 1_000 # environment iterations to collect before training starts (a later cell resets this to 64)
training_interval = 4 # the DQN is trained on every training_interval-th environment iteration
copy_steps = 2_000 # number of training steps after which the weights of the
                   # online network are copied into the target network
gamma = 0.99 # discount rate
batch_size = 64 # number of transitions sampled from replay memory per training step
eps_max = 1.0 # epsilon starts at eps_max ...
eps_min = 0.05 # ... and decays linearly down to eps_min ...
eps_decay_steps = 50_000 # ... over this many training steps

In [None]:
# Demo: run the untrained online network on a single observation.
step = 0
iteration = 0
done = True 

obs = env.reset()
# Wrap the state in an outer list so predict() receives a batch of one sample.
array_obs = np.array([get_state(obs)])

print("Observtion: ", [array_obs])
# predict() returns one row per sample; [0] takes the row for our single state.
q_values = online_network.predict(array_obs)[0]
print("Q-values", q_values)

In [None]:
# Warm-up: play `warmup` environment steps without training, only to put
# transitions into the replay memory.
step = 0
iteration = 0
done = True  # forces an environment reset on the first pass
warmup = 64  # overrides the value from the hyperparameter cell; equals batch_size

# `_` instead of `iter`: the loop index is unused and `iter` would shadow the builtin.
for _ in range(warmup):
    if done:
        obs = env.reset()
    iteration += 1
    obs = env.get_obs()
    obs_state = np.array([get_state(obs)])
    q_values = online_network.predict(obs_state)[0]
    # `step` stays 0 in this cell, so epsilon evaluates to eps_max throughout.
    epsilon = max(eps_min, eps_max - (eps_max-eps_min) * step/eps_decay_steps)
    # Kept in its own name so the integer `nb_actions` is not overwritten by a list.
    possible_actions = obs['possible_actions']
    action = epsilon_greedy(q_values, epsilon, possible_actions)
    next_obs = env.make_action(action)
    done = next_obs['end_game']
    # Transition tuple: (state, action, reward, next_state, done)
    replay_memory.append((get_state(obs), action, next_obs['reward'], get_state(next_obs), next_obs['end_game']))
    obs = next_obs
len(replay_memory)

In [None]:
# Demo: draw one random minibatch and split the transition tuples
# (state, action, reward, next_state, done) into separate arrays.
minibatch = random.sample(replay_memory, batch_size)
replay_state = np.array([x[0] for x in minibatch])
# Action codes are 1-based; subtract 1 to get the index of the network output.
replay_action = np.array([x[1] - 1 for x in minibatch])
replay_rewards = np.array([x[2] for x in minibatch])
replay_next_state = np.array([x[3] for x in minibatch])
# done flags as 0/1 integers so they can be used arithmetically
replay_done = np.array([x[4] for x in minibatch], dtype=int)
# NOTE(review): only the last expression of a cell is displayed, so the
# `replay_action` line below has no visible effect.
replay_action
replay_rewards

In [None]:
# Demo: Q-values of the target network for the sampled next states.
target_predict = target_network.predict(replay_next_state)
print('Target prediction shape: ', target_predict.shape)
print('Example of predicted values: ', target_predict[3])

In [None]:
# Taking the maximum along axis 1 leaves one value per row of target_predict.
print('Shape of the max: ', np.amax(target_predict,axis=1).shape)

In [None]:
# Demo: build the regression targets for one minibatch.
# Target for the action taken: reward + gamma * max over the target network's
# Q-values of the next state; the second term is zeroed where replay_done is 1.
target_for_action = replay_rewards + (1-replay_done) * gamma * \
                                    np.amax(target_predict, axis=1)
target = online_network.predict(replay_state)  # targets coincide with predictions ...
print('Target predicted by online network')
print(target[:5])
# ... except in the column of the action actually taken, which gets the new target.
target[np.arange(batch_size), replay_action] = target_for_action
print('Update with values predicted by target network')
print(target[:5])
print('Replay actions and target_for_action')
for i in range(5):
    print(replay_action[i],target_for_action[i])

# Step 3 Train the model based on epsilon greedy strategy and save the model and weights

In [None]:
# Reset the training-step counter.
step = 0

In [None]:
# DQN training loop: act epsilon-greedily, store each transition in the replay
# memory, and periodically fit the online network on a random minibatch.
step = 0       # training updates performed so far
iteration = 0  # environment steps taken so far
done = True    # forces an environment reset on the first pass

while step < n_steps:
    if done:
        obs = env.reset()
    iteration += 1
    obs = env.get_obs()
    obs_state = np.array([get_state(obs)])
    q_values = online_network.predict(obs_state)[0]
    # Linear decay from eps_max to eps_min over eps_decay_steps training steps.
    epsilon = max(eps_min, eps_max - (eps_max-eps_min) * step/eps_decay_steps)
    # Kept in its own name so the integer `nb_actions` is not overwritten by a list.
    possible_actions = obs['possible_actions']
    action = epsilon_greedy(q_values, epsilon, possible_actions)
    next_obs = env.make_action(action)
    done = next_obs['end_game']
    # Transition tuple: (state, action, reward, next_state, done)
    replay_memory.append((get_state(obs), action, next_obs['reward'], get_state(next_obs), next_obs['end_game']))
    obs = next_obs

    # Train every `training_interval` iterations once the warm-up is over.
    # The length check keeps random.sample from raising if this cell is run
    # before enough transitions have been collected.
    if (iteration >= warmup
            and iteration % training_interval == 0
            and len(replay_memory) >= batch_size):
        step += 1
        minibatch = random.sample(replay_memory, batch_size)
        replay_state = np.array([x[0] for x in minibatch])
        # Action codes are 1-based; subtract 1 to index the network outputs.
        replay_action = np.array([x[1] - 1 for x in minibatch])
        replay_rewards = np.array([x[2] for x in minibatch])
        replay_next_state = np.array([x[3] for x in minibatch])
        replay_done = np.array([x[4] for x in minibatch], dtype=int)
        # reward + gamma * max target-network Q-value of the next state;
        # the second term is zeroed for transitions that ended the game.
        target_for_action = replay_rewards + (1-replay_done) * gamma * (
            np.amax(target_network.predict(replay_next_state), axis=1)
        )
        target = online_network.predict(replay_state)  # targets coincide with predictions ...
        target[np.arange(batch_size), replay_action] = target_for_action  #...except for targets with actions from replay
        # epochs=step with initial_epoch=step-1 runs exactly one epoch, numbered `step`.
        online_network.fit(replay_state, target, epochs=step, verbose=1, initial_epoch=step-1)
        # Periodically sync the target network with the online network.
        if step % copy_steps == 0:
            target_network.set_weights(online_network.get_weights())

In [None]:
# Save the trained online network to an HDF5 file.
online_network.save("saved_dqn_model.h5")

In [None]:
from keras.models import load_model
# Reload the network from the file written by the save cell.
dqn_model = load_model("saved_dqn_model.h5")

In [None]:
def test_dqn(env, n_games, model, nb_actions, eps=0.05, render=False, sleep_time=0.01):
    """Play ``n_games`` games with ``model`` and return the score of each game.

    Args:
        env: Environment providing ``reset()``, ``make_action(action)`` and
            ``render()``.
        n_games: Number of games to play.
        model: Network whose ``predict`` yields the Q-values used to act.
        nb_actions: Unused; kept so existing calls keep working. The allowed
            actions are read from each observation's ``"possible_actions"``.
        eps: Exploration probability passed to ``epsilon_greedy``.
        render: If true, draw the field after every move.
        sleep_time: Pause in seconds after each rendered frame.

    Returns:
        List with the summed reward of each game.
    """
    scores = []
    for _ in range(n_games):
        obs = env.reset()
        score = 0
        done = False
        while not done:
            obs_state = np.array([get_state(obs)])
            # Use the model under test, not the global training network.
            q_values = model.predict(obs_state)[0]
            # Use the caller's eps instead of the training decay schedule.
            action = epsilon_greedy(q_values, eps, obs["possible_actions"])
            next_obs = env.make_action(action)
            # The reward of this move is reported in the observation it
            # produced; reading it from `obs` would lag one step behind and
            # never count the final reward.
            score += next_obs["reward"]
            done = next_obs["end_game"]
            obs = next_obs
            if render:
                env.render()
                time.sleep(sleep_time)
        scores.append(score)
    return scores

In [None]:
# Play 10 games with the reloaded model. render=True draws every move so you
# can see how good (or bad) the trained Q-network is.
scores = test_dqn(env, 10, dqn_model, nb_actions, eps=0.01, render=True)

In [None]:
# Display the per-game scores.
scores

In [None]:
def moving_average(v, window=100):
    """Trailing moving average of a sequence.

    Args:
        v: Sequence of numbers supporting ``len`` and slicing.
        window: Maximum number of most recent values averaged (>= 1).

    Returns:
        List ``out`` of the same length as ``v`` where ``out[j]`` is the mean
        of ``v[max(0, j - window + 1)] .. v[j]`` inclusive, so the first
        ``window - 1`` entries average over the values seen so far.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be at least 1")
    out = []
    for j in range(len(v)):
        # Include v[j] itself and divide by the number of values actually summed.
        recent = v[max(j - window + 1, 0):j + 1]
        out.append(sum(recent) / len(recent))
    return out

In [None]:
# Smooth the per-game scores with a moving average and plot the result.
ma_scores = moving_average(scores, window=100)
plt.plot(ma_scores)

In [None]:
# Summary statistics of the per-game scores.
nn_mean = np.mean(scores)
nn_std = np.std(scores)
nn_min = np.min(scores)
nn_max = np.max(scores)
nn_median = np.median(scores)

In [None]:
from tabulate import tabulate
# One-row summary table of the score statistics for the DQN policy.
# NOTE(review): the row mixes a string with numbers, so np.array presumably
# stores every entry as text; confirm the printed formatting is as intended.
all_summaries = np.array([['DQNet',nn_mean,nn_std,nn_min,nn_max, nn_median]])
headers = ['Policy','Mean','Std','Min','Max','Median']
summary_table = tabulate(all_summaries, headers)
print(summary_table)