In [1]:
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Tuple, Set, Optional
import math
import random
import heapq

Node = int

# =========================
# Datos del problema
# =========================

@dataclass
class ProblemData:
    # Grafo (no dirigido) con tiempos de viaje
    adj: Dict[Node, List[Tuple[Node, float]]]

    depot: Node
    repair_nodes: Set[Node]                 # nodos de reparación (carreteras dañadas transformadas)
    demand_nodes: Set[Node]                 # nodos de demanda
    repair_time: Dict[Node, float]          # s_i (duraciones de reparación)
    demand_service_time: Dict[Node, float]  # s'_i (duraciones de suministro/servicio)

@dataclass
class ACOParams:
    alpha: float = 1.0
    beta: float = 2.0
    rho: float = 0.2
    Q: float = 1.0
    num_ants: int = 40
    num_iters: int = 150
    seed: int = 7

@dataclass
class Solution:
    repair_route: List[Node]
    relief_route: List[Node]
    d_repair: Dict[Node, float]         # d_i (tiempos de reparación completada)
    relief_finish_time: float           # objetivo: tiempo del último servicio (incluye suministro)

# =========================
# Utilidades de grafo
# =========================

def add_undirected_edge(adj: Dict[Node, List[Tuple[Node, float]]], u: Node, v: Node, w: float):
    adj.setdefault(u, []).append((v, w))
    adj.setdefault(v, []).append((u, w))

def dijkstra_static(adj: Dict[Node, List[Tuple[Node, float]]], source: Node) -> Dict[Node, float]:
    """Dijkstra estándar: distancias mínimas sin ventanas."""
    dist: Dict[Node, float] = {source: 0.0}
    pq = [(0.0, source)]
    while pq:
        d, u = heapq.heappop(pq)
        if d != dist.get(u, math.inf):
            continue
        for v, w in adj.get(u, []):
            nd = d + w
            if nd < dist.get(v, math.inf):
                dist[v] = nd
                heapq.heappush(pq, (nd, v))
    return dist

def earliest_arrival_dijkstra(
    adj: Dict[Node, List[Tuple[Node, float]]],
    source: Node,
    start_time: float,
    repair_done_time: Dict[Node, float],
    repair_nodes: Set[Node],
) -> Dict[Node, float]:
    """
    Dijkstra de 'tiempo más temprano de llegada' con esperas.
    Si se llega a un nodo de reparación v antes de repair_done_time[v], se espera hasta repair_done_time[v].
    Propiedad FIFO -> Dijkstra funciona.
    """
    dist: Dict[Node, float] = {source: start_time}
    pq = [(start_time, source)]

    def apply_wait(node: Node, t: float) -> float:
        if node in repair_nodes:
            return max(t, repair_done_time.get(node, 0.0))
        return t

    dist[source] = apply_wait(source, dist[source])

    while pq:
        t_u, u = heapq.heappop(pq)
        if t_u != dist.get(u, math.inf):
            continue
        for v, w in adj.get(u, []):
            t_v = t_u + w
            t_v = apply_wait(v, t_v)
            if t_v < dist.get(v, math.inf):
                dist[v] = t_v
                heapq.heappush(pq, (t_v, v))
    return dist

def roulette_choice(candidates: List[Node], weights: List[float]) -> Node:
    s = sum(weights)
    if s <= 0:
        return random.choice(candidates)
    r = random.random() * s
    acc = 0.0
    for c, w in zip(candidates, weights):
        acc += w
        if acc >= r:
            return c
    return candidates[-1]

# =========================
# ACO (adaptado al paper)
# =========================

class IOSRCRVD_ACO:
    def __init__(self, data: ProblemData, params: ACOParams):
        self.data = data
        self.p = params
        random.seed(self.p.seed)

        # Feromonas sobre transiciones "siguiente nodo a visitar"
        self.tau_rep: Dict[Tuple[Node, Node], float] = {}
        self.tau_dem: Dict[Tuple[Node, Node], float] = {}
        self._init_pheromones()

        self.static_dist_cache: Dict[Node, Dict[Node, float]] = {}

    def _init_pheromones(self):
        init = 1.0
        rep = list(self.data.repair_nodes) + [self.data.depot]
        dem = list(self.data.demand_nodes) + [self.data.depot]
        for i in rep:
            for j in rep:
                if i != j:
                    self.tau_rep[(i, j)] = init
        for i in dem:
            for j in dem:
                if i != j:
                    self.tau_dem[(i, j)] = init

    def _heuristic(self, cost: float) -> float:
        return 1.0 / max(cost, 1e-6)

    def _static_dist(self, src: Node) -> Dict[Node, float]:
        if src not in self.static_dist_cache:
            self.static_dist_cache[src] = dijkstra_static(self.data.adj, src)
        return self.static_dist_cache[src]

    # ---- 1) Ruta del equipo de reparación + tiempos d_i ----
    def build_repair_route_and_times(self) -> Tuple[List[Node], Dict[Node, float]]:
        route = [self.data.depot]
        remaining = set(self.data.repair_nodes)

        current = self.data.depot
        t = 0.0
        d_repair: Dict[Node, float] = {}

        while remaining:
            dist_cur = self._static_dist(current)
            candidates = [v for v in remaining if v in dist_cur and math.isfinite(dist_cur[v])]
            if not candidates:
                return route, {v: math.inf for v in self.data.repair_nodes}

            weights = []
            for v in candidates:
                tau = (self.tau_rep.get((current, v), 1.0) ** self.p.alpha)
                eta = (self._heuristic(dist_cur[v]) ** self.p.beta)
                weights.append(tau * eta)

            nxt = roulette_choice(candidates, weights)

            # viajar + reparar
            t += dist_cur[nxt]
            t += self.data.repair_time.get(nxt, 0.0)  # s_i
            d_repair[nxt] = t                         # d_i (momento de reparación completada)
            route.append(nxt)

            remaining.remove(nxt)
            current = nxt

        return route, d_repair

    # ---- 2) Ruta del vehículo de ayuda con ventanas + objetivo ----
    def build_relief_route_and_finish_time(self, d_repair: Dict[Node, float]) -> Tuple[List[Node], float]:
        route = [self.data.depot]
        remaining = set(self.data.demand_nodes)

        current = self.data.depot
        t = 0.0
        finish_time_last = 0.0

        while remaining:
            dist_time = earliest_arrival_dijkstra(self.data.adj, current, t, d_repair, self.data.repair_nodes)

            candidates = [v for v in remaining if v in dist_time and math.isfinite(dist_time[v])]
            if not candidates:
                return route, math.inf

            weights = []
            for v in candidates:
                arrival = dist_time[v]
                travel_effective = max(arrival - t, 0.0)
                tau = (self.tau_dem.get((current, v), 1.0) ** self.p.alpha)
                eta = (self._heuristic(travel_effective) ** self.p.beta)
                weights.append(tau * eta)

            nxt = roulette_choice(candidates, weights)

            arrival_time = dist_time[nxt]
            service = self.data.demand_service_time.get(nxt, 0.0)  # s'_i
            t = arrival_time + service
            finish_time_last = t

            route.append(nxt)
            remaining.remove(nxt)
            current = nxt

        return route, finish_time_last

    # ---- Feromonas ----
    def evaporate(self):
        for k in list(self.tau_rep.keys()):
            self.tau_rep[k] *= (1.0 - self.p.rho)
        for k in list(self.tau_dem.keys()):
            self.tau_dem[k] *= (1.0 - self.p.rho)

    def reinforce_global_best(self, sol: Solution):
        if not math.isfinite(sol.relief_finish_time) or sol.relief_finish_time <= 0:
            return
        delta = self.p.Q / sol.relief_finish_time
        for i in range(len(sol.repair_route) - 1):
            a, b = sol.repair_route[i], sol.repair_route[i + 1]
            self.tau_rep[(a, b)] = self.tau_rep.get((a, b), 0.0) + delta
        for i in range(len(sol.relief_route) - 1):
            a, b = sol.relief_route[i], sol.relief_route[i + 1]
            self.tau_dem[(a, b)] = self.tau_dem.get((a, b), 0.0) + delta

    def solve(self) -> Solution:
        best = Solution([], [], {}, math.inf)

        for _ in range(self.p.num_iters):
            iter_best = Solution([], [], {}, math.inf)

            for _k in range(self.p.num_ants):
                rep_route, d_rep = self.build_repair_route_and_times()
                rel_route, finish = self.build_relief_route_and_finish_time(d_rep)

                sol = Solution(rep_route, rel_route, d_rep, finish)
                if sol.relief_finish_time < iter_best.relief_finish_time:
                    iter_best = sol

            self.evaporate()
            self.reinforce_global_best(iter_best)

            if iter_best.relief_finish_time < best.relief_finish_time:
                best = iter_best

        return best

# =========================
# EJEMPLO PEQUEÑO (informe)
# =========================

def build_example() -> ProblemData:
    adj: Dict[Node, List[Tuple[Node, float]]] = {}

    # Tiempos de viaje:
    # 0-1:4, 1-6:2, 6-2:2, 2-3:3, 2-7:2, 7-4:2, 4-5:3
    add_undirected_edge(adj, 0, 1, 4)
    add_undirected_edge(adj, 1, 6, 2)
    add_undirected_edge(adj, 6, 2, 2)
    add_undirected_edge(adj, 2, 3, 3)
    add_undirected_edge(adj, 2, 7, 2)
    add_undirected_edge(adj, 7, 4, 2)
    add_undirected_edge(adj, 4, 5, 3)

    repair_nodes = {6, 7}
    demand_nodes = {3, 5}

    # Duraciones de reparación s_i
    repair_time = {6: 6.0, 7: 5.0}

    # Duraciones de suministro s'_i (tu nota: 3 -> 4 y 5 -> 3)
    demand_service_time = {3: 4.0, 5: 3.0}

    return ProblemData(
        adj=adj,
        depot=0,
        repair_nodes=repair_nodes,
        demand_nodes=demand_nodes,
        repair_time=repair_time,
        demand_service_time=demand_service_time,
    )

# =========================
# MAIN (con impresión corregida)
# =========================

if __name__ == "__main__":
    data = build_example()
    params = ACOParams(alpha=1.0, beta=2.0, rho=0.2, Q=1.0, num_ants=50, num_iters=200, seed=7)
    aco = IOSRCRVD_ACO(data, params)
    best = aco.solve()

    print("=== Mejor solución encontrada (ACO) ===")
    print("Ruta equipo de reparación:", best.repair_route)

    # Duraciones s_i
    print("Duraciones de reparación s_i:", {k: data.repair_time[k] for k in sorted(data.repair_nodes)})

    # Tiempos d_i
    print("Tiempos de reparación completada d_i:", {k: round(best.d_repair[k], 2) for k in sorted(best.d_repair)})

    print("Ruta vehículo de ayuda:", best.relief_route)

    # Duraciones s'_i
    print("Duraciones de suministro s'_i:", {k: data.demand_service_time[k] for k in sorted(data.demand_nodes)})

    # Traza del vehículo: llegada / servicio / fin
    print("\n--- Traza vehículo de ayuda (llegada/servicio/fin) ---")
    t = 0.0
    current = data.depot
    for nxt in best.relief_route[1:]:
        dist_time = earliest_arrival_dijkstra(data.adj, current, t, best.d_repair, data.repair_nodes)
        arrival = dist_time[nxt]
        service = data.demand_service_time.get(nxt, 0.0)
        finish = arrival + service
        print(f"{current} -> {nxt}: sale {t:.2f}, llega {arrival:.2f}, servicio {service:.2f}, termina {finish:.2f}")
        t = finish
        current = nxt

    print("\nTiempo final del último servicio (objetivo):", round(best.relief_finish_time, 2))


=== Mejor solución encontrada (ACO) ===
Ruta equipo de reparación: [0, 6, 7]
Duraciones de reparación s_i: {6: 6.0, 7: 5.0}
Tiempos de reparación completada d_i: {6: 12.0, 7: 21.0}
Ruta vehículo de ayuda: [0, 3, 5]
Duraciones de suministro s'_i: {3: 4.0, 5: 3.0}

--- Traza vehículo de ayuda (llegada/servicio/fin) ---
0 -> 3: sale 0.00, llega 17.00, servicio 4.00, termina 21.00
3 -> 5: sale 21.00, llega 31.00, servicio 3.00, termina 34.00

Tiempo final del último servicio (objetivo): 34.0


In [2]:
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Tuple, Set
import math
import random
import heapq

Node = int

# ======================================================
# Datos del problema
# ======================================================

@dataclass
class ProblemData:
    adj: Dict[Node, List[Tuple[Node, float]]]
    depot: Node
    repair_nodes: Set[Node]
    demand_nodes: Set[Node]
    repair_time: Dict[Node, float]
    demand_service_time: Dict[Node, float]

@dataclass
class ACOParams:
    alpha: float = 1.0
    beta: float = 2.0
    rho: float = 0.2
    Q: float = 1.0
    num_ants: int = 60
    num_iters: int = 300
    seed: int = 7

@dataclass
class Solution:
    repair_route: List[Node]
    relief_route: List[Node]
    repair_done_time: Dict[Node, float]
    objective: float  # tiempo del último servicio

# ======================================================
# Utilidades de grafo
# ======================================================

def add_undirected_edge(adj, u, v, w):
    adj.setdefault(u, []).append((v, w))
    adj.setdefault(v, []).append((u, w))

def dijkstra_static(adj, source):
    dist = {source: 0.0}
    pq = [(0.0, source)]
    while pq:
        d, u = heapq.heappop(pq)
        if d != dist.get(u, math.inf):
            continue
        for v, w in adj.get(u, []):
            nd = d + w
            if nd < dist.get(v, math.inf):
                dist[v] = nd
                heapq.heappush(pq, (nd, v))
    return dist

def earliest_arrival_dijkstra(adj, source, start_time, repair_done, repair_nodes):
    dist = {source: start_time}
    pq = [(start_time, source)]

    def wait_if_needed(node, t):
        if node in repair_nodes:
            return max(t, repair_done.get(node, 0.0))
        return t

    dist[source] = wait_if_needed(source, dist[source])

    while pq:
        t_u, u = heapq.heappop(pq)
        if t_u != dist.get(u, math.inf):
            continue
        for v, w in adj.get(u, []):
            t_v = wait_if_needed(v, t_u + w)
            if t_v < dist.get(v, math.inf):
                dist[v] = t_v
                heapq.heappush(pq, (t_v, v))
    return dist

def roulette(cands, weights):
    s = sum(weights)
    if s <= 0:
        return random.choice(cands)
    r = random.random() * s
    acc = 0.0
    for c, w in zip(cands, weights):
        acc += w
        if acc >= r:
            return c
    return cands[-1]

# ======================================================
# ACO
# ======================================================

class IOSRCRVD_ACO:
    def __init__(self, data: ProblemData, p: ACOParams):
        self.data = data
        self.p = p
        random.seed(p.seed)

        self.tau_rep = {}
        self.tau_dem = {}
        self.static_dist = {}

        self._init_pheromones()

    def _init_pheromones(self):
        init = 1.0
        for i in self.data.repair_nodes | {self.data.depot}:
            for j in self.data.repair_nodes | {self.data.depot}:
                if i != j:
                    self.tau_rep[(i, j)] = init
        for i in self.data.demand_nodes | {self.data.depot}:
            for j in self.data.demand_nodes | {self.data.depot}:
                if i != j:
                    self.tau_dem[(i, j)] = init

    def _heur(self, cost):
        return 1.0 / max(cost, 1e-6)

    def _static_dist(self, src):
        if src not in self.static_dist:
            self.static_dist[src] = dijkstra_static(self.data.adj, src)
        return self.static_dist[src]

    # ---------------- Repair crew ----------------

    def build_repair(self):
        route = [self.data.depot]
        remaining = set(self.data.repair_nodes)
        t = 0.0
        current = self.data.depot
        done = {}

        while remaining:
            dist = self._static_dist(current)
            cands = [v for v in remaining if v in dist]
            weights = []
            for v in cands:
                weights.append(
                    (self.tau_rep[(current, v)] ** self.p.alpha) *
                    (self._heur(dist[v]) ** self.p.beta)
                )
            nxt = roulette(cands, weights)
            t += dist[nxt]
            t += self.data.repair_time[nxt]
            done[nxt] = t
            route.append(nxt)
            remaining.remove(nxt)
            current = nxt

        return route, done

    # ---------------- Relief vehicle ----------------

    def build_relief(self, repair_done):
        route = [self.data.depot]
        remaining = set(self.data.demand_nodes)
        t = 0.0
        current = self.data.depot
        last_finish = 0.0

        while remaining:
            dist = earliest_arrival_dijkstra(
                self.data.adj, current, t, repair_done, self.data.repair_nodes
            )
            cands = [v for v in remaining if v in dist]
            weights = []
            for v in cands:
                travel = max(dist[v] - t, 0.0)
                weights.append(
                    (self.tau_dem[(current, v)] ** self.p.alpha) *
                    (self._heur(travel) ** self.p.beta)
                )
            nxt = roulette(cands, weights)
            t = dist[nxt] + self.data.demand_service_time[nxt]
            last_finish = t
            route.append(nxt)
            remaining.remove(nxt)
            current = nxt

        return route, last_finish

    # ---------------- Pheromones ----------------

    def evaporate(self):
        for k in self.tau_rep:
            self.tau_rep[k] *= (1 - self.p.rho)
        for k in self.tau_dem:
            self.tau_dem[k] *= (1 - self.p.rho)

    def reinforce(self, sol: Solution):
        delta = self.p.Q / sol.objective
        for i in range(len(sol.repair_route) - 1):
            a, b = sol.repair_route[i], sol.repair_route[i + 1]
            self.tau_rep[(a, b)] += delta
        for i in range(len(sol.relief_route) - 1):
            a, b = sol.relief_route[i], sol.relief_route[i + 1]
            self.tau_dem[(a, b)] += delta

    # ---------------- Solve ----------------

    def solve(self):
        best = Solution([], [], {}, math.inf)

        for _ in range(self.p.num_iters):
            iter_best = Solution([], [], {}, math.inf)
            for _k in range(self.p.num_ants):
                rep_route, rep_done = self.build_repair()
                rel_route, obj = self.build_relief(rep_done)
                sol = Solution(rep_route, rel_route, rep_done, obj)
                if obj < iter_best.objective:
                    iter_best = sol
            self.evaporate()
            self.reinforce(iter_best)
            if iter_best.objective < best.objective:
                best = iter_best

        return best

# ======================================================
# Ejemplo del paper (Figura 3 / Tabla 1)
# ======================================================

def build_example_paper():
    adj = {}

    add_undirected_edge(adj, 0, 3, 5)
    add_undirected_edge(adj, 0, 8, 10)
    add_undirected_edge(adj, 0, 2, 6)
    add_undirected_edge(adj, 3, 4, 5)
    add_undirected_edge(adj, 4, 9, 8)
    add_undirected_edge(adj, 9, 8, 1)
    add_undirected_edge(adj, 8, 7, 5)
    add_undirected_edge(adj, 7, 6, 6)
    add_undirected_edge(adj, 6, 10, 3)
    add_undirected_edge(adj, 6, 5, 2)
    add_undirected_edge(adj, 5, 1, 2)
    add_undirected_edge(adj, 1, 2, 5)
    add_undirected_edge(adj, 2, 6, 7)

    return ProblemData(
        adj=adj,
        depot=0,
        repair_nodes={2, 3, 5, 7, 9, 10},
        demand_nodes={1, 4, 6},
        repair_time={2: 6, 3: 5, 5: 3, 7: 14, 9: 7, 10: 16},
        demand_service_time={1: 2, 4: 2, 6: 2},
    )

# ======================================================
# Main
# ======================================================

if __name__ == "__main__":
    data = build_example_paper()
    params = ACOParams()
    aco = IOSRCRVD_ACO(data, params)
    best = aco.solve()

    print("=== MEJOR SOLUCIÓN (ACO) ===")
    print("Ruta equipo de reparación:", best.repair_route)
    print("Tiempos de reparación:", {k: round(v, 1) for k, v in best.repair_done_time.items()})
    print("Ruta vehículo de ayuda:", best.relief_route)
    print("Tiempo final del último servicio:", round(best.objective, 2))


=== MEJOR SOLUCIÓN (ACO) ===
Ruta equipo de reparación: [0, 3, 2, 5, 7, 9, 10]
Tiempos de reparación: {3: 10.0, 2: 27.0, 5: 37.0, 7: 59.0, 9: 72.0, 10: 103.0}
Ruta vehículo de ayuda: [0, 4, 1, 6]
Tiempo final del último servicio: 46.0
