Let actions be encoded as:

Left --> 0

Up --> 1

Right --> 2

Down --> 3

# Import

In [1]:
import numpy as np

# Environment

In [100]:
"""
Definition of the Warehouse Agent environment
"""

class WarehouseAgent():
    def __init__(self):
        """
        Initializing the environment
        """
        self.GRID_DIM = [6,7] # columns, rows
        self.agent_position = [1,2]
        self.box_location = [4,3]
        self.goal_location = [3,1]

        # a --> agent
        # w --> wall
        # e --> empty
        # b --> box
        # g --> goal
        self.game = [['w', 'w', 'w', 'w', 'e', 'e'],
                     ['w', ' ', 'A', 'w', 'e', 'e'],
                     ['w', ' ', ' ', 'w', 'w', 'w'],
                     ['w', 'G', ' ', ' ', ' ', 'w'],
                     ['w', ' ', ' ', 'B', ' ', 'w'],
                     ['w', ' ', ' ', 'w', 'w', 'w'],
                     ['w', 'w', 'w', 'w', 'e', 'e']
                    ]
     
    def reset(self):
        """Function to reset the environment at the end of each episode to its initial state configuration
        Returns:
            state: the state of the environment reset to its initial conditions
        """
        # self.box_location = [2,2]
        # print(self.box_location)
        self.__init__()
        return self
    

    def step(self, action):
        """Function to control and evaluate the agents' action
        Args:
            action: pass on the action which the agent needs to take at that time step
        Returns:
            new_state: the new state agent reaches after taking the action
            reward: the reward obtained on taking the action
            done: boolean value to determine if episode terminating condition is reached
        """

        agent_x, agent_y = self.agent_position
        box_x, box_y = self.box_location

        f = 1 # flag to indicate invalid move (no change in state)

        # LEFT
        if action == 0:

            if agent_y == 1: # 1st column
                f = 0

            else:
                if box_y == agent_y - 1:
                    if box_x == agent_x:
                        if box_y == 1:
                            f = 0
                        else:
                            
                            self.box_location[1] -= 1
                            self.agent_position[1] -= 1

                            # updates
                            self.game[agent_x][agent_y] = ' '
                            self.game[agent_x][agent_y-1] = 'A'
                            self.game[box_x][box_y-1] = 'B'
                            # print('moved left1')
                    else:

                        self.game[agent_x][agent_y] = ' '
                        self.game[agent_x][agent_y-1] = 'A'
                        self.agent_position[1] -= 1
                        # print('moved left2')
                else:
                
                    self.game[agent_x][agent_y] = ' '
                    self.game[agent_x][agent_y-1] = 'A'
                    self.agent_position[1] -= 1
                    # print('moved left3')


        # UP
        elif action == 1:
            if agent_x == 1:
                f = 0
            elif agent_x == 3 and agent_y in [3,4]:
                f = 0
            else:
                if box_x == agent_x - 1:
                    if box_y == agent_y:
                        if self.game[box_x-1][box_y] == 'w':
                            f = 0
                            # print('cant go up')
                        else:
                            self.box_location[0] -= 1
                            self.agent_position[0] -= 1
                            
                            # updates
                            self.game[agent_x][agent_y] =' '
                            self.game[agent_x - 1][agent_y] = 'A'
                            self.game[box_x-1][box_y] = 'B'
                            # print('moved up')
                    else:
                        self.agent_position[0] -= 1
                        
                        # updates
                        self.game[agent_x][agent_y] =' '
                        self.game[agent_x - 1][agent_y] = 'A'
                        # print('only agent up1')
                else:
                    self.agent_position[0] -= 1
                    
                    # updates
                    self.game[agent_x][agent_y] =' '
                    self.game[agent_x - 1][agent_y] = 'A'
                    # print('only agent up')

        
        # RIGHT
        elif action == 2:
            if agent_x in [3,4] and agent_y == 4:
                f = 0
            elif agent_x in [1,2,5] and agent_y == 2:
                f = 0
            else:
                if agent_y == box_y - 1:
                    if box_x == agent_x:
                        if self.game[box_x][box_y+1] == 'w':
                            f = 0
                        else:
                            self.box_location[1] += 1
                            self.agent_position[1] += 1

                            # updates
                            self.game[agent_x][agent_y] = ' '
                            self.game[agent_x][agent_y+1] = 'A'
                            self.game[box_x][box_y+1] = 'B'

                    else:
                        self.agent_position[1] += 1

                        # updates
                        self.game[agent_x][agent_y] = ' '
                        self.game[agent_x][agent_y+1] = 'A'
                else:
                    self.agent_position[1] += 1

                    # updates
                    self.game[agent_x][agent_y] = ' '
                    self.game[agent_x][agent_y+1] = 'A'
        # DOWN
        elif action == 3:
            if agent_x == 5:
                f = 0
            elif agent_x == 4 and agent_y in [3,4]:
                f = 0
            else:
                if box_x == (agent_x + 1):
                    if box_y == agent_y:
                        if self.game[box_x+1][box_y] == 'w':
                            # print(1)                                          COMMENTED
                            f = 0
                        else:
                            self.box_location[0] += 1
                            self.agent_position[0] += 1
                            
                            # updates
                            self.game[agent_x][agent_y] =' '
                            self.game[box_x+1][box_y] = 'B'
                            self.game[agent_x+1][agent_y] = 'A'
                            # print('moved-down1')
                    else:
                        # updates
                        self.game[agent_x][agent_y] =' '
                        self.game[agent_x+1][agent_y] = 'A'
                        # print('moved-down2')
                        
                        self.agent_position[0] += 1

                
                else:
                    # updates
                    self.game[agent_x][agent_y] =' '
                    self.game[agent_x+1][agent_y] = 'A'
                    # print('moved-down3')
                        
                    self.agent_position[0] += 1
        # if f == 0:                                                            COMMENTED
            # print("Same state")                                               COMMENTED
        done = False
        if self.box_location == [3,1]:
            reward = 0
            done = True
            # print("Well done")                                                COMMENTED

        else:
            reward = -1
        # print(self.box_location)
        if self.box_location in [[1,1], [1,2], [5,1], [5,2], [3,4], [4,4]]:
            done = True
            # print('Mission Failed')                                           COMMENTED

        return self.agent_position, reward, done


        # failed attempt
        # if action == 0:
        #     if self.agent_position[1] != 1: # 2nd column can't go left.
        #         a, b = self.agent_position

        #         if self.box_location == [a,b-1]: # If box is on the cell left to the agent.
        #             if b != 2: # agent is 2 away from wall
        #                 self.agent_position[1] -= 1
        #                 self.box_location[1] -= 1
        #         else:
        #             self.agent_position[1] -= 1

        # elif action == 1:
        #     if self.agent_position[0] != 1: # 2nd row can't go up.
        #         a,b = self.agent_position

        #         if self.box_location == [a-1,b]: # If box is on the cell above the agent.
        #             if a != 2: # agent is 2 away from wall
        #                 self.agent_position[0] -= 1
        #                 self.box_location[0] -= 1
                
        #         elif self.agent_position not in ([3,3], [3,4]):
        #             self.agent_position[0] -= 1
        
        
        # elif action == 2:
        #     if ((self.agent_position[0] in (1,2,5)) and self.agent_position[1] == 2) or ((self.agent_position[0] in (3,4)) and self.agent_position[1] == 4): # removing all base cases
        #         self.agent_position = self.agent_position
        #     else:
        #         if self.agent_position[1] == 1 and self.box_location[1] == 2 and self.agent_position[0] in :

        # pass
    


    def render(self):
        """Function to get the simulation of the warehouse agent system 
        """
        
        for i in range(7):
            for j in range(6):
                print(self.game[i][j], end = ',')
            print('\n')


agent = WarehouseAgent()

The above is the same code from assignment 2, except for a few print statements being commented, as they are not necessary (Have been mentioned as new comments to the side)

# Assignment 3

# 1

## Global variable initialisation

In [101]:
# Global variables

alpha = 0
EPS = 0
GAMMA = 0
xcoord = []
ycoord = []
actionSpace = []
stateSpace = []
returns = {}
pairsVisited = {}
numEpisodes = 0

In [113]:
def initialise():
    global EPS, GAMMA, xcoord, ycoord, actionSpace, stateSpace, returns, pairsVisited, Q, C, numEpisodes
    EPS = 0.05
    GAMMA = 0.9
    alpha = 0.85
        
    xcoord = [i for i in range(7)]

    ycoord = [i for i in range(6)]

    actionSpace = [0, 1, 2, 3]

    stateSpace = []
    returns = {}
    pairsVisited = {}

    Q = {}
    C = {}
    for i in xcoord:
        for j in ycoord:
            for action in actionSpace:
                Q[((i, j), action)] = 0
                C[((i, j), action)] = 0
            stateSpace.append((i, j))

   
    
    numEpisodes = 100
    

## Off policy Monte Carlo

In [114]:
initialise()

agent.reset()

targetPolicy = {}
for state in stateSpace:
    values = np.array([Q[(state, a)] for a in actionSpace])
    best = np.random.choice(np.where(values==values.max())[0])
    targetPolicy[state] = actionSpace[best]

for i in range(numEpisodes):
    memory = []
    behaviourPolicy = {}
    for state in stateSpace:
        rand = np.random.random()
        if rand < 1 - EPS:
            behaviourPolicy[state] = [targetPolicy[state]]
        else:
            behaviourPolicy[state] = actionSpace
            
    observation = (1,2)
    done = False
    count_steps = 0
    max_steps = 100
    while not done:
        if count_steps == max_steps:            # run max for 100 steps else terminate episode
            break
        count_steps += 1
        # print(behaviourPolicy)
        # print(observation)
        action = np.random.choice(behaviourPolicy[observation])
        # print(action)
        observation_, reward, done = agent.step(action)
        memory.append((observation[0], observation[1], action, reward))
        observation = tuple(observation_)
        # print(observation_)
    memory.append((observation[0], observation[1], action, reward))
    
    G = 0
    W = 1
    last = True
    for x, y, action, reward in reversed(memory):
        sa = ((x,y), action)
        if last:
            last = False
        else:
            C[sa] += W
            Q[sa] += (W / C[sa])*(G - Q[sa])
            values = np.array([Q[(state, a)] for a in actionSpace])
            best = np.random.choice(np.where(values == values.max())[0])
            targetPolicy[state] = actionSpace[best]
            if action != targetPolicy[state]:
                break
            if len(behaviourPolicy[state]) == 1:
                prob = 1 - EPS
            else:
                prob = EPS / len(behaviourPolicy[state])
            W *= 1/prob
        G = GAMMA*G + reward
        
    
    if EPS > 0.0001:
        EPS -= 0.0001
    else:
        EPS = 0
        
        
print("Best selected policy :", targetPolicy)

Best selected policy : {(0, 0): 0, (0, 1): 3, (0, 2): 0, (0, 3): 1, (0, 4): 0, (0, 5): 1, (1, 0): 1, (1, 1): 1, (1, 2): 1, (1, 3): 2, (1, 4): 0, (1, 5): 1, (2, 0): 3, (2, 1): 3, (2, 2): 3, (2, 3): 2, (2, 4): 3, (2, 5): 1, (3, 0): 3, (3, 1): 0, (3, 2): 1, (3, 3): 0, (3, 4): 0, (3, 5): 2, (4, 0): 2, (4, 1): 3, (4, 2): 1, (4, 3): 3, (4, 4): 2, (4, 5): 0, (5, 0): 2, (5, 1): 2, (5, 2): 0, (5, 3): 3, (5, 4): 0, (5, 5): 2, (6, 0): 0, (6, 1): 3, (6, 2): 2, (6, 3): 0, (6, 4): 0, (6, 5): 2}


## On policy Monte Carlo

In [115]:
initialise()

agent.reset()

returns = {}
pairsVisited = {}


for i in xcoord:
    for j in ycoord:
        for action in actionSpace:
            returns[((i, j), action)] = 0
            pairsVisited[((i, j), action)] = 0

policy = {}
for state in stateSpace:
    policy[state] = np.random.choice(actionSpace)
    
for i in range(numEpisodes):
    statesActionsReturns = []
    memory = []
    observation = (1,2)
    done = False
    count_steps = 0
    max_steps = 100
    while not done:
        if count_steps == max_steps:      
            break
        count_steps += 1
        action = policy[tuple(observation)]
        observation_, reward, done= agent.step(action)
        memory.append((observation[0], observation[1], action, reward))
        observation = observation_
    memory.append((observation[0], observation[1], action, reward))
    
    G = 0
    last = True
    for x, y, action, reward in reversed(memory):
        if last:
            last = False
        else:
            statesActionsReturns.append((x, y, action, G))
        G = GAMMA*G + reward
        
        
    statesActionsReturns.reverse()
    statesActionsVisited = []
        
        
    for x, y, action, G in statesActionsReturns:
        sa = ((x, y), action)
        if sa not in statesActionsVisited:
            pairsVisited[sa] += 1
            
            returns[(sa)] += (1 / pairsVisited[(sa)])*(G-returns[(sa)])
            Q[sa] = returns[sa]
            rand = np.random.random()
            if rand < 1 - EPS:
                state = (x, y)
                values = np.array([Q[(state, a)] for a in actionSpace])
                best = np.random.choice(np.where(values == values.max())[0])
                policy[state] = actionSpace[best]
            else:
                policy[state] = np.random.choice(actionSpace)
            statesActionsVisited.append(sa)
   
    if EPS > 0.0001:
        EPS -= 0.0001
    else:
        EPS = 0

print("Best selected policy :", policy)

Best selected policy : {(0, 0): 1, (0, 1): 3, (0, 2): 3, (0, 3): 1, (0, 4): 3, (0, 5): 1, (1, 0): 0, (1, 1): 3, (1, 2): 2, (1, 3): 2, (1, 4): 0, (1, 5): 0, (2, 0): 0, (2, 1): 0, (2, 2): 1, (2, 3): 1, (2, 4): 2, (2, 5): 0, (3, 0): 0, (3, 1): 1, (3, 2): 1, (3, 3): 0, (3, 4): 2, (3, 5): 2, (4, 0): 2, (4, 1): 1, (4, 2): 2, (4, 3): 2, (4, 4): 1, (4, 5): 2, (5, 0): 3, (5, 1): 0, (5, 2): 3, (5, 3): 1, (5, 4): 1, (5, 5): 1, (6, 0): 2, (6, 1): 0, (6, 2): 3, (6, 3): 0, (6, 4): 2, (6, 5): 2}


## SARSA

In [116]:
def select(state):
    rand = np.random.random()
    if rand < 1 - EPS:
        # state = (x, y)
        # print(state)
        values = np.array([Q[(state, a)] for a in actionSpace])
        best = np.random.choice(np.where(values == values.max())[0])
        policy[state] = actionSpace[best]
    else:
        policy[state] = np.random.choice(actionSpace)
    return policy[state]

In [117]:
initialise()

agent.reset()

policy = {}
for state in stateSpace:
    policy[state] = np.random.choice(actionSpace)

    
for i in range(numEpisodes):
    statesActionsReturns = []
    memory = []
    observation = (1,2)
    done = False
    count_steps = 0
    max_steps = 100
    while not done:
        if count_steps == max_steps:
            break
        count_steps += 1
        action = select(observation)
        observation_, reward, done = agent.step(action)
        memory.append((observation[0], observation[1], action, reward))
        observation = tuple(observation_)
    memory.append((observation[0], observation[1], action, reward))
    
    last = True
    for x, y, action, reward in reversed(memory):
        if last:
            last = False
        else:
            statesActionsReturns.append((x, y, action, reward))
        
        
    statesActionsReturns.reverse()
    statesActionsVisited = []
        
    
    x0, y0, action0, reward0 = statesActionsReturns[0]    
    sa = ((x0, y0), action0)
    
    for x, y, action, reward in statesActionsReturns:
        sa_ = ((x, y), action)
        if sa_ not in statesActionsVisited:
            Q[sa] += alpha * (reward + (GAMMA * Q[sa_]) - Q[sa])
            sa = sa_
            statesActionsVisited.append(sa)
            
    if EPS > 0.0001:
        EPS -= 0.0001
    else:
        EPS = 0
        
print("Best selected policy :", policy)

Best selected policy : {(0, 0): 0, (0, 1): 0, (0, 2): 0, (0, 3): 3, (0, 4): 2, (0, 5): 1, (1, 0): 0, (1, 1): 2, (1, 2): 1, (1, 3): 1, (1, 4): 1, (1, 5): 1, (2, 0): 2, (2, 1): 1, (2, 2): 3, (2, 3): 3, (2, 4): 1, (2, 5): 1, (3, 0): 1, (3, 1): 2, (3, 2): 3, (3, 3): 0, (3, 4): 3, (3, 5): 1, (4, 0): 0, (4, 1): 3, (4, 2): 2, (4, 3): 2, (4, 4): 3, (4, 5): 2, (5, 0): 2, (5, 1): 2, (5, 2): 1, (5, 3): 1, (5, 4): 2, (5, 5): 2, (6, 0): 3, (6, 1): 1, (6, 2): 3, (6, 3): 1, (6, 4): 1, (6, 5): 0}


## Q Learning

In [118]:
initialise()

agent.reset()

policy = {}
for state in stateSpace:
    policy[state] = np.random.choice(actionSpace)

for i in range(numEpisodes):
    statesActionsReturns = []
    memory = []
    observation = (1,2)
    done = False
    count_steps = 0
    max_steps = 100
    while not done:
        if count_steps == max_steps:
            break
        count_steps += 1
        action = policy[observation]
        observation_, reward, done = agent.step(action)
        memory.append((observation[0], observation[1], action, reward))
        observation = tuple(observation_)
    memory.append((observation[0], observation[1], action, reward))
    
    last = True
    for x, y, action, reward in reversed(memory):
        if last:
            last = False
        else:
            statesActionsReturns.append((x, y, action, reward))
        
        
    statesActionsReturns.reverse()
    statesActionsVisited = []
        
    
    x0, y0, action0, reward0 = statesActionsReturns[0]    
    sa = ((x0, y0), action0)
    
    for x, y, action, reward in statesActionsReturns:
        values = np.array([Q[((x,y), a)] for a in actionSpace])
        a_ = np.where(values == values.max())[0][0]
        sa_ = ((x,y), a_)
        if sa_ not in statesActionsVisited:
            Q[sa] += alpha * (reward + (GAMMA * Q[sa_]) - Q[sa])
            sa = sa_
            rand = np.random.random()
            if rand < 1 - EPS:
                state = (x, y)
                values = np.array([Q[(state, a)] for a in actionSpace])
                best = np.random.choice(np.where(values == values.max())[0])
                policy[state] = actionSpace[best]
            else:
                policy[state] = np.random.choice(actionSpace)
            statesActionsVisited.append(sa)
            
    if EPS > 0.0001:
        EPS -= 0.0001
    else:
        EPS = 0

print("Best selected policy :", policy)

Best selected policy : {(0, 0): 2, (0, 1): 1, (0, 2): 3, (0, 3): 0, (0, 4): 3, (0, 5): 3, (1, 0): 1, (1, 1): 0, (1, 2): 3, (1, 3): 0, (1, 4): 3, (1, 5): 1, (2, 0): 2, (2, 1): 2, (2, 2): 1, (2, 3): 0, (2, 4): 2, (2, 5): 3, (3, 0): 1, (3, 1): 1, (3, 2): 1, (3, 3): 2, (3, 4): 1, (3, 5): 0, (4, 0): 0, (4, 1): 1, (4, 2): 2, (4, 3): 1, (4, 4): 1, (4, 5): 1, (5, 0): 3, (5, 1): 1, (5, 2): 1, (5, 3): 2, (5, 4): 2, (5, 5): 2, (6, 0): 1, (6, 1): 0, (6, 2): 2, (6, 3): 2, (6, 4): 2, (6, 5): 2}


In [119]:
class LinearSoftmaxAgent(object):
    """
        Act with softmax policy. 
        Features are encoded as
        phi(s, a) is a one-hot-encoded vector of states.
    """
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.states = []
        self.actions = []
        self.probs = []
        self.rewards = []
        self.theta = np.random.random(state_size * action_size)
        self.alpha = .01
        self.gamma = .99
        self.pi = []

    def store(self, state, action, prob, reward):
        self.states.append(state)
        self.actions.append(action)
        self.probs.append(prob)
        self.rewards.append(reward)

    def _phi(self, s, a):
        encoded = np.zeros([self.action_size, self.state_size])
        encoded[a] = s
        return encoded.flatten()

    def _softmax(self, s, a):
        return np.exp(self.theta.dot(self._phi(s, a)) / 100)

    def pi(self, s):
        weights = np.empty(self.action_size)
        for a in range(self.action_size):
            weights[a] = self._softmax(s, a)
        return weights / np.sum(weights)

    def act(self, state):
        probs = self.pi(state)
        a = np.random.choices(range(0, self.action_size), weights=probs)
        a = a[0]
        pi = probs[a]
        return (a, pi)

    def _gradient(self, s, a):
        expected = 0
        probs = self.pi(s)
        for b in range(0, self.action_size):
            expected += probs[b] * self._phi(s, b)
        return self._phi(s, a) - expected

    def _R(self, t):
        total = 0
        for tau in range(t, len(self.rewards)):
            total += self.gamma**(tau - t) * self.rewards[tau]
        return total

    def train(self):
        self.rewards -= np.mean(self.rewards)
        self.rewards /= np.std(self.rewards)
        for t in range(len(self.states)):
            s = self.states[t]
            a = self.actions[t]
            r = self._R(t)
            grad = self._gradient(s, a)
            self.theta = self.theta + self.alpha * r * grad

        self.states = []
        self.actions = []
        self.probs = []
        self.rewards = []



In [120]:
SAVE_FREQUENCY = 10
agent.reset()
state = (1,2)
score = 0
episode = 0
prev_frame = None
g = LinearSoftmaxAgent(4, 4)
MAX_EPISODES = 10000

In [None]:
while episode < MAX_EPISODES:  # episode loop
    # agent.render()
    print(state)
    print(g.act(state))
    print(1)
    action, prob = g.act(state)
    state, reward, done = agent.step(action)  # take a random action
    if done:
        reward = -10
    score += reward
    g.store(state, action, prob, reward)

    if done:
        episode += 1
        g.train()
        score = 0
        agent.reset()
        state = (1,2)

In my experiments I noticed that the agent doesn't step down at (2,2) [the cell just above the goal] when implementing Q-Learning and On Policy Monte Carlo. Whereas in off policy Monte Carlo and SARSA, the correct action is taken.

Thus, both are better for this problems according to my experiments.

