In [None]:
## Solving the Mountain Car problem
## with the SARSA algorithm

In [None]:
import tensorflow as tf
import numpy as np
import random
import gym

In [None]:
## Load the environment: create the Gym 'MountainCar-v0' task and keep it in
## the notebook-level name `env`, which the cells below read.

env = gym.make('MountainCar-v0')

In [None]:
## A simple implementation of a uniform square tile coding

class TileCoding:
    """One-hot tile coder made of several offset copies of a uniform grid.

    Every tiling stores, for each feature dimension, the interior bin edges
    of a uniform partition of that dimension's range, shifted by the tiling's
    offset for that dimension.
    """

    def __init__(self, feat_ranges, nbins, offsets):
        """Build and store one tiling per entry of ``offsets``.

        feat_ranges: sequence of (low, high) pairs, one per feature dimension.
        nbins: number of bins used along every dimension.
        offsets: sequence of per-dimension offset sequences, one per tiling.
        """
        self.tilings = self.buildTilings(feat_ranges, nbins, offsets)

    def maketiling(self, feat_range, nbins, offset):
        """Return the shifted interior edges of a uniform 1D grid.

        The range is cut into ``nbins`` equal bins; the two outermost edges
        are dropped and every remaining edge is moved by ``offset``.
        """
        low, high = feat_range[0], feat_range[1]
        edges = np.linspace(low, high, nbins + 1)
        return edges[1:-1] + offset

    def makeTilings(self, feat_ranges, nbins, offset):
        """Return one multidimensional tiling: a list with one 1D edge array per dimension."""
        return [
            self.maketiling(feat_ranges[dim], nbins, offset[dim])
            for dim in range(len(feat_ranges))
        ]

    def buildTilings(self, feat_ranges, nbins, offsets):
        """Return the collection of offset tilings, one per entry of ``offsets``."""
        return [self.makeTilings(feat_ranges, nbins, shift) for shift in offsets]

    def encode(self, feature, tiling):
        """Return a column vector with a 1 at the tile of ``tiling`` hit by ``feature``.

        The result has shape (number of tiles in the tiling, 1).
        """
        ## The number of features must coincide with the number of 1D tilings
        assert len(feature) == len(tiling)

        ## Index of the active bin along each dimension
        active = tuple(
            np.digitize(feature[dim], tiling[dim]) for dim in range(len(feature))
        )

        ## k edges delimit k + 1 bins along a dimension
        grid_shape = [len(edges) + 1 for edges in tiling]
        n_cells = int(np.prod(grid_shape))
        one_hot = np.zeros(grid_shape)
        one_hot[active] = 1

        return one_hot.reshape(n_cells, 1)

    def tile(self, feature):
        """Return the encodings of ``feature`` in every tiling, stacked into one column."""
        return np.concatenate(
            [self.encode(feature, tiling) for tiling in self.tilings]
        )

In [None]:
## Define an agent with linear Q-function, SARSA update algorithm and experience replay 

## Number of stored transitions sampled for one replay update; used by
## Agent.updateReplay once more than BATCH_NUM transitions are stored.
BATCH_NUM = 32
## Offsets for the tile coding: one entry per tiling, each holding one offset
## per state dimension (presumably position and velocity -- confirm against
## the environment's observation space).
OFFSETS = [[0.0, 0.0], [0.1, 0.01], [0.3, 0.03], [-0.1, -0.01], [-1.0, -0.05], [1.0, 0.05], [-1.2, -0.06]]

class Agent:
    """SARSA agent with a linear Q-function on tile-coded features and experience replay.

    Q(state, a) is the product of the weight row ``W`` with a column that
    holds the tile coding of ``state`` in the slot reserved for action ``a``,
    zeros in the other action slots, and a trailing bias entry equal to 1.
    """

    def __init__(self, env):
        """Set up the tile coder, the weights and the optimizer for ``env``.

        env: environment exposing ``action_space.n`` and
            ``observation_space.low`` / ``observation_space.high``.
        """
        self.gamma = 1.0  ## No discount
        self.epsilon = 0.0  ## No random exploration; it is left to the negative reward feedback
        self.num_actions = env.action_space.n

        ranges = self.getFeatRange(env)

        ## Learning rate scaled by the number of tilings in the module-level OFFSETS
        self.lr = 0.3 / len(OFFSETS)
        num_bin = 8  ## Use 8 x 8 tilings
        self.tilecode = TileCoding(ranges, num_bin, OFFSETS)  ## Initialize the tile coding
        self.states = []  ## Collected transitions for experience replay
        dummy = self.feat([0, 0])  ## Dummy state, used only to size the weight vector

        ## One weight per (tile, action) pair plus one bias weight
        self.W = tf.Variable(
            np.zeros(shape=(1, dummy.shape[0] * env.action_space.n + 1)),
            dtype=tf.float32,
        )
        self.optimizer = tf.keras.optimizers.Adam(self.lr)  ## Choice of optimizer

    def getFeatRange(self, env):
        """Return the (low, high) range of every state component as a list of pairs."""
        range_low = env.observation_space.low
        range_high = env.observation_space.high

        return list(zip(range_low, range_high))

    def collect(self, state, action, reward, next_state, next_action, done):
        """Append one (state, action, reward, next_state, next_action, done) transition to the memory."""
        self.states.append([state, action, reward, next_state, next_action, done])

    def _stateActionVector(self, state, a):
        """Return the input column for (state, a), shape (num_features * num_actions + 1, 1)."""
        feat = self.feat(state)
        n_feat = feat.shape[0]

        x = np.zeros((n_feat * self.num_actions + 1, 1))
        x[a * n_feat:(a + 1) * n_feat] = feat  ## Only the slot of action ``a`` is active
        x[-1, 0] = 1  ## Bias entry

        return x

    def prepareDataset(self, states):
        """Build the regression inputs and SARSA targets for a batch of transitions.

        states: non-empty sequence of transitions as stored by ``collect``.

        Returns ``(X, y)`` where ``X`` has one column per transition, shape
        (num_features * num_actions + 1, batch), and ``y`` has shape
        (1, batch). Both stay 2D even for a single transition.

        Raises ValueError if ``states`` is empty.
        """
        columns = []
        targets = []

        for state, a, reward, next_state, next_action, done in states:
            columns.append(self._stateActionVector(state, a))

            if done:
                target = float(reward)
            else:
                ## Reduce the (1, 1) Q-value to a plain float so that terminal and
                ## non-terminal targets can share one numeric array
                q_next = np.asarray(self.q(next_state, next_action)).item()
                target = float(reward + self.gamma * q_next)

            targets.append(target)

        X = np.concatenate(columns, axis=1)
        y = np.array(targets, dtype=np.float64).reshape(1, -1)

        return X, y

    def loss(self, X, y):
        """Return the mean squared error between ``y`` and the linear prediction ``W @ X``."""
        y_pred = tf.matmul(self.W, X)
        return tf.reduce_mean(tf.square(y - y_pred))

    def q(self, state, a):
        """Return Q(state, a) as a (1, 1) tensor."""
        x = self._stateActionVector(state, a)
        return tf.matmul(self.W, tf.convert_to_tensor(x, dtype=tf.float32))

    def chooseAction(self, state):
        """Choose an action in an epsilon-greedy way."""
        if np.random.random() > self.epsilon:
            vals = [self.q(state, a) for a in range(self.num_actions)]
            return np.argmax(vals)
        else:
            return int(np.random.random() * self.num_actions)

    def updateReplay(self):
        """Run one optimizer step on SARSA targets built from the replay memory.

        Does nothing while the memory is empty. When more than BATCH_NUM
        transitions are stored, BATCH_NUM of them are drawn with replacement;
        otherwise the whole memory is used.
        """
        if not self.states:
            return

        if len(self.states) > BATCH_NUM:
            states = random.choices(self.states, k=BATCH_NUM)
        else:
            states = self.states

        X, y = self.prepareDataset(states)
        X = tf.convert_to_tensor(X, dtype=tf.float32)
        y = tf.convert_to_tensor(y, dtype=tf.float32)

        ## Cost function evaluated by the optimizer
        cost = lambda: self.loss(X, y)

        ## Perform a step of batch gradient descent on the weights
        self.optimizer.minimize(cost, [self.W])

    def feat(self, state):
        """Return the tile coding of ``state``."""
        return self.tilecode.tile(state)

    def flush(self):
        """Drop the 200 oldest transitions from the replay memory."""
        self.states = self.states[200:]
        
        

In [None]:
## Create the agent for the environment loaded above and keep it in the
## notebook-level name `agent`, which the training loop reads.
agent = Agent(env)

In [None]:
## Training settings:
##   UPDATE_STEP  -- agent.updateReplay() is called every UPDATE_STEP steps of an episode
##   NUM_EPISODES -- number of episodes to run
##   NUM_STEPS    -- upper bound on the number of steps in one episode
UPDATE_STEP = 4
NUM_EPISODES = 2000
NUM_STEPS = 1000

## Total reward of every episode that ended with done == True
rewards = []

for episode in range(NUM_EPISODES):

    ## Start a new episode and choose the first action.
    ## NOTE(review): this assumes a gym version whose reset() returns only the
    ## observation and whose step() returns a 4-tuple -- confirm against the
    ## installed gym.
    state = env.reset()
    action = agent.chooseAction(state)
    total_reward = 0

    for step in range(1, NUM_STEPS + 1):
        env.render()

        ## SARSA transition: act, observe, choose the next action right away,
        ## and store the full (s, a, r, s', a', done) tuple for replay
        next_state, reward, done, info = env.step(action)
        next_action = agent.chooseAction(next_state)
        agent.collect(state, action, reward, next_state, next_action, done)
        total_reward += reward
        
        ## Update the weights every UPDATE_STEP steps
        if step%UPDATE_STEP == 0:
            agent.updateReplay()
        
        if done:
            ## Report the episode's total reward and keep it for later analysis
            rewards.append(total_reward)
            print("Episode ", episode,": ", total_reward)
            break

        ## Advance to the next step using the action that was already chosen
        state = next_state
        action = next_action
    
    ## Trim the agent's replay memory every 20th episode (episode 0 included)
    if episode%20 == 0:
        agent.flush()
               
env.close()