In [None]:
%pip install tensorboard
%pip install stable-baselines3
%pip install gymnasium
%pip install shimmy
%pip install torch
%pip install stable-baselines3 gym
%pip install matplotlib
%pip install kiwisolver

: 

In [None]:
import pandas as pd
import numpy as np
import random
import networkx as nx
from math import sqrt
import matplotlib.pyplot as plt

: 

In [None]:
def generador_data(fac, dem, deman_max):
    ancho = 2500
    cor_dem = [(random.randint(1, ancho), random.randint(1, ancho)) for _ in range(dem)]
    dem_p = [np.random.poisson(deman_max) for _ in range(dem)]  # Tamaño de puntos escalado para visualización

    cor_fac = [(random.randint(1, ancho), random.randint(1, ancho)) for _ in range(fac)]

    # Crear el grafo
    G = nx.Graph()

    # Añadir nodos de demanda y facilidad al grafo
    for i, coord in enumerate(cor_dem):
        G.add_node(f"D{i}", pos=coord, demand=dem_p[i], color='red', size=dem_p[i])

    for j, coord in enumerate(cor_fac):
        G.add_node(f"F{j}", pos=coord, color='blue', size=100)  # Tamaño fijo para facilidades

    # Conectar nodos si cumplen una condición específica (e.g., todos conectados)
    cover=[]
    for i, c_dem in enumerate(cor_dem):
        for j, c_fac in enumerate(cor_fac):
            # Conectar si la distancia es menor a un umbral, ajustar según necesidad
            if sqrt((c_dem[0] - c_fac[0]) ** 2 + (c_dem[1] - c_fac[1]) ** 2) < 500:
                cover.append(1)
                G.add_edge(f"D{i}", f"F{j}")
            else:
                cover.append(0)

    cover=np.array(cover).reshape(fac,dem)
    # Establecer el tamaño de la figura (ancho x alto en pulgadas)
    plt.figure(figsize=(15, 15))  # Ajuste del tamaño de la figura a 15x15 pulgadas
    pos = nx.get_node_attributes(G, 'pos')
    colors = [G.nodes[node]['color'] for node in G.nodes]
    sizes = [G.nodes[node]['size'] for node in G.nodes]

    nx.draw(G, pos, node_color=colors, node_size=sizes, with_labels=False, edge_color='gray', alpha=0.6)
    plt.title('Red de Puntos de Demanda y Facilidades')
    plt.show()
    return dem_p,cover


: 

In [None]:
fac=20
demp=100
deman_max=80
demanda,m_cover=generador_data(fac, demp, deman_max)
print(sum(demanda))


i,j=m_cover.shape
dic={}
dem_fac={}
cover={}
for i in range(i):
    acum=[0,[]]
    dem_acum=0
    lista=[]
    for j in range(j):
        if m_cover[i][j]!=0:
            dem_acum+=int(m_cover[i][j])*demanda[j]
            acum[0]=dem_acum
            acum[1].append(demanda[j])
            lista.append(j)
    dic[i]=acum
    dem_fac[i]=dem_acum
    cover[i]=lista
dem_fac
dic
cover_dic=cover

: 

In [None]:
import gym
from gym import spaces
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim

class HumanitarianFacilityLocationEnv(gym.Env):
    """Entorno para modelar la ubicación de facilidades en situaciones humanitarias."""
    def __init__(self, num_demand_points, num_facilities, num_days, coverage_dict, people_per_demand_point, 
                 max_installations_per_facility=5, max_capacity_per_facility=500, max_available_tents=100):
        super(HumanitarianFacilityLocationEnv, self).__init__()
        self.num_demand_points = num_demand_points
        self.num_facilities = num_facilities
        self.num_days = num_days
        self.current_day = 0
        self.coverage_dict = coverage_dict
        self.people_per_demand_point = np.array(people_per_demand_point, dtype=np.int32)
        self.demand_status = np.ones(self.num_demand_points, dtype=int) 
        self.max_installations_per_facility = max_installations_per_facility
        self.max_capacity_per_facility = max_capacity_per_facility
        self.max_available_tents = max_available_tents
        self.installations_per_facility = np.zeros(num_facilities, dtype=int)
        self.available_installations = self.max_available_tents
        self.action_space = spaces.MultiDiscrete([max_installations_per_facility + 1] * num_facilities)
        self.observation_space = spaces.Box(low=0, high=1, shape=(self.num_demand_points,), dtype=np.uint8)

    def reset(self):
        self.current_day = 0
        self.installations_per_facility.fill(0)
        self.available_installations = self.max_available_tents  # Reiniciar disponibilidad
        self.demand_status.fill(1)
        self.state = self.update_coverage()
        return self.state

    def update_coverage(self):
        capacity_used_per_demand_point = np.zeros(self.num_demand_points)
        for facility_index in range(self.num_facilities):
            if facility_index in self.coverage_dict:
                covered_demand_points = self.coverage_dict[facility_index]
                capacity_used_per_demand_point[covered_demand_points] += self.installations_per_facility[facility_index] * self.max_capacity_per_facility
        self.state = np.where(capacity_used_per_demand_point >= self.people_per_demand_point, 0, 1)
        return self.state


    def step(self, action):
        # Asegurar que no se instalen más instalaciones de las disponibles **en el día actual**
        action = np.minimum(action, self.available_installations)
        # Asegurar que no se supere el límite por instalación
        action = np.minimum(action, self.max_installations_per_facility - self.installations_per_facility)

        self.installations_per_facility += action
        self.available_installations -= np.sum(action)

        reward = self.evaluate_reward()
        self.state = self.update_coverage()

        self.current_day += 1
        # El episodio termina si se llega al último día o si se cubre toda la demanda
        done = self.current_day >= self.num_days or np.all(self.state == 0)

        # Reiniciar la disponibilidad de instalaciones al principio de cada día
        if not done:
            self.available_installations = self.max_available_tents

        info = {}
        
        print()
        print(f"(DIA {self.current_day}):")
        print("*"*100)
        print(f"Estado actual: {self.state}")
        print(f"Demanda: {self.people_per_demand_point}")
        print(f"Demanda actual: {sum(self.people_per_demand_point*self.state)}")
        print(f"Recompensa: {self.evaluate_reward()}")
        # Depurar la demanda por punto de demanda
        return self.state, reward, done, info

    def evaluate_reward(self):
        uncovered_people = np.sum(self.state * self.people_per_demand_point)*1000
        #total_capacity = np.sum(self.installations_per_facility * self.max_capacity_per_facility)
        #capacity_reward = (total_capacity - uncovered_people) / total_capacity if total_capacity > 0 else 0 
        #unused_capacity_penalty = (total_capacity - uncovered_people) / total_capacity if total_capacity > 0 else 0 
        num_tents_penalty = np.sum(self.installations_per_facility) * 100
        return -uncovered_people - num_tents_penalty

class DQNNetwork(nn.Module):
    def __init__(self, state_dim, num_facilities, max_installations_per_facility):
        super(DQNNetwork, self).__init__()
        self.layer1 = nn.Linear(state_dim, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, num_facilities * (max_installations_per_facility + 1))
        self.num_facilities = num_facilities
        self.max_installations_per_facility = max_installations_per_facility

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        x = self.layer3(x)
        x = x.view(-1, self.num_facilities, self.max_installations_per_facility + 1)
        return x


class DQNAgent:
    def __init__(self, state_dim, num_facilities, max_installations_per_facility, coverage_dict, epsilon=0.1):
        self.model = DQNNetwork(state_dim, num_facilities, max_installations_per_facility)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.criterion = nn.MSELoss()
        self.epsilon = epsilon
        self.num_facilities = num_facilities
        self.max_installations_per_facility = max_installations_per_facility
        self.coverage_dict = coverage_dict

    def choose_action(self, state):
        if random.random() < self.epsilon:
            action = np.random.randint(0, self.max_installations_per_facility + 1, size=self.num_facilities)
            return action
        else:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            with torch.no_grad():
                q_values = self.model(state_tensor)
                # Implementación para seleccionar la mejor acción:
                action = []
                for facility_index in range(self.num_facilities):
                    if facility_index in self.coverage_dict and self.coverage_dict[facility_index]:
                        # Encuentra la acción (cantidad de carpas) que maximiza el valor Q para la facilidad actual
                        best_action = torch.argmax(q_values[0, facility_index]).item()
                        action.append(best_action)
                    else:
                        # Si la facilidad no está en coverage_dict, no se instalan carpas
                        action.append(0)

                return np.array(action) 

    def update_model(self, state, action, reward, next_state, done):
        state = torch.FloatTensor(state).unsqueeze(0)
        next_state = torch.FloatTensor(next_state).unsqueeze(0)

        reward = torch.FloatTensor([reward])
        done = torch.FloatTensor([done])

        # Obtener los valores Q para todas las acciones en el estado actual
        q_values = self.model(state)

        # Convertir action a un tensor de PyTorch
        action_tensor = torch.LongTensor(action)

        # Convertir np.arange a un tensor de PyTorch
        facility_indices = torch.arange(self.num_facilities).long()

        # Seleccionar los valores Q de las acciones tomadas
        current_q = q_values[0, facility_indices, action_tensor]  

        # Obtener el Q máximo del siguiente estado para cada facilidad
        max_next_q = self.model(next_state).max(2)[0].detach()

        # Calcular el Q objetivo para cada facilidad
        expected_q = reward + 0.99 * max_next_q * (1 - done)

        # Calcular la pérdida
        loss = self.criterion(current_q, expected_q)

        # Actualizar los pesos del modelo
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

# Main execution
# Define las variables de entrada

num_demand_points = demp
num_facilities = fac
num_days = 5
coverage_dict = cover_dic
people_per_demand_point = demanda

env = HumanitarianFacilityLocationEnv(num_demand_points=num_demand_points,
                                      num_facilities=num_facilities,
                                      num_days=num_days,
                                      coverage_dict=coverage_dict,
                                      people_per_demand_point=people_per_demand_point)

state_dim = env.observation_space.shape[0]
max_installations_per_facility = env.max_installations_per_facility
agent = DQNAgent(state_dim=state_dim, 
                 num_facilities=num_facilities, 
                 max_installations_per_facility=max_installations_per_facility,
                 coverage_dict=coverage_dict)  # Pasar coverage_dict al agente

num_episodes = 10000
demand_history_by_day = [] 

for episode in range(num_episodes):
    state = env.reset()
    done = False
    total_reward = 0
    demand_history_by_day.append([]) 
    print()
    print("-"*100)
    print("EPISODIO",episode)
    print("-"*100)
    
    while not done:
        action = agent.choose_action(state)
        next_state, reward, done, info = env.step(action)
        agent.update_model(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

        # Imprimir información de asignación de instalaciones a puntos de demanda
        print("Instalaciones por facilidad:", env.installations_per_facility)
        for facility_index, num_installations in enumerate(env.installations_per_facility):
            if num_installations > 0:
                covered_demand_points = env.coverage_dict.get(facility_index, [])
                print(f" - Instalación {facility_index}: {num_installations} carpas, atendiendo puntos de demanda: {covered_demand_points}")
        # Actualizar la demanda faltante al final de cada día
        if env.current_day > 0 and env.current_day % 1 == 0:
            uncovered_people = np.sum(env.state * env.people_per_demand_point) 
            demand_history_by_day[-1].append(uncovered_people)

    print(f"Episode {episode + 1}: Total Reward = {total_reward}, Uncovered People = {uncovered_people}")

: 

In [None]:
cover

: 