# In [None]:
from sklearn.preprocessing import KBinsDiscretizer
import gymnasium as gym
import numpy as np
import time, math, random
# Create the CartPole environment with render_mode="rgb_array".
# Using
#     env = gym.make("CartPole-v1", render_mode="human")
# instead lets us watch the model while it is training, but it also slows
# the training process down.
env = gym.make("CartPole-v1", render_mode="rgb_array")

# Number of bins per discretized feature:
#   pole angle            -> 6 bins
#   pole angular velocity -> 12 bins
n_bins = ( 6 , 12 )

# The actual pole angular velocity can go from -INF to +INF, so we bin it
# over a finite range instead. Note that math.radians(50) converts 50 degrees
# to radians, so the range is +/-50 deg/s (about +/-0.87 rad/s), not 50 rad/s.
# The angle bounds are read from element 2 of the environment's observation
# space limits.
lower_bounds = [ env.observation_space.low[2], -math.radians(50) ]
upper_bounds = [ env.observation_space.high[2], math.radians(50) ]

# Fit an ordinal, uniform-width discretizer on just the two extreme rows
# (lower and upper bounds). It is later used to map an
# (angle, angular velocity) pair to the pair of bin indices it falls into.
est = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy='uniform')
est.fit([lower_bounds, upper_bounds ])
def discretizer(angle: float, pole_velocity: float) -> tuple:
    """Map a continuous (angle, angular velocity) pair to discrete bin indices.

    The fitted module-level ``est`` discretizer is applied to a single-row
    sample, and the resulting two ordinal bin numbers are returned as a tuple
    of plain ints ``(angle_bin, velocity_bin)``.
    """
    sample = [[angle, pole_velocity]]
    # transform() returns one row per sample; we passed exactly one sample.
    angle_bin, velocity_bin = est.transform(sample)[0]
    return (int(angle_bin), int(velocity_bin))

# Q-table with one entry per (angle bin, velocity bin, action):
# a 3-dimensional 6 x 12 x 2 array (2 because there are 2 possible actions).
Q_table = np.zeros((*n_bins, env.action_space.n))

def policy(state: tuple):
    """Return the greedy action for ``state``.

    Looks up the row of action values stored in ``Q_table`` for the given
    discrete state and returns the index of the largest one.
    """
    action_values = Q_table[state]
    return np.argmax(action_values)

def new_Q_value(reward: float, new_state: tuple, discount_factor=1) -> float:
    """Compute the temporal-difference target for a state-action pair.

    The target is the immediate ``reward`` plus ``discount_factor`` times the
    best action value currently stored in ``Q_table`` for ``new_state``.
    """
    best_next_value = np.max(Q_table[new_state])
    return reward + discount_factor * best_next_value

# Learning-rate schedule: adapts with the episode index
def learning_rate(n: int, min_rate=0.01) -> float:
    """Return the learning rate for episode ``n`` (logarithmic decay).

    The raw schedule is ``1 - log10((n + 1) / 25)``; it is capped at 1.0 from
    above and at ``min_rate`` from below. The result is therefore 1.0 for
    n <= 24 and decays towards ``min_rate`` afterwards.
    """
    decayed = 1.0 - math.log10((n + 1) / 25)
    # Upper cap at 1.0 ...
    capped = decayed if decayed < 1.0 else 1.0
    # ... and lower floor at min_rate.
    return capped if capped > min_rate else min_rate

def exploration_rate(n : int, min_rate= 0.1 ) -> float :
    """Return the exploration probability (epsilon) for episode ``n``.

    The raw schedule is ``1 - log10((n + 1) / 25)``, clamped to the interval
    ``[min_rate, 1.0]``. It is 1.0 for n <= 24 and decays logarithmically
    towards ``min_rate`` afterwards.

    Args:
        n: Zero-based episode index (must satisfy ``n + 1 > 0``).
        min_rate: Lower bound for the returned rate.

    Returns:
        The exploration rate as a float. The upper clamp uses ``1.0`` (not the
        int ``1``) and the result is passed through ``float`` so the declared
        return type holds even while the schedule is saturated.
    """
    return float(max(min_rate, min(1.0, 1.0 - math.log10((n + 1) / 25))))

# Train the Q-table with epsilon-greedy tabular Q-learning.
n_episodes = 10000
try:
    for e in range(n_episodes):
        # Both schedules depend only on the episode index, so evaluate them
        # once per episode rather than on every step.
        lr = learning_rate(e)
        epsilon = exploration_rate(e)

        # Reset the environment and discretize the initial state into buckets.
        observation, info = env.reset()
        # Current state is a tuple of 2 bins: only the pole angle and the
        # pole angular velocity are used.
        current_state = discretizer(observation[2], observation[3])
        terminated = False
        truncated = False
        run_time = 0  # number of steps survived in this episode

        while not (terminated or truncated):
            run_time += 1

            # Epsilon-greedy: take a random action with probability epsilon
            # to explore the state space more fully, otherwise exploit.
            if np.random.random() < epsilon:
                action = env.action_space.sample()  # explore
            else:
                action = policy(current_state)  # exploit

            # Advance the environment by one step.
            obs, reward, terminated, truncated, info = env.step(action)
            new_state = discretizer(obs[2], obs[3])

            # TD target. When the episode terminated there is no future
            # return to bootstrap from, so the target is the reward alone.
            # A truncated (time-limited) episode is not a true terminal
            # state, so it still bootstraps from new_state.
            if terminated:
                learnt_value = reward
            else:
                learnt_value = new_Q_value(reward, new_state)

            # Update the Q-table by blending old estimate and TD target.
            old_value = Q_table[current_state][action]
            Q_table[current_state][action] = (1 - lr) * old_value + lr * learnt_value

            current_state = new_state

            # No explicit env.render() call here: with render_mode="rgb_array"
            # the returned frame was being discarded every step, and with
            # render_mode="human" rendering already happens during step().
        print(run_time)
finally:
    # Release the environment's resources even if training raises.
    env.close()
