# Step-by-Step Solution for MAS Assignments using ir-sim

This notebook provides implementations for the three assignments. We use ir-sim for simulations. Assume ir-sim is installed (`pip install ir-sim`). For RL, we wrap ir-sim in Gymnasium and use Stable-Baselines3 (`pip install stable-baselines3 gymnasium`).

Focus: Precise code, metrics, evaluations. Run cells sequentially.

In [1]:
# Imports (common for all)
import irsim
import numpy as np
import matplotlib.pyplot as plt
from gymnasium import Env, spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
import yaml


# For metrics
def compute_circle_metric(positions, center, radius):
    distances = np.linalg.norm(np.array(positions) - center, axis=1)
    return np.mean(np.abs(distances - radius))

def collision_rate(env):
    return env.world.check_all_collisions()  # ir-sim has collision check

## Assignment 1: Basic - RL for Circle Following

Goal: Train single agent to follow circle around (5,5) r=3. Then scale to 5-10 agents with shared policy.

Steps:
1. Define Gym env for circle.
2. Train PPO.
3. Simulate MAS, compute metrics: mean deviation from circle, no collisions.

In [2]:
import irsim

env = irsim.make("circle.yaml")
env.load_behavior("custom_behavior_methods")  # load the custom behavior module

STEPS = 1000
for _ in range(STEPS):
    env.step()
    env.render(0.01)  # small pause for visualization

    if env.done():
        break

# Optional: compute error from desired circle
state = env.get_robot_state()[0]
pos = state[:2]
error = abs(np.linalg.norm(pos - [5,5]) - 2.0)
print("Distance from circle:", error)

env.end()

[32m2025-09-22 22:33[0m | [1mINFO    [0m | [1mSimulation environment 'circle' has been initialized and started.[0m


NameError: name 'np' is not defined

In [1]:
import irsim

env = irsim.make('circle.yaml') # initialize the environment with the configuration file

for i in range(300): # run the simulation for 300 steps

    env.step()  # update the environment
    env.render() # render the environment

    if env.done(): break # check if the simulation is done
        
env.end() # close the environment

[32m2025-09-22 22:06[0m | [1mINFO    [0m | [1mSimulation environment 'circle' has been initialized and started.[0m
[32m2025-09-22 22:07[0m | [1mINFO    [0m | [1mFigure will be closed within 3.00 seconds.[0m
[32m2025-09-22 22:07[0m | [1mINFO    [0m | [1mThe simulated environment has ended. Total simulation time: 0.1 seconds.[0m


In [5]:
# Step 2: Train PPO
env = CircleEnv()
check_env(env)
model = PPO('MlpPolicy', env, verbose=1)
model.learn(total_timesteps=10000)
env.close()
model.save('circle_ppo')

[32m2025-09-22 21:16[0m | [1mINFO    [0m | [1mSimulation environment 'circle' has been initialized and started.[0m


AttributeError: 'EnvBase' object has no attribute 'get_state'

In [None]:
# Step 3: MAS Simulation (5 robots)
def mas_circle(n_robots=5):
    # Modify YAML for multi
    multi_yaml = yaml_config.replace('robot:', f'robots:\n' + '  - ' * n_robots)
    # Positions staggered
    states = [[i*0.5+1,1,0] for i in range(n_robots)]
    # ... (expand YAML with states)
    
    env = irsim.make('multi_circle.yaml')  # Assume updated YAML
    model = PPO.load('circle_ppo')
    positions = []
    for t in range(1000):
        for i in range(n_robots):
            obs = env.get_state(i)[:4]
            action, _ = model.predict(obs)
            env.step(i, action)
        positions.append([env.get_state(i)[:2] for i in range(n_robots)])
        env.render(0.05)
        if env.done(): break
    env.end()
    positions = np.array(positions)
    metric = compute_circle_metric(positions.mean(axis=1), [5,5], 3)
    coll = collision_rate(env)  # Track during sim
    print(f'Mean deviation: {metric}, Collisions: {coll}')

mas_circle(5)

## Assignment 1: Advanced - Subsumption Architecture

Layers: 1. Obstacle avoidance (RVO). 2. Circle following (custom controller).

Steps:
1. Implement layered control.
2. Evaluate single, then scale to 5,10,20. Metrics: agent-level (deviation, speed), group (coverage, collisions).

In [None]:
# Step 1: Subsumption Controller
class SubsumptionController:
    def __init__(self, center, radius):
        self.center = np.array(center)
        self.radius = radius
    
    def layer1_avoid(self, percepts):
        # Use RVO or simple: if obstacle <1m, turn
        if any(d < 1 for d in percepts):
            return np.array([0, np.pi/4])  # Suppress higher, activate avoid
        return None
    
    def layer2_circle(self, pos):
        # Tangential velocity for circle
        to_center = self.center - pos
        tangent = np.array([-to_center[1], to_center[0]]) / np.linalg.norm(to_center)
        vel = 0.5 * tangent[:2]  # Linear vel tangential
        ang = 0  # Straight
        return np.append(vel, ang)
    
    def decide(self, pos, percepts):
        avoid = self.layer1_avoid(percepts)
        if avoid is not None:
            return avoid
        return self.layer2_circle(pos)

# Sim single
env = irsim.make('circle_obs.yaml')  # With obstacles
controller = SubsumptionController([5,5], 3)
for i in range(500):
    pos = env.get_state(0)[:2]
    percepts = env.get_lidar_scan(0)
    action = controller.decide(pos, percepts)
    env.step(action)
    env.render(0.05)
env.end()

In [None]:
# Step 2: Scaling Evaluation
def evaluate_subsumption(n_robots):
    # Similar to MAS, but use controller per agent
    # Track per agent deviation, avg speed, group collisions
    # ... implement loop, return dict of metrics
    pass  # Placeholder - expand as above

for n in [1,5,10,20]:
    metrics = evaluate_subsumption(n)
    print(f'N={n}: {metrics}')

## Assignment 2: Basic - Nature-Inspired Leader-Follower (Boids)

5 agents, fixed leader. Followers use separation, alignment, cohesion via FOV (no comm).

Steps:
1. Implement boids rules.
2. Sim, metrics: formation distance variance, distance to leader.

In [None]:
# Step 1: Boids Controller
class BoidsFollower:
    def __init__(self, leader_pos, fov_range=2.0):
        self.leader = leader_pos
        self.range = fov_range
        self.separation_w = 1.5
        self.alignment_w = 1.0
        self.cohesion_w = 1.0
    
    def perceive_neighbors(self, env, agent_id):
        # Use FOV to get nearby agents
        neighbors = []
        for i in range(env.robot_num):
            if i != agent_id and np.linalg.norm(env.get_state(i)[:2] - env.get_state(agent_id)[:2]) < self.range:
                neighbors.append(env.get_state(i))
        return np.array(neighbors)
    
    def compute_velocity(self, pos, vel, neighbors):
        if len(neighbors) == 0:
            # Follow leader
            dir_to_leader = self.leader - pos
            return 0.5 * dir_to_leader / np.linalg.norm(dir_to_leader)
        
        # Separation
        sep = np.mean([pos - n[:2] for n in neighbors], axis=0)
        
        # Alignment
        ali = np.mean([n[2:4] for n in neighbors], axis=0)  # Assume vel in state
        
        # Cohesion
        coh = np.mean([n[:2] for n in neighbors], axis=0) - pos
        
        new_vel = (self.separation_w * sep + self.alignment_w * ali + self.cohesion_w * coh)
        return 0.5 * new_vel / np.linalg.norm(new_vel)

# Sim
env = irsim.make('flock.yaml')  # 5 robots, leader at [0,0]
leader_pos = [0,0]
followers = [BoidsFollower(leader_pos) for _ in range(4)]  # Agent 0 leader
for t in range(1000):
    for i in range(1,5):
        pos = env.get_state(i)[:2]
        vel = env.get_state(i)[2:4]
        neighbors = followers[i-1].perceive_neighbors(env, i)
        action = followers[i-1].compute_velocity(pos, vel, neighbors)
        env.step(action)
    env.render(0.05)
    if env.done(): break
env.end()

# Metrics: variance of inter-distances
# Implement tracking

## Assignment 2: Advanced - Leader Election + Coordination

Bully election, then assign positions in line, use virtual comm (ir-sim message).

Steps:
1. Implement bully election.
2. Post-election, coordinate formation.
3. Scale, metrics: election time, formation stability.
4. Reflection.

In [None]:
# Step 1: Bully Election (simplified, IDs 1-10, highest wins)
class BullyElection:
    def __init__(self, agent_id):
        self.id = agent_id
        self.state = 'alive'  # alive, candidate, defeated
        self.messages = []  # Simulate comm
    
    def election_phase(self, all_agents):
        # Broadcast election if alive
        if self.state == 'alive':
            self.state = 'candidate'
            for agent in all_agents:
                if agent.id > self.id and agent.state != 'defeated':
                    agent.messages.append(('election', self.id))
        
        # Handle messages
        for msg_type, sender_id in self.messages:
            if msg_type == 'election' and sender_id > self.id:
                self.state = 'defeated'
            elif msg_type == 'victory':
                self.state = 'follower'
        self.messages = []
        
        if all(a.state != 'candidate' for a in all_agents if a.id != self.id):
            if self.state == 'candidate':
                for a in all_agents:
                    a.messages.append(('victory', self.id))
                self.state = 'leader'

# Sim election
agents = [BullyElection(i) for i in range(1,6)]
for round in range(10):  # Max rounds
    for agent in agents:
        agent.election_phase(agents)
    if any(a.state == 'leader' for a in agents): break
leader_id = next(a.id for a in agents if a.state == 'leader')
print(f'Leader: {leader_id}')

# Step 2: Formation (line behind leader)
# Assign positions based on ID order, use boids to maintain
# ... extend boids with target positions

# Step 3: Scaling - repeat for n=10,20, measure rounds for election

# Reflection: Bully pros: simple, decentralized; cons: high msg overhead for large n. Nature-inspired: low comm, emergent; cons: no guarantee on leader quality. Superior: Bully for critical systems needing quick consensus; boids for robust swarms.

## Assignment 3: B - MARL for Level-Based Foraging

Use ir-sim grid map with levels (1-3 items). Agents collect, upgrade. Use simple QMIX-like (torch) or independent PPO.

Steps:
1. Setup foraging env.
2. Implement MARL (independent learners).
3. Scale 5-20 agents, metrics: total collected, time to max level, collisions.
4. Reflection: Scalability issues with non-coop.

In [None]:
# Step 1: Foraging Env (grid with items)
class ForagingEnv(Env):
    # Similar to CircleEnv, but multi-agent, state includes item levels
    # Items on map, agents carry level, collect if match/lower
    pass  # Define: obs per agent (pos, inventory, local map), action move/collect

# Step 2: MARL Training (Independent PPO)
n_agents = 5
models = [PPO('MlpPolicy', ForagingEnv()) for _ in range(n_agents)]
for model in models:
    model.learn(10000)

# Step 3: Sim & Scale
def eval_foraging(n):
    # Load models, sim, track collected items sum
    pass

for n in [5,10,20]:
    score = eval_foraging(n)
    print(f'N={n}: Total collected {score}')

# Reflection: Independent MARL scales poorly due to non-stationarity; centralized critic (QMIX) better for coop. Performance drops with density.