In [None]:
import gym

In [None]:
gym.envs.register(
    id='FrozenLakeNotSlippery-v0',
    entry_point='gym.envs.toy_text:FrozenLakeEnv',
    kwargs={'map_name' : '4x4', 'is_slippery': False},
    max_episode_steps=100,
    reward_threshold=0.74
)

In [None]:
# Create the gridworld-like environment
env=gym.make('FrozenLakeNotSlippery-v0')
# Let's look at the model of the environment (i.e., P):
env.env.P
# Question: what is the data in this structure saying? Relate this to the course
# presentation of P

{0: {0: [(1.0, 0, 0.0, False)],
  1: [(1.0, 4, 0.0, False)],
  2: [(1.0, 1, 0.0, False)],
  3: [(1.0, 0, 0.0, False)]},
 1: {0: [(1.0, 0, 0.0, False)],
  1: [(1.0, 5, 0.0, True)],
  2: [(1.0, 2, 0.0, False)],
  3: [(1.0, 1, 0.0, False)]},
 2: {0: [(1.0, 1, 0.0, False)],
  1: [(1.0, 6, 0.0, False)],
  2: [(1.0, 3, 0.0, False)],
  3: [(1.0, 2, 0.0, False)]},
 3: {0: [(1.0, 2, 0.0, False)],
  1: [(1.0, 7, 0.0, True)],
  2: [(1.0, 3, 0.0, False)],
  3: [(1.0, 3, 0.0, False)]},
 4: {0: [(1.0, 4, 0.0, False)],
  1: [(1.0, 8, 0.0, False)],
  2: [(1.0, 5, 0.0, True)],
  3: [(1.0, 0, 0.0, False)]},
 5: {0: [(1.0, 5, 0, True)],
  1: [(1.0, 5, 0, True)],
  2: [(1.0, 5, 0, True)],
  3: [(1.0, 5, 0, True)]},
 6: {0: [(1.0, 5, 0.0, True)],
  1: [(1.0, 10, 0.0, False)],
  2: [(1.0, 7, 0.0, True)],
  3: [(1.0, 2, 0.0, False)]},
 7: {0: [(1.0, 7, 0, True)],
  1: [(1.0, 7, 0, True)],
  2: [(1.0, 7, 0, True)],
  3: [(1.0, 7, 0, True)]},
 8: {0: [(1.0, 8, 0.0, False)],
  1: [(1.0, 12, 0.0, True)],
  2: [(

The returned dict is the description of all 16 (0-15) possible states of game.

For each state, the description is also a dict with each possible action corresponding to unique key. The value is a list that includes probability of transitioning into the state, next state, reward and whether the game has been over.

In [None]:
# Now let's investigate the observation space (i.e., S using our nomenclature),
# and confirm we see it is a discrete space with 16 locations
print(env.observation_space)

Discrete(16)


In [None]:
stateSpaceSize = env.observation_space.n
print(stateSpaceSize)

16


In [None]:
# Now let's investigate the action space (i.e., A) for the agent->environment
# channel
print(env.action_space)

Discrete(4)


In [None]:
# The gym environment has ...sample() functions that allow us to sample
# from the above spaces:
for g in range(1,10,1):
  print("sample from S:",env.observation_space.sample()," ... ","sample from A:",env.action_space.sample())



sample from S: 8  ...  sample from A: 3
sample from S: 6  ...  sample from A: 0
sample from S: 5  ...  sample from A: 2
sample from S: 7  ...  sample from A: 2
sample from S: 3  ...  sample from A: 3
sample from S: 6  ...  sample from A: 3
sample from S: 7  ...  sample from A: 2
sample from S: 3  ...  sample from A: 3
sample from S: 9  ...  sample from A: 1


In [None]:
env.action_space.sample()

3

In [None]:
# The enviroment also provides a helper to render (visualize) the environment
env.reset()
env.render()


[41mS[0mFFF
FHFH
FFFH
HFFG


In [66]:
# We can act as the agent, by selecting actions and stepping the environment
# through time to see its responses to our actions
env.reset()
exitCommand=False
while not(exitCommand):
  env.render()
  print("Enter the action as an integer from 0 to",env.action_space.n," (or exit): ")
  userInput=input()
  if userInput=="exit":
    break
  action=int(userInput)
  (observation, reward, compute, probability) = env.step(action)
  print("--> The result of taking action",action,"is:")
  print("     S=",observation)
  print("     R=",reward)
  print("     p=",probability)

  env.render()



[41mS[0mFFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
0
--> The result of taking action 0 is:
     S= 0
     R= 0.0
     p= {'prob': 1.0}
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
1
--> The result of taking action 1 is:
     S= 4
     R= 0.0
     p= {'prob': 1.0}
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
2
--> The result of taking action 2 is:
     S= 5
     R= 0.0
     p= {'prob': 1.0}
  (Right)
SFFF
F[41mH[0mFH
FFFH
HFFG
  (Right)
SFFF
F[41mH[0mFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
3
--> The result of taking action 3 is:
     S= 5
     R= 0
     p= {'prob': 1.0}
  (Up)
SFFF
F[41mH[0mFH
FFFH
HFFG
  (Up)
SFFF
F[41mH[0mFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
exit


In [None]:
# Question: draw a table indicating the correspondence between the action
# you input (a number) and the logic action performed.
# Question: draw a table that illustrates what the symbols on the render image
# mean?
# Question: Explain what the objective of the agent is in this environment?

In [None]:
input2action = {0:"left",1:"down",2:"right",3:"up"}
symbol2meaning = {"S":"Starting Point", "F":"Ice surface", "H":"Hole", "G":"Destination"}
agtobjective = "Move from S to G step by step without trapped by H"
print("table indicating the correspondence between the action you input (a number) and the logic action performed: " , input2action)
print("table that illustrates what the symbols on the render image mean: ",symbol2meaning)
print(agtobjective)

table indicating the correspondence between the action you input (a number) and the logic action performed:  {0: 'left', 1: 'down', 2: 'right', 3: 'up'}
table that illustrates what the symbols on the render image mean:  {'S': 'Starting Point', 'F': 'Ice surface', 'H': 'Hole', 'G': 'Destination'}
Move from S to G step by step without trapped by H


In [None]:
# Practical: Code up an AI that will employ random action selection in order
# to drive the agent. Test this random action selection agent with the
# above environment (i.e., code up a loop as I did above, but instead
# of taking input from a human user, take it from the AI you coded).

In [None]:
class AIplayer():
  def __init__(self):
    self.estimations = {}
    self.state = []

  def random_action(self,env):
    action = env.action_space.sample()
    return action


In [None]:
env.reset()
maxstep = 50
for _ in range(maxstep):
  env.render()
  print("Enter the action as an integer from 0 to",env.action_space.n," (or exit): ")
  player = AIplayer()
  
  action=int(player.random_action(env))
  (observation, reward, compute, probability) = env.step(action)
  print("--> The result of taking action",action,"is:")
  print("     S=",observation)
  print("     R=",reward)
  print("     p=",probability)

  env.render()


[41mS[0mFFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
--> The result of taking action 0 is:
     S= 0
     R= 0.0
     p= {'prob': 1.0}
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
--> The result of taking action 0 is:
     S= 0
     R= 0.0
     p= {'prob': 1.0}
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
--> The result of taking action 2 is:
     S= 1
     R= 0.0
     p= {'prob': 1.0}
  (Right)
S[41mF[0mFF
FHFH
FFFH
HFFG
  (Right)
S[41mF[0mFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
--> The result of taking action 1 is:
     S= 5
     R= 0.0
     p= {'prob': 1.0}
  (Down)
SFFF
F[41mH[0mFH
FFFH
HFFG
  (Down)
SFFF
F[41mH[0mFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
--> The result of taking action 3 is:
     S= 5
     R= 

In [None]:
# Now towards dynamic programming. Note that env.env.P has the model
# of the environment.
#
# Question: How would you represent the agent's policy function and value function?
# Practical: revise the above AI solver to use a policy function in which you
# code the random action selections in the policy function. Test this.
# Practical: Code the C-4 Policy Evaluation (Prediction) algorithm. You may use
# either the inplace or ping-pong buffer (as described in the lecture). Now
# randomly initialize your policy function, and compute its value function.
# Report your results: policy and value function. Ensure your prediction
# algo reports how many iterations it took.
#
# (Optional): Repeat the above for q.
#
# Policy Improvement:
# Question: How would you use P and your value function to improve an arbitrary
# policy, pi, per Chapter 4?
# Practical: Code the policy iteration process, and employ it to arrive at a
# policy that solves this problem. Show your testing results, and ensure
# it reports the number of iterations for each step: (a) overall policy
# iteration steps and (b) evaluation steps.
# Practical: Code the value iteration process, and employ it to arrive at a
# policy that solves this problem. Show your testing results, reporting
# the iteration counts.
# Comment on the difference between the iterations required for policy vs
# value iteration.
#
# Optional: instead of the above environment, use the "slippery" Frozen Lake via
# env = gym.make("FrozenLake-v0")

policy function would be a argmax function on the top of value function  [#state * #action].

In [None]:
import numpy as np

In [None]:
class AIplayer2():
  def __init__(self):
    self.estimations = {}
    self.state = []

  def random_action(self,state):
    self.estimations = {i:[0.25,0.25,0.25,0.25] for i in range(16)}
    action = np.random.choice(np.where(b == b.max())[0])np.argmax(self.estimations[state])
    return action


In [None]:
env.reset()
maxstep = 50
state = 0
for _ in range(maxstep):
  env.render()
  print("Enter the action as an integer from 0 to",env.action_space.n," (or exit): ")
  player = AIplayer2()
  action=int(player.random_action(state))
  (observation, reward, compute, probability) = env.step(action)
  print("--> The result of taking action",action,"is:")
  print("     S=",observation)
  print("     R=",reward)
  print("     p=",probability)
  state = observation
  env.render()


[41mS[0mFFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
--> The result of taking action 2 is:
     S= 1
     R= 0.0
     p= {'prob': 1.0}
  (Right)
S[41mF[0mFF
FHFH
FFFH
HFFG
  (Right)
S[41mF[0mFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
--> The result of taking action 3 is:
     S= 1
     R= 0.0
     p= {'prob': 1.0}
  (Up)
S[41mF[0mFF
FHFH
FFFH
HFFG
  (Up)
S[41mF[0mFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
--> The result of taking action 0 is:
     S= 0
     R= 0.0
     p= {'prob': 1.0}
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
--> The result of taking action 3 is:
     S= 0
     R= 0.0
     p= {'prob': 1.0}
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
Enter the action as an integer from 0 to 4  (or exit): 
--> The result of taking action 3 is:
     S= 0
     R= 0.0
    

In [194]:
class AIplayer3():
  def __init__(self):
    self.states = 16
    self.estimations = {}
    self.p = {i:np.ones(4)/(4) for i in range(self.states)}
    self.epsilon = 0.6

  def train(self):
    self.epsilon = 0.6

  def eval(self):
    self.epsilon = 0

  def choose_action(self,state):
    action = np.argmax(self.p[state])#np.argmax(self.estimations[state]*self.p[state])#np.argmax(self.estimations[state])
    return action
  
  def backupV(self, env, gamma=1, theta=1e-8):
    self.estimations = {i:0 for i in range(self.states)}
    iter = 1
    while True:
      iter+=1
      delta = 0
      for s in range(self.states):
        v = 0
        for a, prob in enumerate(self.p[s]):
          for p, s1, r, _ in env.P[s][a]:
            v += prob* p * (r + gamma * self.estimations[s1])
        delta = max(delta, np.abs(self.estimations[s]-v))
        self.estimations[s] = v
      if delta < theta:
        print("Evaluation done in {} iterations".format(iter))
        break


  def backupP(self, env, gamma):
      for s in range(self.states):
        for a in range(len(self.p[s])):
          for p, s1, r, _ in env.P[s][a]:
            self.p[s][a] =  p * (r + gamma * self.estimations[s1])
        best_a = np.argwhere(self.p[s]==np.max(self.p[s])).flatten()
        self.p[s] = np.sum([np.eye(env.nA)[i] for i in best_a], axis=0)/len(best_a)
      

# Policy Evaluation

In [195]:
player = AIplayer3()
player.backupV(env, gamma=1, theta=1e-8)


Evaluation done in 58 iterations


# Policy improvement

We can update the policy by modify the decision metrix self.p with the old value function and P

In [197]:
player.backupP(env, gamma)
player.p

{0: array([0., 1., 0., 0.]),
 1: array([0., 0., 1., 0.]),
 2: array([0., 1., 0., 0.]),
 3: array([1., 0., 0., 0.]),
 4: array([0., 1., 0., 0.]),
 5: array([0.25, 0.25, 0.25, 0.25]),
 6: array([0., 1., 0., 0.]),
 7: array([0.25, 0.25, 0.25, 0.25]),
 8: array([0., 0., 1., 0.]),
 9: array([0., 1., 0., 0.]),
 10: array([0., 1., 0., 0.]),
 11: array([0.25, 0.25, 0.25, 0.25]),
 12: array([0.25, 0.25, 0.25, 0.25]),
 13: array([0., 0., 1., 0.]),
 14: array([0., 0., 1., 0.]),
 15: array([0.25, 0.25, 0.25, 0.25])}

# Overall Policy Iteration Process

In [193]:
player = AIplayer3()
num_iter = 100
max_steps = 12
for i in range(num_iter):
  player.backupV(env, gamma=1, theta=1e-8)
  player.backupP(env, gamma=1)
  if i%10 == 0:
    s = env.reset()
    G = 0
    for _ in range(max_steps):
      a = player.choose_action(s)
      s1,r,d,_ = env.step(a)
      G += r
      s = s1
    print("{} iterations | 12 step Return: {}".format(i,G))
  

0 iterations | 12 step Return: 1.0
10 iterations | 12 step Return: 1.0
20 iterations | 12 step Return: 1.0
30 iterations | 12 step Return: 1.0
40 iterations | 12 step Return: 1.0
50 iterations | 12 step Return: 1.0
60 iterations | 12 step Return: 1.0
70 iterations | 12 step Return: 1.0
80 iterations | 12 step Return: 1.0
90 iterations | 12 step Return: 1.0


# Value iteration

In [263]:
class AIplayer4():
  def __init__(self):
    self.p = np.zeros((16,4))

  def choose_action(self,s):
    return np.argmax(self.p[s])

  def backup(self, theta, env, gamma=0.8):
    V = np.max(self.p,axis=1)
    iter = 0
    while True:
      print(self.p, V)
      iter += 1
      delta = 0
      for s in range(env.nS):
        v = V[s]
        for a in range(env.nA):
          for _,s1,r,_ in env.P[s][a]:
            self.p[s,a] = r + gamma*V[s1]
        V[s] = max(self.p[s])
        delta = max(delta, abs(V[s] - v))
      if delta < theta:
        print("converged after {} iterations".format(iter))
        break  
    for s in range(env.nS):
      for a in range(env.nA):
        for _,s1,r,_ in env.P[s][a]:
          self.p[s,a] = r + gamma*V[s1]              

In [264]:
player = AIplayer4()
player.backup(1e-8,env,gamma=0.8)

[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]] [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 0. 0.]] [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
[[0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.8  0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.8  0.  ]
 [0.64 0.8  1.   0.64]
 [0.   0.   0.   0.  ]] [0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.8 0.  

In [265]:
player.p

array([[0.262144, 0.32768 , 0.32768 , 0.262144],
       [0.262144, 0.      , 0.4096  , 0.32768 ],
       [0.32768 , 0.512   , 0.32768 , 0.4096  ],
       [0.4096  , 0.      , 0.32768 , 0.32768 ],
       [0.32768 , 0.4096  , 0.      , 0.262144],
       [0.      , 0.      , 0.      , 0.      ],
       [0.      , 0.64    , 0.      , 0.4096  ],
       [0.      , 0.      , 0.      , 0.      ],
       [0.4096  , 0.      , 0.512   , 0.32768 ],
       [0.4096  , 0.64    , 0.64    , 0.      ],
       [0.512   , 0.8     , 0.      , 0.512   ],
       [0.      , 0.      , 0.      , 0.      ],
       [0.      , 0.      , 0.      , 0.      ],
       [0.      , 0.64    , 0.8     , 0.512   ],
       [0.64    , 0.8     , 1.      , 0.64    ],
       [0.      , 0.      , 0.      , 0.      ]])

In [267]:

max_steps = 12
s = env.reset()
G = 0
for _ in range(max_steps):
  a = player.choose_action(s)
  s1,r,d,_ = env.step(a)
  G += r
  s = s1
print("12 step Return: {}".format(G))

12 step Return: 1.0


Since the problem itself is not very complex, both policy iteration and value iteration converge fast (32 iterations with only 1 improvement and 7 iterations, repectively). The difference between two is that value iteration process directly on the policy map (s,a) instead of estimating V(s) based on a fixed given policy and then update the policy map to improve the policy. However, since the 4*4 fronzen land is very sample, the policy iteration was able to solve the problem with only one time policy improvement. 