In [None]:
import gymnasium as gym
import torch

In [None]:
# Create the FrozenLake environment; the "ansi" render mode is requested so the
# board can be printed as text in a later cell.
env = gym.make("FrozenLake-v1", render_mode="ansi")

# Sizes of the discrete state and action spaces, reused by the cells below.
n_state, n_action = env.observation_space.n, env.action_space.n


In [None]:
# Seed the environment on its first reset so that the random transitions sampled
# by all later episodes are reproducible under Restart & Run All.
state, _ = env.reset(seed=42)
# Show the initial board layout.
print(env.render())

In [None]:
# Value-iteration hyperparameters shared by the cells below.
gamma = 0.99        # discount factor
threshold = 1e-4    # convergence tolerance on the largest per-state value change

In [None]:

# Exploratory cell: a single Bellman backup sweep starting from all-zero state
# values. The per-action values are computed for inspection only; V_res itself
# is never updated here.
V_res = torch.zeros(n_state)
transitions = env.unwrapped.P

for state in range(n_state):
    v_actions = torch.zeros(n_action)
    for action in range(n_action):
        for trans_prob, new_state, reward, _ in transitions[state][action]:
            # Probability-weighted one-step return: immediate reward plus the
            # discounted value of the successor state.
            v_actions[action] += trans_prob * (reward + gamma * V_res[new_state])

In [None]:
def value_iteration(env: gym.Env, gamma, threshold):
    """
    Compute the optimal state values of an environment with value iteration.

    Every sweep replaces each state's value with the best expected one-step
    return over all actions, evaluated with the values of the previous sweep.
    @param env: environment exposing ``observation_space.n``, ``action_space.n`` and
        ``unwrapped.P[state][action]`` as an iterable of 4-tuples
        (transition probability, next state, reward, <ignored>)
    @param gamma: discount factor
    @param threshold: sweeps stop once the largest absolute change of any state value
        between two consecutive sweeps is less than or equal to this value
    @return: tensor holding one optimal value per state
    """
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    model = env.unwrapped.P

    def backup(state, values):
        """Return the best expected one-step return reachable from ``state``."""
        q_values = torch.zeros(num_actions)
        for action in range(num_actions):
            # Sum over every possible outcome of taking ``action`` in ``state``.
            for prob, next_state, reward, _unused in model[state][action]:
                q_values[action] += prob * (reward + gamma * values[next_state])
        # The optimal value of a state is the value of its best action.
        return torch.max(q_values)

    V = torch.zeros(num_states)
    converged = False
    while not converged:
        V_next = torch.empty(num_states)
        for state in range(num_states):
            V_next[state] = backup(state, V)
        # Convergence check: largest change of any state value in this sweep.
        converged = bool(torch.max(torch.abs(V - V_next)) <= threshold)
        V = V_next
    return V


In [None]:
# Solve the environment and show the resulting optimal state values.
V_optimal = value_iteration(env, gamma=gamma, threshold=threshold)
values_report = 'Optimal values:\n{}'.format(V_optimal)
print(values_report)


In [None]:
def extract_optimal_policy(env: gym.Env, V_optimal, gamma):
    """
    Derive the greedy policy induced by a set of state values.
    @param env: environment exposing ``observation_space.n``, ``action_space.n`` and
        ``unwrapped.P[state][action]`` as an iterable of 4-tuples
        (transition probability, next state, reward, <ignored>)
    @param V_optimal: state values, indexable by state
    @param gamma: discount factor
    @return: float tensor holding, for every state, the index of the action with
        the highest expected one-step return under ``V_optimal``
    """
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    model = env.unwrapped.P

    greedy_policy = torch.zeros(num_states)
    for state in range(num_states):
        q_values = torch.zeros(num_actions)
        for action in range(num_actions):
            # Expected return of ``action``: sum over all of its possible outcomes.
            for prob, next_state, reward, _unused in model[state][action]:
                q_values[action] += prob * (reward + gamma * V_optimal[next_state])
        # Act greedily with respect to the action values.
        greedy_policy[state] = torch.argmax(q_values)
    return greedy_policy


In [None]:
# Turn the optimal state values into a greedy policy and display it.
optimal_policy = extract_optimal_policy(env, V_optimal=V_optimal, gamma=gamma)
policy_report = 'Optimal policy:\n{}'.format(optimal_policy)
print(policy_report)


In [None]:
def run_episode(env: gym.Env, policy):
    """
    Play one episode following a fixed policy and return the accumulated reward.
    @param env: environment whose ``reset()`` returns (state, info) and whose
        ``step(action)`` returns (state, reward, terminated, truncated, info)
    @param policy: indexable by state; each entry is the action to take
        (a tensor element or a plain number)
    @return: sum of the rewards collected until the episode terminates or is truncated
    """
    state, _ = env.reset()
    episode_reward = 0
    while True:
        # The policy may be stored as a float tensor; a discrete environment
        # expects an integer action, so convert explicitly.
        action = int(policy[state])
        # Gymnasium's step() returns the flags in the order (terminated, truncated).
        state, reward, terminated, truncated, _ = env.step(action)
        episode_reward += reward
        if terminated or truncated:
            return episode_reward

In [None]:
# Estimate the policy's performance empirically by averaging the total reward
# over many independent episodes.
n_episode = 1000
total_rewards = [run_episode(env, optimal_policy) for _ in range(n_episode)]

average_reward = sum(total_rewards) / n_episode
print('Average total reward under the optimal policy: {}'.format(average_reward))

