In [1]:
import sys
from contextlib import closing

import numpy as np
from six import StringIO, b

from gym import utils
from gym.envs.toy_text import discrete

LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3

MAPS = {
    "theAlley": [
        "S...H...H...G"
    ],
    "walkInThePark": [
        "S.......",
        ".....H..",
        "........",
        "......H.",
        "........",
        "...H...G"
    ],
    "1Dtest": [

    ],
    "4x4": [
        "S...",
        ".H.H",
        "...H",
        "H..G"
    ],
    "8x8": [
        "S.......",
        "........",
        "...H....",
        ".....H..",
        "...H....",
        ".HH...H.",
        ".H..H.H.",
        "...H...G"
    ],
}

POTHOLE_PROB = 0.2
BROKEN_LEG_PENALTY = -10
SLEEP_DEPRIVATION_PENALTY = -0.0
REWARD = 10

def generate_random_map(size=8, p=0.8):
    """Generates a random valid map (one that has a path from start to goal)
    :param size: size of each side of the grid
    :param p: probability that a tile is frozen
    """
    valid = False

    # DFS to check that it's a valid path.
    def is_valid(res):
        frontier, discovered = [], set()
        frontier.append((0,0))
        while frontier:
            r, c = frontier.pop()
            if not (r,c) in discovered:
                discovered.add((r,c))
                directions = [(1, 0), (0, 1), (-1, 0), (0, -1)]
                for x, y in directions:
                    r_new = r + x
                    c_new = c + y
                    if r_new < 0 or r_new >= size or c_new < 0 or c_new >= size:
                        continue
                    if res[r_new][c_new] == 'G':
                        return True
                    if (res[r_new][c_new] not in '#H'):
                        frontier.append((r_new, c_new))
        return False

    while not valid:
        p = min(1, p)
        res = np.random.choice(['.', 'H'], (size, size), p=[p, 1-p])
        res[0][0] = 'S'
        res[-1][-1] = 'G'
        valid = is_valid(res)
    return ["".join(x) for x in res]


class DrunkenWalkEnv(discrete.DiscreteEnv):
    """
    A simple grid environment, completely based on the code of 'FrozenLake', credits to 
    the original authors.

    You're finding your way home (G) after a great party which was happening at (S).
    Unfortunately, due to recreational intoxication you find yourself only moving into 
    the intended direction 80% of the time, and perpendicular to that the other 20%.

    To make matters worse, the local community has been cutting the budgets for pavement
    maintenance, which means that the way to home is full of potholes, which are very likely
    to make you trip. If you fall, you are obviously magically transported back to the party, 
    without getting some of that hard-earned sleep.

        S...
        .H.H
        ...H
        H..G
    H : pothole, you have a POTHOLE_PROB chance of tripping
    G : goal, time for bed

    The episode ends when you reach the goal or trip.
    You receive a reward of +10 if you reach the goal, 
    but get a SLEEP_DEPRIVATION_PENALTY and otherwise.  剥夺睡眠

    """

    metadata = {'render.modes': ['human', 'ansi']}  # the meaning of ansi

    def __init__(self, desc=None, map_name="4x4",is_slippery=True):
        """ This generates a map and sets all transition probabilities.

            (by passing constructed nS, nA, P, isd to DiscreteEnv)
        """
        if desc is None and map_name is None:
            desc = generate_random_map()
        elif desc is None:
            desc = MAPS[map_name]

        self.desc = desc = np.asarray(desc,dtype='c')
        self.nrow, self.ncol = nrow, ncol = desc.shape  # start 6*8
        self.reward_range = (0, 1)

        nA = 4
        nS = nrow * ncol
        # compare desc with S, find the big S position
        isd = np.array(desc == b'S').astype('float64').ravel() # ravel: transfer n-D to 1-D 
#         print(isd)
# [1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
#  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
# ---episode 0---
        isd /= isd.sum()

        # We need to pass 'P' to DiscreteEnv:
        # P dictionary dict of dicts of lists, where
        # P[s][a] == [(probability, nextstate, reward, done), ...]
        P = {s : {a : [] for a in range(nA)} for s in range(nS)}  # how can P represent so many params

        def to_s(row, col):
            return row*ncol + col

        #def inc(row, col, a):
        def intended_destination(row, col, a):
            if a == LEFT:
                col = max(col-1,0)
            elif a == DOWN:
                row = min(row+1,nrow-1)
            elif a == RIGHT:
                col = min(col+1,ncol-1)
            elif a == UP:
                row = max(row-1,0)
            return (row, col)

        def construct_transition_for_intended(row, col, a, prob, li):
            """ this constructs a transition to the "intended_destination(row, col, a)"
                and adds it to the transition list (which could be for a different action b).

            """
            newrow, newcol = intended_destination(row, col, a)  # the next step position
            newstate = to_s(newrow, newcol)   # how to understand the new state is a integer varaible
            newletter = desc[newrow, newcol]  # record the new letter
            done = bytes(newletter) in b'G'   # when home is done
            rew = REWARD if newletter == b'G' else SLEEP_DEPRIVATION_PENALTY
            li.append( (prob, newstate, rew, done) )


        for row in range(nrow):
            for col in range(ncol):
                # specify transitions for s=(row, col)
                s = to_s(row, col)
                letter = desc[row, col]
                for a in range(4):
                    # specify transitions for action a
                    li = P[s][a] # s is states, a is action
                    if letter in b'G':
                        #when we are at goal, we reset with prob. 1
                        li.append((1.0, s, 42, True))   # prob newstate reward done
                        #really, this should not be happening, since we get done
                        #when transitioning TO the g
                        # with prob. 0.8 we move as intended:
                        construct_transition_for_intended(row, col, a, 0.8, li)
                        # but with prob. 0.1 we move sideways to intended:
                        for b in [(a-1)%4, (a+1)%4]: # 其他的  0.1
                            construct_transition_for_intended(row, col, b, 0.1, li)

        super(DrunkenWalkEnv, self).__init__(nS, nA, P, isd)

    def action_to_string(self, action_index):
        s ="{}".format(["Left","Down","Right","Up"][action_index])
        return s

    def render(self, mode='human'):
        outfile = StringIO() if mode == 'ansi' else sys.stdout

        row, col = self.s // self.ncol, self.s % self.ncol
        desc = self.desc.tolist()
        desc = [[c.decode('utf-8') for c in line] for line in desc]
        desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
        if self.lastaction is not None:
            outfile.write(" (last action was '{action}')\n".format( action=self.action_to_string(self.lastaction) ) )
        else:
            outfile.write("\n")
        outfile.write("\n".join(''.join(line) for line in desc)+"\n")

        if mode != 'human':
            with closing(outfile):
                return outfile.getvalue()

In [6]:
import numpy as np
import gym
from gym import wrappers


def run_episode(env, policy, gamma = 1.0, render = False):
    """ Evaluates policy by using it to run an episode and finding its
    total reward.
    args:
    env: gym environment.
    policy: the policy to be used.
    gamma: discount factor.
    render: boolean to turn rendering on/off.
    returns:
    total reward: real value of the total reward recieved by agent under policy.
    """
    obs = env.reset()
    total_reward = 0
    step_idx = 0
    while True:
        if render:
            env.render()
        obs, reward, done , _ = env.step(int(policy[obs]))
        total_reward += (gamma ** step_idx * reward)
        step_idx += 1
        if done:
            break
    return total_reward


def evaluate_policy(env, policy, gamma = 1.0,  n = 100):
    """ Evaluates a policy by running it n times.
    returns:
    average total reward
    """
    for i in range(n) :
        scores = run_episode(env, policy, gamma = gamma, render = False)
        print(scores)
    return 

def extract_policy(v, gamma = 1.0):
    """ Extract the policy given a value-function """
    policy = np.zeros(env.nS)
    for s in range(env.nS):
        q_sa = np.zeros(env.action_space.n)
        for a in range(env.action_space.n):
            for next_sr in env.P[s][a]:
                # next_sr is a tuple of (probability, next state, reward, done)
                p, s_, r, _ = next_sr
                q_sa[a] += (p * (r + gamma * v[s_]))
        policy[s] = np.argmax(q_sa)
    return policy


def value_iteration(env, gamma = 1.0):
    """ Value-iteration algorithm """
    v = np.zeros(env.nS)  # initialize value-function
    max_iterations = 100000
    eps = 1e-20
    for i in range(max_iterations):
        prev_v = np.copy(v)
        for s in range(env.nS):
            q_sa = [sum([p*(r + prev_v[s_]) for p, s_, r, _ in env.P[s][a]]) for a in range(env.nA)] 
            v[s] = max(q_sa)
        if (np.sum(np.fabs(prev_v - v)) <= eps):
            print ('Value-iteration converged at iteration# %d.' %(i+1))
            break
    return v


if __name__ == '__main__':
    env = DrunkenWalkEnv(map_name="theAlley")
    gamma = 0.9
    optimal_v = value_iteration(env, gamma);
    policy = extract_policy(optimal_v, gamma)
    policy_score = evaluate_policy(env, policy, gamma, n=100)
#     print('Policy average score = ', policy_score)



ValueError: attempt to get argmax of an empty sequence