## **Reinforcement Learning: Dynamic Programming**

In [None]:
import gym

In [None]:
gym.envs.register(
    id='FrozenLakeNotSlippery-v0',
    entry_point='gym.envs.toy_text:FrozenLakeEnv',
    kwargs={'map_name' : '4x4', 'is_slippery': False},
    max_episode_steps=100,
    reward_threshold=0.74
)

In [None]:
env=gym.make('FrozenLakeNotSlippery-v0')
env.env.P

{0: {0: [(1.0, 0, 0.0, False)],
  1: [(1.0, 4, 0.0, False)],
  2: [(1.0, 1, 0.0, False)],
  3: [(1.0, 0, 0.0, False)]},
 1: {0: [(1.0, 0, 0.0, False)],
  1: [(1.0, 5, 0.0, True)],
  2: [(1.0, 2, 0.0, False)],
  3: [(1.0, 1, 0.0, False)]},
 2: {0: [(1.0, 1, 0.0, False)],
  1: [(1.0, 6, 0.0, False)],
  2: [(1.0, 3, 0.0, False)],
  3: [(1.0, 2, 0.0, False)]},
 3: {0: [(1.0, 2, 0.0, False)],
  1: [(1.0, 7, 0.0, True)],
  2: [(1.0, 3, 0.0, False)],
  3: [(1.0, 3, 0.0, False)]},
 4: {0: [(1.0, 4, 0.0, False)],
  1: [(1.0, 8, 0.0, False)],
  2: [(1.0, 5, 0.0, True)],
  3: [(1.0, 0, 0.0, False)]},
 5: {0: [(1.0, 5, 0, True)],
  1: [(1.0, 5, 0, True)],
  2: [(1.0, 5, 0, True)],
  3: [(1.0, 5, 0, True)]},
 6: {0: [(1.0, 5, 0.0, True)],
  1: [(1.0, 10, 0.0, False)],
  2: [(1.0, 7, 0.0, True)],
  3: [(1.0, 2, 0.0, False)]},
 7: {0: [(1.0, 7, 0, True)],
  1: [(1.0, 7, 0, True)],
  2: [(1.0, 7, 0, True)],
  3: [(1.0, 7, 0, True)]},
 8: {0: [(1.0, 8, 0.0, False)],
  1: [(1.0, 12, 0.0, True)],
  2: [(

We can clearly see that P is a list of dictionaries, which stores all the information of the States (from state 0 to state 15). In each state, we can find the key of the dictionary (0, 1, 2, 3) represents the actions we can take (0:left, 1:down, 2:right, 3:up). For each actions in the state, there exist four elements. The second element contains the next state (after taking this action). The third element stores the reward (if the next state is 15, it get the reward 1, otherwise is 0). The last element is a boolean to check if the game is over (reach the last place 15 or lose the game). The first element is all ones in this case, so we don't know what exactly it is. However, it could be the probability that the agent would take this action. If it is played by human, then probability of all the actions are set to one.

In [None]:
print(env.observation_space)

Discrete(16)


In [None]:
stateSpaceSize = env.observation_space.n
print(stateSpaceSize)

16


In [None]:
print(env.action_space)

Discrete(4)


In [None]:
actionSpaceSize = env.action_space.n
print(actionSpaceSize)

4


In [None]:
for g in range(1,10,1):
  print("sample from S:",env.observation_space.sample()," ... ","sample from A:",env.action_space.sample())

sample from S: 10  ...  sample from A: 2
sample from S: 6  ...  sample from A: 1
sample from S: 0  ...  sample from A: 3
sample from S: 8  ...  sample from A: 1
sample from S: 15  ...  sample from A: 0
sample from S: 15  ...  sample from A: 1
sample from S: 9  ...  sample from A: 2
sample from S: 4  ...  sample from A: 0
sample from S: 8  ...  sample from A: 3


In [None]:
env.reset()
env.render()


[41mS[0mFFF
FHFH
FFFH
HFFG


In [None]:
env.reset()
exitCommand=False
while not(exitCommand):
  env.render()
  print("Enter the action as an integer from 0 to",env.action_space.n," (or exit): ")
  userInput=input()
  if userInput=="exit":
    break
  action=int(userInput)
  (observation, reward, compute, probability) = env.step(action)
  print("--> The result of taking action",action,"is:")
  print("     S=",observation)
  print("     R=",reward)
  print("     p=",probability)

  env.render()



[41mS[0mFFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
0
--> The result of taking action 0 is:
     S= 0
     R= 0.0
     p= {'prob': 1.0}
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
1
--> The result of taking action 1 is:
     S= 4
     R= 0.0
     p= {'prob': 1.0}
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
exit


In [None]:
import pandas as pd
 
data = {'Input':['0', '1', '2', '3'],
        'Logic Action':['Left', 'Down', 'Right', 'Up']}
 
df = pd.DataFrame(data)
df

Unnamed: 0,Input,Logic Action
0,0,Left
1,1,Down
2,2,Right
3,3,Up


In [None]:
data = {'Symbols':['S', 'H', 'F', 'G'],
        'Meaning':['Start', 'Hole', 'Frozen', 'Goal']}
 
df = pd.DataFrame(data)
df

Unnamed: 0,Symbols,Meaning
0,S,Start
1,H,Hole
2,F,Frozen
3,G,Goal


The agent should try to avoid all the holes in the map (4x4). Starting from the starting point, finding a suitable route to reach the goal point and get the reward (1) in the shortest time. Stopping at the State H (Hole) would lose the game immediately and receive nothing (0).

In [None]:
import random

env.reset()
exitCommand=False
while not(exitCommand):
  action=random.randint(0, 3)
  (observation, reward, compute, probability) = env.step(action)
  
  print("--> The result of taking action",action,"is:")
  print("     S=",observation)
  print("     R=",reward)
  print("     p=",probability)

  env.render()

  if compute and reward == 1 :
    print("Success!")
    break
  if compute and reward == 0 :
    print("Hole!")
    break

--> The result of taking action 3 is:
     S= 0
     R= 0.0
     p= {'prob': 1.0}
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
--> The result of taking action 1 is:
     S= 4
     R= 0.0
     p= {'prob': 1.0}
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
--> The result of taking action 2 is:
     S= 5
     R= 0.0
     p= {'prob': 1.0}
  (Right)
SFFF
F[41mH[0mFH
FFFH
HFFG
Hole!


We know that if we can find the optimal value functions, we can easily obtain optimal policies. We have $p(s',r|s,a)$, which represents a set of probabilities. For value function $v_{\pi}$, we could start with $v_{\pi}(s)=\sum_{a}\pi(a|s)\sum_{s',r}p(s',r|s,a)[r+\gamma v_{\pi}(s')]$, where $\pi(a|s)$ is the probability of taking action a in state s. Then we can pick up any initial value $v_0$ to implement iterative policy evaluation based on the Bellman equation ($v_{k+1}(s)=\sum_{a}\pi(a|s)\sum_{s',r}p(s',r|s,a)[r+\gamma v_{k}(s')]$). Each iteration of iterative strategy evaluation will update the value of each state once to generate a new approximate function $v_{k+1}$.

In [None]:
import numpy as np

def random_action_policy(stateSpaceSize, actionSpaceSize):
    action = np.ones([stateSpaceSize, actionSpaceSize]) / actionSpaceSize
    return action

In [None]:
import random

env.reset()
exitCommand=False
while not(exitCommand):
  env.render()
  policy = random_action_policy(stateSpaceSize, actionSpaceSize)
  action = np.random.choice(np.where(policy[observation] == policy[observation].max())[0])
  (observation, reward, compute, probability) = env.step(action)
  
  print("--> The result of taking action",action,"is:")
  print("     S=",observation)
  print("     R=",reward)
  print("     p=",probability)

  env.render()

  if compute and reward == 1 :
    print("Success!")
    break
  if compute and reward == 0 :
    print("Hole!")
    break


[41mS[0mFFF
FHFH
FFFH
HFFG
--> The result of taking action 1 is:
     S= 4
     R= 0.0
     p= {'prob': 1.0}
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
--> The result of taking action 3 is:
     S= 0
     R= 0.0
     p= {'prob': 1.0}
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
--> The result of taking action 1 is:
     S= 4
     R= 0.0
     p= {'prob': 1.0}
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
--> The result of taking action 2 is:
     S= 5
     R= 0.0
     p= {'prob': 1.0}
  (Right)
SFFF
F[41mH[0mFH
FFFH
HFFG
Hole!


In [None]:
def policy_evaluation(env, policy, gamma, theta, max_iter, if_print):
  ## Note: stateSpaceSize = 16, actionSpaceSize = 4
  ## Output -> V (4x4) -> Values of each state for the policy
  iter = 0
  V = np.zeros(stateSpaceSize)

  ## i tracks the iteration numbers
  for i in range (max_iter):
    ## Start with delta = 0
    delta = 0
    ## Loop for each state in stateSpace
    for s in range(stateSpaceSize):
      Vs = 0
      ## Loop for each actions in actionSpace
      for a in range(actionSpaceSize):
        action_probability = policy[s][a]
        probability, next, reward, done = env.P[s][a][0]
        ## Prob taking action * prob of env * (reward + gamma * value of next state)
        Vs += action_probability * probability * (reward + gamma * V[next])
      ## Update the delta
      delta = max(delta, np.abs(V[s] - Vs))
      V[s] = Vs
    ## Loop until delta < theta
    if delta < theta:
      iter = i
      break
  if if_print:
    print("Total iterations:", i)
  return V,i

In [None]:
policy = random_action_policy(stateSpaceSize, actionSpaceSize)
V,iter_num = policy_evaluation(env.env, policy, 1, 1e-8, 100000000, True)
V.resize((4,4))
V

Total iterations: 56


array([[0.01393977, 0.01163091, 0.02095297, 0.01047648],
       [0.01624865, 0.        , 0.04075153, 0.        ],
       [0.03480619, 0.08816993, 0.14205316, 0.        ],
       [0.        , 0.17582037, 0.43929118, 0.        ]])

In [None]:
def q_func(env, V, gamma):
  Q = np.zeros([stateSpaceSize, actionSpaceSize])

  for s in range(stateSpaceSize):
    q = np.zeros(actionSpaceSize)
    for a in range(actionSpaceSize):
      probability, next, reward, done = env.P[s][a][0]
      q[a] += probability * (reward + gamma * V[next])
    Q[s] = q
  
  return Q

In [None]:
policy = random_action_policy(stateSpaceSize, actionSpaceSize)
V,iter_num = policy_evaluation(env.env, policy, 1, 1e-8, 100000000, False)
Q = q_func(env.env, V, 1)
Q

array([[0.01393977, 0.01624865, 0.01163091, 0.01393977],
       [0.01393977, 0.        , 0.02095297, 0.01163091],
       [0.01163091, 0.04075153, 0.01047648, 0.02095297],
       [0.02095297, 0.        , 0.01047648, 0.01047648],
       [0.01624865, 0.03480619, 0.        , 0.01393977],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.14205316, 0.        , 0.02095297],
       [0.        , 0.        , 0.        , 0.        ],
       [0.03480619, 0.        , 0.08816993, 0.01624865],
       [0.03480619, 0.17582037, 0.14205316, 0.        ],
       [0.08816993, 0.43929118, 0.        , 0.04075153],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.17582037, 0.43929118, 0.08816993],
       [0.17582037, 0.43929118, 1.        , 0.14205316],
       [0.        , 0.        , 0.        , 0.        ]])

We have the env P and a value function. We want to find a better policy $\pi'$ so that $v_{\pi'}(s)\geq v_{\pi}$. Base on this, we could select at each state the action that appears best. $\pi'(s)$ could be express as $\pi'(s)=\arg\max_{a}\sum_{s',r}p(s',r|s,a)[r+\gamma v_{\pi}(s')]$. Now, the new greedy policy could better or equal to the original policy based on the policy improvement theorem. After that, we will do policy iteration, value iteration based on the previous improvement steps. Starting from a original policy, repeating policy evaluation and policy improvement to find the suitable policy.

In [None]:
def policy_improve(env,V,gamma) :
  max_num = 0
  new_policy = np.zeros([stateSpaceSize, actionSpaceSize]) / actionSpaceSize

  for s in range(stateSpaceSize):
    max_num = max(q_func(env, V, gamma)[s])
              
    if max_num == 0 :
      continue

    best_action = np.where(q_func(env, V, 1)[s] == max_num)[0]
    length = len(best_action)
    for i in best_action :
      new_policy[s][i] = 1/length
  
  return new_policy

In [None]:
def policy_iteration(env, gamma, theta, max_iter) :
  over_iter = 0
  Eval_iter = 0
  stateSpaceSize = env.observation_space.n
  actionSpaceSize = env.action_space.n
  ## Step 1: Initialization 
  policy = random_action_policy(stateSpaceSize, actionSpaceSize)

  ## i tracks the overall iteration numbers
  for i in range(max_iter):
    ## Step 2: Policy Evaluation 
    V,k = policy_evaluation(env, policy, gamma, theta, 10000, False)
    Eval_iter +=k
    ## Step 3: Policy Improvement
    new_policy = policy_improve(env,V,gamma)
    ## Check if two policies are the same
    N,v = policy_evaluation(env, new_policy, gamma, theta, 10000, False)
    if (new_policy == policy).all():
      break;
    #if np.max(abs(V - N)) < 1e-6:
    #  over_iter = i
    #  break;
    over_iter = i
    policy = new_policy.copy()
  
  print("Overall policy iteration steps :", over_iter)
  print("Evaluation steps. :", Eval_iter)

  return policy, V

In [None]:
policy_PI, V_PI = policy_iteration(env.env, 1, 1e-8, 10)
V_PI.resize((4,4))
V_PI

Overall policy iteration steps : 9
Evaluation steps. : 2766


array([[1., 1., 1., 1.],
       [1., 0., 1., 0.],
       [1., 1., 1., 0.],
       [0., 1., 1., 0.]])

In [None]:
def value_iteration(ebv,gamma,theta,max_iter) :
  stateSpaceSize = env.observation_space.n
  actionSpaceSize = env.action_space.n
  V = np.zeros(stateSpaceSize)
  ## i tracks the iteration numbers
  for i in range (max_iter):
    ## Start with delta = 0
    delta = 0
    ## Loop for each state in stateSpace
    for s in range(stateSpaceSize):
      Vs = V[s]
      V[s] = max(q_func(env, V, 1)[s])
      delta = max(delta, np.abs(V[s] - Vs))

    if delta < theta:
      break
      
  print("Overall policy iteration steps :", i)

  policy = policy_improve(env,V,gamma)
  return policy,V

In [None]:
policy_VI, V_VI = value_iteration(env, 1, 1e-8, 2000)
V_VI.resize((4,4))
V_VI

Overall policy iteration steps : 6


array([[1., 1., 1., 1.],
       [1., 0., 1., 0.],
       [1., 1., 1., 0.],
       [0., 1., 1., 0.]])

We can see that the iteration time for value_iteration is shorter than policy_iteration. Since we do not need to go through policy improvement in each iteration. It could save a lot of time.

In [None]:
import random

env.reset()
exitCommand=False
while not(exitCommand):
  policy = policy_VI
  action = np.random.choice(np.where(policy[observation] == policy[observation].max())[0])
  (observation, reward, compute, probability) = env.step(action)

  if compute and reward == 1 :
    env.render()
    print("Success!")
    break
  if compute and reward == 0 :
    print("Hole!")
    break

  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m
Success!
