In [1]:
import sys 
sys.path.append('../../..')

import numpy as np
import matplotlib.pyplot as plt

from grid_world.action import Action
from grid_world.grid_world import GridWorld
from grid_world.agents.q_explorer_agent import QExplorerAgent
from grid_world.visualization.format_objects import get_policy_rec_str, get_policy_eval_str, get_world_str
from grid_world.utils.returns import returns_from_reward
from grid_world.utils.policy import get_policy_rec, get_random_policy, sample_action

# Seed NumPy's global random generator so NumPy-based sampling repeats across reruns.
# NOTE(review): whether sample_action/get_random_policy draw from NumPy's RNG is not
# visible here — confirm, otherwise the random episode below is not reproducible.
np.random.seed(21)

In [2]:
# The "real" world the agent will act in: a 4x5 grid with one terminal cell, three walls
# and one trap. Judging by the printed layout, coordinates read as (row, column) with
# row 0 drawn at the bottom.
gworld = GridWorld(
    grid_shape=(4,5), 
    terminal_states_coordinates=((0,4),),
    walls_coordinates=((0,1), (1,1), (2,3)),
    traps_coordinates=((1,3),),
)
print(get_world_str(gworld))

3               

2          █    

1    █     ☠    

0 ⚐  █        ✘ 

  0  1  2  3  4 


In [3]:
from dynamic_programing.policy_improvement import dynamic_programing_gpi


# Let's restrict the available actions to these four moves.
actions = [Action.up, Action.down, Action.left, Action.right]

def r(effect):
    """Translate the effect code of a move into a scalar reward.

    An effect of -1 costs 100, an effect of 1 is free, and any other effect
    costs 1 (the ordinary per-step penalty).
    """
    if effect == -1:
        return -100
    return 0 if effect == 1 else -1
    
# Tabulate the reward of every (state, action) pair once, by trying the action in the
# real world and scoring the effect it reports.
rewards_dict = dict(
    ((s, a), r(gworld.take_action(s, a)[1]))
    for s in gworld.states
    for a in actions
)


def rewards(x, y):
    """Look up the pre-computed reward for taking action ``y`` in state ``x``."""
    return rewards_dict[(x, y)]

## Guide

In [4]:
def world_model(s, a, world = gworld):
    """Return the transition distribution for taking action ``a`` in state ``s``.

    The move is resolved once through ``world.take_action``; the returned function
    maps a candidate state to 1 if it equals the resulting state and to 0 otherwise.
    """
    next_state = world.take_action(s, a)[0]

    def transition_probability(x):
        if x == next_state:
            return 1
        return 0

    return transition_probability

# Reference solution: run dynamic_programing_gpi against the true world model and the
# tabulated rewards. It returns the policy together with its evaluation function.
# (Presumably generalized policy iteration — the implementation lives outside this notebook.)
policy, eval_function = dynamic_programing_gpi(
    world_model=world_model,
    reward_function=rewards,
    actions=actions,
    states=gworld.states,
)

policy converged in 4 epochs


In [5]:
# Turn the reference policy into a per-state recommendation and print it on the real grid.
pi_r = get_policy_rec(policy, gworld, actions)
print(get_policy_rec_str(pi_r, gworld))

 ↓  ↓  ↓  →  ↓ 

 →  →  ↓  █  ↓ 

 ↑  █  ↓  ☠  ↓ 

 ↑  █  →  →  ✘ 




## Agent

### discovery run

In [6]:
from typing import Final, Collection

from grid_world.action import Action
from grid_world.grid_world import GridWorld
from grid_world.state import State
from grid_world.type_aliases import Policy, RewardFunction, Q
from grid_world.utils.evaluators import best_q_value
from grid_world.utils.policy import (
    get_random_policy,
    sample_action,
    get_explorer_policy,
    get_reasonable_actions,
)
from grid_world.utils.returns import returns_from_reward
from utils.operations import add_tuples

In [7]:
class BasicAgent:
    """Minimal agent that holds a policy and remembers every state it has observed.

    Args:
        reward_function: callable applied to the effect of a move to obtain its reward.
        actions: actions the agent may take; defaults to every member of ``Action``.
        policy: policy to start with; when omitted a random policy over ``actions``
            is created with ``get_random_policy``.
        gamma, alpha, epsilon: stored on the instance unchanged; nothing in this
            class reads them.
    """

    def __init__(
        self,
        reward_function: RewardFunction,
        actions: Collection[Action] = None,
        policy: Policy = None,
        gamma: float = 1,
        alpha: float = 0.1,
        epsilon: float = 0.1,
    ):
        self.reward_function: Final = reward_function
        self.actions: Final = actions if actions is not None else tuple(Action)
        # Keep the caller's policy when one is supplied. The previous code stored the
        # `Policy` type alias here instead of the `policy` argument, so an explicitly
        # passed policy was silently replaced by a non-callable type object.
        self.policy = policy if policy is not None else get_random_policy(self.actions)
        self.gamma = gamma
        self.alpha = alpha
        self.epsilon = epsilon
        # Every state the agent has learned about so far (visited cells and inferred walls).
        self.world_map: set[State] = set()

    def update_world_map(self, state, action, new_state):
        """Record what a single move revealed about the world.

        If the move left the agent where it was, the cell it tried to enter is recorded
        as a "wall" state (this also happens when the target lies outside the grid).
        Otherwise the state that was reached is added to the map.
        """
        if new_state == state:
            self.world_map.add(
                State(add_tuples(state.coordinates, action.direction), "wall")
            )
        else:
            self.world_map.add(new_state)

    
def run_random_episode(
    agent, world, max_steps = 1000000
):
    """Play one episode by sampling actions from ``agent.policy``.

    Every move is scored with ``agent.reward_function`` and reported to
    ``agent.update_world_map``. The episode stops when a state of kind "terminal"
    is reached or after ``max_steps`` moves.

    Returns:
        A tuple ``(terminated, states, actions, rewards)`` where ``terminated`` tells
        whether a terminal state was reached and ``states`` starts with the world's
        initial state.
    """
    current = world.initial_state
    visited = [current]
    taken = []
    collected = []

    for _ in range(max_steps):
        chosen = sample_action(agent.policy, current, agent.actions)
        landed, effect = world.take_action(current, chosen)
        gained = agent.reward_function(effect)
        agent.update_world_map(current, chosen, landed)

        taken.append(chosen)
        visited.append(landed)
        collected.append(gained)

        # Reaching a terminal state ends the episode immediately.
        if landed.kind == "terminal":
            return True, visited, taken, collected

        current = landed

    # Step budget exhausted without reaching a terminal state.
    return False, visited, taken, collected

In [8]:
# Discovery run: no policy is passed, so the agent starts from get_random_policy and
# wanders the real world until it reaches the terminal state (or runs out of steps).
agent = BasicAgent(r, actions)
episode_terminated, episode_states, episode_actions, episode_rewards = run_random_episode(agent, gworld)
# Number of distinct states that appeared in the episode.
len(set(episode_states))

11

### determine optimistic world and policy

In [9]:
def get_state_by_kind(kind, world_map, world_size):
    """Return the coordinates of the mapped states of one kind that fit in the grid.

    Args:
        kind: value compared against each state's ``kind`` attribute.
        world_map: iterable of states exposing ``kind`` and ``coordinates``.
        world_size: exclusive upper bound for every coordinate component; states with
            a component below 0 or at/above this bound are dropped.

    Returns:
        A tuple of coordinates, in the iteration order of ``world_map``.
    """
    # Read from the `world_map` argument. The previous code ignored it and used the
    # notebook-global `agent`, so the function only worked for that one agent.
    return tuple(
        state.coordinates
        for state in world_map
        if state.kind == kind
        and all(0 <= x < world_size for x in state.coordinates)
    )
            
# Quick check: coordinates of the terminal states the agent has mapped so far
# (14 is just the coordinate bound used for this check).
get_state_by_kind("terminal", agent.world_map, 14)

((0, 4),)

In [10]:
def build_opt_world(world_size, agent):
    """Create a square ``GridWorld`` containing only what the agent has mapped.

    Terminal, wall and trap coordinates are taken from ``agent.world_map`` (restricted
    to the ``world_size`` x ``world_size`` grid); no other special cells are passed
    to ``GridWorld``.
    """
    known = {
        kind: get_state_by_kind(kind, agent.world_map, world_size)
        for kind in ("terminal", "wall", "trap")
    }
    side = world_size
    return GridWorld(
        grid_shape=(side, side),
        terminal_states_coordinates=known["terminal"],
        walls_coordinates=known["wall"],
        traps_coordinates=known["trap"],
    )
# Build a 6x6 optimistic world from the agent's map and print it. Bumps against the
# grid boundary are recorded as walls too, so a wall can show up outside the real 4x5 grid.
optimistic_world = build_opt_world(6, agent)
print(get_world_str(optimistic_world))

5                  

4    █             

3                  

2          █       

1                  

0 ⚐  █        ✘    

  0  1  2  3  4  5 


In [11]:
def get_world_model(world):
    """Wrap ``world`` as a transition model ``model(s, a) -> (x -> probability)``.

    ``model(s, a)`` resolves the move once through ``world.take_action`` and returns a
    function that gives 1 for the resulting state and 0 for any other state.
    """
    def world_model(s, a):
        # Resolve the move a single time here. Previously `take_action` was re-run on
        # every probability query, i.e. once per candidate state for each (s, a).
        next_state = world.take_action(s, a)[0]
        return lambda x: 1 if x == next_state else 0

    return world_model

def build_gpi_policy(world, r_map, actions):
    """Plan a policy for ``world`` with ``dynamic_programing_gpi``.

    Args:
        world: world to plan on; must expose ``states`` and ``take_action``.
        r_map: callable turning the effect of a move into a reward.
        actions: actions considered in every state.

    Returns:
        The policy returned by ``dynamic_programing_gpi`` (its second return value,
        the evaluation function, is discarded).
    """
    world_model = get_world_model(world)

    # Score moves with the `r_map` argument. The previous code ignored it and always
    # used the notebook-global `r`.
    rewards_dict = {
        (s, a): r_map(world.take_action(s, a)[1])
        for s in world.states
        for a in actions
    }

    def rewards(x, y):
        return rewards_dict[(x, y)]

    policy, _ = dynamic_programing_gpi(
        world_model=world_model,
        reward_function=rewards,
        actions=actions,
        states=world.states,
    )
    return policy

# Plan on the optimistic world with the same reward function and action set as before.
policy = build_gpi_policy(optimistic_world, r, actions)

policy converged in 1 epochs


In [12]:
# Print the recommended action for each cell of the optimistic world.
pi_r = get_policy_rec(policy, optimistic_world, actions)
print(get_policy_rec_str(pi_r, optimistic_world))

 ↓  →  ↓  ↓  ↓  ↓ 

 ↓  █  ↓  ↓  ↓  ↓ 

 ↓  ↓  ↓  →  ↓  ↓ 

 ↓  ↓  ↓  █  ↓  ↓ 

 →  →  ↓  ↓  ↓  ↓ 

 ↑  █  →  →  ✘  ← 




### improved run

In [13]:
# Hand the policy planned on the optimistic world to the agent.
agent.policy = policy

def run_opt_episode(
    agent, world, max_steps = 1000000, world_size = 6
):
    """Play one episode following ``agent.policy``, replanning when the map proves wrong.

    The agent acts in ``world`` according to its policy evaluated on an optimistic
    world built from ``agent.world_map``. Whenever a move leaves it in place or lands
    it on a trap, the optimistic world is rebuilt and ``agent.policy`` is replaced by
    a freshly planned one.

    Args:
        agent: agent providing ``policy``, ``actions``, ``reward_function``,
            ``world_map`` and ``update_world_map``.
        world: the real world to act in.
        max_steps: maximum number of moves before giving up.
        world_size: side length of the square optimistic world (previously
            hard-coded to 6).

    Returns:
        A tuple ``(terminated, states, actions, rewards)`` where ``terminated`` tells
        whether a terminal state was reached and ``states`` starts with the world's
        initial state.
    """
    state = world.initial_state
    episode_terminated = False
    episode_states = [state]
    episode_actions = []
    episode_rewards = []

    optimistic_world = build_opt_world(world_size, agent)
    policy_rec = get_policy_rec(agent.policy, optimistic_world, agent.actions)

    for _ in range(max_steps):
        action = policy_rec[state]
        new_state, effect = world.take_action(state, action)
        reward = agent.reward_function(effect)
        agent.update_world_map(state, action, new_state)

        episode_actions.append(action)
        episode_states.append(new_state)
        episode_rewards.append(reward)

        if new_state.kind == "terminal":
            episode_terminated = True
            break

        # Check whether the plan is working: a blocked move or a trap means the
        # optimistic map was wrong, so rebuild it and replan.
        if new_state == state or new_state.kind == "trap":
            optimistic_world = build_opt_world(world_size, agent)
            # Plan with the agent's own reward function and actions; the previous code
            # used the notebook globals `r` and `actions` here.
            agent.policy = build_gpi_policy(
                optimistic_world, agent.reward_function, agent.actions
            )
            policy_rec = get_policy_rec(agent.policy, optimistic_world, agent.actions)

        state = new_state

    return episode_terminated, episode_states, episode_actions, episode_rewards

In [14]:
# Improved run: start from the optimistic-world policy, act in the real world with
# replanning, then count the distinct states that appeared in the episode.
agent.policy = policy
episode_terminated, episode_states, episode_actions, episode_rewards = run_opt_episode(agent, gworld)
len(set(episode_states))

policy converged in 1 epochs
 ↓  →  ↓  ↓  ↓  ↓ 

 ↓  █  ↓  ↓  ↓  ↓ 

 ↓  ↓  ↓  →  ↓  ↓ 

 →  →  ↓  █  ↓  ↓ 

 ↑  █  ↓  ↓  ↓  ↓ 

 ↑  █  →  →  ✘  ← 




9

In [15]:
# Rebuild the optimistic world from everything the agent has mapped by now and print
# the agent's current policy on it.
optimistic_world = build_opt_world(6, agent)
pi_r = get_policy_rec(agent.policy, optimistic_world, actions)
print(get_policy_rec_str(pi_r, optimistic_world))

 ↓  →  ↓  ↓  ↓  ↓ 

 ↓  █  ↓  ↓  ↓  ↓ 

 ↓  ↓  ↓  →  ↓  ↓ 

 →  →  ↓  █  ↓  ↓ 

 ↑  █  ↓  ↓  ↓  ↓ 

 ↑  █  →  →  ✘  ← 




In [16]:
# Print the same recommendation (computed on the optimistic world) laid out on the real
# grid, for comparison with the reference policy from the "Guide" section.
print(get_policy_rec_str(pi_r, gworld))

 ↓  ↓  ↓  →  ↓ 

 →  →  ↓  █  ↓ 

 ↑  █  ↓  ☠  ↓ 

 ↑  █  →  →  ✘ 


