## Install Libraries

In [None]:
!pip install gym[classic_control]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pygame==2.1.0
  Downloading pygame-2.1.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.3 MB)
[K     |████████████████████████████████| 18.3 MB 101 kB/s 
Installing collected packages: pygame
Successfully installed pygame-2.1.0


## Import libraries

In [None]:
import math
import warnings
from typing import Tuple

import gym
import numpy as np
from sklearn.preprocessing import KBinsDiscretizer

In [None]:
# Number of discretization bins per tracked feature; the discretizer below bins
# (pole angle, pole angular velocity) in that order.
n_bins = (6, 12)
# Shared CartPole environment used by every cell below. With new_step_api=True the
# calls to step() in this notebook unpack five return values.
environment = gym.make('CartPole-v1', new_step_api = True)

## Discretizer

In [None]:
def discretizer( _ , __ , angle, pole_velocity):
  """Convert a continuous observation into a discrete (angle_bin, velocity_bin) state.

  The first two observation components are accepted but ignored; only the pole
  angle and the pole angular velocity are binned, using `n_bins` uniform bins.

  Returns:
    A tuple of two ints, one bin index per tracked feature.
  """
  observation_space = environment.observation_space
  velocity_limit = math.radians(50)  # angular velocity is binned over +/- 50 degrees

  # Two rows (lower corner, upper corner) are enough to fix the uniform bin edges.
  corners = [
    [ observation_space.low[2], -velocity_limit ],
    [ observation_space.high[2], velocity_limit ],
  ]

  binner = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy='uniform')
  binner.fit(corners)

  bin_indices = binner.transform([[angle, pole_velocity]])[0]
  return tuple(int(index) for index in bin_indices)

## Policy Evaluation

In [None]:
def policy_evaluation(policy, P, R, S, discount_factor = 1.0, theta = 1e-9, max_iterations = 1e9):
  """
  Iteratively estimate the state-value function of `policy`.

  For every discrete state the environment is placed in the representative raw
  state `S[state]`, each action is simulated for one step, and the single
  simulated successor is backed up, weighted by the action probability and by
  the estimated transition probability `P[state, action, next_state]`.

  Args:
    policy: array indexed by state; `policy[state]` holds one probability per action.
    P: transition estimates indexed as (angle, velocity, action, next_angle, next_velocity).
    R: reward estimates indexed as (angle, velocity, action).
    S: array indexed by state giving a raw environment state to simulate from.
    discount_factor: weight applied to the successor's value.
    theta: sweeps stop once the largest value change in a sweep drops below this.
    max_iterations: upper bound on the number of sweeps.

  Returns:
    Array of shape `n_bins` with the estimated value of every state. If the
    sweep limit is reached first, a RuntimeWarning is emitted and the latest
    estimate is returned (previously the function returned None in that case).

  NOTE(review): with discount_factor = 1.0 the updates only shrink when the
  estimated transition probabilities along a cycle are below 1; consider
  passing a discount below 1 if evaluation does not terminate.
  """
  V = np.zeros(n_bins)  # Value function array

  # gym.make returns a stack of wrappers; assigning `environment.state` would only
  # create an attribute on the outermost wrapper, so write to the base environment.
  base_environment = environment.unwrapped

  # Repeat till change in value function reaches threshold
  for sweep in range(int(max_iterations)):
    delta = 0  # Largest value change seen in this sweep

    # Iterate through each state (same order as nested angle/velocity loops)
    for state in np.ndindex(*n_bins):
      v = 0  # Accumulate expected value here

      for action, action_probability in enumerate(policy[state]):
        # Start the one-step simulation from the pre-computed representative state
        base_environment.state = np.array(S[state])

        # Check how good the next state will be
        obs, reward, terminated, truncated, _ = environment.step(action)
        next_state = discretizer(*obs)

        v += action_probability * P[(*state, action, *next_state)] * (R[(*state, action)] + discount_factor * V[next_state])

        # The environment must be reset before it can be stepped again
        if terminated or truncated:
          environment.reset()

      delta = max(delta, np.abs(V[state] - v))  # absolute change of value function

      V[state] = v  # update value function

    # Terminate once the value function has stopped changing
    if delta < theta:
      print(f'Policy evaluated in {sweep + 1} iterations.')
      return V

  warnings.warn('policy_evaluation reached max_iterations without converging; '
                'returning the latest value estimate.', RuntimeWarning)
  return V

## One step Lookahead
Computes the value of each action from a given state, so the next action can be chosen in a greedy manner where required.

In [None]:
def one_step_lookahead(state, V, P, R, discount_factor):
  """
  Compute the expected return of every action available in `state`.

  For each action `a` this evaluates the Bellman backup

      Q(state, a) = sum over s' of P[state, a, s'] * (R[state, a] + discount_factor * V[s'])

  (the transition probability multiplies the return; it was previously added).

  Args:
    state: tuple of discrete state indices.
    V: value estimate for every state, shaped like the state grid.
    P: transition estimates indexed as (*state, action, *next_state).
    R: reward estimates indexed as (*state, action).
    discount_factor: weight applied to the successor values.

  Returns:
    1-D array with one action value per action (length of P's action axis).

  NOTE(review): with discount_factor = 1.0 and transition rows that sum to
  (almost) 1, repeated backups need not converge; callers iterating to a fixed
  point should normally pass a discount below 1.
  """
  state = tuple(state)

  transitions = P[state]                    # shape: (n_actions, *state grid)
  n_actions = transitions.shape[0]
  flat_transitions = transitions.reshape(n_actions, -1)

  # sum_s' P * (R + g * V[s'])  ==  R * sum_s' P  +  g * sum_s' P * V[s']
  expected_reward = flat_transitions.sum(axis=1) * R[state]
  expected_next_value = flat_transitions @ np.ravel(V)

  return expected_reward + discount_factor * expected_next_value

## Policy iteration

In [None]:
def policy_iteration(P, R, S, discount_factor = 1.0, max_iterations = 1e9):
  """
  Alternate policy evaluation and greedy policy improvement until the policy is stable.

  Args:
    P: transition estimates indexed as (angle, velocity, action, next_angle, next_velocity).
    R: reward estimates indexed as (angle, velocity, action).
    S: per-state raw environment states, forwarded to `policy_evaluation`.
    discount_factor: discount used for both evaluation and the one-step lookahead.
    max_iterations: upper bound on evaluate/improve rounds.

  Returns:
    (policy, V): the policy array of shape (*n_bins, n_actions) and the value
    function of the last evaluated policy. If the round limit is reached before
    the policy is stable, a RuntimeWarning is emitted and the latest pair is
    returned (previously the function returned None in that case).
  """
  n_actions = environment.action_space.n

  # Start with a uniform policy
  policy = np.ones([*n_bins, n_actions]) / n_actions
  V = np.zeros(n_bins)  # only returned unchanged when max_iterations < 1

  # Repeat until convergence or critical number of iterations reached
  for iteration in range(int(max_iterations)):
    stable_policy = True

    # Evaluate current policy
    V = policy_evaluation(policy, P, R, S, discount_factor = discount_factor)

    # Go through each state and try to improve the action taken (policy improvement)
    for state in np.ndindex(*n_bins):
      current_action = np.argmax(policy[state])

      # Look one step ahead and evaluate whether the current action is best
      action_value = one_step_lookahead(state, V, P, R, discount_factor)
      best_action = np.argmax(action_value)

      # NOTE(review): rows whose preferred action is unchanged keep their existing
      # (possibly uniform) probabilities; only a changed preference is made greedy.
      if current_action != best_action:
        stable_policy = False
        # Greedy policy update
        policy[state] = np.eye(n_actions)[best_action]

    # If the policy is not changing anymore, report how many policies were evaluated
    if stable_policy:
      print(f'Evaluate {iteration + 1} policies.')
      return policy, V

  warnings.warn('policy_iteration reached max_iterations before the policy became stable; '
                'returning the latest policy.', RuntimeWarning)
  return policy, V

## The value iteration algorithm

In [None]:
def value_iteration(P, R, S, discount_factor = 1.0, theta = 1e-9, max_iterations = 1e9):
  """
  Run value iteration and derive a deterministic greedy policy from the result.

  Args:
    P: transition estimates indexed as (angle, velocity, action, next_angle, next_velocity).
    R: reward estimates indexed as (angle, velocity, action).
    S: unused here; accepted so both solvers share one call signature.
    discount_factor: discount passed to the one-step lookahead.
    theta: sweeps stop once the largest value change in a sweep drops below this.
    max_iterations: upper bound on the number of sweeps.

  Returns:
    (policy, V): a one-hot policy of shape (*n_bins, n_actions) and the value
    function it was derived from.

  If the values become non-finite (overflow/NaN) the sweep loop stops with a
  RuntimeWarning instead of reporting convergence: a NaN change never wins the
  `max` comparison, so previously a diverged run looked converged.
  """
  # Initialize state-value function with zeros for each environment state
  V = np.zeros(n_bins)

  for i in range(int(max_iterations)):
    delta = 0         # largest value change in this sweep
    diverged = False  # set when a non-finite value shows up

    # Update each state
    for state in np.ndindex(*n_bins):
      # One step lookahead to calculate state-action values
      action_value = one_step_lookahead(state, V, P, R, discount_factor)

      # Value of the best action in this state
      best_action_value = np.max(action_value)

      # Calculate change
      change = np.abs(V[state] - best_action_value)
      if not np.isfinite(change):
        diverged = True
      delta = max(delta, change)

      # Update the value function for current state
      V[state] = best_action_value

    if diverged:
      warnings.warn('value_iteration diverged (non-finite state values); '
                    'the returned policy is not meaningful. '
                    'Consider a discount_factor below 1.', RuntimeWarning)
      break

    # Check stopping condition
    if delta < theta:
      print(f'Value-iteration converged at iterations{i}.')
      break

  # Create a deterministic policy using the final value function
  policy = np.zeros([*n_bins, environment.action_space.n])

  for state in np.ndindex(*n_bins):
    # One step lookahead to find the best action for this state
    action_value = one_step_lookahead(state, V, P, R, discount_factor)

    # Select best action based on the highest state-action value
    best_action = np.argmax(action_value)

    # Put all probability on the best action for this state
    policy[(*state, best_action)] = 1

  return policy, V

In [None]:
def get_variables():
  """
  Estimate a tabular model of the environment from random rollouts.

  Runs MAX_ITER episodes with uniformly random actions and counts transitions
  between discretized states.

  Returns:
    P: estimated transition probabilities, indexed as
       (angle, velocity, action, next_angle, next_velocity).
    N: visit counts per (angle, velocity, action), plus a small epsilon.
    R: fraction of visits to (angle, velocity, action) whose step did not
       terminate the episode (used as the reward estimate).
    S: per-state exponentially weighted sum of the raw observations that fell
       into that state (each new observation is added, then the total is
       multiplied by DISCOUNT), used as a representative raw state.
  """
  n_actions = environment.action_space.n

  P = np.zeros((*n_bins, n_actions, *n_bins))
  N = np.zeros((*n_bins, n_actions))
  R = np.zeros((*n_bins, n_actions))
  S = np.zeros((*n_bins, environment.observation_space.shape[0]))

  MAX_ITER = 10   # number of random rollout episodes
  DISCOUNT = 0.5  # decay applied to S after every added observation

  for _ in range(MAX_ITER):
    cur_c = environment.reset()
    cur = discretizer(*cur_c)
    S[cur] += cur_c
    S[cur] *= DISCOUNT

    done = False
    while not done:
      action = np.random.randint(n_actions)
      obs, reward, terminated, truncated, _ = environment.step(action)
      # The episode is over on failure (terminated) or on the time limit (truncated)
      done = terminated or truncated

      next_state = discretizer(*obs)
      S[next_state] += obs
      S[next_state] *= DISCOUNT

      P[(*cur, action, *next_state)] += 1
      N[(*cur, action)] += 1
      # Only steps that do not end in failure earn a reward
      if not terminated:
        R[(*cur, action)] += 1
      cur = next_state

  N += 1e-10  # avoid division by zero for never-visited (state, action) pairs
  R = R / N

  # Turn transition counts into probabilities: divide every (state, action) slice
  # by its visit count, broadcasting over the trailing next-state axes.
  P /= N.reshape(N.shape + (1,) * len(n_bins))

  return P, N, R, S

## Test

In [None]:
def play_episodes(n_episodes, policy):
  """
  Run `n_episodes` episodes following `policy` greedily and collect the reward.

  Args:
    n_episodes: number of episodes to play.
    policy: array indexed by discrete state; the action with the highest
      entry in `policy[state]` is taken.

  Returns:
    (total_reward, average_reward) over all episodes; the average is 0.0 when
    `n_episodes` is 0.
  """
  total_reward = 0
  for _ in range(n_episodes):
    state = discretizer(*environment.reset())

    done = False
    while not done:
      # Select best action to perform in current state
      action = np.argmax(policy[state])

      # Perform the action and observe how the environment responded
      obs, reward, terminated, truncated, _ = environment.step(action)
      total_reward += reward

      # Stop on failure as well as on the time limit; otherwise an episode
      # would keep running past truncation
      done = terminated or truncated

      # Update current state
      state = discretizer(*obs)

  average_reward = total_reward / n_episodes if n_episodes else 0.0
  return total_reward, average_reward

# Number of evaluation episodes per solver
N_EPISODES = 10000
# The model estimated by get_variables() has no absorbing terminal state, so the
# solvers are run with a discount below 1 to keep their value updates bounded.
DISCOUNT_FACTOR = 0.99
# Solvers to compare: (display name, function returning (policy, V))
solvers = [('Policy Iteration', policy_iteration),
           ('Value Iteration', value_iteration)]

for iteration_name, iteration_function in solvers:
  # Estimate a fresh tabular model from random rollouts for this solver
  P, N, R, S = get_variables()

  # Leave the environment in a freshly reset state before the solver steps it
  environment.reset()
  policy, V = iteration_function(P, R, S, discount_factor = DISCOUNT_FACTOR)

  # Apply best policy
  total_reward, average_reward = play_episodes(N_EPISODES, policy)

  print(f'{iteration_name} :: average reward over {N_EPISODES} episodes = {average_reward} \n\n')

Policy evaluated in 241 iterations.
Policy evaluated in 70 iterations.
Evaluate 3 policies.
Policy Iteration :: average reward over 10000 episodes = 23.4268 




  # This is added back by InteractiveShellApp.init_path()


Value-iteration converged at iterations16.
Value Iteration :: average reward over 10000 episodes = 9.3529 


