In [1]:
import gym 
import time

# List the id of every environment registered with gym.
print("Available environments:")
envs = gym.envs.registry.values()
for spec in envs:
    print(spec.id)

# Build the FrozenLake environment used by the rest of the notebook.
env = gym.make(
    'FrozenLake-v1',
    render_mode='human',
    desc=None,
    map_name="4x4",
    is_slippery=True,
)

# Sizes of the discrete observation and action spaces.
n_state = env.observation_space.n
n_action = env.action_space.n

print('Number of states:', n_state)
print('Number of actions: ', n_action)



Available environments:
CartPole-v0
CartPole-v1
MountainCar-v0
MountainCarContinuous-v0
Pendulum-v1
Acrobot-v1
LunarLander-v2
LunarLanderContinuous-v2
BipedalWalker-v3
BipedalWalkerHardcore-v3
CarRacing-v2
Blackjack-v1
FrozenLake-v1
FrozenLake8x8-v1
CliffWalking-v0
Taxi-v3
Reacher-v2
Reacher-v4
Pusher-v2
Pusher-v4
InvertedPendulum-v2
InvertedPendulum-v4
InvertedDoublePendulum-v2
InvertedDoublePendulum-v4
HalfCheetah-v2
HalfCheetah-v3
HalfCheetah-v4
Hopper-v2
Hopper-v3
Hopper-v4
Swimmer-v2
Swimmer-v3
Swimmer-v4
Walker2d-v2
Walker2d-v3
Walker2d-v4
Ant-v2
Ant-v3
Ant-v4
Humanoid-v2
Humanoid-v3
Humanoid-v4
HumanoidStandup-v2
HumanoidStandup-v4
Number of states: 16
Number of actions:  4


In [2]:
# Reset the environment to start a new episode, then pause for two seconds
# (presumably so the 'human' render window stays visible — confirm).
env.reset()
time.sleep(2)

In [3]:
# Render the current environment state (env was created with
# render_mode='human'), then pause for two seconds.
env.render()
time.sleep(2)

In [5]:
# Take one step with action 2 and print each element of the returned
# (state, reward, terminated, truncated, info) tuple on its own line.
# Assumes env.reset() was already called by an earlier cell.
# NOTE(review): action 2 is presumably "right" in FrozenLake's encoding —
# confirm against the gym documentation.
step_result = env.step(2)
new_state, reward, terminated, truncated, info = step_result
for item in step_result:
    print(item)

4
0.0
False
False
{'prob': 0.3333333333333333}


In [7]:
# Render the environment again after the step taken above, then pause for
# two seconds.
env.render()
time.sleep(2)

In [19]:
import torch 
def run_episode(env, policy):
    """Play one episode with a fixed tabular policy and return the total reward.

    Args:
        env: Environment whose ``reset()`` returns ``(state, info)`` and whose
            ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)``.
        policy: Tensor indexed by state; ``policy[state].item()`` is the
            action taken in that state.

    Returns:
        The sum of the rewards collected until the episode is terminated or
        truncated.
    """
    state, _ = env.reset()
    total_reward = 0
    is_done = False
    while not is_done:
        action = policy[state].item()
        state, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        # The loop condition handles the exit; no explicit break is needed.
        is_done = terminated or truncated
    return total_reward
    
# Estimate how well purely random policies perform: each episode samples a
# fresh policy (one random action per state) and records its total reward.
# NOTE(review): env was created with render_mode='human'; running this many
# rendered episodes may be slow — consider a non-rendering env here.
n_episodes = 1000
total_rewards = []

print("Running episodes...") 
print(env.spec.id)
print("Number of episodes: ", n_episodes) 
print("Number of states: ", n_state)
print("Number of actions: ", n_action) 

for episode in range(n_episodes):
    random_policy = torch.randint(high=n_action, size=(n_state,))
    total_rewards.append(run_episode(env, random_policy))

average_reward = sum(total_rewards) / n_episodes
print(f"Average reward over {n_episodes} episodes: {average_reward}")


Running episodes...
FrozenLake-v1
Number of episodes:  1000
Number of states:  16
Number of actions:  4
Average reward over 1000 episodes: 0.008


In [20]:
# Show the transition table for state 6: a dict keyed by action whose values
# are lists of (probability, next_state, reward, flag) tuples.
# NOTE(review): the boolean flag presumably marks terminal next states — confirm.
print("Transition Matrix:", env.env.P[6])

Transition Matrix: {0: [(0.3333333333333333, 2, 0.0, False), (0.3333333333333333, 5, 0.0, True), (0.3333333333333333, 10, 0.0, False)], 1: [(0.3333333333333333, 5, 0.0, True), (0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 7, 0.0, True)], 2: [(0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 7, 0.0, True), (0.3333333333333333, 2, 0.0, False)], 3: [(0.3333333333333333, 7, 0.0, True), (0.3333333333333333, 2, 0.0, False), (0.3333333333333333, 5, 0.0, True)]}


In [2]:
# Discount factor multiplied into successor-state values in the updates below.
gamma = 0.99
# Convergence tolerance: iteration stops once the largest per-state change in
# the value estimate is <= this.
threshold = 0.0001

# Value iteration: a dynamic-programming solution of the MDP that repeatedly
# applies the Bellman optimality update until the values stop changing.
def value_iteration(env, gamma, threshold):
    """Compute state values by value iteration.

    Args:
        env: Environment exposing ``observation_space.n``, ``action_space.n``
            and a transition table ``env.env.P[state][action]`` that yields
            ``(probability, next_state, reward, flag)`` tuples (the fourth
            entry is ignored here).
        gamma: Discount factor applied to successor-state values.
        threshold: Iteration stops once the largest absolute change of any
            state value in a sweep is <= this.

    Returns:
        A 1-D tensor with one value per state.
    """
    n_state = env.observation_space.n
    n_action = env.action_space.n
    transitions = env.env.P
    v = torch.zeros(n_state)

    while True:
        v_temp = torch.empty(n_state)
        for state in range(n_state):
            v_action = torch.zeros(n_action)
            for action in range(n_action):
                for trans_prob, next_state, reward, _ in transitions[state][action]:
                    v_action[action] += trans_prob * (reward + gamma * v[next_state])
            # Take the max once, after every action's value has been accumulated.
            v_temp[state] = torch.max(v_action)
        max_delta = torch.max(torch.abs(v - v_temp))
        # v_temp is freshly allocated each sweep, so it can be adopted directly.
        v = v_temp
        if max_delta <= threshold:
            break

    return v

# Solve the MDP with value iteration and display the resulting state values.
# Requires `env` and `torch` from earlier cells; re-run those first after a
# kernel restart.
v_optimal = value_iteration(env,gamma, threshold)
print("Optimal value function: ", v_optimal)

NameError: name 'env' is not defined

In [None]:
def extract_policy(env, v, gamma):
    """Derive the greedy policy for a given state-value function.

    Args:
        env: Environment exposing ``observation_space.n``, ``action_space.n``
            and a transition table ``env.env.P[state][action]`` that yields
            ``(probability, next_state, reward, flag)`` tuples (the fourth
            entry is ignored here).
        v: Tensor of state values, indexed by state.
        gamma: Discount factor applied to successor-state values.

    Returns:
        An int64 tensor holding, for each state, the action with the highest
        expected one-step return under ``v``.
    """
    n_state = env.observation_space.n
    n_action = env.action_space.n
    optimal_policy = torch.zeros(n_state, dtype=torch.int64)

    for state in range(n_state):
        v_action = torch.zeros(n_action)
        for action in range(n_action):
            for trans_prob, new_state, reward, _ in env.env.P[state][action]:
                # Use the `v` argument, not a notebook-level global.
                v_action[action] += trans_prob * (reward + gamma * v[new_state])
        optimal_policy[state] = torch.argmax(v_action)
    return optimal_policy

# Extract the greedy policy from the values computed by value iteration.
# The notebook-level name for those values is `v_optimal`; `v` only exists
# as a local inside the functions.
optimal_policy = extract_policy(env, v_optimal, gamma)
print("Optimal policy: \n", optimal_policy)

NameError: name 'env' is not defined

In [None]:
# Policy iteration algorithm: alternate policy evaluation and policy
# improvement until the policy stops changing.
def policy_evaluation(env, policy, gamma, threshold):
    """Iteratively evaluate the state values of a fixed policy.

    Args:
        env: Environment exposing a transition table
            ``env.env.P[state][action]`` that yields
            ``(probability, next_state, reward, flag)`` tuples (the fourth
            entry is ignored here).
        policy: 1-D tensor with one action per state; float-valued entries
            are accepted and converted to ``int``.
        gamma: Discount factor applied to successor-state values.
        threshold: Evaluation stops once the largest absolute change of any
            state value in a full sweep is <= this.

    Returns:
        A 1-D tensor with the value of each state under ``policy``.
    """
    n_state = policy.shape[0]
    v = torch.zeros(n_state)
    while True:
        v_temp = torch.zeros(n_state)
        for state in range(n_state):
            action = int(policy[state].item())
            for trans_prob, new_state, reward, _ in env.env.P[state][action]:
                v_temp[state] += trans_prob * (reward + gamma * v[new_state])
        # Convergence is checked once per full sweep, outside the state loop,
        # so that `break` leaves the `while` loop and `v` is only replaced by
        # a completely filled estimate.
        max_delta = torch.max(torch.abs(v - v_temp))
        v = v_temp
        if max_delta <= threshold:
            break
    return v

def policy_improvement(env, v,  gamma):
    """Return the policy that is greedy with respect to the state values ``v``.

    For every state, the expected one-step return of each action is
    accumulated from ``env.env.P[state][action]`` and the best action's index
    is stored. The result is a float tensor with one entry per state.
    """
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    greedy = torch.zeros(num_states)
    for s in range(num_states):
        action_values = torch.zeros(num_actions)
        for a in range(num_actions):
            for prob, s_next, r, _ in env.env.P[s][a]:
                action_values[a] += prob * (r + gamma * v[s_next])
        greedy[s] = torch.argmax(action_values)
    return greedy

def policy_iteration(env, gamma, threshold):
    """Run policy iteration starting from a random policy.

    Alternates ``policy_evaluation`` and ``policy_improvement`` until the
    improved policy equals the current one, then returns that policy (a float
    tensor with one action per state).
    """
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    current = torch.randint(high=num_actions, size=(num_states,)).float()
    stable = False
    while not stable:
        values = policy_evaluation(env, current, gamma, threshold)
        improved = policy_improvement(env, values, gamma)
        stable = torch.equal(current, improved)
        if not stable:
            current = improved.clone()
    return current