In [4]:
import gym
import gym_maze
from keras.layers import Conv2D, Flatten, Dense
from keras.optimizers import Adam
from keras.models import Sequential
from IPython.display import SVG, display
from keras.utils.vis_utils import model_to_dot
from collections import deque
import numpy as np
import os
# Select SDL's "dummy" video driver for this process.
# NOTE(review): presumably so environment.render() works without a physical
# display (headless kernel) — confirm against the renderer gym_maze uses.
os.environ["SDL_VIDEODRIVER"] = "dummy"

In [10]:
class DQN:
    """Deep Q-Network agent with experience replay and epsilon-greedy exploration.

    Usage: construct the agent, call ``build_model()`` once, then alternate
    ``act()`` / ``memory.append(...)`` / ``train()`` inside the environment loop.
    """

    def __init__(self,
                 state_size,
                 action_size,
                 batch_size=32,
                 lr=0.0001,
                 dr=0.99,
                 memory_size=20000,
                 epsilon_min=0.01,
                 epsilon_max=1.0,
                 epsilon_decay=0.999
                 ):
        """Store hyperparameters and create an empty replay buffer.

        Args:
            state_size: Shape of one observation. The first entry is dropped
                to obtain the network input shape (presumably a leading batch
                dimension — TODO confirm against the environment).
            action_size: Number of discrete actions (size of the Q-value output).
            batch_size: Number of transitions sampled per ``train()`` call.
            lr: Learning rate handed to the Adam optimizer.
            dr: Discount rate applied to the bootstrapped next-state value.
            memory_size: Maximum number of transitions kept in the replay buffer.
            epsilon_min: Lower bound for the exploration rate.
            epsilon_max: Initial exploration rate.
            epsilon_decay: Multiplicative decay applied to epsilon after each
                training step.
        """
        self.lr = lr
        self.dr = dr
        self.memory = deque(maxlen=memory_size)  # Experience replay buffer
        self.e_max = epsilon_max
        self.e_min = epsilon_min
        self.e = epsilon_max  # Start value of epsilon descent
        self.e_decay = epsilon_decay
        self.s_space = state_size[1:]
        self.a_space = action_size
        self.b_size = batch_size
        self.model = None

    def build_model(self):
        """Build and compile the convolutional Q-network, then render its graph.

        The network is three strided Conv2D layers followed by a 512-unit
        dense layer and a linear output with one unit per action. It is
        compiled with MSE loss and the Adam optimizer.

        Returns:
            The result of IPython's ``display()`` call on the model diagram.
        """
        self.model = Sequential()
        self.model.add(Conv2D(64, (4, 4),
                         padding="same",
                         strides=(4, 4),
                         activation="relu",
                         input_shape=self.s_space,
                         data_format="channels_last")
                  )

        self.model.add(Conv2D(64, (3, 3), padding="same", strides=(4, 4), activation="relu",))
        self.model.add(Conv2D(64, (2, 2), padding="same", strides=(4, 4), activation="relu",))
        self.model.add(Flatten())
        self.model.add(Dense(512,  activation="relu"))
        self.model.add(Dense(self.a_space, activation='linear'))

        # DQN.huber_loss
        self.model.compile(loss="mse", optimizer=Adam(lr=self.lr))  # Consider using Huber Loss

        return display(SVG(model_to_dot(self.model, show_shapes=True).create(prog='dot', format='svg')))

    def act(self, state):
        """Choose an action epsilon-greedily.

        Args:
            state: Observation passed straight to ``model.predict``.

        Returns:
            A random action index with probability ``self.e``; otherwise the
            index of the largest predicted Q-value for ``state``.
        """
        if np.random.rand() <= self.e:
            return np.random.randint(0, self.a_space)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])  # returns action

    def train(self):
        """Run one replay step over a random mini-batch and decay epsilon.

        Does nothing until the buffer holds at least ``self.b_size``
        transitions. Each sampled transition is fitted individually.
        """
        if len(self.memory) < self.b_size:
            return

        # Retrieve random sequence of memory indexes (sampled with replacement)
        random_experience_idxs = np.random.choice(len(self.memory), size=self.b_size)

        # Get actual memory data into a mini batch
        mini_batch = [self.memory[idx] for idx in random_experience_idxs]

        # Iterate over all memories
        for state, action, reward, next_state, done in mini_batch:

            # While not in terminal state
            if not done:
                # Q-update rule: bootstrap from the best next-state value
                target = (reward + self.dr * np.amax(self.model.predict(next_state)[0]))
            else:
                # End of sequence: no future value to bootstrap from
                target = reward

            # Current Q-value estimates for the stored state
            target_f = self.model.predict(state)

            # Only the taken action's value is replaced by the new target
            target_f[0][action] = target

            # Train weights
            self.model.fit(state, target_f, epochs=1, verbose=0)

        # Epsilon descent, clamped so it never drops below the configured minimum
        if self.e > self.e_min:
            self.e = max(self.e_min, self.e * self.e_decay)

In [11]:
# Hyperparameters used by the cells below
lr, dr = 1e-4, 0.99  # learning rate, discount rate

# How many episodes the training loop runs
episodes = 100

In [12]:
# Instantiate the environment registered under the id 'maze-v0'.
# NOTE(review): the id is presumably registered as a side effect of
# `import gym_maze` — confirm.
environment = gym.make('maze-v0')
environment.render()  # Render initial state

In [13]:
# The Q-network needs the NUMBER of actions for its output layer. gym's
# Discrete spaces expose that as `.n` (their `.shape` is an empty tuple), so
# prefer `.n` and only fall back to `.shape` for spaces that do not define it.
n_actions = getattr(environment.action_space, "n", environment.action_space.shape)

agent = DQN(environment.observation_space.shape, n_actions, lr=lr, dr=dr)
agent.build_model()

<IPython.core.display.SVG object>

In [14]:
for episode in range(episodes):

    # Reset environment to start state at the beginning of EVERY episode;
    # otherwise episodes after the first would step an already-terminated
    # environment.
    s = environment.reset()
    t = False

    while not t:

        # Draw environment
        environment.render()

        # Pick an action for the current state (epsilon-greedy)
        a = agent.act(s)

        # Apply it and observe next state, reward and terminal flag
        s1, r, t, _ = environment.step(a)

        # Store the transition for experience replay
        agent.memory.append((s, a, r, s1, t))

        agent.train()

        # Advance to the next state so the agent does not keep acting on
        # (and storing) the state it started from
        s = s1