<a href="https://colab.research.google.com/github/LawZhou/Cartpole-v0/blob/main/Cartpole_v0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import gym
import numpy as np
from tqdm import tqdm


class CartpoleAgent():
    def __init__(self, env, num_episodes, bins=(10, 10, 20, 20), min_lr=0.1, epsilon=0.2, lr=1.0,
                 discount_factor=1.0, lr_decay=0.25, render=False):
        '''
        Information about Cartpole-v0 env
        check https://github.com/openai/gym/blob/master/gym/envs/classic_control/cartpole.py for more details.
        Observation:
        Type: Box(4)
        Num     Observation               Min                     Max
        0       Cart Position             -4.8                    4.8
        1       Cart Velocity             -Inf                    Inf
        2       Pole Angle                -0.418 rad (-24 deg)    0.418 rad (24 deg)
        3       Pole Angular Velocity     -Inf                    Inf
        Actions:
        Type: Discrete(2)
        Num   Action
        0     Push cart to the left
        1     Push cart to the right

        env: the environment.
        num_episodes: number of episodes to train.
        bins: a tuple specifies the number of bins for each observation.
        min_lr: the minimum learning rate.
        epsilon: the probability of exploration.
        lr: learning rate.
        discount_factor: discount factor.
        lr_decay: the rate of learning rate decay.
        render: to toggle render during training.
        '''
        self.num_episodes = num_episodes
        self.min_lr = min_lr
        self.epsilon = epsilon
        self.discount_factor = discount_factor
        self.lr_decay = lr_decay
        self.lr = lr
        self.env = env
        self.render = render
        self.render_every = 2000  # Render an episode for every {self.render_every} episodes

        # Discretize the continuous space using bins.
        self.bins = bins
        self.position_bins = np.linspace(self.env.observation_space.low[0],
                                         self.env.observation_space.high[0], num=self.bins[0])
        self.pos_velocity_bins = np.linspace(-4, 4, num=self.bins[1])
        self.angle_bins = np.linspace(self.env.observation_space.low[2],
                                      self.env.observation_space.high[2], num=self.bins[2])
        self.angle_velopcity_bins = np.linspace(-4, 4, num=self.bins[3])

        self.Q = np.zeros(self.bins + (self.env.action_space.n,))  # Q-table

    def train(self):
        '''
        Train the model for self.num_episodes episodes.
        '''
        for ep in tqdm(range(self.num_episodes)):
            state = self.env.reset()
            state = self.discretize_state(state)
            self.lr = self.get_learning_rate()
            done = False
            while not done:
                if self.render and ep % 2000 == 0:
                    env.render()
                action = self.choose_action(state)
                next_state, reward, done, _ = self.env.step(action)
                next_state = self.discretize_state(next_state)
                self.update_Q(state, action, reward, next_state)
                state = next_state

    def update_Q(self, state, action, reward, next_state):
        '''
        Update the Q table by equation:
        Q(S, A) <- Q(S, A) + alpha*[reward + discount_factor*max_a(Q(S', a) - Q(S, A))]
        '''
        self.Q[state][action] += self.lr * (
                reward + self.discount_factor * np.max(self.Q[next_state]) - self.Q[state][action])

    def discretize_state(self, obs):
        '''
        Discretize the continuous state using bins.
        '''
        discrete_pos = np.digitize(obs[0], bins=self.position_bins)-1  # -1 turns bin into index
        discrete_pos_vel = np.digitize(obs[1], bins=self.pos_velocity_bins)-1
        discrete_angle = np.digitize(obs[2], bins=self.angle_bins)-1
        discrete_angle_vel = np.digitize(obs[3], bins=self.angle_velopcity_bins)-1
        discrete_state = np.array([discrete_pos, discrete_pos_vel, discrete_angle, discrete_angle_vel]).astype(np.int)
        return tuple(discrete_state)

    def choose_action(self, state, greedy=False):
        '''
        Choose action by following epsilon-greedy policy.
        '''
        if not greedy:
            # For training
            if np.random.random() < self.epsilon:  # Exploration
                return self.env.action_space.sample()
            else:
                return np.argmax(self.Q[state]) # Exploitation
        else:
            # For evaluation
            return np.argmax(self.Q[state])


    def get_learning_rate(self):
        '''
        Decay the learning rate to slow down learning in the later episodes.
        '''
        return max(self.min_lr, self.lr - self.lr * self.lr_decay)


    def run(self):
        '''
        Run an episode using the updated Q table.
        '''
        state = self.env.reset()
        for ep in range(50000):
            state = self.discretize_state(state)
            action = self.choose_action(state, greedy=True)
            obs, reward, done, info = self.env.step(action)
            if done:
                break
            state = obs
        return ep

def run_episodes(agent, play_eps=2000):
    '''
    Run {play_eps} episodes and compute average returns.
    return True if the problem is solved.
    '''
    stepsRecorder = []
    num_solved_ep = 0
    solved = False
    for _ in range(play_eps):
        returns = agent.run()
        num_solved_ep += 1 if returns >= 195 else 0
        if num_solved_ep >= 100: solved = True
        stepsRecorder.append(returns)
    stepsRecorder = np.array(stepsRecorder)
    print(f'Finish with mean steps: {np.mean(stepsRecorder)} in {play_eps} episodes')
    print(f'{np.count_nonzero(stepsRecorder >= 195)} episodes last more than 195 steps.')
    return solved


In [None]:
env = gym.make('CartPole-v0')

print('train using Q learning:')
env.reset()
agent = CartpoleAgent(env, num_episodes=5000, render=False)  # Toggle render to enable render during training
agent.train()
solved = run_episodes(agent)
if solved:
  print('Problem solved.')

  2%|▏         | 120/5000 [00:00<00:04, 1190.76it/s]

train using Q learning:


100%|██████████| 5000/5000 [00:53<00:00, 93.72it/s]


Finish with mean steps: 198.9995 in 2000 episodes
2000 episodes last more than 195 steps.
Problem solved.
