In [20]:
import numpy as np
import sys

sys.path.append("../mini_rl_lib/")
from mdp import *
from policies import *
from wrappers import *

## Example 1

In [24]:
transition_probabilities = np.array([
    [[0.50, 0.00, 0.50],
     [0.70, 0.10, 0.20],
     [0.40, 0.00, 0.60]],
    [[0.00, 0.00, 1.00],
     [0.00, 0.95, 0.05],
     [0.30, 0.30, 0.40]],
])

def transition_function(s, a, next_s):
    p = transition_probabilities[a, s, next_s]
    #print(s, a, next_s, p)
    return p

In [25]:
def reward_function(s, a, next_s, r):
    if s == 0 and next_s == 1 and a == 0:
        return +5
    if s == 2 and next_s == 0 and a == 1:
        return -1
    return 0
    
all_rewards = np.array([-1, 0, +5])

In [45]:
def terminate_function(s):
    return s == 2

In [46]:
config = MDPConfig(
    state_space_type = SpaceType.DISCRETE,
    action_space_type = SpaceType.DISCRETE,
    transition_function_type = MDPTransitionType.SAS,
    reward_function_type = MDPRewardType.SAS,
    n_states = 3,
    n_actions = 2,
)

model = MDP(config)

In [47]:
state_range = Range(config.n_states)
observation_wrapper = DiscreteObservationWrapper(model, state_range)

In [48]:
action_range = Range(config.n_actions)
action_wrapper = DiscreteActionWrapper(model, action_range)

In [49]:
model.init(observation_wrapper, action_wrapper, transition_function, reward_function, terminate_function, all_rewards)

In [50]:
gamma = 0.9
eps = 1e-3
policy = ValueIterationDeterministicMDPPolicy(model, gamma, eps)
policy.get_policy()

array([0, 0, 0])

In [51]:
policy.fit()
policy.get_policy()

array([0, 0, 0])

In [53]:
observation, info = model.reset(seed=42)

for i in range(100):
    action = model.action_space.sample()
    observation, reward, terminated, truncated, info = model.step(action)
    print(action, "=>", observation, reward, terminated, truncated, info)

    if terminated or truncated:
        break

1 => 2 0 True False {}


In [57]:
observation, info = model.reset(seed=42)

for i in range(100):
    action = policy.get_policy()[observation]
    observation, reward, terminated, truncated, info = model.step(action)
    print(action, "=>", observation, reward, terminated, truncated, info)

    if terminated or truncated:
        break

0 => 2 None False True {'Error': 'Probabilities sum to zero'}


## Example 2

In [21]:
# Define the MDP configuration
mdp_config = MDPConfig(
    state_space_type=SpaceType.DISCRETE,
    action_space_type=SpaceType.DISCRETE,
    transition_function_type=MDPTransitionType.SA_DETERMINISTIC,
    reward_function_type=MDPRewardType.SA,
    n_states=9,  # 3x3 grid
    n_actions=4  # Up, Down, Left, Right
)

simple_gridworld_mdp = MDP(mdp_config)

def transition_function(s, a, next_s):
    if a == 0:  # Up
        return max(s - 3, 0)
    elif a == 1:  # Down
        return min(s + 3, 8)
    elif a == 2:  # Left
        return max(s - 1, 0) if s % 3 != 0 else s
    elif a == 3:  # Right
        return min(s + 1, 8) if (s + 1) % 3 != 0 else s

def reward_function(s, a, next_s, r):
    if next_s == 8:  # Goal state
        return 10
    else:
        return -10

def terminate_function(s):
    return s == 8

simple_gridworld_mdp.init(
    states=DiscreteObservationWrapper(simple_gridworld_mdp, Range(9)),
    actions=DiscreteActionWrapper(simple_gridworld_mdp, Range(4)),
    transition_function=transition_function,
    reward_function=reward_function,
    terminate_function=terminate_function
)


In [22]:
policy = ValueIterationDeterministicMDPPolicy(simple_gridworld_mdp, gamma, eps)
policy.get_policy()

array([0, 0, 0, 0, 0, 0, 0, 0, 0])

In [23]:
policy.fit()
policy.get_policy()

array([1, 1, 1, 1, 1, 1, 1, 1, 1])