In [1]:
import numpy as np
import time
from Frozen_Lake import FrozenLakeEnv

# Shared environment instance; the functions and cells in this notebook read it as the module-level `env`.
env = FrozenLakeEnv()

Если награда не зависит от $s'$:
$$
q(s,a) = R(s, a) + \gamma \sum_{s'} P(s'|s,a) v(s')
$$
Если награда зависит от $s'$:
$$
q(s,a) = \sum_{s'} P(s'|s,a) \Big( R(s,a,s') + \gamma v(s')\Big)
$$
Случай, когда награда не зависит от $s'$, — частный случай общей формулы, так как $\sum_{s'} P(s'|s,a) = 1$:
$$
q(s,a) = \sum_{s'} P(s'|s,a) \Big( R(s,a) + \gamma v(s')\Big) = R(s,a) \sum_{s'} P(s'|s,a) + \gamma \sum_{s'} P(s'|s,a) v(s')
 = R(s, a) + \gamma \sum_{s'} P(s'|s,a) v(s')
$$



In [2]:
def get_q_values(v_values, gamma):
    """Compute action values q(s, a) from state values with a one-step lookahead.

    For every state and every action available in it, evaluates
    q(s, a) = sum_{s'} P(s'|s, a) * R(s, a, s') + gamma * P(s'|s, a) * v(s'),
    reading states, actions, transitions and rewards from the module-level ``env``.

    Args:
        v_values: mapping from state to its current value estimate; it is
            indexed by every next state that ``env.get_next_states`` yields.
        gamma: discount factor applied to the next-state value.

    Returns:
        Nested dict ``q_values[state][action]``. An action for which the
        environment reports no next states gets the value 0.
    """
    q_values = {}
    for state in env.get_all_states():
        q_values[state] = {}
        for action in env.get_possible_actions(state):
            q_value = 0
            for next_state in env.get_next_states(state, action):
                # Look the transition probability up once: it weights both the
                # immediate reward and the discounted next-state value.
                prob = env.get_transition_prob(state, action, next_state)
                q_value += prob * env.get_reward(state, action, next_state)
                q_value += gamma * prob * v_values[next_state]
            q_values[state][action] = q_value
    return q_values

In [3]:
def init_policy():
    """Build the uniform random policy over the module-level ``env``.

    Returns:
        Nested dict ``policy[state][action]`` in which every action available
        in a state has probability 1 / (number of available actions). A state
        with no available actions maps to an empty dict.
    """
    policy = {}
    for state in env.get_all_states():
        # Query the action list once per state instead of once per action.
        actions = env.get_possible_actions(state)
        policy[state] = {action: 1 / len(actions) for action in actions}
    return policy

In [4]:
def init_v_values():
    """Return a fresh state-value table with every state of ``env`` set to 0."""
    return dict.fromkeys(env.get_all_states(), 0)

In [5]:
def policy_evaluation_step(v_values, policy, gamma):
    """Perform one sweep of iterative policy evaluation.

    Computes q-values from the current ``v_values`` and averages them under
    ``policy``: v_new(s) = sum_a policy[s][a] * q(s, a).

    Args:
        v_values: current state-value estimates, passed on to ``get_q_values``.
        policy: nested dict ``policy[state][action]`` of action probabilities.
        gamma: discount factor, passed on to ``get_q_values``.

    Returns:
        A new dict of state values; a state with no available actions gets 0.
    """
    action_values = get_q_values(v_values, gamma)
    updated_values = {}
    for state in env.get_all_states():
        expected_value = 0
        for action in env.get_possible_actions(state):
            expected_value += policy[state][action] * action_values[state][action]
        updated_values[state] = expected_value
    return updated_values

In [6]:
def policy_evaluation(policy, gamma, eval_iter_n):
    """Evaluate ``policy`` iteratively and return its q-values.

    Starts from the all-zero value table, applies ``policy_evaluation_step``
    ``eval_iter_n`` times, then converts the resulting state values to
    q-values with ``get_q_values``.

    Args:
        policy: nested dict ``policy[state][action]`` of action probabilities.
        gamma: discount factor.
        eval_iter_n: number of evaluation sweeps to run.

    Returns:
        Whatever ``get_q_values`` returns for the final state values.
    """
    state_values = init_v_values()
    for _sweep in range(eval_iter_n):
        state_values = policy_evaluation_step(state_values, policy, gamma)
    return get_q_values(state_values, gamma)

In [7]:
def policy_improvement(q_values):
    """Derive the greedy deterministic policy from ``q_values``.

    For each state the first action with the strictly largest q-value gets
    probability 1 and every other available action gets 0.

    Args:
        q_values: nested dict ``q_values[state][action]``.

    Returns:
        Nested dict ``policy[state][action]``. If no action is selected for a
        state (for example, it has no available actions), the state's entry
        ends up as ``{None: 1}`` alongside any zeroed actions.
    """
    greedy_policy = {}
    for state in env.get_all_states():
        actions = tuple(env.get_possible_actions(state))
        action_probs = dict.fromkeys(actions, 0)
        best_action, best_q = None, float('-inf')
        for action in actions:
            candidate_q = q_values[state][action]
            # Strict comparison keeps the earliest action on ties.
            if candidate_q > best_q:
                best_action, best_q = action, candidate_q
        action_probs[best_action] = 1
        greedy_policy[state] = action_probs
    return greedy_policy

In [8]:
# Policy-iteration hyperparameters.
iter_n = 100  # number of evaluate/improve rounds
eval_iter_n = 100  # evaluation sweeps per round
gamma = 0.999  # discount factor

# Start from the uniform policy and alternate evaluation with greedy improvement.
policy = init_policy()
for _ in range(iter_n):
    q_values = policy_evaluation(policy, gamma, eval_iter_n)
    policy = policy_improvement(q_values)

In [12]:
# Estimate the policy's mean undiscounted episode return over 1000 episodes,
# each capped at 1000 steps.
total_rewards = []

for _episode in range(1000):
    total_reward = 0
    state = env.reset()
    for _step in range(1000):
        actions = env.get_possible_actions(state)
        # Look probabilities up per action so they stay aligned with `actions`
        # instead of relying on the insertion order of policy[state].
        action_probs = [policy[state][action] for action in actions]
        action = np.random.choice(actions, p=action_probs)
        state, reward, done, _ = env.step(action)
        total_reward += reward

        if done:
            break

    total_rewards.append(total_reward)

np.mean(total_rewards)

0.859

In [13]:
# Display the per-state action probabilities of the policy produced by the training loop.
policy

{(0, 0): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (0, 1): {'left': 0, 'down': 0, 'right': 0, 'up': 1},
 (0, 2): {'left': 1, 'down': 0, 'right': 0, 'up': 0},
 (0, 3): {'left': 0, 'down': 0, 'right': 0, 'up': 1},
 (1, 0): {'left': 1, 'down': 0, 'right': 0, 'up': 0},
 (1, 1): {None: 1},
 (1, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (1, 3): {None: 1},
 (2, 0): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (2, 1): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (2, 2): {'left': 1, 'down': 0, 'right': 0, 'up': 0},
 (2, 3): {None: 1},
 (3, 0): {None: 1},
 (3, 1): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (3, 2): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (3, 3): {None: 1}}

In [14]:
# Roll out a single episode with the learned policy, rendering every step.
state = env.reset()
# Initialise the accumulator here so this cell does not depend on a value
# left behind by an earlier cell.
total_reward = 0
for _step in range(1000):
    actions = env.get_possible_actions(state)
    # Look probabilities up per action so they stay aligned with `actions`
    # instead of relying on the insertion order of policy[state].
    action_probs = [policy[state][action] for action in actions]
    action = np.random.choice(actions, p=action_probs)
    state, reward, done, _ = env.step(action)
    total_reward += reward

    env.render()
    time.sleep(0.5)  # slow the animation down so each frame is visible

    if done:
        break

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
FHFH
F*FH
HFFG

SFFF
FHFH
FFFH
H*FG

SFFF
FHFH
FFFH
H*FG

SFFF
FHFH
FFFH
HF*G

SFFF
FHFH
FFFH
HFF*

