In [1]:
import numpy as np

### GridWorld Class

The `GridWorld` class represents a grid environment where an agent can navigate to perform tasks. It includes methods for initializing the environment, getting starting locations, determining the next location based on an action, and checking if the state is terminal.

- `__init__(self, n=5)`: Initializes the grid world with size `n` and defines possible actions.
- `get_starting_locations(self)`: Returns random starting locations for the agent and package.
- `get_next_location(self, agent_row, agent_col, action_index)`: Computes the next location of the agent based on the given action.
- `is_terminal_state(self, agent_row, agent_col, carrying)`: Checks if the current state is terminal (i.e., if the agent is at the base and carrying the package).


In [2]:
class GridWorld:
    def __init__(self, n=5):
        self.n = n
        self.B = (n - 1, n - 1) 
        self.actions = ['up', 'right', 'down', 'left']
        self.action_space = len(self.actions)

    def get_starting_locations(self):
        agent_row = np.random.randint(self.n)
        agent_col = np.random.randint(self.n)
        package_row = np.random.randint(self.n)
        package_col = np.random.randint(self.n)
        while (agent_row, agent_col) == self.B or (package_row, package_col) == self.B:
            agent_row = np.random.randint(self.n)
            package_row = np.random.randint(self.n)
            package_col = np.random.randint(self.n)
        return agent_row, agent_col, package_row, package_col

    def get_next_location(self, agent_row, agent_col, action_index):
        new_row, new_col = agent_row, agent_col
        action = self.actions[action_index]
        if action == 'up' and agent_row > 0:
            new_row -= 1
        elif action == 'right' and agent_col < self.n - 1:
            new_col += 1
        elif action == 'down' and agent_row < self.n - 1:
            new_row += 1
        elif action == 'left' and agent_col > 0:
            new_col -= 1
        return new_row, new_col

    def is_terminal_state(self, agent_row, agent_col, carrying):
        return (agent_row, agent_col) == self.B and carrying

### QLearningAgent Class

The `QLearningAgent` class implements a Q-learning agent to interact with the `GridWorld` environment. It manages Q-values and updates them based on actions taken, rewards received, and future states.

- `__init__(self, grid_world, learning_rate=0.1, discount_factor=0.95, epsilon=0.9)`: Initializes the Q-learning agent with the given parameters.
- `get_next_action(self, agent_row, agent_col, package_row, package_col, carrying)`: Chooses the next action based on the epsilon-greedy policy.
- `update_q_values(self, old_state, action_index, reward, new_state)`: Updates the Q-values using the Q-learning algorithm.
- `Reward structure is set as`: For Delivery : 50, For a wrong move : -1, and for pickup : 20.


In [3]:
class QLearningAgent:
    def __init__(self, grid_world, learning_rate=0.1, discount_factor=0.95, epsilon=0.9):
        self.grid_world = grid_world
        self.q_values = np.random.rand(grid_world.n, grid_world.n, grid_world.n, grid_world.n, 2, grid_world.action_space)
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.rewards = {
            'delivery': 80,
            'move': -1,
            'pickup': 20
        }

    def get_next_action(self, agent_row, agent_col, package_row, package_col, carrying):
        if np.random.random() < self.epsilon:
            return np.argmax(self.q_values[agent_row, agent_col, package_row, package_col, carrying])
        else:
            return np.random.randint(self.grid_world.action_space)

    def update_q_values(self, old_state, action_index, reward, new_state):
        old_q_value = self.q_values[old_state][action_index]
        temporal_difference = reward + (self.discount_factor * np.max(self.q_values[new_state])) - old_q_value
        self.q_values[old_state][action_index] = old_q_value + (self.learning_rate * temporal_difference)

### Training Class

The `Training` class handles the training process for the Q-learning agent. It involves running multiple episodes where the agent learns from interactions with the `GridWorld` environment.

- `__init__(self, agent, grid_world, num_episodes=100000, max_steps_per_episode=200)`: Initializes the training with the agent, grid world, number of episodes, and maximum steps per episode.
- `train(self)`: Runs the training loop for the specified number of episodes, updating Q-values based on the agent's actions and rewards.


In [4]:
class Training:
    def __init__(self, agent, grid_world, num_episodes=100000, max_steps_per_episode=200):
        self.agent = agent
        self.grid_world = grid_world
        self.num_episodes = num_episodes
        self.max_steps_per_episode = max_steps_per_episode

    def train(self):
        for episode in range(self.num_episodes):
            agent_row, agent_col, package_row, package_col = self.grid_world.get_starting_locations()
            carrying = 0

            for step in range(self.max_steps_per_episode):
                action_index = self.agent.get_next_action(agent_row, agent_col, package_row, package_col, carrying)
                new_agent_row, new_agent_col = self.grid_world.get_next_location(agent_row, agent_col, action_index)

                if (new_agent_row, new_agent_col) == (package_row, package_col) and not carrying:
                    reward = self.agent.rewards['pickup']
                    carrying = 1
                elif (new_agent_row, new_agent_col) == self.grid_world.B and carrying:
                    reward = self.agent.rewards['delivery']
                else:
                    reward = self.agent.rewards['move']

                old_state = (agent_row, agent_col, package_row, package_col, carrying)
                new_state = (new_agent_row, new_agent_col, package_row, package_col, carrying)
                self.agent.update_q_values(old_state, action_index, reward, new_state)

                agent_row, agent_col = new_agent_row, new_agent_col

                if self.grid_world.is_terminal_state(agent_row, agent_col, carrying):
                    break
        print('Training complete!')

### Testing Class

The `Testing` class evaluates the performance of the trained Q-learning agent by testing it in the `GridWorld` environment and checking its success rate.

- `__init__(self, agent, grid_world, max_steps_per_episode=200)`: Initializes the testing with the agent, grid world, and maximum steps per episode.
- `test_agent(self, num_tests=10)`: Tests the agent over a specified number of trials and prints the success rate along with the paths taken during the tests.


In [5]:
class Testing:
    def __init__(self, agent, grid_world, max_steps_per_episode=200):
        self.agent = agent
        self.grid_world = grid_world
        self.max_steps_per_episode = max_steps_per_episode

    def test_agent(self, num_tests=10):
        success_count = 0
        for _ in range(num_tests):
            agent_row, agent_col, package_row, package_col = self.grid_world.get_starting_locations()
            carrying = 0
            path = [(agent_row, agent_col)]
            for step in range(self.max_steps_per_episode):
                action_index = self.agent.get_next_action(agent_row, agent_col, package_row, package_col, carrying)
                agent_row, agent_col = self.grid_world.get_next_location(agent_row, agent_col, action_index)
                path.append((agent_row, agent_col))
                if (agent_row, agent_col) == (package_row, package_col) and not carrying:
                    carrying = 1
                if self.grid_world.is_terminal_state(agent_row, agent_col, carrying):
                    success_count += 1
                    print('Success')
                    break
            print(f'Path taken by agent for package location: {(package_row, package_col)} - ')
            print(path)
        print(f'Success rate: {success_count}/{num_tests}')

In [6]:
grid_world = GridWorld()
agent = QLearningAgent(grid_world)
trainer = Training(agent, grid_world)
trainer.train()

tester = Testing(agent, grid_world)
tester.test_agent(num_tests=10)

Training complete!
Success
Path taken by agent for package location: (2, 4) - 
[(4, 0), (4, 1), (3, 1), (3, 2), (3, 3), (3, 4), (2, 4), (3, 4), (4, 4)]
Success
Path taken by agent for package location: (1, 4) - 
[(3, 1), (3, 2), (3, 3), (2, 3), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (0, 4), (1, 4), (2, 4), (1, 4), (2, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (1, 3), (1, 4), (2, 4), (1, 4), (2, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1, 4), (2, 4), (1,