In [9]:
import random
import sys
from contextlib import closing
from io import StringIO

import numpy as np

from gym import utils, Env, spaces
from gym.utils import seeding
from gym.envs.toy_text import discrete

In [2]:
# Integer codes for the four movement actions.
LEFT, DOWN, RIGHT, UP = range(4)

# Named grid layouts; every layout is a list of equal-length row strings.
# Tile letters as they are read by SailingEnv:
#   S - start cell
#   D - destination cell
#   O - terminal cell (entering it ends the episode)
#   W - any other cell
MAPS = {
    "8x8": [
        "SWWWWWWD",
        "WWWOOOWW",
        "WWWOWWWW",
        "WWWWWWOW",
        "WOOWWWOD",
        "OWWWOWWW",
        "WWWWWWWW",
        "DWWWOOWW",
    ],
}


In [8]:
def categorical_sample(prob_n, np_random):
    """
    Draw a single class index from a categorical distribution.

    prob_n    : sequence of class probabilities
    np_random : object whose ``rand()`` returns one number
                (expected to be a uniform draw - verify against caller)

    Returns the index of the first cumulative probability that is strictly
    greater than the draw (index 0 when none is).
    """
    cumulative = np.cumsum(np.asarray(prob_n))
    draw = np_random.rand()
    exceeds_draw = cumulative > draw
    return exceeds_draw.argmax()

In [7]:
class SailingEnv(discrete.DiscreteEnv):
    """
    Deterministic grid world in which the agent has to visit every
    destination cell of the map.

    The surface is described using a grid like the following
        SWWD
        WOWW
        WWWO
        DWWW
    S : starting point
    W : ordinary cell (presumably open water)
    O : terminal cell (presumably an obstacle); entering it ends the episode
    D : destination
    The four actions LEFT, DOWN, RIGHT and UP move one cell; a move that
    would leave the grid keeps the agent on the border cell.

    The episode ends when the agent enters an 'O' cell, when every
    destination has been visited, or after ``total_steps`` steps.
    The reward returned by every step is the number of distinct destinations
    visited so far in the current episode.

    The transition table ``P`` only encodes movement and the 'O' terminal
    flag (its reward entries are 0): the reward and the other two ending
    conditions depend on the episode history, so they are applied in
    ``step``.
    """

    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self, config):
        """
        Build the environment from a configuration mapping.

        config keys:
            map_name      : key into MAPS selecting the grid layout
            total_steps   : maximum number of steps per episode
            is_random_env : when falsy, ``random_seed`` is used for seeding
            random_seed   : seed, only read when ``is_random_env`` is falsy
        """
        self.desc = desc = np.asarray(MAPS[config["map_name"]], dtype='c')
        self.nrow, self.ncol = nrow, ncol = desc.shape
        self.total_steps = config["total_steps"]

        # Destinations are located up front; visit tracking is per-episode
        # state and is (re)initialised by _reset_episode_state.
        self.destinations = self._find_destinations(desc)
        self.total_destinations = len(self.destinations)
        self._reset_episode_state()

        # The reward is a count of visited destinations.
        self.reward_range = (0, self.total_destinations)

        if config["is_random_env"]:
            self.random_seed = None
        else:
            self.random_seed = config["random_seed"]
            random.seed(self.random_seed)

        nA = 4
        nS = nrow * ncol

        # Initial state distribution: uniform over the 'S' cells.
        isd = np.array(desc == b'S').astype('float64').ravel()
        isd /= isd.sum()

        P = self._build_transitions(desc, nA)

        super(SailingEnv, self).__init__(nS, nA, P, isd)

        if self.random_seed is not None:
            # The parent constructor seeds the sampler without a fixed seed;
            # re-seed it and redraw the start state so runs are repeatable.
            self.seed(self.random_seed)
            self.reset()

    @staticmethod
    def _find_destinations(desc):
        """Return the flat (row-major) state index of every 'D' cell."""
        return [int(index) for index in np.flatnonzero(desc == b'D')]

    @staticmethod
    def _build_transitions(desc, nA):
        """
        Build the table ``P[state][action] -> [(prob, next_state, reward, done)]``.

        Every action has a single outcome with probability 1. 'O' cells are
        absorbing, and any move that lands on an 'O' cell is flagged done.
        """
        nrow, ncol = desc.shape

        def to_s(row, col):
            return row * ncol + col

        def inc(row, col, a):
            # Moves are clamped so the agent never leaves the grid.
            if a == LEFT:
                col = max(col - 1, 0)
            elif a == DOWN:
                row = min(row + 1, nrow - 1)
            elif a == RIGHT:
                col = min(col + 1, ncol - 1)
            elif a == UP:
                row = max(row - 1, 0)
            return (row, col)

        def is_terminal(row, col):
            return bytes(desc[row, col]) == b'O'

        P = {s: {a: [] for a in range(nA)} for s in range(nrow * ncol)}
        for row in range(nrow):
            for col in range(ncol):
                s = to_s(row, col)
                absorbing = is_terminal(row, col)
                for a in range(nA):
                    if absorbing:
                        P[s][a].append((1.0, s, 0, True))
                    else:
                        newrow, newcol = inc(row, col, a)
                        P[s][a].append((
                            1.0,
                            to_s(newrow, newcol),
                            0,
                            is_terminal(newrow, newcol),
                        ))
        return P

    def _reset_episode_state(self):
        """Clear the step counter and all destination-visit bookkeeping."""
        self.current_step = 0
        self.destinations_dict = {D: False for D in self.destinations}
        self.reached_destinations = []
        self.num_reached_destinations = 0
        self.is_all_destinations_reached = False

    def _mark_destination_reached(self, state):
        """
        Record a visit to ``state`` if it is a not-yet-visited destination.

        Returns True when a new destination was recorded, False otherwise.
        """
        if state not in self.destinations_dict or self.destinations_dict[state]:
            return False
        self.destinations_dict[state] = True
        self.reached_destinations.append(state)
        self.num_reached_destinations += 1
        if self.num_reached_destinations == self.total_destinations:
            self.is_all_destinations_reached = True
        return True

    def seed(self, seed=None):
        """Seed the environment's sampler and return the seed in a list."""
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self):
        """Start a new episode and return the initial state as an int."""
        self._reset_episode_state()
        return super(SailingEnv, self).reset()

    def step(self, a):
        """
        Apply action ``a`` and return ``(state, reward, done, info)``.

        The move itself comes from the transition table; the reward and the
        step-limit / all-destinations ending conditions are applied here.
        """
        state, _, done, info = super(SailingEnv, self).step(a)
        self.current_step += 1
        self._mark_destination_reached(state)
        reward = self.num_reached_destinations
        # >= so the episode stays finished if the caller keeps stepping.
        if self.current_step >= self.total_steps:
            done = True
        if self.is_all_destinations_reached:
            done = True
        return (state, reward, done, info)

    def render(self, mode='human'):
        """
        Draw the grid with the agent's cell highlighted.

        In 'ansi' mode the drawing is returned as a string; in every other
        mode it is written to stdout and None is returned.
        """
        outfile = StringIO() if mode == 'ansi' else sys.stdout

        row, col = self.s // self.ncol, self.s % self.ncol
        desc = self.desc.tolist()
        desc = [[c.decode('utf-8') for c in line] for line in desc]
        desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
        if self.lastaction is not None:
            outfile.write("  ({})\n".format(
                ["Left", "Down", "Right", "Up"][self.lastaction]))
        else:
            outfile.write("\n")
        outfile.write("\n".join(''.join(line) for line in desc)+"\n")

        if mode != 'human':
            with closing(outfile):
                return outfile.getvalue()

In [None]:
# Settings passed to SailingEnv(config); the keys are the ones its
# constructor looks up.
environment_config = {
    "total_steps": 50,       # per-episode step budget
    "random_seed": 10,       # used only when is_random_env is False
    "is_random_env": False,
    "map_name": "8x8",       # key into MAPS
}
