In [1]:
import heapq
import itertools
import copy
import random
from typing import Dict, List, Tuple

In [None]:
def dijkstra(graph: Dict[str, Dict[str, float]], start: str) -> Dict[str, float]:
    dist = {start: 0.0}
    pq = [(0.0, start)]
    while pq:
        d,u = heapq.heappop(pq)
        if d>dist.get(u, float("inf")): continue
        for v,w in graph[u].items():
            nd = d + w
            if nd < dist.get(v, float("inf")):
                dist[v] = nd
                heapq.heappush(pq, (nd, v))
    return dist

In [15]:
class RiderAssignmentEnv:
    def __init__(self, graph, riders, orders, capacity=2):
        """
        graph: adjacency dict graph[u][v] = distance (symmetric)
        riders: dict rider_id -> start_node
        orders: dict order_id -> (pickup_node, drop_node)
        capacity: max concurrent orders per rider
        """
        self.graph = graph
        self.riders = riders
        self.orders = orders
        self.capacity = capacity
        self.order_ids = list(orders.keys())
        self.rider_ids = list(riders.keys())

        # precompute all-pairs shortest distances (Dijkstra from every node)
        self._nodes = list(graph.keys())
        self._all_pairs = {n: dijkstra(graph, n) for n in self._nodes}

    def initial_state(self):
        state = {r: [] for r in self.rider_ids}
        for o in self.order_ids:
            r = random.choice(self.rider_ids)
            state[r].append(o)
        return state

    def _best_route_cost_for_assigned_orders(self, rider_id: str, assigned_orders: List[str]) -> float:
        """
        Compute minimal feasible route cost for this rider to service assigned_orders (list of order ids),
        respecting pickup-before-drop for each order and capacity.
        Returns float('inf') if no feasible ordering exists.
        """
        m = len(assigned_orders)
        if m == 0:
            return 0.0

        # Build event list: ('p', order), ('d', order)
        events = []
        for o in assigned_orders:
            events.append(('p', o))
            events.append(('d', o))

        start_node = self.riders[rider_id]

        best = float('inf')
        # generate all permutations of events (2m)! / (symmetries) - ok for small m
        for perm in itertools.permutations(events, 2*m):
            # quick pruning: every order must appear exactly twice and pickups before drops
            seen_picks = set()
            onboard = 0
            ok = True
            # we also track that events are well-formed: no duplicate same event positions (permutations of tuples handle uniqueness)
            # verify prefix constraints
            for ev in perm:
                kind, oid = ev
                if kind == 'p':
                    if oid in seen_picks:
                        ok = False; break
                    seen_picks.add(oid)
                    onboard += 1
                    if onboard > self.capacity:
                        ok = False; break
                else:  # drop
                    if oid not in seen_picks:
                        ok = False; break
                    # ensure we haven't dropped earlier than pickup (we track via seen_picks but need extra check if dropped twice)
                    # count occurrences: drop before pickup already guarded
                    onboard -= 1
                # continue
            if not ok:
                continue
            # also ensure every order's pickup is before its drop in this perm:
            valid = True
            pos = {}
            for idx, ev in enumerate(perm):
                if ev not in pos:
                    pos[ev] = idx
            for o in assigned_orders:
                if pos.get(('p', o), float('inf')) >= pos.get(('d', o), float('inf')):
                    valid = False
                    break
            if not valid:
                continue

            # compute route cost along nodes implied by this event permutation
            nodes = [start_node]
            for kind, oid in perm:
                node = self.orders[oid][0] if kind == 'p' else self.orders[oid][1]
                nodes.append(node)
            # sum shortest-path distances between successive nodes
            total = 0.0
            feasible = True
            for u, v in zip(nodes, nodes[1:]):
                dist_uv = self._all_pairs.get(u, {}).get(v, float('inf'))
                if dist_uv == float('inf'):
                    feasible = False
                    break
                total += dist_uv
                # small pruning
                if total >= best: 
                    feasible = False
                    break
            if feasible and total < best:
                best = total
        return best

    def route_cost(self, rider_id: str, assigned_orders: List[str]) -> float:
        return self._best_route_cost_for_assigned_orders(rider_id, assigned_orders)

    def cost(self, state: Dict[str, List[str]]) -> float:
        total = 0.0
        for r, assigned in state.items():
            total += self.route_cost(r, assigned)
            if total == float('inf'):
                return float('inf')
        return total

    # neighbours / random_neighbour implementations (unchanged)
    def neighbours(self, state):
        neighbours = []
        # move one order between riders
        for r1 in self.rider_ids:
            for r2 in self.rider_ids:
                if r1 == r2: continue
                for o in list(state[r1]):
                    ns = {rid: lst[:] for rid, lst in state.items()}
                    ns[r1].remove(o)
                    ns[r2].append(o)
                    neighbours.append(ns)
        # swap sequence inside a rider
        for r in self.rider_ids:
            lst = state[r]
            n = len(lst)
            if n >= 2:
                for i in range(n):
                    for j in range(i+1, n):
                        ns = {rid: lst2[:] for rid, lst2 in state.items()}
                        ns[r][i], ns[r][j] = ns[r][j], ns[r][i]
                        neighbours.append(ns)
        return neighbours

    def random_neighbour(self, state):
        nbs = self.neighbours(state)
        return random.choice(nbs) if nbs else state

    @staticmethod
    def random_problem(num_nodes=6, num_edges=8, num_riders=2, num_orders=3,
                       capacity=2, weight_range=(1,5), seed=None):
        """
        Generate a random toy problem.
        """
        if seed is not None:
            random.seed(seed)

        # create graph
        nodes = [chr(65+i) for i in range(num_nodes)]  # 'A','B',...
        graph = {n:{} for n in nodes}
        edges = set()
        while len(edges) < num_edges:
            u, v = random.sample(nodes, 2)
            if (u,v) not in edges and (v,u) not in edges:
                w = random.randint(*weight_range)
                graph[u][v] = w
                graph[v][u] = w
                edges.add((u,v))

        # riders
        riders = {f"R{i+1}": random.choice(nodes) for i in range(num_riders)}

        # orders
        orders = {}
        for j in range(num_orders):
            pickup, drop = random.sample(nodes, 2)
            orders[f"O{j+1}"] = (pickup, drop)

        return RiderAssignmentEnv(graph, riders, orders, capacity)