# Multi station path

While its possible to define a shortest path between 2 stations the shortest path between multiple stations will be dependent on which rail lines you start on and leave the routed stations.

In [None]:
from zone1tube.data.stations import ExtStations
from zone1tube.algorithms.graph import generate_graph
from scipy.sparse.csgraph import shortest_path


with open("../zone1.json", "r") as f:
    stations = ExtStations.validate_json(f.read())


station_lookup = {s.id: s.name for s in stations}
line_lookup = {d.line_ref: d.line_name for s in stations for d in s.destinations}


graph = generate_graph(stations, 7)
dists, predecessors = shortest_path(
    graph.csr_m, directed=True, return_predecessors=True
)


def reconstruct_path(i: int, j: int, path=None):
    if path is None:
        path = []
    previous = predecessors[i, j]
    if previous == i:
        return (path + [j, i])[::-1]
    return reconstruct_path(i, previous, path=path + [j])

In [None]:
names = ["Liverpool Street", "Green Park", "Bayswater"]


def name_to_id(name: str) -> int:
    return next(s.id for s in stations if s.name == name)


def platforms(id: str) -> list[int]:
    return [graph.node_inverse_lookup[n] for n in graph.platform_lookup[id]]


stops = [(id, platforms(id)) for s in names if (id := name_to_id(s))]
route = list(zip(stops[:-1], stops[1:], strict=True))
route

In [None]:
import numpy as np

from zone1tube.algorithms.graph import Node


shortest_steps = [
    x
    for x in (
        (graph.node_lookup[i],)
        + min(
            *((graph.node_lookup[j], dists[i, j]) for j in to_stop[1]),
            key=lambda x: x[1],
        )
        for from_stop, to_stop in route
        for i in from_stop[1]
    )
    if x[2] < np.inf
]
shortest_steps

In [None]:
from collections import defaultdict
from scipy.sparse.csgraph import csgraph_from_dense
from zone1tube.algorithms.graph import Graph


def build_intermediate_graph(input: list[tuple[Node, Node, float]], change_time=7):
    node_lookup: dict[int, Node] = dict(
        enumerate({a for a, _, _ in input} | {b for _, b, _ in input})
    )
    node_inverse_lookup: dict[Node, int] = {v: k for k, v in node_lookup.items()}
    node_count = len(node_lookup)
    adj_matrix = np.matrix(np.ones((node_count, node_count), dtype=np.int8) * np.inf)
    for a, b, w in input:
        i = node_inverse_lookup[a]
        j = node_inverse_lookup[b]
        adj_matrix[i, j] = w
    platform_lookup: dict[int, set[Node]] = defaultdict(set)
    for node in node_lookup.values():
        platform_lookup[node.station_id].add(node)
    for lines in platform_lookup.values():
        for a in lines:
            for b in lines:
                if a != b:
                    i = node_inverse_lookup[a]
                    j = node_inverse_lookup[b]
                    adj_matrix[i, j] = change_time
    csr_m = csgraph_from_dense(adj_matrix, null_value=np.inf)
    return Graph(
        platform_lookup=platform_lookup,
        csr_m=csr_m,
        node_lookup=node_lookup,
        node_inverse_lookup=node_inverse_lookup,
    )

In [None]:
intermediate_graph = build_intermediate_graph(shortest_steps)
intermediate_dists, intermediate_predecessors = shortest_path(
    intermediate_graph.csr_m, directed=True, return_predecessors=True
)


def node_to_str(n):
    station_id, line_ref = n
    return f"{station_lookup[station_id]} -> {line_lookup[line_ref]}"


def reconstruct_intermediate_path(i: int, j: int, path=None):
    if path is None:
        path = []
    previous = intermediate_predecessors[i, j]
    if previous == i:
        return (path + [j, i])[::-1]
    return reconstruct_intermediate_path(i, previous, path=path + [j])


from_intermediates = [
    inode
    for s in stops[0][1]
    if (node := graph.node_lookup[s])
    and node
    and (inode := intermediate_graph.node_inverse_lookup.get(node))
    and inode
]
to_intermediates = [
    inode
    for s in stops[-1][1]
    if (node := graph.node_lookup[s])
    and node
    and (inode := intermediate_graph.node_inverse_lookup.get(node))
    and inode
]

i, j, time = min(
    (
        (i, j, intermediate_dists[i, j])
        for i in from_intermediates
        for j in to_intermediates
    ),
    key=lambda x: x[2],
)
shortest_intermediate_stops = [
    intermediate_graph.node_lookup[x] for x in reconstruct_intermediate_path(i, j)
]
print("\n-> ".join(node_to_str(x) for x in shortest_intermediate_stops))

In [None]:
for a, b in zip(
    shortest_intermediate_stops[:-1], shortest_intermediate_stops[1:], strict=True
):
    i = graph.node_inverse_lookup[a]
    j = graph.node_inverse_lookup[b]
    print(
        "\n-> ".join(node_to_str(graph.node_lookup[s]) for s in reconstruct_path(i, j))
    )
print("total time:", time, "mins")