In [2]:
import numpy as np
import matplotlib.pyplot as plt
np.set_printoptions(precision=2)

In [298]:
class MountainCar():
    def __init__(self):
        pass
    
    def init(self):
        self.pos = np.random.rand(1)[0]*0.2 + -0.6  # initialize position to [-0.6, -0.4]
        self.v = 0
        return np.array([self.pos, self.v]), -1, False
        
    def _bound_pos(self, pos):
        return min(max(-1.2, pos), 0.5)
    
    def _bound_v(self, v):
        return min(max(-0.07, v), 0.07)
        
    def step(self, action):
        # input action (0,1,2}
        # map to {-1,0,1}
        action -= 1
        
        # update velocity
        self.v = self.v + 0.001*action - 0.0025*np.cos(3*self.pos)
        self.v = self._bound_v(self.v)
        
        # update position
        self.pos = self._bound_pos(self.pos + self.v)
        
        # terminal check
        if self.pos == 0.5:
            reward = 1
            termination = True
        else:
            reward = 0
            termination = False
            
        return np.array([self.pos, self.v]), reward, termination

In [299]:
# Tile coding for R^d space
class TileCoding():
    def __init__(self, bounds, n, m):
        self.n = n
        self.m = m
        self.width = np.array([(b[1]-b[0])/m for b in bounds])
        self.origin = np.array([[bounds[i][0] + self.width[i]/n*j for i in range(len(bounds))] for j in range(n)])
        
    
    def __call__(self, state):
        ls = []
        for i in range(self.n):
            coord = (state - self.origin[i]) // self.width  # coordinate in tiles (R^d)
            coord = int(sum([j**k for j,k in zip(coord, range(len(coord)))]))  # flattened to (R^1)
            out = np.zeros(self.m**2)
            out[coord] = 1
            
            ls.append(out)
        return np.hstack(ls)

# test
t = TileCoding([[-1.2, 0.5],[-0.07,0.07]], 8, 4)
t(np.array([0,0]))

array([0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [300]:
class SARSA():
    def __init__(self, func, gamma, epsilon):
        self.func = func
        self.gamma = gamma
        self.epsilon = epsilon
    
    def episode(self, env):
        state, reward, termination = env.init()
        action = self.act(state)
        t = 0
        while True:
            # observe next state and take next action
            state_, reward, termination = env.step(action)
            if not termination:
                action_ = self.act(state_)

                # learn
                target = reward + self.gamma * self.func(state_)[action_]
                self.func.learn(target, state, action)

                # iterate
                state = state_
                action = action_
                t += 1
                
            else:
                self.func.learn(reward, state, action)
                break
                
            # timeout
            if t >= 999999:
                break
                
        return t
                
    def act(self, state):
        values = self.func(state)
        if np.random.rand() < self.epsilon:
            return np.random.choice([0,1,2])
        return np.argmax(values)
        
                
class Linear():
    def __init__(self, weights, alpha, encoding=None):
        self.weights = weights
        self.alpha = alpha
        self.encoding = encoding
    
    def __call__(self, state, encode=True):
        if self.encoding is not None and encode:
            state = self.encoding(state)
        return state @ self.weights
    
    def learn(self, target, state, action):
        if self.encoding is not None:
            state = self.encoding(state)
        values = self(state, encode=False)
        error = target - values[action]
        errors = np.zeros(self.weights.shape[1])
        errors[action] = error
        self.weights += self.alpha * np.outer(state, errors)  # (outer product of error & state, R^(a*d))

In [303]:
env = MountainCar()
agent = SARSA(func=Linear(weights=np.random.randn(512,3), alpha=0.1, encoding=TileCoding([[-1.2, 0.5],[-0.07,0.07]], 8, 8)), gamma=0.99, epsilon=0.4)

for _ in range(20):
    print(agent.episode(env))
agent.epsilon=0.2
for _ in range(30):
    print(agent.episode(env))
    
# time taken for each episode:

16677
154620
133037
16669
2140
171175
107147
59835
4988
4437
1750
4755
9831
12885
4769
220122
39378
95700
333960
540958
23270
13180
108718
8482
836
3557
9778
10226
3518
5623
16151
9862
17554
9497
15170
98965
669165
10887
175375
1487
518270
1955
11234
202130
4651
3111
1796
6523
8810
9474
