# Imports

In [1]:
import numpy as np
import os
import sys
import tensorflow as tf
import random

%matplotlib nbagg
import matplotlib
import matplotlib.animation as animation
import matplotlib.pyplot as plt
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12

# Setup

In [2]:
import gym
env = gym.make("CartPole-v0")

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


In [3]:
state = env.reset()

In [4]:
state

array([-0.03665658, -0.01980497,  0.04695702, -0.0012428 ])

In [5]:
pos, vel, ang, ang_vel = state
print(pos)
print(vel)
print(ang)
print(ang_vel)

-0.03665657751521315
-0.019804966428977413
0.04695701737512192
-0.0012427975989536683


In [6]:
from PIL import Image, ImageDraw

try:
    from pyglet.gl import gl_info
    openai_cart_pole_rendering = True   # no problem, let's use OpenAI gym's rendering function
except Exception:
    openai_cart_pole_rendering = False  # probably no X server available, let's use our own rendering function

def render_cart_pole(env, state):
    if openai_cart_pole_rendering:
        # use OpenAI gym's rendering function
        return env.render(mode="rgb_array")
    else:
        # rendering for the cart pole environment (in case OpenAI gym can't do it)
        img_w = 600
        img_h = 400
        cart_w = img_w // 12
        cart_h = img_h // 15
        pole_len = img_h // 3.5
        pole_w = img_w // 80 + 1
        x_width = 2
        max_ang = 0.2
        bg_col = (255, 255, 255)
        cart_col = 0x000000 # Blue Green Red
        pole_col = 0x669acc # Blue Green Red

        pos, vel, ang, ang_vel = state
        img = Image.new('RGB', (img_w, img_h), bg_col)
        draw = ImageDraw.Draw(img)
        cart_x = pos * img_w // x_width + img_w // x_width
        cart_y = img_h * 95 // 100
        top_pole_x = cart_x + pole_len * np.sin(ang)
        top_pole_y = cart_y - cart_h // 2 - pole_len * np.cos(ang)
        draw.line((0, cart_y, img_w, cart_y), fill=0)
        draw.rectangle((cart_x - cart_w // 2, cart_y - cart_h // 2, cart_x + cart_w // 2, cart_y + cart_h // 2), fill=cart_col) # draw cart
        draw.line((cart_x, cart_y - cart_h // 2, top_pole_x, top_pole_y), fill=pole_col, width=pole_w) # draw pole
        return np.array(img)

def plot_cart_pole(env, state):
    plt.close()  # or else nbagg sometimes plots in the previous cell
    img = render_cart_pole(env, state)
    plt.imshow(img)
    plt.axis("off")
    plt.show()

In [7]:
plot_cart_pole(env, state);

<IPython.core.display.Javascript object>

# Experimentation

In [8]:
env.action_space

Discrete(2)

In [9]:
env.observation_space

Box(4,)

## Take 1 step right

In [10]:
state = env.reset()
state, reward, done, info = env.step(1)
print(state)
print(reward)
print(done)

plt.close()  # or else nbagg sometimes plots in the previous cell
img = render_cart_pole(env, state)
plt.imshow(img)
plt.axis("off")

[ 0.03956075  0.21993608  0.0015008  -0.25736609]
1.0
False


<IPython.core.display.Javascript object>

(-0.5, 599.5, 399.5, -0.5)

## Take 1 step left

In [11]:
state = env.reset()
state, reward, done, info = env.step(0)
print(state)
print(reward)
print(done)

plt.close()  # or else nbagg sometimes plots in the previous cell
img = render_cart_pole(env, state)
plt.imshow(img)
plt.axis("off")

[ 0.02189321 -0.15486819  0.01182046  0.30339194]
1.0
False


<IPython.core.display.Javascript object>

(-0.5, 599.5, 399.5, -0.5)

## Keep going left until fall

In [12]:
state = env.reset()
while True:
    state, reward, done, info = env.step(0)
    print(state)
    if done:
        break

plt.close()  # or else nbagg sometimes plots in the previous cell
img = render_cart_pole(env, state)
plt.imshow(img)
plt.axis("off")

[ 0.01597608 -0.23606959 -0.03673939  0.27014817]
[ 0.01125469 -0.43064852 -0.03133643  0.55102067]
[ 0.00264172 -0.62531666 -0.02031601  0.83366811]
[-0.00986461 -0.82015522 -0.00364265  1.11989314]
[-0.02626771 -1.0152292   0.01875521  1.41143123]
[-0.0465723  -1.21057859  0.04698384  1.70991749]
[-0.07078387 -1.40620782  0.08118219  2.01684567]
[-0.09890803 -1.60207287  0.1215191   2.33351699]
[-0.13094948 -1.79806576  0.16818944  2.6609771 ]
[-0.1669108  -1.99399643  0.22140898  2.99994098]


<IPython.core.display.Javascript object>

(-0.5, 599.5, 399.5, -0.5)

## Keep going right until fall

In [13]:
state = env.reset()
while True:
    state, reward, done, info = env.step(1)
    print(state)
    if done:
        break

plt.close()  # or else nbagg sometimes plots in the previous cell
img = render_cart_pole(env, state)
plt.imshow(img)
plt.axis("off")

[-0.0121287   0.2264034  -0.03756043 -0.27849155]
[-0.00760063  0.42204049 -0.04313026 -0.58278037]
[ 8.40175948e-04  6.17739322e-01 -5.47858651e-02 -8.88731992e-01]
[ 0.01319496  0.81356025 -0.0725605  -1.19812167]
[ 0.02946617  1.00954228 -0.09652294 -1.51263523]
[ 0.04965701  1.2056915  -0.12677564 -1.83382323]
[ 0.07377084  1.40196777 -0.16345211 -2.16304716]
[ 0.1018102   1.59826921 -0.20671305 -2.50141592]
[ 0.13377558  1.7944145  -0.25674137 -2.84971196]


<IPython.core.display.Javascript object>

(-0.5, 599.5, 399.5, -0.5)

# Improve Cart-Pole

## Animation functions

In [14]:
def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch,

def plot_animation(frames, repeat=False, interval=40):
    plt.close()  # or else nbagg sometimes plots in the previous cell
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    return animation.FuncAnimation(fig, update_scene, fargs=(frames, patch), frames=len(frames), repeat=repeat, interval=interval)

## Custom Policy
1. if angle and angular_velocity are both <0, then it is definitely failling left => move left to balance
2. if angle and angular_velocity are both >0, then it is definitely failling right => move right to balance
3. if above are both false => just move random for 1 step

In [15]:
def custom_policy(state, env):
    ang = state[2]
    ang_vel = state[3]
    if ang<0 and ang_vel<0: # falling left => move left
        return 0
    if ang>0 and ang_vel>0: # falling right => move right
        return 1    
    return env.action_space.sample() # play randomly

In [16]:
print(env._max_episode_steps)

200


In [17]:
def run_policy(num_of_episodes, max_steps=None):
    if max_steps is not None:
        env._max_episode_steps = max_steps
    print("Max episode steps:", env._max_episode_steps)
    frames = []
    rewards = []
    for episode in range(num_of_episodes):
        episode_reward = 0
        state = env.reset()
        for step in range(env._max_episode_steps):
            if episode==0: # check out 1 animation
                img = render_cart_pole(env, state)
                frames.append(img)

            action = custom_policy(state, env)
            state, reward, done, info = env.step(action)        
            episode_reward += reward
            if done:            
                break
        rewards.append(episode_reward)
        print(episode, episode_reward)
    return rewards, frames

### Try with the definition of `solved` as per the docs – 100 consecutive trials with default max_episode_steps

In [18]:
rewards, frames = run_policy(100)

Max episode steps: 200
0 200.0
1 200.0
2 200.0
3 200.0
4 200.0
5 200.0
6 200.0
7 200.0
8 200.0
9 200.0
10 200.0
11 200.0
12 200.0
13 200.0
14 200.0
15 200.0
16 200.0
17 200.0
18 200.0
19 200.0
20 200.0
21 200.0
22 200.0
23 200.0
24 200.0
25 200.0
26 200.0
27 200.0
28 200.0
29 200.0
30 200.0
31 200.0
32 200.0
33 200.0
34 200.0
35 200.0
36 200.0
37 200.0
38 200.0
39 200.0
40 200.0
41 200.0
42 200.0
43 200.0
44 200.0
45 200.0
46 200.0
47 200.0
48 200.0
49 200.0
50 200.0
51 200.0
52 200.0
53 200.0
54 200.0
55 200.0
56 200.0
57 200.0
58 200.0
59 200.0
60 200.0
61 200.0
62 200.0
63 200.0
64 200.0
65 200.0
66 200.0
67 200.0
68 200.0
69 200.0
70 200.0
71 200.0
72 200.0
73 200.0
74 200.0
75 200.0
76 200.0
77 200.0
78 200.0
79 200.0
80 200.0
81 200.0
82 200.0
83 200.0
84 200.0
85 200.0
86 200.0
87 200.0
88 200.0
89 200.0
90 200.0
91 200.0
92 200.0
93 200.0
94 200.0
95 200.0
96 200.0
97 200.0
98 200.0
99 200.0


In [19]:
np.mean(rewards), np.std(rewards), np.min(rewards), np.max(rewards)

(200.0, 0.0, 200.0, 200.0)

In [20]:
video = plot_animation(frames)
plt.show()

<IPython.core.display.Javascript object>

### Try with 100 consecutive trials with max_episode_steps as 1000

In [21]:
rewards, frames = run_policy(100, 1000)

Max episode steps: 1000
0 271.0
1 612.0
2 708.0
3 613.0
4 899.0
5 1000.0
6 909.0
7 463.0
8 479.0
9 222.0
10 248.0
11 484.0
12 1000.0
13 1000.0
14 590.0
15 1000.0
16 775.0
17 965.0
18 468.0
19 1000.0
20 1000.0
21 646.0
22 513.0
23 511.0
24 1000.0
25 679.0
26 1000.0
27 411.0
28 621.0
29 848.0
30 984.0
31 252.0
32 953.0
33 434.0
34 272.0
35 868.0
36 1000.0
37 552.0
38 416.0
39 479.0
40 416.0
41 1000.0
42 833.0
43 510.0
44 620.0
45 462.0
46 1000.0
47 352.0
48 477.0
49 1000.0
50 612.0
51 442.0
52 1000.0
53 321.0
54 299.0
55 620.0
56 765.0
57 288.0
58 714.0
59 650.0
60 1000.0
61 470.0
62 511.0
63 1000.0
64 283.0
65 408.0
66 1000.0
67 445.0
68 1000.0
69 1000.0
70 716.0
71 351.0
72 532.0
73 525.0
74 603.0
75 839.0
76 914.0
77 441.0
78 308.0
79 285.0
80 589.0
81 544.0
82 273.0
83 589.0
84 319.0
85 581.0
86 101.0
87 479.0
88 874.0
89 479.0
90 586.0
91 558.0
92 184.0
93 863.0
94 276.0
95 335.0
96 560.0
97 353.0
98 331.0
99 671.0


In [22]:
np.mean(rewards), np.std(rewards), np.min(rewards), np.max(rewards)

(617.02, 258.64512290008486, 101.0, 1000.0)

In [23]:
video = plot_animation(frames)
plt.show()

<IPython.core.display.Javascript object>

# Solve using Q-Learning

In [288]:
env = gym.make("CartPole-v0")

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


In [289]:
env.action_space

Discrete(2)

In [290]:
env.observation_space

Box(4,)

In [291]:
env.observation_space.low

array([-4.8000002e+00, -3.4028235e+38, -4.1887903e-01, -3.4028235e+38],
      dtype=float32)

In [292]:
env.observation_space.high

array([4.8000002e+00, 3.4028235e+38, 4.1887903e-01, 3.4028235e+38],
      dtype=float32)

In [293]:
state_bounds = list(zip(env.observation_space.low, env.observation_space.high))
state_bounds

[(-4.8, 4.8),
 (-3.4028235e+38, 3.4028235e+38),
 (-0.41887903, 0.41887903),
 (-3.4028235e+38, 3.4028235e+38)]

In [294]:
state_bounds[1] = (-1.0, 1.0) # limit the range of velocity just to avoid large numbers (its not playing an important role as we have picked bin_size=1 for it)
state_bounds[3] = state_bounds[2] # choose the range of angular_velocity same as angle (basically limit the max angle it can turn in 1 sec)
state_bounds

[(-4.8, 4.8),
 (-1.0, 1.0),
 (-0.41887903, 0.41887903),
 (-0.41887903, 0.41887903)]

In [295]:
NUM_BINS = (1, 1, 6, 3)

In [296]:
qtable = np.zeros(NUM_BINS + (env.action_space.n,))

In [297]:
state = env.reset()
state

array([-0.02792291, -0.01976941,  0.04841651, -0.04169525])

In [298]:
def state_to_bin(state):
    bin_indexes = []
    for i in range(len(state)):                
        if state[i]<=state_bounds[i][0]:
            bin_index = 0
        elif state[i]>=state_bounds[i][1]:
            bin_index = NUM_BINS[i]-1
        else:
            param_range = state_bounds[i][1] - state_bounds[i][0] # total range from min => max
            # convert the observed param value to be in the scale of (0, max-min) => by subtracting min
            # this helps evaluating bin_index easy (indexes are integers starting from 0)
            param_obs = state[i]-(state_bounds[i][0])
            bin_index = int(param_obs/(param_range/NUM_BINS[i]))
            
        bin_indexes.append(bin_index)
    
    return tuple(bin_indexes)

In [299]:
state_bin = state_to_bin(state)
state_bin

(0, 0, 3, 1)

In [300]:
SOLVED_AVG_STEPS = 195.0
SOLVED_CONSECUTIVE_TRIALS = 100

In [301]:
total_episodes = 1000

min_learning_rate = 0.1
gamma = 1.0

# Exploration parameters
epsilon = 1.0
min_epsilon = 0.01

In [302]:
def get_exploration_rate(t):
    return max(min_epsilon, min(1, 1.0 - math.log10((t+1)/25)))

def get_learning_rate(t):
    return max(min_learning_rate, min(0.5, 1.0 - math.log10((t+1)/25)))

In [303]:
qtable = np.zeros(NUM_BINS + (env.action_space.n,))

episode_steps = []
streak = 0

for episode in range(total_episodes):
    if episode%50 == 0:
        print("*"*50)
        print("Episode", episode)
    state = env.reset()
    done = False
    
    epsilon = get_exploration_rate(episode)
    learning_rate = get_learning_rate(episode)    
    
    for step in range(env._max_episode_steps):        
        state_bin = state_to_bin(state) # discretize continuous state        
        
        # explore or exploit
        if random.uniform(0,1) < epsilon:
            action = env.action_space.sample() # random
        else:
            action = np.argmax(qtable[state_bin]) # action with the highest Q-value, for a given state
        
        # take action
        new_state, reward, done, info = env.step(action)
        new_state_bin = state_to_bin(new_state)

        # update Q-table
        qtable[state_bin + (action,)] += learning_rate * (reward + gamma*np.max(qtable[new_state_bin]) - qtable[state_bin + (action,)])
        
        state = new_state        
        
        if done:
            if streak == 0 and step > SOLVED_AVG_STEPS:
                episode_steps = []
            episode_steps.append(step)
            break 
            
    if np.mean(episode_steps) >= SOLVED_AVG_STEPS:
        streak += 1
    else:
        streak = 0
        
    if streak >= SOLVED_CONSECUTIVE_TRIALS:
        print("Solved in %d episodes" % episode)
        break

**************************************************
Episode 0
**************************************************
Episode 50
**************************************************
Episode 100
**************************************************
Episode 150
**************************************************
Episode 200
Solved in 245 episodes
