# Recogida de basuras por RL

Código principal para generar la simulación, añadir la red neuronal y el método de entrenamiento.

In [150]:
import math
import os
import sys
import time
import random

import numpy as np
import pandas as pd
import gymnasium as gym
from gymnasium import spaces
import torch
import networkx as nx 
import osmnx as ox

Carga del grafo

In [151]:
# Obtenemos el grafo con los nodos de los contenedores de basura ya incluidos

G = ox.load_graphml("contenedores_resto_FCC_valencia_filtrado_grafo.graphml", node_dtypes={"osmid": str})
nodos, aristas = ox.graph_to_gdfs(G)

# Limitamos el rango de recogida de basuras al barrio de Benimàmet, València

nodos_zona = nodos[(-0.411672 > nodos["x"]) & (nodos["x"] > -0.431842) & (39.506424 > nodos["y"]) & (nodos["y"] > 39.495904)]
nodos_zona.to_csv("nodos_zona.csv", index=True, encoding="utf-8")

nodos_id = set(nodos_zona.index)
aristas_zona = aristas[aristas.index.get_level_values("u").isin(nodos_id) & aristas.index.get_level_values("v").isin(nodos_id)].copy()
aristas_zona.to_csv("aristas_zona.csv", index=True, encoding="utf-8")



Preparación tuplas de nodos_zona y aristas_zona del entorno

In [212]:
# Se trabaja con diccionarios

def generacion_dicc_nodos(nodos_zona):
    nodos_dicc = {}
    for indice, fila in nodos_zona.iterrows():
        nodos_dicc[indice] = {
            "indice" : indice,
            "contenedor" : 1 if fila["contenedor"] == "True" else 0,  # 1 sí,  0 no
            "capacidad_contenedor" : 2 if fila["contenedor"] == "True" else 0, # 1 unidad de llenado sí, 0 unidades de llenado no
            "llenado" :  0.5 if fila["contenedor"] == "True" else 0,  # 0.5 (50%) (valor inicial aleatorio) sí,  0 (0%) no
            "posicion_camion" : 0,                                    # 1 sí,  0 no
            "llenado_camion" : 0                                      # Dato compartido en todos los nodos, normalizado a 1   
        }
    
    return nodos_dicc

def generacion_dicc_aristas(aristas_zona):
    aristas_dicc = {}
    for indice, fila in aristas_zona.iterrows():
        aristas_dicc[indice] = {
            "indice" : fila["osmid"], 
            "desde" : indice[0],
            "hasta" : indice[1],
            "distancia" : fila["length"],  # m
            "tiempo_recorrido" : fila["travel_time"], # s
            "velocidad_media" : fila["speed_kph"], # Km/h (velocidad indicada por graphml)
            "velocidad_max" : 30, # Km/h (Consideración inicial vias interubanas) 
            "via" : fila["highway"]
        }

    return aristas_dicc


nodos_dicc = generacion_dicc_nodos(nodos_zona)
aristas_dicc = generacion_dicc_aristas(aristas_zona)


Modificación indices de los nodos para trabajar despues con las GNN (se almacena la relación para mostrar resultados)

In [213]:
def modificacion_indices(nodos_dicc, aristas_dicc):
    
    nodos_dicc_str = {str(k): v for k, v in nodos_dicc.items()}

    mapeo = {osmid: idx for idx, osmid in enumerate(nodos_dicc_str.keys())}
    mapeo_inverso = {idx: osmid for osmid, idx in mapeo.items()}

    nodos_dicc_ind = {}
    for osmid, data in nodos_dicc_str.items():
        new_idx = mapeo[osmid]
        nodo_copy = data.copy()
        nodo_copy["indice"] = new_idx
        nodos_dicc_ind[new_idx] = nodo_copy

    aristas_dicc_ind = {}
    for osmid, data in aristas_dicc.items():
        u_idx = mapeo[str(data["desde"])]
        v_idx = mapeo[str(data["hasta"])]
        arista_copy = data.copy()
        arista_copy["desde"] = u_idx
        arista_copy["hasta"] = v_idx
        aristas_dicc_ind[osmid] = arista_copy

    return nodos_dicc_ind, aristas_dicc_ind, mapeo, mapeo_inverso

nodos_indice, aristas_indice, mapeo, mapeo_inverso = modificacion_indices(nodos_dicc, aristas_dicc)
print(mapeo_inverso[113])

344427875


Creación del entorno Gymnasium

In [340]:
class RecogidaBasurasEnv(gym.Env):

    def __init__(self, nodos_indice, aristas_indice, capacidad_camion = 120.0, steps_maximo = 2500, mascara = True): # añadida máscara para indicar si el agente solo elije las acciones permitidas o pueda elegir todas las acciones posibles (incluso las prohibidas)
        super().__init__()
        self.nodos_indice = nodos_indice
        self.aristas_indice = aristas_indice
        self.capacidad_camion = capacidad_camion
        self.carga_camion = 0
        self.steps_maximo = steps_maximo
        self.steps = 0
        self.tiempo_total = 0 #s
        self.mascara = mascara

        self.nodo_inicial = 103 #Entrada pueblo.
        self.nodo_actual = self.nodo_inicial
        self.nodo_anterior = None

        self.adjacencia = self._nodos_adjacentes()  

        # Espacio de acciones 
        self.action_space = spaces.Dict({
            "tipo" : spaces.Discrete(2), # 1 recoger basura, 0 moverse
            "destino" : spaces.Discrete(len(nodos_indice)) 
        })

        # Espacio de observaciones
        self.observation_space = spaces.Dict({
            "posicion_camion" : spaces.Discrete(len(nodos_indice)),
            "llenado_camion" : spaces.Box(0.0, self.capacidad_camion, shape=()),
            "contenedor" : spaces.Discrete(2),
            "llenado_contenedor" : spaces.Box(0.0, 1.0, shape=()) # Nivel lleando contenedores normalizado
        })



    # Creación dle diccionario de nodos accesibles a partir de uno 
    def _nodos_adjacentes(self):  
        adj = {nid: [] for nid in self.nodos_indice.keys()}
        for _, data in self.aristas_indice.items():
            u = data["desde"]
            v = data["hasta"]
            adj[u].append(v)
        return adj
    


    def reset(self, seed = 123, options = None):  # Seed 123 para desarrollo, None para entrenamineto agente, varias seeds fijas para fase final
        super().reset(seed = seed)
        self.nodo_actual = self.nodo_inicial
        self.carga_camion = 0.0
        self.steps = 0
        self.tiempo_total = 0

        # Reinicio de los nodos: rellenado de contenedores e incialización posicion inicial camión 
        for indice, nodo in self.nodos_indice.items():
            nodo["llenado"] = 0.5 if nodo["contenedor"] == 1 else 0 #Futuro, np.random(0.0, 0.90)
            nodo["llenado_camion"] = 0.0
            nodo["posicion_camion"] = 1 if indice == self.nodo_inicial else 0

        # Inicializacion nuevas condiciones inciales tráfico

        obs = self._obtener_observacion()
        info = {"mascara": self._mascara_acciones()} if self.mascara else {}
        return obs, info
    


    def _obtener_observacion(self):
        nodo = self.nodos_indice[self.nodo_actual]
        return {
            "posicion_camion": self.nodo_actual,
            "llenado_camion": float(min(1.0, self.carga_camion / self.capacidad_camion)),
            "contenedor": int(nodo["contenedor"]),
            "llenado_contenedor": float(nodo["llenado"])
        }



    def step(self, action):
        recompensa = 0
        info = {}
        self.steps += 1

        tipo = action["tipo"]
        destino = action["destino"]

        if tipo == 1:
            recompensa += self._recogida_basura()
        elif tipo == 0:
            if destino in self.adjacencia[self.nodo_actual]:
                # Cambio de nodo del camion
                self.nodo_anterior = self.nodo_actual
                self.nodos_indice[self.nodo_actual]["posicion_camion"] = 0
                self.nodo_actual = destino
                self.nodos_indice[self.nodo_actual]["posicion_camion"] = 1

                recompensa += self._recorrido_camion()
            else:
                recompensa += -1
        else: 
            recompensa += -1

        terminado = False
        truncado = False

        # Condiciones finalización
        # Terminado
        if self.carga_camion >= self.capacidad_camion:
            terminado = True
        if self.nodo_actual == self.nodo_inicial and self.steps > 1:
            terminado = True
        
        # Truncado
        if self.steps > self.steps_maximo:
            truncado = True

        # Recompensa final
        if terminado or truncado:
            recompensa += self._recompensa_final()

        obs = self._obtener_observacion()
        info = {"mascara": self._mascara_acciones()} if self.mascara else {}
        return obs, recompensa, terminado, truncado, info
    


    def _recogida_basura(self):
        recompensa = 0
        nodo = self.nodos_indice[self.nodo_actual]
        if nodo["contenedor"] == 1 and nodo["llenado"] > 0:
            basura_disponible = nodo["llenado"] * nodo["capacidad_contenedor"]
            self.carga_camion += basura_disponible

            carga_camion_norm = min(1.0, self.carga_camion / self.capacidad_camion)

            nodo["llenado"] = 0 

            for n in self.nodos_indice.values():
                n["llenado_camion"] = carga_camion_norm
            
            # Tiempo recogida 
            self.tiempo_total += 30 #sec, tiempo aprox recogida (cambiarlo a variable)

            # Recompensas 
            recompensa = (basura_disponible / nodo["capacidad_contenedor"]) * 1  # 1 factor arbitrario (recompensa inicial y sencilla) (si es menor al 50/70%, añadir mini penalización)
            return recompensa
        
        elif nodo["contenedor"] == 1 and nodo["llenado"] == 0:
            recompensa = 0
            return recompensa

        else:
            recompensa = -1
            return recompensa


    def _recorrido_camion(self):
        recompensa = 0

        # Recompensas por tiempo recorrido y distancia recorrida

        return recompensa



    def _recompensa_final(self):
        recompensa = 0

        # Añadir recompensas y penalizaciones

        return recompensa
    



    def render(self):
        print(f"Nodo actual: {self.nodo_actual} | Carga camión: {self.carga_camion:.2f} kg | Paso: {self.steps}")




    def _get_accessible_nodes(self):
            return self.adjacencia[self.nodo_actual]
    


    def _mascara_acciones(self):
        mascara_tipo = np.array([True, True], dtype=bool)  
        mascara_destino = np.zeros(len(self.nodos_indice), dtype=bool)

        adjacentes = self._nodos_adjacentes()[self.nodo_actual]

    # Excluir volver al nodo anterior, salvo callejón sin salida
        if self.nodo_anterior is not None:
            vecinos_validos = [v for v in adjacentes if v != self.nodo_anterior]
            if len(vecinos_validos) == 0:
                # caso callejón sin salida → permitimos volver
                vecinos_validos = [self.nodo_anterior]
        else:
            vecinos_validos = adjacentes

        mascara_destino[vecinos_validos] = True

        nodo = self.nodos_indice[self.nodo_actual]
        mascara_recoger = nodo["contenedor"] == 1 and nodo["llenado"] > 0

        return {
            "mascara_tipo": mascara_tipo,
            "mascara_destino": mascara_destino,
            "mascara_recoger": mascara_recoger
        }

In [338]:
def agente_probabilistico(env, max_steps=20):
    obs, info = env.reset()
    terminated, truncated = False, False
    contenedores_visitados = 0

    for step in range(max_steps):
        print(f"\n--- Step {step+1} ---")
        print(f"Observación: {obs}")
        print(f"Info: {info}")

        nodo_tiene_contenedor = obs["contenedor"] == 1
        lleno_contenedor = obs["llenado_contenedor"] > 0

        if nodo_tiene_contenedor and lleno_contenedor:
            # 100% probabilidad de recoger, 0% de moverse
            contenedores_visitados += 1
            if random.random() <= 1.0:
                action = {"tipo": 1, "destino": 0}
            else:
                posibles = np.where(info["mascara"]["mascara_destino"])[0]
                destino = int(random.choice(posibles)) if len(posibles) > 0 else 0
                action = {"tipo": 0, "destino": destino}
        else:
            # Nodo sin contenedor o contenedor vacío → siempre moverse
            posibles = np.where(info["mascara"]["mascara_destino"])[0]
            destino = int(random.choice(posibles)) if len(posibles) > 0 else 0
            action = {"tipo": 0, "destino": destino}

        print(f"Acción elegida: {action}")
        obs, reward, terminated, truncated, info = env.step(action)
        print(f"Recompensa: {reward}")
        env.render()

        if terminated or truncated:
            print("Episodio terminado.")
            print(f"Contenedores_visitados = {contenedores_visitados}")
            break

In [349]:
env = RecogidaBasurasEnv(nodos_indice, aristas_indice, mascara=True)

agente_probabilistico(env, max_steps=500)


--- Step 1 ---
Observación: {'posicion_camion': 103, 'llenado_camion': 0.0, 'contenedor': 0, 'llenado_contenedor': 0.0}
Info: {'mascara': {'mascara_tipo': array([ True,  True]), 'mascara_destino': array([False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, Fals

In [180]:
print(mapeo_inverso[103])
print(mapeo_inverso[115])
print(mapeo_inverso[114])
print(mapeo_inverso[1])
print(mapeo_inverso[0])
print(mapeo_inverso[116])

344427748
434698715
434698704
115579019
115578653
434698720
