diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index b8f65c694..fa8fb93f0 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -31,7 +31,7 @@ jobs: runs-on: ${{ matrix.os}} strategy: matrix: - python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] + python-version: ['3.9', '3.10', '3.11', '3.12'] os: [windows-latest, ubuntu-latest] max-parallel: 20 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0e3fbbc53..d8b2a859c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,9 +1,14 @@ repos: -- repo: https://github.com/ambv/black - rev: 22.3.0 - hooks: +- repo: https://github.com/astral-sh/ruff-pre-commit + # Ruff version. + rev: v0.3.0 + hooks: + # Run the linter. + - id: ruff + args: [ --fix ] + # Run the formatter. + - id: ruff-format +- repo: https://github.com/ambv/black + rev: 24.1.1 + hooks: - id: black -- repo: https://github.com/pycqa/flake8 - rev: 4.0.1 - hooks: - - id: flake8 \ No newline at end of file diff --git a/aequilibrae/matrix/__init__.py b/aequilibrae/matrix/__init__.py index 39ae5bde4..18521fe8a 100644 --- a/aequilibrae/matrix/__init__.py +++ b/aequilibrae/matrix/__init__.py @@ -1,2 +1,3 @@ from .aequilibrae_matrix import AequilibraeMatrix, matrix_export_types from .aequilibrae_data import AequilibraeData, data_export_types +from .sparse_matrix import Sparse, COO diff --git a/aequilibrae/matrix/sparse_matrix.pxd b/aequilibrae/matrix/sparse_matrix.pxd new file mode 100644 index 000000000..53c5b611b --- /dev/null +++ b/aequilibrae/matrix/sparse_matrix.pxd @@ -0,0 +1,13 @@ +from libcpp.vector cimport vector + +cdef class Sparse: + pass + +cdef class COO(Sparse): + cdef: + vector[size_t] *row + vector[size_t] *col + vector[double] *data + readonly object shape + + cdef void append(COO self, size_t i, size_t j, double v) noexcept nogil diff --git a/aequilibrae/matrix/sparse_matrix.pyx b/aequilibrae/matrix/sparse_matrix.pyx new file mode 100644 index 000000000..270850223 --- /dev/null +++ b/aequilibrae/matrix/sparse_matrix.pyx @@ -0,0 +1,122 @@ +from libcpp.vector cimport vector +from libcpp cimport nullptr +from cython.operator cimport dereference as d + +import scipy.sparse +import numpy as np +import openmatrix as omx + +cdef class Sparse: + """ + A class to implement sparse matrix operations such as reading, writing, and indexing + """ + + def __cinit__(self): + """C level init. For C memory allocation and initialisation. Called exactly once per object.""" + pass + + def __init__(self): + """Python level init, may be called multiple times, for things that can't be done in __cinit__.""" + pass + + def __dealloc__(self): + """ + C level deallocation. For freeing memory allocated by this object. *Must* have GIL, `self` may be in a + partially deallocated state already. + """ + pass + + def to_disk(self, path, name: str): + f = omx.open_file(path, "a") + try: + f[name] = self.to_scipy().tocsr().toarray() + finally: + f.close() + + @classmethod + def from_disk(cls, path, names=None, aeq=False): + """ + Read a OMX file and return a dictionary of matrix names to a scipy.sparse matrix, or + aequilibrae.matrix.sparse matrix. 
+        """
+        f = omx.open_file(path, "r")
+        res = {}
+        try:
+            for matrix in (f.list_matrices() if names is None else names):
+                if aeq:
+                    res[matrix] = cls.from_matrix(f[matrix])
+                else:
+                    res[matrix] = scipy.sparse.csr_matrix(f[matrix])
+            return res
+        finally:
+            f.close()
+
+
+cdef class COO(Sparse):
+    """
+    A class to implement sparse matrix operations such as reading, writing, and indexing
+    """
+
+    def __cinit__(self):
+        """C level init. For C memory allocation and initialisation. Called exactly once per object."""
+
+        self.row = new vector[size_t]()
+        self.col = new vector[size_t]()
+        self.data = new vector[double]()
+
+    def __init__(self, shape=None):
+        """Python level init, may be called multiple times, for things that can't be done in __cinit__."""
+
+        self.shape = shape
+
+    def __dealloc__(self):
+        """
+        C level deallocation. For freeing memory allocated by this object. *Must* have GIL, `self` may be in a
+        partially deallocated state already.
+        """
+
+        del self.row
+        self.row = nullptr
+
+        del self.col
+        self.col = nullptr
+
+        del self.data
+        self.data = nullptr
+
+    def to_scipy(self, shape=None, dtype=np.float64):
+        """
+        Create a scipy.sparse.coo_matrix from this COO matrix.
+        """
+        # Expose the underlying C++ vectors as typed memoryviews so scipy can read them
+        row = <size_t[:self.row.size()]>&d(self.row)[0]
+        col = <size_t[:self.col.size()]>&d(self.col)[0]
+        data = <double[:self.data.size()]>&d(self.data)[0]
+
+        if shape is None:
+            shape = self.shape
+
+        return scipy.sparse.coo_matrix((data, (row, col)), dtype=dtype, shape=shape)
+
+    @classmethod
+    def from_matrix(cls, m):
+        """
+        Create a COO matrix from a dense or scipy-like matrix.
+        """
+        if not isinstance(m, scipy.sparse.coo_matrix):
+            m = scipy.sparse.coo_matrix(m)
+
+        self = cls()
+
+        cdef size_t[:] row = m.row.astype(np.uint64), col = m.col.astype(np.uint64)
+        cdef double[:] data = m.data
+
+        self.row.insert(self.row.end(), &row[0], &row[-1] + 1)
+        self.col.insert(self.col.end(), &col[0], &col[-1] + 1)
+        self.data.insert(self.data.end(), &data[0], &data[-1] + 1)
+
+        return self
+
+    cdef void append(COO self, size_t i, size_t j, double v) noexcept nogil:
+        self.row.push_back(i)
+        self.col.push_back(j)
+        self.data.push_back(v)
diff --git a/aequilibrae/paths/__init__.py b/aequilibrae/paths/__init__.py
index 61af0cc6e..42b21a0f6 100644
--- a/aequilibrae/paths/__init__.py
+++ b/aequilibrae/paths/__init__.py
@@ -8,6 +8,7 @@
 from aequilibrae.paths.traffic_assignment import TrafficAssignment, TransitAssignment
 from aequilibrae.paths.vdf import VDF
 from aequilibrae.paths.graph import Graph, TransitGraph
+from aequilibrae.paths.route_choice import RouteChoice
 from aequilibrae import global_logger
diff --git a/aequilibrae/paths/graph.py b/aequilibrae/paths/graph.py
index 2a75572c1..495c7ff72 100644
--- a/aequilibrae/paths/graph.py
+++ b/aequilibrae/paths/graph.py
@@ -4,14 +4,35 @@
 from datetime import datetime
 from os.path import join
 from typing import List, Tuple, Optional
+import dataclasses
 
 import numpy as np
 import pandas as pd
 
-from aequilibrae.paths.graph_building import build_compressed_graph
+from aequilibrae.paths.graph_building import build_compressed_graph, create_compressed_link_network_mapping
 
 from aequilibrae.context import get_logger
 
 
+@dataclasses.dataclass
+class NetworkGraphIndices:
+    network_ab_idx: np.array
+    network_ba_idx: np.array
+    graph_ab_idx: np.array
+    graph_ba_idx: np.array
+
+
+def _get_graph_to_network_mapping(lids, direcs):
+    num_uncompressed_links = int(np.unique(lids).shape[0])
+    indexing = np.zeros(int(lids.max()) + 1, np.uint64)
+    indexing[np.unique(lids)[:]] = np.arange(num_uncompressed_links)
+
+    graph_ab_idx = direcs > 0
+    graph_ba_idx = direcs < 0
+
network_ab_idx = indexing[lids[graph_ab_idx]] + network_ba_idx = indexing[lids[graph_ba_idx]] + return NetworkGraphIndices(network_ab_idx, network_ba_idx, graph_ab_idx, graph_ba_idx) + + class GraphBase(ABC): # noqa: B024 """ Graph class. @@ -95,6 +116,9 @@ def __init__(self, logger=None): self.dead_end_links = np.array([]) + self.compressed_link_network_mapping_idx = None + self.compressed_link_network_mapping_data = None + # Randomly generate a unique Graph ID randomly self._id = uuid.uuid4().hex @@ -167,6 +191,11 @@ def prepare_graph(self, centroids: Optional[np.ndarray]) -> None: self.__build_compressed_graph() self.compact_num_links = self.compact_graph.shape[0] + # The cache property should be recalculated when the graph has been re-prepared + self.compressed_link_network_mapping_idx = None + self.compressed_link_network_mapping_data = None + self.network_compressed_node_mapping = None + def __build_compressed_graph(self): build_compressed_graph(self) @@ -505,6 +534,27 @@ def save_compressed_correspondence(self, path, mode_name, mode_id): node_path = join(path, f"nodes_to_indices_c{mode_name}_{mode_id}.feather") pd.DataFrame(self.nodes_to_indices, columns=["node_index"]).to_feather(node_path) + def create_compressed_link_network_mapping(self): + """ + Create two arrays providing a mapping of compressed id to link id. + + Uses sparse compression. Index ``idx`` by the by compressed id and compressed id + 1, the + network IDs are then in the range ``idx[id]:idx[id + 1]``. + + .. code-block:: python + + >>> idx, data = graph.compressed_link_network_mapping + >>> data[idx[id]:idx[id + 1]] # ==> Slice of network ID's corresponding to the compressed ID + + Links not in the compressed graph are not contained within the ``data`` array. + + :Returns: + **idx** (:obj:`np.array`): index array for ``data`` + **data** (:obj:`np.array`): array of link ids + """ + + return create_compressed_link_network_mapping(self) + class Graph(GraphBase): def __init__(self, *args, **kwargs): diff --git a/aequilibrae/paths/graph_building.pyx b/aequilibrae/paths/graph_building.pyx index 4bd13c85e..724643de8 100644 --- a/aequilibrae/paths/graph_building.pyx +++ b/aequilibrae/paths/graph_building.pyx @@ -325,6 +325,10 @@ def build_compressed_graph(graph): "link_id": np.arange(slink), } ) + + # Link compression can introduce new simple cycles into the graph + comp_lnk = comp_lnk[comp_lnk.a_node != comp_lnk.b_node] + max_link_id = link_id_max * 10 comp_lnk.link_id += max_link_id @@ -377,3 +381,76 @@ def build_compressed_graph(graph): # If will refer all the links that have no correlation to an element beyond the last link # This element will always be zero during assignment graph.graph.__compressed_id__ = graph.graph.__compressed_id__.fillna(graph.compact_graph.id.max() + 1).astype(np.int64) + + +@cython.embedsignature(True) +@cython.boundscheck(False) +@cython.initializedcheck(False) +def create_compressed_link_network_mapping(graph): + # Cache the result, this isn't a huge computation but isn't worth doing twice + if ( + graph.compressed_link_network_mapping_idx is not None + and graph.compressed_link_network_mapping_data is not None + and graph.network_compressed_node_mapping is not None + ): + return ( + graph.compressed_link_network_mapping_idx, + graph.compressed_link_network_mapping_data, + graph.network_compressed_node_mapping, + ) + + cdef: + long long i, j, a_node, x, b_node, tmp, compressed_id + long long[:] b + long long[:] values + np.uint32_t[:] idx + np.uint32_t[:] data + np.int32_t[:] node_mapping + + 
# This method requires that graph.graph is sorted on the a_node IDs, since that's done already we don't + # bother redoing sorting it. + + # Some links are completely removed from the network, they are assigned ID `graph.compact_graph.id.max() + 1`, + # we skip them. + filtered = graph.graph[graph.graph.__compressed_id__ != graph.compact_graph.id.max() + 1] + gb = filtered.groupby(by="__compressed_id__", sort=True) + idx = np.zeros(graph.compact_num_links + 1, dtype=np.uint32) + data = np.zeros(len(filtered), dtype=np.uint32) + + node_mapping = np.full(graph.num_nodes, -1, dtype=np.int32) + + i = 0 + for compressed_id, df in gb: + idx[compressed_id] = i + values = df.link_id.values + a = df.a_node.values + b = df.b_node.values + + # In order to ensure that the link IDs come out in the correct order we must walk the links + # we do this assuming the `a` array is sorted. + j = 0 + # Find the missing a_node, this is the starting of the chain. We cannot rely on the node ordering to do a simple lookup + + a_node = x = a[np.isin(a, b, invert=True, assume_unique=True)][0] + while True: + tmp = a.searchsorted(x) + if tmp < len(a) and a[tmp] == x: + x = b[tmp] + data[i + j] = values[tmp] + else: + break + j += 1 + + b_node = x + node_mapping[a_node] = graph.compact_graph["a_node"].iat[compressed_id] + node_mapping[b_node] = graph.compact_graph["b_node"].iat[compressed_id] + + i += len(values) + + idx[-1] = i + + graph.compressed_link_network_mapping_idx = idx + graph.compressed_link_network_mapping_data = data + graph.network_compressed_node_mapping = node_mapping + + return idx, data, node_mapping diff --git a/aequilibrae/paths/results/assignment_results.py b/aequilibrae/paths/results/assignment_results.py index 220b7033a..1c6da335a 100644 --- a/aequilibrae/paths/results/assignment_results.py +++ b/aequilibrae/paths/results/assignment_results.py @@ -1,10 +1,9 @@ -import dataclasses import multiprocessing as mp from abc import ABC, abstractmethod import numpy as np from aequilibrae.matrix import AequilibraeMatrix, AequilibraeData -from aequilibrae.paths.graph import Graph, TransitGraph, GraphBase +from aequilibrae.paths.graph import Graph, TransitGraph, GraphBase, _get_graph_to_network_mapping from aequilibrae.parameters import Parameters from aequilibrae import global_logger from pathlib import Path @@ -22,14 +21,6 @@ """ -@dataclasses.dataclass -class NetworkGraphIndices: - network_ab_idx: np.array - network_ba_idx: np.array - graph_ab_idx: np.array - graph_ba_idx: np.array - - class AssignmentResultsBase(ABC): """Assignment results base class for traffic and transit assignments.""" @@ -249,15 +240,7 @@ def total_flows(self) -> None: sum_axis1(self.total_link_loads, self.link_loads, self.cores) def get_graph_to_network_mapping(self): - num_uncompressed_links = int(np.unique(self.lids).shape[0]) - indexing = np.zeros(int(self.lids.max()) + 1, np.uint64) - indexing[np.unique(self.lids)[:]] = np.arange(num_uncompressed_links) - - graph_ab_idx = self.direcs > 0 - graph_ba_idx = self.direcs < 0 - network_ab_idx = indexing[self.lids[graph_ab_idx]] - network_ba_idx = indexing[self.lids[graph_ba_idx]] - return NetworkGraphIndices(network_ab_idx, network_ba_idx, graph_ab_idx, graph_ba_idx) + return _get_graph_to_network_mapping(self.lids, self.direcs) def get_load_results(self) -> AequilibraeData: """ diff --git a/aequilibrae/paths/route_choice.py b/aequilibrae/paths/route_choice.py new file mode 100644 index 000000000..c5b1e553f --- /dev/null +++ b/aequilibrae/paths/route_choice.py @@ -0,0 +1,543 @@ 
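The mapping built by `create_compressed_link_network_mapping` above uses the same offset convention as a CSR `indptr` array: `idx` holds one entry per compressed link plus a trailing sentinel, and the network link IDs belonging to compressed link `c` sit in `data[idx[c]:idx[c + 1]]` (a third array, `node_mapping`, relates network nodes to compact nodes). A minimal NumPy sketch of how a caller might consume the first two arrays, with made-up values:

```python
import numpy as np

# Hypothetical arrays, standing in for the output of
# graph.create_compressed_link_network_mapping(); real contents depend on the network.
idx = np.array([0, 2, 5, 6], dtype=np.uint32)            # compact_num_links + 1 offsets
data = np.array([11, 12, 3, 4, 5, 9], dtype=np.uint32)   # network link IDs, grouped per compressed link

def network_links_of(compressed_id: int) -> np.ndarray:
    """Network link IDs that were merged into the given compressed link."""
    return data[idx[compressed_id]:idx[compressed_id + 1]]

print(network_links_of(1))  # -> [3 4 5]
```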
+import itertools +import warnings +import logging +import pathlib +import socket +import sqlite3 +from datetime import datetime +from typing import List, Optional, Tuple, Union, Dict +from uuid import uuid4 +from functools import cached_property + +import numpy as np +import pandas as pd +import pyarrow as pa +from aequilibrae.context import get_active_project +from aequilibrae.matrix import AequilibraeMatrix +from aequilibrae.paths.graph import Graph, _get_graph_to_network_mapping +from aequilibrae.paths.route_choice_set import RouteChoiceSet + + +class RouteChoice: + all_algorithms = ["bfsle", "lp", "link-penalisation", "link-penalization"] + + default_paramaters = { + "generic": {"seed": 0, "max_routes": 0, "max_depth": 0, "max_misses": 100, "penalty": 1.01, "cutoff_prob": 1.0}, + "link-penalisation": {}, + "bfsle": {"beta": 1.0, "theta": 1.0, "penalty": 1.0}, + } + + def __init__(self, graph: Graph, matrix: Optional[AequilibraeMatrix] = None, project=None): + self.parameters = self.default_paramaters.copy() + self.procedure_id = None + self.procedure_date = None + + proj = project or get_active_project(must_exist=False) + self.project = proj + + self.logger = proj.logger if proj else logging.getLogger("aequilibrae") + + self.cores: int = 0 + self.graph = graph + self.matrix = matrix + + self.schema = RouteChoiceSet.schema + self.psl_schema = RouteChoiceSet.psl_schema + + self.compact_link_loads: Optional[Dict[str, np.array]] = None + self.link_loads: Optional[Dict[str, np.array]] = None + + self.sl_compact_link_loads: Optional[Dict[str, np.array]] = None + self.sl_link_loads: Optional[Dict[str, np.array]] = None + + self.results: Optional[pa.Table] = None + self.where: Optional[pathlib.Path] = None + self.save_path_files: bool = False + + self.nodes: Optional[Union[List[int], List[Tuple[int, int]]]] = None + + self._config = {} + self._selected_links = {} + + @cached_property + def __rc(self) -> RouteChoiceSet: + return RouteChoiceSet(self.graph) + + def set_choice_set_generation(self, /, algorithm: str, **kwargs) -> None: + """Chooses the assignment algorithm and set parameters. + Options for algorithm are, 'bfsle' for breadth first search with link removal, or 'link-penalisation'/'link-penalization'. + + BFSLE implementation based on "Route choice sets for very high-resolution data" by Nadine Rieser-Schüssler, + Michael Balmer & Kay W. Axhausen (2013). + https://doi.org/10.1080/18128602.2012.671383 + + 'lp' is also accepted as an alternative to 'link-penalisation' + + Setting the parameters for the route choice: + + `beta`, `theta`, and `seed` are BFSLE specific parameters. + + Setting `max_depth` or `max_misses`, while not required, is strongly recommended to prevent runaway algorithms. + `max_misses` is the maximum amount of duplicate routes found per OD pair. If it is exceeded then the route set + if returned with fewer than `max_routes`. It has a default value of `100`. + + - When using BFSLE `max_depth` corresponds to the maximum height of the graph of graphs. It's value is + largely dependent on the size of the paths within the network. For very small networks a value of 10 + is a recommended starting point. For large networks a good starting value is 5. Increase the value + until the number of desired routes is being consistently returned. If it is exceeded then the route set + if returned with fewer than `max_routes`. + + - When using LP, `max_depth` corresponds to the maximum number of iterations performed. While not enforced, + it should be higher than `max_routes`. 
It's value is dependent on the magnitude of the cost field, + specifically it's related to the log base `penalty` of the ratio of costs between two alternative routes. + If it is exceeded then the route set if returned with fewer than `max_routes`. + + Additionally BFSLE has the option to incorporate link penalisation. Every link in all routes found at a depth + are penalised with the `penalty` factor for the next depth. So at a depth of 0 no links are penalised nor + removed. At depth 1, all links found at depth 0 are penalised, then the links marked for removal are removed. + All links in the routes found at depth 1 are then penalised for the next depth. The penalisation compounds. + Pass set `penalty=1.0` to disable. + + When performing an assignment, `cutoff_prob` can be provided to exclude routes from the path-sized logit model. + The `cutoff_prob` is used to compute an inverse binary logit and obtain a max difference in utilities. If a + paths total cost is greater than the minimum cost path in the route set plus the max difference, the route is + excluded from the PSL calculations. The route is still returned, but with a probability of 0.0. + + The `cutoff_prob` should be in the range [0, 1]. It is then rescaled internally to [0.5, 1] as probabilities + below 0.5 produce negative differences in utilities. A higher `cutoff_prob` includes more routes. A value of + `0.0` will only include the minimum cost route. A value of `1.0` includes all routes. + + :Arguments: + **algorithm** (:obj:`str`): Algorithm to be used + **kwargs** (:obj:`dict`): Dictionary with all parameters for the algorithm + """ + algo_dict = {i: i for i in self.all_algorithms} + algo_dict["lp"] = "link-penalisation" + algo_dict["link-penalization"] = "link-penalisation" + algo = algo_dict.get(algorithm.lower()) + + if algo is None: + raise AttributeError(f"Assignment algorithm not available. Choose from: {','.join(self.all_algorithms)}") + + defaults = self.default_paramaters["generic"] | self.default_paramaters[algo] + for key in kwargs.keys(): + if key not in defaults: + raise ValueError(f"Invalid parameter `{key}` provided for algorithm `{algo}`") + + self.algorithm = algo + self._config["Algorithm"] = algo + + self.parameters = defaults | kwargs + + def set_cores(self, cores: int) -> None: + """Allows one to set the number of cores to be used + + Inherited from :obj:`AssignmentResultsBase` + + :Arguments: + **cores** (:obj:`int`): Number of CPU cores to use + """ + self.cores = cores + + def set_save_path_files(self, save_it: bool) -> None: + """turn path saving on or off. + + :Arguments: + **save_it** (:obj:`bool`): Boolean to indicate whether paths should be saved + """ + self.save_path_files = save_it + raise NotImplementedError() + + def set_save_routes(self, where: Optional[str] = None) -> None: + """ + Set save path for route choice results. Provide ``None`` to disable. + + **warning** enabling route saving will disable in memory results. Viewing the results will read the results + from disk first. + + :Arguments: + **save_it** (:obj:`bool`): Boolean to indicate whether routes should be saved + """ + if where is not None: + where = pathlib.Path(where) + if not where.exists(): + raise ValueError(f"Path does not exist `{where}`") + self.where = where + + def prepare(self, nodes: Union[List[int], List[Tuple[int, int]]]) -> None: + """ + Prepare OD pairs for batch computation. + + :Arguments: + **nodes** (:obj:`Union[list[int], list[tuple[int, int]]]`): List of node IDs to operate on. 
If a 1D list is + provided, OD pairs are taken to be all pair permutations of the list. If a list of pairs is provided + OD pairs are taken as is. All node IDs must be present in the compressed graph. To make a node ID + always appear in the compressed graph add it as a centroid. Duplicates will be dropped on execution. + """ + if len(nodes) == 0: + raise ValueError("`nodes` list-like empty.") + + if all( + isinstance(pair, tuple) + and len(pair) == 2 + and isinstance(pair[0], (int, np.integer)) + and isinstance(pair[1], (int, np.integer)) + for pair in nodes + ): + self.nodes = nodes + + elif len(nodes) > 1 and all(isinstance(x, (int, np.unsignedinteger)) for x in nodes): + self.nodes = list(itertools.permutations(nodes, r=2)) + else: + raise ValueError(f"{type(nodes)} or {type(nodes[0])} for not valid types for the `prepare` method") + + def execute_single(self, origin: int, destination: int, perform_assignment: bool = False) -> List[Tuple[int]]: + """ + Generate route choice sets between `origin` and `destination`, potentially performing an assignment. + + Does not require preparation. + + Node IDs must be present in the compressed graph. To make a node ID always appear in the compressed + graph add it as a centroid. + + :Arguments: + **origin** (:obj:`int`): Origin node ID. + **destination** (:obj:`int`): Destination node ID. + **perform_assignment** (:obj:`bool`): Whether or not to perform an assignment. Default `False`. + + :Returns: + ***route set** (:obj:`List[Tuple[int]]`): A list of routes as tuples of link IDs. + """ + self.procedure_id = uuid4().hex + self.procedure_date = str(datetime.today()) + + self.results = None + return self.__rc.run( + origin, + destination, + bfsle=self.algorithm == "bfsle", + path_size_logit=perform_assignment, + cores=self.cores, + where=str(self.where) if self.where is not None else None, + **self.parameters, + ) + + def execute(self, perform_assignment: bool = False) -> None: + """ + Generate route choice sets between the previously supplied nodes, potentially performing an assignment. + + Node IDs must be present in the compressed graph. To make a node ID always appear in the compressed + graph add it as a centroid. + + To access results see `RouteChoice.get_results()`. + + :Arguments: + **perform_assignment** (:obj:`bool`): Whether or not to perform an assignment. Default `False`. + """ + if self.nodes is None: + raise ValueError( + "to perform batch route choice generation you must first prepare with the selected nodes. See `RouteChoice.prepare()`" + ) + + self.procedure_date = str(datetime.today()) + + self.results = None + self.__rc.batched( + self.nodes, + bfsle=self.algorithm == "bfsle", + path_size_logit=perform_assignment, + cores=self.cores, + where=str(self.where) if self.where is not None else None, + **self.parameters, + ) + + def info(self) -> dict: + """Returns information for the transit assignment procedure + + Dictionary contains keys 'Algorithm', 'Matrix totals', 'Computer name', 'Procedure ID', 'Parameters', and + 'Select links'. 
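The single-pair and batched entry points defined above (`execute_single`, and `prepare` followed by `execute`) are meant to be used roughly as follows. This is a hedged sketch only: it assumes `graph` is an already prepared AequilibraE `Graph`, and the node IDs are illustrative and must exist in the compressed graph.

```python
from aequilibrae.paths import RouteChoice

rc = RouteChoice(graph)  # no matrix is needed when only generating choice sets
rc.set_choice_set_generation("bfsle", max_routes=5, max_depth=10)

# Single OD pair, no call to prepare() required
routes = rc.execute_single(101, 205)    # list of routes, each a tuple of link IDs

# Batched: all permutations of the supplied centroids
rc.prepare([101, 205, 307])
rc.execute(perform_assignment=False)
table = rc.get_results()                # pyarrow Table of origin id / destination id / route set
```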
+ + The classes key is also a dictionary with all the user classes per transit class and their respective + matrix totals + + :Returns: + **info** (:obj:`dict`): Dictionary with summary information + """ + + if self.matrix is None: + matrix_totals = {} + elif len(self.matrix.view_names) == 1: + matrix_totals = {self.matrix.view_names[0]: np.sum(self.matrix.matrix_view[:, :])} + else: + matrix_totals = { + nm: np.sum(self.matrix.matrix_view[:, :, i]) for i, nm in enumerate(self.matrix.view_names) + } + + info = { + "Algorithm": self.algorithm, + "Matrix totals": matrix_totals, + "Computer name": socket.gethostname(), + "Procedure ID": self.procedure_id, + "Parameters": self.parameters, + "Select links": self._selected_links, + } + return info + + def log_specification(self): + self.logger.info("Route Choice specification") + self.logger.info(self._config) + + def get_results(self) -> Union[pa.Table, pa.dataset.Dataset]: + """Returns the results of the route choice procedure + + Returns a table of OD pairs to lists of link IDs for each OD pair provided (as columns). + Represents paths from ``origin`` to ``destination``. + + If `save_routes` was specified then a Pyarrow dataset is returned. The caller is responsible for reading this dataset. + + :Returns: + **results** (:obj:`pa.Table`): Table with the results of the route choice procedure + """ + if self.results is None: + try: + self.results = self.__rc.get_results() + except RuntimeError as err: + if self.where is None: + raise ValueError("Route choice results not computed and read/save path not specified") from err + self.results = pa.dataset.dataset( + self.where, format="parquet", partitioning=pa.dataset.HivePartitioning(self.schema) + ) + + return self.results + + def get_load_results(self) -> Union[Tuple[pd.DataFrame, pd.DataFrame], pd.DataFrame]: + """ + Translates the link loading results from the graph format into the network format. + + :Returns: + **dataset** (:obj:`Union[Tuple[pd.DataFrame, pd.DataFrame], pd.DataFrame]`): + A tuple of uncompressed and compressed link loading results as DataFrames. + Columns are the matrix name concatenated direction. + """ + + if self.matrix is None: + raise ValueError( + "AequilibraE matrix was not initially provided. To perform link loading set the `RouteChoice.matrix` attribute." 
+ ) + + tmp = self.__rc.link_loading(self.matrix, self.save_path_files) + self.link_loads = {k: v[0] for k, v in tmp.items()} + self.compact_link_loads = {k: v[1] for k, v in tmp.items()} + + # Create a data store with a row for each uncompressed link + m = _get_graph_to_network_mapping(self.graph.graph.link_id.values, self.graph.graph.direction.values) + lids = np.unique(self.graph.graph.link_id.values) + uncompressed_df = self.__link_loads_to_df(m, lids, self.link_loads) + + m_compact = _get_graph_to_network_mapping( + self.graph.compact_graph.link_id.values, self.graph.compact_graph.direction.values + ) + compact_lids = np.unique(self.graph.compact_graph.link_id.values) + compressed_df = self.__link_loads_to_df(m_compact, compact_lids, self.compact_link_loads) + + return uncompressed_df, compressed_df + + def __link_loads_to_df(self, mapping, lids, link_loads): + df = pd.DataFrame( + {"link_id": lids} | {k + dir: np.zeros(lids.shape) for k in link_loads.keys() for dir in ["_ab", "_ba"]} + ) + for k, v in link_loads.items(): + # Directional Flows + df[k + "_ab"].values[mapping.network_ab_idx] = np.nan_to_num(v[mapping.graph_ab_idx]) + df[k + "_ba"].values[mapping.network_ba_idx] = np.nan_to_num(v[mapping.graph_ba_idx]) + + # Tot Flow + df[k + "_tot"] = np.nan_to_num(df[k + "_ab"].values) + np.nan_to_num(df[k + "_ba"].values) + + return df + + def set_select_links(self, links: Dict[str, List[Tuple[int, int]]]): + """ + Set the selected links. Checks if the links and directions are valid. Translates `links=None` and + direction into unique link ID used in compact graph. + + Supply `links=None` to disable select link analysis. + + :Arguments: + **links** (:obj:`Union[None, Dict[str, List[Tuple[int, int]]]]`): name of link set and + Link IDs and directions to be used in select link analysis. + """ + self._selected_links = {} + + if links is None: + del self._config["select_links"] + return + + max_id = self.graph.compact_graph.id.max() + 1 + + for name, link_set in links.items(): + if len(name.split(" ")) != 1: + warnings.warn("Input string name has a space in it. Replacing with _") + name = str.join("_", name.split(" ")) + + link_ids = [] + for link, dir in link_set: + if dir == 0: + query = (self.graph.graph["link_id"] == link) & ( + (self.graph.graph["direction"] == -1) | (self.graph.graph["direction"] == 1) + ) + else: + query = (self.graph.graph["link_id"] == link) & (self.graph.graph["direction"] == dir) + if not query.any(): + raise ValueError(f"link_id or direction {(link, dir)} is not present within graph.") + # Check for duplicate compressed link ids in the current link set + for comp_id in self.graph.graph[query]["__compressed_id__"].values: + if comp_id == max_id: + raise ValueError( + f"link ID {link} and direction {dir} is not present in compressed graph. " + "It may have been removed during dead-end removal." + ) + elif comp_id in link_ids: + warnings.warn( + "Two input links map to the same compressed link in the network" + f", removing superfluous link {link} and direction {dir} with compressed id {comp_id}" + ) + else: + link_ids.append(comp_id) + self._selected_links[name] = link_ids + self._config["select_links"] = str(links) + + def get_select_link_results(self) -> pd.DataFrame: + """ + Get the select link loading results. + + :Returns: + **dataset** (:obj:`Tuple[pd.DataFrame, pd.DataFrame]`): + A tuple of uncompressed and compressed select link loading results as DataFrames. + Columns are the matrix name concatenated with the select link set and direction. 
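As with `get_load_results`, each loading name is expanded by `__link_loads_to_df` above into `_ab`, `_ba` and `_tot` columns; for select link analysis the loading name is the matrix core name joined to the select-link set name. A small hedged illustration with invented names, continuing from a `RouteChoice` instance `rc` on which `set_select_links` and `execute` have already been run:

```python
# Hypothetical: matrix core "demand" and a select-link set named "bridge"
uncompressed, compressed = rc.get_select_link_results()
print(list(uncompressed.columns))
# ['link_id', 'demand_bridge_ab', 'demand_bridge_ba', 'demand_bridge_tot']
```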
+ """ + + if self.matrix is None: + raise ValueError( + "AequilibraE matrix was not initially provided. To perform link loading set the `RouteChoice.matrix` attribute." + ) + + tmp = self.__rc.select_link_loading(self.matrix, self._selected_links) + + self.sl_link_loads = {} + self.sl_compact_link_loads = {} + self.sl_od_matrix = {} + for name, sl_res in tmp.items(): + for sl_name, res in sl_res.items(): + mat, (u, c) = res + self.sl_od_matrix[name + "_" + sl_name] = mat + self.sl_link_loads[name + "_" + sl_name] = u + self.sl_compact_link_loads[name + "_" + sl_name] = c + + # Create a data store with a row for each uncompressed link + m = _get_graph_to_network_mapping(self.graph.graph.link_id.values, self.graph.graph.direction.values) + lids = np.unique(self.graph.graph.link_id.values) + uncompressed_df = self.__link_loads_to_df(m, lids, self.sl_link_loads) + + m_compact = _get_graph_to_network_mapping( + self.graph.compact_graph.link_id.values, self.graph.compact_graph.direction.values + ) + compact_lids = np.unique(self.graph.compact_graph.link_id.values) + compressed_df = self.__link_loads_to_df(m_compact, compact_lids, self.sl_compact_link_loads) + + return uncompressed_df, compressed_df + + def __save_dataframe(self, df, method_name: str, description: str, table_name: str, report: dict, project) -> None: + self.procedure_id = uuid4().hex + data = [ + table_name, + "select link", + self.procedure_id, + str(report), + self.procedure_date, + description, + ] + + # sqlite3 context managers only commit, they don't close, oh well + conn = sqlite3.connect(pathlib.Path(project.project_base_path) / "results_database.sqlite") + with conn: + df.to_sql(table_name, conn, index=False) + conn.close() + + conn = project.connect() + with conn: + conn.execute( + """Insert into results(table_name, procedure, procedure_id, procedure_report, timestamp, + description) Values(?,?,?,?,?,?)""", + data, + ) + conn.close() + + def save_link_flows(self, table_name: str, project=None) -> None: + """ + Saves the link link flows for all classes into the results database. + + :Arguments: + **table_name** (:obj:`str`): Name of the table being inserted to. + **project** (:obj:`Project`, `Optional`): Project we want to save the results to. + Defaults to the active project + """ + if not project: + project = self.project or get_active_project() + + u, c = self.get_load_results() + info = self.info() + self.__save_dataframe( + u, + "Link loading", + "Uncompressed link loading results", + table_name + "_uncompressed", + info, + project=project, + ) + + self.__save_dataframe( + c, + "Link loading", + "Compressed link loading results", + table_name + "_compressed", + info, + project=project, + ) + + def save_select_link_flows(self, table_name: str, project=None) -> None: + """ + Saves the select link link flows for all classes into the results database. Additionally, it exports + the OD matrices into OMX format. + + :Arguments: + **table_name** (:obj:`str`): Name of the table being inserted to and the name of the + OpenMatrix file used for OD matrices. + **project** (:obj:`Project`, `Optional`): Project we want to save the results to. 
+ Defaults to the active project + """ + if not project: + project = self.project or get_active_project() + + u, c = self.get_select_link_results() + info = self.info() + self.__save_dataframe( + u, + "Select link analysis", + "Uncompressed select link analysis results", + table_name + "_uncompressed", + info, + project=project, + ) + + self.__save_dataframe( + c, + "Select link analysis", + "Compressed select link analysis results", + table_name + "_compressed", + info, + project=project, + ) + + for k, v in self.sl_od_matrix.items(): + v.to_disk((pathlib.Path(project.project_base_path) / "matrices" / table_name).with_suffix(".omx"), k) diff --git a/aequilibrae/paths/route_choice.pyx b/aequilibrae/paths/route_choice.pyx deleted file mode 100644 index b22dd0c08..000000000 --- a/aequilibrae/paths/route_choice.pyx +++ /dev/null @@ -1,871 +0,0 @@ -# cython: language_level=3str - -"""This module aims to implemented the BFS-LE algorithm as described in Rieser-Schüssler, Balmer, and Axhausen, 'Route -Choice Sets for Very High-Resolution Data'. https://doi.org/10.1080/18128602.2012.671383 - -A rough overview of the algorithm is as follows. - 1. Prepare the initial graph, this is depth 0 with no links removed. - 2. Find a short path, P. If P is not empty add P to the path set. - 3. For all links p in P, remove p from E, compounding with the previously removed links. - 4. De-duplicate the sub-graphs, we only care about unique sub-graphs. - 5. Go to 2. - -Details: The general idea of the algorithm is pretty simple, as is the implementation. The caveats here is that there is -a lot of cpp interop and memory management. A description of the purpose of variables is in order: - -route_set: See route_choice.pxd for full type signature. It's an unordered set (hash set) of pointers to vectors of link -IDs. It uses a custom hashing function and comparator. The hashing function is defined in a string that in inlined -directly into the output ccp. This is done allow declaring of the `()` operator, which is required and AFAIK not -possible in Cython. The hash is designed to dereference then hash order dependent vectors. One isn't provided by -stdlib. The comparator simply dereferences the pointer and uses the vector comparator. It's designed to store the -outputted paths. Heap allocated (needs to be returned). - -removed_links: See route_choice.pxd for full type signature. It's an unordered set of pointers to unordered sets of link -IDs. Similarly to `route_set` is uses a custom hash function and comparator. This hash function is designed to be order -independent and should only use commutative operations. The comparator is the same. It's designed to store all of the -removed link sets we've seen before. This allows us to detected duplicated graphs. - -rng: A custom imported version of std::linear_congruential_engine. libcpp doesn't provide one so we do. It should be -significantly faster than the std::mersenne_twister_engine without sacrificing much. We don't need amazing RNG, just -ok and fast. This is only used to shuffle the queue. - -queue, next_queue: These are vectors of pointers to sets of removed links. We never need to push to the front of these so a -vector is best. We maintain two queues, one that we are currently iterating over, and one that we can add to, building -up with all the newly removed link sets. These two are swapped at the end of an iteration, next_queue is then -cleared. These store sets of removed links. - -banned, next_banned: `banned` is the iterator variable for `queue`. 
`banned` is copied into `next_banned` where another -link can be added without mutating `banned`. If we've already seen this set of removed links `next_banned` is immediately -deallocated. Otherwise it's placed into `next_queue`. - -vec: `vec` is a scratch variable to store pointers to new vectors, or rather, paths while we are building them. Each time a path -is found a new one is allocated, built, and stored in the route_set. - -p, connector: Scratch variables for iteration. - -Optimisations: As described in the paper, both optimisations have been implemented. The path finding operates on the -compressed graph and the queue is shuffled if its possible to fill the route set that iteration. The route set may not -be filled due to duplicate paths but we can't know that in advance so we shuffle anyway. - -Any further optimisations should focus on the path finding, from benchmarks it dominates the run time (~98%). Since huge -routes aren't required small-ish things like the memcpy and banned link set copy aren't high priority. - -""" - -from aequilibrae import Graph - -from libc.math cimport INFINITY, pow, exp -from libc.string cimport memcpy -from libc.limits cimport UINT_MAX -from libc.stdlib cimport abort -from libcpp cimport nullptr -from libcpp.vector cimport vector -from libcpp.unordered_set cimport unordered_set -from libcpp.unordered_map cimport unordered_map -from libcpp.utility cimport pair -from libcpp.algorithm cimport sort, lower_bound -from cython.operator cimport dereference as deref, preincrement as inc -from cython.parallel cimport parallel, prange, threadid -cimport openmp - -import numpy as np -import pyarrow as pa -from typing import List, Tuple -import itertools -import pathlib -import logging -import warnings - -cimport numpy as np # Numpy *must* be cimport'd BEFORE pyarrow.lib, there's nothing quite like Cython. -cimport pyarrow as pa -cimport pyarrow.lib as libpa -import pyarrow.dataset -import pyarrow.parquet as pq -from libcpp.memory cimport shared_ptr - -from libc.stdio cimport fprintf, printf, stderr - -# It would really be nice if these were modules. The 'include' syntax is long deprecated and adds a lot to compilation times -include 'basic_path_finding.pyx' - -@cython.embedsignature(True) -cdef class RouteChoiceSet: - """ - Route choice implemented via breadth first search with link removal (BFS-LE) as described in Rieser-Schüssler, - Balmer, and Axhausen, 'Route Choice Sets for Very High-Resolution Data' - """ - - route_set_dtype = pa.list_(pa.uint32()) - - schema = pa.schema([ - pa.field("origin id", pa.uint32(), nullable=False), - pa.field("destination id", pa.uint32(), nullable=False), - pa.field("route set", route_set_dtype, nullable=False), - ]) - - psl_schema = pa.schema([ - pa.field("origin id", pa.uint32(), nullable=False), - pa.field("destination id", pa.uint32(), nullable=False), - pa.field("route set", route_set_dtype, nullable=False), - pa.field("cost", pa.float64(), nullable=False), - pa.field("gamma", pa.float64(), nullable=False), - pa.field("probability", pa.float64(), nullable=False), - ]) - - def __cinit__(self): - """C level init. For C memory allocation and initialisation. 
Called exactly once per object.""" - pass - - def __init__(self, graph: Graph): - """Python level init, may be called multiple times, for things that can't be done in __cinit__.""" - # self.heuristic = HEURISTIC_MAP[self.res._heuristic] - self.cost_view = graph.compact_cost - self.graph_fs_view = graph.compact_fs - self.b_nodes_view = graph.compact_graph.b_node.values - self.nodes_to_indices_view = graph.compact_nodes_to_indices - - # tmp = graph.lonlat_index.loc[graph.compact_all_nodes] - # self.lat_view = tmp.lat.values - # self.lon_view = tmp.lon.values - self.a_star = False - - self.ids_graph_view = graph.compact_graph.id.values - self.num_nodes = graph.compact_num_nodes - self.zones = graph.num_zones - self.block_flows_through_centroids = graph.block_centroid_flows - - - def __dealloc__(self): - """ - C level deallocation. For freeing memory allocated by this object. *Must* have GIL, `self` may be in a - partially deallocated state already. - """ - pass - - @cython.embedsignature(True) - def run(self, origin: int, destination: int, *args, **kwargs): - """ - Compute the a route set for a single OD pair. - - Often the returned list's length is ``max_routes``, however, it may be limited by ``max_depth`` or if all - unique possible paths have been found then a smaller set will be returned. - - Thin wrapper around ``RouteChoiceSet.batched``. Additional arguments are forwarded to ``RouteChoiceSet.batched``. - - :Arguments: - **origin** (:obj:`int`): Origin node ID. Must be present within compact graph. Recommended to choose a centroid. - **destination** (:obj:`int`): Destination node ID. Must be present within compact graph. Recommended to choose a centroid. - - :Returns: - **route set** (:obj:`list[tuple[int, ...]]): Returns a list of unique variable length tuples of compact link IDs. - Represents paths from ``origin`` to ``destination``. - """ - return [tuple(x) for x in self.batched([(origin, destination)], *args, **kwargs).column("route set").to_pylist()] - - # Bounds checking doesn't really need to be disabled here but the warning is annoying - @cython.boundscheck(False) - @cython.wraparound(False) - @cython.embedsignature(True) - @cython.initializedcheck(False) - def batched( - self, - ods: List[Tuple[int, int]], - max_routes: int = 0, - max_depth: int = 0, - seed: int = 0, - cores: int = 0, - a_star: bool = True, - bfsle: bool = True, - penalty: float = 0.0, - where: Optional[str] = None, - path_size_logit: bool = False, - beta: float = 1.0, - theta: float = 1.0, - ): - """ - Compute the a route set for a list of OD pairs. - - Often the returned list for each OD pair's length is ``max_routes``, however, it may be limited by ``max_depth`` or if all - unique possible paths have been found then a smaller set will be returned. - - :Arguments: - **ods** (:obj:`list[tuple[int, int]]`): List of OD pairs ``(origin, destination)``. Origin and destination node ID must be - present within compact graph. Recommended to choose a centroids. - **max_routes** (:obj:`int`): Maximum size of the generated route set. Must be non-negative. Default of ``0`` for unlimited. - **max_depth** (:obj:`int`): Maximum depth BFSLE can explore, or maximum number of iterations for link penalisation. - Must be non-negative. Default of ``0`` for unlimited. - **seed** (:obj:`int`): Seed used for rng. Must be non-negative. Default of ``0``. - **cores** (:obj:`int`): Number of cores to use when parallelising over OD pairs. Must be non-negative. Default of ``0`` for all available. 
- **bfsle** (:obj:`bool`): Whether to use Breadth First Search with Link Removal (BFSLE) over link penalisation. Default ``True``. - **penalty** (:obj:`float`): Penalty to use for Link Penalisation. Must be ``> 1.0``. Not compatible with ``bfsle=True``. - **where** (:obj:`str`): Optional file path to save results to immediately. Will return None. - - :Returns: - **route sets** (:obj:`dict[tuple[int, int], list[tuple[int, ...]]]`): Returns a list of unique tuples of compact link IDs for - each OD pair provided (as keys). Represents paths from ``origin`` to ``destination``. None if ``where`` was not None. - """ - cdef: - long long o, d - - if max_routes == 0 and max_depth == 0: - raise ValueError("Either `max_routes` or `max_depth` must be > 0") - - if max_routes < 0 or max_depth < 0: - raise ValueError("`max_routes`, `max_depth`, and `cores` must be non-negative") - - if penalty != 0.0 and bfsle: - raise ValueError("Link penalisation (`penalty` > 1.0) and `bfsle` cannot be enabled at once") - - if not bfsle and penalty <= 1.0: - raise ValueError("`penalty` must be > 1.0. `penalty=1.1` is recommended") - - if path_size_logit and (beta < 0 or theta <= 0): - raise ValueError("`beta` must be >= 0 and `theta` > 0 for path sized logit model") - - for o, d in ods: - if self.nodes_to_indices_view[o] == -1: - raise ValueError(f"Origin {o} is not present within the compact graph") - if self.nodes_to_indices_view[d] == -1: - raise ValueError(f"Destination {d} is not present within the compact graph") - - cdef: - long long origin_index, dest_index, i - unsigned int c_max_routes = max_routes - unsigned int c_max_depth = max_depth - unsigned int c_seed = seed - unsigned int c_cores = cores if cores > 0 else openmp.omp_get_num_threads() - - vector[pair[long long, long long]] c_ods - - # A* (and Dijkstra's) require memory views, so we must allocate here and take slices. Python can handle this memory - double [:, :] cost_matrix = np.empty((c_cores, self.cost_view.shape[0]), dtype=float) - long long [:, :] predecessors_matrix = np.empty((c_cores, self.num_nodes + 1), dtype=np.int64) - long long [:, :] conn_matrix = np.empty((c_cores, self.num_nodes + 1), dtype=np.int64) - long long [:, :] b_nodes_matrix = np.broadcast_to(self.b_nodes_view, (c_cores, self.b_nodes_view.shape[0])).copy() - - # This matrix is never read from, it exists to allow using the Dijkstra's method without changing the - # interface. 
- long long [:, :] _reached_first_matrix - - vector[RouteSet_t *] *results - size_t max_results_len, batch_len, j - - # self.a_star = a_star - - if self.a_star: - _reached_first_matrix = np.zeros((c_cores, 1), dtype=np.int64) # Dummy array to allow slicing - else: - _reached_first_matrix = np.zeros((c_cores, self.num_nodes + 1), dtype=np.int64) - - set_ods = set(ods) - if len(set_ods) != len(ods): - warnings.warn(f"Duplicate OD pairs found, dropping {len(ods) - len(set_ods)} OD pairs") - - if where is not None: - checkpoint = Checkpoint(where, self.schema, partition_cols=["origin id"]) - batches = list(Checkpoint.batches(list(set_ods))) - max_results_len = max(len(batch) for batch in batches) - else: - batches = [list(set_ods)] - max_results_len = len(set_ods) - - results = new vector[RouteSet_t *](max_results_len) - - cdef: - RouteSet_t *route_set - pair[vector[long long] *, vector[long long] *] freq_pair - vector[long long] *link_union = nullptr - vector[vector[double] *] *cost_set = nullptr - vector[vector[double] *] *gamma_set = nullptr - vector[vector[double] *] *prob_set= nullptr - - if path_size_logit: - cost_set = new vector[vector[double] *](max_results_len) - gamma_set = new vector[vector[double] *](max_results_len) - prob_set = new vector[vector[double] *](max_results_len) - - for batch in batches: - c_ods = batch # Convert the batch to a cpp vector, this isn't strictly efficient but is nicer - batch_len = c_ods.size() - results.resize(batch_len) # We know we've allocated enough size to store all max length batch but we resize to a smaller size when not needed - - if path_size_logit: - cost_set.clear() - gamma_set.clear() - prob_set.clear() - - cost_set.resize(batch_len) - gamma_set.resize(batch_len) - prob_set.resize(batch_len) - - with nogil, parallel(num_threads=c_cores): - # The link union needs to be allocated per thread as scratch space, as its of unknown length we can't allocated a matrix of them. 
- # Additionally getting them to be reused between batches is complicated, instead we just get a new one each batch - if path_size_logit: - link_union = new vector[long long]() - - for i in prange(batch_len): - origin_index = self.nodes_to_indices_view[c_ods[i].first] - dest_index = self.nodes_to_indices_view[c_ods[i].second] - - if self.block_flows_through_centroids: - blocking_centroid_flows( - 0, # Always blocking - origin_index, - self.zones, - self.graph_fs_view, - b_nodes_matrix[threadid()], - self.b_nodes_view, - ) - - if bfsle: - route_set = RouteChoiceSet.bfsle( - self, - origin_index, - dest_index, - c_max_routes, - c_max_depth, - cost_matrix[threadid()], - predecessors_matrix[threadid()], - conn_matrix[threadid()], - b_nodes_matrix[threadid()], - _reached_first_matrix[threadid()], - c_seed, - ) - else: - route_set = RouteChoiceSet.link_penalisation( - self, - origin_index, - dest_index, - c_max_routes, - c_max_depth, - cost_matrix[threadid()], - predecessors_matrix[threadid()], - conn_matrix[threadid()], - b_nodes_matrix[threadid()], - _reached_first_matrix[threadid()], - penalty, - c_seed, - ) - - if path_size_logit: - link_union.clear() - freq_pair = RouteChoiceSet.compute_frequency(route_set, deref(link_union)) - deref(cost_set)[i] = RouteChoiceSet.compute_cost(route_set, self.cost_view) - deref(gamma_set)[i] = RouteChoiceSet.compute_gamma(route_set, freq_pair, deref(deref(cost_set)[i]), self.cost_view) - deref(prob_set)[i] = RouteChoiceSet.compute_prob(deref(deref(cost_set)[i]), deref(deref(gamma_set)[i]), beta, theta) - del freq_pair.first - del freq_pair.second - - deref(results)[i] = route_set - - if self.block_flows_through_centroids: - blocking_centroid_flows( - 1, # Always unblocking - origin_index, - self.zones, - self.graph_fs_view, - b_nodes_matrix[threadid()], - self.b_nodes_view, - ) - - if path_size_logit: - del link_union - - table = libpa.pyarrow_wrap_table(RouteChoiceSet.make_table_from_results(c_ods, deref(results), cost_set, gamma_set, prob_set)) - - # Once we've made the table all results have been copied into some pyarrow structure, we can free our inner internal structures - if path_size_logit: - for j in range(batch_len): - del deref(cost_set)[j] - del deref(gamma_set)[j] - del deref(prob_set)[j] - - for j in range(batch_len): - for route in deref(deref(results)[j]): - del route - - if where is not None: - checkpoint.write(table) - del table - else: - break # There was only one batch anyway - - # We're done with everything now, we can free the outer internal structures - if path_size_logit: - del cost_set - del gamma_set - del prob_set - del results - - if where is None: - return table - else: - return - - @cython.initializedcheck(False) - cdef void path_find( - RouteChoiceSet self, - long origin_index, - long dest_index, - double [:] thread_cost, - long long [:] thread_predecessors, - long long [:] thread_conn, - long long [:] thread_b_nodes, - long long [:] _thread_reached_first - ) noexcept nogil: - """Small wrapper around path finding, thread locals should be passes as arguments.""" - if self.a_star: - path_finding_a_star( - origin_index, - dest_index, - thread_cost, - thread_b_nodes, - self.graph_fs_view, - self.nodes_to_indices_view, - self.lat_view, - self.lon_view, - thread_predecessors, - self.ids_graph_view, - thread_conn, - EQUIRECTANGULAR # FIXME: enum import failing due to redefinition - ) - else: - path_finding( - origin_index, - dest_index, - thread_cost, - thread_b_nodes, - self.graph_fs_view, - thread_predecessors, - self.ids_graph_view, 
- thread_conn, - _thread_reached_first - ) - - @cython.boundscheck(False) - @cython.wraparound(False) - @cython.embedsignature(True) - @cython.initializedcheck(False) - cdef RouteSet_t *bfsle( - RouteChoiceSet self, - long origin_index, - long dest_index, - unsigned int max_routes, - unsigned int max_depth, - double [:] thread_cost, - long long [:] thread_predecessors, - long long [:] thread_conn, - long long [:] thread_b_nodes, - long long [:] _thread_reached_first, - unsigned int seed - ) noexcept nogil: - """Main method for route set generation. See top of file for commentary.""" - cdef: - RouteSet_t *route_set - LinkSet_t removed_links - minstd_rand rng - - # Scratch objects - vector[unordered_set[long long] *] queue - vector[unordered_set[long long] *] next_queue - unordered_set[long long] *banned - unordered_set[long long] *new_banned - vector[long long] *vec - long long p, connector - - max_routes = max_routes if max_routes != 0 else UINT_MAX - max_depth = max_depth if max_depth != 0 else UINT_MAX - - queue.push_back(new unordered_set[long long]()) # Start with no edges banned - route_set = new RouteSet_t() - rng.seed(seed) - - # We'll go at most `max_depth` iterations down, at each depth we maintain a queue of the next set of banned edges to consider - for depth in range(max_depth): - if route_set.size() >= max_routes or queue.size() == 0: - break - - # If we could potentially fill the route_set after this depth, shuffle the queue - if queue.size() + route_set.size() >= max_routes: - shuffle(queue.begin(), queue.end(), rng) - - for banned in queue: - # Copying the costs back into the scratch costs buffer. We could keep track of the modifications and reverse them as well - memcpy(&thread_cost[0], &self.cost_view[0], self.cost_view.shape[0] * sizeof(double)) - - for connector in deref(banned): - thread_cost[connector] = INFINITY - - RouteChoiceSet.path_find(self, origin_index, dest_index, thread_cost, thread_predecessors, thread_conn, thread_b_nodes, _thread_reached_first) - - # Mark this set of banned links as seen - removed_links.insert(banned) - - # If the destination is reachable we must build the path and readd - if thread_predecessors[dest_index] >= 0: - vec = new vector[long long]() - # Walk the predecessors tree to find our path, we build it up in a cpp vector because we can't know how long it'll be - p = dest_index - while p != origin_index: - connector = thread_conn[p] - p = thread_predecessors[p] - vec.push_back(connector) - - for connector in deref(vec): - # This is one area for potential improvement. Here we construct a new set from the old one, copying all the elements - # then add a single element. An incremental set hash function could be of use. However, the since of this set is - # directly dependent on the current depth and as the route set size grows so incredibly fast the depth will rarely get - # high enough for this to matter. 
- # Copy the previously banned links, then for each vector in the path we add one and push it onto our queue - new_banned = new unordered_set[long long](deref(banned)) - new_banned.insert(connector) - # If we've already seen this set of removed links before we already know what the path is and its in our route set - if removed_links.find(new_banned) != removed_links.end(): - del new_banned - else: - next_queue.push_back(new_banned) - - # The deduplication of routes occurs here - route_set.insert(vec) - if route_set.size() >= max_routes: - break - - queue.swap(next_queue) - next_queue.clear() - - # We may have added more banned link sets to the queue then found out we hit the max depth, we should free those - for banned in queue: - del banned - - # We should also free all the sets in removed_links, we don't be needing them - for banned in removed_links: - del banned - - return route_set - - @cython.wraparound(False) - @cython.embedsignature(True) - @cython.boundscheck(False) - @cython.initializedcheck(False) - cdef RouteSet_t *link_penalisation( - RouteChoiceSet self, - long origin_index, - long dest_index, - unsigned int max_routes, - unsigned int max_depth, - double [:] thread_cost, - long long [:] thread_predecessors, - long long [:] thread_conn, - long long [:] thread_b_nodes, - long long [:] _thread_reached_first, - double penatly, - unsigned int seed - ) noexcept nogil: - cdef: - RouteSet_t *route_set - - # Scratch objects - vector[long long] *vec - long long p, connector - - max_routes = max_routes if max_routes != 0 else UINT_MAX - max_depth = max_depth if max_depth != 0 else UINT_MAX - route_set = new RouteSet_t() - memcpy(&thread_cost[0], &self.cost_view[0], self.cost_view.shape[0] * sizeof(double)) - - for depth in range(max_depth): - if route_set.size() >= max_routes: - break - - RouteChoiceSet.path_find(self, origin_index, dest_index, thread_cost, thread_predecessors, thread_conn, thread_b_nodes, _thread_reached_first) - - if thread_predecessors[dest_index] >= 0: - vec = new vector[long long]() - # Walk the predecessors tree to find our path, we build it up in a cpp vector because we can't know how long it'll be - p = dest_index - while p != origin_index: - connector = thread_conn[p] - p = thread_predecessors[p] - vec.push_back(connector) - - for connector in deref(vec): - thread_cost[connector] *= penatly - - route_set.insert(vec) - else: - break - - return route_set - - @cython.wraparound(False) - @cython.embedsignature(True) - @cython.boundscheck(False) - @cython.initializedcheck(False) - @staticmethod - cdef pair[vector[long long] *, vector[long long] *] compute_frequency(RouteSet_t *route_set, vector[long long] &link_union) noexcept nogil: - cdef: - vector[long long] *keys - vector[long long] *counts - - # Scratch objects - size_t length, count - long long link, i - - link_union.clear() - - keys = new vector[long long]() - counts = new vector[long long]() - - length = 0 - for route in deref(route_set): - length = length + route.size() - link_union.reserve(length) - - for route in deref(route_set): - link_union.insert(link_union.end(), route.begin(), route.end()) - - sort(link_union.begin(), link_union.end()) - - union_iter = link_union.begin() - while union_iter != link_union.end(): - count = 0 - link = deref(union_iter) - while link == deref(union_iter): - count = count + 1 - inc(union_iter) - - keys.push_back(link) - counts.push_back(count) - - return make_pair(keys, counts) - - @cython.wraparound(False) - @cython.embedsignature(True) - @cython.boundscheck(False) - 
@cython.initializedcheck(False) - @staticmethod - cdef vector[double] *compute_cost(RouteSet_t *route_set, double[:] cost_view) noexcept nogil: - cdef: - vector[double] *cost_vec - - # Scratch objects - double cost - long long link, i - - cost_vec = new vector[double]() - cost_vec.reserve(route_set.size()) - - for route in deref(route_set): - cost = 0.0 - for link in deref(route): - cost = cost + cost_view[link] - - cost_vec.push_back(cost) - - return cost_vec - - @cython.wraparound(False) - @cython.embedsignature(True) - @cython.boundscheck(False) - @cython.initializedcheck(False) - @staticmethod - cdef vector[double] *compute_gamma( - RouteSet_t *route_set, - pair[vector[long long] *, vector[long long] *] &freq_set, - vector[double] &total_cost, - double[:] cost_view - ) noexcept nogil: - """ - Notation changes: - i: j - a: link - t_a: cost_view - c_i: total_costs - A_i: route - sum_{k in R}: delta_{a,k}: freq_set - """ - cdef: - vector[double] *gamma_vec - - # Scratch objects - vector[long long].const_iterator link_iter - double gamma - long long link, j - size_t i - - gamma_vec = new vector[double]() - gamma_vec.reserve(route_set.size()) - - j = 0 - for route in deref(route_set): - gamma = 0.0 - for link in deref(route): - # We know the frequency table is ordered and contains every link in the union of the routes. - # We want to find the index of the link, and use that to look up it's frequency - link_iter = lower_bound(freq_set.first.begin(), freq_set.first.end(), link) - - gamma = gamma + cost_view[link] / deref(freq_set.second)[link_iter - freq_set.first.begin()] - - gamma_vec.push_back(gamma / total_cost[j]) - - j = j + 1 - - return gamma_vec - - @cython.wraparound(False) - @cython.embedsignature(True) - @cython.boundscheck(False) - @cython.initializedcheck(False) - @staticmethod - cdef vector[double] *compute_prob( - vector[double] &total_cost, - vector[double] &gamma_vec, - double beta, - double theta - ) noexcept nogil: - cdef: - # Scratch objects - vector[double] *prob_vec - double inv_prob - long long route_set_idx - size_t i, j - - prob_vec = new vector[double]() - prob_vec.reserve(total_cost.size()) - - # Beware when refactoring the below, the scale of the costs may cause floating point errors. Large costs will lead to NaN results - for i in range(total_cost.size()): - inv_prob = 0.0 - for j in range(total_cost.size()): - inv_prob = inv_prob + pow(gamma_vec[j] / gamma_vec[i], beta) * exp(-theta * (total_cost[j] - total_cost[i])) - - prob_vec.push_back(1.0 / inv_prob) - - return prob_vec - - @cython.wraparound(False) - @cython.embedsignature(True) - @cython.boundscheck(False) - @cython.initializedcheck(False) - @staticmethod - cdef shared_ptr[libpa.CTable] make_table_from_results( - vector[pair[long long, long long]] &ods, - vector[RouteSet_t *] &route_sets, - vector[vector[double] *] *cost_set, - vector[vector[double] *] *gamma_set, - vector[vector[double] *] *prob_set - ): - cdef: - shared_ptr[libpa.CArray] paths - shared_ptr[libpa.CArray] offsets - - libpa.CMemoryPool *pool = libpa.c_get_memory_pool() - - # Custom imports, these are declared in route_choice.pxd *not* libarrow. 
- CUInt32Builder *path_builder = new CUInt32Builder(pool) - CDoubleBuilder *cost_col = nullptr - CDoubleBuilder *gamma_col = nullptr - CDoubleBuilder *prob_col = nullptr - - libpa.CInt32Builder *offset_builder = new libpa.CInt32Builder(pool) # Must be Int32 *not* UInt32 - libpa.CUInt32Builder *o_col = new libpa.CUInt32Builder(pool) - libpa.CUInt32Builder *d_col = new libpa.CUInt32Builder(pool) - vector[shared_ptr[libpa.CArray]] columns - shared_ptr[libpa.CDataType] route_set_dtype = libpa.pyarrow_unwrap_data_type(RouteChoiceSet.route_set_dtype) - - libpa.CResult[shared_ptr[libpa.CArray]] route_set_results - - int offset = 0 - bint psl = (cost_set != nullptr and gamma_set != nullptr and prob_set != nullptr) - - # Origins, Destination, Route set, [Cost for route, Gamma for route, Probability for route] - columns.resize(6 if psl else 3) - - if psl: - cost_col = new CDoubleBuilder(pool) - gamma_col = new CDoubleBuilder(pool) - prob_col = new CDoubleBuilder(pool) - - for i in range(ods.size()): - cost_col.AppendValues(deref(deref(cost_set)[i])) - gamma_col.AppendValues(deref(deref(gamma_set)[i])) - prob_col.AppendValues(deref(deref(prob_set)[i])) - - for i in range(ods.size()): - route_set = route_sets[i] - - # Instead of construction a "list of lists" style object for storing the route sets we instead will construct one big array of link ids - # with a corresponding offsets array that indicates where each new row (path) starts. - for route in deref(route_set): - o_col.Append(ods[i].first) - d_col.Append(ods[i].second) - - offset_builder.Append(offset) - path_builder.AppendValues(route.crbegin(), route.crend()) - - offset += route.size() - - path_builder.Finish(&paths) - - offset_builder.Append(offset) # Mark the end of the array in offsets - offset_builder.Finish(&offsets) - - route_set_results = libpa.CListArray.FromArraysAndType(route_set_dtype, deref(offsets.get()), deref(paths.get()), pool, shared_ptr[libpa.CBuffer]()) - - o_col.Finish(&columns[0]) - d_col.Finish(&columns[1]) - columns[2] = deref(route_set_results) - - if psl: - cost_col.Finish(&columns[3]) - gamma_col.Finish(&columns[4]) - prob_col.Finish(&columns[5]) - - cdef shared_ptr[libpa.CSchema] schema = libpa.pyarrow_unwrap_schema(RouteChoiceSet.psl_schema if psl else RouteChoiceSet.schema) - cdef shared_ptr[libpa.CTable] table = libpa.CTable.MakeFromArrays(schema, columns) - - del path_builder - del offset_builder - del o_col - del d_col - - if psl: - del cost_col - del gamma_col - del prob_col - - return table - - -@cython.embedsignature(True) -cdef class Checkpoint: - """ - A small wrapper class to write a dataset partition by partition - """ - - def __init__(self, where, schema, partition_cols = None): - """Python level init, may be called multiple times, for things that can't be done in __cinit__.""" - self.where = pathlib.Path(where) - self.schema = schema - self.partition_cols = partition_cols - - def write(self, table): - logger = logging.getLogger("aequilibrae") - pq.write_to_dataset( - table, - self.where, - partitioning=self.partition_cols, - partitioning_flavor="hive", - schema=self.schema, - use_threads=True, - existing_data_behavior="overwrite_or_ignore", - file_visitor=lambda written_file: logger.info(f"Wrote partition dataset at {written_file.path}") - ) - - def read_dataset(self): - return pa.dataset.dataset(self.where, format="parquet", partitioning=pa.dataset.HivePartitioning(self.schema)) - - @staticmethod - def batches(ods: List[Tuple[int, int]]): - return (list(g) for k, g in itertools.groupby(sorted(ods), 
key=lambda x: x[0])) diff --git a/aequilibrae/paths/route_choice.pxd b/aequilibrae/paths/route_choice_set.pxd similarity index 78% rename from aequilibrae/paths/route_choice.pxd rename to aequilibrae/paths/route_choice_set.pxd index bf4671d47..85f675972 100644 --- a/aequilibrae/paths/route_choice.pxd +++ b/aequilibrae/paths/route_choice_set.pxd @@ -1,5 +1,7 @@ # cython: language_level=3str from aequilibrae.paths.results import PathResults +from aequilibrae.matrix.sparse_matrix cimport COO + from libcpp.vector cimport vector from libcpp.unordered_set cimport unordered_set from libcpp.unordered_map cimport unordered_map @@ -43,7 +45,7 @@ cdef extern from "" namespace "std" nogil: cdef extern from "" namespace "std" nogil: pair[T, U] make_pair[T, U](T&& t, U&& u) -# To define our own hashing functions we have to write a little cpp. The string is inlined directly into route_choice.cpp +# To define our own hashing functions we have to write a little C++. The string is inlined directly into route_choice.cpp # To make Cython aware of our hash types we also must declare them with the right signatures # # OrderedVectorPointerHasher: This hash function is for hashing the routes, thus it should be order *DEPENDENT*. @@ -55,8 +57,8 @@ cdef extern from "" namespace "std" nogil: # New hash functions and their use in authentication and set equality # https://doi.org/10.1016/0022-0000(81)90033-7 # -# PointerDereferenceEqualTo: Because we are storing and hashing the pointers to objects to avoid unnessecary copies we must -# define our own comparitor to resolve hash collisions. Without this equaility operator the bare pointers are compared. +# PointerDereferenceEqualTo: Because we are storing and hashing the pointers to objects to avoid unnecessary copies we must +# define our own comparator to resolve hash collisions. Without this equality operator the bare pointers are compared. 
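A rough Python analogue of the two hashing strategies (illustration only; the actual hashers are the inlined C++ declared below): an order dependent hash can chain element hashes, an order independent hash must combine them with a commutative operation, and equality has to be taken on the pointed-to values rather than on the pointers.

    def ordered_hash(route):       # order *dependent*: [1, 2, 3] and [3, 2, 1] hash differently
        h = 0
        for link in route:
            h = hash((h, link))    # chaining makes position matter
        return h

    def unordered_hash(banned):    # order *independent*: XOR is commutative, only membership matters
        h = 0
        for link in banned:
            h ^= hash(link)
        return h

PointerDereferenceEqualTo then plays the role of comparing the contents (`list(a) == list(b)`) rather than the references (`a is b`) when two pointers land in the same bucket.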
cdef extern from * nogil: """ // Source: https://stackoverflow.com/a/72073933 @@ -104,7 +106,7 @@ cdef extern from * nogil: bool operator()(const T& lhs, const T& rhs) const -# For typing (haha) convenince, the types names are getting long +# For typing (haha) convenience, the types names are getting long ctypedef unordered_set[vector[long long] *, OrderedVectorPointerHasher, PointerDereferenceEqualTo[vector[long long] *]] RouteSet_t ctypedef unordered_set[unordered_set[long long] *, UnorderedSetPointerHasher, PointerDereferenceEqualTo[unordered_set[long long] *]] LinkSet_t ctypedef vector[pair[unordered_set[long long] *, vector[long long] *]] RouteMap_t @@ -118,6 +120,7 @@ cdef extern from "arrow/builder.h" namespace "arrow" nogil: libpa.CStatus Append(const uint32_t value) libpa.CStatus AppendValues(const vector[uint32_t] &values) libpa.CStatus AppendValues(vector[uint32_t].const_reverse_iterator values_begin, vector[uint32_t].const_reverse_iterator values_end) + libpa.CStatus AppendValues(const uint32_t *values, int64_t length, const uint8_t *valid_bytes = nullptr) cdef cppclass CDoubleBuilder" arrow::DoubleBuilder"(libpa.CArrayBuilder): CDoubleBuilder(libpa.CMemoryPool* pool) @@ -134,12 +137,26 @@ cdef class RouteChoiceSet: double [:] lat_view double [:] lon_view long long [:] ids_graph_view + long long [:] graph_compressed_id_view long long [:] compressed_link_ids long long num_nodes + long long num_links long long zones bint block_flows_through_centroids bint a_star + vector[pair[long long, long long]] *ods + vector[RouteSet_t *] *results + vector[vector[long long] *] *link_union_set + vector[vector[double] *] *cost_set + vector[vector[double] *] *path_overlap_set + vector[vector[double] *] *prob_set + + unsigned int [:] mapping_idx + unsigned int [:] mapping_data + + cdef void deallocate(RouteChoiceSet self) nogil + cdef void path_find( RouteChoiceSet self, long origin_index, @@ -157,11 +174,13 @@ cdef class RouteChoiceSet: long dest_index, unsigned int max_routes, unsigned int max_depth, + unsigned int max_misses, double [:] thread_cost, long long [:] thread_predecessors, long long [:] thread_conn, long long [:] thread_b_nodes, long long [:] _thread_reached_first, + double penatly, unsigned int seed ) noexcept nogil @@ -171,6 +190,7 @@ cdef class RouteChoiceSet: long dest_index, unsigned int max_routes, unsigned int max_depth, + unsigned int max_misses, double [:] thread_cost, long long [:] thread_predecessors, long long [:] thread_conn, @@ -181,13 +201,13 @@ cdef class RouteChoiceSet: ) noexcept nogil @staticmethod - cdef pair[vector[long long] *, vector[long long] *] compute_frequency(RouteSet_t *route_set, vector[long long] &link_union) noexcept nogil + cdef pair[vector[long long] *, vector[long long] *] compute_frequency(RouteSet_t *route_set) noexcept nogil @staticmethod cdef vector[double] *compute_cost(RouteSet_t *route_sets, double[:] cost_view) noexcept nogil @staticmethod - cdef vector[double] *compute_gamma( + cdef vector[double] *compute_path_overlap( RouteSet_t *route_set, pair[vector[long long] *, vector[long long] *] &freq_set, vector[double] &total_cost, @@ -197,17 +217,38 @@ cdef class RouteChoiceSet: @staticmethod cdef vector[double] *compute_prob( vector[double] &total_cost, - vector[double] &gamma_vec, + vector[double] &path_overlap_vec, double beta, - double theta + double theta, + double cutoff_prob ) noexcept nogil @staticmethod + cdef vector[vector[double] *] *compute_path_files( + vector[pair[long long, long long]] &ods, + vector[RouteSet_t *] &results, + 
vector[vector[long long] *] &link_union_set, + vector[vector[double] *] &prob_set, + unsigned int cores + ) noexcept nogil + + cdef vector[double] *apply_link_loading(RouteChoiceSet self, double[:, :] matrix_view) noexcept nogil + cdef vector[double] *apply_link_loading_from_path_files(RouteChoiceSet self, double[:, :] matrix_view, vector[vector[double] *] &path_files) noexcept nogil + cdef apply_link_loading_func(RouteChoiceSet self, vector[double] *ll, int cores) + + cdef vector[double] *apply_select_link_loading( + RouteChoiceSet self, + COO sparse_mat, + double[:, :] matrix_view, + unordered_set[long] &select_link_set + ) noexcept nogil + cdef shared_ptr[libpa.CTable] make_table_from_results( + RouteChoiceSet self, vector[pair[long long, long long]] &ods, vector[RouteSet_t *] &route_sets, vector[vector[double] *] *cost_set, - vector[vector[double] *] *gamma_set, + vector[vector[double] *] *path_overlap_set, vector[vector[double] *] *prob_set ) diff --git a/aequilibrae/paths/route_choice_set.pyx b/aequilibrae/paths/route_choice_set.pyx new file mode 100644 index 000000000..a726724f8 --- /dev/null +++ b/aequilibrae/paths/route_choice_set.pyx @@ -0,0 +1,1463 @@ +# cython: language_level=3str + +from aequilibrae.paths.graph import Graph +from aequilibrae.matrix import AequilibraeMatrix +from aequilibrae.matrix.sparse_matrix cimport COO + +from cython.operator cimport dereference as deref +from cython.operator cimport preincrement as inc +from cython.parallel cimport parallel, prange, threadid +from libc.limits cimport UINT_MAX +from libc.math cimport INFINITY, exp, pow, log +from libc.stdlib cimport abort +from libc.string cimport memcpy +from libcpp cimport nullptr +from libcpp.algorithm cimport lower_bound, reverse, sort, copy, min_element +from libcpp.unordered_map cimport unordered_map +from libcpp.unordered_set cimport unordered_set +from libcpp.utility cimport pair +from libcpp.vector cimport vector +from openmp cimport omp_get_max_threads + +from libc.stdio cimport fprintf, stderr + +import itertools +import logging +import pathlib +import warnings +from typing import List, Tuple + +import numpy as np +import pyarrow as pa +import pyarrow.dataset +import pyarrow.parquet as pq + +cimport numpy as np # Numpy *must* be cimport'd BEFORE pyarrow.lib, there's nothing quite like Cython. +cimport pyarrow as pa +cimport pyarrow.lib as libpa + +"""This module aims to implemented the BFS-LE algorithm as described in Rieser-Schüssler, Balmer, and Axhausen, 'Route +Choice Sets for Very High-Resolution Data'. https://doi.org/10.1080/18128602.2012.671383 + +A rough overview of the algorithm is as follows. 1. Prepare the initial graph, this is depth 0 with no links removed. + 2. Find a short path, P. If P is not empty add P to the path set. 3. For all links p in P, remove p from E, + compounding with the previously removed links. 4. De-duplicate the sub-graphs, we only care about unique + sub-graphs. 5. Go to 2. + +Details: The general idea of the algorithm is pretty simple, as is the implementation. The caveats here is that there is +a lot of cpp interop and memory management. A description of the purpose of variables is in order: + +route_set: See route_choice.pxd for full type signature. It's an unordered set (hash set) of pointers to vectors of link +IDs. It uses a custom hashing function and comparator. The hashing function is defined in a string that in inlined +directly into the output ccp. 
This is done to allow declaring the `()` operator, which is required and AFAIK not
+possible in Cython. The hash is designed to dereference the pointer, then hash the vector in an order dependent
+manner. One isn't provided by the stdlib. The comparator simply dereferences the pointer and uses the vector
+comparator. It's designed to store the output paths. Heap allocated (it needs to be returned).
+
+removed_links: See route_choice.pxd for full type signature. It's an unordered set of pointers to unordered sets of link
+IDs. Similarly to `route_set`, it uses a custom hash function and comparator. This hash function is designed to be order
+independent and should only use commutative operations. The comparator is the same. It's designed to store all of the
+removed link sets we've seen before. This allows us to detect duplicated graphs.
+
+rng: A custom imported version of std::linear_congruential_engine. libcpp doesn't provide one, so we declare it
+ourselves. It should be significantly faster than the std::mersenne_twister_engine without sacrificing much. We don't
+need an amazing RNG, just one that is ok and fast. This is only used to shuffle the queue.
+
+queue, next_queue: These are vectors of pointers to sets of removed links. We never need to push to the front of these
+so a vector is best. We maintain two queues, one that we are currently iterating over, and one that we can add to,
+building up with all the newly removed link sets. These two are swapped at the end of an iteration, next_queue is then
+cleared. These store sets of removed links.
+
+banned, new_banned: `banned` is the iterator variable for `queue`. `banned` is copied into `new_banned` where another
+link can be added without mutating `banned`. If we've already seen this set of removed links, `new_banned` is
+immediately deallocated. Otherwise it's placed into `next_queue`.
+
+vec: `vec` is a scratch variable to store pointers to new vectors, or rather, paths while we are building them. Each
+time a path is found a new one is allocated, built, and stored in the route_set.
+
+p, connector: Scratch variables for iteration.
+
+Optimisations: As described in the paper, both optimisations have been implemented. The path finding operates on the
+compressed graph and the queue is shuffled if it's possible to fill the route set that iteration. The route set may not
+be filled due to duplicate paths but we can't know that in advance, so we shuffle anyway.
+
+Any further optimisations should focus on the path finding; from benchmarks it dominates the run time (~98%). Since huge
+routes aren't required, small-ish things like the memcpy and the banned link set copy aren't a high priority.
+
+"""
+
+# It would really be nice if these were modules.
The 'include' syntax is long deprecated and adds a lot to compilation +# times +include 'basic_path_finding.pyx' +include 'parallel_numpy.pyx' + + +@cython.embedsignature(True) +cdef class RouteChoiceSet: + """ + Route choice implemented via breadth first search with link removal (BFS-LE) as described in Rieser-Schüssler, + Balmer, and Axhausen, 'Route Choice Sets for Very High-Resolution Data' + """ + + route_set_dtype = pa.list_(pa.uint32()) + + schema = pa.schema([ + pa.field("origin id", pa.uint32(), nullable=False), + pa.field("destination id", pa.uint32(), nullable=False), + pa.field("route set", route_set_dtype, nullable=False), + ]) + + psl_schema = pa.schema([ + pa.field("origin id", pa.uint32(), nullable=False), + pa.field("destination id", pa.uint32(), nullable=False), + pa.field("route set", route_set_dtype, nullable=False), + pa.field("cost", pa.float64(), nullable=False), + pa.field("path overlap", pa.float64(), nullable=False), + pa.field("probability", pa.float64(), nullable=False), + ]) + + def __cinit__(self): + """C level init. For C memory allocation and initialisation. Called exactly once per object.""" + results = nullptr + link_union_set = nullptr + cost_set = nullptr + path_overlap_set = nullptr + prob_set = nullptr + ods = nullptr + + def __init__(self, graph: Graph): + """Python level init, may be called multiple times, for things that can't be done in __cinit__.""" + # self.heuristic = HEURISTIC_MAP[self.res._heuristic] + self.cost_view = graph.compact_cost + self.graph_fs_view = graph.compact_fs + self.b_nodes_view = graph.compact_graph.b_node.values + self.nodes_to_indices_view = graph.compact_nodes_to_indices + + # tmp = graph.lonlat_index.loc[graph.compact_all_nodes] + # self.lat_view = tmp.lat.values + # self.lon_view = tmp.lon.values + self.a_star = False + + self.ids_graph_view = graph.compact_graph.id.values + self.graph_compressed_id_view = graph.graph.__compressed_id__.values + self.num_nodes = graph.compact_num_nodes + self.num_links = graph.compact_num_links + self.zones = graph.num_zones + self.block_flows_through_centroids = graph.block_centroid_flows + + self.mapping_idx, self.mapping_data, _ = graph.create_compressed_link_network_mapping() + + def __dealloc__(self): + """ + C level deallocation. For freeing memory allocated by this object. *Must* have GIL, `self` may be in a + partially deallocated state already. Do not call any other Python method. 
+ """ + self.deallocate() + + cdef void deallocate(RouteChoiceSet self) nogil: + """__dealloc__ cannot be called from normal code.""" + cdef: + RouteSet_t *route_set + vector[long long] *link_vec + vector[double] *double_vec + + if self.results != nullptr: + for route_set in deref(self.results): + for link_vec in deref(route_set): + del link_vec + del route_set + del self.results + self.results = nullptr + + if self.link_union_set != nullptr: + for link_vec in deref(self.link_union_set): + del link_vec + del self.link_union_set + self.link_union_set = nullptr + + if self.cost_set != nullptr: + for double_vec in deref(self.cost_set): + del double_vec + del self.cost_set + self.cost_set = nullptr + + if self.path_overlap_set != nullptr: + for double_vec in deref(self.path_overlap_set): + del double_vec + del self.path_overlap_set + self.path_overlap_set = nullptr + + if self.prob_set != nullptr: + for double_vec in deref(self.prob_set): + del double_vec + del self.prob_set + self.prob_set = nullptr + + if self.ods != nullptr: + del self.ods + self.ods = prob_set = nullptr + + @cython.embedsignature(True) + def run(self, origin: int, destination: int, *args, **kwargs): + """Compute the a route set for a single OD pair. + + Often the returned list's length is ``max_routes``, however, it may be limited by ``max_depth`` or if all + unique possible paths have been found then a smaller set will be returned. + + Additional arguments are forwarded to ``RouteChoiceSet.batched``. + + :Arguments: + **origin** (:obj:`int`): Origin node ID. Must be present within compact graph. Recommended to choose a + centroid. + **destination** (:obj:`int`): Destination node ID. Must be present within compact graph. Recommended to + choose a centroid. + + :Returns: **route set** (:obj:`list[tuple[int, ...]]): Returns a list of unique variable length tuples of + link IDs. Represents paths from ``origin`` to ``destination``. + """ + self.batched([(origin, destination)], *args, **kwargs) + where = kwargs.get("where", None) + if where is not None: + schema = self.psl_schema if kwargs.get("path_size_logit", False) else self.schema + results = pa.dataset.dataset( + where, format="parquet", partitioning=pa.dataset.HivePartitioning(schema) + ).to_table() + else: + results = self.get_results() + return [tuple(x) for x in results.column("route set").to_pylist()] + + # Bounds checking doesn't really need to be disabled here but the warning is annoying + @cython.boundscheck(False) + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.initializedcheck(False) + def batched( + self, + ods: List[Tuple[int, int]], + max_routes: int = 0, + max_depth: int = 0, + max_misses: int = 100, + seed: int = 0, + cores: int = 0, + a_star: bool = True, + bfsle: bool = True, + penalty: float = 1.0, + where: Optional[str] = None, + path_size_logit: bool = False, + beta: float = 1.0, + theta: float = 1.0, + cutoff_prob: float = 1.0, + ): + """Compute the a route set for a list of OD pairs. + + Often the returned list for each OD pair's length is ``max_routes``, however, it may be limited by ``max_depth`` + or if all unique possible paths have been found then a smaller set will be returned. + + :Arguments: + **ods** (:obj:`list[tuple[int, int]]`): List of OD pairs ``(origin, destination)``. Origin and destination + node ID must be present within compact graph. Recommended to choose a centroids. + **max_routes** (:obj:`int`): Maximum size of the generated route set. Must be non-negative. Default of + ``0`` for unlimited. 
+ **max_depth** (:obj:`int`): Maximum depth BFSLE can explore, or maximum number of iterations for link + penalisation. Must be non-negative. Default of ``0`` for unlimited. + **max_misses** (:obj:`int`): Maximum number of collective duplicate routes found for a single OD pair. + Terminates if exceeded. + **seed** (:obj:`int`): Seed used for rng. Must be non-negative. Default of ``0``. + **cores** (:obj:`int`): Number of cores to use when parallelising over OD pairs. Must be non-negative. + Default of ``0`` for all available. + **bfsle** (:obj:`bool`): Whether to use Breadth First Search with Link Removal (BFSLE) over link + penalisation. Default ``True``. + **penalty** (:obj:`float`): Penalty to use for Link Penalisation and BFSLE with LP. + **where** (:obj:`str`): Optional file path to save results to immediately. Will return None. + """ + cdef: + long long o, d + + if max_routes == 0 and max_depth == 0: + raise ValueError("Either `max_routes` or `max_depth` must be > 0") + + if max_routes < 0 or max_depth < 0: + raise ValueError("`max_routes`, `max_depth`, and `cores` must be non-negative") + + if path_size_logit and (beta < 0 or theta <= 0): + raise ValueError("`beta` must be >= 0 and `theta` > 0 for path sized logit model") + + if path_size_logit and not 0.0 <= cutoff_prob <= 1.0: + raise ValueError("`cutoff_prob` must be 0 <= `cutoff_prob` <= 1 for path sized logit model") + + for o, d in ods: + if self.nodes_to_indices_view[o] == -1: + raise ValueError(f"Origin {o} is not present within the compact graph") + if self.nodes_to_indices_view[d] == -1: + raise ValueError(f"Destination {d} is not present within the compact graph") + + cdef: + long long origin_index, dest_index, i + unsigned int c_max_routes = max_routes + unsigned int c_max_depth = max_depth + unsigned int c_max_misses = max_misses + unsigned int c_seed = seed + unsigned int c_cores = cores if cores > 0 else omp_get_max_threads() + + # Scale cutoff prob from [0, 1] -> [0.5, 1]. Values below 0.5 produce negative inverse binary logit values. + double scaled_cutoff_prob = cutoff_prob * 0.5 + 0.5 + + vector[pair[long long, long long]] c_ods + + # A* (and Dijkstra's) require memory views, so we must allocate here and take slices. Python can handle this + # memory + double [:, :] cost_matrix = np.empty((c_cores, self.cost_view.shape[0]), dtype=float) + long long [:, :] predecessors_matrix = np.empty((c_cores, self.num_nodes + 1), dtype=np.int64) + long long [:, :] conn_matrix = np.empty((c_cores, self.num_nodes + 1), dtype=np.int64) + long long [:, :] b_nodes_matrix = np.broadcast_to( + self.b_nodes_view, + (c_cores, self.b_nodes_view.shape[0]) + ).copy() + + # This matrix is never read from, it exists to allow using the Dijkstra's method without changing the + # interface. 
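+            # Sized below according to the algorithm in use: a single dummy column per thread when A* is enabled,
+            # a full node-length row per thread for Dijkstra's.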
+ long long [:, :] _reached_first_matrix + + vector[RouteSet_t *] *results + size_t max_results_len, batch_len, j + + # self.a_star = a_star + + pa.set_io_thread_count(c_cores) + + if self.a_star: + _reached_first_matrix = np.zeros((c_cores, 1), dtype=np.int64) # Dummy array to allow slicing + else: + _reached_first_matrix = np.zeros((c_cores, self.num_nodes + 1), dtype=np.int64) + + set_ods = set(ods) + if len(set_ods) != len(ods): + warnings.warn(f"Duplicate OD pairs found, dropping {len(ods) - len(set_ods)} OD pairs") + + if where is not None: + checkpoint = Checkpoint( + where, + self.psl_schema if path_size_logit else self.schema, partition_cols=["origin id"] + ) + batches = list(Checkpoint.batches(list(set_ods))) + max_results_len = max(len(batch) for batch in batches) + else: + batches = [list(set_ods)] + max_results_len = len(set_ods) + + results = new vector[RouteSet_t *](max_results_len) + + cdef: + RouteSet_t *route_set + pair[vector[long long] *, vector[long long] *] freq_pair + vector[vector[long long] *] *link_union_set = nullptr + vector[vector[double] *] *cost_set = nullptr + vector[vector[double] *] *path_overlap_set = nullptr + vector[vector[double] *] *prob_set = nullptr + + if path_size_logit: + link_union_set = new vector[vector[long long] *](max_results_len) + cost_set = new vector[vector[double] *](max_results_len) + path_overlap_set = new vector[vector[double] *](max_results_len) + prob_set = new vector[vector[double] *](max_results_len) + + self.deallocate() # We may be storing results from a previous run + + for batch in batches: + c_ods = batch # Convert the batch to a C++ vector, this isn't strictly efficient but is nicer + batch_len = c_ods.size() + # We know we've allocated enough size to store all max length batch but we resize to a smaller size when not + # needed + results.resize(batch_len) + + if path_size_logit: + # We may clear these objects because it's either: + # - the first iteration and they contain no elements, thus no memory to leak + # - the internal objects were freed by the previous iteration + link_union_set.clear() + cost_set.clear() + path_overlap_set.clear() + prob_set.clear() + + link_union_set.resize(batch_len) + cost_set.resize(batch_len) + path_overlap_set.resize(batch_len) + prob_set.resize(batch_len) + + with nogil, parallel(num_threads=c_cores): + for i in prange(batch_len): + origin_index = self.nodes_to_indices_view[c_ods[i].first] + dest_index = self.nodes_to_indices_view[c_ods[i].second] + + if self.block_flows_through_centroids: + blocking_centroid_flows( + 0, # Always blocking + origin_index, + self.zones, + self.graph_fs_view, + b_nodes_matrix[threadid()], + self.b_nodes_view, + ) + + if bfsle: + route_set = RouteChoiceSet.bfsle( + self, + origin_index, + dest_index, + c_max_routes, + c_max_depth, + c_max_misses, + cost_matrix[threadid()], + predecessors_matrix[threadid()], + conn_matrix[threadid()], + b_nodes_matrix[threadid()], + _reached_first_matrix[threadid()], + penalty, + c_seed, + ) + else: + route_set = RouteChoiceSet.link_penalisation( + self, + origin_index, + dest_index, + c_max_routes, + c_max_depth, + c_max_misses, + cost_matrix[threadid()], + predecessors_matrix[threadid()], + conn_matrix[threadid()], + b_nodes_matrix[threadid()], + _reached_first_matrix[threadid()], + penalty, + c_seed, + ) + + if path_size_logit: + freq_pair = RouteChoiceSet.compute_frequency(route_set) + deref(link_union_set)[i] = freq_pair.first + deref(cost_set)[i] = RouteChoiceSet.compute_cost(route_set, self.cost_view) + 
deref(path_overlap_set)[i] = RouteChoiceSet.compute_path_overlap( + route_set, + freq_pair, + deref(deref(cost_set)[i]), + self.cost_view + ) + deref(prob_set)[i] = RouteChoiceSet.compute_prob( + deref(deref(cost_set)[i]), + deref(deref(path_overlap_set)[i]), + beta, + theta, + scaled_cutoff_prob + ) + # While we need the unique sorted links (.first), we don't need the frequencies (.second) + del freq_pair.second + + deref(results)[i] = route_set + + if self.block_flows_through_centroids: + blocking_centroid_flows( + 1, # Always unblocking + origin_index, + self.zones, + self.graph_fs_view, + b_nodes_matrix[threadid()], + self.b_nodes_view, + ) + + if where is not None: + table = libpa.pyarrow_wrap_table( + self.make_table_from_results(c_ods, deref(results), cost_set, path_overlap_set, prob_set) + ) + + # Once we've made the table all results have been copied into some pyarrow structure, we can free our + # inner internal structures + if path_size_logit: + for j in range(batch_len): + del deref(link_union_set)[j] + del deref(cost_set)[j] + del deref(path_overlap_set)[j] + del deref(prob_set)[j] + + for j in range(batch_len): + for route in deref(deref(results)[j]): + del route + del deref(results)[j] + + checkpoint.write(table) + del table + else: + # where is None implies len(batches) == 1, i.e. there was only one batch and we should keep everything + # in memory + pass + + # Here we decide if we wish to preserve our results for later saving/link loading + if where is not None: + # We're done with everything now, we can free the outer internal structures + del results + if path_size_logit: + del link_union_set + del cost_set + del path_overlap_set + del prob_set + else: + self.results = results + self.link_union_set = link_union_set + self.cost_set = cost_set + self.path_overlap_set = path_overlap_set + self.prob_set = prob_set + + # Copy the c_ods vector, it was provided by the auto Cython conversion and is allocated on the stack, + # we should copy it to keep it around + self.ods = new vector[pair[long long, long long]](c_ods) + + @cython.initializedcheck(False) + cdef void path_find( + RouteChoiceSet self, + long origin_index, + long dest_index, + double [:] thread_cost, + long long [:] thread_predecessors, + long long [:] thread_conn, + long long [:] thread_b_nodes, + long long [:] _thread_reached_first + ) noexcept nogil: + """Small wrapper around path finding, thread locals should be passes as arguments.""" + if self.a_star: + path_finding_a_star( + origin_index, + dest_index, + thread_cost, + thread_b_nodes, + self.graph_fs_view, + self.nodes_to_indices_view, + self.lat_view, + self.lon_view, + thread_predecessors, + self.ids_graph_view, + thread_conn, + EQUIRECTANGULAR # FIXME: enum import failing due to redefinition + ) + else: + path_finding( + origin_index, + dest_index, + thread_cost, + thread_b_nodes, + self.graph_fs_view, + thread_predecessors, + self.ids_graph_view, + thread_conn, + _thread_reached_first + ) + + @cython.boundscheck(False) + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.initializedcheck(False) + cdef RouteSet_t *bfsle( + RouteChoiceSet self, + long origin_index, + long dest_index, + unsigned int max_routes, + unsigned int max_depth, + unsigned int max_misses, + double [:] thread_cost, + long long [:] thread_predecessors, + long long [:] thread_conn, + long long [:] thread_b_nodes, + long long [:] _thread_reached_first, + double penatly, + unsigned int seed + ) noexcept nogil: + """Main method for route set generation. 
See top of file for commentary.""" + cdef: + # Output + RouteSet_t *route_set + + # Scratch objects + LinkSet_t removed_links + minstd_rand rng + + # These objects are juggled to prevent more allocations than necessary + vector[unordered_set[long long] *] queue + vector[unordered_set[long long] *] next_queue + unordered_set[long long] *banned + unordered_set[long long] *new_banned + + # Local variables, Cython doesn't allow conditional declarations + vector[long long] *vec + pair[RouteSet_t.iterator, bool] status + unsigned int miss_count = 0 + long long p, connector + + # Link penalisation, only used when penalty != 1.0 + bint lp = penatly != 1.0 + vector[double] *penalised_cost = nullptr + vector[double] *next_penalised_cost = nullptr + + max_routes = max_routes if max_routes != 0 else UINT_MAX + max_depth = max_depth if max_depth != 0 else UINT_MAX + + queue.push_back(new unordered_set[long long]()) # Start with no edges banned + route_set = new RouteSet_t() + rng.seed(seed) + + if lp: + # Although we don't need the dynamic ability of vectors here, Cython doesn't have the std::array module. + penalised_cost = new vector[double](self.cost_view.shape[0]) + next_penalised_cost = new vector[double](self.cost_view.shape[0]) + copy(&self.cost_view[0], &self.cost_view[0] + self.cost_view.shape[0], penalised_cost.begin()) + copy(&self.cost_view[0], &self.cost_view[0] + self.cost_view.shape[0], next_penalised_cost.begin()) + + # We'll go at most `max_depth` iterations down, at each depth we maintain a queue of the next set of banned + # edges to consider + for depth in range(max_depth): + if miss_count > max_misses or route_set.size() >= max_routes or queue.size() == 0: + break + + # If we could potentially fill the route_set after this depth, shuffle the queue + if queue.size() + route_set.size() >= max_routes: + shuffle(queue.begin(), queue.end(), rng) + + for banned in queue: + if lp: + # We copy the penalised cost buffer into the thread cost buffer to allow us to apply link penalisation, + copy(penalised_cost.cbegin(), penalised_cost.cend(), &thread_cost[0]) + else: + # ...otherwise we just copy directly from the cost view. + memcpy(&thread_cost[0], &self.cost_view[0], self.cost_view.shape[0] * sizeof(double)) + + for connector in deref(banned): + thread_cost[connector] = INFINITY + + RouteChoiceSet.path_find( + self, + origin_index, + dest_index, + thread_cost, + thread_predecessors, + thread_conn, + thread_b_nodes, + _thread_reached_first + ) + + # Mark this set of banned links as seen + removed_links.insert(banned) + + # If the destination is reachable we must build the path and readd + if thread_predecessors[dest_index] >= 0: + vec = new vector[long long]() + # Walk the predecessors tree to find our path, we build it up in a C++ vector because we can't know + # how long it'll be + p = dest_index + while p != origin_index: + connector = thread_conn[p] + p = thread_predecessors[p] + vec.push_back(connector) + + if lp: + # Here we penalise all seen links for the *next* depth. If we penalised on the current depth + # then we would introduce a bias for earlier seen paths + for connector in deref(vec): + # *= does not work + deref(next_penalised_cost)[connector] = penatly * deref(next_penalised_cost)[connector] + + reverse(vec.begin(), vec.end()) + + for connector in deref(vec): + # This is one area for potential improvement. Here we construct a new set from the old one, + # copying all the elements then add a single element. An incremental set hash function could be + # of use. 
However, the since of this set is directly dependent on the current depth and as the + # route set size grows so incredibly fast the depth will rarely get high enough for this to + # matter. Copy the previously banned links, then for each vector in the path we add one and + # push it onto our queue + new_banned = new unordered_set[long long](deref(banned)) + new_banned.insert(connector) + # If we've already seen this set of removed links before we already know what the path is and + # its in our route set + if removed_links.find(new_banned) != removed_links.end(): + del new_banned + else: + next_queue.push_back(new_banned) + + # The deduplication of routes occurs here + status = route_set.insert(vec) + miss_count = miss_count + (not status.second) + if miss_count > max_misses or route_set.size() >= max_routes: + break + + queue.swap(next_queue) + next_queue.clear() + + if lp: + # Update the penalised_cost vector, since next_penalised_cost is always the one updated we just need to + # bring penalised_cost up to date. + copy(next_penalised_cost.cbegin(), next_penalised_cost.cend(), penalised_cost.begin()) + + # We may have added more banned link sets to the queue then found out we hit the max depth, we should free those + for banned in queue: + del banned + + # We should also free all the sets in removed_links, we don't be needing them + for banned in removed_links: + del banned + + if lp: + # If we had enabled link penalisation, we'll need to free those vectors as well + del penalised_cost + del next_penalised_cost + + return route_set + + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.boundscheck(False) + @cython.initializedcheck(False) + cdef RouteSet_t *link_penalisation( + RouteChoiceSet self, + long origin_index, + long dest_index, + unsigned int max_routes, + unsigned int max_depth, + unsigned int max_misses, + double [:] thread_cost, + long long [:] thread_predecessors, + long long [:] thread_conn, + long long [:] thread_b_nodes, + long long [:] _thread_reached_first, + double penatly, + unsigned int seed + ) noexcept nogil: + """Link penalisation algorithm for choice set generation.""" + cdef: + RouteSet_t *route_set + + # Scratch objects + vector[long long] *vec + long long p, connector + pair[RouteSet_t.iterator, bool] status + unsigned int miss_count = 0 + + max_routes = max_routes if max_routes != 0 else UINT_MAX + max_depth = max_depth if max_depth != 0 else UINT_MAX + route_set = new RouteSet_t() + memcpy(&thread_cost[0], &self.cost_view[0], self.cost_view.shape[0] * sizeof(double)) + + for depth in range(max_depth): + if route_set.size() >= max_routes: + break + + RouteChoiceSet.path_find( + self, + origin_index, + dest_index, + thread_cost, + thread_predecessors, + thread_conn, + thread_b_nodes, + _thread_reached_first + ) + + if thread_predecessors[dest_index] >= 0: + vec = new vector[long long]() + # Walk the predecessors tree to find our path, we build it up in a C++ vector because we can't know how + # long it'll be + p = dest_index + while p != origin_index: + connector = thread_conn[p] + p = thread_predecessors[p] + vec.push_back(connector) + + for connector in deref(vec): + thread_cost[connector] = penatly * thread_cost[connector] + + reverse(vec.begin(), vec.end()) + + # To prevent runaway algorithms if we find N duplicate routes we should stop + status = route_set.insert(vec) + miss_count = miss_count + (not status.second) + if miss_count > max_misses: + break + else: + break + + return route_set + + @cython.wraparound(False) + 
@cython.embedsignature(True) + @cython.boundscheck(False) + @cython.initializedcheck(False) + @staticmethod + cdef pair[vector[long long] *, vector[long long] *] compute_frequency(RouteSet_t *route_set) noexcept nogil: + """ + Compute a frequency map for each route. + + Each node at index i in the first returned vector has frequency at index i in the second vector. + """ + cdef: + vector[long long] *keys + vector[long long] *counts + vector[long long] link_union + vector[long long].const_iterator union_iter + vector[long long] *route + + # Scratch objects + size_t length, count + long long link, i + + keys = new vector[long long]() + counts = new vector[long long]() + + length = 0 + for route in deref(route_set): + length = length + route.size() + link_union.reserve(length) + + for route in deref(route_set): + link_union.insert(link_union.end(), route.begin(), route.end()) + + sort(link_union.begin(), link_union.end()) + + union_iter = link_union.cbegin() + while union_iter != link_union.cend(): + count = 0 + link = deref(union_iter) + while link == deref(union_iter) and union_iter != link_union.cend(): + count = count + 1 + inc(union_iter) + + keys.push_back(link) + counts.push_back(count) + + return make_pair(keys, counts) + + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.boundscheck(False) + @cython.initializedcheck(False) + @staticmethod + cdef vector[double] *compute_cost(RouteSet_t *route_set, double[:] cost_view) noexcept nogil: + """Compute the cost each route.""" + cdef: + vector[double] *cost_vec + + # Scratch objects + double cost + long long link, i + + cost_vec = new vector[double]() + cost_vec.reserve(route_set.size()) + + for route in deref(route_set): + cost = 0.0 + for link in deref(route): + cost = cost + cost_view[link] + + cost_vec.push_back(cost) + + return cost_vec + + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.boundscheck(False) + @cython.initializedcheck(False) + @staticmethod + cdef vector[double] *compute_path_overlap( + RouteSet_t *route_set, + pair[vector[long long] *, vector[long long] *] &freq_set, + vector[double] &total_cost, + double[:] cost_view + ) noexcept nogil: + """ + Compute the path overlap figure based on the route cost and frequency. + + Notation changes: + i: j + a: link + t_a: cost_view + c_i: total_costs + A_i: route + sum_{k in R}: delta_{a,k}: freq_set + """ + cdef: + vector[double] *path_overlap_vec + + # Scratch objects + vector[long long].const_iterator link_iter + double path_overlap + long long link, j + size_t i + + path_overlap_vec = new vector[double]() + path_overlap_vec.reserve(route_set.size()) + + j = 0 + for route in deref(route_set): + path_overlap = 0.0 + for link in deref(route): + # We know the frequency table is ordered and contains every link in the union of the routes. 
+ # We want to find the index of the link, and use that to look up it's frequency + link_iter = lower_bound(freq_set.first.begin(), freq_set.first.end(), link) + + path_overlap = path_overlap + cost_view[link] \ + / deref(freq_set.second)[link_iter - freq_set.first.begin()] + + path_overlap_vec.push_back(path_overlap / total_cost[j]) + + j = j + 1 + + return path_overlap_vec + + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.boundscheck(False) + @cython.initializedcheck(False) + @staticmethod + cdef vector[double] *compute_prob( + vector[double] &total_cost, + vector[double] &path_overlap_vec, + double beta, + double theta, + double cutoff_prob + ) noexcept nogil: + """Compute a probability for each route in the route set based on the path overlap. + + Computes a binary logit between the minimum cost path and each path, if the total cost is greater than the + minimum + the difference in utilities required to produce the cut-off probability then the route is excluded from + the route set. + """ + cdef: + # Scratch objects + vector[double] *prob_vec + double inv_prob + long long route_set_idx + size_t i, j + + vector[bool] route_mask = vector[bool](total_cost.size()) + double cutoff_cost = deref(min_element(total_cost.cbegin(), total_cost.cend())) \ + + inverse_binary_logit(cutoff_prob, 0.0, 1.0) + + prob_vec = new vector[double]() + prob_vec.reserve(total_cost.size()) + + # The route mask should be True for the routes we wish to include. + for i in range(total_cost.size()): + route_mask[i] = total_cost[i] <= cutoff_cost + + # Beware when refactoring the below, the scale of the costs may cause floating point errors. Large costs will + # lead to NaN results + for i in range(total_cost.size()): + if route_mask[i]: + inv_prob = 0.0 + for j in range(total_cost.size()): + # We must skip any other routes that are not included in the mask otherwise our probabilities + # won't add up. + if route_mask[j]: + inv_prob = inv_prob + pow(path_overlap_vec[j] / path_overlap_vec[i], beta) \ + * exp(-theta * (total_cost[j] - total_cost[i])) + prob_vec.push_back(1.0 / inv_prob) + else: + # Anything that has been excluded gets a probability of 0 rather than be removed entirely. + prob_vec.push_back(0.0) + + return prob_vec + + @cython.embedsignature(True) + def link_loading(RouteChoiceSet self, matrix, generate_path_files: bool = False, cores: int = 0): + """ + Apply link loading to the network using the demand matrix and the previously computed route sets. 
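+
+        A rough sketch of the expected call pattern (object names here are illustrative), assuming ``batched`` was
+        previously run with ``path_size_logit=True``::
+
+            rc_set.batched(ods, max_routes=5, path_size_logit=True)
+            loads = rc_set.link_loading(demand_matrix)
+            # {matrix core name: (network link loads, compressed link loads)}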
+ """ + if self.ods == nullptr \ + or self.link_union_set == nullptr \ + or self.prob_set == nullptr: + raise ValueError("link loading requires Route Choice path_size_logit results") + + if not isinstance(matrix, AequilibraeMatrix): + raise ValueError("`matrix` is not an AequilibraE matrix") + + cores = cores if cores > 0 else omp_get_max_threads() + + cdef: + vector[vector[double] *] *path_files = nullptr + vector[double] *vec + + if generate_path_files: + path_files = RouteChoiceSet.compute_path_files( + deref(self.ods), + deref(self.results), + deref(self.link_union_set), + deref(self.prob_set), + cores, + ) + + # # FIXME, write out path files + # tmp = [] + # for vec in deref(path_files): + # tmp.append(deref(vec)) + # print(tmp) + + link_loads = {} + for i, name in enumerate(matrix.names): + m = matrix.matrix_view if len(matrix.view_names) == 1 else matrix.matrix_view[:, :, i] + + ll = self.apply_link_loading_from_path_files(m, deref(path_files)) \ + if generate_path_files else self.apply_link_loading(m) + + link_loads[name] = self.apply_link_loading_func(ll, cores) + del ll + + if generate_path_files: + for vec in deref(path_files): + del vec + del path_files + + return link_loads + + cdef apply_link_loading_func(RouteChoiceSet self, vector[double] *ll, int cores): + """Helper function for link_loading.""" + # This incantation creates a 2d (ll.size() x 1) memory view object around the underlying vector data without + # transferring ownership. + compressed = &deref(ll)[0] + + actual = np.zeros((self.graph_compressed_id_view.shape[0], 1), dtype=np.float64) + assign_link_loads_cython( + actual, + compressed, + self.graph_compressed_id_view, + cores + ) + compressed = np.array(compressed, copy=True) + return actual.reshape(-1), compressed.reshape(-1) + + @cython.boundscheck(False) + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.initializedcheck(False) + @staticmethod + cdef vector[vector[double] *] *compute_path_files( + vector[pair[long long, long long]] &ods, + vector[RouteSet_t *] &results, + vector[vector[long long] *] &link_union_set, + vector[vector[double] *] &prob_set, + unsigned int cores + ) noexcept nogil: + """ + Computes the path files for the provided vector of RouteSets. + + Returns vector of vectors of link loads corresponding to each link in it's link_union_set. + """ + cdef: + vector[vector[double] *] *link_loads = new vector[vector[double] *](ods.size()) + vector[long long] *link_union + vector[double] *loads + vector[long long] *links + + vector[long long].const_iterator link_union_iter + vector[long long].const_iterator link_iter + + size_t link_loc + double prob + long long i + + with parallel(num_threads=cores): + for i in prange(ods.size()): + link_union = link_union_set[i] + loads = new vector[double](link_union.size(), 0.0) + + # We now iterate over all routes in the route_set, each route has an associated probability + route_prob_iter = prob_set[i].cbegin() + for route in deref(results[i]): + prob = deref(route_prob_iter) + inc(route_prob_iter) + + if prob == 0.0: + continue + + # For each link in the route, we need to assign the appropriate demand * prob Because the link union + # is known to be sorted, if the links in the route are also sorted we can just step along both + # arrays simultaneously, skipping elements in the link_union when appropriate. This allows us to + # operate on the link loads as a sparse map and avoid blowing up memory usage when using a dense + # formulation. 
This is also more cache efficient, the only downsides are that the code is + # harder to read and it requires sorting the route. + + # NOTE: the sorting of routes is technically something that is already computed, during the + # computation of the link frequency we merge and sort all links, if we instead sorted then used an + # N-way merge we could reuse the sorted routes and the sorted link union. + + # We copy the links in case the routes haven't already been saved + links = new vector[long long](deref(route)) + sort(links.begin(), links.end()) + + # links and link_union are sorted, and links is a subset of link_union + link_union_iter = link_union.cbegin() + link_iter = links.cbegin() + + while link_iter != links.cend(): + # Find the next location for the current link in links + while deref(link_iter) != deref(link_union_iter) and link_iter != links.cend(): + inc(link_union_iter) + + link_loc = link_union_iter - link_union.cbegin() + deref(loads)[link_loc] = deref(loads)[link_loc] + prob # += here results in all zeros? Odd + + inc(link_iter) + + del links + + deref(link_loads)[i] = loads + + return link_loads + + @cython.boundscheck(False) + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.initializedcheck(False) + cdef vector[double] *apply_link_loading_from_path_files( + RouteChoiceSet self, + double[:, :] matrix_view, + vector[vector[double] *] &path_files + ) noexcept nogil: + """ + Apply link loading from path files. + + Returns a vector of link loads indexed by compressed link ID. + """ + cdef: + vector[double] *loads + vector[long long] *link_union + long origin_index, dest_index + double demand + + vector[double] *link_loads = new vector[double](self.num_links) + + for i in range(self.ods.size()): + loads = path_files[i] + link_union = deref(self.link_union_set)[i] + + origin_index = self.nodes_to_indices_view[deref(self.ods)[i].first] + dest_index = self.nodes_to_indices_view[deref(self.ods)[i].second] + demand = matrix_view[origin_index, dest_index] + + for j in range(link_union.size()): + link = deref(link_union)[j] + deref(link_loads)[link] = deref(link_loads)[link] + demand * deref(loads)[j] + + return link_loads + + @cython.boundscheck(False) + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.initializedcheck(False) + cdef vector[double] *apply_link_loading(self, double[:, :] matrix_view) noexcept nogil: + """ + Apply link loading. + + Returns a vector of link loads indexed by compressed link ID. + """ + cdef: + RouteSet_t *route_set + vector[double] *route_set_prob + vector[double].const_iterator route_prob_iter + long origin_index, dest_index + double demand, prob, load + + vector[double] *link_loads = new vector[double](self.num_links) + + for i in range(self.ods.size()): + route_set = deref(self.results)[i] + route_set_prob = deref(self.prob_set)[i] + + origin_index = self.nodes_to_indices_view[deref(self.ods)[i].first] + dest_index = self.nodes_to_indices_view[deref(self.ods)[i].second] + demand = matrix_view[origin_index, dest_index] + + route_prob_iter = route_set_prob.cbegin() + for route in deref(route_set): + prob = deref(route_prob_iter) + inc(route_prob_iter) + + load = prob * demand + for link in deref(route): + deref(link_loads)[link] = deref(link_loads)[link] + load # += here results in all zeros? 
Odd + + return link_loads + + @cython.embedsignature(True) + def select_link_loading(RouteChoiceSet self, matrix, select_links: Dict[str, List[long]], cores: int = 0): + """ + Apply link loading to the network using the demand matrix and the previously computed route sets. + """ + if self.ods == nullptr \ + or self.link_union_set == nullptr \ + or self.prob_set == nullptr: + raise ValueError("select link loading requires Route Choice path_size_logit results") + + if not isinstance(matrix, AequilibraeMatrix): + raise ValueError("`matrix` is not an AequilibraE matrix") + + cores = cores if cores > 0 else omp_get_max_threads() + + cdef: + unordered_set[long] select_link_set + vector[double] *ll + + link_loads = {} + + for i, name in enumerate(matrix.names): + matrix_ll = {} + m = matrix.matrix_view if len(matrix.view_names) == 1 else matrix.matrix_view[:, :, i] + for (k, v) in select_links.items(): + select_link_set = v + + coo = COO((self.zones, self.zones)) + + ll = self.apply_select_link_loading(coo, m, select_link_set) + res = self.apply_link_loading_func(ll, cores) + del ll + + matrix_ll[k] = (coo, res) + link_loads[name] = matrix_ll + + return link_loads + + @cython.boundscheck(False) + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.initializedcheck(False) + cdef vector[double] *apply_select_link_loading( + RouteChoiceSet self, + COO sparse_mat, + double[:, :] matrix_view, + unordered_set[long] &select_link_set + ) noexcept nogil: + """ + Apply select link loading. + + Returns a vector of link loads indexed by compressed link ID. + """ + cdef: + RouteSet_t *route_set + vector[double] *route_set_prob + vector[double].const_iterator route_prob_iter + long origin_index, dest_index, o, d + double demand, prob, load + + vector[double] *link_loads = new vector[double](self.num_links) + + bool link_present + + # For each OD pair, if a route contains one or more links in a select link set, add that ODs demand to + # a sparse matrix of Os to Ds + + # For each route, if it contains one or more links in a select link set, apply the link loading for + # that route + + for i in range(self.ods.size()): + route_set = deref(self.results)[i] + route_set_prob = deref(self.prob_set)[i] + + origin_index = self.nodes_to_indices_view[deref(self.ods)[i].first] + dest_index = self.nodes_to_indices_view[deref(self.ods)[i].second] + demand = matrix_view[origin_index, dest_index] + + route_prob_iter = route_set_prob.cbegin() + for route in deref(route_set): + prob = deref(route_prob_iter) + inc(route_prob_iter) + load = prob * demand + + link_present = False + for link in deref(route): + if select_link_set.find(link) != select_link_set.end(): + sparse_mat.append(origin_index, dest_index, load) + link_present = True + break + + if link_present: + for link in deref(route): + deref(link_loads)[link] = deref(link_loads)[link] + load # += here results in all zeros? Odd + + return link_loads + + @cython.wraparound(False) + @cython.embedsignature(True) + @cython.boundscheck(False) + @cython.initializedcheck(False) + cdef shared_ptr[libpa.CTable] make_table_from_results( + RouteChoiceSet self, + vector[pair[long long, long long]] &ods, + vector[RouteSet_t *] &route_sets, + vector[vector[double] *] *cost_set, + vector[vector[double] *] *path_overlap_set, + vector[vector[double] *] *prob_set + ): + """ + Construct an Arrow table from C++ stdlib structures. + + Note: this function directly utilises the Arrow C++ API, the Arrow Cython API is not sufficient. 
+ See `route_choice_set.pxd` for Cython declarations. + + Returns a shared pointer to a Arrow CTable. This should be wrapped in a Python table before use. + Compressed link IDs are expanded to full network link IDs. + """ + cdef: + shared_ptr[libpa.CArray] paths + shared_ptr[libpa.CArray] offsets + + libpa.CMemoryPool *pool = libpa.c_get_memory_pool() + + # Custom imports, these are declared in route_choice.pxd *not* libarrow. + CUInt32Builder *path_builder = new CUInt32Builder(pool) + CDoubleBuilder *cost_col = nullptr + CDoubleBuilder *path_overlap_col = nullptr + CDoubleBuilder *prob_col = nullptr + + libpa.CInt32Builder *offset_builder = new libpa.CInt32Builder(pool) # Must be Int32 *not* UInt32 + libpa.CUInt32Builder *o_col = new libpa.CUInt32Builder(pool) + libpa.CUInt32Builder *d_col = new libpa.CUInt32Builder(pool) + vector[shared_ptr[libpa.CArray]] columns + shared_ptr[libpa.CDataType] route_set_dtype = libpa.pyarrow_unwrap_data_type(RouteChoiceSet.route_set_dtype) + + libpa.CResult[shared_ptr[libpa.CArray]] route_set_results + + int offset = 0 + size_t network_link_begin, network_link_end, link + bint psl = (cost_set != nullptr and path_overlap_set != nullptr and prob_set != nullptr) + + # Origins, Destination, Route set, [Cost for route, Path_Overlap for route, Probability for route] + columns.resize(6 if psl else 3) + + if psl: + cost_col = new CDoubleBuilder(pool) + path_overlap_col = new CDoubleBuilder(pool) + prob_col = new CDoubleBuilder(pool) + + for i in range(ods.size()): + cost_col.AppendValues(deref(deref(cost_set)[i])) + path_overlap_col.AppendValues(deref(deref(path_overlap_set)[i])) + prob_col.AppendValues(deref(deref(prob_set)[i])) + + for i in range(ods.size()): + route_set = route_sets[i] + + # Instead of constructing a "list of lists" style object for storing the route sets we instead will + # construct one big array of link IDs with a corresponding offsets array that indicates where each new row + # (path) starts. + for route in deref(route_set): + o_col.Append(ods[i].first) + d_col.Append(ods[i].second) + + offset_builder.Append(offset) + + for link in deref(route): + # Translate the compressed link IDs in route to network link IDs, this is a 1:n mapping + network_link_begin = self.mapping_idx[link] + network_link_end = self.mapping_idx[link + 1] + path_builder.AppendValues( + &self.mapping_data[network_link_begin], + network_link_end - network_link_begin + ) + + offset += network_link_end - network_link_begin + + path_builder.Finish(&paths) + + offset_builder.Append(offset) # Mark the end of the array in offsets + offset_builder.Finish(&offsets) + + route_set_results = libpa.CListArray.FromArraysAndType( + route_set_dtype, + deref(offsets.get()), + deref(paths.get()), + pool, + shared_ptr[libpa.CBuffer]() + ) + + o_col.Finish(&columns[0]) + d_col.Finish(&columns[1]) + columns[2] = deref(route_set_results) + + if psl: + cost_col.Finish(&columns[3]) + path_overlap_col.Finish(&columns[4]) + prob_col.Finish(&columns[5]) + + cdef shared_ptr[libpa.CSchema] schema = libpa.pyarrow_unwrap_schema( + RouteChoiceSet.psl_schema if psl else RouteChoiceSet.schema + ) + cdef shared_ptr[libpa.CTable] table = libpa.CTable.MakeFromArrays(schema, columns) + + del path_builder + del offset_builder + del o_col + del d_col + + if psl: + del cost_col + del path_overlap_col + del prob_col + + return table + + def get_results(self): # Cython doesn't like this type annotation... 
-> pa.Table:
+        """
+        :Returns:
+            **route sets** (:obj:`pyarrow.Table`): Returns a table of OD pairs to lists of link IDs for
+                each OD pair provided (as columns). Represents paths from ``origin`` to ``destination``.
+        """
+        if self.results == nullptr or self.ods == nullptr:
+            raise RuntimeError("Route Choice results not computed yet")
+
+        table = libpa.pyarrow_wrap_table(
+            self.make_table_from_results(
+                deref(self.ods),
+                deref(self.results),
+                self.cost_set,
+                self.path_overlap_set,
+                self.prob_set
+            )
+        )
+
+        return table
+
+
+@cython.embedsignature(True)
+cdef class Checkpoint:
+    """
+    A small wrapper class to write a dataset partition by partition
+    """
+
+    def __init__(self, where, schema, partition_cols=None):
+        """Python level init, may be called multiple times, for things that can't be done in __cinit__."""
+        self.where = pathlib.Path(where)
+        self.schema = schema
+        self.partition_cols = partition_cols
+
+    def write(self, table):
+        logger = logging.getLogger("aequilibrae")
+        pq.write_to_dataset(
+            table,
+            self.where,
+            partitioning=self.partition_cols,
+            partitioning_flavor="hive",
+            schema=self.schema,
+            use_threads=True,
+            existing_data_behavior="overwrite_or_ignore",
+            file_visitor=lambda written_file: logger.info(f"Wrote partition dataset at {written_file.path}")
+        )
+
+    def read_dataset(self):
+        return pa.dataset.dataset(self.where, format="parquet", partitioning=pa.dataset.HivePartitioning(self.schema))
+
+    @staticmethod
+    def batches(ods: List[Tuple[int, int]]):
+        return (list(g) for k, g in itertools.groupby(sorted(ods), key=lambda x: x[0]))
+
+
+cdef double inverse_binary_logit(double prob, double beta0, double beta1) noexcept nogil:
+    if prob == 1.0:
+        return INFINITY
+    elif prob == 0.0:
+        return -INFINITY
+    else:
+        return (log(prob / (1.0 - prob)) - beta0) / beta1
diff --git a/docs/source/examples/trip_distribution/plot_route_choice.py b/docs/source/examples/trip_distribution/plot_route_choice.py
new file mode 100644
index 000000000..4c00f0949
--- /dev/null
+++ b/docs/source/examples/trip_distribution/plot_route_choice.py
@@ -0,0 +1,166 @@
+""".. _example_usage_route_choice:
+
+Route Choice
+=================
+
+In this example, we show how to perform route choice set generation using BFSLE and link penalisation for a city in
+the La Serena Metropolitan Area in Chile.
+
+"""
+
+# Imports
+from uuid import uuid4
+from tempfile import gettempdir
+from os.path import join
+from aequilibrae.utils.create_example import create_example
+
+# We create the example project inside our temp folder
+fldr = join(gettempdir(), uuid4().hex)
+
+project = create_example(fldr, "coquimbo")
+
+# %%
+import logging
+import sys
+
+# With the project open, we can tell the logger to direct all messages to the terminal as well
+logger = project.logger
+stdout_handler = logging.StreamHandler(sys.stdout)
+formatter = logging.Formatter("%(asctime)s;%(levelname)s ; %(message)s")
+stdout_handler.setFormatter(formatter)
+logger.addHandler(stdout_handler)
+
+# %%
+# Route Choice
+# ---------------

+# %%
+import numpy as np
+
+# %%
+# Let's build all graphs
+project.network.build_graphs()
+# We get warnings that several fields in the project are filled with NaNs.
+# This is true, but we won't use those fields.
+ +# %% +# We grab the graph for cars +graph = project.network.graphs["c"] + +# we also see what graphs are available +project.network.graphs.keys() + +# let's say we want to minimize the distance +graph.set_graph("distance") + +# But let's say we only want a skim matrix for nodes 28-40, and 49-60 (inclusive), these happen to be a selection of +# western centroids. +graph.prepare_graph(np.array(list(range(28, 41)) + list(range(49, 91)))) + +# %% +# Mock demand matrix +# ~~~~~~~~~~~~~~~~~~ +# We'll create a mock demand matrix with demand `1` for every zone. +from aequilibrae.matrix import AequilibraeMatrix + +names_list = ["demand", "5x demand"] + +mat = AequilibraeMatrix() +mat.create_empty(zones=graph.num_zones, matrix_names=names_list, memory_only=True) +mat.index = graph.centroids[:] +mat.matrices[:, :, 0] = np.full((graph.num_zones, graph.num_zones), 1.0) +mat.matrices[:, :, 1] = np.full((graph.num_zones, graph.num_zones), 5.0) +mat.computational_view() + +# %% +# Route Choice class +# ~~~~~~~~~~~~~~~~~~ +# Here we'll construct and use the Route Choice class to generate our route sets +from aequilibrae.paths import RouteChoice + +# %% +# This object construct might take a minute depending on the size of the graph due to the construction of the compressed +# link to network link mapping that's required. This is a one time operation per graph and is cached. We need to +# supply a Graph and optionally a AequilibraeMatrix, if the matrix is not provided link loading cannot be preformed. +rc = RouteChoice(graph, mat) + +# %% + +# Here we'll set the parameters of our set generation. There are two algorithms available: Link penalisation, or BFSLE +# based on the paper +# "Route choice sets for very high-resolution data" by Nadine Rieser-Schüssler, Michael Balmer & Kay W. Axhausen (2013). +# https://doi.org/10.1080/18128602.2012.671383 +# +# Our BFSLE implementation is slightly different and has extended to allow applying link penalisation as well. Every +# link in all routes found at a depth are penalised with the `penalty` factor for the next depth. So at a depth of 0 no +# links are penalised nor removed. At depth 1, all links found at depth 0 are penalised, then the links marked for +# removal are removed. All links in the routes found at depth 1 are then penalised for the next depth. The penalisation +# compounds. Pass set `penalty=1.0` to disable. +# +# To assist in filtering out bad results during the assignment, a `cutoff_prob` parameter can be provided to exclude +# routes from the path-sized logit model. The `cutoff_prob` is used to compute an inverse binary logit and obtain a max +# difference in utilities. If a paths total cost is greater than the minimum cost path in the route set plus the max +# difference, the route is excluded from the PSL calculations. The route is still returned, but with a probability of +# 0.0. +# +# The `cutoff_prob` should be in the range [0, 1]. It is then rescaled internally to [0.5, 1] as probabilities below 0.5 +# produce negative differences in utilities. A higher `cutoff_prob` includes more routes. A value of `0.0` will only +# include the minimum cost route. A value of `1.0` includes all routes. +# +# It is highly recommended to set either `max_routes` or `max_depth` to prevent runaway results. 
+
+# rc.set_choice_set_generation("link-penalisation", max_routes=5, penalty=1.1)
+rc.set_choice_set_generation("bfsle", max_routes=5, beta=1.1, theta=1.1)
+
+# %%
+# All parameters are optional; the defaults are:
+print(rc.default_paramaters)
+
+# %%
+# We can now perform a computation for a single OD pair if we'd like. Here we do one between the first and last
+# centroid, as well as an assignment.
+results = rc.execute_single(28, 90, perform_assignment=True)
+print(results[0])
+
+# %%
+# Because we asked it to also perform an assignment, we can access the various results from that.
+# The default return is a PyArrow Table, but Pandas is nicer for viewing.
+rc.get_results().to_pandas()
+
+# %%
+# To perform a batch operation we need to prepare the object first. We can either provide a list of tuples of the OD
+# pairs we'd like to use, or we can provide a 1D list and the generation will be run on all permutations.
+rc.prepare(graph.centroids[:5])  # You can inspect the result with rc.nodes
+
+# %%
+# Now we can perform a batch computation with an assignment
+rc.execute(perform_assignment=True)
+rc.get_results().to_pandas()
+
+# %%
+# Since we provided a matrix initially, we can also perform link loading based on our assignment results.
+rc.get_load_results()
+
+# %%
+# Select link analysis
+# ~~~~~~~~~~~~~~~~~~~~
+# We can also enable select link analysis by providing the links and the directions that we are interested in.
+rc.set_select_links({"sl1": [(5372, 1), (5374, 1)], "sl2": [(23845, 0)]})
+
+# %%
+# We can then get the results in a Pandas data frame for both the network and compressed graph.
+u_sl, c_sl = rc.get_select_link_results()
+u_sl
+
+# %%
+# We can also access the OD matrices for this link loading. These matrices are sparse and can be converted to
+# scipy.sparse matrices for ease of use. They're stored in a dictionary where the key is the matrix name concatenated
+# with the select link set name via an underscore. These matrices are constructed during `get_select_link_results`.
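Since these OD matrices are instances of the COO class added in this change, they could also be written to an OMX file and read back as scipy matrices. A minimal sketch, assuming the matrix and select link set names used in this example ("demand" and "sl1"); the file name `select_link_od.omx` is arbitrary.

sl_omx = join(fldr, "select_link_od.omx")  # `join` and `fldr` are defined at the top of this example
rc.sl_od_matrix["demand_sl1"].to_disk(sl_omx, "demand_sl1")

from aequilibrae.matrix import COO
reread = COO.from_disk(sl_omx)  # dict of matrix name -> scipy.sparse.csr_matrix
reread["demand_sl1"].toarray()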
+list(rc.sl_od_matrix.keys()) + +# %% +od_matrix = rc.sl_od_matrix["demand_sl1"] +od_matrix.to_scipy() + +# %% +project.close() diff --git a/setup.cfg b/setup.cfg index eb3eee575..0651c05d8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,4 +2,8 @@ universal = 1 [metadata] -license_files = [LICENSE.TXT] \ No newline at end of file +license_files = [LICENSE.TXT] + +[pycodestyle] +max-line-length = 120 +ignore = E225 \ No newline at end of file diff --git a/setup.py b/setup.py index b68b463a6..17e693d9b 100644 --- a/setup.py +++ b/setup.py @@ -61,8 +61,8 @@ ) ext_mod_bfs_le = Extension( - "aequilibrae.paths.route_choice", - [join("aequilibrae", "paths", "route_choice.pyx")], + "aequilibrae.paths.route_choice_set", + [join("aequilibrae", "paths", "route_choice_set.pyx")], extra_compile_args=compile_args, extra_link_args=link_args, define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")], @@ -82,6 +82,16 @@ language="c++", ) +ext_mod_sparse_matrix = Extension( + "aequilibrae.matrix.sparse_matrix", + [join("aequilibrae", "matrix", "sparse_matrix.pyx")], + extra_compile_args=compile_args, + extra_link_args=link_args, + define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")], + include_dirs=include_dirs, + language="c++", +) + with open("requirements.txt", "r") as fl: install_requirements = [x.strip() for x in fl.readlines()] @@ -123,7 +133,6 @@ license_files=("LICENSE.TXT",), classifiers=[ "Programming Language :: Python", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -131,7 +140,7 @@ ], cmdclass={"build_ext": build_ext}, ext_modules=cythonize( - [ext_mod_aon, ext_mod_ipf, ext_mod_put, ext_mod_bfs_le, ext_mod_graph_building], + [ext_mod_aon, ext_mod_ipf, ext_mod_put, ext_mod_bfs_le, ext_mod_graph_building, ext_mod_sparse_matrix], compiler_directives={"language_level": "3str"}, ), ) diff --git a/tests/aequilibrae/matrix/test_sparse_matrix.py b/tests/aequilibrae/matrix/test_sparse_matrix.py new file mode 100644 index 000000000..f75dc70da --- /dev/null +++ b/tests/aequilibrae/matrix/test_sparse_matrix.py @@ -0,0 +1,38 @@ +from tempfile import gettempdir +from aequilibrae.matrix import COO +from unittest import TestCase +from uuid import uuid4 +import scipy.sparse +import numpy as np +import pathlib + + +class TestSparseMatrix(TestCase): + def setUp(self) -> None: + self.data = np.full((100, 100), 5.0) + self.dir = pathlib.Path(gettempdir()) / uuid4().hex + self.dir.mkdir() + + def tearDown(self) -> None: + pass + + def test_round_trip(self): + p = self.dir / "test.omx" + + coo = COO.from_matrix( + self.data, + ) + coo.to_disk(p, "m1") + coo.to_disk(p, "m2") + + sp = coo.to_scipy() + + coo1 = COO.from_disk(p) + coo2 = COO.from_disk(p, aeq=True) + + for m in ["m1", "m2"]: + self.assertIsInstance(coo1[m], scipy.sparse.csr_matrix) + self.assertIsInstance(coo2[m], COO) + + np.testing.assert_allclose(sp.A, coo1[m].A) + np.testing.assert_allclose(sp.A, coo2[m].to_scipy().A) diff --git a/tests/aequilibrae/paths/test_route_choice.py b/tests/aequilibrae/paths/test_route_choice.py index b68f64d97..5b3fbc88f 100644 --- a/tests/aequilibrae/paths/test_route_choice.py +++ b/tests/aequilibrae/paths/test_route_choice.py @@ -2,20 +2,24 @@ import uuid import zipfile from os.path import join, dirname +import pathlib +import sqlite3 from tempfile import gettempdir from unittest import TestCase import pandas as pd import numpy as np import pyarrow as pa -from aequilibrae import Graph, 
Project -from aequilibrae.paths.route_choice import RouteChoiceSet +from aequilibrae import Project +from aequilibrae.paths.route_choice_set import RouteChoiceSet +from aequilibrae.paths.route_choice import RouteChoice +from aequilibrae.matrix import AequilibraeMatrix, Sparse from ...data import siouxfalls_project # In these tests `max_depth` should be provided to prevent a runaway test case and just burning CI time -class TestRouteChoice(TestCase): +class TestRouteChoiceSet(TestCase): def setUp(self) -> None: os.environ["PATH"] = os.path.join(gettempdir(), "temp_data") + ";" + os.environ["PATH"] @@ -25,19 +29,23 @@ def setUp(self) -> None: self.project = Project() self.project.open(proj_path) - self.project.network.build_graphs(fields=["distance"], modes=["c"]) + self.project.network.build_graphs(fields=["distance", "free_flow_time"], modes=["c"]) self.graph = self.project.network.graphs["c"] # type: Graph self.graph.set_graph("distance") self.graph.set_blocked_centroid_flows(False) + self.mat = self.project.matrices.get_matrix("demand_omx") + self.mat.computational_view() + def tearDown(self) -> None: + self.mat.close() self.project.close() def test_route_choice(self): rc = RouteChoiceSet(self.graph) a, b = 1, 20 - for kwargs in [{"bfsle": True}, {"bfsle": False, "penalty": 1.1}]: + for kwargs in [{"bfsle": True}, {"bfsle": False, "penalty": 1.1}, {"bfsle": True, "penalty": 1.1}]: with self.subTest(**kwargs): results = rc.run(a, b, max_routes=10, **kwargs) self.assertEqual(len(results), 10, "Returned more routes than expected") @@ -47,10 +55,11 @@ def test_route_choice(self): results = rc.run(a, b, max_routes=0, max_depth=1) self.assertEqual(len(results), 1, "Depth of 1 didn't yield a lone route") self.assertListEqual( - results, [(1, 5, 8, 12, 24, 29, 52, 58)], "Initial route isn't the shortest A* route" + results, [(2, 6, 9, 13, 25, 30, 53, 59)], "Initial route isn't the shortest A* route" ) - # A depth of 2 should yield the same initial route plus the length of that route more routes minus duplicates and unreachable paths + # A depth of 2 should yield the same initial route plus the length of that route more routes minus + # duplicates and unreachable paths results2 = rc.run(a, b, max_routes=0, max_depth=2, **kwargs) self.assertTrue(results[0] in results2, "Initial route isn't present in a lower depth") @@ -72,8 +81,9 @@ def test_route_choice_empty_path(self): rc = RouteChoiceSet(self.graph) a = 1 + rc.batched([(a, a)], max_routes=0, max_depth=3, **kwargs) self.assertFalse( - rc.batched([(a, a)], max_routes=0, max_depth=3, **kwargs), + rc.get_results(), "Route set from self to self should be empty", ) @@ -100,7 +110,8 @@ def test_route_choice_batched(self): nodes = [tuple(x) for x in np.random.choice(self.graph.centroids, size=(10, 2), replace=False)] max_routes = 20 - results = rc.batched(nodes, max_routes=max_routes, max_depth=10) + rc.batched(nodes, max_routes=max_routes, max_depth=10, max_misses=200) + results = rc.get_results() gb = results.to_pandas().groupby(by="origin id") self.assertEqual(len(gb), len(nodes), "Requested number of route sets not returned") @@ -118,7 +129,8 @@ def test_route_choice_duplicates_batched(self): max_routes = 20 with self.assertWarns(UserWarning): - results = rc.batched(nodes, max_routes=max_routes, max_depth=10) + rc.batched(nodes, max_routes=max_routes, max_depth=10) + results = rc.get_results() gb = results.to_pandas().groupby(by="origin id") self.assertEqual(len(gb), 1, "Duplicates not dropped") @@ -138,10 +150,6 @@ def 
test_route_choice_exceptions(self): with self.assertRaises(ValueError): rc.run(a, b, max_routes=max_routes, max_depth=max_depth) - with self.assertRaises(ValueError): - rc.run(1, 1, max_routes=1, max_depth=1, bfsle=True, penalty=1.5) - rc.run(1, 1, max_routes=1, max_depth=1, bfsle=False, penalty=0.1) - def test_round_trip(self): np.random.seed(1000) rc = RouteChoiceSet(self.graph) @@ -150,8 +158,9 @@ def test_round_trip(self): max_routes = 20 path = join(self.project.project_base_path, "batched results") - table = rc.batched(nodes, max_routes=max_routes, max_depth=10) - rc.batched(nodes, max_routes=max_routes, max_depth=10, where=path) + rc.batched(nodes, max_routes=max_routes, max_depth=10) + table = rc.get_results().to_pandas() + rc.batched(nodes, max_routes=max_routes, max_depth=10, where=path, cores=1) dataset = pa.dataset.dataset(path, format="parquet", partitioning=pa.dataset.HivePartitioning(rc.schema)) new_table = ( @@ -161,7 +170,7 @@ def test_round_trip(self): .reset_index(drop=True) ) - table = table.to_pandas().sort_values(by=["origin id", "destination id"]).reset_index(drop=True) + table = table.sort_values(by=["origin id", "destination id"]).reset_index(drop=True) pd.testing.assert_frame_equal(table, new_table) @@ -169,35 +178,293 @@ def test_cost_results(self): np.random.seed(0) rc = RouteChoiceSet(self.graph) nodes = [tuple(x) for x in np.random.choice(self.graph.centroids, size=(10, 2), replace=False)] - table = rc.batched(nodes, max_routes=20, max_depth=10, path_size_logit=True) - table = table.to_pandas() + rc.batched(nodes, max_routes=20, max_depth=10, path_size_logit=True) + + table = rc.get_results().to_pandas() gb = table.groupby(by=["origin id", "destination id"]) for od, df in gb: for route, cost in zip(df["route set"].values, df["cost"].values): - np.testing.assert_almost_equal(self.graph.cost[route].sum(), cost, err_msg=f"Cost differs for OD {od}") + np.testing.assert_almost_equal( + self.graph.network.set_index("link_id").loc[route][self.graph.cost_field].sum(), + cost, + err_msg=f", cost differs for OD {od}", + ) - def test_gamma_results(self): + def test_path_overlap_results(self): np.random.seed(0) rc = RouteChoiceSet(self.graph) nodes = [tuple(x) for x in np.random.choice(self.graph.centroids, size=(10, 2), replace=False)] - table = rc.batched(nodes, max_routes=20, max_depth=10, path_size_logit=True) - table = table.to_pandas() + rc.batched(nodes, max_routes=20, max_depth=10, path_size_logit=True) + table = rc.get_results().to_pandas() gb = table.groupby(by=["origin id", "destination id"]) for od, df in gb: - self.assertTrue(all((df["gamma"] > 0) & (df["gamma"] <= 1))) + self.assertTrue(all((df["path overlap"] > 0) & (df["path overlap"] <= 1))) def test_prob_results(self): np.random.seed(0) rc = RouteChoiceSet(self.graph) nodes = [tuple(x) for x in np.random.choice(self.graph.centroids, size=(10, 2), replace=False)] - table = rc.batched(nodes, max_routes=20, max_depth=10, path_size_logit=True) - table = table.to_pandas() - gb = table.groupby(by=["origin id", "destination id"]) - for od, df in gb: - self.assertAlmostEqual(1.0, sum(df["probability"].values), msg="Probability not close to 1.0") + for kwargs in [{"cutoff_prob": 0.0}, {"cutoff_prob": 0.5}, {"cutoff_prob": 1.0}]: + with self.subTest(**kwargs): + rc.batched(nodes, max_routes=20, max_depth=10, path_size_logit=True, **kwargs) + table = rc.get_results().to_pandas() + + gb = table.groupby(by=["origin id", "destination id"]) + for od, df in gb: + self.assertAlmostEqual(1.0, 
sum(df["probability"].values), msg=", probability not close to 1.0") + + def test_link_loading(self): + np.random.seed(0) + rc = RouteChoiceSet(self.graph) + nodes = [tuple(x) for x in np.random.choice(self.graph.centroids, size=(10, 2), replace=False)] + rc.batched(nodes, max_routes=20, max_depth=10, path_size_logit=True) + + n = self.mat.names[0] + + ll = rc.link_loading(self.mat)[n] + ll2 = rc.link_loading(self.mat, generate_path_files=True)[n] + + np.testing.assert_array_almost_equal(ll, ll2) + + def test_known_results(self): + for cost in ["distance", "free_flow_time"]: + with self.subTest(cost=cost): + self.graph.set_graph(cost) + + np.random.seed(0) + rc = RouteChoiceSet(self.graph) + nodes = [tuple(x) for x in np.random.choice(self.graph.centroids, size=(10, 2), replace=False)] + rc.batched(nodes, max_routes=20, max_depth=10, path_size_logit=True) + + mat = AequilibraeMatrix() + mat.create_empty( + memory_only=True, + zones=self.graph.num_zones, + matrix_names=["all zeros", "single one"], + ) + mat.index = self.graph.centroids[:] + mat.computational_view() + mat.matrix_view[:, :, 0] = np.full((self.graph.num_zones, self.graph.num_zones), 1.0) + mat.matrix_view[:, :, 1] = np.zeros((self.graph.num_zones, self.graph.num_zones)) + + for od in nodes: + mat.matrix_view[:, :, 0][od[0] - 1, od[1] - 1] = 0.0 + + mat.matrix_view[:, :, 1][nodes[0][0] - 1, nodes[0][1] - 1] = 1.0 + + link_loads = rc.link_loading(mat) + table = rc.get_results().to_pandas() + + with self.subTest(matrix="all zeros"): + u, c = link_loads["all zeros"] + np.testing.assert_allclose(u, 0.0) + np.testing.assert_allclose(c, 0.0) + + with self.subTest(matrix="single one"): + u, c = link_loads["single one"] + link = self.graph.graph[ + (self.graph.graph.a_node == nodes[0][0] - 1) & (self.graph.graph.b_node == nodes[0][1] - 1) + ] + + lid = link.link_id.values[0] + c_lid = link.__compressed_id__.values[0] + t = table[table["route set"].apply(lambda x, lid=lid: lid in set(x))] + v = t.probability.sum() + + self.assertAlmostEqual(u[lid - 1], v, places=6) + self.assertAlmostEqual(c[c_lid], v, places=6) + + def test_select_link(self): + for cost in ["distance", "free_flow_time"]: + with self.subTest(cost=cost): + self.graph.set_graph(cost) + + np.random.seed(0) + rc = RouteChoiceSet(self.graph) + nodes = [tuple(x) for x in np.random.choice(self.graph.centroids, size=(10, 2), replace=False)] + rc.batched(nodes, max_routes=20, max_depth=10, path_size_logit=True) + + mat = AequilibraeMatrix() + mat.create_empty( + memory_only=True, + zones=self.graph.num_zones, + matrix_names=["all ones"], + ) + mat.index = self.graph.centroids[:] + mat.computational_view() + mat.matrix_view[:, :] = np.full((self.graph.num_zones, self.graph.num_zones), 1.0) + + table = rc.get_results().to_pandas() + + # Shortest routes between 20-4, and 21-2 share links 23 and 26. 
Link 26 also appears in between 10-8 and 17-9 + # 20-4 also shares 11 with 5-3 + ods = [(20, 4), (21, 2), (10, 8), (17, 9)] + sl_link_loads = rc.select_link_loading( + mat, + { + "sl1": self.graph.graph.set_index("link_id").loc[[23, 26]].__compressed_id__.to_list(), + "sl2": self.graph.graph.set_index("link_id").loc[[11]].__compressed_id__.to_list(), + }, + ) + + m, (u, c) = sl_link_loads["all ones"]["sl1"] + m2, (u2, c2) = sl_link_loads["all ones"]["sl2"] + m = m.to_scipy() + m2 = m2.to_scipy() + self.assertSetEqual(set(zip(*(m > 0.0001).nonzero())), {(o - 1, d - 1) for o, d in ods}) + self.assertSetEqual(set(zip(*(m2 > 0.0001).nonzero())), {(20 - 1, 4 - 1), (5 - 1, 3 - 1)}) + + t1 = table[(table.probability > 0.0) & table["route set"].apply(lambda x: bool(set(x) & {23, 26}))] + t2 = table[(table.probability > 0.0) & table["route set"].apply(lambda x: 11 in set(x))] + sl1_link_union = np.unique(np.hstack(t1["route set"].values)) + sl2_link_union = np.unique(np.hstack(t2["route set"].values)) + + np.testing.assert_equal(u.nonzero()[0] + 1, sl1_link_union) + np.testing.assert_equal(u2.nonzero()[0] + 1, sl2_link_union) + + np.testing.assert_allclose(u, c) + np.testing.assert_allclose(u2, c2) + + self.assertAlmostEqual(u.sum(), (t1["route set"].apply(len) * t1.probability).sum()) + self.assertAlmostEqual(u2.sum(), (t2["route set"].apply(len) * t2.probability).sum()) + + +class TestRouteChoice(TestCase): + def setUp(self) -> None: + os.environ["PATH"] = os.path.join(gettempdir(), "temp_data") + ";" + os.environ["PATH"] + + proj_path = os.path.join(gettempdir(), "test_route_choice" + uuid.uuid4().hex) + os.mkdir(proj_path) + zipfile.ZipFile(join(dirname(siouxfalls_project), "sioux_falls_single_class.zip")).extractall(proj_path) + + self.project = Project() + self.project.open(proj_path) + self.project.network.build_graphs(fields=["distance"], modes=["c"]) + self.graph = self.project.network.graphs["c"] # type: Graph + self.graph.set_graph("distance") + self.graph.set_blocked_centroid_flows(False) + + self.mat = self.project.matrices.get_matrix("demand_omx") + self.mat.computational_view() + + self.rc = RouteChoice(self.graph, self.mat) + + def test_prepare(self): + with self.assertRaises(ValueError): + self.rc.prepare([]) + + with self.assertRaises(ValueError): + self.rc.prepare(["1", "2"]) + + with self.assertRaises(ValueError): + self.rc.prepare([("1", "2")]) + + with self.assertRaises(ValueError): + self.rc.prepare([1]) + + self.rc.prepare([1, 2]) + self.assertListEqual(self.rc.nodes, [(1, 2), (2, 1)]) + self.rc.prepare([(1, 2)]) + self.assertListEqual(self.rc.nodes, [(1, 2)]) + + def test_set_save_routes(self): + self.rc = RouteChoice(self.graph, self.mat) + + with self.assertRaises(ValueError): + self.rc.set_save_routes("/non-existent-path") + + def test_set_choice_set_generation(self): + self.rc.set_choice_set_generation("link-penalization", max_routes=20, penalty=1.1) + self.assertDictEqual( + self.rc.parameters, + {"seed": 0, "max_routes": 20, "max_depth": 0, "max_misses": 100, "penalty": 1.1, "cutoff_prob": 1.0}, + ) + + self.rc.set_choice_set_generation("bfsle", max_routes=20, beta=1.1) + self.assertDictEqual( + self.rc.parameters, + { + "seed": 0, + "max_routes": 20, + "max_depth": 0, + "max_misses": 100, + "beta": 1.1, + "theta": 1.0, + "penalty": 1.0, + "cutoff_prob": 1.0, + }, + ) + + with self.assertRaises(ValueError): + self.rc.set_choice_set_generation("link-penalization", max_routes=20, penalty=1.1, beta=1.0) + + with self.assertRaises(AttributeError): + 
self.rc.set_choice_set_generation("not an algorithm", max_routes=20, penalty=1.1) + + def test_link_results(self): + self.rc.set_choice_set_generation("link-penalization", max_routes=20, penalty=1.1) + + self.rc.set_select_links({"sl1": [(23, 1), (26, 1)], "sl2": [(11, 0)]}) + + self.rc.prepare(self.graph.centroids) + + self.rc.execute(perform_assignment=True) + + u, c = self.rc.get_load_results() + u_sl, c_sl = self.rc.get_select_link_results() + + pd.testing.assert_frame_equal(u, c) + pd.testing.assert_frame_equal(u_sl, c_sl) + + self.assertListEqual( + list(u.columns), + ["link_id"] + [mat_name + "_" + dir for dir in ["ab", "ba", "tot"] for mat_name in self.mat.names], + ) + + self.assertListEqual( + list(u_sl.columns), + ["link_id"] + + [ + mat_name + "_" + sl_name + "_" + dir + for sl_name in ["sl1", "sl2"] + for dir in ["ab", "ba"] + for mat_name in self.mat.names + ] + + [mat_name + "_" + sl_name + "_tot" for sl_name in ["sl1", "sl2"] for mat_name in self.mat.names], + ) + + def test_saving(self): + self.rc.set_choice_set_generation("link-penalization", max_routes=20, penalty=1.1) + self.rc.set_select_links({"sl1": [(23, 1), (26, 1)], "sl2": [(11, 0)]}) + self.rc.prepare(self.graph.centroids) + self.rc.execute(perform_assignment=True) + u, c = self.rc.get_load_results() + u_sl, c_sl = self.rc.get_select_link_results() + + self.rc.save_link_flows("ll") + self.rc.save_select_link_flows("sl") + + conn = sqlite3.connect(pathlib.Path(self.project.project_base_path) / "results_database.sqlite") + with conn: + for table, df in [ + ("ll_uncompressed", u), + ("ll_compressed", c), + ("sl_uncompressed", u_sl), + ("sl_compressed", c_sl), + ]: + with self.subTest(table=table): + pd.testing.assert_frame_equal(pd.read_sql(f"select * from {table}", conn), df) + conn.close() + + matrices = Sparse.from_disk( + (pathlib.Path(self.project.project_base_path) / "matrices" / "sl").with_suffix(".omx") + ) + + for name, matrix in self.rc.sl_od_matrix.items(): + np.testing.assert_allclose(matrix.to_scipy().A, matrices[name].A) def generate_line_strings(project, graph, results):