In [None]:
import gymnasium as gym
import numpy as np
# import matplotlib
# import matplotlib.pyplot as plt
# import matplotlib.cm as cm
# import matplotlib.pyplot as plt
# import matplotlib.animation as animation
# import torch.nn as nn



https://www.gymlibrary.dev/environments/box2d/lunar_lander/

## Planning:
The goal is to use value iteration to solve a simple doorkey problem using minigrid as an environment. 

I want to first build a system to build a good intuition for the value iteration method, then once I have created an agent and training loop based on that upgrade it to a fully connect MLP.

## approach
I want to first 


TODO:
- [ ] Test the environment with a random policy
- [ ] Create a table system for storing states
- [ ] Create action selection system based on state estimates
- [ ] Create an agent class that implements the actions
- [ ] Create a training loop
- [ ] Build evaluations
- [ ] Rebuild with FCN instead of tables


In [None]:
# make a new gym environment with doorkey 5x5 minigrid
env = gym.make('FrozenLake-v1', render_mode="rgb_array")

# print gym environment information
print('observation space:', env.observation_space)
print('action space:', env.action_space)





### Create value iteration table
Value iteration is a simple technique where you map each of the possible states that might exist in your environment with an estimation of the value of being in this state.

This works because we recursively update our value estimations from our final goal state which gives us maximum reward to infer the value of being in other states. Normally the value of a state is based on whether it moves us closer to or further away from our reward.




In [None]:
env.observation_space

In [None]:
# We create our initial table, which is the same size as our environment's state space
def initialize_values(env):
    return np.zeros(env.observation_space.n)


In [136]:
# Calculate q values for each action in a given state

def calculate_q_value(env, state, value_table, gamma=0.9):
    q_value = []
    for action in range(env.action_space.n):
        next_states_rewards = []
        for next_state_reward in env.P[state][action]:
            transition_probability, next_state, reward, _ = next_state_reward
            next_states_rewards.append((transition_probability * (reward + gamma * value_table[next_state])))
        q_value.append(np.sum(next_states_rewards))

    return q_value

def update_value_table(value_table, state, q_value):
    value_table[state] = max(q_value)
    return value_table

def has_converged(value_table, updated_value_table, epsilon):
    return np.sum(np.fabs(updated_value_table - value_table )) <= epsilon

def value_iteration(env, gamma=.99, epsilon=1e-14):
    value_table = initialize_values(env)
    
    while True: 
        updated_value_table = np.copy(value_table)
        for state in range(env.observation_space.n):
            q_value = calculate_q_value(env, state, value_table, gamma)
            value_table =  update_value_table(value_table, state, q_value)

        if has_converged(value_table, updated_value_table, epsilon):
            print("Value iteration converged")
            break
    return value_table


Value iteration converged


In [137]:
def extract_policy(env, value_table, gamma=0.99 ):
    observation_space_size = env.observation_space.n
    policy = np.zeros(observation_space_size, dtype=np.int64)
    for state in range(observation_space_size):
        Q_table = np.zeros(env.action_space.n)
        for action in range(env.action_space.n):
            for next_state_reward in env.P[state][action]:
                trans_prob, next_state, reward_prob, _ = next_state_reward
                Q_table[action] +=(trans_prob * (reward_prob + gamma * value_table[next_state]))
        print(np.argmax(Q_table))
        policy[state] = int(np.argmax(Q_table))
    return policy




0
3
3
3
0
0
0
0
3
1
0
0
0
2
1
0


array([0, 3, 3, 3, 0, 0, 0, 0, 3, 1, 0, 0, 0, 2, 1, 0])

In [138]:
# run episode
def run_episode(env, policy):
    state, _= env.reset()
    total_reward = 0
    done = False
    while not done:
        action = policy[state]
        state, reward, done,_,  info = env.step(action)
        total_reward += reward 
    
    return total_reward


In [139]:
def evaluate_policy(env, policy, num_episodes=100):
    scores = [run_episode(env, policy) for _ in range(num_episodes)]
    return np.mean(scores)

In [174]:
env = gym.make('FrozenLake-v1')
num_episodes = 1000
value_table = value_iteration(env, gamma=0.99, epsilon=1e-10)
policy = extract_policy(env, value_table)
score = evaluate_policy(env, policy, num_episodes)
print(f"Average score over {num_episodes} episodes: ", score)

Value iteration converged
0
3
3
3
0
0
0
0
3
1
0
0
0
2
1
0
Average score over 1000 episodes:  0.841


In [165]:
env = gym.make('FrozenLake-v1', render_mode="rgb_array")

import imageio
import matplotlib.pyplot as plt

def run_episode_visual(env, policy):
    state, _= env.reset()
    done = False
    frames = []  # List to hold the frames
    while not done:
        img = env.render()  # Get the current state as an RGB image
        frames.append(img)
        action = policy[state]
        state, reward, done,_ ,  info = env.step(action)
    frames.append(env.render())

    imageio.mimsave('policy_run.gif', frames, duration=0.1)

run_episode_visual(env, policy)



In [163]:
policy

array([0, 3, 3, 3, 0, 0, 0, 0, 3, 1, 0, 0, 0, 2, 1, 0])