In [1]:
import gym
import tensorflow as tf
import numpy as np
from tensorflow import keras

In [2]:
from collections import deque
import time
import random

In [3]:
RANDOM_SEED = 5
tf.random.set_seed(RANDOM_SEED)

env = gym.make('CartPole-v1')
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

print(f"Action space: {env.action_space}")
print(f"State space: {env.observation_space}")

Action space: Discrete(2)
State space: Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)


In [4]:
train_episodes = 300
test_episodes = 100
learning_rate = 0.001

def agent(state_shape, action_shape):
    """The agent maps X-states to Y-actions, where X-states are the inputs and represent
        the states on the grid. Y-actions are the outputs and represent the Q-Value of each action"""
    # Just build a keras NN with specific inputs - outputs shapes
    init = tf.keras.initializers.HeUniform()
    model = keras.Sequential()
    model.add(keras.layers.Dense(24, input_shape=state_shape, activation='relu', kernel_initializer=init))
    model.add(keras.layers.Dense(12, activation='relu', kernel_initializer=init))
    model.add(keras.layers.Dense(action_shape, activation='linear', kernel_initializer=init))
    model.compile(loss=tf.keras.losses.Huber(), optimizer=tf.keras.optimizers.Adam(lr=learning_rate), metrics=['accuracy'])

    return model

def get_QS(model, state, step):
    return model.predict(state.reshape([1, state.shape[0]]))[0]

In [5]:
agent_learning_rate = 0.7
discount_factor = 0.618

def train(env, replay_memory, model, target_model, done):
    """
    This updates the model using the bellman equation
    """
    MIN_REPLAY_SIZE = 1000
    if len(replay_memory) < MIN_REPLAY_SIZE:
        return

    batch_size = 64 * 2
    mini_batch = random.sample(replay_memory, batch_size)
    current_states = np.array([transition[0] for transition in mini_batch])
    current_qs_list = model.predict(current_states)
    new_current_states = np.array([transition[3] for transition in mini_batch])
    future_qs_list = target_model.predict(new_current_states)

    X = []
    Y = []
    for index, (observation, action, reward, new_observation, done) in enumerate(mini_batch):
        if not done:
            max_future_q = reward + discount_factor * np.max(future_qs_list[index])
        else:
            max_future_q = reward

        current_qs = current_qs_list[index]
        current_qs[action] = (1 - agent_learning_rate) * current_qs[action] + agent_learning_rate * max_future_q

        X.append(observation)
        Y.append(current_qs)
    model.fit(np.array(X), np.array(Y), batch_size=batch_size, verbose=0, shuffle=True)

In [6]:
def main():
    epsilon = 1 # Epsilon greedy algorithm
    max_epsilon = 1
    min_epsilon = 0.01
    decay = 0.01
    print("observation space: ", env.observation_space.shape)
    model = agent(env.observation_space.shape, env.action_space.n)
    # target model is updated once every 100 steps
    target_model = agent(env.observation_space.shape, env.action_space.n)
    target_model.set_weights(model.get_weights())

    replay_memory = deque(maxlen=50_000)
    target_update_counter = 0

    X = []
    y = []

    steps_to_update_target_model = 0
    for episode in range(train_episodes):
        total_training_rewards = 0
        observation = env.reset()
        done = False
        truncated = False

        while not (done or truncated):
            steps_to_update_target_model += 1
            if True:
                env.render()

            random_number = np.random.rand()

            if random_number <= epsilon:
                # explore
                action = env.action_space.sample()
            else:
                # exploit
                encoded = observation[0] if isinstance(observation, tuple) else observation
                encoded_reshaped = encoded.reshape([1, encoded.shape[0]])
                predicted = model.predict(encoded_reshaped).flatten()
                action = np.argmax(predicted)
            new_observation, reward, done, truncated, info = env.step(action)
             
            replay_memory.append([observation[0] if isinstance(observation, tuple) else observation, action, reward, new_observation, done])

            if steps_to_update_target_model % 4 == 0 or done:
                train(env, replay_memory, model, target_model, done)

            observation = new_observation
            total_training_rewards += reward

            if done or truncated:
                print(f"Total training rewards: {total_training_rewards}, after n steps = {episode}, with final reward = {reward}")
                total_training_rewards += 1

                if steps_to_update_target_model >= 100:
                    print("Copying main network weights  to the target network weights")
                    target_model.set_weights(model.get_weights())
                    steps_to_update_target_model = 0
                break
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay * episode) # interpolate
    env.close()
    return model

In [7]:
my_model = main()

observation space:  (4,)


  "You are calling render method without specifying any render mode. "


Total training rewards: 13.0, after n steps = 0, with final reward = 1.0
Total training rewards: 20.0, after n steps = 1, with final reward = 1.0
Total training rewards: 30.0, after n steps = 2, with final reward = 1.0
Total training rewards: 17.0, after n steps = 3, with final reward = 1.0
Total training rewards: 17.0, after n steps = 4, with final reward = 1.0
Total training rewards: 13.0, after n steps = 5, with final reward = 1.0
Copying main network weights  to the target network weights
Total training rewards: 46.0, after n steps = 6, with final reward = 1.0
Total training rewards: 13.0, after n steps = 7, with final reward = 1.0
Total training rewards: 14.0, after n steps = 8, with final reward = 1.0
Total training rewards: 28.0, after n steps = 9, with final reward = 1.0
Copying main network weights  to the target network weights
Total training rewards: 25.0, after n steps = 10, with final reward = 1.0
Total training rewards: 24.0, after n steps = 11, with final reward = 1.0
To

In [15]:
env = gym.make("CartPole-v1", render_mode="human")
def watch_agent(model, env, eps):
    done = False
    truncated = False
    episode_reward = 0

    s, info = env.reset()
    while not (done or truncated):
        theInput = np.asarray(s, dtype="float32")
        print(theInput.shape)
        a = np.argmax(get_QS(model, s, 0))
        print('step is', a)
        s, r, done, truncated, info = env.step(a)
        episode_reward += r
    print("Episode reward:", episode_reward)

watch_agent(my_model, env, eps=0)

env.close()

(4,)
step is 0
(4,)
step is 0
(4,)
step is 0
(4,)
step is 1
(4,)
step is 1
(4,)
step is 1
(4,)
step is 0
(4,)
step is 1
(4,)
step is 1
(4,)
step is 0
(4,)
step is 0
(4,)
step is 1
(4,)
step is 1
(4,)
step is 1
(4,)
step is 0
(4,)
step is 0
(4,)
step is 0
(4,)
step is 1
(4,)
step is 0
(4,)
step is 1
(4,)
step is 0
(4,)
step is 1
(4,)
step is 0
(4,)
step is 1
(4,)
step is 1
(4,)
step is 1
(4,)
step is 0
(4,)
step is 0
(4,)
step is 0
(4,)
step is 0
(4,)
step is 1
(4,)
step is 1
(4,)
step is 0
(4,)
step is 1
(4,)
step is 1
(4,)
step is 1
(4,)
step is 0
(4,)
step is 1
(4,)
step is 0
(4,)
step is 0
(4,)
step is 0
(4,)
step is 0
(4,)
step is 0
(4,)
step is 1
(4,)
step is 1
(4,)
step is 1
(4,)
step is 0
(4,)
step is 1
(4,)
step is 1
(4,)
step is 1
(4,)
step is 0
(4,)
step is 1
(4,)
step is 0
(4,)
step is 0
(4,)
step is 0
(4,)
step is 0
(4,)
step is 0
(4,)
step is 1
(4,)
step is 1
(4,)
step is 1
(4,)
step is 1
(4,)
step is 1
(4,)
step is 0
(4,)
step is 1
(4,)
step is 0
(4,)
step is 0
(4,)
step 

None
