In [1]:
# Import necessary libraries.
import tensorflow as tf
from tensorflow import keras
import gym 
import math
import numpy as np
import random
from collections import deque

In [2]:
# Create the environment.
# `env_name` is the registered Gym id; `env` is the single shared environment
# instance that the training and evaluation cells of this notebook step through.
env_name = "CartPole-v1"
env = gym.make(env_name)

# Define parameters

In [3]:
# Define necessary parameters.

# Maximum number of training episodes.
n_episodes = 1000
# Mean episode length (over the last 100 episodes) at which training counts as
# solved. CartPole-v1 cuts every episode off at 500 steps and documents 475 as
# its reward threshold; the previous value of 3000 could never be reached, so
# the "solved" branch of the training loop was dead code.
n_win_ticks = 475

# Discount factor applied to the bootstrapped next-state value.
gamma = 1.0
# Upper bound on the exploration rate.
epsilon = 1.0
# Lower bound on the exploration rate.
epsilon_min = 0.01
# Scale factor inside the logarithmic exploration schedule (also the
# multiplicative decay step used after a replay pass).
epsilon_decay = 0.995
# Learning rate handed to the optimizer.
alpha = 0.01
# Learning-rate decay handed to the optimizer.
alpha_decay = 0.01

# Maximum number of transitions sampled per training pass.
batch_size = 64

# When False, progress messages are printed during training.
quet = False

# Replay memory; the oldest transitions are dropped once it holds 100000 items.
memory = deque(maxlen = 100000)

# Render and report progress once every SHOW_EVERY episodes.
SHOW_EVERY = 20

# Building neural network

In [4]:
# Build the neural network that plays the role of the Q-table: it maps a
# 4-value observation to one estimated return per action (2 actions).
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

model = Sequential()
model.add(Dense(24, input_dim = 4, activation = 'relu'))
model.add(Dense(48, activation = 'relu'))
model.add(Dense(48, activation = 'relu'))
# Q-values are unbounded regression targets (reward + gamma * max Q), so the
# output layer must be linear. A softmax would squash the outputs into [0, 1]
# and force them to sum to 1, which the MSE loss can never reconcile with the
# training targets.
model.add(Dense(2, activation = 'linear'))
# `learning_rate` is the supported keyword; `lr` is a deprecated alias.
# NOTE(review): `decay` is only accepted by older/legacy Keras optimizers -
# confirm against the installed TensorFlow version.
model.compile(loss = 'mse', optimizer = Adam(learning_rate = alpha, decay = alpha_decay))

# Define necessary functions

In [5]:
def remember(state, action, reward, next_state, done):
    """Store one environment transition at the end of the replay memory.

    The five arguments are kept together, in this order, as a single tuple.
    """
    transition = (state, action, reward, next_state, done)
    memory.append(transition)
    
def choose_action(state, epsilon):
    """Select an action epsilon-greedily.

    Args:
        state: Observation passed unchanged to ``model.predict``.
        epsilon: Probability threshold for taking a random action.

    Returns:
        A random sample from the environment's action space when a uniform
        draw is <= ``epsilon``; otherwise the argmax of the model's prediction.
    """
    explore = np.random.random() <= epsilon
    if explore:
        return env.action_space.sample()
    predicted = model.predict(state)
    return np.argmax(predicted)

def get_epsilon(t):
    """Return the exploration rate for step index ``t``.

    The rate follows ``1 - log10((t + 1) * epsilon_decay)`` and is clamped to
    the interval ``[epsilon_min, epsilon]`` taken from the module-level
    settings.
    """
    scheduled = 1.0 - math.log10((t + 1) * epsilon_decay)
    capped = min(epsilon, scheduled)
    return max(epsilon_min, capped)

def preprocess_state(state):
    """Reshape a 4-element observation into a (1, 4) batch for the network."""
    batch_shape = (1, 4)
    return np.reshape(state, batch_shape)

def replay(batch_size, epsilon):
    """Fit the Q-network on a random minibatch drawn from the replay memory.

    Args:
        batch_size: Maximum number of stored transitions to sample.
        epsilon: Current exploration rate.

    Returns:
        The exploration rate after one multiplicative decay step (unchanged if
        it is already at or below ``epsilon_min``). Previously the decay was
        applied to the local argument only and then discarded; returning it
        lets a caller keep the value, while callers that ignore the result
        behave as before.
    """
    # Nothing to learn from yet; fitting on an empty batch would fail.
    if not memory:
        return epsilon

    # From the memory take, at random, a number of transitions for the batch.
    minibatch = random.sample(memory, min(len(memory), batch_size))

    # Stored states are (1, 4) arrays; take row 0 of each to build (n, 4) batches.
    states = np.array([transition[0][0] for transition in minibatch])
    actions = np.array([transition[1] for transition in minibatch])
    rewards = np.array([transition[2] for transition in minibatch], dtype=float)
    next_states = np.array([transition[3][0] for transition in minibatch])
    dones = np.array([transition[4] for transition in minibatch], dtype=bool)

    # Two batched forward passes replace two predict calls per transition; the
    # model is not updated until fit(), so the predicted values are the same.
    q_targets = model.predict(states)
    next_q_max = np.max(model.predict(next_states), axis=1)

    # Only the Q-value of the action actually taken is overwritten: terminal
    # transitions use the raw reward, the rest add the discounted best next value.
    rows = np.arange(len(minibatch))
    q_targets[rows, actions] = np.where(dones, rewards, rewards + gamma * next_q_max)

    # Train the model on the whole minibatch in a single step.
    model.fit(states, q_targets, batch_size = len(states), verbose = 0)

    # Decrease epsilon: as time goes on, prefer the network's prediction over
    # random actions.
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay
    return epsilon

# Define run function

In [6]:
def run():
    """Train the agent for up to ``n_episodes`` episodes.

    Each episode is played to termination with epsilon-greedy actions, every
    transition is stored in the replay memory, and one replay training pass is
    run after the episode ends.

    Returns:
        ``e - 100`` (episode index minus the 100-episode averaging window) as
        soon as the mean length of the last 100 episodes reaches
        ``n_win_ticks``; ``None`` if that never happens.
    """
    # Lengths of the 100 most recent episodes.
    scores = deque(maxlen = 100)

    for e in range(n_episodes):
        # The exploration rate depends only on the episode index, so compute it
        # once per episode instead of once per step.
        episode_epsilon = get_epsilon(e)

        # Start the environment and reshape the observation for the network.
        state = preprocess_state(env.reset())
        done = False
        # Number of steps survived in this episode.
        i = 0

        while not done:
            # STEP 1: choose an action.
            action = choose_action(state, episode_epsilon)

            # STEP 2: apply it and observe the new state and reward.
            next_state, reward, done, _ = env.step(action)
            next_state = preprocess_state(next_state)

            # STEP 3: store the transition, then advance.
            remember(state, action, reward, next_state, done)
            state = next_state
            i += 1

        scores.append(i)
        mean_score = np.mean(scores)

        # Solved once the rolling mean is high enough and the window is full.
        if mean_score >= n_win_ticks and e >= 100:
            if not quet:
                print('Run {} episodes. Solved after {} trails'.format(e, e-100))
            return e - 100

        # Render and report progress once every SHOW_EVERY episodes.
        if e % SHOW_EVERY == 0 and not quet:
            env.render()
            print('[Episode {}] - Mean survival time over last 100 episodes was {} ticks.'.format(e,mean_score))

        # STEP 4: train the network on the transitions gathered so far.
        replay(batch_size, episode_epsilon)

    # Report the number of episodes actually run. The loop variable holds the
    # last index (one less) and is undefined when n_episodes is 0.
    if not quet:
        print('Did not solve after {} episodes'.format(n_episodes))
    return None

In [None]:
# Start training; progress messages are printed unless `quet` is set.
run()

In [None]:
# Close the environment now that training has finished.
env.close()

In [None]:
# Persist the trained network under the path 'saved.model' for later reuse.
model.save('saved.model')

In [9]:
# Reload the network from disk so the evaluation below uses the saved copy.
new_model = keras.models.load_model('saved.model')

# Test of saved model.

In [13]:
# Play one fully greedy episode with the reloaded network and count how many
# steps it survives.
state = preprocess_state(env.reset())
done = False
ticks = 0
while not done:
    # No exploration here: always take the action with the highest prediction.
    q_values = new_model.predict(state)
    action = np.argmax(q_values)
    raw_state, reward, done, _ = env.step(action)
    state = preprocess_state(raw_state)
    ticks += 1
    env.render()
print("The model run for {} ticks".format(ticks))
env.close()

The model run for 500 ticks
