In [4]:
import gymnasium as gym
import random
import numpy as np

import tensorflow as tf
from keras.models import Model
from keras.layers import Input, Dense
from keras.optimizers import Adam, RMSprop

from collections import deque

# Module-level CartPole environment; random_games() below reads this global.
# render_mode="human" is passed so the environment is shown on screen
# (NOTE(review): exact rendering behavior is defined by Gymnasium — confirm for your version).
env = gym.make('CartPole-v1', render_mode="human")

def random_games():
    """Play ten CartPole games with uniformly random actions, printing every step.

    Uses the module-level ``env``. Each game is capped at 500 frames, although
    a random policy is expected to end far sooner. For every frame the function
    prints: frame index, observation, reward, finished flag, info dict, action.
    """
    n_games, frame_cap = 10, 500

    for _ in range(n_games):
        # Every game starts from a freshly reset environment.
        env.reset()

        frame = 0
        while frame < frame_cap:
            # Show the current state of the environment.
            env.render()

            # CartPole has two actions (0 = left, 1 = right); pick one at random.
            move = env.action_space.sample()

            # Advance the simulation by one step with the chosen action.
            observation, reward, terminated, truncated, info = env.step(move)
            finished = terminated or truncated

            print(frame, observation, reward, finished, info, move)
            if finished:
                break
            frame += 1

# Run the random-action demo, then release the render window owned by the
# module-level `env` even if a game raised part-way through.
try:
    random_games()
finally:
    env.close()

In [5]:
def OurModel(input_shape, action_space):
    """Build and compile the fully connected Q-network.

    Architecture: Input -> Dense(512) -> Dense(256) -> Dense(64) -> Dense(action_space),
    ReLU on the hidden layers and a linear output (one Q-value per action).

    Args:
        input_shape: Shape tuple handed to ``Input`` (the caller in this file
            passes ``(state_size,)``).
        action_space: Number of output units, i.e. number of discrete actions.

    Returns:
        The compiled Keras ``Model`` (MSE loss, RMSprop optimizer). The model
        summary is printed as a side effect.
    """
    x_input = Input(input_shape)

    # 'Dense' is the basic form of a neural network layer.
    # First hidden layer with 512 nodes. The input shape comes from `x_input`,
    # so it is not repeated on the layer itself.
    x = Dense(512, activation="relu", kernel_initializer='he_uniform')(x_input)

    # Hidden layer with 256 nodes
    x = Dense(256, activation="relu", kernel_initializer='he_uniform')(x)

    # Hidden layer with 64 nodes
    x = Dense(64, activation="relu", kernel_initializer='he_uniform')(x)

    # Output layer with one linear unit per action
    x = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(x)

    model = Model(inputs=x_input, outputs=x, name='cartpole_dqn_model')
    # `learning_rate` replaces the deprecated `lr` keyword, which newer Keras
    # releases no longer accept.
    model.compile(
        loss="mse",
        optimizer=RMSprop(learning_rate=0.00025, rho=0.95, epsilon=0.01),
        metrics=["accuracy"],
    )

    model.summary()
    return model

In [8]:
class DQNAgent:
    """Deep Q-Network agent for the CartPole-v1 environment.

    The agent keeps a bounded replay memory of transitions, follows an
    epsilon-greedy policy whose epsilon decays once enough transitions have
    been collected, and fits a single Q-network (built by ``OurModel``) on
    random minibatches drawn from that memory.
    """

    def __init__(self):
        self.env = gym.make('CartPole-v1')

        # By default CartPole-v1 has max episode steps = 500
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n

        self.MAX_EPISODES = 1000
        self.memory = deque(maxlen=2000)

        self.GAMMA = 0.95            # discount factor
        self.epsilon = 1.0           # current exploration rate
        self.EPSILON_MIN = 0.001     # decay stops once epsilon is at/below this
        self.EPSILON_DECAY = 0.999   # multiplicative decay per remembered step
        self.BATCH_SIZE = 64

        # Number of stored transitions required before training begins
        self.TRAIN_START = 1000

        # Create model
        self.model = OurModel(input_shape=(self.state_size, ), action_space=self.action_size)

    def remember(self, state, action, reward, next_state, done):
        """Store one transition and decay epsilon once training has started.

        Args:
            state: Observation before the action.
            action: Action index that was taken.
            reward: Reward recorded for the transition.
            next_state: Observation after the action.
            done: Terminal flag; ``replay`` does not bootstrap when it is true.
        """
        self.memory.append((state, action, reward, next_state, done))

        # Decay epsilon after training started
        if len(self.memory) > self.TRAIN_START:
            if self.epsilon > self.EPSILON_MIN:
                self.epsilon *= self.EPSILON_DECAY

    def act(self, state):
        """Return an epsilon-greedy action index (a plain ``int``) for ``state``."""
        if np.random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        # Cast so both branches return the same Python type.
        return int(np.argmax(self.model.predict(state, verbose=0)))

    def replay(self):
        """Fit the Q-network on one random minibatch from the replay memory.

        Does nothing until at least ``TRAIN_START`` transitions are stored.
        """
        # Do not train the network if training has not started
        if len(self.memory) < self.TRAIN_START:
            return

        # Randomly sample a minibatch from the memory. The batch may be smaller
        # than BATCH_SIZE, so every array below is sized from the actual sample.
        minibatch = random.sample(self.memory, min(len(self.memory), self.BATCH_SIZE))
        n = len(minibatch)

        states = np.asarray([t[0] for t in minibatch], dtype=np.float32).reshape(n, self.state_size)
        actions = np.asarray([t[1] for t in minibatch], dtype=np.intp)
        rewards = np.asarray([t[2] for t in minibatch], dtype=np.float32)
        next_states = np.asarray([t[3] for t in minibatch], dtype=np.float32).reshape(n, self.state_size)
        dones = np.asarray([t[4] for t in minibatch], dtype=bool)

        # Batch prediction to save time
        target = self.model.predict(states, verbose=0)
        target_next = self.model.predict(next_states, verbose=0)

        # Standard DQN target, applied only to the action that was taken:
        #   terminal:     r
        #   non-terminal: r + GAMMA * max_a' Q(s', a')
        bootstrapped = rewards + self.GAMMA * np.amax(target_next, axis=1)
        target[np.arange(n), actions] = np.where(dones, rewards, bootstrapped)

        # Train the neural network
        self.model.fit(states, target, batch_size=self.BATCH_SIZE, verbose=0)

    def load(self, name):
        """Replace the current model with the one stored at ``name``."""
        self.model = tf.keras.models.load_model(name)

    def save(self, name):
        """Save the current model to ``name``."""
        self.model.save(name)

    def run(self):
        """Train for up to ``MAX_EPISODES`` episodes.

        Training stops early, after saving the model to ``cartpole-dqn.h5``,
        as soon as an episode lasts the environment's full step limit.
        """
        max_steps = self.env.spec.max_episode_steps
        for episode in range(self.MAX_EPISODES):
            state, _ = self.env.reset()
            state = np.reshape(state, [1, self.state_size])
            done = False
            i = 0
            while not done:
                action = self.act(state)
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                done = terminated or truncated
                next_state = np.reshape(next_state, [1, self.state_size])

                # Penalize real failures only; running into the time limit is
                # not a failure and keeps the environment's reward.
                if terminated and not truncated:
                    reward = -100

                # Store `terminated`, not `done`: a time-limit truncation is not
                # a terminal state, so its target must still bootstrap from s'.
                self.remember(state, action, reward, next_state, terminated)

                state = next_state
                i += 1

                if done:
                    print(f"Episode: {episode}/{self.MAX_EPISODES}, score: {i}, epsilon: {self.epsilon:.2}")
                    if i == max_steps:
                        print("Saving trained model as cartpole-dqn.h5")
                        self.save("cartpole-dqn.h5")
                        return

                self.replay()

    def test(self, render=True):
        """Load ``cartpole-dqn.h5`` and play ``MAX_EPISODES`` greedy episodes.

        Args:
            render: When true (default), play in a separate environment created
                with ``render_mode="human"`` so the episodes are displayed; when
                false, reuse ``self.env`` without rendering.
        """
        self.load("cartpole-dqn.h5")
        # Rendering is selected when the environment is created, so a dedicated
        # environment is used for display instead of calling render() on self.env.
        env = gym.make('CartPole-v1', render_mode="human") if render else self.env
        try:
            for e in range(self.MAX_EPISODES):
                state, _ = env.reset()
                state = np.reshape(state, [1, self.state_size])
                done = False
                i = 0
                while not done:
                    action = int(np.argmax(self.model.predict(state, verbose=0)))
                    next_state, _, terminated, truncated, _ = env.step(action)
                    done = terminated or truncated
                    state = np.reshape(next_state, [1, self.state_size])
                    i += 1
                print("episode: {}/{}, score: {}".format(e, self.MAX_EPISODES, i))
        finally:
            # Only close the environment this method created.
            if env is not self.env:
                env.close()

In [9]:
# Construct the agent (its __init__ creates the environment and the Q-network)
# and start the training loop.
agent = DQNAgent()
agent.run()

Model: "cartpole_dqn_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 4)]               0         
                                                                 
 dense_4 (Dense)             (None, 512)               2560      
                                                                 
 dense_5 (Dense)             (None, 256)               131328    
                                                                 
 dense_6 (Dense)             (None, 64)                16448     
                                                                 
 dense_7 (Dense)             (None, 2)                 130       
                                                                 
Total params: 150,466
Trainable params: 150,466
Non-trainable params: 0
_________________________________________________________________
Episode: 0/1000, score: 26, epsilon: 1.0
E