In [1]:
import numpy as np
import random
from collections import deque
import gym
import tensorflow as tf

In [2]:
# Create the Gym environment that the later cells interact with.
env_name = 'CartPole-v1'
env = gym.make(env_name)

In [3]:
# Sanity check: drive the environment with random actions and render it.
observation = env.reset()

try:
    for step in range(1000):
        env.render(mode="human")
        random_action = env.action_space.sample()
        observation, reward, done, info = env.step(random_action)
        if done:
            # An episode that reported done must be reset before stepping
            # again; otherwise the remaining steps act on a finished episode.
            observation = env.reset()
finally:
    # Always release the render window, even if a step raises.
    env.close()



In [3]:
# Length of one observation vector: the first dimension of the observation space's shape.
num_observations = env.observation_space.shape[0]
num_observations

4

In [4]:
# Number of discrete actions, read from the action space's `n` attribute.
num_actions = env.action_space.n
num_actions

2

In [86]:
# Q-network: takes one observation shaped (1, num_observations) and emits one
# linear output per action.
# NOTE(review): the first hidden layer has only `num_actions` units, which is a
# very narrow layer - confirm this width is intended.
model = tf.keras.models.Sequential()
model.add(
    tf.keras.layers.Dense(
        num_actions,
        activation='relu',
        input_shape=(1, num_observations),
    )
)
model.add(tf.keras.layers.Dense(units=32, activation='relu'))
# Output layer: one unit per action in the action space.
model.add(tf.keras.layers.Dense(units=num_actions, activation='linear'))


In [65]:
# Print the layer-by-layer summary (output shapes and parameter counts).
model.summary()

Model: "sequential_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_23 (Dense)            (None, 1, 1, 2)           10        
                                                                 
 dense_24 (Dense)            (None, 1, 1, 32)          96        
                                                                 
 dense_25 (Dense)            (None, 1, 1, 2)           66        
                                                                 
Total params: 172
Trainable params: 172
Non-trainable params: 0
_________________________________________________________________


In [87]:
# clone_model() rebuilds the same architecture with newly created weights, so
# the online network's weights are copied over explicitly; otherwise the target
# network would start out of sync with the network it is meant to mirror.
target_model = tf.keras.models.clone_model(model)
target_model.set_weights(model.get_weights())

In [47]:
# Training hyperparameters.
EPOCHS = 300  # number of episodes run by the training loop
epsilon = 1.0  # initial exploration rate; multiplied by EPSILON_DECAY after every episode
EPSILON_DECAY = 0.995
LEARNING_RATE = 0.001  # passed to the Adam optimizer when the model is compiled
DISCOUNT_FACTOR = 0.95  # weight on the next-state value when replay() builds its targets




In [89]:
def epsilon_greedy_action_selection(model, epsilon, observation):
    """Pick an action with an epsilon-greedy policy.

    A uniform random draw is compared with ``epsilon``: when the draw is not
    greater than ``epsilon`` a random action index is returned, otherwise the
    index of the largest value predicted by ``model`` is returned.

    Args:
        model: Object whose ``predict`` is called on the observation reshaped
            to ``(1, 1, 4)``.
        epsilon: Exploration threshold compared against the random draw.
        observation: Array holding the four values of the current observation.

    Returns:
        The chosen action index.
    """
    explore = not (np.random.random() > epsilon)
    if explore:
        # Exploration: uniform choice over the environment's actions.
        return np.random.randint(0, env.action_space.n)

    # Exploitation: take the action with the highest predicted value.
    predicted_values = model.predict(observation.reshape(1, 1, 4), verbose=0)
    return np.argmax(predicted_values)

In [80]:
# Bounded experience store: once 20000 transitions are held, appending drops the oldest.
replay_buffer = deque(maxlen=20000) 
# Interval, in epochs, at which update_model_handler copies the online weights into the target network.
update_target_model_every = 10

In [81]:
def replay(replay_buffer, batch_size, model, target_model, discount_factor=None):
    """Fit the online network on one random minibatch of stored transitions.

    For every sampled transition the regression target equals the online
    network's current prediction for the state, except for the entry of the
    action that was taken. That entry is set to ``reward`` for terminal
    transitions and to ``reward + discount_factor * max(Q_target(new_state))``
    otherwise. The bootstrap value comes from ``target_model`` so that the
    targets stay fixed between target-network updates.

    Args:
        replay_buffer: Sequence of ``(state, action, reward, new_state, done)``
            tuples.
        batch_size: Number of transitions to sample. The call does nothing
            while the buffer holds fewer entries than this.
        model: Online Q-network; provides the baseline predictions and is the
            network that gets fitted.
        target_model: Target Q-network; provides the next-state values.
        discount_factor: Discount applied to the next-state value. When
            omitted, the module-level ``DISCOUNT_FACTOR`` is used.

    Returns:
        None.
    """
    if len(replay_buffer) < batch_size:
        return
    if discount_factor is None:
        discount_factor = DISCOUNT_FACTOR

    minibatch = random.sample(replay_buffer, batch_size)
    states, actions, rewards, new_states, dones = zip(*minibatch)

    states = np.array(states)
    new_states = np.array(new_states)
    actions = np.asarray(actions, dtype=int)
    rewards = np.asarray(rewards, dtype=float)
    dones = np.asarray(dones, dtype=bool)

    # Baseline targets are the online network's own outputs, so untouched
    # action entries contribute zero error. np.array() makes a writable copy.
    target_batch = np.array(model.predict(states, verbose=0))

    # Bootstrap from the target network. Predictions carry a middle axis of
    # length 1, hence the [:, 0, :] indexing.
    next_q_values = np.asarray(target_model.predict(new_states, verbose=0))
    best_next = np.max(next_q_values[:, 0, :], axis=1)

    # Terminal transitions have no future value to bootstrap from.
    updated = np.where(dones, rewards, rewards + discount_factor * best_next)
    target_batch[np.arange(batch_size), 0, actions] = updated

    model.fit(states, target_batch, epochs=1, verbose=0)


In [82]:
def update_model_handler(epoch, update_target_model_every, model, target_model):
    """Copy the online network's weights into the target network on schedule.

    The copy happens only when ``epoch`` is positive and is a multiple of
    ``update_target_model_every``; on every other epoch nothing is changed.

    Args:
        epoch: Index of the epoch that just finished.
        update_target_model_every: Interval, in epochs, between copies.
        model: Network whose weights are read.
        target_model: Network whose weights are overwritten.
    """
    sync_due = epoch > 0 and epoch % update_target_model_every == 0
    if not sync_due:
        return
    current_weights = model.get_weights()
    target_model.set_weights(current_weights)

In [88]:
# Set up training of the online network: Adam at LEARNING_RATE, mean squared error loss.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss='mse',
)

In [90]:
# Main training loop: one episode per epoch, with a replay step after every
# environment step.
# Note: `epsilon` is not reset here, so re-running this cell continues the
# decay from whatever value the previous run left behind.
best_so_far = 0
for epoch in range(EPOCHS):
    observation = env.reset()

    # Keep observations shaped (1, 4) so that a stacked minibatch has the
    # (batch, 1, 4) layout matching the network's input_shape.
    observation = observation.reshape([1,4])
    done = False
    points = 0

    while not done:
        action = epsilon_greedy_action_selection(model, epsilon, observation)

        new_observation, reward, done, info = env.step(action)
        new_observation = new_observation.reshape([1,4])

        # Store the transition for later sampling by replay().
        replay_buffer.append((observation, action, reward, new_observation, done))

        observation = new_observation
        points += 1  # counts the steps taken in this episode

        replay(replay_buffer, 32, model, target_model)

    # Explore a little less after every episode.
    epsilon = EPSILON_DECAY * epsilon
    update_model_handler(epoch, update_target_model_every, model, target_model)

    if points > best_so_far:
        best_so_far = points

    if epoch % 25  == 0:
        print("Epoch: {}/{} | Points: {} | Epsilon: {} |BSF: {}".format(epoch, EPOCHS, points, epsilon, best_so_far))

# Persist the trained network: the evaluation cell loads this exact file, and
# nothing else in the notebook writes it.
model.save('cartpole_model.h5')

Epoch: 0/300 | Points: 13 | Epsilon: 0.918316468354365 |BSF: 13
Epoch: 25/300 | Points: 14 | Epsilon: 0.810157377815473 |BSF: 33
Epoch: 50/300 | Points: 13 | Epsilon: 0.7147372386831305 |BSF: 46
Epoch: 75/300 | Points: 15 | Epsilon: 0.6305556603555866 |BSF: 70
Epoch: 100/300 | Points: 12 | Epsilon: 0.5562889678716474 |BSF: 70
Epoch: 125/300 | Points: 21 | Epsilon: 0.4907693883854626 |BSF: 70
Epoch: 150/300 | Points: 86 | Epsilon: 0.43296668905325736 |BSF: 177


In [6]:
# Load the saved network and render one greedy episode of at most 300 steps.
observation = env.reset()
model = tf.keras.models.load_model('cartpole_model.h5')
try:
    for step in range(300):
        env.render(mode="human")
        # Always take the action with the highest predicted value (no exploration).
        action = np.argmax(model.predict(observation.reshape(1,1,4), verbose=0))
        observation, reward, done, info = env.step(action)
        if done:
            break
finally:
    # Release the render window even if prediction or stepping raises.
    env.close()