In [None]:
# This code demonstrates how to train and test swarm robots in a simple grid-based environment using a MAPPO-inspired setup in which each robot learns its own policy with a policy-gradient update. The robots learn to navigate to the target position through reinforcement learning, optimizing their policies to maximize the cumulative reward.

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class PolicyNetwork(nn.Module):
    """Two-layer perceptron that maps a state vector to one logit per action.

    The hidden layer has 64 units with a ReLU activation. The output is left
    as raw (unnormalised) scores; callers apply softmax themselves.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        hidden_units = 64
        self.fc1 = nn.Linear(input_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, output_size)

    def forward(self, x):
        """Return the action logits for input ``x``."""
        hidden = F.relu(self.fc1(x))
        return self.fc2(hidden)

class SwarmRobot:
    """One learner in the swarm: a policy network, its optimizer and an episode buffer.

    Experiences are appended with ``store_experience`` during an episode and
    consumed by ``update_policy``, which performs a single policy-gradient
    step on mean-centred discounted returns and then empties the buffer.
    """

    def __init__(self, input_size, output_size):
        """Build the policy network (``input_size`` -> ``output_size`` logits) and Adam optimizer."""
        self.policy_network = PolicyNetwork(input_size, output_size)
        self.optimizer = optim.Adam(self.policy_network.parameters(), lr=0.001)
        self.states = []
        self.actions = []
        self.rewards = []

    def select_action(self, state):
        """Sample an action index from the current policy for ``state``.

        Returns:
            The sampled action as a plain Python ``int``.
        """
        state = torch.as_tensor(state, dtype=torch.float32)
        # Sampling needs no gradients; skip building an autograd graph.
        with torch.no_grad():
            logits = self.policy_network(state)
        # Passing logits lets Categorical normalise them itself.
        action_dist = torch.distributions.Categorical(logits=logits)
        return action_dist.sample().item()

    def store_experience(self, state, action, reward):
        """Append one (state, action, reward) transition to the episode buffer."""
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)

    def update_policy(self, gamma=0.99):
        """Run one policy-gradient step on the buffered episode, then clear the buffer.

        Args:
            gamma: Discount factor applied when accumulating returns.

        Does nothing when no experience has been stored.
        """
        if not self.rewards:
            # Nothing collected: a mean over an empty batch would be NaN.
            return

        # Discounted return for every time step, accumulated back to front.
        discounted = []
        running = 0.0
        for reward in reversed(self.rewards):
            running = reward + gamma * running
            discounted.append(running)
        discounted.reverse()

        states = torch.tensor(self.states, dtype=torch.float32)
        actions = torch.tensor(self.actions, dtype=torch.long)
        returns = torch.tensor(discounted, dtype=torch.float32)

        # The baseline is the mean return, computed once over the whole
        # episode so every step is centred on the same value.
        advantages = returns - returns.mean()

        # log_softmax avoids log(0) when a probability underflows.
        log_probs = F.log_softmax(self.policy_network(states), dim=-1)
        # squeeze(1) keeps the batch dimension even for one-step episodes.
        selected_log_probs = log_probs.gather(1, actions.unsqueeze(1)).squeeze(1)

        policy_loss = -(selected_log_probs * advantages).mean()

        self.optimizer.zero_grad()
        policy_loss.backward()
        self.optimizer.step()

        # Start the next episode with an empty buffer.
        self.states = []
        self.actions = []
        self.rewards = []

def train_swarm_robots(swarm_robots, num_episodes, max_steps=None):
    """Train every robot, applying one policy update per robot per episode.

    Each episode creates a fresh ``Environment``. The robots take turns
    acting on that single shared environment; each robot stores the
    transitions produced by its own actions. The episode ends as soon as the
    environment reports ``done`` or after ``max_steps`` rounds.

    Args:
        swarm_robots: Iterable of robots exposing ``select_action``,
            ``store_experience``, ``update_policy`` and a ``rewards`` buffer.
        num_episodes: Number of training episodes to run.
        max_steps: Maximum number of rounds per episode. Defaults to the
            module-level ``max_steps_per_episode``.
    """
    if max_steps is None:
        # Fall back to the module-level setting.
        max_steps = max_steps_per_episode

    for episode in range(num_episodes):
        print(f"Episode {episode + 1}/{num_episodes}")
        # Initialize environment
        env = Environment()
        state = env.reset()
        done = False

        for _ in range(max_steps):
            for robot in swarm_robots:
                action = robot.select_action(state)
                next_state, reward, done = env.step(action)
                robot.store_experience(state, action, reward)
                state = next_state

                if done:
                    break

            # Leave the step loop as well: otherwise the finished environment
            # keeps being stepped and the terminal reward can be collected
            # over and over.
            if done:
                break

        for robot in swarm_robots:
            # A robot may not have acted if the episode ended before its turn.
            if robot.rewards:
                robot.update_policy()

    print("Training completed!")

# Example Environment
class Environment:
    """Square grid world where an agent walks from [0, 0] to the far corner.

    The state is ``[x, y]``. Actions are 0 (y - 1), 1 (y + 1), 2 (x - 1) and
    3 (x + 1); moves are clamped to the grid and any other action leaves the
    position unchanged. Reaching the target yields reward 1.0 and sets
    ``done``; every other step yields 0.0.
    """

    def __init__(self, grid_size=6):
        """Create the grid.

        Args:
            grid_size: Side length of the grid; the target is the cell
                ``[grid_size - 1, grid_size - 1]``. Defaults to 6.

        Raises:
            ValueError: If ``grid_size`` is smaller than 1.
        """
        if grid_size < 1:
            raise ValueError("grid_size must be at least 1")
        self.grid_size = grid_size
        self.target = [grid_size - 1, grid_size - 1]
        self.state = None
        self.done = False

    def reset(self):
        """Put the agent back at ``[0, 0]`` and return that state."""
        self.state = [0, 0]  # Initial state
        self.done = False
        return self.state

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, done)``.

        Raises:
            RuntimeError: If called before ``reset()``.
        """
        if self.state is None:
            raise RuntimeError("Environment.step() called before reset()")

        upper = self.grid_size - 1
        x, y = self.state
        if action == 0:  # Move up
            y = max(y - 1, 0)
        elif action == 1:  # Move down
            y = min(y + 1, upper)
        elif action == 2:  # Move left
            x = max(x - 1, 0)
        elif action == 3:  # Move right
            x = min(x + 1, upper)

        # A new list every step, so states handed out earlier are never mutated.
        self.state = [x, y]

        if self.state == self.target:  # Target reached
            reward = 1.0
            self.done = True
        else:
            reward = 0.0

        return self.state, reward, self.done

# Configuration
num_robots = 5  # number of independent SwarmRobot learners to create
max_steps_per_episode = 50  # read as a module-level global by train_swarm_robots
num_episodes = 100  # number of training episodes

# Create swarm robots: each one gets its own policy network with
# 2 inputs (the [x, y] state) and 4 outputs (one per movement action).
swarm_robots = [SwarmRobot(2, 4) for _ in range(num_robots)]

# Train swarm robots (prints one progress line per episode)
train_swarm_robots(swarm_robots, num_episodes)


Episode 1/100
Episode 2/100
Episode 3/100
Episode 4/100
Episode 5/100
Episode 6/100
Episode 7/100
Episode 8/100
Episode 9/100
Episode 10/100
Episode 11/100
Episode 12/100
Episode 13/100
Episode 14/100
Episode 15/100
Episode 16/100
Episode 17/100
Episode 18/100
Episode 19/100
Episode 20/100
Episode 21/100
Episode 22/100
Episode 23/100
Episode 24/100
Episode 25/100
Episode 26/100
Episode 27/100
Episode 28/100
Episode 29/100
Episode 30/100
Episode 31/100
Episode 32/100
Episode 33/100
Episode 34/100
Episode 35/100
Episode 36/100
Episode 37/100
Episode 38/100
Episode 39/100
Episode 40/100
Episode 41/100
Episode 42/100
Episode 43/100
Episode 44/100
Episode 45/100
Episode 46/100
Episode 47/100
Episode 48/100
Episode 49/100
Episode 50/100
Episode 51/100
Episode 52/100
Episode 53/100
Episode 54/100
Episode 55/100
Episode 56/100
Episode 57/100
Episode 58/100
Episode 59/100
Episode 60/100
Episode 61/100
Episode 62/100
Episode 63/100
Episode 64/100
Episode 65/100
Episode 66/100
Episode 67/100
Epis

In [8]:
# ... (code for training swarm robots)

# Test the trained swarm robots
def test_swarm_robots(swarm_robots, env, max_steps=None):
    """Roll out one episode with the trained robots, printing each step.

    Every robot samples an action for the current state, but only the first
    robot's action is applied because the environment is single-agent.

    Args:
        swarm_robots: Non-empty sequence of robots exposing ``select_action``.
        env: Environment exposing ``reset()`` and ``step(action)``, where
            ``step`` returns ``(state, reward, done)``.
        max_steps: Optional cap on the number of environment steps. ``None``
            (the default) runs until the environment reports ``done``.

    Raises:
        ValueError: If ``swarm_robots`` is empty.
    """
    if not swarm_robots:
        raise ValueError("swarm_robots must contain at least one robot")

    state = env.reset()
    done = False
    steps_taken = 0

    # The policy is stochastic, so the optional cap guards against a rollout
    # that wanders for a very long time before reaching the target.
    while not done and (max_steps is None or steps_taken < max_steps):
        actions = [robot.select_action(state) for robot in swarm_robots]

        # Assuming single-agent environment: only the first action is used.
        state, reward, done = env.step(actions[0])
        steps_taken += 1

        # Print current state and reward
        print("Current state:", state)
        print("Reward:", reward)

        # Optionally, you can visualize the environment or robot's movement

    print("Testing completed!")

# Create the environment (test_swarm_robots resets it before stepping)
env = Environment()

# Test the trained swarm robots: runs one episode and prints each state/reward
test_swarm_robots(swarm_robots, env)


Current state: [0, 1]
Reward: 0.0
Current state: [0, 2]
Reward: 0.0
Current state: [1, 2]
Reward: 0.0
Current state: [2, 2]
Reward: 0.0
Current state: [3, 2]
Reward: 0.0
Current state: [4, 2]
Reward: 0.0
Current state: [4, 3]
Reward: 0.0
Current state: [4, 4]
Reward: 0.0
Current state: [5, 4]
Reward: 0.0
Current state: [5, 5]
Reward: 1.0
Testing completed!


The code starts by importing the necessary libraries for implementing MAPPO in swarm robots.

The PolicyNetwork class defines a neural network model that will be used to learn the policy for swarm robots. It consists of two fully connected layers.

The SwarmRobot class represents an individual robot in the swarm. It contains the policy network, an optimizer for updating the network, and buffers to store the robot's states, actions, and rewards during training. It also includes methods for selecting actions, storing experiences, and updating the policy based on the collected experiences.

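The train_swarm_robots function trains the swarm robots in this MAPPO-inspired setup. It takes the swarm robots and the number of episodes as inputs. For each episode, it creates and resets the environment, lets the robots take turns acting in it for up to max_steps_per_episode steps while each robot records the states, actions, and rewards from its own turns, and then updates the policy of every robot from the experiences it collected. Note that each robot is updated independently with a policy-gradient step on mean-centred discounted returns; the clipped surrogate objective and centralized critic of full MAPPO are not implemented in this example.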

The Environment class represents the environment in which the swarm robots operate. It includes methods for resetting the environment and performing a step by taking an action and returning the next state, reward, and termination status. In this example, the environment is a 6x6 grid where the robots need to navigate to a target position at [5, 5].

The code defines the configuration parameters, such as the number of swarm robots, the maximum number of steps per episode, and the number of episodes for training.

Next, it creates the swarm robots using the SwarmRobot class.

The train_swarm_robots function is called to train the swarm robots. During training, it prints the current episode.

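The test_swarm_robots function is defined to test the trained swarm robots. It takes the trained swarm robots and the environment as inputs. It runs a single sample episode in which every robot samples an action from its learned policy, but only the first robot's action is applied because the environment is single-agent. The current state and reward are printed at each step until the environment reports that the target has been reached.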

The code creates an instance of the environment.

Finally, it calls the test_swarm_robots function to test the trained swarm robots in the environment.
