## Imports

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import gymnasium as gym
from gymnasium import Env, spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env

## Data Example

In [None]:
# Example antennas: identifier, planar position and total bandwidth.
antennas = [
    {'id': name, 'x': x, 'y': y, 'bandwidth': bandwidth}
    for name, x, y, bandwidth in (
        ('A1', 0, 0, 100),
        ('A2', 100, 0, 80),
        ('A3', 50, 86, 120),
    )
]

# Example users: identifier and planar position.
users = [
    {'id': name, 'x': x, 'y': y}
    for name, x, y in (
        ('U1', 10, 30),
        ('U2', 60, 20),
        ('U3', 100, 60),
        ('U4', 20, 90),
    )
]

In [None]:
# One row per antenna dict; the bare name on the last line is there so the
# notebook cell displays the resulting table.
df_antennas = pd.DataFrame(antennas)
df_antennas

In [None]:
# One row per user dict; the bare name on the last line is there so the
# notebook cell displays the resulting table.
df_users = pd.DataFrame(users)
df_users

In [None]:
# Scatter plot of the example layout: antennas as red circles, users as blue squares.
plt.figure(figsize=(9, 7))
plt.plot(df_antennas["x"], df_antennas["y"], 'ro', label='Antennas', markersize=10)
plt.plot(df_users["x"], df_users["y"], 'bs', label='Users', markersize=8)

# Label every point with its id, slightly offset so the text does not cover the marker.
for frame, text_color in ((df_antennas, 'red'), (df_users, 'blue')):
    for x_pos, y_pos, point_id in zip(frame["x"], frame["y"], frame["id"]):
        plt.text(x_pos + 2, y_pos + 2, point_id, color=text_color, fontsize=10)

plt.title('Antennas and Users Positions')
plt.xlabel('X Coordinate')
plt.ylabel('Y Coordinate')
plt.legend()
plt.grid(True)
plt.axis('equal')
plt.show()

In [None]:
# Stack the coordinates as (n, 2) arrays; reshape keeps the 2-column shape even for an empty list.
user_xy = np.array([[u['x'], u['y']] for u in users], dtype=float).reshape(-1, 2)
antenna_xy = np.array([[a['x'], a['y']] for a in antennas], dtype=float).reshape(-1, 2)

# Broadcasting (users, 1, 2) against (1, antennas, 2) gives every user-antenna offset;
# the norm over the last axis is the Euclidean distance, shape (users, antennas).
distance_matrix = np.linalg.norm(user_xy[:, None, :] - antenna_xy[None, :, :], axis=-1)

antenna_ids = [antenna['id'] for antenna in antennas]
user_ids = [user['id'] for user in users]
df_distance = pd.DataFrame(distance_matrix, index=user_ids, columns=antenna_ids)
df_distance

## Reinforcement Learning

`gymnasium` (imported here as `gym`; it is the maintained successor of OpenAI's `gym`) is a Python library used to build training environments for RL agents. It doesn't train the agent itself, but rather creates the "world" in which the agent operates. What we define is:

- What the agent can observe.
- What actions the agent can take.
- What reward the agent receives for those actions.
- When the episode ends.

A `gymnasium` environment is built around 5 methods:
- `__init__()`: Initializes the environment (users, antennas, etc.).
- `reset()`: Starts a new episode, returns: (initial state, info).
- `step(action)`: Applies an action, returns: (new state, reward, terminated, truncated, info).
- `render()`: (Optional) Visually shows what's happening.
- `close()`: (Optional) Frees resources at the end.

### Simple Case: Assignment of users to a unique antenna

In [None]:
class SimpleAntennaEnv(gym.Env):
    """Toy environment: connect each user, one at a time, to a single antenna.

    Every episode places ``num_users`` users uniformly at random in a
    100x100 area. At each step the agent observes the current user's
    distance to every antenna plus every antenna's remaining capacity, and
    picks exactly one antenna for that user.

    Reward per step:
        +1  chosen antenna is within ``max_distance`` and has free capacity
            (the user is connected and one unit of capacity is consumed)
         0  chosen antenna is within range but already full
        -1  chosen antenna is out of range

    The episode terminates once every user has been processed.
    """

    def __init__(self):
        super().__init__()

        self.antennas = [
            {'id': 'A0', 'x': 10, 'y': 90, 'bandwidth': 2},
            {'id': 'A1', 'x': 90, 'y': 90, 'bandwidth': 2}
        ]

        self.num_antennas = len(self.antennas)
        self.num_users = 3
        # Maximum user-antenna distance at which a connection is possible
        self.max_distance = 50.0

        # The observation space is what the agent sees: the distances from the current user to each
        # antenna followed by the remaining capacity of each antenna.
        # spaces.Box represents a continuous space (the observation is a vector of real numbers).
        # Example of observation: [distance_to_A0, distance_to_A1, remaining_capacity_A0, remaining_capacity_A1]
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(self.num_antennas * 2,), dtype=np.float32)

        # The action space is what the agent can do: select one of the antennas to connect to.
        # spaces.Discrete means the agent must pick an integer action from a finite set (0 to num_antennas-1).
        self.action_space = spaces.Discrete(self.num_antennas)

        # Populate users/distances so the environment is usable (e.g. renderable) right away
        self.reset()

    def reset(self, seed=None, options=None):
        """Start a new episode with freshly sampled user positions.

        Args:
            seed: Optional seed for the environment's random generator;
                passing the same seed reproduces the same user positions.
            options: Unused, accepted for Gymnasium API compatibility.

        Returns:
            Tuple ``(observation, info)`` with the first user's observation
            and an empty info dict.
        """
        # Seeds self.np_random so that reset(seed=...) is reproducible
        super().reset(seed=seed)

        self.current_user = 0
        self.bandwidth_used = np.zeros(self.num_antennas)

        # Random users per episode to guarantee variability
        self.users = [
            {
                'id': f'U{i}',
                'x': self.np_random.uniform(0, 100),
                'y': self.np_random.uniform(0, 100)
            }
            for i in range(self.num_users)
        ]

        self.distances = self.compute_distances()
        # (user_idx, antenna_idx) pairs of the links that were actually established
        self.connections = []

        return self._get_observation(), {}

    def compute_distances(self):
        """Return the Euclidean user-antenna distances, shape (num_users, num_antennas)."""
        user_xy = np.array([[u['x'], u['y']] for u in self.users], dtype=float).reshape(-1, 2)
        antenna_xy = np.array([[a['x'], a['y']] for a in self.antennas], dtype=float).reshape(-1, 2)
        # Broadcast to every (user, antenna) offset, then take the norm over the x/y axis
        return np.linalg.norm(user_xy[:, None, :] - antenna_xy[None, :, :], axis=-1)

    def _get_observation(self):
        """
        Return the state (observation) for the current user, which will be seen
        by the agent during training/inference: distances to every antenna
        followed by every antenna's remaining capacity, as float32.
        """
        distances = self.distances[self.current_user]
        total_bandwidth = np.array([antenna['bandwidth'] for antenna in self.antennas])
        capacities = total_bandwidth - self.bandwidth_used
        return np.concatenate([distances, capacities]).astype(np.float32)

    def step(self, action):
        """Try to connect the current user to the chosen antenna.

        Args:
            action: Index of the antenna to connect to (an int or a
                single-element array such as the one returned by
                ``model.predict``).

        Returns:
            Tuple ``(observation, reward, terminated, truncated, info)``.
            The observation is all zeros once the episode has terminated.
        """
        user_idx = self.current_user
        # Normalise to a plain int so stored connections do not hold numpy arrays
        antenna_idx = int(np.asarray(action).item())
        distance = self.distances[user_idx][antenna_idx]

        in_range = distance <= self.max_distance
        has_capacity = self.bandwidth_used[antenna_idx] < self.antennas[antenna_idx]['bandwidth']

        if not in_range:
            reward = -1
        elif has_capacity:
            reward = 1
            self.bandwidth_used[antenna_idx] += 1
            # Only successful links are recorded, so render() does not draw
            # a connection for a user that was rejected
            self.connections.append((user_idx, antenna_idx))
        else:
            reward = 0

        self.current_user += 1
        terminated = self.current_user >= self.num_users
        truncated = False

        if terminated:
            # No next user to observe; return a placeholder of the right shape
            obs = np.zeros(self.observation_space.shape, dtype=np.float32)
        else:
            obs = self._get_observation()

        return obs, reward, terminated, truncated, {}

    def render(self, mode='human'):
        """Plot antennas (red circles), users (blue squares) and established links (green dashed)."""
        plt.figure(figsize=(8, 6))

        for antenna in self.antennas:
            plt.plot(antenna['x'], antenna['y'], 'ro', markersize=10)
            plt.text(antenna['x'] + 1, antenna['y'] + 1, antenna['id'], color='red')

        for user in self.users:
            plt.plot(user['x'], user['y'], 'bs', markersize=8)
            plt.text(user['x'] + 1, user['y'] + 1, user['id'], color='blue')

        for user_idx, antenna_idx in self.connections:
            user = self.users[user_idx]
            antenna = self.antennas[antenna_idx]
            plt.plot(
                [user['x'], antenna['x']],
                [user['y'], antenna['y']],
                'g--'
            )

        plt.xlabel('X')
        plt.ylabel('Y')
        plt.title('Antennas, Users and Connections')
        plt.grid(True)
        plt.axis('equal')
        plt.show()

In [None]:
# Check that the environment is properly implemented
# Run Stable-Baselines3's environment checker on a throwaway instance,
# so the instance used for training below is left untouched
check_env(SimpleAntennaEnv())

# Create the environment used for training and evaluation
env = SimpleAntennaEnv()

# Create the PPO model with an MLP policy; verbose=1 prints training progress
model = PPO("MlpPolicy", env, verbose=1)

# Train the model for 10,000 environment steps
model.learn(total_timesteps=10000)

In [None]:
# Roll out one full episode with the trained policy
obs, _ = env.reset()
done = False
total_reward = 0

while not done:
    # deterministic=True takes the policy's preferred action instead of sampling one,
    # so the result reflects what was learned rather than exploration noise
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    total_reward += reward

# render() already calls plt.show(), so no extra show is needed here
env.render()

print(f"Agent total reward: {total_reward}")

### Complex Case: Assignment of users to several antennas

`self.observation_space`: It is what the agent sees, which in this case is the distances from the current user to each antenna and the available bandwidth of each antenna. With this the agent decides what action to take. In our case:
```
self.observation_space = spaces.Box(
    low=0.0,
    high=np.inf,
    shape=(self.num_antennas * 2,),
    dtype=np.float32
)
```

**spaces.Box** represents a continuous space (the observation is a vector of real numbers). We are telling the agent that it will receive a vector of length 2 × num_antennas (here 10 values):
```
[
 dist_a0, dist_a1, dist_a2, dist_a3, dist_a4,
 cap_a0,  cap_a1,  cap_a2,  cap_a3,  cap_a4
]
```


`self.action_space`: It is what the agent can do, which in this case is to choose, for each antenna, the fraction of the current user's demand to request from it.
```
self.action_space = spaces.Box(
            low=0.0, high=1.0,
            shape=(self.num_antennas,),
            dtype=np.float32
        )
```

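Unlike the simple case, which used **spaces.Discrete** (the agent picks one integer from 0 to num_antennas-1), the action here is a **spaces.Box**: a continuous vector with one value between 0 and 1 per antenna. For example, `[0.5, 0, 0.5, 0, 0]` asks antennas A0 and A2 for half of the user's demand each.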

In [None]:
class MultiAntennaEnv(gym.Env):
    """Environment where each user's demand can be split across several antennas.

    Every episode places ``num_users`` users uniformly at random in a
    100x100 area. At each step the agent observes the current user's
    distance to every antenna plus every antenna's remaining capacity, and
    answers with one fraction in [0, 1] per antenna: the share of the user's
    demand (``user_demand`` Mbps) it wants that antenna to serve.

    Reward per step:
        +1.0  the user's full demand was assigned
        +0.5  only part of the demand was assigned
        -1.0  nothing was assigned

    The episode terminates once every user has been processed.
    """

    # Relative tolerance for "demand fully met": after normalising the
    # fractions, their sum can fall a rounding error short of 1.0
    _DEMAND_RTOL = 1e-6

    def __init__(self):
        """
        Initialize the environment with:
        - 5 fixed antennas, each with 1000 Mbps total capacity
        - 3 users per episode, each requiring 50 Mbps
        - Observation and action space definitions
        """
        super().__init__()

        # Fixed antennas, each with 1000 Mbps total capacity
        self.antennas = [
            {'id': 'A0', 'x': 10,  'y': 90, 'bandwidth': 1000.0},
            {'id': 'A1', 'x': 90,  'y': 90, 'bandwidth': 1000.0},
            {'id': 'A2', 'x': 50,  'y': 50, 'bandwidth': 1000.0},
            {'id': 'A3', 'x': 20,  'y': 20, 'bandwidth': 1000.0},
            {'id': 'A4', 'x': 80,  'y': 20, 'bandwidth': 1000.0},
        ]

        self.num_antennas = len(self.antennas)
        self.num_users = 3
        self.max_distance = 50.0  # Maximum distance at which an antenna can serve a user
        self.user_demand = 50.0  # Mbps required per user

        # Observation: distances to antennas + remaining capacity per antenna
        self.observation_space = spaces.Box(
            low=0.0, high=np.inf,
            shape=(self.num_antennas * 2,),
            dtype=np.float32
        )

        # Action: fraction of the user's demand assigned to each antenna
        self.action_space = spaces.Box(
            low=0.0, high=1.0,
            shape=(self.num_antennas,),
            dtype=np.float32
        )

        # Populate users/distances so the environment is usable (e.g. renderable) right away
        self.reset()

    def reset(self, seed=None, options=None):
        """
        Reset the environment at the beginning of an episode:
        - Set bandwidth usage to zero
        - Generate new random user positions
        - Precompute distances
        - Return the initial observation

        Args:
            seed: Optional seed for the environment's random generator;
                passing the same seed reproduces the same user positions.
            options: Unused, accepted for Gymnasium API compatibility.

        Returns:
            Tuple ``(observation, info)`` with the first user's observation
            and an empty info dict.
        """
        # Seeds self.np_random so that reset(seed=...) is reproducible
        super().reset(seed=seed)

        self.current_user = 0
        self.bandwidth_used = np.zeros(self.num_antennas)

        # Generate random users within a 100x100 area
        self.users = [
            {
                'id': f'U{i}',
                'x': self.np_random.uniform(0, 100),
                'y': self.np_random.uniform(0, 100)
            }
            for i in range(self.num_users)
        ]

        self.distances = self.compute_distances()
        # One (user_idx, per-antenna Mbps array) entry per processed user
        self.connections = []
        return self._get_observation(), {}

    def compute_distances(self):
        """
        Compute and return a matrix of Euclidean distances between
        each user and each antenna (shape: num_users × num_antennas).
        """
        user_xy = np.array([[u['x'], u['y']] for u in self.users], dtype=float).reshape(-1, 2)
        antenna_xy = np.array([[a['x'], a['y']] for a in self.antennas], dtype=float).reshape(-1, 2)
        # Broadcast to every (user, antenna) offset, then take the norm over the x/y axis
        return np.linalg.norm(user_xy[:, None, :] - antenna_xy[None, :, :], axis=-1)

    def _total_bandwidth(self):
        """Return each antenna's total capacity in Mbps as an array."""
        return np.array([antenna['bandwidth'] for antenna in self.antennas], dtype=float)

    def _get_observation(self):
        """
        Return the current observation:
        - Distances from current user to all antennas
        - Remaining bandwidth of each antenna
        """
        distances = self.distances[self.current_user]
        capacities = self._total_bandwidth() - self.bandwidth_used
        return np.concatenate([distances, capacities]).astype(np.float32)

    def step(self, action):
        """
        Take a step in the environment:
        - The agent proposes a fractional assignment to each antenna
        - We compute how much bandwidth is actually assigned (respecting distance and capacity limits)
        - Reward is based on whether the user's demand is fully, partially, or not met
        - Move to the next user

        An antenna's share is granted all-or-nothing: it is dropped entirely
        when the antenna is out of range or cannot fit the whole requested
        share in its remaining capacity.

        Args:
            action: Array with one fraction per antenna. Values are clipped
                to [0, 1] and rescaled if they sum to more than 1.

        Returns:
            Tuple ``(observation, reward, terminated, truncated, info)``.
            The observation is all zeros once the episode has terminated.
        """
        # Work in float64: the policy emits float32, whose rounding is large
        # enough to matter in the demand comparison below
        action = np.clip(np.asarray(action, dtype=np.float64), 0.0, 1.0)

        # Normalize if sum of fractions exceeds 1.0
        total_fraction = action.sum()
        if total_fraction > 1.0:
            action = action / total_fraction

        # Mbps the agent wants each antenna to serve for the current user
        requested = action * self.user_demand

        in_range = self.distances[self.current_user] <= self.max_distance
        has_room = self.bandwidth_used + requested <= self._total_bandwidth()

        # Mbps actually granted per antenna (zero where the request was rejected)
        valid_assignments = np.where(in_range & has_room, requested, 0.0)
        self.bandwidth_used += valid_assignments
        total_assigned = float(valid_assignments.sum())

        # Store which antennas were assigned to the current user and with how many Mbps
        self.connections.append((self.current_user, valid_assignments))

        # Compare with a small tolerance so rounding in the normalised
        # fractions cannot turn a fully served user into a partial one
        if total_assigned >= self.user_demand * (1.0 - self._DEMAND_RTOL):
            reward = 1.0
        elif total_assigned > 0:
            reward = 0.5
        else:
            reward = -1.0

        self.current_user += 1
        terminated = self.current_user >= self.num_users
        truncated = False

        if terminated:
            # No next user to observe; return a placeholder of the right shape
            obs = np.zeros(self.observation_space.shape, dtype=np.float32)
        else:
            obs = self._get_observation()

        return obs, reward, terminated, truncated, {}

    def render(self, mode='human'):
        """
        Display a 2D plot showing:
        - Red circles: antennas with usage/remaining bandwidth
        - Blue squares: users with total Mbps received
        - Green dashed lines: links representing partial connections
        """
        fig, ax = plt.subplots(figsize=(9, 7))

        for i, antenna in enumerate(self.antennas):
            used = self.bandwidth_used[i]
            total = antenna['bandwidth']
            remaining = total - used

            ax.plot(antenna['x'], antenna['y'], 'ro', markersize=10)
            ax.text(
                antenna['x'] + 1,
                antenna['y'] + 1,
                f"{antenna['id']}\nused: {used:.0f} / left: {remaining:.0f} Mbps",
                color='red',
                fontsize=9
            )

        for user_idx, user in enumerate(self.users):
            ax.plot(user['x'], user['y'], 'bs', markersize=8)

            # Sum of assigned Mbps from all antennas to this user.
            # connections is filled in user order, so entry k belongs to user k;
            # users not processed yet only get their id as label.
            if user_idx < len(self.connections):
                assignment = self.connections[user_idx][1]
                received = np.sum(assignment)
                label = f"{user['id']}\nreceived: {received:.1f} Mbps"
            else:
                label = f"{user['id']}"

            ax.text(user['x'] + 1, user['y'] + 1, label, color='blue', fontsize=9)

        for user_idx, assignment in self.connections:
            user = self.users[user_idx]
            for j in range(self.num_antennas):
                # Draw a link only where some bandwidth was actually granted
                if assignment[j] > 0:
                    antenna = self.antennas[j]
                    ax.plot([user['x'], antenna['x']], [user['y'], antenna['y']], 'g--')

        ax.set_xlabel('X coordinate')
        ax.set_ylabel('Y coordinate')
        ax.set_title('User connections and antenna usage (in Mbps)')
        ax.grid(True)
        ax.set_aspect('equal')
        plt.show()

In [None]:
# Instantiate the environment
# Instantiate the environment and run Stable-Baselines3's environment checker on it
env = MultiAntennaEnv()
check_env(env)

# Create the PPO model
# PPO = Proximal Policy Optimization, a popular reinforcement learning algorithm based on policy gradients
# A policy is the neural network that the agent uses to decide which action to take given an observation
# A policy gradient is a way to improve the policy: "If the action gave a high reward, make it more likely next time"

# Problem! -> Older algorithms' updates could be too aggressive:
#   - If an action got a very high reward, the model might overreact
#   - It could jump to a new policy that's too different from the previous one
#   - That breaks learning

# PPO's Solution! -> A clipped objective: the ratio between the new and the old probability of an action
# is clipped, so the policy is only allowed to change a small amount per update

# Then, we need to choose a policy architecture, which in this case is a simple MLP
# PPO uses the MLP as the policy network and applies the clipped objective at every learning step
model = PPO("MlpPolicy", env, verbose=1)

# Train for 20,000 environment steps
model.learn(total_timesteps=20000)

In [None]:
# Roll out one full episode with the trained policy, always taking its preferred action.
obs, _ = env.reset()
total_reward = 0
done = False

while True:
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, _ = env.step(action)
    total_reward += reward

    # The episode is over as soon as the environment reports either flag
    done = terminated or truncated
    if done:
        break

env.render()
print(f"\nTotal reward: {total_reward}")