In [None]:
#https://medium.com/swlh/using-q-learning-for-openais-cartpole-v1-4a216ef237df

In [None]:
import gym
import numpy as np
from sklearn.preprocessing import KBinsDiscretizer
import time, math, random
from typing import Tuple
import matplotlib.pyplot as plt

# Q-Learning

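$$Q(s,a) \leftarrow Q(s,a) + \alpha \left[ r + \gamma \max_{a'} Q(s',a') - Q(s,a) \right]$$

- $\alpha$: learning rate (`lr` in the code)
- $r$: reward received for taking action $a$ in state $s$
- $\gamma$: discount factor, between 0 and 1
- $a'$: future action
- $s'$: future state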

In [None]:
# CartPole-v1 environment instance used by the cells below.
env = gym.make("CartPole-v1")

# Number of discrete actions (left / right); this is the size of the
# Q-table's last axis.
print(env.action_space.n)

In [None]:
class CartPoleAgent:
    """Tabular Q-learning agent for a CartPole-style environment.

    Only two components of the four-element observation are used: the pole
    angle (index 2) and the pole velocity (index 3). Each is mapped to a
    uniform bin index, and the Q-table stores one value per
    (angle bin, velocity bin, action) triple.

    Args:
        env: Environment exposing ``action_space.n``, ``action_space.sample()``,
            ``observation_space.low`` / ``.high``, ``reset()`` and ``step()``.
        episodes: Number of episodes run by :meth:`train`.
    """

    def __init__(self, env, episodes):
        self.env = env
        # Bins per used observation component: (pole angle, pole velocity).
        self.n_bins = (6, 12)
        # Shape: (angle bins, velocity bins, number of actions).
        self.q_table = np.zeros(self.n_bins + (env.action_space.n, ))
        # The angle range comes from the environment; the velocity range is
        # fixed to +/- radians(50). The upper bound must be positive:
        # with identical lower and upper bounds the velocity feature is
        # constant and every observation lands in the same bin.
        self.lower_bounds = [env.observation_space.low[2], -math.radians(50)]
        self.upper_bounds = [env.observation_space.high[2], math.radians(50)]
        self.lr = 0.1  # overwritten at the start of every episode in train()
        self.decay = 24
        self.discount = 1.0
        self.episodes = episodes
        # Fitted lazily on the first get_discrete() call and then reused.
        self._discretizer = None

    def get_discrete(self, state):
        """Convert an observation into a ``(angle_bin, velocity_bin)`` tuple.

        Args:
            state: Four-element observation; only the last two elements
                (pole angle, pole velocity) are used.

        Returns:
            Tuple of two ints usable as an index into ``self.q_table``.
        """
        _, __, angle, pole_vel = state
        est = getattr(self, '_discretizer', None)
        if est is None:
            # The bounds are fixed at construction, so fit once instead of
            # rebuilding the discretizer on every environment step.
            est = KBinsDiscretizer(n_bins=self.n_bins, encode='ordinal', strategy='uniform')
            est.fit([self.lower_bounds, self.upper_bounds])
            self._discretizer = est
        return tuple(map(int, est.transform([[angle, pole_vel]])[0]))

    def get_policy(self, state):
        """Return the greedy action (highest Q-value) for a discrete state."""
        return np.argmax(self.q_table[state])

    #Q function
    def update_q_value(self, reward, state, action, new_state):
        """Return the increment to add to ``q_table[state][action]``.

        Computes ``lr * (reward + discount * max(Q[new_state]) - Q[state][action])``.
        The table itself is not modified here; the caller applies the result.
        """
        best_next = np.max(self.q_table[new_state])
        td_error = reward + self.discount * best_next - self.q_table[state][action]
        return self.lr * td_error

    def _decayed_rate(self, n, floor):
        """Log-decayed rate for episode ``n``, clamped to ``[floor, 1.0]``."""
        return max(floor, min(1.0, 1.0 - math.log10((n + 1) / self.decay)))

    #Adaptive lr
    def get_lr(self, n, min_lr_rate=0.01):
        """Learning rate for episode ``n`` (never below ``min_lr_rate``)."""
        return self._decayed_rate(n, min_lr_rate)

    #Decaying explo rate/epsilon
    def get_explo_rate(self, n,min_explo_rate=0.01):
        """Exploration probability for episode ``n`` (never below ``min_explo_rate``)."""
        return self._decayed_rate(n, min_explo_rate)

    def update_q_table(self, current_state, action, old_val, learnt_val):
        """Store the blend of ``old_val`` and ``learnt_val`` in the Q-table.

        Writes ``(1 - lr) * old_val + lr * learnt_val`` into
        ``q_table[current_state][action]`` and returns the stored value.
        The blend already contains the old value, so it is assigned rather
        than added to the existing entry.
        """
        self.q_table[current_state][action] = (1-self.lr) * old_val + self.lr*learnt_val
        return self.q_table[current_state][action]

    def _reset_env(self):
        """Reset ``self.env`` and return only the observation.

        Accepts both return shapes: a bare observation, or an
        ``(observation, info)`` tuple.
        """
        result = self.env.reset()
        if isinstance(result, tuple):
            return result[0]
        return result

    def _step_env(self, action):
        """Step ``self.env`` and return ``(observation, reward, done)``.

        Accepts both the 4-tuple ``(obs, reward, done, info)`` and the
        5-tuple ``(obs, reward, terminated, truncated, info)``; in the latter
        case the episode is done when either flag is set.
        """
        outcome = self.env.step(action)
        if len(outcome) == 5:
            obs, reward, terminated, truncated, _ = outcome
            return obs, reward, (terminated or truncated)
        obs, reward, done, _ = outcome
        return obs, reward, done

    def train(self):
        """Run ``self.episodes`` episodes of epsilon-greedy Q-learning.

        The learning rate and exploration rate are recomputed once per
        episode from the episode index. ``self.q_table`` is updated in place.

        Returns:
            List with one entry per episode: the number of steps taken
            before the episode ended.
        """
        scores = []
        for e in range(self.episodes):
            # Use the agent's own environment, not a notebook-level global.
            current_state = self.get_discrete(self._reset_env())
            self.lr = self.get_lr(e)
            # Depends only on the episode index, so compute it once here.
            explo_rate = self.get_explo_rate(e)

            done = False
            #Tracks how many inputs it survives
            score = 0
            while not done:
                action = self.get_policy(current_state)

                #Random action (exploration)
                if np.random.random() < explo_rate:
                    action = self.env.action_space.sample()

                obs, reward, done = self._step_env(action)
                new_state = self.get_discrete(obs)

                # Apply the temporal-difference increment to the visited entry.
                self.q_table[current_state][action] += self.update_q_value(reward, current_state, action, new_state)

                current_state = new_state
                score += 1

            scores.append(score)

        print("Training done!")
        return scores





In [None]:
import warnings

# Suppress warnings only while the agent is built and trained. A blanket
# warnings.filterwarnings('ignore') would stay active for the rest of the
# kernel session and hide warnings raised by every later cell.
with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    model = CartPoleAgent(env, episodes=5000)
    # One entry per episode: number of steps before the episode ended.
    scores = model.train()

In [None]:
# Learned Q-table, indexed as [angle bin, velocity bin, action].
print(model.q_table)

# Learning curve: the x-axis is the episode index and the plotted series is
# the score (steps taken) for that episode.
fig, ax = plt.subplots()
ax.plot(scores, c='blue', label='score')
ax.set_xlabel('episode')
ax.set_ylabel('score (steps per episode)')
ax.set_title('CartPole Q-learning: score per episode')
ax.legend();