In [1]:
import numpy as np
import gym

class EpsilonGreedyQPolicy():
    """Epsilon-greedy action selection over a 1-D array of Q-values.

    Attributes:
        epsilon: threshold compared against a uniform random draw; when the
            draw falls below it, a random action index is returned instead
            of the greedy one.
    """

    def __init__(self, epsilon = 0.1):
        self.epsilon = epsilon

    def selectAction(self, qValues):
        """Return an action index for the given Q-values.

        Args:
            qValues: array whose first dimension enumerates the actions
                (its ``shape[0]`` is used as the number of actions).

        Returns:
            A random index in ``[0, qValues.shape[0])`` when exploring,
            otherwise ``np.argmax(qValues)``.
        """
        actionCount = qValues.shape[0]

        # One uniform draw decides between exploring and exploiting.
        exploring = np.random.uniform() < self.epsilon
        if exploring:
            return np.random.randint(0, actionCount)
        return np.argmax(qValues)

class SARSALambaAgent():
    """Tabular SARSA(lambda) agent with accumulating eligibility traces.

    The agent keeps a Q-table of shape ``(nb_states, nb_actions)`` and an
    eligibility trace of the same shape. The environment is driven through
    ``env.reset()`` (expected to return a state index) and
    ``env.step(action)`` (expected to return the 4-tuple
    ``(nextState, reward, done, info)``).

    Attributes:
        env: the environment being controlled.
        nb_actions / nb_states: sizes read from ``env.action_space.n`` and
            ``env.observation_space.n`` when those spaces are gym
            ``Discrete`` spaces; otherwise they stay ``None`` and
            ``reset()`` cannot build the Q-table.
        learningRate: step size applied to every TD update.
        gamma: discount factor.
        lambdaValue: trace-decay parameter; traces decay by
            ``gamma * lambdaValue`` after every step.
        policy: behaviour policy used while learning.
        testPolicy: purely greedy policy (epsilon 0.0) used for evaluation.
        terminalStates: optional iterable of state indices whose Q-values
            are initialised to zero.
        qValues: current Q-table (``None`` until ``reset()`` is called).
        eligibilityTrace: current trace table (``None`` between ``reset()``
            and the first ``resetEpisode()``).
        qValuesHistory: stacked Q-table snapshots, one for the initial table
            plus one per finished training episode.
    """

    def __init__(self, env, learningRate = 0.5, gamma = 1.0, lambdaValue = 0.0, policy = None, terminalStates = None):
        """Store the hyper-parameters and read the table sizes from ``env``.

        Args:
            env: environment exposing ``action_space``, ``observation_space``,
                ``reset()`` and ``step()``.
            learningRate: TD step size.
            gamma: discount factor.
            lambdaValue: eligibility-trace decay parameter.
            policy: behaviour policy with a ``selectAction(qValues)`` method.
                ``None`` (the default) creates a fresh
                ``EpsilonGreedyQPolicy()`` for this agent.
            terminalStates: optional iterable of terminal state indices.
        """
        self.env = env
        self.nb_actions = None
        self.nb_states = None
        self.learningRate = learningRate
        self.gamma = gamma
        self.lambdaValue = lambdaValue
        # Build the default policy per instance: a default created in the
        # signature would be a single object shared by every agent.
        self.policy = EpsilonGreedyQPolicy() if policy is None else policy
        self.testPolicy = EpsilonGreedyQPolicy(0.0)
        self.terminalStates = terminalStates
        self.currentState = None
        self.currentAction = None
        self.qValues = None
        self.eligibilityTrace = None
        self.qValuesHistory = None

        if isinstance(self.env.action_space, gym.spaces.discrete.Discrete):
            self.nb_actions = self.env.action_space.n

        if isinstance(self.env.observation_space, gym.spaces.discrete.Discrete):
            self.nb_states = self.env.observation_space.n

    def reset(self):
        """Re-initialise the Q-table and forget all learning progress.

        The Q-table is filled with uniform random values, rows belonging to
        ``terminalStates`` are zeroed, and the history is restarted with a
        snapshot of this initial table.
        """
        self.qValues = np.random.rand(self.nb_states, self.nb_actions)
        self.eligibilityTrace = None

        if self.terminalStates is not None:
            for state in self.terminalStates:
                self.qValues[state] = np.zeros(self.nb_actions)

        # Leading axis indexes the snapshots: shape (1, nb_states, nb_actions).
        self.qValuesHistory = np.copy(self.qValues)[np.newaxis]

        self.currentState = None
        self.currentAction = None

    def resetEpisode(self, policy = None):
        """Start a new episode: reset the env, pick the first action, clear traces.

        Args:
            policy: policy used to choose the first action; defaults to the
                behaviour policy ``self.policy``.
        """
        if policy is None:
            policy = self.policy
        self.currentState = self.env.reset()
        self.currentAction = policy.selectAction(self.qValues[self.currentState])
        self.eligibilityTrace = np.zeros_like(self.qValues)

    def estimateQValues(self, nb_episodes):
        """Learn Q-values from scratch by running ``nb_episodes`` episodes.

        Calls ``reset()`` first, so any previously learned table is
        discarded. After every episode a snapshot of the Q-table is appended
        to ``qValuesHistory``.

        Note: the TD target always bootstraps from ``qValues[nextState]``,
        including on the final transition; that value is guaranteed to be
        zero only for states listed in ``terminalStates``.
        """
        self.reset()

        for _ in np.arange(nb_episodes):
            self.resetEpisode()
            done = False
            # `not done` (rather than an identity test against True) also
            # terminates when the env reports a truthy non-bool flag.
            while not done:
                nextState, reward, done, info = self.env.step(self.currentAction)
                nextAction = self.policy.selectAction(self.qValues[nextState])
                error = (reward
                         + self.gamma * self.qValues[nextState][nextAction]
                         - self.qValues[self.currentState][self.currentAction])
                # Accumulating trace for the pair that was just visited.
                self.eligibilityTrace[self.currentState][self.currentAction] += 1
                self.qValues += self.learningRate * error * self.eligibilityTrace
                # Decay every trace element-wise; pairs never visited keep a
                # zero trace and therefore are not touched by later updates.
                self.eligibilityTrace *= self.gamma * self.lambdaValue
                self.currentState = nextState
                self.currentAction = nextAction
            self.qValuesHistory = np.concatenate([self.qValuesHistory, np.copy(self.qValues)[np.newaxis]], axis = 0)

    def evaluateQValues(self):
        """Run one episode with the greedy test policy and report its return.

        Every action, including the first one, is chosen by ``testPolicy``.
        The undiscounted sum of rewards is printed and also returned.
        """
        self.resetEpisode(self.testPolicy)
        done = False
        tReward = 0.0
        while not done:
            nextState, reward, done, info = self.env.step(self.currentAction)
            self.currentState = nextState
            self.currentAction = self.testPolicy.selectAction(self.qValues[nextState])
            tReward += reward
        print(tReward)
        return tReward

    def getQValuesHistory(self):
        """Return a copy of the Q-table snapshots, or ``None`` before ``reset()``."""
        if self.qValuesHistory is None:
            return None
        return np.copy(self.qValuesHistory)

In [2]:
# Build the environment and read the sizes of its discrete spaces.
ENV_NAME = 'CliffWalking-v0'
env = gym.make(ENV_NAME)
nb_states = env.observation_space.n
nb_actions = env.action_space.n

# SARSA(lambda) agent; state 47 is passed as the only terminal state.
sarsaLambdaAgent = SARSALambaAgent(
    env,
    learningRate = 0.5,
    gamma = 1.0,
    lambdaValue = 0.9,
    policy = EpsilonGreedyQPolicy(0.1),
    terminalStates = np.array([47]),
)

# Train for 500 episodes, then run one evaluation episode (prints its return).
sarsaLambdaAgent.estimateQValues(500)
sarsaLambdaAgent.evaluateQValues()

# One Q-table snapshot per episode plus the initial table.
hist = sarsaLambdaAgent.getQValuesHistory()
print(hist.shape)
# print(hist[0][36])
# print(hist[10][36])
# print(hist[499])



-17.0
(501, 48, 4)


In [3]:
# print(type(env.observation_space))
# temp = np.array([2]) 
# print(temp.shape)
# for i in temp:
    # print(i)

# mO = np.ones((48, 4))
# mO[:] = 3
# print(mO.shape)
# mT = np.ones((48, 4))
# print(mO)
# print(mO.shape)
# mF = mT + mO
# print(mF)
# print(mF.shape)
# print(np.concatenate([mO[np.newaxis], mT[np.newaxis]], axis = 0).shape)
# print(np.concatenate([mO[np.newaxis], mT[np.newaxis]], axis = 0))
# print(np.copy(mT)[np.newaxis].shape)
# print(np.copy(mT)[np.newaxis])
# print(np.append(mO, np.append(mO, mT, axis = 2), axis = 2).shape)