From e6731aef0f6f03d5dd09937dad784419d567c986 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Fri, 24 Apr 2026 08:43:48 +0200 Subject: [PATCH 1/5] Preliminary implementation of OPSI indexes --- bench/indexing/index_query_bench.py | 65 +- plans/ctable-indexes-opsi.md | 1014 +++++++++++++++++++++++++++ src/blosc2/ctable.py | 40 +- src/blosc2/indexing.py | 431 +++++++++++- src/blosc2/indexing_ext.pyx | 302 ++++++++ src/blosc2/ndarray.py | 16 +- 6 files changed, 1838 insertions(+), 30 deletions(-) create mode 100644 plans/ctable-indexes-opsi.md diff --git a/bench/indexing/index_query_bench.py b/bench/indexing/index_query_bench.py index e68985a6..e6509f56 100644 --- a/bench/indexing/index_query_bench.py +++ b/bench/indexing/index_query_bench.py @@ -24,7 +24,7 @@ SIZES = (1_000_000, 2_000_000, 5_000_000, 10_000_000) DEFAULT_REPEATS = 3 -KINDS = ("summary", "bucket", "partial", "full") +KINDS = ("summary", "bucket", "partial", "full", "opsi") DEFAULT_KIND = "bucket" DISTS = ("sorted", "block-shuffled", "permuted", "random") RNG_SEED = 0 @@ -32,6 +32,8 @@ FULL_QUERY_MODES = ("auto", "selective-ooc", "whole-load") DATASET_LAYOUT_VERSION = "payload-ramp-v1" BUILD_MODES = ("auto", "memory", "ooc") +FULL_INDEX_METHODS = ("global-sort", "opsi") +FULL_INDEX_METHODS = ("global-sort", "opsi") COLD_COLUMNS = [ ("rows", lambda result: f"{result['size']:,}"), @@ -443,19 +445,33 @@ def _condition_expr(lo: object, hi: object, dtype: np.dtype, *, query_single_val return f"(id >= {lo_literal}) & (id <= {hi_literal})" +def _index_kind_method(kind: str) -> tuple[str, str | None]: + if kind == "opsi": + return "full", "opsi" + if kind == "full": + return "full", "global-sort" + return kind, None + + def _valid_index_descriptor(arr: blosc2.NDArray, kind: str, optlevel: int, build: str) -> dict | None: + actual_kind, method = _index_kind_method(kind) for descriptor in arr.indexes: if descriptor.get("version") != blosc2_indexing.INDEX_FORMAT_VERSION: continue expected_ooc = build != "memory" - if ( + if not ( descriptor.get("field") == "id" - and descriptor.get("kind") == kind + and descriptor.get("kind") == actual_kind and int(descriptor.get("optlevel", -1)) == int(optlevel) and bool(descriptor.get("ooc", False)) is bool(expected_ooc) and not descriptor.get("stale", False) ): - return descriptor + continue + if method is not None: + build_method = descriptor.get("full", {}).get("build_method", "global-sort") + if build_method != method: + continue + return descriptor return None @@ -482,6 +498,7 @@ def _open_or_build_indexed_array( clevel: int | None, nthreads: int | None, no_mmap: bool, + opsi_max_cycles: int, ) -> tuple[blosc2.NDArray, float]: if path.exists(): arr = blosc2.open(path, mode="a") @@ -493,12 +510,17 @@ def _open_or_build_indexed_array( arr = build_persistent_array(size, dist, id_dtype, path, chunks, blocks) build_start = time.perf_counter() + actual_kind, method = _index_kind_method(kind) kwargs = { "field": "id", - "kind": blosc2.IndexKind[kind.upper()], + "kind": blosc2.IndexKind[actual_kind.upper()], "optlevel": optlevel, "build": build, } + if method is not None: + kwargs["method"] = method + if method == "opsi": + kwargs["opsi_max_cycles"] = opsi_max_cycles cparams = {} if codec is not None: cparams["codec"] = codec @@ -531,6 +553,7 @@ def benchmark_size( kinds: tuple[str, ...], repeats: int, no_mmap: bool, + opsi_max_cycles: int, cold_row_callback=None, ) -> list[dict]: arr = _open_or_build_persistent_array( @@ -574,6 +597,7 @@ def benchmark_size( clevel, nthreads, no_mmap, + opsi_max_cycles, ) idx_cond = blosc2.lazyexpr(condition_str, idx_arr.fields) idx_expr = idx_cond.where(idx_arr) @@ -725,12 +749,28 @@ def parse_args() -> argparse.Namespace: default="auto", help="Index builder policy: auto, memory, or ooc. Default: auto.", ) + parser.add_argument( + "--method", + choices=FULL_INDEX_METHODS, + default="global-sort", + help=( + "Full-index build method. Use with --kind full: global-sort selects the current full " + "builder, opsi selects the OPSI builder. --kind opsi is a shorthand for " + "--kind full --method opsi. Default: global-sort." + ), + ) parser.add_argument( "--full-query-mode", choices=FULL_QUERY_MODES, default="auto", help="How full exact queries should run during the benchmark: auto, selective-ooc, or whole-load.", ) + parser.add_argument( + "--opsi-max-cycles", + type=int, + default=3, + help="Maximum OPSI cycles before falling back to global-sort. Default: 3.", + ) parser.add_argument( "--codec", type=str, @@ -776,9 +816,18 @@ def main() -> None: raise SystemExit("--clevel must be >= 0") if args.nthreads is not None and args.nthreads <= 0: raise SystemExit("--nthreads must be a positive integer") + if args.opsi_max_cycles < 0: + raise SystemExit("--opsi-max-cycles must be >= 0") + if args.method == "opsi" and args.kind != "full": + raise SystemExit("--method opsi requires --kind full; alternatively use --kind opsi") + if args.kind == "opsi" and args.method != "global-sort": + raise SystemExit("--kind opsi already selects the OPSI method; do not also pass --method") sizes = (args.size,) if args.size is not None else SIZES dists = DISTS if args.dist == "all" else (args.dist,) - kinds = KINDS if args.kind == "all" else (args.kind,) + if args.kind == "full" and args.method == "opsi": + kinds = ("opsi",) + else: + kinds = KINDS if args.kind == "all" else (args.kind,) if args.outdir is None: with tempfile.TemporaryDirectory() as tmpdir: @@ -801,6 +850,7 @@ def main() -> None: args.clevel, args.nthreads, args.no_mmap, + args.opsi_max_cycles, ) else: args.outdir.mkdir(parents=True, exist_ok=True) @@ -823,6 +873,7 @@ def main() -> None: args.clevel, args.nthreads, args.no_mmap, + args.opsi_max_cycles, ) @@ -845,6 +896,7 @@ def run_benchmarks( clevel: int | None, nthreads: int | None, no_mmap: bool, + opsi_max_cycles: int, ) -> None: all_results = [] @@ -894,6 +946,7 @@ def cold_progress_callback(row: dict) -> None: kinds, repeats, no_mmap, + opsi_max_cycles, cold_row_callback=cold_progress_callback, ) all_results.extend(size_results) diff --git a/plans/ctable-indexes-opsi.md b/plans/ctable-indexes-opsi.md new file mode 100644 index 00000000..baa52779 --- /dev/null +++ b/plans/ctable-indexes-opsi.md @@ -0,0 +1,1014 @@ +# OPSI indexing implementation plan + +## Goal + +Implement a new index construction method, tentatively named **OPSI**. OPSI builds a completely sorted index (CSI) by repeatedly combining: + +1. **Stage 1**: sort every chunk completely, carrying positions together with values. +2. **Stage 2**: compute a global block relocation order from per-block boundary keys, alternating between block minimum and block maximum values. + +Unlike the in-memory `full` builder, OPSI should avoid sorting the full value array in memory. The current persistent/on-disk `full` builder already uses an out-of-core global-sort path; OPSI is intended as an alternative full-index builder with global memory proportional to the number of blocks, not the number of indexed elements. + +The final output should be usable as a full sorted index: sorted values plus original/global positions. + +## Existing context + +Current index kinds are: + +- `summary` +- `bucket` +- `partial` +- `full` + +Only `full` currently guarantees a completely sorted global index. + +Relevant files/functions: + +- `src/blosc2/indexing.py` + - `_build_full_descriptor()` + - `_build_full_descriptor_ooc()` + - `_build_partial_descriptor()` + - `_build_partial_descriptor_ooc()` + - `_sort_chunk_intra_chunk()` + - `_build_partial_chunk_payloads_intra_chunk()` + - `_prepare_chunk_index_payload_sidecars()` + - `_finalize_chunk_index_payload_storage()` + - `_read_ndarray_linear_span()` + - `_write_ndarray_linear_span()` + - `_rebuild_full_navigation_sidecars_from_handle()` / `_rebuild_full_navigation_sidecars_from_path()` +- `src/blosc2/indexing_ext.pyx` + - existing stable merge-sort helpers for values + positions + - new keysort-like helpers should live here +- `src/blosc2/ctable.py` + - index kind validation + - persistent index construction dispatch + - catalog descriptor handling + +## Product/API decision: OPSI is a full-index build method + +OPSI should be implemented as an alternative **builder** for a `full` index, not as a new persistent index kind. + +The semantic meaning of `kind="full"` remains: + +```text +the final index is completely globally sorted and can provide sorted values plus original/global positions +``` + +The build method controls how that full/CSI index is produced: + +```python +arr.create_index(kind="full") # current behavior, defaults to method="global-sort" +arr.create_index( + kind="full", method="global-sort" +) # explicit current full-index builder +arr.create_index(kind="full", method="opsi") # new OPSI builder +``` + +So OPSI does **not** replace the current full-index construction path. The current method remains available and is named `global-sort` when an explicit method name is needed. + +OPSI internally builds the same sidecars as a full index and stores the final descriptor as a normal `full` descriptor: + +```python +{ + "kind": "full", + "full": { + "values_path": ..., + "positions_path": ..., + "runs": [], + "next_run_id": 0, + "l1_path": ..., + "l2_path": ..., + "sidecar_chunk_len": ..., + "sidecar_block_len": ..., + "build_method": "opsi", + "opsi_cycles": ..., + }, +} +``` + +For indexes built by the current full builder, descriptor metadata can use: + +```text +"build_method": "global-sort" +``` + +or omit `build_method` for backward compatibility and treat missing metadata as `global-sort`. + +This avoids touching most query-planning and ordered-access code, because the final OPSI result is logically a full sorted index. + +A future `IndexKind.OPSI` is not recommended unless there is a strong user-facing reason. If it is ever added, it should be normalized to `kind="full", method="opsi"` rather than stored as a separate persistent catalog kind. + +## Core algorithm + +Let: + +```text +N = number of indexed elements +C = chunk_len = int(array.chunks[0]) +B = block_len = int(array.blocks[0]) +nchunks = ceil(N / C) +nblocks = ceil(N / B) +``` + +OPSI maintains one completed Stage-1 sidecar set at a time: + +```text +values_sidecar +positions_sidecar +block_mins_sidecar +block_maxs_sidecar +``` + +Stage 2 is **not materialized**. It computes a block permutation in memory and feeds the next Stage 1. + +Pseudo-code: + +```python +cycle = 0 +source = original_array_or_previous_stage1_sidecars +block_order = None # identity for first Stage 1 + +while True: + current = build_stage1_sidecars(source, block_order) + + if is_csi(current.block_mins, current.block_maxs): + finalize_as_full_index(current) + remove_previous_stage1_sidecars() + break + + if cycle >= max_cycles: + cleanup_current_sidecars_if_needed() + raise RuntimeError("OPSI did not converge within max_cycles") + + if cycle % 2 == 0: + keys = load_block_mins(current) + else: + keys = load_block_maxs(current) + + block_order = sort_block_ids_by_keys(keys) + + remove_previous_stage1_sidecars() + source = current + cycle += 1 +``` + +The alternation is: + +```text +cycle 0 relocation: block mins +cycle 1 relocation: block maxs +cycle 2 relocation: block mins +cycle 3 relocation: block maxs +... +``` + +## Stage 1 details + +Stage 1 receives a logical stream of chunks. For the first cycle this stream is the original target values in original order. For later cycles this stream is built by gathering blocks from the previous Stage-1 sidecars according to the most recent Stage-2 `block_order`. + +For each destination chunk: + +1. Materialize `chunk_values` in memory. +2. Materialize `chunk_positions` in memory. +3. Sort `chunk_values` and carry `chunk_positions` in lockstep. +4. Write sorted values and positions to new sidecars. +5. Compute per-block min/max from the sorted chunk and write/update min/max sidecars. + +### First Stage 1 + +Read values from the indexed target. Positions are global row positions and can be synthesized: + +```python +chunk_positions = np.arange(chunk_start, chunk_stop, dtype=np.int64) +``` + +or, if the sorting helper returns an order/position vector, converted to global positions: + +```python +sorted_positions = chunk_start + local_order +``` + +### Later Stage 1 cycles + +Build each destination chunk by gathering relocated source blocks from the previous Stage-1 sidecars. + +For destination block slot `dst_block_id`: + +```python +src_block_id = block_order[dst_block_id] +src_start = src_block_id * B +src_stop = min(src_start + B, N) +``` + +Read the source values and source positions block, then copy both into the destination chunk buffer at the destination block offset. + +Positions must always be copied and sorted together with their corresponding values. + +### Per-block min/max sidecars + +After sorting a chunk, each block inside that chunk is sorted. For every global block: + +```python +block_min = sorted_block[0] +block_max = sorted_block[-1] +``` + +Store these in sidecars: + +```text +opsi_stageX.block_mins +opsi_stageX.block_maxs +``` + +These sidecars are regenerated after every Stage 1. They must not be reused across cycles, because Stage 1 re-sorts mixed chunks and therefore changes block boundaries. + +## CSI check + +After every Stage 1, check whether the current Stage-1 sidecars represent a completely sorted global index. + +The condition is: + +```python +block_maxs[i] <= block_mins[i + 1] +``` + +for all adjacent blocks. + +For integer, bool, datetime, and timedelta dtypes, normal `<=` semantics are sufficient. + +For floating dtypes, use ordered NaN-aware comparison: + +```text +finite values < NaN +NaN == NaN for ordering purposes +``` + +So: + +```text +finite <= finite -> normal comparison +finite <= NaN -> True +NaN <= NaN -> True +NaN <= finite -> False +``` + +Do **not** use raw NumPy `<=` for float CSI checks, because `np.nan <= np.nan` is false. + +The CSI check should be streamable. It does not need both complete min/max arrays in memory. It can read min/max sidecars by spans and compare boundaries, carrying the previous block max across span boundaries. + +## Stage 2 details + +Stage 2 chooses one scalar key per block: + +```python +keys = block_mins # min cycle +# or +keys = block_maxs # max cycle +``` + +Then it sorts block ids by these keys. + +The result is: + +```python +block_order[dst_block_id] = src_block_id +``` + +This block order is then consumed by the next Stage 1. No relocated values/positions sidecars are written in Stage 2. + +### Block id dtype + +Use `int64` for block ids/order. + +Rationale: + +- `nblocks <= 2**32 - 1` cannot be guaranteed. +- Source/destination offsets are computed as `block_id * block_len` and must not overflow. +- NumPy and Cython indexing paths naturally use `int64` / `intp` on 64-bit systems. +- Simpler and safer than conditional `uint32` block ids. + +### Stage 2 sorting implementation + +Preferred implementation: a keysort-like Cython helper: + +```python +indexing_ext.keysort_keys_indices(keys, block_ids) +``` + +This sorts `keys` in place and carries `block_ids` with it. After sorting, `block_ids` is the block relocation order. + +Comparator: + +```text +primary key: block key value +secondary key: current block id +``` + +For floats, use the same NaN convention: + +```text +finite values < NaN +NaNs compare equal by value, then tie-break by block id +``` + +Fallback implementation: + +```python +block_order = np.argsort(keys, kind="stable").astype(np.int64, copy=False) +``` + +The fallback is correct but can allocate more memory and may be slower. + +## Keysort / paired-sort implementation + +OPSI should add keysort-style Cython primitives in `src/blosc2/indexing_ext.pyx`. + +### Stage 1 paired value/position sort + +Add a helper similar to PyTables' `keysort`, but adapted for OPSI: + +```cython +def keysort_values_positions(np.ndarray values, np.ndarray positions): + """Sort values in-place and carry positions with the same swaps.""" +``` + +Properties: + +- in-place +- numeric dtypes supported by the existing indexing extension +- positions are `int64` initially +- comparator sorts by `(value, position)` +- float comparator sorts finite values before NaNs +- NaN values are considered equal for primary-key ordering and tie-broken by position + +Comparator for non-floating dtypes: + +```text +(a_value, a_pos) < (b_value, b_pos) +``` + +Comparator for floats: + +```text +a_nan = isnan(a_value) +b_nan = isnan(b_value) + +if a_nan: + if b_nan: + return a_pos < b_pos + return False # NaN after finite +if b_nan: + return True # finite before NaN +if a_value < b_value: + return True +if a_value > b_value: + return False +return a_pos < b_pos +``` + +This makes algorithmic stability unnecessary. Even with quicksort, equal values get deterministic ordering via positions. + +### Stage 2 key/block-id sort + +Add a second helper: + +```cython +def keysort_keys_indices(np.ndarray keys, np.ndarray indices): + """Sort scalar keys in-place and carry int64 indices with them.""" +``` + +This can share most implementation with `keysort_values_positions`, with `indices` as the tie-breaker. + +### Why quicksort/keysort + +Compared with: + +```python +order = np.argsort(values, kind="stable") +sorted_values = values[order] +sorted_positions = positions[order] +``` + +an in-place keysort avoids: + +- explicit `order` allocation +- two gather operations +- merge-sort temporary buffers +- extra memory bandwidth + +This is important for OPSI because chunk sorting happens every cycle and positions must travel with values. + +### Relationship to existing merge-sort helpers + +`indexing_ext.pyx` already contains stable merge-sort helpers. They are useful references and can remain as fallback paths, but OPSI should use the new keysort path for memory efficiency. + +The existing float NaN ordering logic should be mirrored for consistency. + +## Reading and writing sidecars + +### Reading blocks + +For Stage 1 after the first cycle, read source blocks from previous sidecars. + +`get_1d_span_numpy()` is the preferred low-level primitive for 1-D sidecar reads. Existing wrapper `_read_ndarray_linear_span()` is also useful because it handles spans crossing sidecar chunk boundaries and includes fallback behavior. + +If OPSI sidecar geometry enforces: + +```python +sidecar_chunk_len % block_len == 0 +``` + +then every block lies inside one sidecar chunk, and direct `get_1d_span_numpy()` is ideal. + +### Writing chunks + +Build one destination chunk in memory and write it once to each sidecar: + +```python +_write_ndarray_linear_span(values_handle, chunk_start, sorted_values) +_write_ndarray_linear_span(positions_handle, chunk_start, sorted_positions) +``` + +This minimizes small writes. + +### Coalescing reads + +Initial implementation can read one source block at a time. + +Later optimization: detect runs where consecutive destination block slots map to consecutive source blocks, e.g.: + +```text +dst blocks: 0, 1, 2 +src blocks: 8, 9, 10 +``` + +and read one larger span instead of three block reads. + +## Sidecar lifecycle and failure behavior + +During each Stage 1, create a fresh sidecar set: + +```text +values +positions +block_mins +block_maxs +``` + +Only after the new Stage-1 sidecars are fully written and validated should the previous Stage-1 sidecars be removed. + +If the process fails mid-build, the expected recovery is to restart the build, not to resume from a partially completed Stage 2. Therefore Stage 2 does not need persistent sidecars. + +For persistent arrays, temporary sidecar names should include a token, cycle, and stage identifier, for example: + +```text +.opsi.cycle000.values +.opsi.cycle000.positions +.opsi.cycle000.mins +.opsi.cycle000.maxs +``` + +Finalized full-compatible sidecars should use the normal full sidecar names: + +```text +full.values +full.positions +full_nav.l1 +full_nav.l2 +``` + +or should be copied/renamed into that canonical layout. + +Temporary sidecars must be deleted on successful completion and best-effort deleted on exceptions. + +## Navigation sidecars + +Once CSI is reached, build full navigation sidecars for the final sorted values: + +```text +full_nav.l1 +full_nav.l2 +``` + +Use existing helpers where possible: + +- `_rebuild_full_navigation_sidecars_from_handle()` if final values sidecar is already open +- `_rebuild_full_navigation_sidecars_from_path()` if final values path is known +- `_rebuild_full_navigation_sidecars()` if final values are in memory, which is less likely for OPSI + +The final `full` descriptor must include: + +```text +"sidecar_chunk_len": int(values_sidecar.chunks[0]) +"sidecar_block_len": int(values_sidecar.blocks[0]) +"l1_path": ... +"l2_path": ... +"l1_dtype": ... +"l2_dtype": ... +``` + +as expected by existing full-index query paths. + +## Memory requirements + +OPSI's memory pressure is primarily: + +1. one chunk of values + positions +2. one full block-key array + one full block-order array during Stage 2 + +Let: + +```text +N = number of elements +B = block length +C = chunk length +S = value dtype itemsize +P = position itemsize, initially 8 +K = block id itemsize, 8 +nblocks = ceil(N / B) +``` + +Approximate peak memory with keysort: + +```text +C * (S + P) + nblocks * (S + K) +``` + +plus small temporary block buffers and sidecar API overhead. + +If Stage 1 used merge sort or argsort, extra chunk-sized temporary arrays would be required. The keysort approach is therefore strongly preferred. + +### Example: float64/int64 values, int64 positions, block_len=4096 + +```text +S = 8 +P = 8 +K = 8 +Stage 2 global memory = nblocks * 16 bytes +``` + +Approximate Stage-2 key/order memory: + +| N items | nblocks | key + order memory | +|---:|---:|---:| +| 100 million | ~24,414 | ~0.39 MB | +| 1 billion | ~244,141 | ~3.9 MB | +| 10 billion | ~2.44 million | ~39 MB | +| 100 billion | ~24.4 million | ~390 MB | + +For smaller block sizes memory grows proportionally. For example, `block_len=256` uses 16x more block-key/order memory than `block_len=4096`. + +## Convergence and safeguards + +Add a `max_cycles` safeguard. + +Initial options: + +- internal default, e.g. `max_cycles=32` +- user-visible keyword for experimental tuning +- environment variable for testing + +If CSI is not reached within `max_cycles`, the initial implementation falls back to the existing `global-sort` full-index builder so that `method="opsi"` still produces a valid full index. The descriptor records this with: + +```text +"opsi_fallback": "global-sort" +``` + +A stricter future mode may instead raise: + +```python +RuntimeError("OPSI did not converge within max_cycles") +``` + +Track metadata: + +```text +"build_method": "opsi" +"opsi_cycles": cycles_completed +"opsi_max_cycles": max_cycles +``` + +## API integration + +Use `method` as the explicit full-index build-method keyword: + +```python +arr.create_index(kind="full") # equivalent to method="global-sort" +arr.create_index(kind="full", method="global-sort") # current full builder +arr.create_index(kind="full", method="opsi") # OPSI builder +``` + +The `method` keyword should only apply to `kind="full"` initially. Passing `method="opsi"` with `summary`, `bucket`, or `partial` should raise a clear `ValueError`. + +Descriptor metadata: + +```text +"build_method": "global-sort" # current/default full builder +"build_method": "opsi" # OPSI builder +``` + +For backward compatibility, a missing `build_method` in an existing full descriptor should be interpreted as `global-sort`. + +Do not add a persistent `kind="opsi"` catalog entry in the initial implementation. If a user-facing `IndexKind.OPSI` is later desired, normalize it to `kind="full", method="opsi"` during argument handling. + +## Implementation steps + +### 1. Add Cython keysort helpers + +In `src/blosc2/indexing_ext.pyx`: + +- add in-place keysort for supported value dtypes + `int64` positions +- comparator sorts by `(value, position)` +- float comparator handles NaNs as last +- add in-place keysort for block keys + `int64` block ids +- add tests against NumPy reference ordering + +Reference behavior: + +```python +expected_order = np.lexsort((positions, values)) +``` + +For floats with NaNs, use a reference that maps NaNs to a final ordering bucket and then tie-breaks by positions. + +### 2. Add OPSI Stage-1 builder helpers + +In `src/blosc2/indexing.py`: + +- helper to create temporary Stage-1 sidecar set +- helper to build first Stage 1 from original target values +- helper to build later Stage 1 by gathering blocks from previous sidecars according to `block_order` +- write values/positions chunk-by-chunk +- generate block min/max sidecars every Stage 1 + +Possible internal structure: + +```python +@dataclass +class OpsiStageSidecars: + values_path: str | None + positions_path: str | None + mins_path: str | None + maxs_path: str | None + values_handle: blosc2.NDArray | None + positions_handle: blosc2.NDArray | None + mins_handle: blosc2.NDArray | None + maxs_handle: blosc2.NDArray | None + chunk_len: int + block_len: int + nblocks: int +``` + +### 3. Add CSI check helper + +Implement: + +```python +def _opsi_is_csi(mins_handle, maxs_handle, dtype) -> bool: ... +``` + +- stream sidecars by spans +- use raw `<=` for ordered non-float types +- use NaN-aware ordered comparison for float types + +Consider adding a Cython helper for vectorized float CSI checks if Python/NumPy span checks are too slow. + +### 4. Add Stage-2 block-order helper + +Implement: + +```python +def _opsi_compute_block_order(stage, use_max: bool) -> np.ndarray: + keys = _read_sidecar_span( + stage.maxs_handle if use_max else stage.mins_handle, 0, nblocks + ) + block_ids = np.arange(nblocks, dtype=np.int64) + indexing_ext.keysort_keys_indices(keys, block_ids) + return block_ids +``` + +Fallback to NumPy if unsupported dtype: + +```text +order = np.argsort(keys, kind="stable") +return order.astype(np.int64, copy=False) +``` + +For deterministic equal-key behavior, prefer keysort comparator `(key, block_id)`. + +### 5. Add OPSI full descriptor builder + +Implement something like: + +```python +def _build_full_descriptor_opsi( + array, + target, + token, + kind, + dtype, + persistent, + cparams=None, + max_cycles=32, +) -> dict: ... +``` + +It should: + +1. run OPSI cycles +2. finalize/copy final Stage-1 values/positions as full sidecars +3. build full navigation sidecars +4. return a normal `full` descriptor with OPSI metadata +5. clean temporary sidecars + +### 6. Wire API/build dispatch + +Add/propagate the `method` keyword for full indexes: + +```python +create_index(kind="full", method="global-sort") # current full builder +create_index(kind="full", method="opsi") # OPSI builder +``` + +Default behavior remains unchanged: + +```python +create_index(kind="full") # method="global-sort" +``` + +Update: + +- full-index method normalization/validation +- CTable `_build_index_persistent()` dispatch +- top-level `create_index()` dispatch +- descriptor creation kwargs for rebuild/compact so `method="opsi"` survives rebuilds +- descriptor metadata so current full indexes are identified as `global-sort` when metadata is present + +Do not add `opsi` to persistent index-kind validation. OPSI is a method for creating `kind="full"`. + +### 7. Tests + +Add unit tests covering: + +- small arrays with random integers +- duplicate-heavy arrays +- already sorted arrays +- reverse sorted arrays +- arrays with many equal block mins/maxs +- float arrays with NaNs +- final values sidecar is globally sorted according to expected ordering +- positions sidecar maps sorted values back to original values +- CSI condition passes +- query results match existing full index for predicates +- ordered indices match expected sorted positions +- persistent table index create/drop/rebuild if integrated into CTable +- failure on too-small `max_cycles` cleans temporary sidecars best-effort + +Reference check: + +```python +expected_order = np.argsort(values, kind="stable") +expected_values = values[expected_order] +expected_positions = expected_order.astype(np.int64) +``` + +For OPSI with `(value, position)` comparator, this should match stable NumPy ordering for equal values when original positions are increasing. + +For floats with NaNs, ensure NumPy reference places NaNs last and duplicate NaNs retain position order. + +### 8. Benchmarks + +Compare: + +- existing `full` builder +- OPSI builder with keysort +- OPSI fallback using argsort + +Datasets: + +- uniform random int64/float64 +- already sorted +- reverse sorted +- low-cardinality duplicates +- data with NaNs +- large out-of-core persistent arrays + +Measure: + +- build time +- peak memory +- compressed sidecar sizes +- cycles to convergence +- query performance after build + +## Future work: OOC Stage 2 block-key sorting + +The initial OPSI design may compute Stage 2 block order in memory: + +```python +keys = read_all(block_mins_or_maxs) +block_ids = np.arange(nblocks, dtype=np.int64) +keysort_keys_indices(keys, block_ids) +``` + +This requires memory proportional to: + +```text +nblocks * (value_itemsize + 8) +``` + +For extremely large persistent arrays, even this block-level memory can become too large. A future OOC Stage 2 can avoid loading all block keys/order entries at once by using an external merge-sort subroutine over block boundary keys. + +This would mimic the current full OOC builder's sorted-run structure, but over **block keys**, not over all indexed values. + +### OOC Stage 2 outline + +For each Stage 2 cycle: + +1. Choose the key sidecar: + + ```text + key_sidecar = block_mins if use_min_cycle else block_maxs + ``` + +2. Read the key sidecar in runs: + + ```text + for run_start in range(0, nblocks, run_items): + keys = read_span(key_sidecar, run_start, run_stop) + block_ids = run_start + np.arange(run_stop - run_start, dtype=np.int64) + keysort_keys_indices(keys, block_ids) + materialize_sorted_block_key_run(keys, block_ids) + ``` + +3. Merge sorted block-key runs pairwise, reusing the same pair comparator: + + ```text + primary key: block min/max value + secondary key: block id + NaNs sort last for float keys + ``` + +4. The final merged run's `positions` sidecar is the block-order stream: + + ```text + block_order[dst_block_id] = src_block_id + ``` + +5. The next Stage 1 reads the block-order stream by destination chunk/block span instead of requiring the full `block_order` array in memory. + +### Relationship to merge sort + +This does not make OPSI a global merge-sort index builder. The current full OOC builder globally sorts all `N` value-position pairs. OOC Stage 2 for OPSI would sort only: + +```python +nblocks = ceil(N / block_len) +``` + +boundary-key/block-id pairs. OPSI remains an iterative local-sort plus block-relocation algorithm; external merge sort is only an implementation detail for producing the block relocation order when `nblocks` is too large for memory. + +### Abstraction needed + +To support both in-memory and OOC Stage 2, introduce a small block-order abstraction: + +```python +class OpsiBlockOrder: + def read_span(self, start_block: int, stop_block: int) -> np.ndarray: ... +``` + +Implementations: + +- memory-backed: wraps a NumPy `int64` array +- sidecar-backed: wraps the final OOC block-order positions sidecar + +Then Stage 1 can gather relocated blocks without caring whether Stage 2 was memory or OOC. + +### Tradeoff + +OOC Stage 2 lowers peak memory but adds temporary disk IO every OPSI cycle. It should therefore be selected only when necessary, for example under `build="ooc"` or when `build="auto"` detects that `nblocks * (value_itemsize + 8)` exceeds a configured memory threshold. + +## Future work: reuse keysort in existing index builders + +Even if OPSI is not competitive as a general full-index builder, the in-place paired keysort primitives developed for OPSI can be useful elsewhere in the indexing engine. + +The useful primitives are: + +```python +indexing_ext.keysort_values_positions(values, positions) +indexing_ext.keysort_keys_indices(keys, indices) +``` + +They sort in place by: + +```text +primary key: value/key +secondary key: position/index +``` + +with float NaNs ordered last. This gives deterministic stable-sort-like semantics when positions/indices are unique and initialized in original order. + +### Candidate integration points + +#### Full in-memory builder + +Current full in-memory construction uses a global argsort-like pattern: + +```python +order = np.argsort(values, kind="stable") +sorted_values = values[order] +positions = order.astype(np.int64) +``` + +A keysort-based alternative is: + +```python +sorted_values = values.copy() +positions = np.arange(values.size, dtype=np.int64) +indexing_ext.keysort_values_positions(sorted_values, positions) +``` + +This avoids the explicit `order` array and avoids a separate value gather. It should be benchmarked against NumPy's highly optimized argsort, especially for large arrays where memory bandwidth and temporary allocation pressure matter. + +#### Full OOC run generation + +Current full OOC construction creates sorted runs from spans of the base array. This is a strong candidate for keysort: + +```python +run_values = _slice_values_for_target(...).copy() +run_positions = np.arange(run_start, run_stop, dtype=np.int64) +indexing_ext.keysort_values_positions(run_values, run_positions) +materialize_sorted_run(run_values, run_positions) +``` + +This may reduce per-run temporary memory compared with merge-sort-style helpers and can lower memory bandwidth during run creation. The global OOC merge phase remains unchanged. + +#### Partial chunk-local sorting + +Partial indexes sort chunk-local payloads and carry local positions. This is also a natural fit: + +```python +chunk_values = values.copy() +chunk_positions = np.arange(chunk_size, dtype=position_dtype) +keysort_values_positions(chunk_values, chunk_positions) +``` + +However, current partial indexes use compact position dtypes such as `uint8`, `uint16`, or `uint32` when possible. The initial keysort helper expects `int64` positions, so this integration should either: + +1. extend keysort to support compact position dtypes while using widened comparisons for tie-breaking, or +2. restrict keysort use to paths where `int64` positions are already acceptable. + +Keeping compact position sidecars is important for partial-index size. + +#### Bucket and metadata sorting + +`keysort_keys_indices()` may also be useful for sorting smaller metadata arrays, boundary-key arrays, or future bucket/navigation construction steps where scalar keys need to carry ids or offsets. + +### Benchmarking needed + +Before replacing existing sort paths, benchmark: + +- current full in-memory builder vs keysort full in-memory builder +- current full OOC run generation vs keysort run generation +- current partial chunk sorting vs keysort chunk sorting, after compact position dtype support + +Datasets should include: + +- random numeric data +- duplicate-heavy data +- already sorted data +- reverse sorted data +- float data with NaNs + +Expected outcome: + +- full OOC run generation: likely improvement +- partial chunk-local sorting: likely improvement after compact position dtype support +- full in-memory builder: uncertain; depends on NumPy argsort speed vs reduced allocation/memory traffic + +## Open questions + +1. Default `max_cycles` value. +2. Whether non-persistent/in-memory arrays should support OPSI initially or only persistent/OOC paths. +3. Whether final sidecars should be renamed/copied into canonical `full.*` paths or whether descriptor can point directly to final OPSI temporary paths. +4. Whether to add Cython coalesced block-gather helpers after the Python implementation is correct. +5. Memory threshold policy for choosing in-memory vs OOC Stage 2 under `build="auto"`. + +## Summary + +OPSI should be implemented as an iterative chunk-local sort plus global block-relocation method. Stage 1 produces sorted chunks and regenerates block min/max sidecars. Stage 2 sorts one scalar key per block, alternating min and max keys, and produces only an in-memory block permutation. The next Stage 1 gathers blocks according to that permutation and immediately re-sorts chunks. + +Correctness depends on: + +- positions always traveling with values +- block min/max sidecars being regenerated after every Stage 1 +- CSI being checked as `max(block_i) <= min(block_i + 1)` +- NaNs being ordered consistently as last + +Efficiency depends on: + +- not materializing Stage 2 +- using `get_1d_span_numpy()` / `_read_ndarray_linear_span()` for block reads +- writing full chunks to sidecars +- using in-place keysort-style paired sorting for values/positions and keys/block ids + +The product integration is to expose OPSI as `method="opsi"` for `kind="full"`, while the current full builder remains available as `method="global-sort"` and remains the default. OPSI emits normal full-compatible sidecars with metadata recording `build_method="opsi"`. diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index f6f11879..15290f26 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -3603,6 +3603,8 @@ def _index_create_kwargs_from_descriptor(self, descriptor: dict) -> dict[str, An "build": build, "cparams": descriptor.get("cparams"), } + if descriptor.get("kind") == "full": + kwargs["method"] = descriptor.get("full", {}).get("build_method", "global-sort") target = descriptor.get("target") or {} if target.get("source") == "expression": kwargs["expression"] = target.get("expression") @@ -3752,6 +3754,7 @@ def _build_index_persistent( build: str, tmpdir: str | None, cparams_obj, + method: str | None = None, ) -> dict: """Build index sidecar files for a persistent-table column; return the descriptor.""" import tempfile @@ -3765,6 +3768,7 @@ def _build_index_persistent( _build_descriptor, _build_full_descriptor, _build_full_descriptor_ooc, + _build_full_descriptor_opsi, _build_levels_descriptor, _build_levels_descriptor_ooc, _build_partial_descriptor, @@ -3808,10 +3812,16 @@ def _build_index_persistent( ) full = None if kind == "full": - with tempfile.TemporaryDirectory(prefix="blosc2-index-ooc-", dir=resolved_tmpdir) as td: - full = _build_full_descriptor_ooc( - proxy, target, token, kind, dtype, persistent, Path(td), cparams_obj + if method == "opsi": + full = _build_full_descriptor_opsi( + proxy, target, token, kind, dtype, persistent, cparams_obj ) + else: + with tempfile.TemporaryDirectory(prefix="blosc2-index-ooc-", dir=resolved_tmpdir) as td: + full = _build_full_descriptor_ooc( + proxy, target, token, kind, dtype, persistent, Path(td), cparams_obj + ) + full["build_method"] = "global-sort" descriptor = _build_descriptor( proxy, target, @@ -3843,11 +3853,15 @@ def _build_index_persistent( if kind == "partial" else None ) - full = ( - _build_full_descriptor(proxy, token, kind, values, persistent, cparams_obj) - if kind == "full" - else None - ) + full = None + if kind == "full": + if method == "opsi": + full = _build_full_descriptor_opsi( + proxy, target, token, kind, dtype, persistent, cparams_obj + ) + else: + full = _build_full_descriptor(proxy, token, kind, values, persistent, cparams_obj) + full["build_method"] = "global-sort" descriptor = _build_descriptor( proxy, target, @@ -3869,7 +3883,7 @@ def _build_index_persistent( _PERSISTENT_INDEXES.pop(proxy_key, None) # evict proxy to avoid memory leak return result - def create_index( + def create_index( # noqa: C901 self, col_name: str | None = None, *, @@ -3898,6 +3912,7 @@ def create_index( _IN_MEMORY_INDEXES, _copy_descriptor, _normalize_build_mode, + _normalize_full_build_method, _normalize_index_cparams, _normalize_index_kind, _target_token, @@ -3905,11 +3920,15 @@ def create_index( from blosc2.indexing import create_index as _ix_create_index cparams_obj = _normalize_index_cparams(kwargs.pop("cparams", None)) + method = kwargs.pop("method", None) if kwargs: raise TypeError(f"unexpected keyword argument(s): {', '.join(sorted(kwargs))}") kind_str = _normalize_index_kind(kind) build_str = _normalize_build_mode(build) + method_str = _normalize_full_build_method(method) if kind_str == "full" else None + if method is not None and kind_str != "full": + raise ValueError("method is only supported for kind=IndexKind.FULL") catalog = self._storage.load_index_catalog() if expression is not None: @@ -3929,6 +3948,7 @@ def create_index( build=build, tmpdir=tmpdir, cparams=cparams_obj, + method=method_str, ) store = _IN_MEMORY_INDEXES.get(id(expr_arr)) if store is None: @@ -3974,6 +3994,7 @@ def create_index( build=build_str, tmpdir=tmpdir, cparams_obj=cparams_obj, + method=method_str, ) else: _ix_create_index( @@ -3985,6 +4006,7 @@ def create_index( build=build, tmpdir=tmpdir, cparams=cparams_obj, + method=method_str, ) store = _IN_MEMORY_INDEXES[id(col_arr)] descriptor = _copy_descriptor(store["indexes"]["__self__"]) diff --git a/src/blosc2/indexing.py b/src/blosc2/indexing.py index e70b1404..5ed0a13a 100644 --- a/src/blosc2/indexing.py +++ b/src/blosc2/indexing.py @@ -811,6 +811,14 @@ def _normalize_build_mode(build: str) -> str: return build +def _normalize_full_build_method(method: str | None) -> str: + if method is None: + return "global-sort" + if method not in {"global-sort", "opsi"}: + raise ValueError("full index method must be one of 'global-sort' or 'opsi'") + return method + + def _field_name_exists(array: blosc2.NDArray, field: str | None) -> bool: return field is not None and array.dtype.fields is not None and field in array.dtype.fields @@ -2993,6 +3001,396 @@ def _merge_run_pair( return SortedRun(out_values_path, out_positions_path, out_cursor) +@dataclass +class OpsiStageSidecars: + values: blosc2.NDArray | np.ndarray | None + positions: blosc2.NDArray | np.ndarray | None + mins: blosc2.NDArray | np.ndarray | None + medians: blosc2.NDArray | np.ndarray | None + maxs: blosc2.NDArray | np.ndarray | None + values_path: str | None + positions_path: str | None + mins_path: str | None + medians_path: str | None + maxs_path: str | None + chunk_len: int + block_len: int + nblocks: int + + +def _opsi_stage_create( + array: blosc2.NDArray, + token: str, + kind: str, + cycle: int, + size: int, + nblocks: int, + dtype: np.dtype, + persistent: bool, + chunk_len: int, + block_len: int, + cparams: dict | blosc2.CParams | None = None, +) -> OpsiStageSidecars: + category = f"opsi_cycle{cycle}" + if persistent: + values, values_info = _create_persistent_sidecar_handle( + array, + token, + kind, + category, + "values", + size, + dtype, + chunks=(chunk_len,), + blocks=(block_len,), + cparams=cparams, + ) + positions, positions_info = _create_persistent_sidecar_handle( + array, + token, + kind, + category, + "positions", + size, + np.dtype(np.int64), + chunks=(chunk_len,), + blocks=(block_len,), + cparams=cparams, + ) + minmax_chunk = max(1, min(nblocks, max(1, chunk_len // block_len))) + mins, mins_info = _create_persistent_sidecar_handle( + array, + token, + kind, + category, + "mins", + nblocks, + dtype, + chunks=(minmax_chunk,), + blocks=(minmax_chunk,), + cparams=cparams, + ) + medians, medians_info = _create_persistent_sidecar_handle( + array, + token, + kind, + category, + "medians", + nblocks, + dtype, + chunks=(minmax_chunk,), + blocks=(minmax_chunk,), + cparams=cparams, + ) + maxs, maxs_info = _create_persistent_sidecar_handle( + array, + token, + kind, + category, + "maxs", + nblocks, + dtype, + chunks=(minmax_chunk,), + blocks=(minmax_chunk,), + cparams=cparams, + ) + return OpsiStageSidecars( + values, + positions, + mins, + medians, + maxs, + values_info["path"], + positions_info["path"], + mins_info["path"], + medians_info["path"], + maxs_info["path"], + chunk_len, + block_len, + nblocks, + ) + + return OpsiStageSidecars( + np.empty(size, dtype=dtype), + np.empty(size, dtype=np.int64), + np.empty(nblocks, dtype=dtype), + np.empty(nblocks, dtype=dtype), + np.empty(nblocks, dtype=dtype), + None, + None, + None, + None, + None, + chunk_len, + block_len, + nblocks, + ) + + +def _opsi_drop_stage(stage: OpsiStageSidecars | None, *, keep_payload: bool = False) -> None: + if stage is None: + return + values_path = stage.values_path + positions_path = stage.positions_path + mins_path = stage.mins_path + medians_path = stage.medians_path + maxs_path = stage.maxs_path + # Release writable B2ND handles before removing their paths. Otherwise, + # when BLOSC_TRACE is enabled, c-blosc2 may report noisy short-write errors + # from handle finalizers trying to flush files that have already been + # removed. + stage.values = None + stage.positions = None + stage.mins = None + stage.medians = None + stage.maxs = None + if not keep_payload: + _remove_sidecar_path(values_path) + _remove_sidecar_path(positions_path) + _remove_sidecar_path(mins_path) + _remove_sidecar_path(medians_path) + _remove_sidecar_path(maxs_path) + + +def _opsi_sort_values_positions(values: np.ndarray, positions: np.ndarray) -> None: + try: + indexing_ext.keysort_values_positions(values, positions) + except (AttributeError, TypeError): + order = np.lexsort((positions, values)) + values[...] = values[order] + positions[...] = positions[order] + + +def _opsi_write_block_boundaries( + stage: OpsiStageSidecars, chunk_start: int, chunk_values: np.ndarray, size: int +) -> None: + if chunk_values.size == 0: + return + block_len = stage.block_len + first_block = chunk_start // block_len + nblocks = math.ceil(chunk_values.size / block_len) + mins = np.empty(nblocks, dtype=chunk_values.dtype) + medians = np.empty(nblocks, dtype=chunk_values.dtype) + maxs = np.empty(nblocks, dtype=chunk_values.dtype) + for idx in range(nblocks): + local_start = idx * block_len + local_stop = min(local_start + block_len, chunk_values.size) + mins[idx] = chunk_values[local_start] + medians[idx] = chunk_values[local_start + (local_stop - local_start - 1) // 2] + maxs[idx] = chunk_values[local_stop - 1] + _write_ndarray_linear_span(stage.mins, first_block, mins) + _write_ndarray_linear_span(stage.medians, first_block, medians) + _write_ndarray_linear_span(stage.maxs, first_block, maxs) + + +def _opsi_build_stage1( + array: blosc2.NDArray, + target: dict, + token: str, + kind: str, + dtype: np.dtype, + persistent: bool, + cycle: int, + previous: OpsiStageSidecars | None, + block_order: np.ndarray | None, + cparams: dict | blosc2.CParams | None = None, +) -> OpsiStageSidecars: + size = int(array.shape[0]) + chunk_len = int(array.chunks[0]) + block_len = int(array.blocks[0]) + if chunk_len % block_len != 0: + raise ValueError("OPSI requires chunk length to be a multiple of block length") + nblocks = math.ceil(size / block_len) + stage = _opsi_stage_create( + array, token, kind, cycle, size, nblocks, dtype, persistent, chunk_len, block_len, cparams + ) + + for chunk_start in range(0, size, chunk_len): + chunk_stop = min(chunk_start + chunk_len, size) + chunk_size = chunk_stop - chunk_start + if previous is None: + chunk_values = _slice_values_for_target(array, target, chunk_start, chunk_stop).astype( + dtype, copy=True + ) + chunk_positions = np.arange(chunk_start, chunk_stop, dtype=np.int64) + else: + chunk_values = np.empty(chunk_size, dtype=dtype) + chunk_positions = np.empty(chunk_size, dtype=np.int64) + local = 0 + while local < chunk_size: + dst_global = chunk_start + local + dst_block_id = dst_global // block_len + src_block_id = int(block_order[dst_block_id]) + src_start = src_block_id * block_len + src_stop = min(src_start + block_len, size) + take = min(src_stop - src_start, chunk_size - local) + _read_ndarray_linear_span(previous.values, src_start, chunk_values[local : local + take]) + _read_ndarray_linear_span( + previous.positions, src_start, chunk_positions[local : local + take] + ) + local += take + _opsi_sort_values_positions(chunk_values, chunk_positions) + _write_ndarray_linear_span(stage.values, chunk_start, chunk_values) + _write_ndarray_linear_span(stage.positions, chunk_start, chunk_positions) + _opsi_write_block_boundaries(stage, chunk_start, chunk_values, size) + return stage + + +def _opsi_ordered_le_array(left: np.ndarray, right: np.ndarray, dtype: np.dtype) -> np.ndarray: + dtype = np.dtype(dtype) + if dtype.kind != "f": + return left <= right + left_nan = np.isnan(left) + right_nan = np.isnan(right) + return (~left_nan & right_nan) | (left_nan & right_nan) | (~left_nan & ~right_nan & (left <= right)) + + +def _opsi_is_csi(stage: OpsiStageSidecars, dtype: np.dtype) -> bool: + if stage.nblocks <= 1: + return True + maxs = _read_sidecar_span(stage.maxs, 0, stage.nblocks - 1) + mins = _read_sidecar_span(stage.mins, 1, stage.nblocks) + return bool(np.all(_opsi_ordered_le_array(maxs, mins, dtype))) + + +def _opsi_compute_block_order(stage: OpsiStageSidecars, key_kind: str, size: int) -> np.ndarray: + # Whole-block relocation assumes fixed-size destination block slots. Keep + # a final partial block, when present, in the final slot for now. + sortable_blocks = stage.nblocks - 1 if size % stage.block_len else stage.nblocks + if key_kind == "min": + key_sidecar = stage.mins + elif key_kind == "median": + key_sidecar = stage.medians + elif key_kind == "max": + key_sidecar = stage.maxs + else: + raise ValueError(f"unsupported OPSI block-order key kind {key_kind!r}") + keys = _read_sidecar_span(key_sidecar, 0, sortable_blocks) + block_ids = np.arange(sortable_blocks, dtype=np.int64) + try: + indexing_ext.keysort_keys_indices(keys, block_ids) + except (AttributeError, TypeError): + order = np.lexsort((block_ids, keys)) + block_ids = block_ids[order] + if sortable_blocks != stage.nblocks: + block_ids = np.concatenate((block_ids, np.asarray([stage.nblocks - 1], dtype=np.int64))) + return block_ids + + +def _build_full_descriptor_opsi( + array: blosc2.NDArray, + target: dict, + token: str, + kind: str, + dtype: np.dtype, + persistent: bool, + cparams: dict | blosc2.CParams | None = None, + max_cycles: int = 3, +) -> dict: + if int(array.shape[0]) == 0: + full = _build_full_descriptor(array, token, kind, np.empty(0, dtype=dtype), persistent, cparams) + full["build_method"] = "opsi" + full["opsi_cycles"] = 0 + full["opsi_max_cycles"] = int(max_cycles) + return full + previous = None + block_order = None + cycle = 0 + try: + while True: + current = _opsi_build_stage1( + array, target, token, kind, dtype, persistent, cycle, previous, block_order, cparams + ) + if _opsi_is_csi(current, dtype): + full = { + "values_path": current.values_path, + "positions_path": current.positions_path, + "runs": [], + "next_run_id": 0, + "build_method": "opsi", + "opsi_cycles": cycle + 1, + "opsi_max_cycles": int(max_cycles), + } + if persistent: + _rebuild_full_navigation_sidecars_from_handle( + array, + token, + kind, + full, + current.values, + dtype, + int(array.shape[0]), + persistent, + cparams, + ) + else: + values = np.asarray(current.values) + positions = np.asarray(current.positions) + values_sidecar = _store_array_sidecar( + array, + token, + kind, + "full", + "values", + values, + persistent, + chunks=(current.chunk_len,), + blocks=(current.block_len,), + cparams=cparams, + ) + positions_sidecar = _store_array_sidecar( + array, + token, + kind, + "full", + "positions", + positions, + persistent, + chunks=(current.chunk_len,), + blocks=(current.block_len,), + cparams=cparams, + ) + full["values_path"] = values_sidecar["path"] + full["positions_path"] = positions_sidecar["path"] + _rebuild_full_navigation_sidecars(array, token, kind, full, values, persistent, cparams) + _opsi_drop_stage(previous) + _opsi_drop_stage(current, keep_payload=persistent) + return full + if cycle >= max_cycles: + if os.getenv("BLOSC_TRACE") or os.getenv("BLOSC_OPSI_TRACE"): + print( + "BLOSC_TRACE: OPSI exceeded opsi_max_cycles=" + f"{int(max_cycles)} without reaching CSI; falling back to full method " + "'global-sort'", + file=sys.stderr, + ) + _opsi_drop_stage(previous) + _opsi_drop_stage(current) + if persistent: + with tempfile.TemporaryDirectory( + prefix="blosc2-index-opsi-fallback-", dir=_resolve_full_index_tmpdir(array, None) + ) as tmpdir: + full = _build_full_descriptor_ooc( + array, target, token, kind, dtype, persistent, Path(tmpdir), cparams + ) + else: + values = _values_for_target(array, target) + full = _build_full_descriptor(array, token, kind, values, persistent, cparams) + full["build_method"] = "opsi" + full["opsi_cycles"] = cycle + 1 + full["opsi_max_cycles"] = int(max_cycles) + full["opsi_fallback"] = "global-sort" + return full + key_kind = ("min", "median", "max")[cycle % 3] + block_order = _opsi_compute_block_order(current, key_kind=key_kind, size=int(array.shape[0])) + _opsi_drop_stage(previous) + previous = current + cycle += 1 + except Exception: + _opsi_drop_stage(previous) + raise + + def _build_full_descriptor_ooc( array: blosc2.NDArray, target: dict, @@ -3159,6 +3557,8 @@ def create_index( directory and in-memory arrays use the system temporary directory. """ cparams = _normalize_index_cparams(kwargs.pop("cparams", None)) + method = kwargs.pop("method", None) + opsi_max_cycles = int(kwargs.pop("opsi_max_cycles", 3)) if kwargs: unexpected = ", ".join(sorted(kwargs)) raise TypeError(f"unexpected keyword argument(s): {unexpected}") @@ -3166,6 +3566,9 @@ def create_index( raise TypeError("kind must be a blosc2.IndexKind") kind = _normalize_index_kind(kind) build = _normalize_build_mode(build) + full_method = _normalize_full_build_method(method) if kind == "full" else None + if method is not None and kind != "full": + raise ValueError("method is only supported for kind=IndexKind.FULL") target, dtype = _normalize_create_index_target(array, field, expression, operands) token = _target_token(target) if persistent is None: @@ -3189,9 +3592,15 @@ def create_index( with tempfile.TemporaryDirectory( prefix="blosc2-index-ooc-", dir=_resolve_full_index_tmpdir(array, tmpdir) ) as tmpdir: - full = _build_full_descriptor_ooc( - array, target, token, kind, dtype, persistent, Path(tmpdir), cparams - ) + if full_method == "opsi": + full = _build_full_descriptor_opsi( + array, target, token, kind, dtype, persistent, cparams, opsi_max_cycles + ) + else: + full = _build_full_descriptor_ooc( + array, target, token, kind, dtype, persistent, Path(tmpdir), cparams + ) + full["build_method"] = "global-sort" descriptor = _build_descriptor( array, target, @@ -3221,11 +3630,15 @@ def create_index( if kind == "partial" else None ) - full = ( - _build_full_descriptor(array, token, kind, values, persistent, cparams) - if kind == "full" - else None - ) + full = None + if kind == "full": + if full_method == "opsi": + full = _build_full_descriptor_opsi( + array, target, token, kind, dtype, persistent, cparams, opsi_max_cycles + ) + else: + full = _build_full_descriptor(array, token, kind, values, persistent, cparams) + full["build_method"] = "global-sort" descriptor = _build_descriptor( array, target, @@ -3732,6 +4145,7 @@ def rebuild_index(array: blosc2.NDArray, field: str | None = None, name: str | N persistent=descriptor["persistent"], build="ooc" if descriptor.get("ooc", False) else "memory", name=descriptor["name"], + method=descriptor.get("full", {}).get("build_method") if descriptor["kind"] == "full" else None, ) return create_index( array, @@ -3741,6 +4155,7 @@ def rebuild_index(array: blosc2.NDArray, field: str | None = None, name: str | N persistent=descriptor["persistent"], build="ooc" if descriptor.get("ooc", False) else "memory", name=descriptor["name"], + method=descriptor.get("full", {}).get("build_method") if descriptor["kind"] == "full" else None, ) diff --git a/src/blosc2/indexing_ext.pyx b/src/blosc2/indexing_ext.pyx index 2be99330..759f5980 100644 --- a/src/blosc2/indexing_ext.pyx +++ b/src/blosc2/indexing_ext.pyx @@ -13,6 +13,10 @@ import cython from libc.stdint cimport int8_t, int16_t, int32_t, int64_t, uint8_t, uint16_t, uint32_t, uint64_t +DEF KEYSORT_STACK = 128 +DEF KEYSORT_INSERTION_CUTOFF = 16 + + ctypedef fused sort_float_t: np.float32_t np.float64_t @@ -29,6 +33,160 @@ ctypedef fused sort_ordered_t: np.uint64_t +ctypedef fused keysort_t: + np.float32_t + np.float64_t + np.int8_t + np.int16_t + np.int32_t + np.int64_t + np.uint8_t + np.uint16_t + np.uint32_t + np.uint64_t + + +cdef inline bint _keysort_pair_lt( + keysort_t left_value, + int64_t left_position, + keysort_t right_value, + int64_t right_position, +) noexcept nogil: + cdef bint left_nan + cdef bint right_nan + if keysort_t is np.float32_t or keysort_t is np.float64_t: + left_nan = left_value != left_value + right_nan = right_value != right_value + if left_nan: + if right_nan: + return left_position < right_position + return False + if right_nan: + return True + if left_value < right_value: + return True + if left_value > right_value: + return False + return left_position < right_position + + +cdef inline void _keysort_pair_swap( + keysort_t[:] values, + np.int64_t[:] positions, + Py_ssize_t left, + Py_ssize_t right, +) noexcept nogil: + cdef keysort_t value_tmp + cdef int64_t position_tmp + if left == right: + return + value_tmp = values[left] + values[left] = values[right] + values[right] = value_tmp + position_tmp = positions[left] + positions[left] = positions[right] + positions[right] = position_tmp + + +@cython.boundscheck(False) +@cython.wraparound(False) +cdef void _keysort_pair_insertion( + keysort_t[:] values, + np.int64_t[:] positions, + Py_ssize_t left, + Py_ssize_t right, +) noexcept nogil: + cdef Py_ssize_t i + cdef Py_ssize_t j + cdef keysort_t value_tmp + cdef int64_t position_tmp + for i in range(left + 1, right + 1): + value_tmp = values[i] + position_tmp = positions[i] + j = i + while j > left and _keysort_pair_lt(value_tmp, position_tmp, values[j - 1], positions[j - 1]): + values[j] = values[j - 1] + positions[j] = positions[j - 1] + j -= 1 + values[j] = value_tmp + positions[j] = position_tmp + + +@cython.boundscheck(False) +@cython.wraparound(False) +cdef void _keysort_pair_quicksort(keysort_t[:] values, np.int64_t[:] positions) noexcept nogil: + cdef Py_ssize_t n = values.shape[0] + cdef Py_ssize_t left = 0 + cdef Py_ssize_t right = n - 1 + cdef Py_ssize_t mid + cdef Py_ssize_t i + cdef Py_ssize_t j + cdef Py_ssize_t left_size + cdef Py_ssize_t right_size + cdef Py_ssize_t stack_left[KEYSORT_STACK] + cdef Py_ssize_t stack_right[KEYSORT_STACK] + cdef int stack_top = 0 + cdef keysort_t pivot_value + cdef int64_t pivot_position + + if n <= 1: + return + + while True: + while right - left > KEYSORT_INSERTION_CUTOFF: + mid = left + ((right - left) >> 1) + + if _keysort_pair_lt(values[mid], positions[mid], values[left], positions[left]): + _keysort_pair_swap(values, positions, mid, left) + if _keysort_pair_lt(values[right], positions[right], values[mid], positions[mid]): + _keysort_pair_swap(values, positions, right, mid) + if _keysort_pair_lt(values[mid], positions[mid], values[left], positions[left]): + _keysort_pair_swap(values, positions, mid, left) + + pivot_value = values[mid] + pivot_position = positions[mid] + _keysort_pair_swap(values, positions, mid, right - 1) + + i = left + j = right - 1 + while True: + i += 1 + while _keysort_pair_lt(values[i], positions[i], pivot_value, pivot_position): + i += 1 + j -= 1 + while _keysort_pair_lt(pivot_value, pivot_position, values[j], positions[j]): + j -= 1 + if i >= j: + break + _keysort_pair_swap(values, positions, i, j) + + _keysort_pair_swap(values, positions, i, right - 1) + + left_size = i - left + right_size = right - i + if left_size < right_size: + if i + 1 < right: + stack_left[stack_top] = i + 1 + stack_right[stack_top] = right + stack_top += 1 + right = i - 1 + else: + if left < i - 1: + stack_left[stack_top] = left + stack_right[stack_top] = i - 1 + stack_top += 1 + left = i + 1 + + if left < right: + _keysort_pair_insertion(values, positions, left, right) + + if stack_top == 0: + break + stack_top -= 1 + left = stack_left[stack_top] + right = stack_right[stack_top] + + cdef inline bint _le_float_pair( sort_float_t left_value, uint64_t left_position, @@ -2193,3 +2351,147 @@ def index_collect_reduced_chunk_nav_positions( lower, lower_inclusive, upper, upper_inclusive ) raise TypeError("unsupported dtype for index_collect_reduced_chunk_nav_positions") + + +cdef void _keysort_ndarray_float32(np.ndarray values, np.ndarray positions): + cdef np.float32_t[:] values_mv = values + cdef np.int64_t[:] positions_mv = positions + with nogil: + _keysort_pair_quicksort(values_mv, positions_mv) + + +cdef void _keysort_ndarray_float64(np.ndarray values, np.ndarray positions): + cdef np.float64_t[:] values_mv = values + cdef np.int64_t[:] positions_mv = positions + with nogil: + _keysort_pair_quicksort(values_mv, positions_mv) + + +cdef void _keysort_ndarray_int8(np.ndarray values, np.ndarray positions): + cdef np.int8_t[:] values_mv = values + cdef np.int64_t[:] positions_mv = positions + with nogil: + _keysort_pair_quicksort(values_mv, positions_mv) + + +cdef void _keysort_ndarray_int16(np.ndarray values, np.ndarray positions): + cdef np.int16_t[:] values_mv = values + cdef np.int64_t[:] positions_mv = positions + with nogil: + _keysort_pair_quicksort(values_mv, positions_mv) + + +cdef void _keysort_ndarray_int32(np.ndarray values, np.ndarray positions): + cdef np.int32_t[:] values_mv = values + cdef np.int64_t[:] positions_mv = positions + with nogil: + _keysort_pair_quicksort(values_mv, positions_mv) + + +cdef void _keysort_ndarray_int64(np.ndarray values, np.ndarray positions): + cdef np.int64_t[:] values_mv = values + cdef np.int64_t[:] positions_mv = positions + with nogil: + _keysort_pair_quicksort(values_mv, positions_mv) + + +cdef void _keysort_ndarray_uint8(np.ndarray values, np.ndarray positions): + cdef np.uint8_t[:] values_mv = values + cdef np.int64_t[:] positions_mv = positions + with nogil: + _keysort_pair_quicksort(values_mv, positions_mv) + + +cdef void _keysort_ndarray_uint16(np.ndarray values, np.ndarray positions): + cdef np.uint16_t[:] values_mv = values + cdef np.int64_t[:] positions_mv = positions + with nogil: + _keysort_pair_quicksort(values_mv, positions_mv) + + +cdef void _keysort_ndarray_uint32(np.ndarray values, np.ndarray positions): + cdef np.uint32_t[:] values_mv = values + cdef np.int64_t[:] positions_mv = positions + with nogil: + _keysort_pair_quicksort(values_mv, positions_mv) + + +cdef void _keysort_ndarray_uint64(np.ndarray values, np.ndarray positions): + cdef np.uint64_t[:] values_mv = values + cdef np.int64_t[:] positions_mv = positions + with nogil: + _keysort_pair_quicksort(values_mv, positions_mv) + + +cdef void _keysort_ndarray(np.ndarray values, np.ndarray positions): + cdef np.dtype dtype = values.dtype + if dtype == np.dtype(np.float32): + _keysort_ndarray_float32(values, positions) + return + if dtype == np.dtype(np.float64): + _keysort_ndarray_float64(values, positions) + return + if dtype == np.dtype(np.int8): + _keysort_ndarray_int8(values, positions) + return + if dtype == np.dtype(np.int16): + _keysort_ndarray_int16(values, positions) + return + if dtype == np.dtype(np.int32): + _keysort_ndarray_int32(values, positions) + return + if dtype == np.dtype(np.int64): + _keysort_ndarray_int64(values, positions) + return + if dtype == np.dtype(np.uint8): + _keysort_ndarray_uint8(values, positions) + return + if dtype == np.dtype(np.uint16): + _keysort_ndarray_uint16(values, positions) + return + if dtype == np.dtype(np.uint32): + _keysort_ndarray_uint32(values, positions) + return + if dtype == np.dtype(np.uint64): + _keysort_ndarray_uint64(values, positions) + return + if dtype == np.dtype(np.bool_): + _keysort_ndarray_uint8(values.view(np.uint8), positions) + return + if dtype.kind in {"m", "M"}: + _keysort_ndarray_int64(values.view(np.int64), positions) + return + raise TypeError("unsupported dtype for keysort") + + +def keysort_values_positions(np.ndarray values, np.ndarray positions): + """Sort *values* in-place and carry int64 *positions* in lockstep. + + Sort order is deterministic: primary key is the value and secondary key is + the int64 position. Float NaNs sort last and NaNs with equal primary order + are tie-broken by position. + """ + if values.ndim != 1 or positions.ndim != 1: + raise ValueError("values and positions must be 1-D arrays") + if values.shape[0] != positions.shape[0]: + raise ValueError("values and positions must have the same length") + if positions.dtype != np.dtype(np.int64): + raise TypeError("positions must have dtype int64") + if values.shape[0] <= 1: + return None + _keysort_ndarray(values, positions) + return None + + +def keysort_keys_indices(np.ndarray keys, np.ndarray indices): + """Sort scalar *keys* in-place and carry int64 *indices* in lockstep.""" + if keys.ndim != 1 or indices.ndim != 1: + raise ValueError("keys and indices must be 1-D arrays") + if keys.shape[0] != indices.shape[0]: + raise ValueError("keys and indices must have the same length") + if indices.dtype != np.dtype(np.int64): + raise TypeError("indices must have dtype int64") + if keys.shape[0] <= 1: + return None + _keysort_ndarray(keys, indices) + return None diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py index d336178c..2d254e95 100644 --- a/src/blosc2/ndarray.py +++ b/src/blosc2/ndarray.py @@ -4830,13 +4830,15 @@ def create_index( same filesystem. In-memory arrays fall back to the system temporary directory. kwargs : dict, optional - Keyword arguments forwarded to the index builder. At the moment the - supported option is ``cparams``. Pass ``cparams`` to control the - compression settings used for index sidecars, including - ``codec``, ``clevel``, and ``nthreads``. If provided, - ``cparams["nthreads"]`` becomes the default build-thread count for - intra-chunk sorting unless ``BLOSC2_INDEX_BUILD_THREADS`` overrides - it. + Keyword arguments forwarded to the index builder. Supported options + include ``cparams`` and, for ``kind=FULL``, ``method``. Pass + ``method="global-sort"`` to select the current full-index builder + explicitly, or ``method="opsi"`` to select the iterative OPSI full + builder. Pass ``cparams`` to control the compression settings used + for index sidecars, including ``codec``, ``clevel``, and + ``nthreads``. If provided, ``cparams["nthreads"]`` becomes the + default build-thread count for intra-chunk sorting unless + ``BLOSC2_INDEX_BUILD_THREADS`` overrides it. Notes ----- From 0d6529c19b3af3fc89b4196dbb25da37f98eefab Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Fri, 24 Apr 2026 12:20:40 +0200 Subject: [PATCH 2/5] OPSI is a first-class index kind now, and not meant for achieving CSI anymore --- bench/indexing/index_query_bench.py | 98 ++++---- .../tutorials/14.indexing-arrays.ipynb | 19 +- .../tutorials/15.indexing-ctables.ipynb | 4 +- doc/reference/ctable.rst | 5 +- src/blosc2/__init__.py | 2 + src/blosc2/ctable.py | 47 ++-- src/blosc2/indexing.py | 232 +++++++++++------- src/blosc2/ndarray.py | 33 ++- tests/ndarray/test_indexing.py | 5 +- 9 files changed, 267 insertions(+), 178 deletions(-) diff --git a/bench/indexing/index_query_bench.py b/bench/indexing/index_query_bench.py index e6509f56..e988bba4 100644 --- a/bench/indexing/index_query_bench.py +++ b/bench/indexing/index_query_bench.py @@ -32,8 +32,7 @@ FULL_QUERY_MODES = ("auto", "selective-ooc", "whole-load") DATASET_LAYOUT_VERSION = "payload-ramp-v1" BUILD_MODES = ("auto", "memory", "ooc") -FULL_INDEX_METHODS = ("global-sort", "opsi") -FULL_INDEX_METHODS = ("global-sort", "opsi") +FULL_INDEX_METHODS = ("global-sort",) COLD_COLUMNS = [ ("rows", lambda result: f"{result['size']:,}"), @@ -379,39 +378,40 @@ def _open_index_sidecar(path: str | os.PathLike[str], no_mmap: bool): def index_sizes(descriptor: dict, *, no_mmap: bool) -> tuple[int, int]: logical = 0 disk = 0 + + def add_sidecar(path: str | None) -> None: + nonlocal logical, disk + if not path: + return + array = _open_index_sidecar(path, no_mmap) + logical += int(np.prod(array.shape)) * array.dtype.itemsize + disk += os.path.getsize(path) + for level_info in descriptor["levels"].values(): - dtype = np.dtype(level_info["dtype"]) - logical += dtype.itemsize * level_info["nsegments"] - if level_info["path"]: - disk += os.path.getsize(level_info["path"]) - - light = descriptor.get("light") - if light is not None: - for key in ("values_path", "bucket_positions_path", "offsets_path"): - array = _open_index_sidecar(light[key], no_mmap) - logical += int(np.prod(array.shape)) * array.dtype.itemsize - disk += os.path.getsize(light[key]) - - reduced = descriptor.get("reduced") - if reduced is not None: - values = _open_index_sidecar(reduced["values_path"], no_mmap) - positions = _open_index_sidecar(reduced["positions_path"], no_mmap) - offsets = _open_index_sidecar(reduced["offsets_path"], no_mmap) - logical += values.shape[0] * values.dtype.itemsize - logical += positions.shape[0] * positions.dtype.itemsize - logical += offsets.shape[0] * offsets.dtype.itemsize - disk += os.path.getsize(reduced["values_path"]) - disk += os.path.getsize(reduced["positions_path"]) - disk += os.path.getsize(reduced["offsets_path"]) + add_sidecar(level_info.get("path")) + + bucket = descriptor.get("bucket") + if bucket is not None: + for key in ("values_path", "bucket_positions_path", "offsets_path", "l1_path", "l2_path"): + add_sidecar(bucket.get(key)) + + partial = descriptor.get("partial") + if partial is not None: + for key in ("values_path", "positions_path", "offsets_path", "l1_path", "l2_path"): + add_sidecar(partial.get(key)) full = descriptor.get("full") if full is not None: - values = _open_index_sidecar(full["values_path"], no_mmap) - positions = _open_index_sidecar(full["positions_path"], no_mmap) - logical += values.shape[0] * values.dtype.itemsize - logical += positions.shape[0] * positions.dtype.itemsize - disk += os.path.getsize(full["values_path"]) - disk += os.path.getsize(full["positions_path"]) + for key in ("values_path", "positions_path", "l1_path", "l2_path"): + add_sidecar(full.get(key)) + for run in full.get("runs", ()): + add_sidecar(run.get("values_path")) + add_sidecar(run.get("positions_path")) + + opsi = descriptor.get("opsi") + if opsi is not None: + for key in ("values_path", "positions_path"): + add_sidecar(opsi.get(key)) return logical, disk @@ -446,8 +446,6 @@ def _condition_expr(lo: object, hi: object, dtype: np.dtype, *, query_single_val def _index_kind_method(kind: str) -> tuple[str, str | None]: - if kind == "opsi": - return "full", "opsi" if kind == "full": return "full", "global-sort" return kind, None @@ -498,7 +496,7 @@ def _open_or_build_indexed_array( clevel: int | None, nthreads: int | None, no_mmap: bool, - opsi_max_cycles: int, + opsi_max_cycles: int | None, ) -> tuple[blosc2.NDArray, float]: if path.exists(): arr = blosc2.open(path, mode="a") @@ -519,7 +517,7 @@ def _open_or_build_indexed_array( } if method is not None: kwargs["method"] = method - if method == "opsi": + if actual_kind == "opsi" and opsi_max_cycles is not None: kwargs["opsi_max_cycles"] = opsi_max_cycles cparams = {} if codec is not None: @@ -553,7 +551,7 @@ def benchmark_size( kinds: tuple[str, ...], repeats: int, no_mmap: bool, - opsi_max_cycles: int, + opsi_max_cycles: int | None, cold_row_callback=None, ) -> list[dict]: arr = _open_or_build_persistent_array( @@ -754,9 +752,8 @@ def parse_args() -> argparse.Namespace: choices=FULL_INDEX_METHODS, default="global-sort", help=( - "Full-index build method. Use with --kind full: global-sort selects the current full " - "builder, opsi selects the OPSI builder. --kind opsi is a shorthand for " - "--kind full --method opsi. Default: global-sort." + "Full-index build method. OPSI is a separate index kind; use --kind opsi to benchmark it. " + "Default: global-sort." ), ) parser.add_argument( @@ -768,8 +765,11 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--opsi-max-cycles", type=int, - default=3, - help="Maximum OPSI cycles before falling back to global-sort. Default: 3.", + default=None, + help=( + "Maximum OPSI cycles for --kind opsi. Default: derive from optlevel " + "(optlevel for optlevel < 8, optlevel * 2 otherwise)." + ), ) parser.add_argument( "--codec", @@ -816,18 +816,11 @@ def main() -> None: raise SystemExit("--clevel must be >= 0") if args.nthreads is not None and args.nthreads <= 0: raise SystemExit("--nthreads must be a positive integer") - if args.opsi_max_cycles < 0: + if args.opsi_max_cycles is not None and args.opsi_max_cycles < 0: raise SystemExit("--opsi-max-cycles must be >= 0") - if args.method == "opsi" and args.kind != "full": - raise SystemExit("--method opsi requires --kind full; alternatively use --kind opsi") - if args.kind == "opsi" and args.method != "global-sort": - raise SystemExit("--kind opsi already selects the OPSI method; do not also pass --method") sizes = (args.size,) if args.size is not None else SIZES dists = DISTS if args.dist == "all" else (args.dist,) - if args.kind == "full" and args.method == "opsi": - kinds = ("opsi",) - else: - kinds = KINDS if args.kind == "all" else (args.kind,) + kinds = KINDS if args.kind == "all" else (args.kind,) if args.outdir is None: with tempfile.TemporaryDirectory() as tmpdir: @@ -896,7 +889,7 @@ def run_benchmarks( clevel: int | None, nthreads: int | None, no_mmap: bool, - opsi_max_cycles: int, + opsi_max_cycles: int | None, ) -> None: all_results = [] @@ -915,7 +908,8 @@ def run_benchmarks( f"full_query_mode={full_query_mode}, index_codec={'auto' if codec is None else codec.name}, " f"index_clevel={'auto' if clevel is None else clevel}, " f"index_nthreads={'auto' if nthreads is None else nthreads}, " - f"index_mmap={'off' if no_mmap else 'on'}" + f"index_mmap={'off' if no_mmap else 'on'}, " + f"opsi_max_cycles={'optlevel' if opsi_max_cycles is None else opsi_max_cycles}" ) cold_widths = progress_widths(COLD_COLUMNS, sizes, dists, kinds, id_dtype) print() diff --git a/doc/getting_started/tutorials/14.indexing-arrays.ipynb b/doc/getting_started/tutorials/14.indexing-arrays.ipynb index e3a29258..302dc571 100644 --- a/doc/getting_started/tutorials/14.indexing-arrays.ipynb +++ b/doc/getting_started/tutorials/14.indexing-arrays.ipynb @@ -7,7 +7,7 @@ "source": [ "# Indexing Arrays\n", "\n", - "Blosc2 can attach indexes to 1-D `NDArray` objects and to fields inside 1-D structured arrays. These indexes accelerate selective masks, and `full` indexes can also drive ordered access directly through `sort(order=...)`, `NDArray.argsort(order=...)`, `LazyExpr.argsort(order=...)`, and `iter_sorted(...)`.\n", + "Blosc2 can attach indexes to 1-D `NDArray` objects and to fields inside 1-D structured arrays. These indexes accelerate selective masks, and `full` indexes can also drive ordered access directly through `sort(order=...)`, `NDArray.argsort(order=...)`, `LazyExpr.argsort(order=...)`, and `iter_sorted(...)`. OPSI indexes are a separate tunable iterative-ordering kind: they improve the physical order used for exact filtering, but they are not intended to converge to a completely sorted `full`/CSI index.\n", "\n", "This tutorial covers:\n", "\n", @@ -108,16 +108,19 @@ "source": [ "## Index kinds and how to create them\n", "\n", - "Blosc2 currently supports four index kinds:\n", + "Blosc2 currently supports five index kinds:\n", "\n", "- `summary`: compact summaries only,\n", "- `bucket`: summary levels plus lightweight per-block payloads,\n", "- `partial`: richer payloads for positional filtering,\n", + "- `opsi`: tunable iterative ordering for exact filtering,\n", "- `full`: globally sorted payloads for positional filtering and ordered reuse.\n", "\n", + "`OPSI` is intentionally a separate kind, not a `full` index construction method. It performs a configurable number of ordering cycles and then keeps that iterative ordering as-is. Achieving a completely sorted index (CSI) is not a goal for OPSI; use `FULL` when you require global sorted order or direct ordered reuse. By default, `OPSI` uses `optlevel` cycles for `optlevel < 8`, and `2 * optlevel` cycles for `optlevel >= 8`. You can override this with `opsi_max_cycles=...`.\n", + "\n", "There is one active index per target field or expression. If you create another index on the same target, it replaces the previous one. The easiest way to compare kinds is to build them on separate arrays.\n", "\n", - "The next cell times index creation and reports the compressed storage footprint of each index relative to the compressed base array." + "The next cell times index creation and reports the compressed storage footprint of each index relative to the compressed base array.\n" ] }, { @@ -152,6 +155,7 @@ " blosc2.IndexKind.SUMMARY,\n", " blosc2.IndexKind.BUCKET,\n", " blosc2.IndexKind.PARTIAL,\n", + " blosc2.IndexKind.OPSI,\n", " blosc2.IndexKind.FULL,\n", "):\n", " arr = data.copy()\n", @@ -238,7 +242,7 @@ "source": [ "### Timing the mask with and without indexes\n", "\n", - "The next cell measures the same selective mask on all four index kinds and compares it with a forced full scan. On this workload, `partial` and `full` usually show the clearest benefit because they carry richer payloads for positional filtering." + "The next cell measures the same selective mask on all five index kinds and compares it with a forced full scan. On this workload, `partial`, `opsi`, and `full` usually show the clearest benefit because they carry richer payloads for positional filtering.\n" ] }, { @@ -299,7 +303,9 @@ "source": [ "## `full` indexes and ordered access\n", "\n", - "A `full` index stores a global sorted payload. This is the required index tier for direct ordered reuse. Build it directly with `create_index(kind=blosc2.IndexKind.FULL)`." + "A `full` index stores a global sorted payload. This is the required index tier for direct ordered reuse. Build it directly with `create_index(kind=blosc2.IndexKind.FULL)`.\n", + "\n", + "If you only want a tunable iterative ordering index for exact filtering, use `create_index(kind=blosc2.IndexKind.OPSI)` instead. OPSI can improve cold-query locality as `optlevel` or `opsi_max_cycles` increases, but it does not replace `FULL` for globally sorted access.\n" ] }, { @@ -585,11 +591,12 @@ "## Practical guidance\n", "\n", "- Use `partial` when your main goal is faster selective masks.\n", + "- Use `opsi` when you want exact filtering with tunable iterative ordering. Increase `optlevel` or pass `opsi_max_cycles` to spend more build time on ordering; do not expect OPSI to become a `full`/CSI index.\n", "- Use `full` when you also want ordered reuse through `sort(order=...)`, `NDArray.argsort(order=...)`, `LazyExpr.argsort(order=...)`, or `iter_sorted(...)`.\n", "- Persist the base array if you want indexes to survive reopen automatically.\n", "- After unsupported mutations, use `rebuild_index()`.\n", "- For append-heavy `full` indexes, compact explicitly at convenient maintenance boundaries instead of on every append.\n", - "- Measure your own workload: compact indexes, predicate selectivity, and ordered access needs all affect which kind is best.\n" + "- Measure your own workload: compact indexes, predicate selectivity, iterative-ordering level, and ordered access needs all affect which kind is best.\n" ] }, { diff --git a/doc/getting_started/tutorials/15.indexing-ctables.ipynb b/doc/getting_started/tutorials/15.indexing-ctables.ipynb index c9364978..95dde002 100644 --- a/doc/getting_started/tutorials/15.indexing-ctables.ipynb +++ b/doc/getting_started/tutorials/15.indexing-ctables.ipynb @@ -130,7 +130,8 @@ "source": [ "## Creating an index\n", "\n", - "Call `create_index(col_name)` to build a bucket index on a column.\n", + "Call `create_index(col_name)` to build a bucket index on a column. Pass `kind=...` to choose another index kind, including `blosc2.IndexKind.OPSI` for tunable iterative ordering or `blosc2.IndexKind.FULL` for globally sorted indexes that can also support ordered reuse. OPSI is a separate exact-filtering index kind, not a slower way to build a `FULL`/CSI index; its build effort is controlled by `optlevel` or the explicit `opsi_max_cycles` keyword.\n", + "\n", "The returned `CTableIndex` handle shows the column name, kind, and whether the index is stale.\n" ] }, @@ -494,6 +495,7 @@ "- **Mutations** (`append`, `extend`, `setitem`, `assign`, `sort_by`, `compact`) mark indexes stale.\n", "- **Stale indexes** trigger automatic scan fallback — no user intervention needed.\n", "- **Persistent indexes** survive table close and reopen.\n", + "- **OPSI indexes** are tunable iterative-ordering indexes for exact filtering; use `FULL` for completely sorted ordered reuse.\n", "- **Views** cannot own indexes; only root tables can.\n" ] }, diff --git a/doc/reference/ctable.rst b/doc/reference/ctable.rst index 0cca3749..286778c7 100644 --- a/doc/reference/ctable.rst +++ b/doc/reference/ctable.rst @@ -135,7 +135,10 @@ CTable indexes can also target **direct expressions** over stored columns via predicates without adding either a computed column or a materialized stored one. A matching ``FULL`` direct-expression index can also be reused by ordering paths such as :meth:`CTable.sort_by` when sorting by a computed column backed by the -same expression. +same expression. ``OPSI`` indexes are a separate exact-filtering tier with a +tunable number of iterative ordering cycles; they are not intended to converge +to a completely sorted ``FULL``/CSI index, so use ``FULL`` when globally sorted +ordered reuse is required. .. autosummary:: diff --git a/src/blosc2/__init__.py b/src/blosc2/__init__.py index b28cb218..6e64f2fb 100644 --- a/src/blosc2/__init__.py +++ b/src/blosc2/__init__.py @@ -228,6 +228,8 @@ class IndexKind(Enum): PARTIAL = "partial" #: Globally ordered payloads for exact filtering and direct ordered reuse. FULL = "full" + #: Tunable iterative-ordering payloads for exact filtering; not a full/CSI index. + OPSI = "opsi" from .blosc2_ext import ( diff --git a/src/blosc2/ctable.py b/src/blosc2/ctable.py index 15290f26..420e7459 100644 --- a/src/blosc2/ctable.py +++ b/src/blosc2/ctable.py @@ -3605,6 +3605,8 @@ def _index_create_kwargs_from_descriptor(self, descriptor: dict) -> dict[str, An } if descriptor.get("kind") == "full": kwargs["method"] = descriptor.get("full", {}).get("build_method", "global-sort") + if descriptor.get("kind") == "opsi": + kwargs["opsi_max_cycles"] = descriptor.get("opsi", {}).get("max_cycles") target = descriptor.get("target") or {} if target.get("source") == "expression": kwargs["expression"] = target.get("expression") @@ -3755,6 +3757,7 @@ def _build_index_persistent( tmpdir: str | None, cparams_obj, method: str | None = None, + opsi_max_cycles: int | None = None, ) -> dict: """Build index sidecar files for a persistent-table column; return the descriptor.""" import tempfile @@ -3768,9 +3771,9 @@ def _build_index_persistent( _build_descriptor, _build_full_descriptor, _build_full_descriptor_ooc, - _build_full_descriptor_opsi, _build_levels_descriptor, _build_levels_descriptor_ooc, + _build_opsi_descriptor, _build_partial_descriptor, _build_partial_descriptor_ooc, _copy_descriptor, @@ -3792,6 +3795,8 @@ def _build_index_persistent( persistent = True dtype = col_arr.dtype use_ooc = _resolve_ooc_mode(kind, build) + if opsi_max_cycles is None: + opsi_max_cycles = max(1, optlevel if optlevel < 8 else optlevel * 2) if use_ooc: resolved_tmpdir = _resolve_full_index_tmpdir(proxy, tmpdir) @@ -3811,17 +3816,17 @@ def _build_index_persistent( else None ) full = None + opsi = None if kind == "full": - if method == "opsi": - full = _build_full_descriptor_opsi( - proxy, target, token, kind, dtype, persistent, cparams_obj + with tempfile.TemporaryDirectory(prefix="blosc2-index-ooc-", dir=resolved_tmpdir) as td: + full = _build_full_descriptor_ooc( + proxy, target, token, kind, dtype, persistent, Path(td), cparams_obj ) - else: - with tempfile.TemporaryDirectory(prefix="blosc2-index-ooc-", dir=resolved_tmpdir) as td: - full = _build_full_descriptor_ooc( - proxy, target, token, kind, dtype, persistent, Path(td), cparams_obj - ) - full["build_method"] = "global-sort" + full["build_method"] = "global-sort" + if kind == "opsi": + opsi = _build_opsi_descriptor( + proxy, target, token, kind, dtype, persistent, cparams_obj, opsi_max_cycles + ) descriptor = _build_descriptor( proxy, target, @@ -3837,6 +3842,7 @@ def _build_index_persistent( partial, full, cparams_obj, + opsi, ) else: values = _values_for_target(proxy, target) @@ -3854,14 +3860,14 @@ def _build_index_persistent( else None ) full = None + opsi = None if kind == "full": - if method == "opsi": - full = _build_full_descriptor_opsi( - proxy, target, token, kind, dtype, persistent, cparams_obj - ) - else: - full = _build_full_descriptor(proxy, token, kind, values, persistent, cparams_obj) - full["build_method"] = "global-sort" + full = _build_full_descriptor(proxy, token, kind, values, persistent, cparams_obj) + full["build_method"] = "global-sort" + if kind == "opsi": + opsi = _build_opsi_descriptor( + proxy, target, token, kind, dtype, persistent, cparams_obj, opsi_max_cycles + ) descriptor = _build_descriptor( proxy, target, @@ -3877,6 +3883,7 @@ def _build_index_persistent( partial, full, cparams_obj, + opsi, ) result = _copy_descriptor(descriptor) @@ -3921,6 +3928,9 @@ def create_index( # noqa: C901 cparams_obj = _normalize_index_cparams(kwargs.pop("cparams", None)) method = kwargs.pop("method", None) + opsi_max_cycles = kwargs.pop("opsi_max_cycles", None) + if opsi_max_cycles is not None: + opsi_max_cycles = max(1, int(opsi_max_cycles)) if kwargs: raise TypeError(f"unexpected keyword argument(s): {', '.join(sorted(kwargs))}") @@ -3949,6 +3959,7 @@ def create_index( # noqa: C901 tmpdir=tmpdir, cparams=cparams_obj, method=method_str, + opsi_max_cycles=opsi_max_cycles, ) store = _IN_MEMORY_INDEXES.get(id(expr_arr)) if store is None: @@ -3995,6 +4006,7 @@ def create_index( # noqa: C901 tmpdir=tmpdir, cparams_obj=cparams_obj, method=method_str, + opsi_max_cycles=opsi_max_cycles, ) else: _ix_create_index( @@ -4007,6 +4019,7 @@ def create_index( # noqa: C901 tmpdir=tmpdir, cparams=cparams_obj, method=method_str, + opsi_max_cycles=opsi_max_cycles, ) store = _IN_MEMORY_INDEXES[id(col_arr)] descriptor = _copy_descriptor(store["indexes"]["__self__"]) diff --git a/src/blosc2/indexing.py b/src/blosc2/indexing.py index 5ed0a13a..947e287b 100644 --- a/src/blosc2/indexing.py +++ b/src/blosc2/indexing.py @@ -45,6 +45,7 @@ "bucket": ("chunk", "block"), "partial": ("chunk", "block", "subblock"), "full": ("chunk", "block", "subblock"), + "opsi": ("chunk", "block", "subblock"), } _IN_MEMORY_INDEXES: dict[int, dict] = {} @@ -287,6 +288,8 @@ def _copy_descriptor(descriptor: dict) -> dict: copied["full"] = descriptor["full"].copy() if "runs" in copied["full"]: copied["full"]["runs"] = [run.copy() for run in copied["full"]["runs"]] + if descriptor.get("opsi") is not None: + copied["opsi"] = descriptor["opsi"].copy() return copied @@ -814,8 +817,8 @@ def _normalize_build_mode(build: str) -> str: def _normalize_full_build_method(method: str | None) -> str: if method is None: return "global-sort" - if method not in {"global-sort", "opsi"}: - raise ValueError("full index method must be one of 'global-sort' or 'opsi'") + if method != "global-sort": + raise ValueError("full index method must be 'global-sort'; use kind=IndexKind.OPSI for OPSI indexes") return method @@ -1545,7 +1548,7 @@ def _position_dtype(max_value: int) -> np.dtype: def _resolve_ooc_mode(kind: str, build: str) -> bool: - if kind not in {"summary", "bucket", "partial", "full"}: + if kind not in {"summary", "bucket", "partial", "full", "opsi"}: return False build = _normalize_build_mode(build) return build != "memory" @@ -3277,7 +3280,7 @@ def _opsi_compute_block_order(stage: OpsiStageSidecars, key_kind: str, size: int return block_ids -def _build_full_descriptor_opsi( +def _build_opsi_descriptor( array: blosc2.NDArray, target: dict, token: str, @@ -3287,50 +3290,59 @@ def _build_full_descriptor_opsi( cparams: dict | blosc2.CParams | None = None, max_cycles: int = 3, ) -> dict: - if int(array.shape[0]) == 0: - full = _build_full_descriptor(array, token, kind, np.empty(0, dtype=dtype), persistent, cparams) - full["build_method"] = "opsi" - full["opsi_cycles"] = 0 - full["opsi_max_cycles"] = int(max_cycles) - return full + max_cycles = max(0, int(max_cycles)) + size = int(array.shape[0]) + if size == 0: + values_sidecar = _store_array_sidecar( + array, token, kind, "opsi", "values", np.empty(0, dtype=dtype), persistent, cparams=cparams + ) + positions_sidecar = _store_array_sidecar( + array, + token, + kind, + "opsi", + "positions", + np.empty(0, dtype=np.int64), + persistent, + cparams=cparams, + ) + return { + "values_path": values_sidecar["path"], + "positions_path": positions_sidecar["path"], + "chunk_len": int(array.chunks[0]), + "block_len": int(array.blocks[0]), + "cycles": 0, + "max_cycles": max_cycles, + "is_csi": True, + } + previous = None block_order = None - cycle = 0 + current = None try: - while True: + for cycle in range(max_cycles): current = _opsi_build_stage1( array, target, token, kind, dtype, persistent, cycle, previous, block_order, cparams ) - if _opsi_is_csi(current, dtype): - full = { + is_csi = _opsi_is_csi(current, dtype) + if is_csi or cycle == max_cycles - 1: + opsi = { "values_path": current.values_path, "positions_path": current.positions_path, - "runs": [], - "next_run_id": 0, - "build_method": "opsi", - "opsi_cycles": cycle + 1, - "opsi_max_cycles": int(max_cycles), + "chunk_len": current.chunk_len, + "block_len": current.block_len, + "cycles": cycle + 1, + "max_cycles": max_cycles, + "is_csi": is_csi, } - if persistent: - _rebuild_full_navigation_sidecars_from_handle( - array, - token, - kind, - full, - current.values, - dtype, - int(array.shape[0]), - persistent, - cparams, - ) - else: + if not persistent: values = np.asarray(current.values) positions = np.asarray(current.positions) values_sidecar = _store_array_sidecar( array, token, kind, - "full", + "opsi", "values", values, persistent, @@ -3342,7 +3354,7 @@ def _build_full_descriptor_opsi( array, token, kind, - "full", + "opsi", "positions", positions, persistent, @@ -3350,44 +3362,20 @@ def _build_full_descriptor_opsi( blocks=(current.block_len,), cparams=cparams, ) - full["values_path"] = values_sidecar["path"] - full["positions_path"] = positions_sidecar["path"] - _rebuild_full_navigation_sidecars(array, token, kind, full, values, persistent, cparams) + opsi["values_path"] = values_sidecar["path"] + opsi["positions_path"] = positions_sidecar["path"] _opsi_drop_stage(previous) _opsi_drop_stage(current, keep_payload=persistent) - return full - if cycle >= max_cycles: - if os.getenv("BLOSC_TRACE") or os.getenv("BLOSC_OPSI_TRACE"): - print( - "BLOSC_TRACE: OPSI exceeded opsi_max_cycles=" - f"{int(max_cycles)} without reaching CSI; falling back to full method " - "'global-sort'", - file=sys.stderr, - ) - _opsi_drop_stage(previous) - _opsi_drop_stage(current) - if persistent: - with tempfile.TemporaryDirectory( - prefix="blosc2-index-opsi-fallback-", dir=_resolve_full_index_tmpdir(array, None) - ) as tmpdir: - full = _build_full_descriptor_ooc( - array, target, token, kind, dtype, persistent, Path(tmpdir), cparams - ) - else: - values = _values_for_target(array, target) - full = _build_full_descriptor(array, token, kind, values, persistent, cparams) - full["build_method"] = "opsi" - full["opsi_cycles"] = cycle + 1 - full["opsi_max_cycles"] = int(max_cycles) - full["opsi_fallback"] = "global-sort" - return full + return opsi key_kind = ("min", "median", "max")[cycle % 3] - block_order = _opsi_compute_block_order(current, key_kind=key_kind, size=int(array.shape[0])) + block_order = _opsi_compute_block_order(current, key_kind=key_kind, size=size) _opsi_drop_stage(previous) previous = current - cycle += 1 + current = None + raise RuntimeError("OPSI max_cycles must be greater than zero") except Exception: _opsi_drop_stage(previous) + _opsi_drop_stage(current) raise @@ -3510,6 +3498,7 @@ def _build_descriptor( partial: dict | None, full: dict | None, cparams: dict | None = None, + opsi: dict | None = None, ) -> dict: return { "name": name @@ -3531,6 +3520,7 @@ def _build_descriptor( "bucket": bucket, "partial": partial, "full": full, + "opsi": opsi, "cparams": _plain_index_cparams(cparams), } @@ -3558,7 +3548,7 @@ def create_index( """ cparams = _normalize_index_cparams(kwargs.pop("cparams", None)) method = kwargs.pop("method", None) - opsi_max_cycles = int(kwargs.pop("opsi_max_cycles", 3)) + opsi_max_cycles_arg = kwargs.pop("opsi_max_cycles", None) if kwargs: unexpected = ", ".join(sorted(kwargs)) raise TypeError(f"unexpected keyword argument(s): {unexpected}") @@ -3566,7 +3556,12 @@ def create_index( raise TypeError("kind must be a blosc2.IndexKind") kind = _normalize_index_kind(kind) build = _normalize_build_mode(build) - full_method = _normalize_full_build_method(method) if kind == "full" else None + if opsi_max_cycles_arg is None: + opsi_max_cycles = max(1, optlevel if optlevel < 8 else optlevel * 2) + else: + opsi_max_cycles = max(1, int(opsi_max_cycles_arg)) + if kind == "full": + _normalize_full_build_method(method) if method is not None and kind != "full": raise ValueError("method is only supported for kind=IndexKind.FULL") target, dtype = _normalize_create_index_target(array, field, expression, operands) @@ -3588,19 +3583,19 @@ def create_index( else None ) full = None + opsi = None if kind == "full": with tempfile.TemporaryDirectory( prefix="blosc2-index-ooc-", dir=_resolve_full_index_tmpdir(array, tmpdir) ) as tmpdir: - if full_method == "opsi": - full = _build_full_descriptor_opsi( - array, target, token, kind, dtype, persistent, cparams, opsi_max_cycles - ) - else: - full = _build_full_descriptor_ooc( - array, target, token, kind, dtype, persistent, Path(tmpdir), cparams - ) - full["build_method"] = "global-sort" + full = _build_full_descriptor_ooc( + array, target, token, kind, dtype, persistent, Path(tmpdir), cparams + ) + full["build_method"] = "global-sort" + if kind == "opsi": + opsi = _build_opsi_descriptor( + array, target, token, kind, dtype, persistent, cparams, opsi_max_cycles + ) descriptor = _build_descriptor( array, target, @@ -3616,6 +3611,7 @@ def create_index( partial, full, cparams, + opsi, ) else: values = _values_for_target(array, target) @@ -3631,14 +3627,14 @@ def create_index( else None ) full = None + opsi = None if kind == "full": - if full_method == "opsi": - full = _build_full_descriptor_opsi( - array, target, token, kind, dtype, persistent, cparams, opsi_max_cycles - ) - else: - full = _build_full_descriptor(array, token, kind, values, persistent, cparams) - full["build_method"] = "global-sort" + full = _build_full_descriptor(array, token, kind, values, persistent, cparams) + full["build_method"] = "global-sort" + if kind == "opsi": + opsi = _build_opsi_descriptor( + array, target, token, kind, dtype, persistent, cparams, opsi_max_cycles + ) descriptor = _build_descriptor( array, target, @@ -3654,6 +3650,7 @@ def create_index( partial, full, cparams, + opsi, ) store = _load_store(array) @@ -3722,6 +3719,11 @@ def iter_index_components(array: blosc2.NDArray, descriptor: dict): run.get("positions_path"), ) + opsi = descriptor.get("opsi") + if opsi is not None: + yield IndexComponent("opsi.values", "opsi", "values", opsi.get("values_path")) + yield IndexComponent("opsi.positions", "opsi", "positions", opsi.get("positions_path")) + def _component_nbytes(array: blosc2.NDArray, descriptor: dict, component: IndexComponent) -> int: if component.path is not None: @@ -3860,6 +3862,9 @@ def _drop_descriptor_sidecars(descriptor: dict) -> None: for run in descriptor["full"].get("runs", ()): _remove_sidecar_path(run.get("values_path")) _remove_sidecar_path(run.get("positions_path")) + if descriptor.get("opsi") is not None: + _remove_sidecar_path(descriptor["opsi"]["values_path"]) + _remove_sidecar_path(descriptor["opsi"]["positions_path"]) def _replace_levels_descriptor_tail( @@ -4133,6 +4138,11 @@ def rebuild_index(array: blosc2.NDArray, field: str | None = None, name: str | N store = _load_store(array) token = _resolve_index_token(store, field, name) descriptor = store["indexes"][token] + rebuild_kwargs = {} + if descriptor["kind"] == "full": + rebuild_kwargs["method"] = descriptor.get("full", {}).get("build_method") + if descriptor["kind"] == "opsi": + rebuild_kwargs["opsi_max_cycles"] = descriptor.get("opsi", {}).get("max_cycles") drop_index(array, field=descriptor["field"], name=descriptor["name"]) if descriptor["target"]["source"] == "expression": operands = array.fields if array.dtype.fields is not None else {SELF_TARGET_NAME: array} @@ -4145,7 +4155,7 @@ def rebuild_index(array: blosc2.NDArray, field: str | None = None, name: str | N persistent=descriptor["persistent"], build="ooc" if descriptor.get("ooc", False) else "memory", name=descriptor["name"], - method=descriptor.get("full", {}).get("build_method") if descriptor["kind"] == "full" else None, + **rebuild_kwargs, ) return create_index( array, @@ -4155,7 +4165,7 @@ def rebuild_index(array: blosc2.NDArray, field: str | None = None, name: str | N persistent=descriptor["persistent"], build="ooc" if descriptor.get("ooc", False) else "memory", name=descriptor["name"], - method=descriptor.get("full", {}).get("build_method") if descriptor["kind"] == "full" else None, + **rebuild_kwargs, ) @@ -4420,6 +4430,18 @@ def _load_full_sidecar_handles(array: blosc2.NDArray, descriptor: dict): return values_sidecar, positions_sidecar +def _load_opsi_sidecar_handles(array: blosc2.NDArray, descriptor: dict): + opsi = descriptor.get("opsi") + if opsi is None: + raise RuntimeError("OPSI-index metadata is not available") + token = descriptor["token"] + values_sidecar = _open_sidecar_handle(array, token, "opsi_handle", "values", opsi["values_path"]) + positions_sidecar = _open_sidecar_handle( + array, token, "opsi_handle", "positions", opsi["positions_path"] + ) + return values_sidecar, positions_sidecar + + def _load_full_run_sidecar_handles(array: blosc2.NDArray, descriptor: dict, run: dict): run_id = int(run["id"]) token = descriptor["token"] @@ -4741,7 +4763,7 @@ def _plan_exact_compare(node: ast.Compare, operands: dict) -> ExactPredicatePlan return None base, target_info, op, value = target descriptor = _descriptor_for_target(base, target_info) - if descriptor is None or descriptor.get("kind") not in {"bucket", "partial", "full"}: + if descriptor is None or descriptor.get("kind") not in {"bucket", "partial", "full", "opsi"}: return None try: value = _normalize_scalar(value, np.dtype(descriptor["dtype"])) @@ -5201,6 +5223,34 @@ def _exact_positions_from_full_selective_ooc( return _exact_positions_from_compact_full_base(array, descriptor, plan) +def _exact_positions_from_opsi( + array: blosc2.NDArray, descriptor: dict, plan: ExactPredicatePlan +) -> np.ndarray: + if _range_is_empty(plan): + return np.empty(0, dtype=np.int64) + dtype = np.dtype(descriptor["dtype"]) + values_sidecar, positions_sidecar = _load_opsi_sidecar_handles(array, descriptor) + chunk_boundaries = _sorted_chunk_boundaries_from_handle( + array, + descriptor["token"], + "opsi_bounds", + "chunks", + values_sidecar, + dtype, + ) + positions = _exact_positions_from_sorted_chunks( + values_sidecar, + positions_sidecar, + chunk_boundaries, + plan, + int(descriptor["opsi"].get("chunk_len", values_sidecar.chunks[0])), + dtype, + ) + if len(positions) == 0: + return np.empty(0, dtype=np.int64) + return np.sort(positions.astype(np.int64, copy=False), kind="stable") + + def _exact_positions_from_full( array: blosc2.NDArray, descriptor: dict, plan: ExactPredicatePlan ) -> np.ndarray: @@ -5840,6 +5890,8 @@ def _exact_positions_from_plan(plan: ExactPredicatePlan) -> np.ndarray | None: kind = plan.descriptor["kind"] if kind == "full": return _exact_positions_from_full(plan.base, plan.descriptor, plan) + if kind == "opsi": + return _exact_positions_from_opsi(plan.base, plan.descriptor, plan) if kind == "partial": return _exact_positions_from_partial( plan.base, plan.descriptor, np.dtype(plan.descriptor["dtype"]), plan @@ -5910,8 +5962,12 @@ def _plan_multi_exact_query(plans: list[ExactPredicatePlan]) -> IndexPlan | None def _plan_single_exact_query(exact_plan: ExactPredicatePlan) -> IndexPlan: kind = exact_plan.descriptor["kind"] - if kind == "full": - exact_positions = _exact_positions_from_full(exact_plan.base, exact_plan.descriptor, exact_plan) + if kind in {"full", "opsi"}: + exact_positions = ( + _exact_positions_from_full(exact_plan.base, exact_plan.descriptor, exact_plan) + if kind == "full" + else _exact_positions_from_opsi(exact_plan.base, exact_plan.descriptor, exact_plan) + ) return IndexPlan( True, f"{kind} index selected", diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py index 2d254e95..7b3ced8c 100644 --- a/src/blosc2/ndarray.py +++ b/src/blosc2/ndarray.py @@ -4807,11 +4807,20 @@ def create_index( kind : IndexKind, optional Index tier to build. Use ``SUMMARY`` for the lightest pruning, ``BUCKET`` for bucketed chunk-local pruning, ``PARTIAL`` for exact - filtering with local ordering, and ``FULL`` when ordered access via + filtering with local ordering, ``OPSI`` for a tunable iterative + ordering index, and ``FULL`` when ordered access via ``sort(order=...)``, ``argsort(order=...)``, or - ``iter_sorted(...)`` should reuse the index directly. + ``iter_sorted(...)`` should reuse the index directly. ``OPSI`` is a + separate exact-filtering index kind; it incrementally improves + physical ordering but does not try to produce a completely sorted + full/CSI payload. optlevel : int, optional - Optimization level for index payload construction. + Optimization level for index payload construction. For + ``kind=OPSI``, this controls the default number of iterative OPSI + cycles: ``optlevel`` cycles for levels below 8, and + ``2 * optlevel`` cycles for levels 8 and above. More cycles can + improve cold-query locality and compression, but increase build + time. persistent : bool or None, optional Whether index sidecars should be persisted. If ``None``, this follows whether the base array is persistent. @@ -4831,14 +4840,16 @@ def create_index( temporary directory. kwargs : dict, optional Keyword arguments forwarded to the index builder. Supported options - include ``cparams`` and, for ``kind=FULL``, ``method``. Pass - ``method="global-sort"`` to select the current full-index builder - explicitly, or ``method="opsi"`` to select the iterative OPSI full - builder. Pass ``cparams`` to control the compression settings used - for index sidecars, including ``codec``, ``clevel``, and - ``nthreads``. If provided, ``cparams["nthreads"]`` becomes the - default build-thread count for intra-chunk sorting unless - ``BLOSC2_INDEX_BUILD_THREADS`` overrides it. + include ``cparams``, ``opsi_max_cycles`` for ``kind=OPSI``, and, + for ``kind=FULL``, ``method``. Pass ``method="global-sort"`` to + select the full-index builder explicitly. OPSI is selected with + ``kind=IndexKind.OPSI`` rather than as a full-index method. Pass + ``opsi_max_cycles`` to override the optlevel-derived cycle count. + Pass ``cparams`` to control the compression settings used for index + sidecars, including ``codec``, ``clevel``, and ``nthreads``. If + provided, ``cparams["nthreads"]`` becomes the default build-thread + count for intra-chunk sorting unless ``BLOSC2_INDEX_BUILD_THREADS`` + overrides it. Notes ----- diff --git a/tests/ndarray/test_indexing.py b/tests/ndarray/test_indexing.py index 77f03232..8e28f678 100644 --- a/tests/ndarray/test_indexing.py +++ b/tests/ndarray/test_indexing.py @@ -21,10 +21,11 @@ def _public_kind(kind: str) -> blosc2.IndexKind: "bucket": blosc2.IndexKind.BUCKET, "partial": blosc2.IndexKind.PARTIAL, "full": blosc2.IndexKind.FULL, + "opsi": blosc2.IndexKind.OPSI, }[kind] -@pytest.mark.parametrize("kind", ["summary", "bucket", "partial", "full"]) +@pytest.mark.parametrize("kind", ["summary", "bucket", "partial", "full", "opsi"]) def test_scalar_index_matches_scan(kind): data = np.arange(200_000, dtype=np.int64) arr = blosc2.asarray(data, chunks=(10_000,), blocks=(2_000,)) @@ -46,7 +47,7 @@ def test_scalar_index_matches_scan(kind): np.testing.assert_array_equal(indexed, data[(data >= 120_000) & (data < 125_000)]) -@pytest.mark.parametrize("kind", ["summary", "bucket", "partial", "full"]) +@pytest.mark.parametrize("kind", ["summary", "bucket", "partial", "full", "opsi"]) def test_structured_field_index_matches_scan(kind): dtype = np.dtype([("id", np.int64), ("payload", np.float64)]) data = np.empty(120_000, dtype=dtype) From fc0f5cc908c80056a42b422e3e0e7f33b7df3299 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Fri, 24 Apr 2026 12:55:08 +0200 Subject: [PATCH 3/5] More optimizations for OPSI lookups --- bench/indexing/index_query_bench.py | 2 +- src/blosc2/indexing.py | 230 ++++++++++++++++++++++++++-- 2 files changed, 215 insertions(+), 17 deletions(-) diff --git a/bench/indexing/index_query_bench.py b/bench/indexing/index_query_bench.py index e988bba4..1b8bebe4 100644 --- a/bench/indexing/index_query_bench.py +++ b/bench/indexing/index_query_bench.py @@ -410,7 +410,7 @@ def add_sidecar(path: str | None) -> None: opsi = descriptor.get("opsi") if opsi is not None: - for key in ("values_path", "positions_path"): + for key in ("values_path", "positions_path", "mins_path", "maxs_path"): add_sidecar(opsi.get(key)) return logical, disk diff --git a/src/blosc2/indexing.py b/src/blosc2/indexing.py index 947e287b..8f2f514b 100644 --- a/src/blosc2/indexing.py +++ b/src/blosc2/indexing.py @@ -3256,6 +3256,53 @@ def _opsi_is_csi(stage: OpsiStageSidecars, dtype: np.dtype) -> bool: return bool(np.all(_opsi_ordered_le_array(maxs, mins, dtype))) +def _opsi_block_pruning_score(stage: OpsiStageSidecars, dtype: np.dtype) -> tuple[float, int, float, float]: + """Estimate point-query block lookup cost; lower is better. + + A candidate block count alone is not enough for out-of-core lookups: many + isolated candidate blocks can be slower than a slightly larger but more + contiguous set because each run becomes a separate sidecar read. Score both + total candidate blocks and run fragmentation, using a small read-run penalty + so higher optlevels do not select layouts that prune more blocks but scatter + the remaining candidates across the file. + """ + if stage.nblocks <= 1: + return (1.0, 1, 1.0, 1.0) + mins = _read_sidecar_span(stage.mins, 0, stage.nblocks) + medians = _read_sidecar_span(stage.medians, 0, stage.nblocks) + maxs = _read_sidecar_span(stage.maxs, 0, stage.nblocks) + sample_count = min(129, stage.nblocks) + try: + samples = np.sort(medians, kind="stable") + except TypeError: + samples = medians + if sample_count < stage.nblocks: + sample_ids = np.linspace(0, stage.nblocks - 1, sample_count, dtype=np.int64) + samples = samples[sample_ids] + costs = [] + counts = [] + runs = [] + read_run_penalty = 8.0 + for sample in samples: + candidates = (mins <= sample) & (maxs >= sample) + true_ids = np.flatnonzero(candidates) + count = int(true_ids.size) + if count == 0: + run_count = 0 + else: + run_count = int(np.count_nonzero(np.diff(true_ids) != 1) + 1) + counts.append(count) + runs.append(run_count) + costs.append(count + read_run_penalty * run_count) + nsamples = max(1, len(costs)) + return ( + float(sum(costs)) / nsamples, + max(costs, default=0), + float(sum(counts)) / nsamples, + float(sum(runs)) / nsamples, + ) + + def _opsi_compute_block_order(stage: OpsiStageSidecars, key_kind: str, size: int) -> np.ndarray: # Whole-block relocation assumes fixed-size destination block slots. Keep # a final partial block, when present, in the final slot for now. @@ -3306,11 +3353,20 @@ def _build_opsi_descriptor( persistent, cparams=cparams, ) + mins_sidecar = _store_array_sidecar( + array, token, kind, "opsi_nav", "mins", np.empty(0, dtype=dtype), persistent, cparams=cparams + ) + maxs_sidecar = _store_array_sidecar( + array, token, kind, "opsi_nav", "maxs", np.empty(0, dtype=dtype), persistent, cparams=cparams + ) return { "values_path": values_sidecar["path"], "positions_path": positions_sidecar["path"], + "mins_path": mins_sidecar["path"], + "maxs_path": maxs_sidecar["path"], "chunk_len": int(array.chunks[0]), "block_len": int(array.blocks[0]), + "nblocks": 0, "cycles": 0, "max_cycles": max_cycles, "is_csi": True, @@ -3319,25 +3375,77 @@ def _build_opsi_descriptor( previous = None block_order = None current = None + best = None + best_cycle = 0 + best_is_csi = False + best_score = (math.inf, math.inf, math.inf, math.inf) + no_improve_patience = 3 try: for cycle in range(max_cycles): + old_previous = previous current = _opsi_build_stage1( array, target, token, kind, dtype, persistent, cycle, previous, block_order, cparams ) is_csi = _opsi_is_csi(current, dtype) - if is_csi or cycle == max_cycles - 1: + score = (1.0, 1, 1.0, 1.0) if is_csi else _opsi_block_pruning_score(current, dtype) + if score < best_score: + if best is not None and best is not old_previous and best is not current: + _opsi_drop_stage(best) + best = current + best_cycle = cycle + 1 + best_is_csi = is_csi + best_score = score + + exhausted_budget = cycle == max_cycles - 1 + stalled = (cycle + 1 - best_cycle) >= no_improve_patience + if is_csi or exhausted_budget or stalled: + final = best if best is not None else current + mins = _read_sidecar_span(final.mins, 0, final.nblocks) + maxs = _read_sidecar_span(final.maxs, 0, final.nblocks) + nav_chunk = max(1, min(final.nblocks, final.chunk_len // final.block_len)) + mins_sidecar = _store_array_sidecar( + array, + token, + kind, + "opsi_nav", + "mins", + mins, + persistent, + chunks=(nav_chunk,), + blocks=(nav_chunk,), + cparams=cparams, + ) + maxs_sidecar = _store_array_sidecar( + array, + token, + kind, + "opsi_nav", + "maxs", + maxs, + persistent, + chunks=(nav_chunk,), + blocks=(nav_chunk,), + cparams=cparams, + ) opsi = { - "values_path": current.values_path, - "positions_path": current.positions_path, - "chunk_len": current.chunk_len, - "block_len": current.block_len, - "cycles": cycle + 1, + "values_path": final.values_path, + "positions_path": final.positions_path, + "mins_path": mins_sidecar["path"], + "maxs_path": maxs_sidecar["path"], + "chunk_len": final.chunk_len, + "block_len": final.block_len, + "nblocks": final.nblocks, + "cycles": best_cycle, + "attempted_cycles": cycle + 1, "max_cycles": max_cycles, - "is_csi": is_csi, + "is_csi": best_is_csi, + "block_pruning_score": best_score[0], + "block_pruning_blocks": best_score[2], + "block_pruning_runs": best_score[3], } if not persistent: - values = np.asarray(current.values) - positions = np.asarray(current.positions) + values = np.asarray(final.values) + positions = np.asarray(final.positions) values_sidecar = _store_array_sidecar( array, token, @@ -3346,8 +3454,8 @@ def _build_opsi_descriptor( "values", values, persistent, - chunks=(current.chunk_len,), - blocks=(current.block_len,), + chunks=(final.chunk_len,), + blocks=(final.block_len,), cparams=cparams, ) positions_sidecar = _store_array_sidecar( @@ -3358,24 +3466,30 @@ def _build_opsi_descriptor( "positions", positions, persistent, - chunks=(current.chunk_len,), - blocks=(current.block_len,), + chunks=(final.chunk_len,), + blocks=(final.block_len,), cparams=cparams, ) opsi["values_path"] = values_sidecar["path"] opsi["positions_path"] = positions_sidecar["path"] - _opsi_drop_stage(previous) - _opsi_drop_stage(current, keep_payload=persistent) + if old_previous is not final: + _opsi_drop_stage(old_previous) + if current is not final: + _opsi_drop_stage(current) + _opsi_drop_stage(final, keep_payload=persistent) return opsi key_kind = ("min", "median", "max")[cycle % 3] block_order = _opsi_compute_block_order(current, key_kind=key_kind, size=size) - _opsi_drop_stage(previous) + if old_previous is not best: + _opsi_drop_stage(old_previous) previous = current current = None raise RuntimeError("OPSI max_cycles must be greater than zero") except Exception: _opsi_drop_stage(previous) _opsi_drop_stage(current) + if best is not previous and best is not current: + _opsi_drop_stage(best) raise @@ -3723,6 +3837,8 @@ def iter_index_components(array: blosc2.NDArray, descriptor: dict): if opsi is not None: yield IndexComponent("opsi.values", "opsi", "values", opsi.get("values_path")) yield IndexComponent("opsi.positions", "opsi", "positions", opsi.get("positions_path")) + yield IndexComponent("opsi_nav.mins", "opsi_nav", "mins", opsi.get("mins_path")) + yield IndexComponent("opsi_nav.maxs", "opsi_nav", "maxs", opsi.get("maxs_path")) def _component_nbytes(array: blosc2.NDArray, descriptor: dict, component: IndexComponent) -> int: @@ -3865,6 +3981,8 @@ def _drop_descriptor_sidecars(descriptor: dict) -> None: if descriptor.get("opsi") is not None: _remove_sidecar_path(descriptor["opsi"]["values_path"]) _remove_sidecar_path(descriptor["opsi"]["positions_path"]) + _remove_sidecar_path(descriptor["opsi"].get("mins_path")) + _remove_sidecar_path(descriptor["opsi"].get("maxs_path")) def _replace_levels_descriptor_tail( @@ -4442,6 +4560,16 @@ def _load_opsi_sidecar_handles(array: blosc2.NDArray, descriptor: dict): return values_sidecar, positions_sidecar +def _load_opsi_navigation_handles(array: blosc2.NDArray, descriptor: dict): + opsi = descriptor.get("opsi") + if opsi is None: + raise RuntimeError("OPSI-index metadata is not available") + token = descriptor["token"] + mins_sidecar = _open_sidecar_handle(array, token, "opsi_nav_handle", "mins", opsi.get("mins_path")) + maxs_sidecar = _open_sidecar_handle(array, token, "opsi_nav_handle", "maxs", opsi.get("maxs_path")) + return mins_sidecar, maxs_sidecar + + def _load_full_run_sidecar_handles(array: blosc2.NDArray, descriptor: dict, run: dict): run_id = int(run["id"]) token = descriptor["token"] @@ -5223,11 +5351,81 @@ def _exact_positions_from_full_selective_ooc( return _exact_positions_from_compact_full_base(array, descriptor, plan) +def _opsi_block_boundaries_from_handles( + array: blosc2.NDArray, descriptor: dict, mins_sidecar, maxs_sidecar, dtype: np.dtype +) -> np.ndarray: + cache_key = _data_cache_key(array, descriptor["token"], "opsi_bounds", "blocks") + cached = _DATA_CACHE.get(cache_key) + if cached is not None: + return cached + nblocks = int(mins_sidecar.shape[0]) + boundaries = np.empty(nblocks, dtype=_boundary_dtype(dtype)) + if nblocks: + boundaries["start"] = _read_sidecar_span(mins_sidecar, 0, nblocks) + boundaries["end"] = _read_sidecar_span(maxs_sidecar, 0, nblocks) + _DATA_CACHE[cache_key] = boundaries + return boundaries + + +def _exact_positions_from_opsi_block_nav( + array: blosc2.NDArray, descriptor: dict, plan: ExactPredicatePlan +) -> np.ndarray: + dtype = np.dtype(descriptor["dtype"]) + opsi = descriptor["opsi"] + values_sidecar, positions_sidecar = _load_opsi_sidecar_handles(array, descriptor) + mins_sidecar, maxs_sidecar = _load_opsi_navigation_handles(array, descriptor) + boundaries = _opsi_block_boundaries_from_handles(array, descriptor, mins_sidecar, maxs_sidecar, dtype) + candidate_blocks = _candidate_units_from_boundaries(boundaries, plan) + if not np.any(candidate_blocks): + return np.empty(0, dtype=np.int64) + + chunk_len = int(opsi.get("chunk_len", values_sidecar.chunks[0])) + block_len = int(opsi.get("block_len", values_sidecar.blocks[0])) + blocks_per_chunk = max(1, chunk_len // block_len) + size = int(values_sidecar.shape[0]) + parts = [] + span_count = 0 + + first_chunk = int(np.flatnonzero(candidate_blocks)[0]) // blocks_per_chunk + last_chunk = int(np.flatnonzero(candidate_blocks)[-1]) // blocks_per_chunk + for chunk_id in range(first_chunk, last_chunk + 1): + first_block = chunk_id * blocks_per_chunk + last_block = min(first_block + blocks_per_chunk, len(candidate_blocks)) + block_mask = np.asarray(candidate_blocks[first_block:last_block], dtype=bool) + if not np.any(block_mask): + continue + for block_start_idx, block_stop_idx in _contiguous_true_runs(block_mask): + span_count += 1 + span_start = (first_block + block_start_idx) * block_len + span_stop = min((first_block + block_stop_idx) * block_len, size) + if span_start >= span_stop: + continue + local_start = span_start - chunk_id * chunk_len + span_items = span_stop - span_start + span_values = np.empty(span_items, dtype=dtype) + values_sidecar.get_1d_span_numpy(span_values, chunk_id, local_start, span_items) + lo, hi = _search_bounds(span_values, plan) + if lo >= hi: + continue + matched = np.empty(hi - lo, dtype=np.int64) + _read_ndarray_linear_span(positions_sidecar, span_start + lo, matched) + parts.append(matched) + + if not parts: + return np.empty(0, dtype=np.int64) + positions = np.concatenate(parts) if len(parts) > 1 else parts[0] + return np.sort(positions.astype(np.int64, copy=False), kind="stable") + + def _exact_positions_from_opsi( array: blosc2.NDArray, descriptor: dict, plan: ExactPredicatePlan ) -> np.ndarray: if _range_is_empty(plan): return np.empty(0, dtype=np.int64) + try: + return _exact_positions_from_opsi_block_nav(array, descriptor, plan) + except Exception: + pass dtype = np.dtype(descriptor["dtype"]) values_sidecar, positions_sidecar = _load_opsi_sidecar_handles(array, descriptor) chunk_boundaries = _sorted_chunk_boundaries_from_handle( From e926cae10af2538905c81c82b5f3f88a2b16d925 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Fri, 24 Apr 2026 13:27:06 +0200 Subject: [PATCH 4/5] Use keysort for creating full indexes (seems a hair faster) --- src/blosc2/indexing.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/blosc2/indexing.py b/src/blosc2/indexing.py index 8f2f514b..843fd559 100644 --- a/src/blosc2/indexing.py +++ b/src/blosc2/indexing.py @@ -1510,6 +1510,18 @@ def _stream_copy_temp_run_to_full_sidecars( ) +def _keysort_values_with_positions(values: np.ndarray, start: int = 0) -> tuple[np.ndarray, np.ndarray]: + """Return values sorted with original/global positions carried in lockstep.""" + sorted_values = np.array(values, copy=True) + positions = np.arange(start, start + sorted_values.size, dtype=np.int64) + try: + indexing_ext.keysort_values_positions(sorted_values, positions) + return sorted_values, positions + except (AttributeError, TypeError): + order = np.argsort(values, kind="stable") + return values[order], (order + start).astype(np.int64, copy=False) + + def _build_full_descriptor( array: blosc2.NDArray, token: str, @@ -1518,9 +1530,7 @@ def _build_full_descriptor( persistent: bool, cparams: dict | None = None, ) -> dict: - order = np.argsort(values, kind="stable") - positions = order.astype(np.int64, copy=False) - sorted_values = values[order] + sorted_values, positions = _keysort_values_with_positions(values) values_sidecar = _store_array_sidecar( array, token, kind, "full", "values", sorted_values, persistent, cparams=cparams ) @@ -3522,12 +3532,20 @@ def _build_full_descriptor_ooc( } _rebuild_full_navigation_sidecars(array, token, kind, full, sorted_values, persistent, cparams) return full - run_items = max(int(array.chunks[0]), min(size, FULL_OOC_RUN_ITEMS)) + run_items_env = os.getenv("BLOSC2_FULL_OOC_RUN_ITEMS") + if run_items_env is not None: + try: + run_item_budget = max(1, int(run_items_env)) + except ValueError: + run_item_budget = FULL_OOC_RUN_ITEMS + else: + run_item_budget = FULL_OOC_RUN_ITEMS + run_items = max(int(array.chunks[0]), min(size, run_item_budget)) runs = [] for run_id, start in enumerate(range(0, size, run_items)): stop = min(start + run_items, size) values = _slice_values_for_target(array, target, start, stop) - sorted_values, sorted_positions = indexing_ext.intra_chunk_sort_run(values, start, np.int64) + sorted_values, sorted_positions = _keysort_values_with_positions(values, start) runs.append( _materialize_sorted_run( sorted_values, From c309059d8668d7729cf0ce7467856001a9573fe7 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Fri, 24 Apr 2026 13:30:12 +0200 Subject: [PATCH 5/5] Increase the number of items for ooc merge sorts --- src/blosc2/indexing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/blosc2/indexing.py b/src/blosc2/indexing.py index 843fd559..be55d046 100644 --- a/src/blosc2/indexing.py +++ b/src/blosc2/indexing.py @@ -75,7 +75,7 @@ _GATHER_MMAP_HANDLES: dict[str, object] = {} _HOT_CACHE_GLOBAL_SCOPE = ("global", 0) -FULL_OOC_RUN_ITEMS = 2_000_000 +FULL_OOC_RUN_ITEMS = 8_000_000 FULL_OOC_MERGE_BUFFER_ITEMS = 500_000 FULL_SELECTIVE_OOC_MAX_SPANS = 128 FULL_RUN_BOUNDED_FALLBACK_RUNS = 8