In [None]:
# ========================= import ===================================

import numpy as np 
import matplotlib.pyplot as plt
from tqdm import tqdm, trange
import seaborn as sns
import random

# ======================== CFG ======================================

class CFG:

    episode = 500
    MC_alpha = [0.01, 0.02, 0.03, 0.04]
    TD_alpha = [0.05, 0.1, 0.15]
    target = [1/6, 2/6, 3/6, 4/6, 5/6]
    run = 500
    garma = 1.0

# ======================== init ======================================

class game():
    def __init__(self, alpha, TypeAlgo):
        self.value_function = np.full((7), 0.5)
        self.policy = np.full((7,2), 0.5)
        self.Left = -1
        self.Right = 1
        self.Stop = 0
        self.actions = [self.Left, self.Right]
        self.FirstEnd = 0
        self.LastEnd = 6
        self.alpha = alpha
        self.TypeAlgo = TypeAlgo

    def reset(self):
        pass

    def step(self):
        if (self.TypeAlgo == 'MC'):
            return self.step_MC()
        if (self.TypeAlgo == 'TD'):
            return self.step_TD()

    def step_TD(self):
        his = []
        cur = 3
        reward = 0

        while(cur != self.FirstEnd and cur != self.LastEnd):
            action = np.random.choice(self.actions, p=self.policy[cur, :])
            state_reward = 0
            his.append((cur, action, state_reward))
            cur += action
            reward += state_reward
        if cur == self.LastEnd:
            state_reward = 1
        else:
            state_reward = 0
        his.append((cur, self.Stop, state_reward))

        return his, reward

    def step_MC(self):
        his = []
        cur = 3
        reward = 0

        while(cur != self.FirstEnd and cur != self.LastEnd):
            action = np.random.choice(self.actions, p=self.policy[cur, :])
            if (cur + action == self.LastEnd):
                state_reward = 1
            else:
                state_reward = 0
            his.append((cur, action, state_reward))
            cur += action
            reward += state_reward

        return his, reward

    def update(self, state, action, state_reward, reward):
        if (self.TypeAlgo == 'MC'):
            return self.update_MC(state, reward)
        if (self.TypeAlgo == 'TD'):
            return self.update_TD(state, state + action, state_reward)

    def update_TD(self, state, Nstate, state_reward):
        up = self.alpha * (state_reward + CFG.garma * self.value_function[Nstate] * int(state != Nstate) - self.value_function[state])
        self.value_function[state] += up
        return up
    
    def update_MC(self, state, reward):
        up = self.alpha * (reward - self.value_function[state])
        self.value_function[state] += up
        return up

    def get_value_function(self):
        return self.value_function

# ======================= function ===================================

def MC_evaluate(episode, alpha):
    total_error = np.zeros((episode), dtype = float)
    for _ in trange(CFG.run, desc = f'MC - {alpha}'):
        env = game(alpha = alpha, TypeAlgo = 'MC')
        error = []
        for ep in range(episode):
            env.reset()
            his, reward = env.step()
            for i in range(len(his)):
                state, action, state_reward = his[i]
                env.update(state, action, state_reward, reward)
            error.append(np.sqrt(np.sum(np.power(env.get_value_function()[1:-1] - CFG.target, 2))/5.0))
        error = np.asarray(error)
        total_error += error
    return total_error/CFG.run

def TD_evaluate(episode, alpha, TDplot = False):
    total_error = np.zeros((episode), dtype = float)
    for r in trange(CFG.run, desc = f'TD - {alpha}'):
        env = game(alpha = alpha, TypeAlgo = 'TD')
        error = []
        for ep in range(episode):
            env.reset()
            his, reward = env.step()
            update = np.zeros((7), dtype = float)
            for i in range(len(his)):
                state, action, state_reward = his[i]
                env.update(state, action, state_reward, reward)

            env.value_function += update
            error.append(np.sqrt(np.sum(np.power(env.get_value_function()[1:-1] - CFG.target, 2))/5.0))
            if (r == 1 and TDplot):
                if (ep == 0 or ep == 1 or ep == 10 or ep == 100 or ep == 200 or ep == 299):
                    plt.plot(env.get_value_function()[1:-1], label = f'{ep}')
        error = np.asarray(error)
        total_error += error
        if (r == 1 and TDplot):
            plt.plot(CFG.target,label = 'true values')
            plt.xticks([0,1,2,3,4],['A','B','C','D','E'])
            plt.legend()
            plt.savefig('figure_6_6/6_6.png')
            plt.close()
            return
    
    return total_error/CFG.run


# ======================= main ========================================

for alpha in CFG.MC_alpha:
    error = MC_evaluate(CFG.episode, alpha)
    plt.plot(error,'--', label = f'MC - {alpha}')

for alpha in CFG.TD_alpha:
    error = TD_evaluate(CFG.episode, alpha)
    plt.plot(error, label = f'TD - {alpha}')
plt.legend()
plt.savefig('figure_6_7/6_7.png')
plt.close()

TD_evaluate(CFG.episode, 0.1, True)

In [6]:
#######################################################################
# Copyright (C)                                                       #
# 2016-2018 Shangtong Zhang(zhangshangtong.cpp@gmail.com)             #
# 2016 Kenta Shimada(hyperkentakun@gmail.com)                         #
# Permission given to modify the code as long as you keep this        #
# declaration at the top                                              #
#######################################################################

import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from tqdm import tqdm

# all states
N_STATES = 19

# discount
GAMMA = 1

# all states but terminal states
STATES = np.arange(1, N_STATES + 1)

# start from the middle state
START_STATE = 10

# two terminal states
# an action leading to the left terminal state has reward -1
# an action leading to the right terminal state has reward 1
END_STATES = [0, N_STATES + 1]

# true state value from bellman equation
TRUE_VALUE = np.arange(-20, 22, 2) / 20.0
TRUE_VALUE[0] = TRUE_VALUE[-1] = 0

# n-steps TD method
# @value: values for each state, will be updated
# @n: # of steps
# @alpha: # step size
def temporal_difference(value, n, alpha):
    # initial starting state
    state = START_STATE

    # arrays to store states and rewards for an episode
    # space isn't a major consideration, so I didn't use the mod trick
    states = [state]
    rewards = [0]

    # track the time
    time = 0

    # the length of this episode
    T = float('inf')
    while True:
        # go to next time step
        time += 1

        if time < T:
            # choose an action randomly
            if np.random.binomial(1, 0.5) == 1:
                next_state = state + 1
            else:
                next_state = state - 1

            if next_state == 0:
                reward = -1
            elif next_state == 20:
                reward = 1
            else:
                reward = 0

            # store new state and new reward
            states.append(next_state)
            rewards.append(reward)

            if next_state in END_STATES:
                T = time

        # get the time of the state to update
        update_time = time - n
        if update_time >= 0:
            returns = 0.0
            # calculate corresponding rewards
            for t in range(update_time + 1, min(T, update_time + n) + 1):
                returns += pow(GAMMA, t - update_time - 1) * rewards[t]
            # add state value to the return
            if update_time + n <= T:
                returns += pow(GAMMA, n) * value[states[(update_time + n)]]
            state_to_update = states[update_time]
            # update the state value
            if not state_to_update in END_STATES:
                value[state_to_update] += alpha * (returns - value[state_to_update])
        if update_time == T - 1:
            break
        state = next_state

# Figure 7.2, it will take quite a while
def figure7_2():
    # all possible steps
    steps = np.power(2, np.arange(0, 10))

    # all possible alphas
    alphas = np.arange(0, 1.1, 0.1)

    # each run has 10 episodes
    episodes = 10

    # perform 100 independent runs
    runs = 100

    # track the errors for each (step, alpha) combination
    errors = np.zeros((len(steps), len(alphas)))
    for run in tqdm(range(0, runs)):
        for step_ind, step in enumerate(steps):
            for alpha_ind, alpha in enumerate(alphas):
                # print('run:', run, 'step:', step, 'alpha:', alpha)
                value = np.zeros(N_STATES + 2)
                for ep in range(0, episodes):
                    temporal_difference(value, step, alpha)
                    # calculate the RMS error
                    errors[step_ind, alpha_ind] += np.sqrt(np.sum(np.power(value - TRUE_VALUE, 2)) / N_STATES)
    # take average
    errors /= episodes * runs

    print(errors)

if __name__ == '__main__':
    figure7_2()

  2%|▏         | 2/100 [00:15<12:38,  7.74s/it]


KeyboardInterrupt: 