In [5]:
import numpy as np
import gym
import time

from contextlib import contextmanager
from gym.spaces import Discrete


class SnakeEnv(gym.Env):
    SIZE=100
  
    def __init__(self, ladder_num, dices):
        self.ladder_num = ladder_num
        self.dices = dices
        self.ladders = dict()
        self.observation_space=Discrete(self.SIZE+1)
        self.action_space=Discrete(len(dices))

        ladders = dict(np.random.randint(1, self.SIZE, size=(self.ladder_num, 2)))
        
        for k,v in ladders.items():
            self.ladders[v] = k
            self.ladders[k] = v
            # print 'ladders info:'
            # print self.ladders
            # print 'dice ranges:'
            # print self.dices
        self.pos = 1

    def reset(self):
        self.pos = 1
        return self.pos

    def step(self, a):
        step = np.random.randint(1, self.dices[a] + 1)
        self.pos += step
        if self.pos == 100:
            return 100, 100, 1, {}
        elif self.pos > 100:
            self.pos = 200 - self.pos

        if self.pos in self.ladders:
            self.pos = self.ladders[self.pos]
        return self.pos, -1, 0, {}

    def reward(self, s):
        if s == 100:
            return 100
        else:
            return -1

    def render(self):
        pass


class TableAgent(object):
    def __init__(self, env):
        self.s_len = env.observation_space.n
        self.a_len = env.action_space.n

        self.r = [env.reward(s) for s in range(0, self.s_len)]
        self.pi = np.array([0 for s in range(0, self.s_len)])
        self.p = np.zeros([self.a_len, self.s_len, self.s_len], dtype=np.float) # p(s'|s,a)

        ladder_move = np.vectorize(lambda x: env.ladders[x] if x in env.ladders else x)

        for i, dice in enumerate(env.dices):
            prob = 1.0 / dice
            for src in range(1, 100):
                step = np.arange(dice)
                step += src
                step = np.piecewise(step, [step > 100, step <= 100],
                    [lambda x: 200 - x, lambda x: x])
                step = ladder_move(step)
                for dst in step:
                    self.p[i, src, dst] += prob
        self.p[:, 100, 100]=1
        self.value_pi = np.zeros((self.s_len))
        self.value_q = np.zeros((self.s_len, self.a_len))
        self.gamma = 0.8

    def play(self, state):
        return self.pi[state]


class ModelFreeAgent(object):
    def __init__(self, env):
        self.s_len = env.observation_space.n
        self.a_len = env.action_space.n

        self.pi = np.array([0 for s in range(0, self.s_len)])
        self.value_q = np.zeros((self.s_len, self.a_len))
        self.value_n = np.zeros((self.s_len, self.a_len))
        self.gamma = 0.8

    def play(self, state, epsilon = 0):
        if np.random.rand() < epsilon:
            return np.random.randint(self.a_len)
        else:
            return self.pi[state]


def eval_game(env, policy):
    state = env.reset()
    return_val = 0
    while True:
        if isinstance(policy, TableAgent) or isinstance(policy, ModelFreeAgent):
            act = policy.play(state)
        elif isinstance(policy, list):
            act = policy[state]
        else:
            raise Error('Illegal policy')
        state, reward, terminate, _ = env.step(act)
        # print state
        return_val += reward
        if terminate:
          break
    return return_val


@contextmanager
def timer(name):
    start = time.time()
    yield
    end = time.time()
    print('{} COST:{}'.format(name, end - start))

In [18]:
class MonteCarlo(object):
    def __init__(self, epsilon=0.0):
        self.epsilon = epsilon

    def monte_carlo_eval(self, agent, env):
        state = env.reset()
        episode = []
        while True:
            ac = agent.play(state, self.epsilon)
            next_state, reward, terminate, _ = env.step(ac)
            episode.append((state, ac, reward))
            state = next_state
            if terminate:
                break

        values = []
        return_val = 0
        for (state, action, reward) in reversed(episode):
            return_val = return_val * agent.gamma + reward
            values.append((state, action, return_val))
        # every visit
        for (state, action, value) in reversed(values):
            agent.value_n[state][action] += 1
            agent.value_q[state][action] += (value -  \
                agent.value_q[state][action]) /  \
                agent.value_n[state][action]
        # first visit
#         for (state, action, value) in reversed(values):
#             agent.value_q[state][action] = value
        
    def policy_improve(self, agent):
        new_policy = np.zeros_like(agent.pi)
        for i in range(1, agent.s_len):
            new_policy[i] = np.argmax(agent.value_q[i,:])
        if np.all(np.equal(new_policy, agent.pi)):
            return False
        else:
            agent.pi = new_policy
            return True

    # monte carlo
    def monte_carlo_opt(self, agent, env):
        for i in range(10):
            for j in range(100):
                self.monte_carlo_eval(agent, env)
            self.policy_improve(agent)

            
np.random.seed(0)
env = SnakeEnv(10, [3,6])
agent = ModelFreeAgent(env)
mc = MonteCarlo(0.3)
with timer('Timer Monte Carlo Iter'):
    mc.monte_carlo_opt(agent, env)
print('return_pi={}'.format(eval_game(env,agent)))
print(agent.pi)

Timer Monte Carlo Iter COST:0.10898399353027344
return_pi=92
[0 1 1 1 1 1 1 1 1 1 0 0 0 0 1 1 1 1 1 0 0 1 1 1 1 1 1 1 1 0 1 0 1 1 0 1 0
 0 0 1 0 1 1 1 0 0 0 0 1 1 1 1 0 0 0 1 0 1 1 0 0 0 1 1 1 0 0 1 0 0 0 0 1 0
 0 1 1 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0]


In [22]:
class SARSA(object):
    def __init__(self, epsilon=0.0):
        self.epsilon = epsilon

    def sarsa_eval(self, agent, env):
        # sarsa
        state = env.reset()
        prev_state = -1
        prev_act = -1
        while True:
            act = agent.play(state, self.epsilon)
            next_state, reward, terminate, _ = env.step(act)
            if prev_act != -1:
                # update the q value of previous state and previous action
                return_val = reward + agent.gamma * (0 if terminate else agent.value_q[state][act])
                agent.value_n[prev_state][prev_act] += 1
                agent.value_q[prev_state][prev_act] += (return_val - \
                    agent.value_q[prev_state][prev_act]) / \
                    agent.value_n[prev_state][prev_act]

            prev_act = act
            prev_state = state
            state = next_state

            if terminate:
                break

    def policy_improve(self, agent):
        new_policy = np.zeros_like(agent.pi)
        for i in range(1, agent.s_len):
            new_policy[i] = np.argmax(agent.value_q[i,:])
        if np.all(np.equal(new_policy, agent.pi)):
            return False
        else:
            agent.pi = new_policy
            return True

    # monte carlo
    def sarsa(self, agent, env):
        for i in range(10):
            for j in range(2000):
                self.sarsa_eval(agent, env)
            self.policy_improve(agent)

            
np.random.seed(0)
agent3 = ModelFreeAgent(env)
mc = SARSA(0.3)
with timer('Timer SARSA Iter'):
    mc.sarsa(agent3, env)
print('return_pi={}'.format(eval_game(env,agent3)))
print(agent3.pi)

Timer SARSA Iter COST:2.1603031158447266
return_pi=95
[0 1 1 1 1 1 1 1 1 1 0 0 0 0 1 1 1 1 1 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 0 0 0 1 1 1 1 0 0 0 0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 0 1 1 0 0 0 0 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0]


In [32]:
class QLearning(object):
    def __init__(self, epsilon=0.0):
        self.epsilon = epsilon

    def policy_improve(self, agent):
        new_policy = np.zeros_like(agent.pi)
        for i in range(1, agent.s_len):
            new_policy[i] = np.argmax(agent.value_q[i,:])
        if np.all(np.equal(new_policy, agent.pi)):
            return False
        else:
            agent.pi = new_policy
            return True

    # q learning
    def q_learning(self, agent, env):
        for i in range(10):
            for j in range(3000):
                self.q_learn_eval(agent, env)
            self.policy_improve(agent)

    def q_learn_eval(self, agent, env):
        state = env.reset()
        prev_state = -1
        prev_act = -1
        while True:
            act = agent.play(state, self.epsilon)
            next_state, reward, terminate, _ = env.step(act)
            if prev_act != -1:
                return_val = reward + agent.gamma * (0 if terminate else np.max(agent.value_q[state,:]))
                agent.value_n[prev_state][prev_act] += 1
                agent.value_q[prev_state][prev_act] += (return_val - \
                    agent.value_q[prev_state][prev_act]) / \
                    agent.value_n[prev_state][prev_act]

            prev_act = act  
            prev_state = state
            state = next_state

            if terminate:
                break


np.random.seed(0)
agent3 = ModelFreeAgent(env)
mc = QLearning(0.3)
with timer('Timer Q Learning Iter'):
    mc.q_learning(agent3, env)
print('return_pi={}'.format(eval_game(env, agent3)))
print(agent3.pi)


Timer Q Learning Iter COST:4.325722932815552
return_pi=92
[0 1 1 1 1 1 1 1 1 1 0 0 0 0 1 1 1 1 1 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0
 0 0 0 0 1 1 1 0 0 0 0 1 1 1 1 1 1 1 1 0 0 1 1 1 1 1 1 1 0 1 1 0 0 0 0 1 1
 1 1 1 1 0 1 1 1 1 0 0 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0]
