<a href="https://colab.research.google.com/github/Ogunfool/Approximate-RL-Methods-for-CartPole-Environment-on-OpenAI-gym/blob/main/CartPole_Approximate_Qlearning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import necessary libraries
import gym
import numpy as np
import matplotlib.pyplot as plt
from sklearn.kernel_approximation import RBFSampler

# Global Variables
# GAMMA: discount factor multiplying the max next-state value in the
# Q-learning target (target = r + GAMMA * max_a Q(s2, a)).
GAMMA = 0.99
# ALPHA: learning rate (step size) of the gradient update applied to the
# linear model's weights (w += ALPHA * err * grad).
ALPHA = 0.1

In [None]:
# Some checks
# OpenAI - gym
# Quick sanity check: create the environment, reset it, sample one random
# action, and print both.
import gym
env = gym.make('CartPole-v1')
# NOTE(review): this assumes reset() returns only the observation (older gym
# API); newer gym versions return (observation, info) -- confirm the version.
s = env.reset()
a = env.action_space.sample()
print(s)
print(a)

# Bare expression: number of discrete actions. Presumably relied on to be
# displayed as the notebook cell's output.
env.action_space.n

In [None]:
# Let's build some helper functions/callbacks
# Epsilon-greedy action selection (behaviour policy)
def epsilon_greedy(model, s, eps=0.1):
  """Choose an action for state `s` with an epsilon-greedy rule.

  With probability `eps` a random action is drawn from the model's
  environment; otherwise the action with the largest predicted value is
  returned. With eps=0 the choice is always greedy.

  Args:
    model: object exposing `predict_all_actions(s)` and `env.action_space`.
    s: current state, passed through to the model unchanged.
    eps: exploration probability.

  Returns:
    The selected action (index of the best value, or a sampled action).
  """
  # A single uniform draw in [0, 1) decides between exploring and exploiting.
  explore = np.random.random() >= (1 - eps)
  if explore:
    # Random action sampled from the environment's action space.
    return model.env.action_space.sample()

  action_values = model.predict_all_actions(s)
  return np.argmax(action_values)


def gather_samples(env, n_episodes=10000):
  """Collect (state, action) vectors by running a uniformly random policy.

  Each visited state is concatenated with the random action taken in it; the
  resulting vectors are what the featurizer is fitted on.

  Works with both gym step conventions:
    * 4-tuple `(obs, reward, done, info)` -- the 4th item is an info dict and
      must NOT be treated as a truncation flag (a non-empty info dict would
      otherwise end the episode after a single step);
    * 5-tuple `(obs, reward, terminated, truncated, info)`.
  `reset()` may likewise return either `obs` or `(obs, info)`.

  Args:
    env: gym-style environment with `reset()`, `step(a)` and
      `action_space.sample()`.
    n_episodes: number of random episodes to play.

  Returns:
    A list of 1-D arrays, one per step taken, each holding the state followed
    by the action.
  """
  samples = []
  for _ in range(n_episodes):
    reset_out = env.reset()
    # Newer gym returns (observation, info); older gym returns the observation.
    if isinstance(reset_out, tuple) and len(reset_out) == 2 and isinstance(reset_out[1], dict):
      s = reset_out[0]
    else:
      s = reset_out

    done = False
    while not done:
      a = env.action_space.sample()
      samples.append(np.concatenate((s, [a])))

      step_out = env.step(a)
      s = step_out[0]
      if len(step_out) == 5:
        # New API: the episode ends on termination OR truncation.
        done = bool(step_out[2] or step_out[3])
      else:
        # Old API: `done` already covers time-limit truncation.
        done = bool(step_out[2])
  return samples

In [None]:
# Let's build the model
# env.action_space.n is the number of discrete actions in the environment's action space
class Model:
  """Linear action-value model on top of an RBFSampler featurizer.

  A (state, action) pair is concatenated into one vector, mapped through the
  fitted featurizer, and scored by a dot product with the weight vector `w`.
  """

  def __init__(self, env):
    """Fit the featurizer on samples gathered from `env` and zero the weights."""
    self.env = env

    # Fit the featurizer on (state, action) vectors collected from the env.
    training_vectors = gather_samples(env)
    self.featurizer = RBFSampler()
    self.featurizer.fit(training_vectors)

    # One weight per feature component produced by the featurizer.
    self.w = np.zeros(self.featurizer.n_components)

  def _featurize(self, s, a):
    """Return the feature vector of the single pair (s, a)."""
    state_action = np.concatenate((s, [a]))
    # transform() takes a batch; wrap the one vector and unwrap the one row.
    return self.featurizer.transform([state_action])[0]

  def predict(self, s, a):
    """Return the predicted value of taking action `a` in state `s`."""
    return self._featurize(s, a) @ self.w

  def predict_all_actions(self, s):
    """Return a list with the predicted value of every action in state `s`."""
    n_actions = self.env.action_space.n
    return [self.predict(s, action) for action in range(n_actions)]

  def grad(self, s, a):
    """Return the gradient of `predict(s, a)` with respect to `w`.

    The model is linear in `w`, so the gradient is the feature vector itself.
    """
    return self._featurize(s, a)


def test_agent(model, env, n_episodes=20):
  """Return the mean total reward of the greedy policy over several episodes.

  Actions are chosen with `epsilon_greedy(..., eps=0)`, i.e. always the action
  with the highest predicted value, with no exploration.

  Works with both gym step conventions: the 4-tuple
  `(obs, reward, done, info)` -- whose 4th item is an info dict, not a
  truncation flag -- and the 5-tuple
  `(obs, reward, terminated, truncated, info)`. `reset()` may return either
  `obs` or `(obs, info)`.

  Args:
    model: trained model passed to `epsilon_greedy`.
    env: gym-style environment.
    n_episodes: number of evaluation episodes.

  Returns:
    Mean of the per-episode reward sums.
  """
  reward_per_episode = np.zeros(n_episodes)
  for it in range(n_episodes):
    reset_out = env.reset()
    # Newer gym returns (observation, info); older gym returns the observation.
    if isinstance(reset_out, tuple) and len(reset_out) == 2 and isinstance(reset_out[1], dict):
      s = reset_out[0]
    else:
      s = reset_out

    episode_reward = 0
    done = False
    while not done:
      # eps=0 during test: always act greedily on the Q values.
      a = epsilon_greedy(model, s, eps=0)
      step_out = env.step(a)
      s, r = step_out[0], step_out[1]
      if len(step_out) == 5:
        # New API: the episode ends on termination OR truncation.
        done = bool(step_out[2] or step_out[3])
      else:
        # Old API: `done` already covers time-limit truncation.
        done = bool(step_out[2])
      episode_reward += r
    reward_per_episode[it] = episode_reward
  return np.mean(reward_per_episode)

# Play a single episode and print the total reward the agent collects in it
def watch_agent(model, env, eps):
  """Play one episode with an epsilon-greedy policy and print its total reward.

  Works with both gym step conventions: the 4-tuple
  `(obs, reward, done, info)` -- whose 4th item is an info dict, not a
  truncation flag -- and the 5-tuple
  `(obs, reward, terminated, truncated, info)`. `reset()` may return either
  `obs` or `(obs, info)`.

  Args:
    model: model passed to `epsilon_greedy`.
    env: gym-style environment.
    eps: exploration probability forwarded to `epsilon_greedy`.
  """
  reset_out = env.reset()
  # Newer gym returns (observation, info); older gym returns the observation.
  if isinstance(reset_out, tuple) and len(reset_out) == 2 and isinstance(reset_out[1], dict):
    s = reset_out[0]
  else:
    s = reset_out

  episode_reward = 0
  done = False
  while not done:
    a = epsilon_greedy(model, s, eps=eps)
    step_out = env.step(a)
    s, r = step_out[0], step_out[1]
    if len(step_out) == 5:
      # New API: the episode ends on termination OR truncation.
      done = bool(step_out[2] or step_out[3])
    else:
      # Old API: `done` already covers time-limit truncation.
      done = bool(step_out[2])
    episode_reward += r
  print("Episode reward:", episode_reward)


In [None]:
# instantiate environment
env = gym.make("CartPole-v1")
model = Model(env)
reward_per_episode = []

# CartPole-v1 gives +1 reward per step and caps an episode at 500 steps, so
# 500 is the best possible episode reward (200 is the cap of CartPole-v0).
MAX_EPISODE_REWARD = 500

# watch untrained agent
watch_agent(model, env, eps=0)

# Basic Q-learning loop (s,a,r,s2)
# repeat until convergence
n_episodes = 1500
for it in range(n_episodes):
  # reset() returns the observation alone (older gym) or (observation, info).
  reset_out = env.reset()
  if isinstance(reset_out, tuple) and len(reset_out) == 2 and isinstance(reset_out[1], dict):
    s = reset_out[0]
  else:
    s = reset_out
  episode_reward = 0
  done = False       # True only when the episode really terminated
  truncated = False  # True when the episode was cut off by the time limit
  while not (done or truncated):
    a = epsilon_greedy(model, s)
    step_out = env.step(a)
    s2, r = step_out[0], step_out[1]
    if len(step_out) == 5:
      # New API: (obs, reward, terminated, truncated, info)
      done, truncated = bool(step_out[2]), bool(step_out[3])
    else:
      # Old API: (obs, reward, done, info). The 4th item is an info dict; the
      # TimeLimit wrapper flags a time-limit cut-off under "TimeLimit.truncated".
      info = step_out[3]
      truncated = (bool(step_out[2]) and isinstance(info, dict)
                   and bool(info.get("TimeLimit.truncated", False)))
      done = bool(step_out[2]) and not truncated

    # Let's do gradient descent update
    # get the target
    if done:
      # True terminal state: there is no future value to bootstrap from.
      target = r
    else:
      # Non-terminal (including time-limit truncation): bootstrap from the
      # best predicted value over all actions in the next state.
      values = model.predict_all_actions(s2)
      target = r + GAMMA * np.max(values)

    # update the model
    g = model.grad(s, a)
    err = target - model.predict(s, a)
    model.w += ALPHA * err * g

    # accumulate reward
    episode_reward += r

    # update state
    s = s2

  # Record this episode before the early-exit check so the check (and the
  # final plot) include the episode that was just played.
  reward_per_episode.append(episode_reward)

  if (it + 1) % 50 == 0:
    print(f"Episode: {it + 1}, Reward: {episode_reward}")

  # early exit: the last 20 episodes all reached the maximum reward
  if len(reward_per_episode) >= 20 and np.mean(reward_per_episode[-20:]) >= MAX_EPISODE_REWARD:
    print("Early exit")
    break

# test trained agent
test_reward = test_agent(model, env)
print(f"Average test reward: {test_reward}")

plt.plot(reward_per_episode)
plt.title("Reward per episode")
plt.show()

# watch trained agent
watch_agent(model, env, eps=0)


In [None]:
 # Play one more greedy (eps=0) episode with the trained model and print its reward.
 watch_agent(model, env, eps=0)

Episode reward: 65.0
