<h3>2-dimensional grid world for the PD-World as described</h3>

Agents: agent 1 Black agent, agent 2  In red and 3 in blue  Dropoff: in green   Pick up: in yellow 

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from matplotlib.colors import ListedColormap
# Define the grid size
grid_size = 5
custom_colors = ['white', 'black', 'red', 'blue', 'yellow','green']
custom_cmap = ListedColormap(custom_colors)

# Create a function to plot the grid
def plot_grid(initial_state):
    # Create a grid with all zeros
    grid = np.zeros((grid_size, grid_size))

    # Update the grid based on the initial state
    for state in initial_state:
        row, col, cell_type = state
        if cell_type == 'Black':
            grid[row-1][col-1] = 1  # Black agent represented by 1
        elif cell_type == 'Red':
            grid[row-1][col-1] = 2  # Red agent represented by 2
        elif cell_type == 'Blue':
            grid[row-1][col-1] = 3  # Blue agent represented by 3
        elif cell_type == 'P':
            grid[row-1][col-1] = 4  # Pickup cell represented by 4
            
        elif cell_type == 'D':
            grid[row-1][col-1] = 5  # Dropoff cell represented by 5

    # Set up the plot
    plt.figure(figsize=(6, 6))
    sns.heatmap(grid, annot=True, cmap=custom_cmap, cbar=False, fmt='')

    # Add labels to the axes
    plt.xlabel('Columns')
    plt.ylabel('Rows')

    # Show the plot
    plt.title('Grid World Representation')
    plt.show()

# Define the initial state
initial_state = [
    (1, 2, 'Black'),  # Black agent at row 1, column 3
    (3, 3, 'Red'),    # Red agent at row 3, column 3
    (5, 3, 'Blue'),   # Blue agent at row 3, column 5
    (1, 5, 'P'),      # Pickup cell at row 5, column 3
    (2, 4, 'P'),      # Pickup cell at row 2, column 4
    (5, 2, 'P'),      # Pickup cell at row 1, column 5
    (1, 1, 'D'),      # Dropoff cell at row 1, column 1
    (3, 1, 'D'),      # Dropoff cell at row 3, column 1
    (4, 5, 'D')       # Dropoff cell at row 4, column 5
]

# Plot the grid
plot_grid(initial_state)


In [81]:
pip install tqdm

Collecting tqdm
  Obtaining dependency information for tqdm from https://files.pythonhosted.org/packages/2a/14/e75e52d521442e2fcc9f1df3c5e456aead034203d4797867980de558ab34/tqdm-4.66.2-py3-none-any.whl.metadata
  Downloading tqdm-4.66.2-py3-none-any.whl.metadata (57 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m355.4 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading tqdm-4.66.2-py3-none-any.whl (78 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.3/78.3 kB[0m [31m484.5 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: tqdm
Successfully installed tqdm-4.66.2

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated pack

In [93]:
import numpy as np
import random
from tqdm import tqdm

class Agent:
    def __init__(self, color, initial_state):
        self.color = color
        self.state = list(initial_state)  # Convert tuple to list for mutability
        self.blocks_carried = 0  # Number of blocks carried by the agent
        self.q_table = {}  # Q-table for the agent
        self.last_action = None  # Track last action taken by the agent

    def take_action(self, epsilon):
        positions = tuple(self.state[:6])  # Convert to tuple
        block_status = tuple(self.state[6:9])  # Convert to tuple
        block_counts = tuple(self.state[9:])  # Convert to tuple

        # Define the set of possible actions
        possible_actions = ['North', 'South', 'East', 'West', 'Pickup', 'Dropoff']

        # Choose an action based on epsilon-greedy policy
        if random.uniform(0, 1) < epsilon:  # Explore (random action)
            action = random.choice(possible_actions)
        else:  # Exploit (choose action with highest Q-value)
            action_values = [self.q_table.get((tuple(self.state), a), 0) for a in possible_actions]
            max_value = max(action_values)
            best_actions = [a for a, v in zip(possible_actions, action_values) if v == max_value]
            action = random.choice(best_actions)

        # Apply the chosen action and get next state and reward
        next_state, reward = self.apply_action(action, list(block_status), list(block_counts))

        # Update Q-value based on Q-learning formula
        old_q_value = self.q_table.get((tuple(self.state), self.last_action), 0)
        max_next_q_value = max(self.q_table.get((next_state, a), 0) for a in self.get_valid_actions(next_state))
        new_q_value = old_q_value + LEARNING_RATE * (reward + DISCOUNT_FACTOR * max_next_q_value - old_q_value)
        self.q_table[(tuple(self.state), self.last_action)] = new_q_value

        # Update agent's state and last action
        self.state = list(next_state)  # Convert back to list
        self.last_action = action

        return next_state, reward

    def apply_action(self, action, block_status, block_counts):
        positions = list(self.state[:6])  # Convert to list

        if action == 'North' and positions[1] > 1:
            positions[1] -= 1  # Move north
        elif action == 'South' and positions[1] < grid_size:
            positions[1] += 1  # Move south
        elif action == 'East' and positions[3] < grid_size:
            positions[3] += 1  # Move east
        elif action == 'West' and positions[3] > 1:
            positions[3] -= 1  # Move west
        elif action == 'Pickup':
            # Check if the agent is in a pickup cell and not already carrying a block
            for pickup_loc in pickup_locations:
                if positions[:2] == list(pickup_loc[:2]) and block_status[pickup_loc[2] - 1] == 1 and self.blocks_carried == 0:
                    block_counts[pickup_loc[2] - 1] -= 1  # Reduce the block count at the pickup cell
                    self.blocks_carried += 1  # Increase the agent's block count
                    return tuple(positions + block_status + block_counts), REWARD_PICKUP
        elif action == 'Dropoff':
            # Check if the agent is in a dropoff cell and carrying a block
            for dropoff_loc in dropoff_locations:
                if positions[:2] == list(dropoff_loc[:2]) and block_status[dropoff_loc[2] - 1] == 0 and self.blocks_carried > 0:
                    block_counts[dropoff_loc[2] - 1] += 1  # Increase the block count at the dropoff cell
                    self.blocks_carried -= 1  # Decrease the agent's block count
                    return tuple(positions + block_status + block_counts), REWARD_DROPOFF

        # For all other actions or invalid actions, return the current state and a move reward
        return tuple(positions + block_status + block_counts), REWARD_MOVE

    def get_valid_actions(self, state):
        # Implement logic to get valid actions based on the current state
        valid_actions = ['North', 'South', 'East', 'West']  # By default, the agent can move in all directions

        # Check if the agent is in a pickup cell and not already carrying a block
        for pickup_loc in pickup_locations:
            if state[:2] == list(pickup_loc[:2]) and self.blocks_carried == 0:
                valid_actions.append('Pickup')  # Add pickup action to valid actions

        # Check if the agent is in a dropoff cell and carrying a block
        for dropoff_loc in dropoff_locations:
            if state[:2] == list(dropoff_loc[:2]) and self.blocks_carried > 0:
                valid_actions.append('Dropoff')  # Add dropoff action to valid actions

        return valid_actions  # Return the list of valid actions

# Define constants
grid_size = 5
REWARD_PICKUP = 13
REWARD_DROPOFF = 13
REWARD_MOVE = -1
NUM_EPISODES = 4
EPSILON_DECAY = 0.99
LEARNING_RATE = 0.1
DISCOUNT_FACTOR = 0.9


initial_environment = np.array([
    [5, 0, 1, 0, 4],
    [0, 0, 0, 4, 0],
    [5, 0, 2, 0, 0],
    [0, 0, 0, 0, 5],
    [0, 4, 3, 0, 0]
])

print(initial_environment)
states = [(i, j, i_, j_, i__, j_) for i in range(1, 6) for j in range(1, 6)
          for i_ in range(1, 6) for j_ in range(1, 6) for i__ in range(1, 6)]

actions = ['North', 'South', 'East', 'West', 'Pickup', 'Dropoff']

# Define agents
red_agent = Agent('Red', (3, 3, 3, 5, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5))
blue_agent = Agent('Blue', (5, 3, 3, 3, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5))
black_agent = Agent('Black', (1, 3, 3, 3, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5))

# Define pickup and dropoff locations
pickup_locations = [
    (1, 5, 0),  
    (2, 4, 1),      # Pickup cell at row 2, column 4
    (5, 2, 2)       # Pickup cell at row 5, column 2
]

dropoff_locations = [
    (1, 1, 0),      # Dropoff cell at row 1, column 1
    (3, 1, 1),      # Dropoff cell at row 3, column 1
    (4, 5, 2)       # Dropoff cell at row 4, column 5
]

# Function to check terminal state
def is_terminal_state(environment):
    # Iterate through each drop-off location
    for dropoff_loc in dropoff_locations:
        row, col, _ = dropoff_loc  # Extract row and column
        if environment[row - 1, col - 1] != 5:  # Check if the cell doesn't contain 5 blocks
            return False  # Not a terminal state
    return True  # All drop-off cells contain 5 blocks, so it's a terminal state


def update_q_values(agent, next_state, reward, learning_rate, discount_factor):
    old_q_value = agent.q_table.get((tuple(agent.state), agent.last_action), 0)
    max_next_q_value = max(agent.q_table.get((next_state, a), 0) for a in agent.get_valid_actions(next_state))
    new_q_value = old_q_value + learning_rate * (reward + discount_factor * max_next_q_value - old_q_value)
    agent.q_table[(tuple(agent.state), agent.last_action)] = new_q_value




[[5 0 1 0 4]
 [0 0 0 4 0]
 [5 0 2 0 0]
 [0 0 0 0 5]
 [0 4 3 0 0]]


<h3>dd</h3>

In [97]:
def train_agents(num_episodes, epsilon_decay, learning_rate, discount_factor, epsilon):
    # Initialize Q-table for each agent
    red_agent.q_table = {}
    blue_agent.q_table = {}
    black_agent.q_table = {}

    rewards = []

    for episode in tqdm(range(num_episodes), desc='Training Progress'):
        # Reset environment and agents to initial state
        environment = np.copy(initial_environment)
        red_agent.state = (3, 3, 3, 5, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5)
        blue_agent.state = (5, 3, 3, 3, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5)
        black_agent.state = (1, 3, 3, 3, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5)

        total_reward = 0

        while True:
            # Agents take actions
            red_next_state, red_reward = red_agent.take_action(epsilon)
            blue_next_state, blue_reward = blue_agent.take_action(epsilon)
            black_next_state, black_reward = black_agent.take_action(epsilon)

            # Update Q-values for each agent
            update_q_values(red_agent, red_next_state, red_reward, learning_rate, discount_factor)
            update_q_values(blue_agent, blue_next_state, blue_reward, learning_rate, discount_factor)
            update_q_values(black_agent, black_next_state, black_reward, learning_rate, discount_factor)

            total_reward += red_reward + blue_reward + black_reward

            # Check for terminal state
            if is_terminal_state(environment):
                break

        # Decay epsilon after each episode
        epsilon *= epsilon_decay

        rewards.append(total_reward)

    # Return rewards and trained agents
    return rewards, [red_agent, blue_agent, black_agent]

# Train agents
rewards, trained_agents = train_agents(NUM_EPISODES, EPSILON_DECAY, LEARNING_RATE, DISCOUNT_FACTOR, 1.0)

# Print rewards per episode
print("Rewards per Episode:", rewards)



Training Progress: 100%|████████████████████████| 4/4 [00:00<00:00, 4065.23it/s][A[A

Rewards per Episode: [-3, -3, -3, -3]





In [98]:
def visualize_q_table(agent):
    q_table_df = pd.DataFrame.from_dict(agent.q_table, orient='index', columns=['Q-Value'])
    return q_table_df
red_q_table = visualize_q_table(trained_agents[0])
print("Red Agent Q-Table:")
print(red_q_table)

blue_q_table = visualize_q_table(trained_agents[1])
print("\nBlue Agent Q-Table:")
print(blue_q_table)

black_q_table = visualize_q_table(trained_agents[2])
print("\nBlack Agent Q-Table:")
print(black_q_table)

Red Agent Q-Table:
                                                    Q-Value
((3, 3, 3, 5, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...   -0.100
((3, 3, 3, 4, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...   -0.100
((3, 3, 3, 5, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...   -0.100
((3, 3, 3, 5, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...   -0.190
((3, 3, 3, 5, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...   -0.271

Blue Agent Q-Table:
                                                    Q-Value
((5, 3, 3, 3, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...    -0.10
((5, 4, 3, 3, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...    -0.10
((5, 3, 3, 3, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...    -0.10
((5, 3, 3, 3, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...    -0.19
((5, 3, 3, 3, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...    -0.19
((5, 3, 3, 4, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...    -0.10

Black Agent Q-Table:
                                                    Q-Value
((1, 3, 3, 3, 1, 3, 0, 0, 0, 0, 0, 0, 5, 5, 5),...    -0.19
((1, 3, 3, 2, 1, 3, 0, 0, 0, 0, 0, 0, 