In [None]:
import gym
import numpy as np
import numpy as np
import scipy.interpolate as interp
#import matplotlib.pyplot as plt
from IPython import display
import matplotlib
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
#import PyQt5
import random
import operator
%matplotlib inline

#%matplotlib qt

## TD(lambda) Value Function Iteration

In [None]:
# Prediction
class BlackJack:
    def __init__(self, MAX_EPISODES=500000):
        self.env = gym.make('Blackjack-v0')
        self.MAX_EPISODES = MAX_EPISODES
        self.ALPHA = 0.5
        self.GAMMA = 0.95
        self.LAMBDA = 0.5
        
        # State values
        self.v = dict({})
        for p_hand in range(self.env.observation_space[0].n):
            for d_card in range(self.env.observation_space[1].n):
                for useable_ace in range(self.env.observation_space[2].n):
                    string = str(p_hand) + '_' + str(d_card) + '_' + str(useable_ace)
                    self.v[string] = 0 # Initiate to 0

        # Eligibility traces
        self.e = dict({})
        for p_hand in range(self.env.observation_space[0].n):
            for d_card in range(self.env.observation_space[1].n):
                for useable_ace in range(self.env.observation_space[2].n):
                    string = str(p_hand) + '_' + str(d_card) + '_' + str(useable_ace)
                    self.e[string] = 0 # Initiate to 0
            
    def action(self, player_hand):
        """
        NB: What about the usable ace?
        """
        probs = [0.8, 0.2] if player_hand > 18 else [0.2, 0.8]
        action = np.random.choice(np.arange(2), p=probs)
        return action

    def run_episode(self):
        new_s = self.env.reset()
        done = False
        while not done:
            s = new_s
            s_key = str(s[0]) + '_' + str(s[1]) + '_' + str(1*s[2])
            
            # Choose action and take a step
            a = self.action(s[0])
            new_s, r, done, _ = self.env.step(a)
            new_s_key = str(new_s[0]) + '_' + str(new_s[1]) + '_' + str(1*new_s[2])
            
            # Update eligibilities
            self.e.update((x, y * self.LAMBDA * self.GAMMA) for x, y in self.e.items())
            self.e[s_key] += 1
            
            # Update value function according to 
            # the td-error and update their eligibilities.
            td_error = r + self.GAMMA * self.v[new_s_key] - self.v[s_key]
            self.v[s_key] += self.ALPHA * td_error * self.e[s_key]
                 
    def value_iteration(self):
        for episode in range(self.MAX_EPISODES):
            if episode % 10000 == 0:
                print(f"Value Iteration {episode}")
            self.run_episode()

In [None]:
bj = BlackJack()
bj.value_iteration()

In [None]:
X = []
Y = []
Z = []
for key, value in bj.v.items():
    if (int('2_10_1'[-1])) & (int(key.split('_')[0]) > 11) & (int(key.split('_')[0]) < 22) & (int(key.split('_')[1]) < 12):  # Usable ace
        X.append(int(key.split('_')[0]))
        Y.append(int(key.split('_')[1]))
        Z.append(value)        
        
plotx, ploty, = np.meshgrid(np.linspace(np.min(X),np.max(X),10),\
                           np.linspace(np.min(Y),np.max(Y),10))
plotz = interp.griddata((X,Y),Z,(plotx,ploty),method='linear')

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.plot_surface(plotx,ploty,plotz,cstride=1,rstride=1,cmap='viridis')  # or 'hot'

In [None]:
env = gym.make('FrozenLake8x8-v0')
new_s = env.reset()
env.step(1)
#env.action_space.contains(3)
#env.observation_space

## Backward View SARSA(lambda)

In [None]:
# Prediction
class BlackJackSARSA:
    def __init__(self, MAX_EPISODES=500000):
        self.env = gym.make('Blackjack-v0')
        self.MAX_EPISODES = MAX_EPISODES
        self.ALPHA = 0.5
        self.GAMMA = 0.95
        self.LAMBDA = 0.5
        self.EPS = 0.9
        
        # State-action values
        self.q = dict({})
        for p_hand in range(self.env.observation_space[0].n):
            for d_card in range(self.env.observation_space[1].n):
                for usable_ace in range(self.env.observation_space[2].n):
                    for action in range(self.env.action_space.n):
                        key = (p_hand, d_card, usable_ace, action)
                        self.q[key] = 0 # Initiate to 0
            
    def action(self, player_hand):
        """
        NB: What about the usable ace?
        """
        probs = [0.8, 0.2] if player_hand > 18 else [0.2, 0.8]
        action = np.random.choice(np.arange(2), p=probs)
        return action

    def greedy_action(self, state):
        q_sub = {key: val for key, val in self.q.items() if key[0:len(state)] == state}
        return max(q_sub.items(), key=operator.itemgetter(1))[0][-1]

    def epsilon_greedy(self, state):
        if np.random.random() >= self.EPS:
            return self.greedy_action(state)

        return self.env.action_space.sample()    
    
    def run_episode(self):
        # Eligibility traces
        e = dict({})
        for p_hand in range(self.env.observation_space[0].n):
            for d_card in range(self.env.observation_space[1].n):
                for usable_ace in range(self.env.observation_space[2].n):
                    for action in range(self.env.action_space.n):
                        key = (p_hand, d_card, usable_ace, action)
                        e[key] = 0 # Initiate to 0
        
        new_s = self.env.reset()
        new_a = random.sample([0, 1], k=1)[0]
        done = False
        while not done:
            s = new_s
            a = new_a
            #s_a_key = str(s[0]) + '_' + str(s[1]) + '_' + str(1*s[2]) + '_' + str(a)
            s_a_key = s + (a,)
            
            # Take action a
            new_s, r, done, _ = self.env.step(a)
            
            # Choose new action from S'
            new_a = self.epsilon_greedy(new_s)
            new_s_a_key = new_s + (new_a,)
            #new_s_a_key = str(new_s[0]) + '_' + str(new_s[1]) + '_' + str(1*new_s[2]) + '_' + str(new_a)

            # Calculate TD error
            delta = r + self.GAMMA * self.q[new_s_a_key] - self.q[s_a_key]
            
            # Update eligibilities
            e.update((x, y * self.LAMBDA * self.GAMMA) for x, y in e.items())
            e[s_a_key] += 1
            
            # Update q values
            self.q.update((x, y + self.ALPHA * delta * e[x]) for x, y in self.q.items())

    def policy_iteration(self, n_episodes=1000):
        for episode in range(n_episodes):
            if episode % 1000 == 0:
                print(f"Policy Iteration {episode}")
            self.run_episode() 
    

In [None]:
bjsarsa = BlackJackSARSA()
bjsarsa.policy_iteration(100_000)

In [None]:
bjsarsa.q

In [None]:
# Imports specifically so we can render outputs in Jupyter.
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display


def display_frames_as_gif(frames):
    """
    Displays a list of frames as a gif, with controls
    """
    #plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi = 72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames = len(frames), interval=50)
    display(display_animation(anim, default_mode='loop'))

FrozenLake
https://gym.openai.com/envs/FrozenLake8x8-v0/

In [2]:
# Prediction
class FZSARSA:
    def __init__(self, MAX_EPISODES=500000):
        self.env = gym.make('FrozenLake8x8-v0')
        self.MAX_EPISODES = MAX_EPISODES
        self.ALPHA = 0.5
        self.GAMMA = 0.95
        self.LAMBDA = 0.5
        self.EPS = 0.9
        
        # State-action values
        self.q = dict({})
        for s in range(self.env.observation_space.n):
            for action in range(self.env.action_space.n):
                key = (s, action)
                self.q[key] = 0 # Initiate to 0
            
    def greedy_action(self, state):
        q_sub = {key: val for key, val in self.q.items() if key[0] == state}
        return max(q_sub.items(), key=operator.itemgetter(1))[0][-1]

    def epsilon_greedy(self, state):
        if np.random.random() >= self.EPS:
            return self.greedy_action(state)

        return self.env.action_space.sample()    
    
    def run_episode(self, d=False):
        print('HEY')
        
        # Eligibility traces
        e = dict({})
        for s in range(self.env.observation_space.n):
            for action in range(self.env.action_space.n):
                key = (s, action)
                e[key] = 0 # Initiate to 0
        
        new_s = self.env.reset()
        new_a = random.sample(range(4), k=1)[0]
        done = False
        
        if d:
            img = plt.imshow(self.env.render(mode='rgb_array')) # only call this once
            
        while not done:    
            s = new_s
            a = new_a
            s_a_key = (s,) + (a,)
            
            if d:            
                img.set_data(self.env.render(mode='rgb_array')) # just update the data
            
            # Take action a
            new_s, r, done, _ = self.env.step(a)
            
            # Choose new action from S'
            if d:
                new_a = self.greedy_action(new_s)                
            else:
                new_a = self.epsilon_greedy(new_s)
            new_s_a_key = (new_s,) + (new_a,)

            # Calculate TD error
            delta = r + self.GAMMA * self.q[new_s_a_key] - self.q[s_a_key]
            
            # Update eligibilities
            e.update((x, y * self.LAMBDA * self.GAMMA) for x, y in e.items())
            e[s_a_key] += 1
            
            # Update q values
            self.q.update((x, y + self.ALPHA * delta * e[x]) for x, y in self.q.items())    
        
    def policy_iteration(self, n_episodes=1000):
        for episode in range(n_episodes):
            if episode % 100 == 0:
                print(f"Policy Iteration {episode}")
            self.run_episode() 
            self.env.close()        
            

In [3]:
fzsarsa = FZSARSA()
fzsarsa.policy_iteration(1_000)

Policy Iteration 0
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
Policy Iteration 100
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
Policy Iteration 200
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY
HEY

In [None]:
fzsarsa.run_episode(d=True)

In [None]:
env = gym.make('CartPole-v0')

# Run a demo of the environment
observation = env.reset()
cum_reward = 0
frames = []
for t in range(5000):
    # Render into buffer. 
    frames.append(env.render(mode = 'rgb_array'))
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
    if done:
        break
env.close()
display_frames_as_gif(frames)