In [2]:
import gym
import numpy as np
import random
import math

In [3]:
# Create the CartPole-v1 environment shared by the tabular Q-learning cells below.
environment = gym.make('CartPole-v1')

In [4]:
# Start a new episode; the cell output shows the initial observation returned by reset().
environment.reset()

array([-0.0425023 ,  0.00437766,  0.01588009,  0.01659871], dtype=float32)

In [5]:
# Smoke test: render the environment while taking 100 random actions.
# A random policy ends a CartPole episode long before 100 steps, so the
# environment is reset whenever the step reports the episode as finished;
# stepping a finished episode without resetting is undefined behaviour in gym.
for _step_no in range(100):
    environment.render()
    step_result = environment.step(environment.action_space.sample())
    # Index 2 of the step result is the episode-finished flag.
    if step_result[2]:
        environment.reset()

  logger.warn(


In [6]:
# Reset again so later cells start from a fresh episode after the random rollout above.
environment.reset()

array([ 0.01365825,  0.04215448,  0.03110193, -0.03966234], dtype=float32)

In [7]:
# Number of discrete buckets per observation component (one entry per component).
# A value of 1 collapses that component into a single bucket, so it does not
# distinguish states in the Q-table.
# NOTE(review): CartPole observations are presumably (cart position, cart velocity,
# pole angle, pole angular velocity) — confirm against the installed gym version.
no_buckets = (1, 1, 6, 3)
# Number of discrete actions exposed by the environment's action space.
no_actions = environment.action_space.n

In [8]:
# Per-component (low, high) bounds taken from the environment's observation space.
state_value_bounds = list(zip(environment.observation_space.low,
environment.observation_space.high))
# Override components 1 and 3 with narrower, hand-picked ranges
# (presumably because the environment reports them as effectively unbounded — confirm).
state_value_bounds[1] = [-0.5, 0.5]
state_value_bounds[3] = [-math.radians(50), math.radians(50)]

In [9]:
# Position of the action axis in the Q-table: it follows one axis per bucketized
# observation component. Not referenced by the other cells visible in this notebook.
action_index = len(no_buckets)

In [10]:
# Q-table initialised to zero: one axis per bucketized observation component
# followed by a final axis with one entry per action.
q_value_table = np.zeros(no_buckets + (no_actions,))

In [11]:
# Lower bound applied by select_explore_rate().
min_explore_rate = 0.01
# Lower bound applied by select_learning_rate().
min_learning_rate = 0.1

In [12]:
# Upper bound on the number of training episodes.
max_episodes = 1000
# Upper bound on the number of time steps within one episode.
max_time_steps = 250
# Training stops once the streak counter exceeds this value.
streak_to_end = 120
# An episode counts toward the streak when it reaches at least this time step.
solved_time = 199
# Discount factor applied to the next state's Q-value in the update rule.
discount = 0.99
# Running streak counter; updated by the training loop.
no_streaks = 0

In [13]:
def select_action(state_value, explore_rate):
    """Pick an action for ``state_value`` with an epsilon-greedy rule.

    Args:
        state_value: Index (tuple of bucket indexes) into ``q_value_table``.
        explore_rate: Probability of taking a random action instead of the
            greedy one.

    Returns:
        A sample from the environment's action space when exploring,
        otherwise the ``np.argmax`` of the Q-table entry for ``state_value``.
    """
    should_explore = random.random() < explore_rate
    if should_explore:
        return environment.action_space.sample()
    return np.argmax(q_value_table[state_value])

In [14]:
def select_explore_rate(x):
    """Return the exploration rate for episode index ``x``.

    The rate follows ``1 - log10((x + 1) / 25)``, capped at 1 from above and
    at ``min_explore_rate`` from below.
    """
    decayed_rate = 1.0 - math.log10((x + 1) / 25)
    capped_rate = min(1, decayed_rate)
    return max(min_explore_rate, capped_rate)

In [15]:
def select_learning_rate(x):
    """Return the learning rate for episode index ``x``.

    The rate follows ``1 - log10((x + 1) / 25)``, capped at 0.5 from above and
    at ``min_learning_rate`` from below.
    """
    decayed_rate = 1.0 - math.log10((x + 1) / 25)
    capped_rate = min(0.5, decayed_rate)
    return max(min_learning_rate, capped_rate)

In [16]:
def bucketize_state_value(state_value, bounds=None, buckets=None):
    """Map a continuous observation to a tuple of discrete bucket indexes.

    Args:
        state_value: Sequence of observation components.
        bounds: Optional sequence of ``(low, high)`` pairs, one per component.
            Defaults to the module-level ``state_value_bounds``.
        buckets: Optional sequence with the number of buckets per component.
            Defaults to the module-level ``no_buckets``.

    Returns:
        A tuple with exactly one bucket index per component, each in
        ``[0, buckets[i] - 1]``, suitable for indexing the Q-table.
    """
    if bounds is None:
        bounds = state_value_bounds
    if buckets is None:
        buckets = no_buckets

    bucket_indexes = []
    for value, (low, high), bucket_count in zip(state_value, bounds, buckets):
        if value <= low:
            # At or below the lower bound: clamp to the first bucket.
            bucket_index = 0
        elif value >= high:
            # At or above the upper bound: clamp to the last bucket.
            bucket_index = bucket_count - 1
        else:
            # Linearly rescale [low, high] onto [0, bucket_count - 1] and round.
            bound_width = high - low
            offset = (bucket_count - 1) * low / bound_width
            scaling = (bucket_count - 1) / bound_width
            bucket_index = int(round(scaling * value - offset))
        # Append for every component, including the clamped ones, so the
        # result always has one index per component.
        bucket_indexes.append(bucket_index)
    return tuple(bucket_indexes)

In [18]:
# Tabular Q-learning training loop.
# Each episode: pick the episode's exploration and learning rates, reset the
# environment, then act epsilon-greedily and apply the Q-learning update after
# every step until the episode finishes or max_time_steps is reached.
for episode_no in range(max_episodes):
    explore_rate = select_explore_rate(episode_no)
    learning_rate = select_learning_rate(episode_no)

    observation = environment.reset()
    previous_state_value = bucketize_state_value(observation)

    for time_step in range(max_time_steps):
        environment.render()
        selected_action = select_action(previous_state_value, explore_rate)
        observation, reward_gain, completed, _ = environment.step(selected_action)
        state_value = bucketize_state_value(observation)

        # The bootstrap target uses the best Q-value of the next state (a scalar),
        # not the whole row of action values.
        best_q_value = np.amax(q_value_table[state_value])
        q_index = previous_state_value + (selected_action,)
        q_value_table[q_index] += learning_rate * (
            reward_gain + discount * best_q_value - q_value_table[q_index]
        )

        print('Episode number : %d' % episode_no)
        print('Time step : %d' % time_step)
        print('Selection action : %d' % selected_action)
        print('Current state : %s' % str(state_value))
        print('Reward obtained : %f' % reward_gain)
        print('Best Q value : %f' % best_q_value)
        print('Learning rate : %f' % learning_rate)
        print('Explore rate : %f' % explore_rate)
        print('Streak number : %d' % no_streaks)

        if completed:
            print('Episode %d finished after %f time steps' % (episode_no, time_step))
            # An episode that lasted long enough extends the streak; otherwise
            # the streak starts over.
            if time_step >= solved_time:
                no_streaks += 1
            else:
                no_streaks = 0
            break

        previous_state_value = state_value

    # Stop training once enough consecutive episodes were solved.
    if no_streaks > streak_to_end:
        break

In [10]:
import random
import gym
import math
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from tensorflow.keras.optimizers import Adam

class DQNCartPoleSolver():
    """Deep Q-network agent for gym's CartPole-v1 using experience replay.

    Transitions are stored in a bounded replay memory; after every episode a
    random minibatch is sampled from it and used to fit a small dense network
    that maps a 4-component state to 2 action values.
    """

    def __init__(self, n_episodes=200, n_win_ticks=195, max_env_steps=None,
                 gamma=1.0, epsilon=1.0, epsilon_min=0.01, epsilon_log_decay=0.995,
                 alpha=0.01, alpha_decay=0.01, batch_size=64, monitor=False, quiet=False):
        """Create the environment, the replay memory and the Q-network.

        Args:
            n_episodes: Number of episodes ``run`` plays at most.
            n_win_ticks: Mean episode length (over the last 100 episodes) at
                which ``run`` reports the task as solved.
            max_env_steps: If not None, written to the environment's
                ``_max_episode_steps`` attribute.
            gamma: Discount factor for the bootstrap target in ``replay``.
            epsilon: Initial upper bound on the exploration rate.
            epsilon_min: Lower bound on the exploration rate.
            epsilon_log_decay: Factor used both inside ``get_epsilon`` and to
                shrink ``self.epsilon`` after each replay.
            alpha: Learning rate passed to the Adam optimizer.
            alpha_decay: ``decay`` value passed to the Adam optimizer.
            batch_size: Maximum minibatch size used by ``run`` when replaying.
            monitor: If True, wrap the environment in ``gym.wrappers.Monitor``.
            quiet: If True, suppress progress printing in ``run``.
        """
        self.memory = deque(maxlen=100000)
        self.env = gym.make('CartPole-v1')
        # NOTE(review): gym.make presumably returns a wrapped environment, in which
        # case these assignments set attributes on the wrapper and may never reach
        # the underlying physics (self.env.unwrapped) — confirm the intent,
        # especially for force_mag.
        self.env.theta_threshold_radians = 0.20944
        self.env.x_threshold = 2.4
        self.env.force_mag = 100
        if monitor:
            # NOTE(review): Monitor is not available in every gym release — confirm
            # against the installed version.
            self.env = gym.wrappers.Monitor(self.env, '../data/cartpole-1', force=True)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_log_decay
        self.alpha = alpha
        self.alpha_decay = alpha_decay
        self.n_episodes = n_episodes
        self.n_win_ticks = n_win_ticks
        self.batch_size = batch_size
        self.quiet = quiet
        if max_env_steps is not None:
            self.env._max_episode_steps = max_env_steps

        # Init model: 4 inputs -> 24 -> 48 -> 2 linear outputs (one per action).
        self.model = Sequential()
        self.model.add(Dense(24, input_dim=4, activation='tanh'))
        self.model.add(Dense(48, activation='tanh'))
        self.model.add(Dense(2, activation='linear'))
        # `learning_rate` replaces the deprecated `lr` argument name.
        # NOTE(review): `decay` is kept as in the original; some newer Keras
        # optimizers reject it — confirm against the installed version.
        self.model.compile(loss='mse', optimizer=Adam(learning_rate=self.alpha, decay=self.alpha_decay))

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, state, epsilon):
        """Return a random action with probability ``epsilon``, else the greedy one.

        ``state`` is passed to the model unchanged, so it must already be in
        the shape produced by ``preprocess_state``.
        """
        if np.random.random() <= epsilon:
            return self.env.action_space.sample()
        # verbose=0 keeps per-call progress output out of the notebook.
        return np.argmax(self.model.predict(state, verbose=0))

    def get_epsilon(self, t):
        """Return the exploration rate for episode index ``t``.

        The value is ``1 - log10((t + 1) * epsilon_decay)`` clamped to the
        range ``[epsilon_min, epsilon]``.
        """
        return max(self.epsilon_min, min(self.epsilon, 1.0 - math.log10((t + 1) * self.epsilon_decay)))

    def preprocess_state(self, state):
        """Reshape a 4-component state into a ``(1, 4)`` array for the model."""
        return np.reshape(state, [1, 4])

    def replay(self, batch_size):
        """Fit the model on a random minibatch drawn from the replay memory.

        For each sampled transition the target of the taken action is
        ``reward`` when the episode ended, otherwise
        ``reward + gamma * max(Q(next_state))``; targets of the other action
        keep the model's current prediction. Afterwards ``self.epsilon`` is
        multiplied by ``epsilon_decay`` while it is above ``epsilon_min``.

        Args:
            batch_size: Maximum number of transitions to sample.
        """
        if not self.memory:
            # Nothing recorded yet: there is no data to fit on.
            return

        minibatch = random.sample(self.memory, min(len(self.memory), batch_size))

        # Stack the (1, 4) states so the network is evaluated once per batch
        # instead of twice per transition.
        states = np.vstack([transition[0] for transition in minibatch])
        next_states = np.vstack([transition[3] for transition in minibatch])
        targets = self.model.predict(states, verbose=0)
        next_q_max = np.max(self.model.predict(next_states, verbose=0), axis=1)

        for row, (_, action, reward, _, done) in enumerate(minibatch):
            targets[row][action] = reward if done else reward + self.gamma * next_q_max[row]

        self.model.fit(states, targets, batch_size=len(minibatch), verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def run(self):
        """Train for up to ``n_episodes`` episodes.

        Returns:
            ``e - 100`` when the mean length of the last 100 episodes reaches
            ``n_win_ticks`` at episode index ``e >= 100``; otherwise the index
            of the last episode played (0 when no episode was played).
        """
        scores = deque(maxlen=100)
        # Pre-bind so the final report works even when n_episodes is 0.
        e = 0

        for e in range(self.n_episodes):
            state = self.preprocess_state(self.env.reset())
            done = False
            i = 0
            # The exploration rate only changes between episodes (in replay),
            # so it is computed once per episode.
            episode_epsilon = self.get_epsilon(e)
            while not done:
                action = self.choose_action(state, episode_epsilon)
                next_state, reward, done, _ = self.env.step(action)
                next_state = self.preprocess_state(next_state)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1

            scores.append(i)
            mean_score = np.mean(scores)
            if mean_score >= self.n_win_ticks and e >= 100:
                if not self.quiet: print('Ran {} episodes. Solved after {} trials ✔'.format(e, e - 100))
                return e - 100
            if e % 100 == 0 and not self.quiet:
                print('[Episode {}] - Mean survival time over last 100 episodes was {} ticks.'.format(e, mean_score))

            self.replay(self.batch_size)

        if not self.quiet: print('Did not solve after {} episodes 😞'.format(e))
        return e

# Entry point: build a solver with its default hyper-parameters and train it.
# The printed progress of this run is shown in the cell output below.
if __name__ == '__main__':
    agent = DQNCartPoleSolver()
    agent.run()

[Episode 0] - Mean survival time over last 100 episodes was 18.0 ticks.
[Episode 100] - Mean survival time over last 100 episodes was 15.36 ticks.
Did not solve after 199 episodes 😞
