%%

In [None]:
from gym import Env
from gym.spaces import Discrete, Box, Dict
import numpy as np
from collections import defaultdict
import matplotlib as plt
import plottin
import itertools

%%

In [None]:
class WarehouseAgent(Env):
    """Sokoban-style grid world in which an agent pushes one box onto a target.

    ``GRID`` is a 7 x 6 array where 1 marks a wall and 0 marks free floor.
    All positions are ``[row, col]`` pairs.

    Actions 0-3 move the agent up, down, left and right.  Any other action
    (nominally 4, 'push') tries to push the box, which only succeeds when the
    box sits directly next to the agent and the cell behind it is free.

    Every step is rewarded with -1, except the step that leaves the box on
    the target, which is rewarded with 0 and ends the episode.  The episode
    also ends when at least two of the four cells around the box are walls.
    """

    def __init__(self):
        super().__init__()

        # Start configuration; reset() restores these positions.
        self.agent_position = [1, 2]
        self.box_location = [4, 3]
        self.goal_location = [3, 1]

        # Row/column offset applied by each movement action.
        self._action_to_direction = {
            0: np.array([-1, 0]),
            1: np.array([1, 0]),
            2: np.array([0, -1]),
            3: np.array([0, 1]),
        }
        self._ACTIONLOOKUP = {
            0: 'move up',
            1: 'move down',
            2: 'move left',
            3: 'move right',
            4: 'push',
        }

        self.GRID_DIM = np.asarray([7, 6])
        self.walls = 1  # value stored in GRID for a wall cell
        self.GRID = np.zeros(self.GRID_DIM)
        # The outermost rows and columns are walls, so the playable area is
        # GRID[1:-1, 1:-1] ...
        self.GRID[:, [0, -1]] = self.walls
        self.GRID[[0, -1], :] = self.walls
        # ... minus these interior wall segments.
        self.GRID[[1, 2, 5], 3:5] = self.walls

        self.action_space = Discrete(len(self._ACTIONLOOKUP))
        # One discrete state per grid cell (see _state_in_seq).
        self.state_space = Discrete(int(self.GRID.size))
        self.observation_space = Dict(
            {
                name: Box(
                    np.array([0, 0]),
                    self.GRID_DIM - 1,
                    shape=(2,),
                    dtype=int,
                )
                for name in ("agent", "box", "target")
            }
        )

        self._agent_location = np.array(self.agent_position)
        self._box_location = np.array(self.box_location)
        self._target_location = np.array(self.goal_location)
        # Positions before the most recent move/push; None until one happens.
        self._prev_agent_location = None
        self._prev_box_location = None

    def _is_wall(self, cell):
        """Return True when the ``[row, col]`` cell is a wall in ``GRID``."""
        return bool(self.GRID[cell[0], cell[1]] == self.walls)

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, done, info)``.

        Actions below 4 are movements; every other value is treated as a
        push attempt.
        """
        self._prev_agent_location = None
        self._prev_box_location = None
        if action < 4:
            self._move(action)
        else:
            self._push(action)

        done, reward = self.is_over()
        return self._get_obs(), reward, done, self._get_info()

    def render(self):
        """Return the grid as an array of one-character strings.

        Grid values are truncated to a single character; the agent, box and
        target cells are then overwritten with 'A', 'B' and 'T' in that
        order, so a later marker hides an earlier one on the same cell.
        """
        rend = self.GRID.copy().astype(dtype='U1')
        for marker, (row, col) in (
            ('A', self._agent_location),
            ('B', self._box_location),
            ('T', self._target_location),
        ):
            rend[row, col] = marker
        return rend

    def reset(self, seed=None, return_info=False, options=None):
        """Restore the start configuration and return the first observation.

        ``seed`` and ``options`` are accepted for signature compatibility but
        are not used.  When ``return_info`` is true the result is
        ``(observation, info)`` instead of just ``observation``.
        """
        self._agent_location = np.array(self.agent_position)
        self._box_location = np.array(self.box_location)
        self._target_location = np.array(self.goal_location)

        observation = self._get_obs()
        if return_info:
            return observation, self._get_info()
        return observation

    def _state_in_seq(self):
        """Return the agent's cell as a flat index ``row * n_cols + col``.

        NOTE(review): only the agent position is encoded; the box position
        is not part of this index.
        """
        row, col = self._agent_location
        return row * self.GRID.shape[1] + col

    def _get_obs(self):
        """Return the agent, box and target positions as a dict of arrays.

        Copies are returned so callers cannot modify the environment's
        internal state through the observation.
        """
        return {
            "agent": self._agent_location.copy(),
            "box": self._box_location.copy(),
            "target": self._target_location.copy(),
        }

    def _get_info(self):
        """Return the L1 (Manhattan) distance between box and target."""
        offset = self._box_location - self._target_location
        return {'distance': np.linalg.norm(offset, ord=1)}

    def _push(self, action):
        """Try to push the box away from the agent.

        The push direction is derived from where the box lies relative to
        the agent; the ``action`` value itself is not consulted.

        Returns ``(moved_player, moved_box)``: ``(True, True)`` when the box
        was adjacent and the cell behind it was free, else ``(False, False)``.
        """
        offset = self._box_location - self._agent_location
        delta = next(
            (d for d in self._action_to_direction.values()
             if np.array_equal(offset, d)),
            None,
        )
        if delta is None:
            # Box is not directly above, below, left or right of the agent.
            return False, False

        self._prev_agent_location = self._agent_location
        self._prev_box_location = self._box_location
        pushed_to = self._box_location + delta
        if self._is_wall(pushed_to):
            return False, False

        self._box_location = pushed_to
        self._agent_location = self._agent_location + delta
        return True, True

    def _move(self, action):
        """Move the agent one cell; return True if the agent actually moved.

        The move is refused (agent stays put) when the destination is a wall
        or is occupied by the box.
        """
        self._prev_agent_location = self._agent_location
        self._prev_box_location = self._box_location
        destination = self._agent_location + self._action_to_direction[action]
        if self._is_wall(destination):
            return False
        if np.array_equal(destination, self._box_location):
            return False
        self._agent_location = destination
        return True

    def is_over(self):
        """Return ``(done, reward)`` for the current configuration.

        * box on target: ``(True, 0)``
        * two or more of the four cells around the box are walls:
          ``(True, -1)``
        * otherwise: ``(False, -1)``
        """
        if np.array_equal(self._box_location, self._target_location):
            return True, 0

        walls_around_box = sum(
            self._is_wall(self._box_location + delta)
            for delta in self._action_to_direction.values()
        )
        return walls_around_box > 1, -1

%%

In [None]:
# Build a fresh environment and evaluate its observation dict
# (agent, box and target positions); in a notebook this is the cell output.
env = WarehouseAgent()
env._get_obs()

%%

In [None]:
# Flat index of the agent's current cell (row * number_of_columns + column).
env._state_in_seq()

%% [markdown]<br>
SARSA

%%

In [None]:
# Fresh environment plus a zero-initialised Q-table with one row per state
# index and one column per action.
env = WarehouseAgent()
n_states = env.state_space.n
n_actions = env.action_space.n
Q = np.zeros(shape=(n_states, n_actions))
def ep_greedy(env, Q, epsilon=0.1):
    """Choose an action for the environment's current state, epsilon-greedily.

    With probability ``epsilon`` a uniformly random action is returned.
    Otherwise the choice is greedy with a preference for untried actions:
    the first action whose Q-value is still exactly 0 is taken, and only
    when no such action exists is the arg-max action used.

    Args:
        env: environment exposing ``_state_in_seq()`` and ``action_space.n``.
        Q: 2-D array indexed as ``Q[state, action]``.
        epsilon: probability of taking a random action.

    Returns:
        The chosen action as an ``int``.
    """
    state = env._state_in_seq()
    if np.random.random() < epsilon:
        return int(np.random.randint(env.action_space.n))

    q_row = Q[state, :]
    untried = np.flatnonzero(q_row == 0)
    if untried.size:
        return int(untried[0])
    return int(np.argmax(q_row))


def Sarsa(env, alpha, gamma, epsilon, episodes, max_steps, q_table=None):
    """Run tabular SARSA and return the total reward of every episode.

    Args:
        env: environment exposing ``reset()``, ``step(action)`` returning
            ``(obs, reward, done, info)``, ``_state_in_seq()`` and
            ``action_space.n``.
        alpha: learning rate.
        gamma: discount factor.
        epsilon: exploration probability handed to ``ep_greedy``.
        episodes: number of episodes to run.
        max_steps: upper bound on the number of steps in one episode.
        q_table: table updated in place; when omitted, the module-level
            ``Q`` is used.

    Returns:
        List with one summed reward per episode.  The number of steps taken
        in each episode is also printed.
    """
    table = Q if q_table is None else q_table
    timestep_reward = []
    for _ in range(episodes):
        env.reset()
        state = env._state_in_seq()
        action = ep_greedy(env, table, epsilon)
        total_reward = 0
        steps = 0
        done = False
        while not done and steps < max_steps:
            _, reward, done, _ = env.step(action)
            next_state = env._state_in_seq()
            next_action = ep_greedy(env, table, epsilon)
            total_reward += reward
            steps += 1

            # Nothing follows a terminal transition, so do not bootstrap
            # from the next state-action value in that case.
            future = 0.0 if done else gamma * table[next_state, next_action]
            table[state, action] += alpha * (
                reward + future - table[state, action]
            )

            state, action = next_state, next_action
        print(steps)
        timestep_reward.append(total_reward)
    return timestep_reward

%%

In [None]:
# Values handed to Sarsa(...) in the next cell under the same keyword names
# (`epsisodes` is passed as `episodes`).
alpha = 0.5
gamma = 0.95
epsilon = 0.01
epsisodes = 100
max_steps = 10

In [None]:
# Run SARSA on `env` with the hyperparameters defined in the previous cell.
Sarsa(
    env=env,
    alpha=alpha,
    gamma=gamma,
    epsilon=epsilon,
    episodes=epsisodes,
    max_steps=max_steps,
)

%% [markdown]<br>
Q-Learning

%%

In [None]:
def createEpsilonGreedyPolicy(Q, epsilon, num_actions):
    """Create an epsilon-greedy policy from a Q-function.

    Args:
        Q: mapping (or array) such that ``Q[state]`` yields the action
            values of ``state``.  It is read on every policy call, so later
            updates to ``Q`` are reflected by the returned policy.
        epsilon: total probability mass spread uniformly over all actions.
        num_actions: size of the action space.

    Returns:
        A function that takes a state and returns a numpy array of length
        ``num_actions`` holding the probability of each action.
    """
    def policyFunction(state):
        # Every action gets epsilon / num_actions ...
        action_probabilities = np.full(
            num_actions, epsilon / num_actions, dtype=float)
        # ... and the greedy action receives the remaining 1 - epsilon.
        best_action = np.argmax(Q[state])
        action_probabilities[best_action] += 1.0 - epsilon
        return action_probabilities

    return policyFunction

%%

In [None]:
def qLearning(env, num_episodes=100, discount_factor=1.0,
              alpha=0.6, epsilon=0.1):
    """Q-Learning: off-policy TD control.

    Learns action values for the greedy policy while acting according to an
    epsilon-greedy policy built by ``createEpsilonGreedyPolicy``.

    Args:
        env: environment exposing ``reset()``, ``step(action)`` returning
            ``(obs, reward, done, info)``, ``_state_in_seq()`` and
            ``action_space.n``.
        num_episodes: number of episodes to run.
        discount_factor: discount applied to the next state's best value.
        alpha: learning rate of the TD update.
        epsilon: exploration probability of the behaviour policy.

    Returns:
        ``(Q, stats)`` where ``Q`` maps a state index to an array of action
        values and ``stats`` is the ``plottin.EpisodeStats`` record of
        per-episode lengths and rewards.
    """
    # state -> array of action values; unseen states start at all zeros.
    Q = defaultdict(lambda: np.zeros(env.action_space.n))

    # NOTE(review): `plottin` is imported at the top of the file; presumably
    # a local plotting/statistics helper module - confirm the module name.
    stats = plottin.EpisodeStats(
        episode_lengths=np.zeros(num_episodes),
        episode_rewards=np.zeros(num_episodes))

    # The policy reads Q on every call, so it follows the updates below.
    policy = createEpsilonGreedyPolicy(Q, epsilon, env.action_space.n)

    for ith_episode in range(num_episodes):
        env.reset()
        state = env._state_in_seq()

        for t in itertools.count():
            # Sample an action from the epsilon-greedy distribution.
            action_probabilities = policy(state)
            action = np.random.choice(
                len(action_probabilities), p=action_probabilities)

            _, reward, done, _ = env.step(action)
            next_state = env._state_in_seq()

            stats.episode_rewards[ith_episode] += reward
            # t is the zero-based index of the latest step of this episode.
            stats.episode_lengths[ith_episode] = t

            # TD update.  Nothing follows a terminal transition, so the
            # target is just the reward in that case.
            if done:
                td_target = reward
            else:
                td_target = reward + discount_factor * np.max(Q[next_state])
            Q[state][action] += alpha * (td_target - Q[state][action])

            if done:
                break

            state = next_state

    return Q, stats

%%

In [None]:
# Train with Q-learning on a fresh environment using the default
# hyperparameters; the call evaluates to the (Q, stats) pair.
env = WarehouseAgent()
qLearning(env=env)