<a href="https://colab.research.google.com/github/daniel-alex101/DronRL-POMDP/blob/main/GTrXL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformer GTrXL en resolución de observabilidad parcial en Aprendizaje por Refuerzo


Transformer GTrXL basado en la implementación de GitHub de Alan:
https://github.com/alantess/gtrxl-torch

In [None]:
import os
import torch as T
import torch.optim as optim
import torch.nn.functional as F
import torch.nn as nn
from typing import Optional
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch import Tensor
import math
import sys
from stable_baselines3.common.distributions import DiagGaussianDistribution
from functools import partial


# Positional encoding del transformer
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding added to the input, followed by dropout.

    The table is indexed with ``x.size(0)``, i.e. positions run along the
    first dimension of the input.

    Args:
        d_model: Size of the last (feature) dimension of the input.
        dropout: Dropout probability applied after adding the encoding.
        max_len: Number of positions precomputed in the table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=1024):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = T.zeros(max_len, d_model)
        # Column vector with the position indices 0..max_len-1.
        position = T.arange(0, max_len, dtype=T.float).unsqueeze(1)
        # Frequency term of the sinusoidal formula, one entry per even column.
        div_term = T.exp(
            T.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = T.sin(angles)  # sine on even columns
        # With an odd d_model there is one fewer odd column than even ones,
        # so the cosine block is trimmed to d_model // 2 columns.
        pe[:, 1::2] = T.cos(angles[:, :d_model // 2])  # cosine on odd columns
        pe = pe.unsqueeze(0).transpose(0, 1)  # -> (max_len, 1, d_model)
        # Buffer: saved in the state dict and moved with the module, not trained.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encoding of positions ``0..x.size(0)-1`` to ``x`` and apply dropout."""
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

# Codificador del transformer
class TEL(TransformerEncoderLayer):
    """Transformer encoder layer with a GRU after attention and after the MLP.

    Args:
        d_model: Feature size of the input and of both GRUs.
        nhead: Number of attention heads.
        n_layers: Number of stacked layers inside each GRU.
        dim_feedforward: Hidden size of the position-wise feed-forward block.
        activation: Activation used after both linear layers.
        dropout: Dropout forwarded to ``TransformerEncoderLayer``.
        layer_norm_eps: Epsilon of the layer norms.
        batch_first: Forwarded to ``TransformerEncoderLayer`` (attention layout).

    NOTE(review): both GRUs are created with ``batch_first=True`` and their
    initial state is summed over dim 1, so they always read dim 0 as the
    batch axis regardless of ``batch_first`` -- confirm this is intended when
    ``batch_first=False`` is used for the attention.
    """

    def __init__(self,
                 d_model,
                 nhead,
                 n_layers=1,
                 dim_feedforward=256,
                 activation="relu",
                 dropout=0,
                 layer_norm_eps=1e-5,
                 batch_first=False):
        super().__init__(d_model, nhead, dim_feedforward, dropout, activation,
                         layer_norm_eps, batch_first)
        # Two GRUs: one after the attention block, one after the feed-forward block.
        self.gru_1 = nn.GRU(d_model,
                            d_model,
                            num_layers=n_layers,
                            batch_first=True)
        self.gru_2 = nn.GRU(input_size=d_model,
                            hidden_size=d_model,
                            num_layers=n_layers,
                            batch_first=True)

    def forward(self,
                src: Tensor,
                src_mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                is_causal: Optional[bool] = None ) -> Tensor:
        """Run norm -> attention -> GRU -> norm -> MLP -> GRU.

        ``is_causal`` is accepted for compatibility with ``TransformerEncoder``
        but is not used by this layer.
        """
        # Initial GRU state: the input summed over dim 1, shape (1, dim0, d_model).
        h = src.sum(dim=1).unsqueeze(dim=0)
        # A multi-layer GRU needs one initial state per layer; replicate it so
        # that n_layers > 1 does not fail with a hidden-size mismatch.
        gru_layers = self.gru_1.num_layers
        if gru_layers > 1:
            h = h.expand(gru_layers, -1, -1).contiguous()

        src = self.norm1(src)  # first layer norm

        # Self-attention (only the attended values are kept, not the weights).
        out = self.self_attn(src,
                             src,
                             src,
                             attn_mask=src_mask,
                             key_padding_mask=src_key_padding_mask)[0]

        out, h = self.gru_1(out, h)  # first GRU, seeded with h
        out = self.norm2(out)  # second layer norm

        # Feed-forward block; the activation is applied after both linear layers.
        out = self.activation(self.linear1(out))
        out = self.activation(self.linear2(out))
        out, h = self.gru_2(out, h)  # second GRU continues from the first one's state
        return out



# GTrXL final
class GTrXL(nn.Module):
    """Positional encoding followed by a stack of GRU-gated encoder layers.

    Args:
        d_model: Feature size of the input.
        nheads: Number of attention heads per layer.
        transformer_layers: Number of stacked ``TEL`` layers.
        hidden_dims: Feed-forward hidden size of each layer.
        n_layers: Number of layers inside each GRU of a ``TEL``.
        layer_norm_eps: Epsilon of the layer norms.
        batch_first: If True the input is ``(batch, seq, d_model)``.
        chkpt_dir: Directory used by ``save``/``load``.
        activation: Activation name forwarded to ``TEL``.
        network_name: Checkpoint file name inside ``chkpt_dir``.
    """

    def __init__(self,
                 d_model,
                 nheads,
                 transformer_layers,
                 hidden_dims=256,
                 n_layers=1,
                 layer_norm_eps=1e-5,
                 batch_first=False,
                 chkpt_dir="models",
                 activation='relu',
                 network_name='network.pt'):
        super(GTrXL, self).__init__()
        self.batch_first = batch_first

        # Positional encoding
        self.embed = PositionalEncoding(d_model)
        encoded = TEL(d_model,
                      nheads,
                      n_layers,
                      dim_feedforward=hidden_dims,
                      activation=activation,
                      layer_norm_eps=layer_norm_eps,
                      batch_first=batch_first)
        # Attribute name (including its spelling) is kept: it is part of the
        # state-dict keys of already saved checkpoints.
        self.transfomer = TransformerEncoder(encoded, transformer_layers)
        self.file = os.path.join(chkpt_dir, network_name)

    def forward(self, x):
        """Encode ``x`` and return a tensor with the same layout as the input."""
        if self.batch_first:
            # PositionalEncoding indexes positions along dim 0, so the sequence
            # axis is moved there for the embedding and restored afterwards.
            x = self.embed(x.transpose(0, 1)).transpose(0, 1)
        else:
            x = self.embed(x)
        x = self.transfomer(x)
        return x

    def save(self):
        """Write the state dict to ``self.file``, creating its directory if needed."""
        directory = os.path.dirname(self.file)
        if directory:
            os.makedirs(directory, exist_ok=True)
        T.save(self.state_dict(), self.file)

    def load(self):
        """Load the state dict from ``self.file`` onto the module's current device."""
        # map_location lets a checkpoint saved on GPU be restored on a CPU-only host.
        device = next(self.parameters()).device
        self.load_state_dict(T.load(self.file, map_location=device))

## Entorno de simulación de redes móviles con drones (esto no es parte de la ayudantía sino del proyecto de trabajo de título)


In [None]:
import gymnasium as gym
from gymnasium.spaces import Box, Dict, Discrete
import numpy as np
from scipy import constants
from math import pi
from stable_baselines3.common.callbacks import BaseCallback
import matplotlib.pyplot as plt
from stable_baselines3.common.env_checker import check_env
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
import pandas as pd
import json
import pickle
from stable_baselines3 import PPO
import sys


#REWARDS
class Person:
  """A pedestrian located at one cell of the grid."""

  def __init__(self, position):
    self.position = position

  def take_step(self, n, m, direction):
    """Return the cell reached by moving along `direction`, clamped to the n x m grid.

    The stored position is not modified; the caller decides what to do with
    the returned coordinates.
    """
    lower_bound = [0, 0]
    upper_bound = [n - 1, m - 1]
    candidate = self.position + direction
    return np.clip(candidate, lower_bound, upper_bound)

def RewardMetrics(distances):
  """Return the achievable rate (in Mbit/s) for every distance in `distances`.

  Args:
    distances: Mapping whose values are drone-to-person distances (same unit
      as the reference distance d0 = 1).

  Returns:
    A list with one rate per entry of `distances`, in iteration order.
  """
  # Link-budget constants
  fc = 28e9                                  # carrier frequency [Hz]
  lambda_s = constants.speed_of_light / fc   # wavelength [m]
  Pt = 10e-3                                 # transmit power [W]
  # Noise power over the bandwidth
  N0_dbm = -170                              # noise density [dBm/Hz]
  N0 = 10 ** ((N0_dbm - 30) / 10)            # noise density [W/Hz]
  B = 1e6                                    # bandwidth [Hz]
  sigma2_u = N0 * B
  gamma = 1
  d0 = 1
  # Free-space gain at the reference distance: (lambda / (4*pi*d0))^2.
  # The denominator must be parenthesised; `lambda_s/4*pi*d0` multiplies by
  # pi*d0 instead of dividing by it.
  k_dB = 10 * np.log10((lambda_s / (4 * pi * d0)) ** 2)

  metrics = []
  for distance in distances.values():
    PL_dB = k_dB - 10 * gamma * np.log10((distance / d0) ** 2)
    Pl = 10 ** (PL_dB / 10)
    R = B * np.log2(1 + (Pt * Pl / sigma2_u))   # Shannon rate [bit/s]
    metrics.append(np.mean(R * 1e-6))           # stored in Mbit/s
  return metrics

def get_distances(agent_position, height, people):
  """Return {index: 3-D distance} from the drone to every person.

  The drone sits at `agent_position` with z = `height`; people are at z = 0.
  """
  drone_xyz = np.append(agent_position, height)
  return {
      idx: np.linalg.norm(drone_xyz - np.append(person.position, 0))
      for idx, person in enumerate(people)
  }

def Total_signal_grid(agent_position, height, people):
  """Sum of the per-person metrics seen from `agent_position` at `height`."""
  per_person = RewardMetrics(get_distances(agent_position, height, people))
  return sum(per_person)

def StdDev_signal_grid(agent_position, height, people):
  """Standard deviation of the per-person metrics seen from `agent_position`."""
  per_person = RewardMetrics(get_distances(agent_position, height, people))
  return np.std(per_person)
# Recompensa en base a la señal dependiendo de la distancia
def Reward1(distances):
  """Return the mean per-person metric for the given distances.

  Delegates to `RewardMetrics` instead of repeating the link-budget formula,
  so both functions always use the same constants.

  Args:
    distances: Mapping whose values are drone-to-person distances.
  """
  return np.mean(RewardMetrics(distances))

#Recompensa con umbral
def Reward2(threshold, distances):
  """Tiered reward based on the fraction of people whose metric exceeds `threshold`.

  Returns:
    (reward, mean metric, fraction above threshold), where reward is 5 when
    everybody is above the threshold, 3 above 80 %, 1 above 50 % and 0 otherwise.
  """
  costs = RewardMetrics(distances)

  covered = sum(1 for cost in costs if cost > threshold)
  coverage = covered / len(distances)

  if coverage == 1:
    tier = 5
  elif coverage > 0.8:
    tier = 3
  elif coverage > 0.5:
    tier = 1
  else:
    tier = 0
  return tier, np.mean(costs), coverage

# Recompensa con diferencia de señal
def Reward3(old_distances, new_distances):
  """Reward equal to the change in mean metric between two sets of distances.

  Returns:
    (new mean - old mean, new mean).
  """
  previous_mean = np.mean(RewardMetrics(old_distances))
  current_mean = np.mean(RewardMetrics(new_distances))
  return current_mean - previous_mean, current_mean

def Calc_threshold(n,m,height):
  """Return the mean metric of the closest and farthest drone placements.

  A person is fixed at the grid origin; the drone is evaluated directly above
  it and above the opposite corner (n-1, m-1), both at `height`.
  """
  person_at_origin = np.array([0, 0, 0])
  drone_spots = (np.array([0, 0, height]), np.array([n - 1, m - 1, height]))
  reference_distances = {
      key: np.linalg.norm(spot - person_at_origin)
      for key, spot in enumerate(drone_spots)
  }
  return np.mean(RewardMetrics(reference_distances))

def get_distances(agent_position,height,people):
  """Map each person's index to the drone-to-person distance in 3-D.

  The drone is at `agent_position` lifted to z = `height`; people stay at z = 0.
  """
  drone = np.append(agent_position, height)
  ground_points = [np.append(person.position, 0) for person in people]
  return dict(enumerate(np.linalg.norm(drone - point) for point in ground_points))

# Cambiar a cantidad de señal
def IdealCaseReward1(n, m,action_to_direction,ideal_agent_pos,height,people):
  """Greedy one-step move that maximises `Reward1` over the nine actions.

  Returns:
    (position after the best action, its Reward1 value).
  """
  lower, upper = [0, 0], [n - 1, m - 1]
  top_signal = 0
  chosen = 0
  for action in range(9):
    candidate = np.clip(ideal_agent_pos + action_to_direction[action], lower, upper)
    signal = Reward1(get_distances(candidate,height,people))
    if signal > top_signal:
      top_signal, chosen = signal, action

  # Position reached by the winning action, clamped to the grid.
  final_pos = np.clip(ideal_agent_pos + action_to_direction[chosen], lower, upper)
  return final_pos, top_signal


def IdealCaseReward2(n, m, threshold, action_to_direction,ideal_agent_pos,height,people):
  """Greedy one-step move for `Reward2`.

  Candidates are ranked by the fraction of people above `threshold`; ties are
  broken by the higher mean signal.

  Returns:
    (position after the best action, its Reward2 reward, its mean signal).
  """
  lower, upper = [0, 0], [n - 1, m - 1]
  top_fraction = 0
  top_mean = 0
  top_reward = 0
  chosen = 0
  for action in range(9):
    candidate = np.clip(ideal_agent_pos + action_to_direction[action], lower, upper)
    reward, mean_signal, fraction = Reward2(threshold, get_distances(candidate,height,people))

    more_people = fraction > top_fraction
    same_people_better_signal = fraction == top_fraction and mean_signal > top_mean
    if more_people or same_people_better_signal:
      top_fraction = fraction
      top_mean = mean_signal
      top_reward = reward
      chosen = action

  final_pos = np.clip(ideal_agent_pos + action_to_direction[chosen], lower, upper)
  return final_pos, top_reward, top_mean

def IdealCaseReward3(n, m, action_to_direction,ideal_agent_pos,height,people, old_distances):
  """Greedy one-step move that maximises `Reward3` over the nine actions.

  `Reward3` is a signed difference, so the search starts from minus infinity:
  when every move loses signal the least-bad action is still selected and its
  real (negative) reward and mean signal are reported.

  Returns:
    (position after the best action, its Reward3 delta, its new mean signal).
  """
  lower, upper = [0, 0], [n - 1, m - 1]
  best_reward = -np.inf
  best_action = 0
  best_mean_signal = 0
  for i in range(9):
    new_pos = np.clip(ideal_agent_pos + action_to_direction[i], lower, upper)
    reward, mean_signal = Reward3(old_distances, get_distances(new_pos,height,people))

    if reward > best_reward:
      best_reward = reward
      best_action = i
      best_mean_signal = mean_signal

  if best_reward == -np.inf:
    # No candidate produced a comparable reward (e.g. NaN with no people):
    # keep the neutral values that were returned in that case before.
    best_reward = 0

  ideal_pos = np.clip(ideal_agent_pos + action_to_direction[best_action], lower, upper)
  return ideal_pos, best_reward, best_mean_signal


def getActionProbs(seedo):
  """Return the 9-entry movement distribution selected by `seedo`.

  The first prime in (2, 3, 5, 7, 11, 13, 17, 19, 23) that divides `seedo`
  picks the action (in that order: stay, ↖, ↑, ↗, →, ↘, ↓, ↙, ←) that gets
  probability 0.8; every other action gets 0.025. If none of them divides
  `seedo`, a random permutation of that distribution is returned.
  """
  # Drawn unconditionally, so the global NumPy RNG advances on every call.
  prob = np.random.permutation(np.array([0.025] * 8 + [0.8]))

  primes = (2, 3, 5, 7, 11, 13, 17, 19, 23)
  for action, prime in enumerate(primes):
    if seedo % prime == 0:
      prob = np.full(9, 0.025)
      prob[action] = 0.8
      break
  return prob

#------------------------------ -------------------CUSTOM ENV -------------------------------------------------------------
class CustomEnv3(gym.Env):
  """Grid world where a drone at fixed height serves people who move randomly.

  Args:
    n, m: Grid dimensions.
    max_steps: Number of steps after which an episode is reported as done.
    height: Drone height used in the distance computation.
    people_quantity: Number of people placed on the grid.
    seed: Integer that selects the people's movement distribution and their
      initial placement. NOTE(review): `reset` evaluates `seed % 29` and
      `getActionProbs(seed)`, so the default `None` fails -- pass an integer.
  """

  def __init__(self, n, m, max_steps, height, people_quantity, seed=None):
    super(CustomEnv3, self).__init__()
    self.n = n
    self.m = m
    self.max_steps = max_steps
    self.height = height
    self.people_quantity = people_quantity
    self.seed = seed
    self.threshold = Calc_threshold(self.n,self.m,self.height)

    self.observation_space = Dict(
    {
      "grid": Box(low=0, high=self.people_quantity, shape=(n*m*3,), dtype=np.int32),  # Grid
      "x_agent": Discrete(n),  # Agent's x position
      "y_agent": Discrete(m),  # Agent's y position
    }, seed)

    # Define action space (9 possible actions)
    self.action_space = Discrete(9)

    # Mapping of actions to directions
    self._action_to_direction = {
        0: np.array([0, 0]),    # Stay in place
        1: np.array([-1, -1]),  # ↖
        2: np.array([0, -1]),   # ↑
        3: np.array([1, -1]),   # ↗
        4: np.array([1, 0]),    # →
        5: np.array([1, 1]),    # ↘
        6: np.array([0, 1]),    # ↓
        7: np.array([-1, 1]),   # ↙
        8: np.array([-1, 0]),   # ←
    }
    self.reset()

  def reset(self, seed=None, options=None):
    """Start a new episode and return `(observation, info)`.

    `seed` and `options` are accepted for compatibility with the Gymnasium
    reset signature; the episode is driven by the constructor's `seed`.
    """
    self.static_agent_pos = np.array([self.n // 2, self.m // 2])
    self.rewards_static_agent = []
    self.ideal_agent_pos = np.array([self.n // 2, self.m // 2])
    self.rewards_ideal_agent = []

    self.current_step = 0

    # Movement probabilities of the people
    self.probabilities = getActionProbs(self.seed)

    # The agent goes back to the centre of the grid
    self.agent_position = np.array([self.n // 2, self.m // 2])

    # Clear the current grid and the two previous snapshots
    self.grid = np.zeros((self.n, self.m), dtype=np.int32)
    self.grid_1step_prev = np.zeros((self.n, self.m), dtype=np.int32)
    self.grid_2step_prev = np.zeros((self.n, self.m), dtype=np.int32)

    # Place the people: seeds divisible by 29 use the global RNG, every other
    # seed uses a generator seeded with it (same placement on every reset).
    people = []
    local_rng = np.random.default_rng(self.seed)
    for _ in range(self.people_quantity):
        if self.seed % 29 == 0:
            x, y = np.random.randint(self.n), np.random.randint(self.m)
        else:
            x = local_rng.integers(self.n)
            y = local_rng.integers(self.m)
        person = Person([x, y])
        people.append(person)
        self.grid[x, y] += 1

    self.people = people.copy()
    self.state = self.observation_space.sample()
    info = self._get_info()
    obs = self._get_obs()
    return obs, info

  def _get_obs(self):
    """Return the observation declared in `observation_space`.

    The "grid" entry stacks the flattened snapshots from two steps ago, one
    step ago and now, which gives the declared length n*m*3. Subclasses that
    redefine `observation_space` override this method.
    """
    x, y = self.agent_position
    stacked = np.concatenate((self.grid_2step_prev.flatten(),
                              self.grid_1step_prev.flatten(),
                              self.grid.flatten()))
    return {
        "grid": stacked,
        "x_agent": x,
        "y_agent": y,
    }

  def _get_info(self):
    """Return diagnostics for the agent, the static drone and the greedy drone."""
    info = {
        'current_step': self.current_step,
        'agent_position': self.agent_position,
        'ideal_agent_position': self.ideal_agent_pos,
        'static_agent_position': self.static_agent_pos,
        'people_positions': self.people,
        'Total_signal': Total_signal_grid(self.agent_position, self.height, self.people),
        'Total_signal_static': Total_signal_grid(self.static_agent_pos, self.height, self.people),
        'Total_signal_ideal': Total_signal_grid(self.ideal_agent_pos, self.height, self.people),
        'Std_dev_signal': StdDev_signal_grid(self.agent_position, self.height, self.people),
        'Std_dev_signal_static': StdDev_signal_grid(self.static_agent_pos, self.height, self.people),
        'Std_dev_signal_ideal': StdDev_signal_grid(self.ideal_agent_pos, self.height, self.people)
    }
    return info

  def agent_move(self, action):
    """Return the agent position after `action`, clamped to the grid."""
    direction = self._action_to_direction[action]
    new_position = self.agent_position + direction
    return np.clip(new_position, [0, 0], [self.n - 1, self.m - 1])

  def step(self, action):
    """Move the agent, move every person, and return the Gymnasium 5-tuple.

    The reward is the change in mean per-person signal (`Reward3`) between the
    situation before the step and the one after agent and people have moved.
    """
    # Distances before anything moves
    old_distances = get_distances(self.agent_position, self.height, self.people)
    old_distances_static = get_distances(self.static_agent_pos, self.height, self.people)
    old_distances_ideal = get_distances(self.ideal_agent_pos,self.height,self.people)
    self.agent_position = self.agent_move(action).copy()

    # ------------------------------------------------------------------------
    # Shift the grid history before moving the people
    self.grid_2step_prev = np.copy(self.grid_1step_prev)
    self.grid_1step_prev = np.copy(self.grid)
    posible_actions = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8])

    # Every person takes one step drawn from the episode's distribution
    people_new_positions = []
    for person in self.people:
      action_direction = np.random.choice(posible_actions, p=self.probabilities)
      direction = self._action_to_direction[action_direction]
      person_new_position = person.take_step(self.n, self.m, direction).copy()
      if action_direction != 0:
          self.grid[person.position[0], person.position[1]] -= 1
          self.grid[person_new_position[0], person_new_position[1]] += 1
      person_updated = Person(person_new_position)
      people_new_positions.append(person_updated)

    self.people = people_new_positions.copy()
    new_distances = get_distances(self.agent_position, self.height, self.people)

    # Reward 3
    reward, new_mean = Reward3(old_distances, new_distances)

    # Baseline drones: the static one is only evaluated, the greedy one also
    # moves. Their rewards are computed here but not returned by this method.
    new_distances_static = get_distances(self.static_agent_pos, self.height, self.people)
    reward_static_agent, static_new_mean = Reward3(old_distances_static, new_distances_static)
    self.ideal_agent_pos, reward_ideal_agent, ideal_new_mean = IdealCaseReward3(self.n, self.m, self._action_to_direction, self.ideal_agent_pos, self.height, self.people, old_distances_ideal)

    # info and observation are built before the step counter is advanced
    info = self._get_info()
    observation = self._get_obs()
    self.current_step += 1
    done = self.current_step >= self.max_steps

    truncated = False
    return observation, reward, done, truncated, info



class RewardCallback(BaseCallback):
    """Collects the return of every finished episode and saves them after training."""

    def __init__(self):
        super(RewardCallback, self).__init__()
        self.episode_rewards = []
        self.episode_infos = []

    def _on_step(self) -> bool:
        """Record `info['episode']['r']` for every info dict that carries an 'episode' entry."""
        for info in self.locals['infos']:
            if 'episode' in info:
                self.episode_rewards.append(info['episode']['r'])
                self.episode_infos.append(info)
        return True

    def _on_training_end(self):
        """Write the collected episode returns to a text file."""
        import os

        # NOTE(review): `task_id` is read as a global that is not defined in
        # the code visible here -- it must exist before training ends.
        out_path = "./models/rewards3/transformer1_reward"+"_"+task_id+".txt"
        # Create the target folder first so the results are not lost when it
        # does not exist yet.
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
        np.savetxt(out_path, self.episode_rewards, delimiter=",")




## Modificación de observabilidad parcial 1: pasando solo el estado actual y no los dos anteriores

In [None]:

class PartialObservationEnv(CustomEnv3):
    """Variant of `CustomEnv3` that exposes only the current grid (no history)."""

    def __init__(self, n, m, max_steps, height, people_quantity, seed=29):
        super().__init__(n, m, max_steps, height, people_quantity, seed)

        # Replaces the parent's stacked observation with a single (n, m) grid.
        self.observation_space = gym.spaces.Dict({
            "grid": Box(low=0, high=self.people_quantity, shape=(n,m), dtype=np.int32),
            "x_agent": gym.spaces.Discrete(n),
            "y_agent": gym.spaces.Discrete(m)
        })

    def _get_obs(self):
        """Return the current people grid and the agent coordinates."""
        x, y = self.agent_position

        return {
            # A copy is returned because `step` updates `self.grid` in place;
            # handing out the live array would change observations already
            # given to the caller.
            "grid": self.grid.copy(),
            "x_agent": x,
            "y_agent": y
        }


## Modificación de observabilidad parcial 2: eliminando aleatoriamente personas de la visión local

In [None]:

class PartialObservationEnv(CustomEnv3):
    """Variant of `CustomEnv3` in which occupied cells of the current grid are randomly hidden."""

    def __init__(self, n, m, max_steps, height, people_quantity, seed=29):
        super().__init__(n, m, max_steps, height, people_quantity, seed)

        self.observation_space = gym.spaces.Dict({
            "partial_grid": Box(low=0, high=self.people_quantity, shape=(n*m*3,), dtype=np.int32),
            "x_agent": gym.spaces.Discrete(n),
            "y_agent": gym.spaces.Discrete(m)
        })

    def _get_obs(self):
        """Return the two previous grids plus the current grid with hidden cells.

        Every occupied cell of the current grid is blanked with probability
        1/4; the two history grids are passed through unchanged.
        """
        x, y = self.agent_position

        # One draw in {0, 1, 2, 3} per cell; a 0 hides the cell.
        random_values = np.random.randint(0, 4, size=self.grid.shape)

        # Any occupied cell may be hidden, including cells holding more than
        # one person, which otherwise never appeared in the observation.
        hidden = (self.grid > 0) & (random_values == 0)

        # Cast keeps the observation in the int32 dtype declared by the space.
        partial_grid = np.where(hidden, 0, self.grid).astype(np.int32)

        stacked = np.concatenate((self.grid_2step_prev.flatten(),
                                  self.grid_1step_prev.flatten(),
                                  partial_grid.flatten()))

        return {
            "partial_grid": stacked,
            "x_agent": x,
            "y_agent": y
        }


## Integración de entorno de Gym con transformer en Stable-Baselines 3


In [None]:

import gymnasium as gym
import numpy as np
import torch as T
import torch.nn as nn
from stable_baselines3 import PPO
from stable_baselines3.common.policies import ActorCriticPolicy
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor, MlpExtractor

Feature Extractor 1: El grid se pasa por una CNN antes de pasarse por el transformer


In [None]:

class GTrXLFeatureExtractor(BaseFeaturesExtractor):
    """Feature extractor: CNN over the grid, linear embedding of the agent position, then GTrXL.

    Args:
        observation_space: Dict space with "grid", "x_agent" and "y_agent".
        d_model: Transformer width; also the size of the returned features.
        nheads: Number of attention heads.
        transformer_layers: Number of GTrXL layers.
        conv_channels: Channels of the first convolution (the second uses twice as many).
        kernel_size, stride, padding: Convolution hyper-parameters.
    """

    def __init__(self, observation_space, d_model, nheads, transformer_layers, conv_channels=32, kernel_size=3, stride=1, padding=1):
        super().__init__(observation_space, features_dim=d_model)

        # Width of the concatenated agent-position input: one slot per possible
        # x value plus one per possible y value.
        # NOTE(review): this matches one-hot encoded Discrete observations --
        # confirm the observations reach `forward` in that form.
        agent_pos_size = observation_space["x_agent"].n + observation_space["y_agent"].n

        # CNN layers
        self.conv1 = nn.Conv2d(1, conv_channels, kernel_size=kernel_size, stride=stride, padding=padding)
        self.conv2 = nn.Conv2d(conv_channels, conv_channels * 2, kernel_size=kernel_size, stride=stride, padding=padding)

        # Pooling to a fixed 4x4 map, independent of the grid size
        self.pool = nn.AdaptiveAvgPool2d((4, 4))

        # Embedding of the agent position
        self.agent_pos_embedding = nn.Linear(agent_pos_size, d_model)

        # Embedding of the concatenated input (CNN features + agent embedding)
        self.input_embedding = nn.Linear(conv_channels * 2 * 4 * 4 + d_model, d_model)

        # `forward` feeds tensors shaped (batch, 1, d_model), so the transformer
        # must be told the batch axis comes first; with the sequence-first
        # default, attention would run across the samples of a batch.
        self.gtrxl = GTrXL(
            d_model=d_model,
            nheads=nheads,
            transformer_layers=transformer_layers,
            hidden_dims=d_model,
            n_layers=1,
            batch_first=True
        )

    def forward(self, observations):
        """Return a (batch, d_model) feature tensor for a batch of observations."""
        grid = observations["grid"]
        batch_size = grid.shape[0]

        # Add the channel axis: (batch, 1, height, width)
        grid = grid.float().reshape(batch_size, 1, *grid.shape[1:])

        # CNN over the grid
        x = T.relu(self.conv1(grid))
        x = T.relu(self.conv2(x))
        x = self.pool(x)

        # Flatten the CNN output
        x = x.reshape(batch_size, -1)

        # Agent position: plain casts instead of rebuilding tensors from tensors
        x_agent = observations["x_agent"].float().reshape(batch_size, -1)
        y_agent = observations["y_agent"].float().reshape(batch_size, -1)
        agent_pos = self.agent_pos_embedding(T.cat([x_agent, y_agent], dim=1))

        # Join grid features and agent embedding, then project to d_model
        x = self.input_embedding(T.cat([x, agent_pos], dim=1))

        # Sequence axis of length one: (batch, 1, d_model)
        x = x.unsqueeze(1)

        features = self.gtrxl(x)

        # Keep the last element along the sequence axis
        return features[:, -1, :]




Feature Extractor 2: Se pasa directamente la observación al transformer

In [None]:
class GTrXLFeatureExtractor(BaseFeaturesExtractor):
    """Feature extractor that embeds the flat observation and passes it through GTrXL.

    Args:
        observation_space: Dict space with "partial_grid", "x_agent" and "y_agent".
        d_model: Transformer width; also the size of the returned features.
        nheads: Number of attention heads.
        transformer_layers: Number of GTrXL layers.
    """

    def __init__(self, observation_space, d_model=256, nheads=4, transformer_layers=3):
        super().__init__(observation_space, features_dim=d_model)

        # Input width: flattened grid plus one slot per possible x and y value.
        # NOTE(review): the position part matches one-hot encoded Discrete
        # observations -- confirm they reach `forward` in that form.
        local_grid_size = int(np.prod(observation_space["partial_grid"].shape))
        agent_pos_size = observation_space["x_agent"].n + observation_space["y_agent"].n
        n_input = local_grid_size + agent_pos_size

        self.input_embedding = nn.Linear(n_input, d_model)
        # `forward` feeds tensors shaped (batch, 1, d_model), so the transformer
        # must be told the batch axis comes first; with the sequence-first
        # default, attention would run across the samples of a batch.
        self.gtrxl = GTrXL(
            d_model=d_model,
            nheads=nheads,
            transformer_layers=transformer_layers,
            hidden_dims=d_model,
            n_layers=1,
            batch_first=True
        )

    def forward(self, observations):
        """Return a (batch, d_model) feature tensor for a batch of observations."""
        batch_size = observations["partial_grid"].shape[0]
        local_grid_flat = observations["partial_grid"].float().reshape(batch_size, -1)

        # Agent position
        x_agent = observations["x_agent"].float().reshape(batch_size, -1)
        y_agent = observations["y_agent"].float().reshape(batch_size, -1)

        # Concatenate grid and position, then project to d_model
        x = T.cat([local_grid_flat, x_agent, y_agent], dim=1)
        x = self.input_embedding(x)

        x = x.unsqueeze(1)  # (batch, 1, d_model)
        features = self.gtrxl(x)
        return features[:, -1, :]  # last element along the sequence axis


Creación de la estrategia de RL que combina el algoritmo PPO y el feature extractor

In [None]:
class GTrXLPolicy(ActorCriticPolicy):
    """Actor-critic policy whose features come from a GTrXL feature extractor."""

    def __init__(
        self,
        observation_space,
        action_space,
        lr_schedule,
        *args,
        **kwargs,
    ):
        super().__init__(
            observation_space,
            action_space,
            lr_schedule,
            *args,
            **kwargs,
        )
        # Transformer hyper-parameters (width, heads, layers), read from the
        # extractor kwargs already stored by the parent constructor so they are
        # found however they were passed.
        extractor_kwargs = self.features_extractor_kwargs or {}
        self.d_model = extractor_kwargs.get("d_model")
        self.nheads = extractor_kwargs.get("nheads")
        self.transformer_layers = extractor_kwargs.get("transformer_layers")

    def _build(self, lr_schedule) -> None:
        """Create the MLP extractor, the action/value heads and the optimizer."""
        # The parent constructor creates the features extractor (and its
        # pi/vf aliases) before calling `_build`. Creating a second one here
        # would leave those aliases pointing at a different, untrained module,
        # so one is only created if none exists yet.
        if getattr(self, "features_extractor", None) is None:
            self.features_extractor = self.make_features_extractor()

        # MLP on top of the extracted features
        self.mlp_extractor = MlpExtractor(
            self.features_dim,  # input dim
            net_arch=self.net_arch,
            activation_fn=self.activation_fn,
            device=self.device
        )

        latent_dim_pi = self.mlp_extractor.latent_dim_pi
        latent_dim_vf = self.mlp_extractor.latent_dim_vf

        # Policy head
        if isinstance(self.action_dist, DiagGaussianDistribution):
            # The Gaussian distribution builds its own mean layer and log-std parameter.
            self.action_net, self.log_std = self.action_dist.proba_distribution_net(
                latent_dim=latent_dim_pi, log_std_init=self.log_std_init
            )
        else:  # Categorical
            self.action_net = nn.Linear(latent_dim_pi, self.action_space.n)

        # Value head
        self.value_net = nn.Linear(latent_dim_vf, 1)

        # Weight initialisation gain for each module
        module_gains = {
            self.features_extractor: np.sqrt(2),
            self.mlp_extractor: np.sqrt(2),
            self.action_net: 0.01,
            self.value_net: 1,
        }

        for module, gain in module_gains.items():
            module.apply(partial(self.init_weights, gain=gain))

        # Optimizer over every parameter of the policy
        self.optimizer = self.optimizer_class(self.parameters(), lr=lr_schedule(1), **self.optimizer_kwargs)

    def make_features_extractor(self) -> BaseFeaturesExtractor:
        """Instantiate the configured features-extractor class with its kwargs."""
        return self.features_extractor_class(self.observation_space, **self.features_extractor_kwargs)


Función que evalúa un modelo entrenado calculando la señal media que alcanza

In [None]:
def mean_signal( model, env, max_steps = 20, episodes=10):
  """Return the average of `info['Total_signal']` reached by `model` on `env`.

  Each episode runs for at most `max_steps` steps (fewer if the environment
  reports termination or truncation). The per-episode mean signal is
  computed, and the mean over all `episodes` is returned.

  Args:
    model: Object with `predict(obs, deterministic=True) -> (action, state)`.
    env: Environment with the Gymnasium `reset`/`step` interface.
    max_steps: Maximum number of steps evaluated per episode.
    episodes: Number of evaluation episodes.
  """
  mean_total_signal_rl = []

  for episode in range(episodes):
    print(f"Episode {episode}")
    obs, _ = env.reset()
    total_signal_rl = []
    for _ in range(max_steps):
      # Deterministic action prediction
      action, _ = model.predict(obs, deterministic=True)
      action = int(action)  # the environment indexes its action table with an int
      obs, reward, done, truncated, info = env.step(action)

      total_signal_rl.append(info['Total_signal'])
      if done or truncated:
        break

    mean_total_signal_rl.append(np.mean(total_signal_rl))

  # Returned after the loop so that every episode contributes to the average.
  return np.mean(mean_total_signal_rl)


Búsqueda de hiperparámetros con Optuna

In [None]:
def objective_PPOGTrXL(trial):
    """Optuna objective: train PPO with the GTrXL policy and score the result.

    Samples PPO, MLP-head and transformer hyperparameters from ``trial``,
    trains a model for 100000 timesteps on a freshly created
    ``PartialObservationEnv`` and evaluates it with ``mean_signal``.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        The value returned by ``mean_signal(model, env)`` for the trained model.
    """
    # Partially observable environment
    env = PartialObservationEnv(
        n=20,
        m=20,
        max_steps=50,
        height=1,
        people_quantity=10,
        seed=29
    )

    # PPO hyperparameters. ``suggest_float`` replaces the deprecated
    # ``suggest_uniform`` / ``suggest_loguniform``; names and ranges are
    # unchanged so previously stored trials remain comparable.
    learning_rate = trial.suggest_float('learning_rate', 1e-6, 0.01, log=True)
    n_steps = trial.suggest_int('n_steps', 2048, 8192)
    batch_size = trial.suggest_categorical('batch_size', [64, 128, 256, 512])
    gamma = trial.suggest_float('gamma', 0.1, 0.9)
    ent_coef = trial.suggest_float('ent_coef', 1e-8, 0.1, log=True)
    vf_coeff = trial.suggest_float('vf_coeff', 0.1, 0.9)
    clip_range = trial.suggest_float('clip_range', 0.1, 0.9)
    gae_lambda = trial.suggest_float('gae_lambda', 0.8, 1.0)
    n_epochs = trial.suggest_int('n_epochs', 3, 10)

    # Hidden layers of the policy head (suffix "1") and value head (suffix "2")
    num_layers_pi = trial.suggest_int('num_layers1', 1, 3)
    layer_sizes_pi = [
        trial.suggest_categorical(f'layer_{i}_size1', [32, 64, 128])
        for i in range(num_layers_pi)
    ]

    num_layers_vf = trial.suggest_int('num_layers2', 1, 3)
    layer_sizes_vf = [
        trial.suggest_categorical(f'layer_{i}_size2', [32, 64, 128])
        for i in range(num_layers_vf)
    ]

    activation_fn = trial.suggest_categorical('activation_fn', ['tanh', 'relu', 'silu'])
    activation_fn_mapping = {
        'tanh': nn.Tanh,
        'relu': nn.ReLU,
        'silu': nn.SiLU
    }
    activation_fn_class = activation_fn_mapping[activation_fn]

    nheads = trial.suggest_int('nheads', 2, 8)

    # NOTE: the Optuna parameter is called 'd_model' but holds the per-head
    # width; the model width handed to the extractor is head_dim * nheads,
    # which is always divisible by the number of heads.
    head_dim = trial.suggest_int('d_model', 64, 256, step=32)
    d_model = head_dim * nheads
    transformer_layers = trial.suggest_int('transformer_layers', 1, 4)

    # Policy configuration: transformer feature extractor + MLP heads
    policy_kwargs = dict(
        features_extractor_class=GTrXLFeatureExtractor,
        features_extractor_kwargs=dict(
            d_model=d_model,
            nheads=nheads,
            transformer_layers=transformer_layers
        ),
        net_arch=[dict(pi=layer_sizes_pi, vf=layer_sizes_vf)],
        activation_fn=activation_fn_class
    )

    model = PPO(
        policy=GTrXLPolicy,
        env=env,
        learning_rate=learning_rate,
        n_steps=n_steps,
        batch_size=batch_size,
        gamma=gamma,
        ent_coef=ent_coef,
        vf_coef=vf_coeff,
        clip_range=clip_range,
        gae_lambda=gae_lambda,
        n_epochs=n_epochs,
        policy_kwargs=policy_kwargs,
        verbose=0
    )

    model.learn(total_timesteps=100000)
    mean_total_signal_rl = mean_signal(model, env)
    return mean_total_signal_rl


import optuna



# Persist the study in a local SQLite database
storage = optuna.storages.RDBStorage(url="sqlite:///ppoTr.db")

# Create the study, or reuse the stored one with the same name, and run the search
studyPPO = optuna.create_study(
    storage=storage,
    study_name="ppoTr",
    direction='maximize',
    load_if_exists=True,
)
studyPPO.optimize(objective_PPOGTrXL, n_trials=200, n_jobs=1)

best_trial = studyPPO.best_trial

# Build the summary of the best trial once; it is both printed and saved
report_lines = [
    f"Best trial number: {best_trial.number}",
    f"Best trial value (reward): {best_trial.value}",
    "Best trial parameters:",
]
report_lines.extend(
    f"    {param_name}: {param_value}"
    for param_name, param_value in best_trial.params.items()
)

# Show the best results on the console
print("\n".join(report_lines))

# Save the best results to a .txt file
with open('optuna_resultsPPOTr.txt', 'w') as results_file:
    results_file.writelines(f"{line}\n" for line in report_lines)

# Serialize the whole study object as well
with open('studyPPOTr.pkl', 'wb') as study_file:
    pickle.dump(studyPPO, study_file)

Entrenamiento final de los modelos con los resultados de Optuna


In [None]:



import torch.nn as nn

# Partially observable environment used for the final training runs
env = PartialObservationEnv(
    n=20,
    m=20,
    max_steps=50,
    height=1,
    people_quantity=10,
    seed=29
)

# Best hyperparameters found by Optuna for the first model
params = {'learning_rate': 0.0006747153326104986, 'n_steps': 8130, 'batch_size': 64, 'gamma': 0.6954486367633275, 'ent_coef': 2.5572784410000594e-07, 'vf_coeff': 0.7185033334025468, 'clip_range': 0.7523046194604548, 'gae_lambda': 0.8661621416239416, 'n_epochs': 4, 'num_layers1': 2, 'layer_0_size1': 64, 'layer_1_size1': 32, 'num_layers2': 2, 'layer_0_size2': 32, 'layer_1_size2': 32, 'activation_fn': 'relu', 'nheads': 3, 'd_model': 192, 'transformer_layers': 1}

# Hidden-layer sizes of the policy (pi) and value (vf) heads, rebuilt from
# `params` so the architecture matches the tuned trial instead of a
# hand-copied list (the previous hard-coded pi layers did not match `params`).
layer_sizes_pi = [params[f'layer_{i}_size1'] for i in range(params['num_layers1'])]
layer_sizes_vf = [params[f'layer_{i}_size2'] for i in range(params['num_layers2'])]

# Transformer arguments.
# In objective_PPOGTrXL the Optuna parameter 'd_model' is the per-head width
# and the extractor receives head_dim * nheads; reproduce that here so the
# final model has the same width as the tuned one.
policy_kwargs = dict(
    features_extractor_class=GTrXLFeatureExtractor,
    features_extractor_kwargs=dict(
        d_model=params['d_model'] * params['nheads'],
        nheads=params['nheads'],
        transformer_layers=params['transformer_layers']
    ),
    net_arch=[dict(pi=layer_sizes_pi, vf=layer_sizes_vf)],
    activation_fn=nn.ReLU  # matches params['activation_fn'] == 'relu'
)

# Build the model with the tuned hyperparameters
model = PPO(
    policy=GTrXLPolicy,
    env=env,
    learning_rate=params['learning_rate'],
    n_steps=params['n_steps'],
    batch_size=params['batch_size'],
    gamma=params['gamma'],
    ent_coef=params['ent_coef'],
    vf_coef=params['vf_coeff'],
    clip_range=params['clip_range'],
    gae_lambda=params['gae_lambda'],
    n_epochs=params['n_epochs'],
    policy_kwargs=policy_kwargs,
    verbose=0
)

# Train, recording episode rewards through the callback
reward_callback = RewardCallback()
model.learn(total_timesteps=500000, callback=reward_callback)

model.save("./models/env3/transformer1_model.zip")
print(reward_callback.episode_rewards)



 Entrenamiento del modelo 2

In [None]:
# Best hyperparameters found by Optuna for the second model
params = {'learning_rate': 0.0055442406400710445, 'n_steps': 8036, 'batch_size': 512, 'gamma': 0.5634271409978006, 'ent_coef': 3.1092876269438148e-06, 'vf_coeff': 0.28952605301403755, 'clip_range': 0.4846436742967982, 'gae_lambda': 0.994042405682114, 'n_epochs': 3, 'num_layers1': 3, 'layer_0_size1': 32, 'layer_1_size1': 128, 'layer_2_size1': 128, 'num_layers2': 2, 'layer_0_size2': 64, 'layer_1_size2': 32, 'activation_fn': 'tanh', 'nheads': 5, 'd_model': 160, 'transformer_layers': 2}

# Hidden-layer sizes of the policy (pi) and value (vf) heads, taken from
# `params` (they evaluate to pi=[32, 128, 128], vf=[64, 32]).
layer_sizes_pi = [params[f'layer_{i}_size1'] for i in range(params['num_layers1'])]
layer_sizes_vf = [params[f'layer_{i}_size2'] for i in range(params['num_layers2'])]

# Transformer arguments.
# In objective_PPOGTrXL the Optuna parameter 'd_model' is the per-head width
# and the extractor receives head_dim * nheads; reproduce that here so the
# final model has the same width as the tuned one.
policy_kwargs = dict(
    features_extractor_class=GTrXLFeatureExtractor,
    features_extractor_kwargs=dict(
        d_model=params['d_model'] * params['nheads'],
        nheads=params['nheads'],
        transformer_layers=params['transformer_layers']
    ),
    net_arch=[dict(pi=layer_sizes_pi, vf=layer_sizes_vf)],
    activation_fn=nn.Tanh  # matches params['activation_fn'] == 'tanh'
)

model2 = PPO(
    policy=GTrXLPolicy,
    env=env,
    learning_rate=params['learning_rate'],
    n_steps=params['n_steps'],
    batch_size=params['batch_size'],
    gamma=params['gamma'],
    ent_coef=params['ent_coef'],
    vf_coef=params['vf_coeff'],
    clip_range=params['clip_range'],
    gae_lambda=params['gae_lambda'],
    n_epochs=params['n_epochs'],
    policy_kwargs=policy_kwargs,
    verbose=0
)

# Train the second model (the original call trained `model`, i.e. the first
# model again, leaving `model2` untrained).
reward_callback = RewardCallback()
model2.learn(total_timesteps=500000, callback=reward_callback)

