Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 82 additions & 35 deletions bench/indexing/index_query_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@

SIZES = (1_000_000, 2_000_000, 5_000_000, 10_000_000)
DEFAULT_REPEATS = 3
KINDS = ("summary", "bucket", "partial", "full")
KINDS = ("summary", "bucket", "partial", "full", "opsi")
DEFAULT_KIND = "bucket"
DISTS = ("sorted", "block-shuffled", "permuted", "random")
RNG_SEED = 0
DEFAULT_OPLEVEL = 5
FULL_QUERY_MODES = ("auto", "selective-ooc", "whole-load")
DATASET_LAYOUT_VERSION = "payload-ramp-v1"
BUILD_MODES = ("auto", "memory", "ooc")
FULL_INDEX_METHODS = ("global-sort",)

COLD_COLUMNS = [
("rows", lambda result: f"{result['size']:,}"),
Expand Down Expand Up @@ -377,39 +378,40 @@ def _open_index_sidecar(path: str | os.PathLike[str], no_mmap: bool):
def index_sizes(descriptor: dict, *, no_mmap: bool) -> tuple[int, int]:
logical = 0
disk = 0

def add_sidecar(path: str | None) -> None:
nonlocal logical, disk
if not path:
return
array = _open_index_sidecar(path, no_mmap)
logical += int(np.prod(array.shape)) * array.dtype.itemsize
disk += os.path.getsize(path)

for level_info in descriptor["levels"].values():
dtype = np.dtype(level_info["dtype"])
logical += dtype.itemsize * level_info["nsegments"]
if level_info["path"]:
disk += os.path.getsize(level_info["path"])

light = descriptor.get("light")
if light is not None:
for key in ("values_path", "bucket_positions_path", "offsets_path"):
array = _open_index_sidecar(light[key], no_mmap)
logical += int(np.prod(array.shape)) * array.dtype.itemsize
disk += os.path.getsize(light[key])

reduced = descriptor.get("reduced")
if reduced is not None:
values = _open_index_sidecar(reduced["values_path"], no_mmap)
positions = _open_index_sidecar(reduced["positions_path"], no_mmap)
offsets = _open_index_sidecar(reduced["offsets_path"], no_mmap)
logical += values.shape[0] * values.dtype.itemsize
logical += positions.shape[0] * positions.dtype.itemsize
logical += offsets.shape[0] * offsets.dtype.itemsize
disk += os.path.getsize(reduced["values_path"])
disk += os.path.getsize(reduced["positions_path"])
disk += os.path.getsize(reduced["offsets_path"])
add_sidecar(level_info.get("path"))

bucket = descriptor.get("bucket")
if bucket is not None:
for key in ("values_path", "bucket_positions_path", "offsets_path", "l1_path", "l2_path"):
add_sidecar(bucket.get(key))

partial = descriptor.get("partial")
if partial is not None:
for key in ("values_path", "positions_path", "offsets_path", "l1_path", "l2_path"):
add_sidecar(partial.get(key))

full = descriptor.get("full")
if full is not None:
values = _open_index_sidecar(full["values_path"], no_mmap)
positions = _open_index_sidecar(full["positions_path"], no_mmap)
logical += values.shape[0] * values.dtype.itemsize
logical += positions.shape[0] * positions.dtype.itemsize
disk += os.path.getsize(full["values_path"])
disk += os.path.getsize(full["positions_path"])
for key in ("values_path", "positions_path", "l1_path", "l2_path"):
add_sidecar(full.get(key))
for run in full.get("runs", ()):
add_sidecar(run.get("values_path"))
add_sidecar(run.get("positions_path"))

opsi = descriptor.get("opsi")
if opsi is not None:
for key in ("values_path", "positions_path", "mins_path", "maxs_path"):
add_sidecar(opsi.get(key))
return logical, disk


Expand Down Expand Up @@ -443,19 +445,31 @@ def _condition_expr(lo: object, hi: object, dtype: np.dtype, *, query_single_val
return f"(id >= {lo_literal}) & (id <= {hi_literal})"


def _index_kind_method(kind: str) -> tuple[str, str | None]:
if kind == "full":
return "full", "global-sort"
return kind, None


def _valid_index_descriptor(arr: blosc2.NDArray, kind: str, optlevel: int, build: str) -> dict | None:
actual_kind, method = _index_kind_method(kind)
for descriptor in arr.indexes:
if descriptor.get("version") != blosc2_indexing.INDEX_FORMAT_VERSION:
continue
expected_ooc = build != "memory"
if (
if not (
descriptor.get("field") == "id"
and descriptor.get("kind") == kind
and descriptor.get("kind") == actual_kind
and int(descriptor.get("optlevel", -1)) == int(optlevel)
and bool(descriptor.get("ooc", False)) is bool(expected_ooc)
and not descriptor.get("stale", False)
):
return descriptor
continue
if method is not None:
build_method = descriptor.get("full", {}).get("build_method", "global-sort")
if build_method != method:
continue
return descriptor
return None


Expand All @@ -482,6 +496,7 @@ def _open_or_build_indexed_array(
clevel: int | None,
nthreads: int | None,
no_mmap: bool,
opsi_max_cycles: int | None,
) -> tuple[blosc2.NDArray, float]:
if path.exists():
arr = blosc2.open(path, mode="a")
Expand All @@ -493,12 +508,17 @@ def _open_or_build_indexed_array(

arr = build_persistent_array(size, dist, id_dtype, path, chunks, blocks)
build_start = time.perf_counter()
actual_kind, method = _index_kind_method(kind)
kwargs = {
"field": "id",
"kind": blosc2.IndexKind[kind.upper()],
"kind": blosc2.IndexKind[actual_kind.upper()],
"optlevel": optlevel,
"build": build,
}
if method is not None:
kwargs["method"] = method
if actual_kind == "opsi" and opsi_max_cycles is not None:
kwargs["opsi_max_cycles"] = opsi_max_cycles
cparams = {}
if codec is not None:
cparams["codec"] = codec
Expand Down Expand Up @@ -531,6 +551,7 @@ def benchmark_size(
kinds: tuple[str, ...],
repeats: int,
no_mmap: bool,
opsi_max_cycles: int | None,
cold_row_callback=None,
) -> list[dict]:
arr = _open_or_build_persistent_array(
Expand Down Expand Up @@ -574,6 +595,7 @@ def benchmark_size(
clevel,
nthreads,
no_mmap,
opsi_max_cycles,
)
idx_cond = blosc2.lazyexpr(condition_str, idx_arr.fields)
idx_expr = idx_cond.where(idx_arr)
Expand Down Expand Up @@ -725,12 +747,30 @@ def parse_args() -> argparse.Namespace:
default="auto",
help="Index builder policy: auto, memory, or ooc. Default: auto.",
)
parser.add_argument(
"--method",
choices=FULL_INDEX_METHODS,
default="global-sort",
help=(
"Full-index build method. OPSI is a separate index kind; use --kind opsi to benchmark it. "
"Default: global-sort."
),
)
parser.add_argument(
"--full-query-mode",
choices=FULL_QUERY_MODES,
default="auto",
help="How full exact queries should run during the benchmark: auto, selective-ooc, or whole-load.",
)
parser.add_argument(
"--opsi-max-cycles",
type=int,
default=None,
help=(
"Maximum OPSI cycles for --kind opsi. Default: derive from optlevel "
"(optlevel for optlevel < 8, optlevel * 2 otherwise)."
),
)
parser.add_argument(
"--codec",
type=str,
Expand Down Expand Up @@ -776,6 +816,8 @@ def main() -> None:
raise SystemExit("--clevel must be >= 0")
if args.nthreads is not None and args.nthreads <= 0:
raise SystemExit("--nthreads must be a positive integer")
if args.opsi_max_cycles is not None and args.opsi_max_cycles < 0:
raise SystemExit("--opsi-max-cycles must be >= 0")
sizes = (args.size,) if args.size is not None else SIZES
dists = DISTS if args.dist == "all" else (args.dist,)
kinds = KINDS if args.kind == "all" else (args.kind,)
Expand All @@ -801,6 +843,7 @@ def main() -> None:
args.clevel,
args.nthreads,
args.no_mmap,
args.opsi_max_cycles,
)
else:
args.outdir.mkdir(parents=True, exist_ok=True)
Expand All @@ -823,6 +866,7 @@ def main() -> None:
args.clevel,
args.nthreads,
args.no_mmap,
args.opsi_max_cycles,
)


Expand All @@ -845,6 +889,7 @@ def run_benchmarks(
clevel: int | None,
nthreads: int | None,
no_mmap: bool,
opsi_max_cycles: int | None,
) -> None:
all_results = []

Expand All @@ -863,7 +908,8 @@ def run_benchmarks(
f"full_query_mode={full_query_mode}, index_codec={'auto' if codec is None else codec.name}, "
f"index_clevel={'auto' if clevel is None else clevel}, "
f"index_nthreads={'auto' if nthreads is None else nthreads}, "
f"index_mmap={'off' if no_mmap else 'on'}"
f"index_mmap={'off' if no_mmap else 'on'}, "
f"opsi_max_cycles={'optlevel' if opsi_max_cycles is None else opsi_max_cycles}"
)
cold_widths = progress_widths(COLD_COLUMNS, sizes, dists, kinds, id_dtype)
print()
Expand Down Expand Up @@ -894,6 +940,7 @@ def cold_progress_callback(row: dict) -> None:
kinds,
repeats,
no_mmap,
opsi_max_cycles,
cold_row_callback=cold_progress_callback,
)
all_results.extend(size_results)
Expand Down
19 changes: 13 additions & 6 deletions doc/getting_started/tutorials/14.indexing-arrays.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"source": [
"# Indexing Arrays\n",
"\n",
"Blosc2 can attach indexes to 1-D `NDArray` objects and to fields inside 1-D structured arrays. These indexes accelerate selective masks, and `full` indexes can also drive ordered access directly through `sort(order=...)`, `NDArray.argsort(order=...)`, `LazyExpr.argsort(order=...)`, and `iter_sorted(...)`.\n",
"Blosc2 can attach indexes to 1-D `NDArray` objects and to fields inside 1-D structured arrays. These indexes accelerate selective masks, and `full` indexes can also drive ordered access directly through `sort(order=...)`, `NDArray.argsort(order=...)`, `LazyExpr.argsort(order=...)`, and `iter_sorted(...)`. OPSI indexes are a separate tunable iterative-ordering kind: they improve the physical order used for exact filtering, but they are not intended to converge to a completely sorted index (CSI), which is what a `full` index provides.\n",
"\n",
"This tutorial covers:\n",
"\n",
Expand Down Expand Up @@ -108,16 +108,19 @@
"source": [
"## Index kinds and how to create them\n",
"\n",
"Blosc2 currently supports four index kinds:\n",
"Blosc2 currently supports five index kinds:\n",
"\n",
"- `summary`: compact summaries only,\n",
"- `bucket`: summary levels plus lightweight per-block payloads,\n",
"- `partial`: richer payloads for positional filtering,\n",
"- `opsi`: tunable iterative ordering for exact filtering,\n",
"- `full`: globally sorted payloads for positional filtering and ordered reuse.\n",
"\n",
"`OPSI` is intentionally a separate kind, not a `full` index construction method. It performs a configurable number of ordering cycles and then keeps that iterative ordering as-is. Achieving a completely sorted index (CSI) is not a goal for OPSI; use `FULL` when you require global sorted order or direct ordered reuse. By default, `OPSI` uses `optlevel` cycles for `optlevel < 8`, and `2 * optlevel` cycles for `optlevel >= 8`. You can override this with `opsi_max_cycles=...`.\n",
"\n",
"There is one active index per target field or expression. If you create another index on the same target, it replaces the previous one. The easiest way to compare kinds is to build them on separate arrays.\n",
"\n",
"The next cell times index creation and reports the compressed storage footprint of each index relative to the compressed base array."
"The next cell times index creation and reports the compressed storage footprint of each index relative to the compressed base array.\n"
]
},
{
Expand Down Expand Up @@ -152,6 +155,7 @@
" blosc2.IndexKind.SUMMARY,\n",
" blosc2.IndexKind.BUCKET,\n",
" blosc2.IndexKind.PARTIAL,\n",
" blosc2.IndexKind.OPSI,\n",
" blosc2.IndexKind.FULL,\n",
"):\n",
" arr = data.copy()\n",
Expand Down Expand Up @@ -238,7 +242,7 @@
"source": [
"### Timing the mask with and without indexes\n",
"\n",
"The next cell measures the same selective mask on all four index kinds and compares it with a forced full scan. On this workload, `partial` and `full` usually show the clearest benefit because they carry richer payloads for positional filtering."
"The next cell measures the same selective mask on all five index kinds and compares it with a forced full scan. On this workload, `partial`, `opsi`, and `full` usually show the clearest benefit because they carry richer payloads for positional filtering.\n"
]
},
{
Expand Down Expand Up @@ -299,7 +303,9 @@
"source": [
"## `full` indexes and ordered access\n",
"\n",
"A `full` index stores a global sorted payload. This is the required index tier for direct ordered reuse. Build it directly with `create_index(kind=blosc2.IndexKind.FULL)`."
"A `full` index stores a global sorted payload. This is the required index tier for direct ordered reuse. Build it directly with `create_index(kind=blosc2.IndexKind.FULL)`.\n",
"\n",
"If you only want a tunable iterative ordering index for exact filtering, use `create_index(kind=blosc2.IndexKind.OPSI)` instead. OPSI can improve cold-query locality as `optlevel` or `opsi_max_cycles` increases, but it does not replace `FULL` for globally sorted access.\n"
]
},
{
Expand Down Expand Up @@ -585,11 +591,12 @@
"## Practical guidance\n",
"\n",
"- Use `partial` when your main goal is faster selective masks.\n",
"- Use `opsi` when you want exact filtering with tunable iterative ordering. Increase `optlevel` or pass `opsi_max_cycles` to spend more build time on ordering; do not expect OPSI to become a `full`/CSI index.\n",
"- Use `full` when you also want ordered reuse through `sort(order=...)`, `NDArray.argsort(order=...)`, `LazyExpr.argsort(order=...)`, or `iter_sorted(...)`.\n",
"- Persist the base array if you want indexes to survive reopen automatically.\n",
"- After unsupported mutations, use `rebuild_index()`.\n",
"- For append-heavy `full` indexes, compact explicitly at convenient maintenance boundaries instead of on every append.\n",
"- Measure your own workload: compact indexes, predicate selectivity, and ordered access needs all affect which kind is best.\n"
"- Measure your own workload: compact indexes, predicate selectivity, iterative-ordering level, and ordered access needs all affect which kind is best.\n"
]
},
{
Expand Down
4 changes: 3 additions & 1 deletion doc/getting_started/tutorials/15.indexing-ctables.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@
"source": [
"## Creating an index\n",
"\n",
"Call `create_index(col_name)` to build a bucket index on a column.\n",
"Call `create_index(col_name)` to build a bucket index on a column. Pass `kind=...` to choose another index kind, including `blosc2.IndexKind.OPSI` for tunable iterative ordering or `blosc2.IndexKind.FULL` for globally sorted indexes that can also support ordered reuse. OPSI is a separate exact-filtering index kind, not a slower way to build a `FULL` index, which is a completely sorted index (CSI); OPSI's build effort is controlled by `optlevel` or the explicit `opsi_max_cycles` keyword.\n",
"\n",
"The returned `CTableIndex` handle shows the column name, kind, and whether the index is stale.\n"
]
},
Expand Down Expand Up @@ -494,6 +495,7 @@
"- **Mutations** (`append`, `extend`, `setitem`, `assign`, `sort_by`, `compact`) mark indexes stale.\n",
"- **Stale indexes** trigger automatic scan fallback — no user intervention needed.\n",
"- **Persistent indexes** survive table close and reopen.\n",
"- **OPSI indexes** are tunable iterative-ordering indexes for exact filtering; use `FULL` for completely sorted ordered reuse.\n",
"- **Views** cannot own indexes; only root tables can.\n"
]
},
Expand Down
5 changes: 4 additions & 1 deletion doc/reference/ctable.rst
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ CTable indexes can also target **direct expressions** over stored columns via
predicates without adding either a computed column or a materialized stored one.
A matching ``FULL`` direct-expression index can also be reused by ordering paths
such as :meth:`CTable.sort_by` when sorting by a computed column backed by the
same expression.
same expression. ``OPSI`` indexes are a separate exact-filtering tier with a
tunable number of iterative ordering cycles; they are not intended to converge
to a completely sorted index (CSI), which is what ``FULL`` provides, so use
``FULL`` when globally sorted ordered reuse is required.

.. autosummary::

Expand Down
Loading
Loading