In [1]:
import numpy as np
import random
from collections import deque
import gym


In [49]:
from tensorflow.keras.models import Sequential, clone_model
from tensorflow.keras.layers import Dense , Activation ,Flatten
from tensorflow.keras.optimizers import Adam

In [50]:
# Identifier of the Gym environment the agent is trained on.
env_name = 'CartPole-v1'
# Instantiate the environment; later cells call reset()/step() on `env`.
env = gym.make(env_name)

In [51]:
# Length of a single observation vector, read from the environment's observation space.
num_observations = env.observation_space.shape[0]
# Number of discrete actions available, read from the environment's action space.
num_actions = env.action_space.n

In [70]:
# Online Q-network: takes inputs of shape (1, num_observations) and outputs
# one Q-value estimate per action.
model = Sequential()

model.add(Dense(16, input_shape=(1, num_observations)))
model.add(Activation('relu'))

model.add(Dense(32))
model.add(Activation('relu'))

# Linear output layer: Q-values are unbounded regression targets.
model.add(Dense(num_actions))
model.add(Activation('linear'))

# summary() prints the table itself and returns None, so wrapping it in
# print() would emit a stray "None" line after the table.
model.summary()

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_9 (Dense)             (None, 1, 16)             80        
                                                                 
 activation_9 (Activation)   (None, 1, 16)             0         
                                                                 
 dense_10 (Dense)            (None, 1, 32)             544       
                                                                 
 activation_10 (Activation)  (None, 1, 32)             0         
                                                                 
 dense_11 (Dense)            (None, 1, 2)              66        
                                                                 
 activation_11 (Activation)  (None, 1, 2)              0         
                                                                 
Total params: 690
Trainable params: 690
Non-trainable params: 0

In [71]:
# clone_model() rebuilds the architecture with newly initialised weights, so
# the online network's weights are copied over explicitly. Without this the
# target network starts out unrelated to `model` until the first scheduled sync.
target_model = clone_model(model)
target_model.set_weights(model.get_weights())

Hyperparameters and Update Function

In [72]:
# Number of episodes the training loop runs.
EPOCHS = 1000

# Exploration rate: probability that the agent takes a random action instead
# of the greedy one. The training loop multiplies it by EPSILON_REDUCE after
# every episode.
epsilon = 1.0
EPSILON_REDUCE = 0.995  # multiplicative decay applied to epsilon per episode
LEARNING_RATE = 0.001  # learning rate passed to the Adam optimizer
GAMMA = 0.95  # discount factor applied to the bootstrapped next-state Q-value


In [73]:
def epsilon_greedy_action_selection(model, epsilon, observation):
    """Pick an action with an epsilon-greedy policy.

    With probability ``epsilon`` a uniformly random action is returned
    (exploration); otherwise the action with the highest predicted Q-value
    is returned (exploitation).

    Args:
        model: Q-network exposing ``predict``.
        epsilon: Exploration probability, expected to lie in [0, 1].
        observation: A single observation array without a batch axis.

    Returns:
        int: Index of the chosen action.
    """
    if np.random.random() > epsilon:
        # predict() expects a leading batch axis. replay() feeds the network
        # stacks of stored observations, so a single observation is wrapped
        # into a batch of one to give the network the same input rank here
        # (avoids Keras' incompatible-input-shape complaint).
        batch = np.expand_dims(observation, axis=0)
        # verbose=0 keeps the per-step progress bar out of the notebook output.
        q_values = model.predict(batch, verbose=0)
        return int(np.argmax(q_values))
    return int(np.random.randint(0, env.action_space.n))

In [74]:
# Experience replay memory; being a bounded deque, it discards the oldest
# transitions once 20000 entries are stored.
replay_buffer = deque(maxlen=20000)
# Interval, in episodes, at which the target network is synced with the online network.
update_target_model = 10

def replay(replay_buffer, batch_size, model, target_model):
    """Run one DQN training step on a random minibatch from the replay buffer.

    Does nothing until the buffer holds at least ``batch_size`` transitions.
    Each transition is a ``(state, action, reward, new_state, done)`` tuple
    whose states carry a leading axis of length 1, so a stacked batch has
    shape ``(batch, 1, features)`` and predictions ``(batch, 1, actions)``.

    Args:
        replay_buffer: Sequence of stored transitions.
        batch_size: Number of transitions to sample and train on.
        model: Online Q-network; it is the one being fitted.
        target_model: Periodically synced copy used only for bootstrapping.

    Returns:
        None. ``model`` is updated in place via ``fit``.
    """
    # As long as the buffer has not enough elements we do nothing
    if len(replay_buffer) < batch_size:
        return

    # Take a random sample and regroup it column-wise into arrays
    samples = random.sample(replay_buffer, batch_size)
    states, actions, rewards, new_states, dones = map(np.array, zip(*samples))

    # Start from the ONLINE network's own estimates so that actions which were
    # not taken contribute zero error; only the taken action is overwritten.
    targets = np.array(model.predict(states, verbose=0))

    # Bootstrap from the TARGET network: it changes only at sync points,
    # which is what keeps the regression target stable between syncs.
    next_q_values = target_model.predict(new_states, verbose=0)
    best_next_q = next_q_values[:, 0, :].max(axis=1)

    # Terminal transitions have no future return, so the bootstrap is masked out.
    not_done = np.logical_not(dones)
    td_targets = rewards + GAMMA * best_next_q * not_done
    targets[np.arange(batch_size), 0, actions] = td_targets

    # One pass over the minibatch as a single gradient step
    model.fit(states, targets, batch_size=batch_size, epochs=1, verbose=0)

In [75]:
# Periodic synchronisation of the target network with the online network
def update_model_handler(epoch, update_target_model, model, target_model):
    """Copy the online weights into the target network on sync epochs.

    A sync happens when ``epoch`` is positive and an exact multiple of
    ``update_target_model``; on every other epoch nothing is changed.
    """
    is_sync_epoch = epoch > 0 and epoch % update_target_model == 0
    if not is_sync_epoch:
        return
    online_weights = model.get_weights()
    target_model.set_weights(online_weights)

Train Model

In [76]:
# Configure the online network for training: mean-squared-error loss on the
# Q-value targets, optimised with Adam at LEARNING_RATE.
model.compile(loss='mse', optimizer=Adam(learning_rate=LEARNING_RATE))


In [None]:
# Number of transitions sampled from the replay buffer per training step
BATCH_SIZE = 32

best_so_far = 0
for epoch in range(EPOCHS):
    # gym < 0.26 returns only the observation from reset(); newer releases
    # return an (observation, info) tuple.
    reset_result = env.reset()
    observation = reset_result[0] if isinstance(reset_result, tuple) else reset_result

    # Stored states carry a leading axis of length 1: (1, num_observations)
    observation = np.asarray(observation).reshape([1, num_observations])
    done = False

    points = 0
    while not done:  # as long as the current episode is active

        # Select action according to the epsilon-greedy strategy
        action = epsilon_greedy_action_selection(model, epsilon, observation)

        # Perform the action. gym < 0.26 returns a 4-tuple with a single
        # `done` flag; newer releases return a 5-tuple that splits it into
        # `terminated` and `truncated`.
        step_result = env.step(action)
        if len(step_result) == 5:
            next_observation, reward, terminated, truncated, info = step_result
            done = terminated or truncated
        else:
            next_observation, reward, done, info = step_result
        next_observation = np.asarray(next_observation).reshape([1, num_observations])

        # Record the transition, then advance to the next state
        replay_buffer.append((observation, action, reward, next_observation, done))
        observation = next_observation
        points += 1

        # Train the online network on a random minibatch of past transitions
        replay(replay_buffer, BATCH_SIZE, model, target_model)

    epsilon *= EPSILON_REDUCE  # Reduce exploration after every episode

    # Check if we need to update the target model
    update_model_handler(epoch, update_target_model, model, target_model)

    best_so_far = max(best_so_far, points)
    if epoch % 25 == 0:
        print(f"{epoch}: Points reached: {points} - epsilon: {epsilon} - Best: {best_so_far}")
