In [7]:
import numpy as np
from rlberry.agents import AgentWithSimplePolicy

In [5]:
import math
import rlberry.spaces as spaces
from rlberry_research.envs.finite import GridWorld
from rlberry_research.rendering import Scene, GeometricPrimitive

import rlberry

logger = rlberry.logger


# def get_nroom_state_coord(state_index, nroom_env):
#     yy, xx = nroom_env.index2coord[state_index]
#     # centering
#     xx = xx + 0.5
#     yy = yy + 0.5
#     # map to [0, 1]
#     xx = xx / nroom_env.ncols
#     yy = yy / nroom_env.nrows
#     return np.array([xx, yy])
#     2024/08/25: modified
#     make the trap states random


class NRoom(GridWorld):
    """
    GridWorld with N rooms of size L x L. The agent starts in the middle room.

    There is one small and easy reward in the first room,
    one big reward in the last room and zero reward elsewhere.

    There is a 5% error probability in the transitions when taking an action.

    Parameters
    ----------
    nrooms : int
        Number of rooms.
    reward_free : bool, default=False
        If true, no rewards are given to the agent.
    array_observation:
        If true, the observations are converted to an array (x, y)
        instead of a discrete index.
        The underlying discrete space is saved in env.discrete_observation_space.
    room_size : int
        Dimension (L) of each room (L x L).
    success_probability : double, default: 0.95
        Sucess probability of an action. A failure is going to the wrong direction.
    remove_walls : bool, default: False
        If True, remove walls. Useful for debug.
    initial_state_distribution: {'center', 'uniform'}
        If 'center', always start at the center.
        If 'uniform', start anywhere with uniform probability.
    include_traps: bool, default: False
        If true, each room will have a terminal state (a "trap").
    Notes
    -----
    The function env.sample() does not handle conversions to array states
    when array_observation is True. Only the functions env.reset() and
    env.step() are covered.
    """

    name = "N-Room"

    def __init__(
        self,
        nrooms=7,
        reward_free=False,
        array_observation=False,
        room_size=5,
        success_probability=0.95,
        remove_walls=False,
        initial_state_distribution="center",
        include_traps=False,
    ):
        assert nrooms > 0, "nrooms must be > 0"
        assert initial_state_distribution in ("center", "uniform")

        self.reward_free = reward_free
        self.array_observation = array_observation
        self.nrooms = nrooms
        self.room_size = room_size
        self.success_probability = success_probability
        self.remove_walls = remove_walls
        self.initial_state_distribution = initial_state_distribution
        self.include_traps = include_traps

        # Max number of rooms/columns per row
        self.max_rooms_per_row = 5

        # Room size (default = 5x5)
        self.room_size = room_size

        # Grid size
        self.room_nrows = math.ceil(nrooms / self.max_rooms_per_row)
        if self.room_nrows > 1:
            self.room_ncols = self.max_rooms_per_row
        else:
            self.room_ncols = nrooms
        nrows = self.room_size * self.room_nrows + (self.room_nrows - 1)
        ncols = self.room_size * self.room_ncols + (self.room_ncols - 1)

        # # walls
        walls = []
        for room_col in range(self.room_ncols - 1):
            col = (room_col + 1) * (self.room_size + 1) - 1
            for jj in range(nrows):
                if (jj % (self.room_size + 1)) != (self.room_size // 2):
                    walls.append((jj, col))

        for room_row in range(self.room_nrows - 1):
            row = (room_row + 1) * (self.room_size + 1) - 1
            for jj in range(ncols):
                walls.append((row, jj))

        # process each room
        start_coord = None
        terminal_state = None
        self.traps = []
        count = 0
        for room_r in range(self.room_nrows): 
            if room_r % 2 == 0:
                cols_iterator = range(self.room_ncols)
            else:
                cols_iterator = reversed(range(self.room_ncols))
            for room_c in cols_iterator:
                # existing rooms
                if count < self.nrooms:
                    # remove top wall
                    if ((room_c == self.room_ncols - 1) and (room_r % 2 == 0)) or (
                        (room_c == 0) and (room_r % 2 == 1)
                    ):
                        if room_r != self.room_nrows - 1:
                            wall_to_remove = self._convert_room_coord_to_global(
                                room_r, room_c, self.room_size, self.room_size // 2
                            )
                            if wall_to_remove in walls:
                                walls.remove(wall_to_remove)
                # rooms to remove
                else:
                    for ii in range(-1, self.room_size + 1):
                        for jj in range(-1, self.room_size + 1):
                            wall_to_include = self._convert_room_coord_to_global(
                                room_r, room_c, ii, jj
                            )
                            if (
                                wall_to_include[0] >= 0
                                and wall_to_include[0] < nrows
                                and wall_to_include[1] >= 0
                                and wall_to_include[1] < ncols
                                and (wall_to_include not in walls)
                            ):
                                walls.append(wall_to_include)
                    pass

                # start coord
                if count == nrooms // 2:
                    start_coord = self._convert_room_coord_to_global(
                        room_r, room_c, self.room_size // 2-1, self.room_size // 2-1
                    )
                # terminal state
                if count == nrooms - 1:
                    terminal_state = self._convert_room_coord_to_global(
                        room_r, room_c, self.room_size // 2+1, self.room_size // 2+1
                    )
                # trap
                if include_traps:
                    self.traps.append(
                        self._convert_room_coord_to_global(
                            room_r,
                            room_c,
                            self.room_size // 2 ,
                            self.room_size // 2 -1,
                        )
                    )
                    self.traps.append(
                        self._convert_room_coord_to_global(
                            room_r,
                            room_c,
                            self.room_size // 2 -1 ,
                            self.room_size // 2 ,
                        )
                    )
                    self.traps.append(
                        self._convert_room_coord_to_global(
                            room_r,
                            room_c,
                            self.room_size // 2,
                            self.room_size -1 ,
                        )
                    )
                count += 1

        terminal_states = (terminal_state,) + tuple(self.traps)

        if self.reward_free:
            reward_at = {}
        else:
            reward_at = {
                terminal_state: 1.0,
                start_coord: 0.01,
                (self.room_size // 2, self.room_size // 2): 0.1,
                (1,self.room_size -1):0.15
            }

        # Check remove_walls
        if remove_walls:
            walls = ()

        # Init base class
        GridWorld.__init__(
            self,
            nrows=nrows,
            ncols=ncols,
            start_coord=start_coord,
            terminal_states=terminal_states,
            success_probability=success_probability,
            reward_at=reward_at,
            walls=walls,
            default_reward=0.0,
        )

        # Check initial distribution
        if initial_state_distribution == "uniform":
            distr = np.ones(self.observation_space.n) / self.observation_space.n
            self.set_initial_state_distribution(distr)

        # spaces
        if self.array_observation:
            self.discrete_observation_space = self.observation_space
            self.observation_space = spaces.Box(0.0, 1.0, shape=(2,))

    def _convert_room_coord_to_global(
        self, room_row, room_col, room_coord_row, room_coord_col
    ):
        col_offset = (self.room_size + 1) * room_col
        row_offset = (self.room_size + 1) * room_row

        row = room_coord_row + row_offset
        col = room_coord_col + col_offset
        return (row, col)

    def _convert_index_to_float_coord(self, state_index):
        yy, xx = self.index2coord[state_index]

        # centering
        xx = xx + 0.5
        yy = yy + 0.5
        # map to [0, 1]
        xx = xx / self.ncols
        yy = yy / self.nrows
        return np.array([xx, yy])

    def reset(self, seed=None, options=None):
        self.state, info = GridWorld.reset(self, seed=seed, options=options)
        state_to_return = self.state
        if self.array_observation:
            state_to_return = self._convert_index_to_float_coord(self.state)
        return state_to_return, info

    def step(self, action):
        assert self.action_space.contains(action), "Invalid action!"

        # save state for rendering
        if self.is_render_enabled():
            self.append_state_for_rendering(self.state)

        # take step
        next_state, reward, terminated, truncated, info = self.sample(
            self.state, action
        )
        self.state = next_state

        state_to_return = self.state
        if self.array_observation:
            state_to_return = self._convert_index_to_float_coord(self.state)

        return state_to_return, reward, terminated, truncated, info

    def get_background(self):
        """
        Returne a scene (list of shapes) representing the background
        """
        bg = Scene()

        # traps
        for y, x in self.traps:
            shape = GeometricPrimitive("POLYGON")
            shape.set_color((0.5, 0.0, 0.0))
            shape.add_vertex((x, y))
            shape.add_vertex((x + 1, y))
            shape.add_vertex((x + 1, y + 1))
            shape.add_vertex((x, y + 1))
            bg.add_shape(shape)

        # walls
        for wall in self.walls:
            y, x = wall
            shape = GeometricPrimitive("POLYGON")
            shape.set_color((0.25, 0.25, 0.25))
            shape.add_vertex((x, y))
            shape.add_vertex((x + 1, y))
            shape.add_vertex((x + 1, y + 1))
            shape.add_vertex((x, y + 1))
            bg.add_shape(shape)

        # rewards
        for y, x in self.reward_at:
            flag = GeometricPrimitive("POLYGON")
            rwd = self.reward_at[(y, x)]
            if rwd == 1.0:
                flag.set_color((0.0, 0.5, 0.0))
            elif rwd == 0.1:
                flag.set_color((0.0, 0.0, 0.5))
            else:
                flag.set_color((0.5, 0.0, 0.0))

            x += 0.5
            y += 0.25
            flag.add_vertex((x, y))
            flag.add_vertex((x + 0.25, y + 0.5))
            flag.add_vertex((x - 0.25, y + 0.5))
            bg.add_shape(flag)

        return bg

In [6]:
# define RF-agent
class RFAgent(AgentWithSimplePolicy):
    name="RFAgent"
    def __init__(
            self,
            env,
            gamma=1,
            horizon=100,
            delta=0.1,
            varepsilon=0.1,
            cb=0.01,
            **kwargs
    ):
        # init base class
        AgentWithSimplePolicy.__init__(self,env=env,**kwargs)

        # basic parameters
        self.gamma=gamma
        self.horizon=horizon
        self.varepsilon=varepsilon
        self.delta=delta
        self.cb=cb
        self.initial_state,_=self.env.reset()
        self.S=self.env.observation_space.n
        self.A=self.env.action_space.n

        #task information
        self.R_hat=np.zeros((self.S,self.A)) # reward info
        self.bonus=np.ones((self.S,self.A))*self.horizon #should multiply H
        self.P_hat=np.ones((self.S,self.A,self.S))*1.0/self.S #estimated transition kernel
        self.Nsas=np.zeros((self.S,self.A,self.S)) #visitation count of (s,a,s')
        self.Nsa=np.zeros((self.S,self.A)) #visitation count of (s,a)

        # uncertainty function
        self.Whsa=np.ones((self.horizon,self.S,self.A))

    def eval(self,**kwargs):
        """
        Returns a value corresponding to the evaluation of the agent on the
        evaluation environment.

        For instance, it can be a Monte-Carlo evaluation of the policy learned in fit().
        """
        return super().eval() #use the eval() from AgentWithSimplePolicy
    
    #calculate bonus
    def beta(self,n):
        beta = np.log(6*self.S*self.A*self.horizon/self.delta) + self.S*np.log(8*np.exp(1)*(n+1))
        return beta
    
    def update(self,s,a,next_state,reward):
        self.Nsas[s,a,next_state]+=1
        self.Nsa[s,a]+=1

        n_sa=self.Nsa[s,a]
        n_sas=self.Nsas[s,a,:]
        self.P_hat[s,a,:]=n_sas/n_sa
        self.R_hat[s,a]=reward
        self.bonus[s,a]=self.beta(n_sa)/n_sa

    def backW(self,s,a,h):
        out=0
        for i in range(self.S):
            out+=self.P_hat[s,a,i]*np.max(self.Whsa[h+1,i,:])
        return out

    def update_value(self):
        for h in range(self.horizon-1,-1,-1):
            if h==self.horizon-1:
                for s in range(self.S):
                    for a in range(self.A):
                        self.Whsa[h,s,a]=min(1,0.0005*self.cb*4*self.horizon*self.bonus[s,a])
            else:
                for s in range(self.S):
                    for a in range(self.A):
                        self.Whsa[h,s,a]=min(1,0.0005*self.cb*4*self.horizon*self.bonus[s,a]+self.backW(s,a,h))

    def policy(self,observation,step):
        return self.Whsa[step,observation,:].argmax()
    
    def fit(self,budget,**kwargs):
        T=budget
        for t in range(T):
            if ((t+1)%1000)==0:
                print("Episode",t+1)
            self.update_value()
            observation,info=self.env.reset()
            done=False
            step=0
            reward=0
            current_state=0
            while step<self.horizon:
                # terminal state is an absorbing state
                if done:
                    action=self.policy(current_state,step)
                    next_step=current_state
                    self.update(current_state,action,next_step,reward)
                else:
                    action=self.policy(observation,step)
                    next_step,reward,terminated,truncated,info=self.env.step(action)
                    #update visitation count and policy
                    self.update(observation,action,next_step,reward)
                    current_state=observation
                    observation=next_step
                    done=terminated or truncated
                step+=1
        
        np.save("source_counts.npy",self.Nsa)
        np.save("source_transition.npy",self.P_hat)
        return self.R_hat,self.P_hat

In [None]:
# generate source data
source_env=NRoom(
    nrooms=1,
    remove_walls=False,
    room_size=4,
    initial_state_distribution="center", #see if we can change to others
    include_traps=False,
)
T=100000
agent=RFAgent(source_env,gamma=1,horizon=20,cb=0.002)
agent.fit(budget=T)
# save source_count and source_transition after that

In [None]:
# get V_opt by value iteration
def value_iteration(S,A,H,R,P):
    # initialize value function
    V=np.zeros((H,S))
    # value iteration
    for h in range(H-1,-1,-1):
        if h==H-1:
            for s in range(S):
                V[h,s]=np.max(R[s,:])
        else:
            for s in range(S):
                if (sum([1-P[s,a,s] for a in range(A)])<=0.01):
                    V[h,s]=np.max(R[s,:])
                else:
                    best_next_value=0
                    for a in range(A):
                        current_next_value=R[s,a]+np.dot(P[s,a,:],V[h+1,:])
                        if current_next_value>best_next_value:
                            best_next_value=current_next_value
                    V[h,s]=best_next_value
    # return V_1, dim=S
    return V[0,:]

In [None]:
# generate v_opt for different success probability
opt_success=[]
for i in range(1,13):
    success=0.95-i*0.05
    # generate estimate of r and p
    env=NRoom(
        nrooms=1,
        remove_walls=False,
        room_size=4,
        initial_state_distribution="center", #see if we can change to others
        include_traps=True,
        success_probability=success
    )
    rfAgent=RFAgent(env=env,horizon=20,gamma=1,cb=0.002)
    R,P=rfAgent.fit(budget=10000)
    # calculate optimal value
    opt=value_iteration(env=env,R=R,P=P)
    opt_v=opt[rfAgent.initial_state]
    opt_success.append([success,opt_v])
opt_success=np.array(opt_success)
np.save("../data/vopt.npy",opt_success)