In [1]:
%load_ext lab_black

In [2]:
from gym import Env
import gym
import pygame
from gym.spaces import Discrete, Box, Dict
import numpy as np
import random

In [3]:
class WarehouseAgent:
    """Grid-world warehouse in which an agent must push one box onto a target.

    The world is a 7x6 array (``GRID``) in which ``1`` marks a wall and ``0`` a
    free cell.  Positions are ``[row, col]`` pairs.  Actions 0-3 move the agent
    up / down / left / right; action 4 pushes the box away from the agent when
    the box occupies one of the agent's four neighbouring cells.

    Every step is rewarded with -1, except a step that leaves the box on the
    target, which is rewarded with 0 and ends the episode.  The episode also
    ends (with reward -1) when ``is_over`` judges the box to be stuck.

    The class mirrors the gym ``reset`` / ``step`` / ``render`` calling
    convention but does not inherit from ``gym.Env``.
    """

    def __init__(self):
        self.GRID_DIM = [7, 6]

        # Start-of-episode layout; ``reset`` restores these positions.
        self.agent_position = [1, 2]
        self.box_location = [4, 3]
        self.goal_location = [3, 1]

        # Row/column offset applied by each movement action.
        self._action_to_direction = {
            0: np.array([-1, 0]),
            1: np.array([1, 0]),
            2: np.array([0, -1]),
            3: np.array([0, 1]),
        }
        self._ACTIONLOOKUP = {
            0: "move up",
            1: "move down",
            2: "move left",
            3: "move right",
            4: "push",
        }

        self.GRID_DIM = np.asarray(self.GRID_DIM)
        # The outermost rows and columns are walls, so the playable area is
        # GRID[1:-1, 1:-1] minus the interior wall cells added below.
        self.GRID = np.zeros(self.GRID_DIM)
        self.GRID[:, [0, -1]] = 1
        self.GRID[[0, -1], :] = 1
        self.GRID[[1, 2, 5], 3:5] = 1
        self.walls = 1

        self.action_space = Discrete(len(self._ACTIONLOOKUP))
        # One discrete state per grid cell (see ``_state_in_seq``).
        self.state_space = Discrete(self.GRID_DIM[0] * self.GRID_DIM[1])
        lowest_cell = np.array([0, 0])
        highest_cell = self.GRID_DIM - 1
        self.observation_space = Dict(
            {
                name: Box(
                    lowest_cell.copy(),
                    highest_cell.copy(),
                    shape=(2,),
                    dtype=int,
                )
                for name in ("agent", "box", "target")
            }
        )
        self._restore_initial_positions()

    def _restore_initial_positions(self):
        """Place agent, box and target on their configured start cells."""
        self._agent_location = np.array(self.agent_position)
        self._box_location = np.array(self.box_location)
        self._target_location = np.array(self.goal_location)

    def step(self, action):
        """Apply one action and report the outcome.

        Args:
            action: 0-3 move the agent; any value >= 4 is treated as a push.

        Returns:
            ``(observation, reward, done, info)`` as produced by
            ``_get_obs``, ``is_over`` and ``_get_info``.
        """
        self._prev_agent_location = None
        self._prev_box_location = None

        if action < 4:
            self._move(action)
        else:
            self._push(action)

        done, reward = self.is_over()
        observation = self._get_obs()
        info = self._get_info()

        return observation, reward, done, info

    def render(self):
        """Print the grid with A(gent), B(ox), T(arget) or D (box on target).

        Returns None; the grid is written to stdout.
        """
        rend = self.GRID.copy().astype(dtype="U1")
        rend[self._agent_location[0], self._agent_location[1]] = "A"
        rend[self._box_location[0], self._box_location[1]] = "B"
        rend[self._target_location[0], self._target_location[1]] = "T"
        if np.array_equal(self._target_location, self._box_location):
            rend[self._target_location[0], self._target_location[1]] = "D"
        print(rend)

    def reset(self, seed=None, return_info=False, options=None):
        """Restore the start layout.

        ``seed`` and ``options`` are accepted for signature compatibility and
        are not used.

        Returns:
            The observation, or ``(observation, info)`` when ``return_info``
            is true.
        """
        self._restore_initial_positions()

        observation = self._get_obs()
        info = self._get_info()
        return (observation, info) if return_info else observation

    def _get_obs(self):
        """Return the current agent, box and target positions."""
        return {
            "agent": self._agent_location,
            "box": self._box_location,
            "target": self._target_location,
        }

    def _get_info(self):
        """Return the Manhattan (L1) distance between box and target."""
        return {
            "distance": np.linalg.norm(
                self._box_location - self._target_location, ord=1
            )
        }

    def _state_in_seq(self):
        """Return the agent's cell as a row-major flat index."""
        row, col = self._agent_location
        return row * self.GRID.shape[1] + col

    def _push(self, action):
        """Push the box one cell away from the agent if it is adjacent.

        The push direction is derived from the relative position of the box,
        not from ``action``.

        Returns:
            ``(moved_player, moved_box)``; both are False when the box is not
            adjacent or the cell behind it is a wall.
        """
        offset = self._box_location - self._agent_location
        push_dir = next(
            (
                direction
                for direction, delta in self._action_to_direction.items()
                if np.array_equal(offset, delta)
            ),
            None,
        )
        if push_dir is None:
            return False, False

        delta = self._action_to_direction[push_dir]
        self._prev_agent_location = self._agent_location
        self._prev_box_location = self._box_location

        pushed_to = self._box_location + delta
        if self.GRID[pushed_to[0], pushed_to[1]] == 1:
            return False, False

        self._box_location = pushed_to
        self._agent_location = self._agent_location + delta
        return True, True

    def _move(self, action):
        """Move the agent one cell; walls and the box block the move.

        Returns:
            True when the agent changed cell, False otherwise.
        """
        self._prev_agent_location = self._agent_location
        self._prev_box_location = self._box_location

        candidate = self._agent_location + self._action_to_direction[action]
        hits_wall = self.GRID[candidate[0], candidate[1]] == 1
        if hits_wall or np.array_equal(candidate, self._box_location):
            return False

        self._agent_location = candidate
        return True

    def _wall_is_unbroken(self, direc):
        """Check whether the wall touching the box spans the whole grid.

        Starting from the wall cell in direction ``direc`` of the box, walk
        both ways along the wall (clipped to the grid).  The walk stops as soon
        as either end reaches a free cell.

        Returns:
            True when the walk lasts as many steps as the grid is long in the
            walking direction, False when a free cell is found first.
        """
        if direc in (0, 1):
            # Wall above/below the box: walk left and right along its row.
            first_dir, second_dir, limit = 2, 3, self.GRID_DIM[1]
        else:
            # Wall left/right of the box: walk down and up along its column.
            first_dir, second_dir, limit = 1, 0, self.GRID_DIM[0]

        upper = self.GRID_DIM - 1
        left = self._box_location + self._action_to_direction[direc]
        right = left.copy()
        count = 0
        while (self.GRID[left[0], left[1]] != 0) and (
            self.GRID[right[0], right[1]] != 0
        ):
            left = np.clip(left + self._action_to_direction[first_dir], 0, upper)
            right = np.clip(right + self._action_to_direction[second_dir], 0, upper)
            count += 1
            if count >= limit:
                return True
        return False

    def is_over(self):
        """Decide whether the episode has ended.

        Returns:
            ``(done, reward)``:

            * box on target -> ``(True, 0)``
            * box touching two or more walls -> ``(True, -1)``
            * box touching exactly one wall, sharing neither row nor column
              with the target, and that wall is unbroken -> ``(True, -1)``
            * anything else -> ``(False, -1)``
        """
        if np.array_equal(self._box_location, self._target_location):
            return True, 0

        # One flag per direction: is the neighbouring cell a wall?
        blocked = np.array(
            [
                self.GRID[
                    (self._box_location + delta)[0], (self._box_location + delta)[1]
                ]
                == 1
                for delta in self._action_to_direction.values()
            ]
        )
        walls_touching = int(blocked.sum())

        if walls_touching > 1:
            return True, -1

        if walls_touching == 1:
            # ``all()`` is False when the box shares a row or a column with
            # the target; in that case the episode continues.
            shares_line = not (self._box_location - self._target_location).all()
            if not shares_line:
                direc = np.where(blocked)[0][0]
                if self._wall_is_unbroken(direc):
                    return True, -1

        return False, -1

In [4]:
# Build the environment and print its start layout
# (A = agent, B = box, T = target, 1 = wall, 0 = free cell).
env = WarehouseAgent()
env.render()

[['1' '1' '1' '1' '1' '1']
 ['1' '0' 'A' '1' '1' '1']
 ['1' '0' '0' '1' '1' '1']
 ['1' 'T' '0' '0' '0' '1']
 ['1' '0' '0' 'B' '0' '1']
 ['1' '0' '0' '1' '1' '1']
 ['1' '1' '1' '1' '1' '1']]


In [5]:
# Scripted rollout used as a manual sanity check of step()/render().
# Action codes: 0 up, 1 down, 2 left, 3 right, 4 push.
act = [1, 1, 3, 3, 1, 4, 0, 2, 4]
for ac in act:
    print(env.step(ac))
    # render() prints the grid itself and returns None, so wrapping it in
    # print() only added a stray "None" line after every grid.
    env.render()

({'agent': array([2, 2]), 'box': array([4, 3]), 'target': array([3, 1])}, -1, False, {'distance': 3.0})
[['1' '1' '1' '1' '1' '1']
 ['1' '0' '0' '1' '1' '1']
 ['1' '0' 'A' '1' '1' '1']
 ['1' 'T' '0' '0' '0' '1']
 ['1' '0' '0' 'B' '0' '1']
 ['1' '0' '0' '1' '1' '1']
 ['1' '1' '1' '1' '1' '1']]
None
({'agent': array([3, 2]), 'box': array([4, 3]), 'target': array([3, 1])}, -1, False, {'distance': 3.0})
[['1' '1' '1' '1' '1' '1']
 ['1' '0' '0' '1' '1' '1']
 ['1' '0' '0' '1' '1' '1']
 ['1' 'T' 'A' '0' '0' '1']
 ['1' '0' '0' 'B' '0' '1']
 ['1' '0' '0' '1' '1' '1']
 ['1' '1' '1' '1' '1' '1']]
None
({'agent': array([3, 3]), 'box': array([4, 3]), 'target': array([3, 1])}, -1, False, {'distance': 3.0})
[['1' '1' '1' '1' '1' '1']
 ['1' '0' '0' '1' '1' '1']
 ['1' '0' '0' '1' '1' '1']
 ['1' 'T' '0' 'A' '0' '1']
 ['1' '0' '0' 'B' '0' '1']
 ['1' '0' '0' '1' '1' '1']
 ['1' '1' '1' '1' '1' '1']]
None
({'agent': array([3, 4]), 'box': array([4, 3]), 'target': array([3, 1])}, -1, False, {'distance': 3.0})

In [6]:
# Scratch check of the row/column alignment test used in
# WarehouseAgent.is_over: both coordinate differences are non-zero here, so
# `.all()` is True and the negation evaluates to False.
~(np.array([4, 1]) - np.array([1, 3])).all()

False

In [7]:
import gym
import numpy as np
import operator
from IPython.display import clear_output
from time import sleep
import random
import itertools
import tqdm
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns


# `tqdm` is the class bound by `from tqdm import tqdm` above (it shadows the
# module imported on the previous line), so this sets a class-level attribute.
# NOTE(review): presumably this disables tqdm's monitor thread - confirm
# against the tqdm documentation.
tqdm.monitor_interval = 0

  if LooseVersion(mpl.__version__) >= "3.0":
  other = LooseVersion(other)


In [8]:
def create_random_policy(env):
    """Build a uniform random policy for ``env``.

    Args:
        env: object exposing ``state_space.n`` and ``action_space.n``.

    Returns:
        dict mapping every state index ``0..state_space.n - 1`` to its own
        ``{action: probability}`` dict, each action having probability
        ``1 / action_space.n``.
    """
    n_states = env.state_space.n
    n_actions = env.action_space.n
    return {
        state: {action: 1 / n_actions for action in range(n_actions)}
        for state in range(n_states)
    }

In [9]:
def create_state_action_dictionary(env, policy):
    """Create a zero-initialised action-value table.

    Args:
        env: object exposing ``action_space.n``.
        policy: dict whose keys are the state indices to include.

    Returns:
        dict mapping each state in ``policy`` to its own
        ``{action: 0.0}`` dict over ``range(env.action_space.n)``.
    """
    n_actions = env.action_space.n
    return {state: {action: 0.0 for action in range(n_actions)} for state in policy}

In [10]:
def run_game(env, policy, display=True, returns=True):
    """Play one episode, sampling actions from ``policy``.

    Args:
        env: environment exposing ``reset()``, ``_state_in_seq()``,
            ``step(action)`` and ``render()``.
        policy: dict ``state -> {action: weight}``.  Weights need not sum to 1;
            the draw is scaled by their sum.
        display: when true, clear the cell output, render the final grid and
            pause for one second.
        returns: when true (default) also return the list of per-step rewards;
            when false the second element of the result is ``None``.

    Returns:
        ``(episode, rewards)`` where ``episode`` is a list of
        ``[state, action, reward]`` triples, one per step.
    """
    env.reset()
    episode = []
    rewards = []  # kept separate from the ``returns`` flag, which it used to shadow
    finished = False
    while not finished:
        s = env._state_in_seq()
        action_weights = policy[s]

        # Roulette-wheel selection over the (possibly unnormalised) weights.
        threshold = random.uniform(0, sum(action_weights.values()))
        cumulative = 0
        action = None
        for candidate, weight in action_weights.items():
            # Remember the candidate so the last action is used when rounding
            # makes ``threshold`` equal to the total weight.
            action = candidate
            cumulative += weight
            if threshold < cumulative:
                break

        _, reward, finished, _ = env.step(action)
        episode.append([s, action, reward])
        rewards.append(reward)

    if display:
        clear_output(True)
        env.render()  # render() prints the grid itself and returns None
        sleep(1)

    if returns:
        return episode, rewards
    return episode, None

In [11]:
def test_policy(policy, env, r=10):
    """Estimate the win rate of ``policy`` over ``r`` episodes.

    An episode counts as a win when its final reward is 0.  Each episode is
    played with ``display=True``.

    Args:
        policy: dict ``state -> {action: weight}`` passed to ``run_game``.
        env: environment passed to ``run_game``.
        r: number of episodes to play; must be at least 1.

    Returns:
        Fraction of episodes won, in ``[0, 1]``.

    Raises:
        ValueError: if ``r`` is smaller than 1.
    """
    if r < 1:
        raise ValueError(f"r must be at least 1, got {r}")

    wins = 0
    for _ in range(r):
        episode, _ = run_game(env, policy, display=True)
        final_reward = episode[-1][-1]
        if final_reward == 0:
            wins += 1
    return wins / r

In [12]:
def monte_carlo_e_soft(env, episodes=100, policy=None, epsilon=0.01, plot_graph=True):
    """On-policy first-visit Monte Carlo control with an epsilon-soft policy.

    Args:
        env: environment passed to ``run_game``.
        episodes: number of episodes to sample.
        policy: optional starting policy ``state -> {action: probability}``;
            a uniform random policy is created when omitted or empty.  The
            policy is updated in place.
        epsilon: exploration mass spread evenly over all actions.
        plot_graph: when true, plot the running average of the episode return.

    Returns:
        The improved policy, in which the greedy action of every visited state
        has probability ``1 - epsilon + epsilon / |A|`` and every other action
        ``epsilon / |A|``.
    """
    if not policy:
        policy = create_random_policy(env)
    Q = create_state_action_dictionary(env, policy)  # action-value estimates
    returns = {}  # (state, action) -> list of first-visit returns
    episode_returns = []

    for _ in range(episodes):
        episode, rewards = run_game(env=env, policy=policy, display=False)
        episode_returns.append(np.sum(rewards))

        # Index of the first occurrence of every (state, action) pair, so the
        # first-visit test below is O(1) instead of rescanning the episode.
        first_visit = {}
        for t, (state, action, _) in enumerate(episode):
            first_visit.setdefault((state, action), t)

        # Walk backwards so G accumulates the (undiscounted) return that
        # follows each timestep.
        G = 0
        for t in reversed(range(len(episode))):
            s_t, a_t, r_t = episode[t]
            G += r_t
            if first_visit[(s_t, a_t)] != t:
                continue

            visits = returns.setdefault((s_t, a_t), [])
            visits.append(G)
            Q[s_t][a_t] = sum(visits) / len(visits)

            # Greedy action, ties broken at random.
            best_value = max(Q[s_t].values())
            A_star = random.choice(
                [action for action, value in Q[s_t].items() if value == best_value]
            )

            # Epsilon-soft improvement.  The denominator is the number of
            # actions |A(s_t)|, not the sum of the current probabilities.
            n_actions = len(policy[s_t])
            for action in policy[s_t]:
                probability = epsilon / n_actions
                if action == A_star:
                    probability += 1 - epsilon
                policy[s_t][action] = probability

    if plot_graph:
        cumulative_average = np.cumsum(np.array(episode_returns)) / (
            np.arange(episodes) + 1
        )
        plt.plot(
            cumulative_average,
            label=r"MC $\epsilon$-soft ($\epsilon$ = " + f"{epsilon})",
        )
        plt.title(f"Average return over {episodes} episodes", fontsize=14)
        plt.xlabel("Episodes", fontsize=14)
        plt.ylabel("Average Rewards", fontsize=14)
        plt.legend()

    return policy

In [None]:
# Seed the stdlib RNG (used for action sampling and greedy tie-breaking) so
# that a fresh Restart & Run All reproduces the same learned policy.
random.seed(0)

env = WarehouseAgent()
env.reset()
policy = monte_carlo_e_soft(env, episodes=100)

In [None]:
# Display the learned policy: state index -> {action: probability}.
policy

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns


def plot_policy(policy, grid_shape=(7, 6)):
    """Render the most probable action of every state as an arrow grid.

    Args:
        policy: dict ``state index -> {action: probability}`` with state
            indices ``0..len(policy) - 1`` and actions 0-4.
        grid_shape: ``(rows, cols)`` of the result; ``rows * cols`` must equal
            ``len(policy)``.  Defaults to the 7x6 warehouse grid.

    Returns:
        String array of shape ``grid_shape`` holding ``^ v < >`` for the four
        moves and ``P`` for push.  Ties go to the first action listed.
    """
    sign = {0: "^", 1: "v", 2: "<", 3: ">", 4: "P"}
    plot = np.zeros(len(policy), dtype="U2")
    for state, action_probs in policy.items():
        # max() returns the first action with the highest probability.
        best_action = max(action_probs, key=action_probs.get)
        plot[state] = sign[best_action]
    return plot.reshape(grid_shape)


# Print the current grid, then display the greedy action per cell for
# comparison (the array returned by plot_policy is the cell output).
env.render()
plot_policy(policy)

In [None]:
# test_policy(policy, env)

In [None]:
# Scratch: assignment expression (walrus) binds `a` inside the condition.
# Prints 11 because the list has 11 elements.
z = [0] * 11
if (a := len(z)) > 10:
    print(a)

In [None]:
# Scratch: same walrus pattern with a formatted message.
# Relies on `z` defined in the previous cell.
if (n := len(z)) > 10:
    print(f"List is too long ({n} elements, expected <= 10)")

In [None]:
# Scratch: a length-2 array unpacks into two scalars, the same pattern
# WarehouseAgent._state_in_seq uses for the agent location.
m, n = np.array([0, 1])

In [None]:
# Show the two values unpacked in the previous cell.
m, n

In [None]:
# Inspect the current observation dict (agent, box and target positions).
env._get_obs()

In [None]:
# env._state_in_seq()

In [None]:
# run_game(policy=policy, env=env, display=0)

In [None]:
# Scratch: array of five zeros.
np.zeros(5)

In [None]:
# Scratch: index of the first True entry, the same lookup
# WarehouseAgent.is_over uses to find the direction of the adjacent wall.
# Evaluates to 0 here.
_ = np.array([True, True, False])
np.where(_ == True)[0][0]

In [None]:
# Scratch: turn one state's {action: probability} dict into an array of
# probabilities.
np.array(list({0: 0.2, 1: 0.2, 2: 0.2, 3: 0.2, 4: 0.2}.values()))