In [5]:
import numpy as np
import gym

def generate_random_map(size, p):
    """Generates a random valid map (one that has a path from start to goal)
    :param size: size of each side of the grid
    :param p: probability that a tile is frozen
    """
    valid = False

    # DFS to check that it's a valid path.
    def is_valid(res):
        frontier, discovered = [], set()
        frontier.append((0,0))
        while frontier:
            r, c = frontier.pop()
            if not (r,c) in discovered:
                discovered.add((r,c))
                directions = [(1, 0), (0, 1), (-1, 0), (0, -1)]
                for x, y in directions:
                    r_new = r + x
                    c_new = c + y
                    if r_new < 0 or r_new >= size or c_new < 0 or c_new >= size:
                        continue
                    if res[r_new][c_new] == 'G':
                        return True
                    if (res[r_new][c_new] !='H'):
                        frontier.append((r_new, c_new))
        return False

    while not valid:
        p = min(1, p)
        res = np.random.choice(['F', 'H'], (size, size), p=[p, 1-p])
        res[0][0] = 'S'
        res[-1][-1] = 'G'
        valid = is_valid(res)
    return ["".join(x) for x in res]

generate_random_map(5,0.7)

['SFFFF', 'FFFFF', 'FFFHF', 'FFFFF', 'FHHFG']

In [18]:
custom_map = ['SFFFH', 'FHFFF', 'FFHFH', 'FFFFF', 'FHFFG']
gamma= 0.99
eps=0.0001

In [19]:
#Thuật toán lặp chính sách
import numpy as np
import gym

def eval_state_action(V, s, a, gamma=gamma):
    return np.sum([p * (rew + gamma*V[next_s]) for p, next_s, rew, _ in env.P[s][a]])

def policy_evaluation(V, policy, eps=eps):
    '''
    Policy evaluation. Update the value function until it reach a steady state
    '''
    while True:
        delta = 0
        # loop over all states
        for s in range(nS):
            old_v = V[s]
            # update V[s] using the Bellman equation
            V[s] = eval_state_action(V, s, policy[s])
            delta = max(delta, np.abs(old_v - V[s]))

        if delta < eps:
            break

def policy_improvement(V, policy):
    '''
    Policy improvement. Update the policy based on the value function
    '''
    policy_stable = True
    for s in range(nS):
        old_a = policy[s]
        # update the policy with the action that bring to the highest state value
        policy[s] = np.argmax([eval_state_action(V, s, a) for a in range(nA)])
        if old_a != policy[s]: 
            policy_stable = False

    return policy_stable


def run_episodes(env, policy, num_games=1000):
    '''
    Run some games to test a policy
    '''
    tot_rew = 0
    state = env.reset()

    for _ in range(num_games):
        done = False
        while not done:
            # select the action accordingly to the policy
            next_state, reward, done, _ = env.step(policy[state])
                
            state = next_state
            tot_rew += reward 
            if done:
                state = env.reset()

    print('Won %i of %i games!'%(tot_rew, num_games))

            
if __name__ == '__main__':
    # create the environment
    #desc= custom_map  or desc= generate_random_map(size,p) 
    env = gym.make('FrozenLake-v0',desc= custom_map)
    # enwrap it to have additional information from it
    env = env.unwrapped
    env.render()

    # spaces dimension
    nA = env.action_space.n
    nS = env.observation_space.n
    
    # initializing value function and policy
    V = np.zeros(nS)
    policy = np.zeros(nS)

    # some useful variable
    policy_stable = False
    it = 0

    while not policy_stable:
        print(policy.reshape((5,5)))
        policy_evaluation(V, policy)
        policy_stable = policy_improvement(V, policy)
        it += 1

    print('Converged after %i policy iterations'%(it))
    run_episodes(env, policy)
    print(V.reshape((5,5)))
    print(policy.reshape((5,5)))


[41mS[0mFFFH
FHFFF
FFHFH
FFFFF
FHFFG
[[0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
[[0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 1.]
 [0. 0. 0. 1. 0.]]
[[0. 0. 1. 0. 0.]
 [0. 0. 1. 2. 1.]
 [0. 0. 0. 0. 0.]
 [0. 0. 1. 2. 1.]
 [0. 0. 1. 2. 0.]]
[[0. 1. 2. 0. 0.]
 [0. 0. 2. 2. 1.]
 [0. 0. 0. 0. 0.]
 [0. 1. 1. 1. 1.]
 [0. 0. 2. 2. 0.]]
[[1. 2. 2. 0. 0.]
 [0. 0. 2. 0. 1.]
 [1. 0. 0. 0. 0.]
 [1. 2. 1. 1. 1.]
 [0. 0. 2. 2. 0.]]
[[0. 3. 2. 0. 0.]
 [0. 0. 2. 0. 1.]
 [1. 0. 0. 0. 0.]
 [1. 3. 1. 1. 1.]
 [0. 0. 2. 2. 0.]]
[[0. 3. 3. 0. 0.]
 [0. 0. 2. 0. 1.]
 [0. 0. 0. 0. 0.]
 [1. 3. 1. 1. 1.]
 [0. 0. 2. 2. 0.]]
Converged after 7 policy iterations
Won 579 of 1000 games!
[[0.36622453 0.32930569 0.30253772 0.28507986 0.        ]
 [0.37759253 0.         0.19102399 0.27636208 0.13610881]
 [0.40054318 0.28460439 0.         0.36137257 0.        ]
 [0.43572637 0.46192058 0.67944319 0.8187138  0.90075766]
 [0.42283587 0.         0.778301

In [20]:
#Thuật toán lặp giá trị
import numpy as np
import gym

def eval_state_action(V, s, a, gamma=gamma):
    return np.sum([p * (rew + gamma*V[next_s]) for p, next_s, rew, _ in env.P[s][a]])

def value_iteration(eps=eps):
    '''
    Value iteration algorithm
    '''
    V = np.zeros(nS)
    it = 0

    while True:
        delta = 0
        # update the value of each state using as "policy" the max operator
        for s in range(nS):
            old_v = V[s]
            V[s] = np.max([eval_state_action(V, s, a) for a in range(nA)])
            delta = max(delta, np.abs(old_v - V[s]))

        if delta < eps:
            break
        else:
            print('Iter:', it, ' delta:', np.round(delta, 5))
        it += 1

    return V

def run_episodes(env, V, num_games=1000):
    '''
    Run some test games
    '''
    tot_rew = 0
    state = env.reset()

    for _ in range(num_games):
        done = False
        while not done:
            action = np.argmax([eval_state_action(V, state, a) for a in range(nA)])
            next_state, reward, done, _ = env.step(action)

            state = next_state
            tot_rew += reward 
            if done:
                state = env.reset()

    print('Won %i of %i games!'%(tot_rew, num_games))

            
if __name__ == '__main__':
    # create the environment
    #desc= custom_map  or desc= generate_random_map(size,p) 
    s=5
    env = gym.make('FrozenLake-v0',desc= custom_map)
    # enwrap it to have additional information from it
    env = env.unwrapped
    env.render()

    # spaces dimension
    nA = env.action_space.n
    nS = env.observation_space.n

    # Value iteration
    V = value_iteration(eps=0.0001)
    # test the value function on 100 games
    run_episodes(env, V)
    # print the state values
    print(V.reshape((5,5)))
    print(policy.reshape((5,5)))


[41mS[0mFFFH
FHFFF
FFHFH
FFFFF
FHFFG
Iter: 0  delta: 0.33333
Iter: 1  delta: 0.22
Iter: 2  delta: 0.15645
Iter: 3  delta: 0.11603
Iter: 4  delta: 0.0916
Iter: 5  delta: 0.06954
Iter: 6  delta: 0.05371
Iter: 7  delta: 0.04149
Iter: 8  delta: 0.03227
Iter: 9  delta: 0.0256
Iter: 10  delta: 0.02357
Iter: 11  delta: 0.02194
Iter: 12  delta: 0.02073
Iter: 13  delta: 0.01919
Iter: 14  delta: 0.01748
Iter: 15  delta: 0.0162
Iter: 16  delta: 0.0151
Iter: 17  delta: 0.01407
Iter: 18  delta: 0.01308
Iter: 19  delta: 0.01214
Iter: 20  delta: 0.01124
Iter: 21  delta: 0.01069
Iter: 22  delta: 0.01023
Iter: 23  delta: 0.00974
Iter: 24  delta: 0.00924
Iter: 25  delta: 0.00873
Iter: 26  delta: 0.00823
Iter: 27  delta: 0.00788
Iter: 28  delta: 0.00761
Iter: 29  delta: 0.00737
Iter: 30  delta: 0.00714
Iter: 31  delta: 0.0069
Iter: 32  delta: 0.00666
Iter: 33  delta: 0.00642
Iter: 34  delta: 0.00618
Iter: 35  delta: 0.00593
Iter: 36  delta: 0.00568
Iter: 37  delta: 0.00544
Iter: 38  delta: 0.0052
Iter