In [2]:
import numpy as np
import random
import copy

Actions:
* 0 - Left
* 1 - Down
* 2 - Right
* 3 - Up

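Agent position: the cell currently occupied by the agent holds the value 1.1 in the position grid; every other cell holds 0.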

In [3]:
# Defining GridWorld Environment Class

class myGridWorld:
    """Square grid-world environment with an optional noisy-move mechanic.

    The agent starts in the bottom-left cell and the single rewarded cell
    (reward 1) is the top-right cell; reaching it sets ``DoneStatus``.
    States are numbered row-major: ``state = row * size + col``.

    Actions: 0 = Left, 1 = Down, 2 = Right, 3 = Up.
    """

    # Class-level defaults; every instance overwrites them in __init__.
    size=5                        # 5x5 grid
    RewardGrid=np.zeros([5,5])    # grid representing rewards
    RewardGrid[0][4]=1            # sets reward in the top-right cell to 1
    PositionGrid=np.zeros([5,5])  # grid representing current position of agent
    PositionGrid[4][0]=1.1        # sets agent's initial position in the bottom-left cell
    action_space=4                # no. of possible actions
    noisyMoveChance=0.3           # probability of noisy move
    currI=4                       # row index
    currJ=0                       # col index
    DoneStatus=False              # whether the episode is terminated
    EnableNoise=True              # enable or disable noise
    observation_spaces=size*size  # total no. of observations

    _AGENT_MARK=1.1               # value written into PositionGrid at the agent's cell
    # action -> (row delta, col delta)
    _MOVES={0:(0,-1),1:(1,0),2:(0,1),3:(-1,0)}

    def __init__(self,size=5,noisyMoveChance=0.3,EnableNoise=True):
        """Create an environment.

        Args:
            size: side length of the grid; values that truncate to less than 1
                are ignored and the default 5x5 grid is kept.
            noisyMoveChance: probability that a move is replaced by a random
                action; only values strictly between 0 and 1 are accepted,
                anything else keeps the default 0.3.
            EnableNoise: whether noisy moves can happen at all.
        """
        self._configure(size,noisyMoveChance,EnableNoise)

    def _configure(self,size,noisyMoveChance,EnableNoise):
        """Restore defaults, then apply the given settings (shared by __init__ and reset)."""
        self.basicReset()
        self.EnableNoise=EnableNoise
        # Convert once so the stored size and the grid shapes always agree
        # (numpy rejects float shapes).
        size=int(size)
        if(0<size):
            self.size=size
            self.RewardGrid=np.zeros([size,size])
            self.RewardGrid[0][size-1]=1
            self.PositionGrid=np.zeros([size,size])
            self.PositionGrid[size-1][0]=self._AGENT_MARK
            self.currI=size-1
            self.currJ=0
            self.observation_spaces=size*size
        if(0<noisyMoveChance and noisyMoveChance<1):     # probability value
            self.noisyMoveChance=noisyMoveChance

    def basicReset(self):
        """Reset every attribute to the default 5x5 configuration with noise enabled."""
        self.size=5
        self.RewardGrid=np.zeros([5,5])
        self.RewardGrid[0][4]=1
        self.PositionGrid=np.zeros([5,5])
        self.PositionGrid[4][0]=self._AGENT_MARK
        self.action_space=4
        self.noisyMoveChance=0.3
        self.currI=4
        self.currJ=0
        self.DoneStatus=False
        self.EnableNoise=True
        self.observation_spaces=self.size*self.size

    def reset(self,size=None,noisyMoveChance=None,EnableNoise=None):
        """Start a new episode and return the agent's initial state index.

        Any argument left as ``None`` keeps the environment's current setting,
        so a bare ``reset()`` no longer reverts a customised environment to the
        default noisy 5x5 grid. Explicit arguments behave as in ``__init__``.
        """
        # Capture the current settings first: _configure wipes them via basicReset.
        if size is None:
            size=self.size
        if noisyMoveChance is None:
            noisyMoveChance=self.noisyMoveChance
        if EnableNoise is None:
            EnableNoise=self.EnableNoise
        self._configure(size,noisyMoveChance,EnableNoise)
        return self._stateIndex()

    def _stateIndex(self):
        """Return the row-major index of the agent's current cell."""
        return self.currI*self.size+self.currJ

    @staticmethod
    def _printGrid(grid):
        """Print a 2-D grid, one row per line, values separated by spaces."""
        for row in grid:
            for value in row:
                print(value,end=' ')
            print()

    def printRewardGrid(self):
        """Print the reward grid, one row per line."""
        self._printGrid(self.RewardGrid)

    def printPositionGrid(self):
        """Print the position grid, one row per line."""
        self._printGrid(self.PositionGrid)

    def render(self):
        """Print the current state of the position grid."""
        self.printPositionGrid()

    def getPositionGrid(self):
        """Return the position grid array (not a copy)."""
        return self.PositionGrid

    def getAvailableMoves(self):
        """Return the number of actions."""
        return self.action_space

    def getSize(self):
        """Return the side length of the grid."""
        return self.size

    def move(self,action):
        """Apply an action, possibly replaced by a random one when noise is enabled.

        Returns:
            Tuple ``(row, col, state, reward, done)`` where ``reward`` is the
            reward-grid value of the cell the agent occupies after the move.
        """
        randNum=random.random()
        if(self.EnableNoise and randNum<=self.noisyMoveChance):
            self.makeNoisyMove(action)
        else:
            self.makeProperMove(action)
        return self.currI,self.currJ,self._stateIndex(),self.RewardGrid[self.currI][self.currJ],self.DoneStatus

    def makeNoisyMove(self,action):
        """Ignore the requested action and perform a uniformly random one instead."""
        self.makeProperMove(random.randint(0,self.action_space-1))

    def makeProperMove(self,action):
        """Move one cell in the given direction; moves into a wall leave the agent in place.

        Unknown action values perform no move. ``DoneStatus`` is set once the
        agent occupies the top-right cell.
        """
        delta=self._MOVES.get(action)
        if delta is not None:
            newI=self.currI+delta[0]
            newJ=self.currJ+delta[1]
            if(0<=newI<self.size and 0<=newJ<self.size):
                self.PositionGrid[self.currI][self.currJ]=0
                self.currI,self.currJ=newI,newJ
                self.PositionGrid[self.currI][self.currJ]=self._AGENT_MARK

        if(self.currI==0 and self.currJ==self.size-1):   # termination condition reached
            self.DoneStatus=True

    def step(self,action):
        """Alias of ``move``: apply the action and return ``(row, col, state, reward, done)``."""
        return self.move(action)

In [20]:
# Define Q-Learning model training class

class myGridWorldTrainer:
    """Tabular Q-learning trainer for a grid-world environment.

    The environment is expected to expose ``size``, ``observation_spaces``,
    ``action_space``, ``noisyMoveChance``, ``EnableNoise``,
    ``reset(size, noisyMoveChance, EnableNoise)`` returning the start state,
    and ``step(action)`` returning ``(row, col, state, reward, done)``.
    """

    env=[]                # an instance of environment
    Q=[]                  # q value matrix
    matrix=[]             # matrix with actions corresponding to the highest q values for each state
    Trajectories=[]       # list of trajectories obtained during training
    DirectionalMatrix=[]  # matrix with arrows based on the highest q values

    # action index -> arrow symbol (left, down, right, up)
    _ARROWS={0:'\u2190',1:'\u2193',2:'\u2192',3:'\u2191'}

    @staticmethod
    def _resetEnv(env):
        """Start a new episode while keeping the environment's current configuration."""
        # Passing the settings explicitly prevents reset() from falling back to its defaults.
        return env.reset(env.size,env.noisyMoveChance,env.EnableNoise)

    def trainModel(self,model,numEpisodes=10000,exploreUntil=500,alpha=0.6,gamma=0.9,maxSteps=None):
        """Learn a Q-table with an explore-then-exploit schedule.

        Episodes numbered below ``exploreUntil`` (counting from 1) pick
        uniformly random actions; later episodes act greedily on the current
        Q-table. The Q-value is updated after every step in both phases.

        Args:
            model: environment to train on; if ``None`` the trainer's ``env`` is used.
            numEpisodes: number of training episodes.
            exploreUntil: first episode number that acts greedily.
            alpha: learning rate.
            gamma: discount factor.
            maxSteps: optional cap on steps per episode (``None`` = unbounded).

        Returns:
            The learned Q-table of shape ``(observation_spaces, action_space)``;
            it is also stored in ``self.Q``.

        Raises:
            ValueError: if no usable environment is available.
        """
        env=model if model is not None else self.env
        if not hasattr(env,'step'):
            raise ValueError("trainModel needs an environment: pass one or set self.env")
        self.env=env

        Q = np.zeros([env.observation_spaces, env.action_space])

        for episode in range(1,numEpisodes+1):
            state = self._resetEnv(env)    # reset env
            done = False
            steps = 0

            while not done and (maxSteps is None or steps<maxSteps):
                if(episode<exploreUntil):    # exploration phase: random action
                    action = random.randint(0,env.action_space-1)
                else:                        # exploitation phase: greedy action
                    action = np.argmax(Q[state])
                # The step and the update must run in BOTH phases; otherwise the
                # exploration episodes never advance and the loop never ends.
                _,_,state2, reward, done = env.step(action)     # takes an action
                Q[state,action] += alpha * (reward + gamma* np.max(Q[state2]) - Q[state,action])    #update q value
                state = state2
                steps += 1

        self.Q=Q    # learned q values matrix
        return Q

    def getDirections(self,Q):
        """Derive the greedy action for every state and lay it out as a grid.

        The grid side length is taken from the number of rows of ``Q``, which
        must therefore be a perfect square. Stores the arrow rendering in
        ``self.DirectionalMatrix`` and the action grid in ``self.matrix``.

        Returns:
            2-D integer array whose entry ``[row][col]`` is the index of the
            action with the maximum Q-value in that cell.

        Raises:
            ValueError: if the number of states is not a perfect square.
        """
        Q=np.asarray(Q)
        numStates=Q.shape[0]
        size=int(round(numStates**0.5))
        if size*size!=numStates:
            raise ValueError("Q must have one row per cell of a square grid")

        matrix=np.argmax(Q,axis=1).reshape(size,size)

        # Action indices without a known arrow are skipped, as before.
        DirectionalMatrix=[[self._ARROWS[a] for a in row if a in self._ARROWS] for row in matrix]

        self.DirectionalMatrix=DirectionalMatrix
        self.matrix=matrix
        return matrix

    def getTrajectories(self,matrix,numTrajectories,maxSteps=None):
        """Roll out the policy ``matrix`` on ``self.env`` and record visited states.

        Args:
            matrix: 2-D grid giving the action to take in each cell.
            numTrajectories: number of episodes to roll out.
            maxSteps: optional cap on steps per trajectory (``None`` = run
                until the environment reports done).

        Returns:
            List of trajectories, each a list of state indices starting with
            the initial state; also stored in ``self.Trajectories``.
        """
        env=self.env
        Trajectories=[]

        for _ in range(numTrajectories):
            state = self._resetEnv(env)
            path=[state]       # list for a single trajectory
            i,j=divmod(state,env.size)    # row and col index
            done=False
            steps=0

            # trajectory loop
            while not done and (maxSteps is None or steps<maxSteps):
                i,j,state,_,done = env.step(matrix[i][j])    # take the policy's action
                path.append(state)
                steps+=1

            Trajectories.append(path)

        self.Trajectories=Trajectories
        return Trajectories

    def allInOne(self,model,numTrajectories):
        """Train on ``model``, extract the greedy policy and return rolled-out trajectories."""
        self.env=model
        Q=self.trainModel(model)
        matrix=self.getDirections(Q)
        return self.getTrajectories(matrix,numTrajectories)