Currently Covers - 
 - MDPs
 - Policy Iteration
 - Value Iteration
 - Q-Learning

### Markov Decision Process

In [0]:
import abc

class MDP(abc.ABC):
    """Abstract base class for a Markov Decision Process.

    Subclasses must implement getTransitionModel and getPossibleActions;
    deriving from abc.ABC makes Python 3 enforce that at instantiation time
    (a class-level ``__metaclass__`` attribute is ignored by Python 3).

    Attributes:
        initialState: the state stored as the starting state.
        rewards: mapping from state to reward; its keys are the state space.
        actionList: the actions known to this MDP.
    """

    def __init__(self, rewards, initialState, actionList):
        """Store the reward table, the initial state and the action list."""
        self.initialState = initialState
        self.rewards = rewards  # the state space can be obtained via rewards.keys()
        self.actionList = actionList

    @abc.abstractmethod
    def getTransitionModel(self, state, action):
        """Return the transition model for taking `action` in `state`.

        Implementations return the possible result states together with the
        probability of reaching each of them.
        """

    def getRewards(self, state):
        """Return the reward stored for `state` (KeyError if it is unknown)."""
        return self.rewards[state]

    @abc.abstractmethod
    def getPossibleActions(self, state):
        """Return the actions allowed in `state`."""

### Grid World Design

In [0]:
import operator

class gridMDP(MDP):
    """Grid-world MDP with noisy movement.

    States are (x, y) tuples. The rows of `grid` are reversed internally, so
    y == 0 refers to the LAST row of the grid as written by the caller (the
    bottom row when the grid is printed). Cells holding None are not states.

    An action is a (dx, dy) step. It succeeds with probability 0.8 and slips
    to the right or to the left of the intended direction with probability
    0.1 each; a move that would leave the set of states keeps the agent in
    place.
    """

    # Direction obtained by turning right / left from each unit move.
    _RIGHT_OF = {(0, 1): (1, 0), (1, 0): (0, -1), (0, -1): (-1, 0), (-1, 0): (0, 1)}
    _LEFT_OF = {(0, 1): (-1, 0), (1, 0): (0, 1), (0, -1): (1, 0), (-1, 0): (0, -1)}
    # Symbols used by printPolicy for each unit move.
    _ARROWS = {(0, 1): '^', (1, 0): '>', (0, -1): 'v', (-1, 0): '<'}

    def __init__(self, grid, initialState, terminals, gamma):
        """Build the grid world.

        Args:
            grid: list of rows (top row first); each cell is a reward or None.
            initialState: (x, y) start state.
            terminals: collection of terminal (x, y) states.
            gamma: discount factor.
        """
        # Reverse a copy so that row 0 is the bottom row WITHOUT mutating the
        # list object owned by the caller.
        grid = list(reversed(grid))
        self.grid = grid
        self.initialState = initialState
        self.gamma = gamma
        self.terminals = terminals
        self.noOfRows = len(grid)
        self.noOfCols = len(grid[0])
        self.allStates = []
        self.rewards = {}
        for x in range(self.noOfCols):
            for y in range(self.noOfRows):
                if grid[y][x] is not None:
                    self.allStates.append((x, y))
                # Every cell gets a rewards entry, including None cells.
                self.rewards[(x, y)] = grid[y][x]
        self.actionList = [(1, 0), (0, 1), (-1, 0), (0, -1)]
        MDP.__init__(self, self.rewards, initialState, self.actionList)

    def getTransitionModel(self, state, action):
        """Return a list of (resultState, probability) pairs for `action`."""
        if state in self.terminals:
            return [(state, 1.0)]
        if action is None:
            return [(state, 0.0)]
        return [(self.doAction(state, action), 0.8),
                (self.doAction(state, self.rightOf(action)), 0.1),
                (self.doAction(state, self.leftOf(action)), 0.1)]

    def rightOf(self, action):
        """Return the move to the right of `action`; unknown actions are returned unchanged."""
        return self._RIGHT_OF.get(action, action)

    def leftOf(self, action):
        """Return the move to the left of `action`; unknown actions are returned unchanged."""
        return self._LEFT_OF.get(action, action)

    def getPossibleActions(self, state):
        """Return the action list; it is the same for every state."""
        return self.actionList

    def doAction(self, state, action):
        """Return the state that results from going in this direction.

        If the target cell is not a state, the agent stays in `state`.
        """
        resultState = tuple(map(operator.add, state, action))
        if resultState in self.allStates:
            return resultState
        return state

    def printGrid(self):
        """Print the reward grid, one tab-separated row per line, top row first."""
        for row in reversed(self.grid):
            print("\t".join(str(cell) for cell in row))

    def printUtilities(self, utilities):
        """Print the utilities as a grid, top row first.

        Cells without an entry in `utilities` are shown as 'None'.
        """
        utilityMat = [[utilities.get((x, y), 'None') for x in range(self.noOfCols)]
                      for y in range(self.noOfRows)]
        # Reverse exactly once, after the matrix is complete, so that every
        # column is shown in the same orientation.
        for row in reversed(utilityMat):
            print(row)
            print('\n')

    def printPolicy(self, policy):
        """Print the policy as a grid of arrows, top row first.

        Unit moves are drawn as '^', '>', 'v', '<'; every other action and
        every cell without a policy entry is drawn as '.'. The `policy`
        mapping is only read, never modified.
        """
        policyMat = [[self._ARROWS.get(policy.get((x, y)), '.')
                      for x in range(self.noOfCols)]
                     for y in range(self.noOfRows)]
        for row in reversed(policyMat):
            print(row)
            print('\n')

### Policy Iteration

In [0]:
def policyIteration(mdp):
    """Run policy iteration on `mdp` and return (utilities, policy).

    Terminal states keep their reward as utility and get the no-op action
    (0, 0). Every other state starts with utility 0 and the action (0, 1).
    Each round evaluates the current policy with policyEvaluation and then
    switches a state's action only when another action has a strictly larger
    expected utility, which also prevents flip-flopping between tied actions.

    Args:
        mdp: object exposing allStates, terminals, rewards, actionList and
            getTransitionModel(state, action) -> [(nextState, probability)].

    Returns:
        (U, policy): dict state -> utility and dict state -> action.
    """
    U = {}
    policy = {}
    for s in mdp.allStates:
        if s in mdp.terminals:
            U[s] = mdp.rewards[s]
            policy[s] = (0, 0)
        else:
            U[s] = 0
            policy[s] = (0, 1)  # default policy is to go up in the grid

    def expectedUtility(state, action):
        # Sum over ALL outcomes before comparing; comparing partial sums
        # would favour whichever outcome happens to be listed first.
        return sum(p * U[s1] for (s1, p) in mdp.getTransitionModel(state, action))

    while True:
        U = policyEvaluation(policy, U, mdp)
        unchanged = True
        for s in mdp.allStates:
            if s in mdp.terminals:
                continue
            # Start from the current action's value (not 0) so that negative
            # utilities are compared correctly.
            preferredAction = policy[s]
            bestExpected = expectedUtility(s, preferredAction)
            for a in mdp.actionList:
                candidate = expectedUtility(s, a)
                if candidate > bestExpected:
                    bestExpected = candidate
                    preferredAction = a
            if preferredAction != policy[s]:
                policy[s] = preferredAction  # improve the policy for state s
                unchanged = False
        if unchanged:
            return U, policy

def policyEvaluation(policy, U, mdp):
    """Do one in-place sweep of policy evaluation and return `U`.

    For every non-terminal state, U[s] is overwritten with the state's reward
    plus gamma times the expected utility of following policy[s]. Updates are
    applied immediately, so later states in the sweep see earlier updates.
    """
    stateRewards = mdp.rewards
    discount = mdp.gamma
    for state in mdp.allStates:
        if state in mdp.terminals:
            continue
        expected = 0
        for nextState, prob in mdp.getTransitionModel(state, policy[state]):
            expected = expected + prob * U[nextState]
        U[state] = stateRewards[state] + (discount * expected)
    return U

def main():
    """Solve a demo grid world with policy iteration and print the results.

    Both demo worlds are constructed; the 3x3 one is the one that is solved.
    """
    worlds = {
        '4x3': gridMDP(grid=[[-0.04, -0.04, -0.04, +1],
                             [-0.04, None, -0.04, -1],
                             [-0.04, -0.04, -0.04, -0.04]],
                       initialState=(0, 0),
                       terminals=[(3, 2), (3, 1)],
                       gamma=0.99),
        '3x3': gridMDP(grid=[[-3, -1, 10],
                             [-1, -1, -1],
                             [-1, -1, -1]],
                       initialState=(0, 0),
                       terminals=[(2, 2)],
                       gamma=0.99),
    }
    world = worlds['3x3']  # switch the key to run the other world
    utilities, bestPolicy = policyIteration(world)
    world.printUtilities(utilities)
    world.printGrid()
    world.printPolicy(bestPolicy)

# Run the policy-iteration demo only when this code is the top-level program.
if __name__ == '__main__':
    main()

### Modified Policy Iteration

In [0]:
def modifiedPolicyIteration(mdp):
    """Run modified policy iteration on `mdp` and return (utilities, policy).

    Terminal states keep their reward as utility and get the no-op action
    (0, 0). Every other state starts with utility 0 and the action (0, 1).
    Each round evaluates the current policy with policyEvaluation and then
    switches a state's action only when another action has a strictly larger
    expected utility, which also prevents flip-flopping between tied actions.

    Args:
        mdp: object exposing allStates, terminals, rewards, actionList and
            getTransitionModel(state, action) -> [(nextState, probability)].

    Returns:
        (U, policy): dict state -> utility and dict state -> action.
    """
    U = {}
    policy = {}
    for s in mdp.allStates:
        if s in mdp.terminals:
            U[s] = mdp.rewards[s]
            policy[s] = (0, 0)
        else:
            U[s] = 0
            policy[s] = (0, 1)  # default policy is to go up in the grid

    def expectedUtility(state, action):
        # Sum over ALL outcomes before comparing; comparing partial sums
        # would favour whichever outcome happens to be listed first.
        return sum(p * U[s1] for (s1, p) in mdp.getTransitionModel(state, action))

    while True:
        U = policyEvaluation(policy, U, mdp)
        unchanged = True
        for s in mdp.allStates:
            if s in mdp.terminals:
                continue
            # Start from the current action's value (not 0) so that negative
            # utilities are compared correctly.
            preferredAction = policy[s]
            bestExpected = expectedUtility(s, preferredAction)
            for a in mdp.actionList:
                candidate = expectedUtility(s, a)
                if candidate > bestExpected:
                    bestExpected = candidate
                    preferredAction = a
            if preferredAction != policy[s]:
                policy[s] = preferredAction  # improve the policy for state s
                unchanged = False
        if unchanged:
            return U, policy

def policyEvaluation(policy, U, mdp, k=20):
    """Approximate the utilities of `policy` with `k` in-place sweeps.

    In every sweep each non-terminal state's utility is overwritten with its
    reward plus gamma times the expected utility of following policy[s].
    Updates are applied immediately, so later states in a sweep see earlier
    updates.

    Args:
        policy: dict state -> action.
        U: dict state -> utility; modified in place.
        mdp: object exposing rewards, gamma, allStates, terminals and
            getTransitionModel(state, action) -> [(nextState, probability)].
        k: number of sweeps to perform (defaults to 20).

    Returns:
        The same `U` dict that was passed in.
    """
    rewards, gamma = mdp.rewards, mdp.gamma
    for _ in range(k):
        for s in mdp.allStates:
            if s in mdp.terminals:
                continue
            expected = sum(p * U[s1]
                           for (s1, p) in mdp.getTransitionModel(s, policy[s]))
            U[s] = rewards[s] + (gamma * expected)
    return U

def main():
    """Solve a demo grid world with modified policy iteration and print the results.

    Both demo worlds are constructed; the 4x3 one is the one that is solved.
    """
    worlds = {
        '4x3': gridMDP(grid=[[-0.04, -0.04, -0.04, +1],
                             [-0.04, None, -0.04, -1],
                             [-0.04, -0.04, -0.04, -0.04]],
                       initialState=(0, 0),
                       terminals=[(3, 2), (3, 1)],
                       gamma=0.99),
        '3x3': gridMDP(grid=[[-3, -1, 10],
                             [-1, -1, -1],
                             [-1, -1, -1]],
                       initialState=(0, 0),
                       terminals=[(2, 2)],
                       gamma=0.99),
    }
    world = worlds['4x3']  # switch the key to run the other world
    utilities, bestPolicy = modifiedPolicyIteration(world)
    world.printUtilities(utilities)
    world.printGrid()
    world.printPolicy(bestPolicy)

# Run the modified-policy-iteration demo only when this code is the top-level program.
if __name__ == '__main__':
    main()

### Value Iteration

In [0]:
def valueIteration(mdp, epsilon):
    """Run value iteration on `mdp` and return (utilities, policy).

    Terminal states keep their reward as utility; all other utilities start
    at 0. Each round computes, for every non-terminal state, the action with
    the largest expected utility under the previous round's utilities, and
    stops once the largest change is at most epsilon * (1 - gamma) / gamma.

    Args:
        mdp: object exposing rewards, gamma, allStates, terminals,
            actionList, getRewards(state) and
            getTransitionModel(state, action) -> [(nextState, probability)].
        epsilon: maximum error tolerated in the returned utilities.

    Returns:
        (U, policy): the utilities from the start of the final round and the
        greedy policy computed from them. Terminal states map to (0, 0).
    """
    U1 = {}
    policy = {}
    rewards, gamma = mdp.rewards, mdp.gamma
    for s in mdp.allStates:
        U1[s] = mdp.getRewards(s) if s in mdp.terminals else 0
        policy[s] = (0, 0)
    while True:
        U = U1.copy()
        delta = 0
        for s in mdp.allStates:
            if s in mdp.terminals:
                continue
            preferredAction = (0, 0)
            bestUtility = None
            for a in mdp.actionList:
                # Sum over ALL outcomes before comparing; partial sums would
                # favour whichever outcome is listed first.
                utility = sum(p * U[s1]
                              for (s1, p) in mdp.getTransitionModel(s, a))
                # No artificial floor of 0: negative utilities must be able
                # to win the comparison too.
                if bestUtility is None or utility > bestUtility:
                    bestUtility = utility
                    preferredAction = a
            if bestUtility is None:
                bestUtility = 0  # no actions available: nothing to look ahead to
            U1[s] = rewards[s] + gamma * bestUtility
            policy[s] = preferredAction
            delta = max(delta, abs(U1[s] - U[s]))
        # '<=' (not '<') so that an exact fixed point also terminates when
        # gamma == 1 makes the threshold 0.
        if delta <= epsilon * (1 - gamma) / gamma:
            return U, policy

def main():
    """Solve a demo grid world with value iteration and print the results.

    Both demo worlds are constructed; the 4x3 one is solved with epsilon 0.1.
    """
    worlds = {
        '4x3': gridMDP(grid=[[-0.04, -0.04, -0.04, +1],
                             [-0.04, None, -0.04, -1],
                             [-0.04, -0.04, -0.04, -0.04]],
                       initialState=(0, 0),
                       terminals=[(3, 2), (3, 1)],
                       gamma=0.99),
        '3x3': gridMDP(grid=[[-3, -1, 10],
                             [-1, -1, -1],
                             [-1, -1, -1]],
                       initialState=(0, 0),
                       terminals=[(2, 2)],
                       gamma=0.99),
    }
    world = worlds['4x3']  # switch the key to run the other world
    utilities, bestPolicy = valueIteration(world, 0.1)
    world.printUtilities(utilities)
    world.printGrid()
    world.printPolicy(bestPolicy)

# Run the value-iteration demo only when this code is the top-level program.
if __name__ == '__main__':
    main()

### Q-Learning Agent

In [0]:
import random

'''
Interpreting the different actions is as follows- 
(0,1)  --> Move Up
(0,-1) --> Move Down
(1,0)  --> Move Right
(-1,0) --> Move Left
Interpreting the states is straightforward and similar to what is done in the book AIMA.

'''
class QLearningAgent(gridMDP):
    """Q-learning agent living in a gridMDP world.

    The agent keeps a table Q[(state, action)] of action values and a table
    N[(state, action)] of visit counts, both initialised to 0 for every
    state/action pair. It samples moves through doAction instead of reading
    the transition model.

    Attributes:
        alpha: learning rate used in the Q update.
        N_e: visit-count threshold used by the exploration strategies.
        epsilon: probability of trying a random action in
            getAlternateActionToPerform.
        rewardPlus: optimistic value assigned to under-explored actions in
            getActionToPerform.
    """

    def __init__(self, grid, initialState, terminals, gamma, alpha, N_e, epsilon, rewardPlus):
        gridMDP.__init__(self, grid, initialState, terminals, gamma)
        self.alpha = alpha
        self.N_e = N_e
        self.Q = {}
        self.N = {}
        self.epsilon = epsilon
        self.rewardPlus = rewardPlus
        for state in self.allStates:
            for action in self.getPossibleActions(state):
                self.Q[(state, action)] = 0
                self.N[(state, action)] = 0

    # Overriding the transition model method and returning a junk value,
    # because the transition model should not be available to the learning
    # agent. The junk value keeps the parent's "list of (state, probability)
    # pairs" shape so generic code iterating over it does not break.
    def getTransitionModel(self, state, action):
        return [(state, 0)]

    # Overriding the doAction method
    def doAction(self, state, action):
        """Sample the state reached by attempting `action` in `state`.

        The intended move happens when the random draw is <= 0.8; otherwise
        the move slips to the right or to the left of the intended direction,
        each covering half of the remaining range. A move to a cell that is
        not a state leaves the agent in `state`.
        """
        stochasticProb = 0.8
        slipRightLimit = stochasticProb + ((1 - stochasticProb) / 2)
        randomNumber = random.random()
        if randomNumber <= stochasticProb:
            move = action
        elif randomNumber < slipRightLimit:
            move = self.rightOf(action)
        else:
            move = self.leftOf(action)
        resultState = tuple(map(operator.add, state, move))
        if resultState not in self.allStates:
            resultState = state
        return resultState

    @staticmethod
    def _lastBest(actions, score):
        """Return the action with the highest score; on ties the LAST one wins.

        Returns None when `actions` is empty.
        """
        bestAction = None
        bestScore = None
        for a in actions:
            value = score(a)
            if bestScore is None or value >= bestScore:
                bestAction, bestScore = a, value
        return bestAction

    def getActionToPerform(self, curState):
        """Pick an action using optimistic exploration.

        Actions tried fewer than N_e times in `curState` are scored with
        rewardPlus; all others are scored with their Q value.
        """
        def score(a):
            if self.N[(curState, a)] < self.N_e:
                return self.rewardPlus
            return self.Q[(curState, a)]

        return self._lastBest(self.getPossibleActions(curState), score)

    def getAlternateActionToPerform(self, state):
        """Pick an action using an epsilon-greedy rule.

        With probability epsilon a random action is drawn and used, unless it
        has already been tried more than N_e times in `state`; in every other
        case the action with the highest Q value is returned.
        """
        actions = self.getPossibleActions(state)
        greedyAction = self._lastBest(actions, lambda a: self.Q[(state, a)])
        if random.random() < self.epsilon:
            action = random.choice(actions)
            if self.N[(state, action)] > self.N_e:
                return greedyAction
            return action
        return greedyAction

    def getUpdateValue(self, resultState, curState, action):
        """Return alpha * (R(curState) + gamma * max Q(resultState, .) - Q(curState, action))."""
        target = self.getRewards(curState) + (self.gamma * self.getMaxQValue(resultState))
        return self.alpha * (target - self.Q[(curState, action)])

    def makeUpdate(self, resultState, curState, actionTaken):
        """Apply one Q-learning update for the observed transition."""
        self.Q[(curState, actionTaken)] += self.getUpdateValue(resultState, curState, actionTaken)

    def getMaxQValue(self, state):
        """Return the value of the best action available in `state`.

        A terminal state is worth exactly its reward. Its Q entries are not
        learned (they start at 0), so taking their maximum would hide a
        negative terminal reward behind the 0 of an untouched action.
        """
        if state in self.terminals:
            return self.getRewards(state)
        return max(self.Q[(state, a)] for a in self.getPossibleActions(state))

In [0]:
import matplotlib.pyplot as pyplot
%matplotlib inline
def main():
    """Train a Q-learning agent on a demo grid world, plot and print the result.

    Runs 500 episodes from the initial state until a terminal state is
    reached, records three Q values after every episode, plots their
    deviation from their mean with plotRMS and finally prints, for every
    non-terminal state, the action returned by getActionToPerform.
    """
    qGrid43 = QLearningAgent([[-0.04, -0.04, -0.04, +1],
                              [-0.04, None, -0.04, -1],
                              [-0.04, -0.04, -0.04, -0.04]],
                             (0, 0),
                             [(3, 2), (3, 1)],
                             0.9,   # gamma
                             0.60,  # alpha
                             10,    # N_e
                             0.6,   # epsilon, greater the value of epsilon greater the random actions
                             2)     # rewardPlus

    # Another trial grid
    qGrid33 = QLearningAgent([[-3, -1, 5],
                              [-1, -1, -1],
                              [-1, -1, -1]],
                             (0, 0),
                             [(2, 2)],
                             0.9,   # gamma
                             0.10,  # alpha
                             30,    # N_e
                             0.9,   # epsilon, greater the value of epsilon greater the random actions
                             2)     # rewardPlus

    qGridToRun = qGrid43

    iterations = 500
    x = []
    y1 = []
    y2 = []
    y3 = []
    for i in range(iterations):
        curState = qGridToRun.initialState
        while curState not in qGridToRun.terminals:
            a = qGridToRun.getActionToPerform(curState)
            # a = qGridToRun.getAlternateActionToPerform(curState)
            resultState = qGridToRun.doAction(curState, a)
            qGridToRun.N[(curState, a)] += 1
            qGridToRun.makeUpdate(resultState, curState, a)
            curState = resultState
        # The value of a terminal state is its reward whatever action is
        # "taken" there. Setting every action (not only the last one used)
        # lets a negative terminal reward show up in max-Q lookups, and it
        # also works when the episode starts in a terminal state.
        terminalReward = qGridToRun.getRewards(curState)
        for act in qGridToRun.getPossibleActions(curState):
            qGridToRun.Q[(curState, act)] = terminalReward
        x.append(i)
        y1.append(qGridToRun.Q[(2, 2), (1, 0)])
        y2.append(qGridToRun.Q[(0, 0), (0, 1)])
        y3.append(qGridToRun.Q[(0, 2), (1, 0)])

    plotRMS(x, y1, 'r')
    plotRMS(x, y2, 'b')
    plotRMS(x, y3, 'g')
    pyplot.show()

    # NOTE(review): getActionToPerform still applies the exploration bonus,
    # so a state/action pair tried fewer than N_e times can be printed here
    # instead of the greedy action - confirm this is the intended report.
    for s in qGridToRun.allStates:
        if s not in qGridToRun.terminals:
            print(s, qGridToRun.getActionToPerform(s))
    
def plotRMS(x,y,c):
    """Plot, against x, how far each value of y lies from the mean of y.

    Each plotted point is ((value - mean) ** 2) ** 0.5, i.e. the magnitude of
    the deviation from the mean. `c` is passed to pyplot.plot as the format
    argument. An empty `y` raises ZeroDivisionError.
    """
    mean = sum(y)/len(y)
    deviations = [((value - mean) ** 2) ** 0.5 for value in y]
    pyplot.plot(x, deviations, c)

# Run the Q-learning demo only when this code is the top-level program.
if __name__ == '__main__':
    main()   

In [0]:
import mxnet