In [1]:
import numpy as np
import tensorflow as tf
import random
from collections import deque
import gym

In [2]:
# --- Hyperparameters -------------------------------------------------------
# Discount factor applied to the bootstrapped next-state value in replay_train.
dis = 0.9
# Maximum number of transitions kept in the replay deque created in main().
REPLAY_MEMORY = 50_000

# --- Environment -----------------------------------------------------------
env = gym.make('CartPole-v1')

# Network output width: one Q-value per discrete action.
output_size = env.action_space.n
# Network input width: length of the observation vector.
input_size = env.observation_space.shape[0]

In [3]:
class DQN:
    """Small fully connected Q-network with its own Adam optimizer.

    The network maps a state vector of length ``input_size`` to
    ``output_size`` Q-values (one per action) through two hidden ReLU layers
    of 24 units each and a linear output layer.

    Args:
        input_size: Length of a single state vector.
        output_size: Number of Q-values (actions) the network outputs.
        name: Label for this network (e.g. "main" / "target"); stored on the
            instance as ``self.name``.
    """

    def __init__(self, input_size, output_size, name="network"):
        self.input_size = input_size
        self.output_size = output_size
        # Previously accepted but dropped; keep it so instances can be told apart.
        self.name = name
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        self.model = self.build_model()

    def build_model(self):
        """Build and compile the Keras model (input -> 24 -> 24 -> output_size)."""
        model = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=(self.input_size,)),
            tf.keras.layers.Dense(24, activation='relu'),
            tf.keras.layers.Dense(24, activation='relu'),
            # No activation: Q-values are unbounded regression targets.
            tf.keras.layers.Dense(self.output_size),
        ])
        model.compile(optimizer=self.optimizer, loss='mse')
        return model

    def predict(self, state):
        """Return the Q-values for one state or a batch of states.

        Args:
            state: Array-like reshaped to ``(-1, input_size)``, so a single
                state vector and a stacked batch are both accepted.

        Returns:
            The model output tensor of shape ``(batch, output_size)``.
        """
        # Force float32 so the result does not depend on the caller's dtype.
        state = np.reshape(state, [-1, self.input_size]).astype(np.float32)
        return self.model(state)

    @tf.function
    def update(self, x_stack, y_stack):
        """Run one gradient step on the mean squared error to ``y_stack``.

        Args:
            x_stack: Batch of states, shape ``(batch, input_size)``.
            y_stack: Batch of target Q-values, shape ``(batch, output_size)``.

        Returns:
            Scalar tensor holding the loss computed before the step.
        """
        # NumPy defaults to float64; without the cast the subtraction below
        # fails with a dtype mismatch against the float32 network output.
        x_stack = tf.cast(x_stack, tf.float32)
        y_stack = tf.cast(y_stack, tf.float32)
        with tf.GradientTape() as tape:
            q_values = self.model(x_stack)
            loss = tf.reduce_mean(tf.square(y_stack - q_values))
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss

In [4]:
def replay_train(mainDQN, targetDQN, train_batch, discount=None):
    """Train ``mainDQN`` on a minibatch of replayed transitions.

    For every transition the target equals the main network's current
    Q-values, except for the action actually taken, which is replaced by
    ``reward`` (terminal transition) or
    ``reward + discount * max_a Q_target(next_state, a)`` (otherwise).

    The whole batch is evaluated with one forward pass per network instead of
    two passes per transition, and the arrays are built once instead of being
    re-copied by ``np.vstack`` on every iteration.

    Args:
        mainDQN: Network being trained; must provide ``predict`` and ``update``.
        targetDQN: Network used for the bootstrapped value; must provide
            ``predict``.
        train_batch: Iterable of ``(state, action, reward, next_state, done)``
            tuples.
        discount: Discount factor. Defaults to the module-level ``dis``.

    Returns:
        Whatever ``mainDQN.update`` returns (the training loss).

    Raises:
        ValueError: If ``train_batch`` is empty; the mean loss over zero
            samples would be NaN and the gradient step would be meaningless.
    """
    train_batch = list(train_batch)
    batch_size = len(train_batch)
    if batch_size == 0:
        raise ValueError("train_batch must contain at least one transition")
    if discount is None:
        discount = dis

    states, actions, rewards, next_states, dones = zip(*train_batch)
    x_stack = np.asarray(states, dtype=np.float32).reshape(batch_size, -1)
    next_stack = np.asarray(next_states, dtype=np.float32).reshape(batch_size, -1)
    actions = np.asarray(actions, dtype=np.int64)
    rewards = np.asarray(rewards, dtype=np.float32)
    dones = np.asarray(dones, dtype=bool)

    # np.array(...) copies, so the targets are writable whatever predict returns.
    y_stack = np.array(mainDQN.predict(x_stack), dtype=np.float32)
    next_q_max = np.max(np.asarray(targetDQN.predict(next_stack)), axis=1)

    # np.where (rather than multiplying by a mask) keeps a non-finite
    # next-state estimate from leaking into terminal targets.
    taken_targets = np.where(dones, rewards, rewards + discount * next_q_max)
    y_stack[np.arange(batch_size), actions] = taken_targets

    return mainDQN.update(x_stack, y_stack)

In [5]:
def get_copy_var_ops(mainDQN, targetDQN):
    """Overwrite the target network's weights with the main network's.

    Args:
        mainDQN: Source network; its ``model.get_weights()`` is read.
        targetDQN: Destination network; its ``model.set_weights()`` is called.
    """
    targetDQN.model.set_weights(mainDQN.model.get_weights())

In [6]:
def bot_play(mainDQN):
    """Play one greedy episode in the global ``env`` and print the total reward.

    The action with the highest predicted Q-value is taken at every step (no
    exploration). The episode ends when the environment reports either
    termination or truncation; previously the truncation flag was discarded,
    so the time limit was ignored and termination was the only way out of
    the loop.

    Args:
        mainDQN: Network whose ``predict`` supplies the Q-values.
    """
    s = env.reset()[0]
    reward_sum = 0

    while True:
        # NOTE(review): with the 5-tuple step API, render() presumably only
        # draws if the env was created with a render_mode; confirm.
        env.render()
        a = int(np.argmax(mainDQN.predict(s)))
        s, reward, terminated, truncated, _ = env.step(a)
        reward_sum += reward

        if terminated or truncated:
            print("Total score: {}".format(reward_sum))
            break

In [7]:
def main():
    """Train a DQN agent on the global ``env`` with experience replay.

    Each episode is played epsilon-greedily (epsilon decays as
    ``1 / (episode / 10 + 1)``) and every transition is appended to a bounded
    replay buffer. On episodes where ``episode % 10 == 1`` the main network
    is trained on 50 random minibatches of 10 transitions, then copied into
    the target network. Every 50th episode a greedy demo episode is played.

    The environment is closed even if training is interrupted.
    """
    max_episodes = 5000
    replay_buffer = deque(maxlen=REPLAY_MEMORY)

    mainDQN = DQN(input_size, output_size, name="main")
    targetDQN = DQN(input_size, output_size, name="target")
    # Both networks are initialised independently at random; sync them so the
    # first training round bootstraps from the main network's own estimates.
    get_copy_var_ops(mainDQN, targetDQN)

    try:
        for episode in range(max_episodes):
            e = 1. / ((episode / 10) + 1)
            step_count = 0
            episode_over = False

            state = env.reset()[0]

            while not episode_over:
                if np.random.rand() < e:
                    action = env.action_space.sample()
                else:
                    action = int(np.argmax(mainDQN.predict(state)))

                next_state, reward, terminated, truncated, _ = env.step(action)

                # Penalise only real failures, not hitting the time limit.
                if terminated:
                    reward = -100

                # Store `terminated` (not truncation) as the done flag so that
                # time-limit cut-offs still bootstrap from the next state.
                replay_buffer.append((state, action, reward, next_state, terminated))

                state = next_state
                step_count += 1
                # Stop on failure, on the env's time limit, or on the hard cap.
                episode_over = terminated or truncated or step_count > 10000

            print(f"Episode: {episode} steps: {step_count}")

            if episode % 10 == 1 and len(replay_buffer) >= 10:
                for _ in range(50):
                    minibatch = random.sample(replay_buffer, 10)
                    loss = replay_train(mainDQN, targetDQN, minibatch)
                print("Loss: ", loss)

                get_copy_var_ops(mainDQN, targetDQN)

            if episode % 50 == 0:
                bot_play(mainDQN)
    finally:
        # Release the environment even on KeyboardInterrupt or other errors.
        env.close()

In [8]:
# Run the training loop; up to 5000 episodes, so interrupt the kernel to stop early.
main()

  if not isinstance(terminated, (bool, np.bool8)):
  gym.logger.warn(


Episode: 0 steps: 21
Total score: 10.0
Episode: 1 steps: 24
Loss:  tf.Tensor(494.53687, shape=(), dtype=float32)
Episode: 2 steps: 15
Episode: 3 steps: 9
Episode: 4 steps: 13
Episode: 5 steps: 12
Episode: 6 steps: 10
Episode: 7 steps: 12
Episode: 8 steps: 12
Episode: 9 steps: 16
Episode: 10 steps: 17
Episode: 11 steps: 13
Loss:  tf.Tensor(0.8925539, shape=(), dtype=float32)
Episode: 12 steps: 13
Episode: 13 steps: 12
Episode: 14 steps: 17
Episode: 15 steps: 13
Episode: 16 steps: 11
Episode: 17 steps: 10
Episode: 18 steps: 13
Episode: 19 steps: 12
Episode: 20 steps: 11
Episode: 21 steps: 11
Loss:  tf.Tensor(491.2443, shape=(), dtype=float32)
Episode: 22 steps: 10
Episode: 23 steps: 12
Episode: 24 steps: 12
Episode: 25 steps: 12
Episode: 26 steps: 9
Episode: 27 steps: 10
Episode: 28 steps: 10
Episode: 29 steps: 13
Episode: 30 steps: 11
Episode: 31 steps: 11
Loss:  tf.Tensor(1.3169893, shape=(), dtype=float32)
Episode: 32 steps: 11
Episode: 33 steps: 13
Episode: 34 steps: 9
Episode: 35 st

KeyboardInterrupt: 