Skip to content

Arrow result collector: improved merge algorithm#625

Merged
adsharma merged 2 commits into
mainfrom
skip_arrow_sort2
Jun 28, 2026
Merged

Arrow result collector: improved merge algorithm#625
adsharma merged 2 commits into
mainfrom
skip_arrow_sort2

Conversation

@adsharma

Copy link
Copy Markdown
Contributor

Avoid expensive sort and merge when scanning REL tables in parallel

Follows the same general pattern as DuckDB's batch-index collector: each per-thread local collector is tagged with a unique batch_index; the collector stores per-batch chunks; final result is built by walking chunks in batch_index order (no pairwise merge, no global sort for the cheap path).

Highlights:

  • New OrderPreservationType enum (NO_ORDER, INSERTION_ORDER, FIXED_ORDER) on PhysicalOperator, with default NO_ORDER (Ladybug makes no insertion-order guarantee). OrderBy and TopK override operatorOrder() to FIXED_ORDER.

  • New PhysicalPlanUtil::getOrderPreservation walks the physical plan and asks each operator's metadata; returns FIXED_ORDER only if a fixed-order operator is on the data path, otherwise NO_ORDER. Walks child[0] only — Ladybug physical plans have a single data-flow edge per operator.

  • ArrowResultCollectorSharedState now stores per-batch Arrow chunks and CSR metadata in std::map<batch_index_t, ...>, plus a BatchIndexAssigner that hands out unique batch_index values per local collector (atomic fetch_add). Each local collector is assigned a batch_index at initLocalStateInternal time.

  • merge() in the cheap NO_ORDER path moves per-batch chunks into the global map (O(log N) map insertion, no pairwise merge, no sort). flattenCSRMetadata does one global decode/sort/rebuild at result-construction time. The FIXED_ORDER path collapses to a single chunk under key 0 via the existing pairwise mergeCSRMetadata.

  • Result construction (getQueryResult) walks the per-batch map in batch_index order — the map's natural ordering gives the correct global row order without sorting.

Algorithmic cost drops from O(N^2 * M) for the previous pairwise merge (N threads, M edges) to O(M log M) for the new approach. Same correctness as before (decode/re-encode per chunk), but the scan-time hot path only does map insertions.

…y combine

Follows the same general pattern as DuckDB's batch-index collector:
each per-thread local collector is tagged with a unique batch_index;
per-batch chunks flow straight through to ArrowQueryResult and the
consumer combines them lazily (no eager decode/sort/rebuild, no
std::sort).

Highlights:

* New OrderPreservationType enum (NO_ORDER, INSERTION_ORDER,
  FIXED_ORDER) on PhysicalOperator, with default NO_ORDER
  (Ladybug makes no insertion-order guarantee). OrderBy and TopK
  override operatorOrder() to FIXED_ORDER.

* New PhysicalPlanUtil::getOrderPreservation walks the physical
  plan and asks each operator's metadata; returns FIXED_ORDER
  only if a fixed-order operator is on the data path, otherwise
  NO_ORDER. Walks child[0] only — Ladybug physical plans have a
  single data-flow edge per operator.

* ArrowResultCollectorSharedState now stores per-batch Arrow
  chunks and CSR metadata in std::map<batch_index_t, ...>, plus
  a BatchIndexAssigner that hands out unique batch_index values
  per local collector (atomic fetch_add). Each local collector
  is assigned a batch_index at initLocalStateInternal time.

* merge() in the cheap NO_ORDER path moves per-batch chunks into
  the global map (O(log N) per call, no pairwise merge, no sort).
  Result construction is zero-work — it threads per-batch chunks
  into ArrowQueryResult in batch_index order and does no merging.

* ArrowQueryResult::ArrowChunkedArray — a ChunkedArray-style view
  of the merged Arrow arrays (similar to Arrow C++ lib's
  arrow::ChunkedArray). Backed by a shared vector that the
  hasNextArrowChunk / getNextArrowChunk iteration API also reads
  from, so both APIs coexist. Chunks are in batch_index order;
  python users can construct pyarrow.ChunkedArray from these
  chunks and call combine_chunks() to materialize on the consumer
  side (arrow::Concat isn't linked).

* ArrowQueryResult::combineCSRChunks() — lazy k-way merge of
  per-batch CSR chunks in batch_index order. Replaces the eager
  std::sort + decode/rebuild that flattenCSRMetadata used to do
  at result-construction time. Within each chunk, the per-batch
  CSR tracker requires src to be non-decreasing, so emitting in
  (src, batch_index, scan_order_within_batch) order is correct
  without any global sort. Merged result is cached for repeated
  access. The FIXED_ORDER path still goes through
  mergeCSRMetadata, which does its own pairwise sort to preserve
  global order.

Algorithmic cost drops from O(N^2 * M) for the previous pairwise
merge (N threads, M edges) plus O(M log M) for the eager std::sort,
to O(M log K) for the lazy k-way merge (K batches). Same
correctness as before, but the scan-time hot path only does map
insertions and result construction does no merging work.
@adsharma

adsharma commented Jun 27, 2026

Copy link
Copy Markdown
Contributor Author

Tested via:

t_start = time.time()
db=lb.Database('wikidata.lbdb', read_only=True);
con=lb.Connection(db)

relationship_result = con.query_as_arrow("""
MATCH (a:wikidata_node)-[r:wikidata_rel]->(b:wikidata_node)
RETURN a.rowid, b.rowid
""", 1000000)
relationship_csr = relationship_result.csr()

indptr = relationship_csr.indptr.cast(pa.uint64())
indices = relationship_csr.indices.cast(pa.uint64())

# Keep Arrow buffers alive while the NetworKit graph references CSR memory.
arrow_registry = {"indptr": indptr, "indices": indices}
graph = nk.Graph.fromCSR(len(indptr)-1, False, indices, indptr)

Loads wikidata in about 30 secs and uses 42GB of memory. In theory it should be using 750 million * 8 bytes = 6GB + indptr (which is much smaller). Needs further debug.

While I was able to run page rank on this graph, I wasn't able to run it correctly. Icebug requires both forward and reverse edges. Doing it in a way that fits on a 64GB machine is elusive.

@adsharma

Copy link
Copy Markdown
Contributor Author

Further optimizations are possible, but require careful design. Specifically, peak RSS includes both the chunked and merged results. It's possible to combine chunks in a more memory efficient way.

@adsharma adsharma force-pushed the skip_arrow_sort2 branch 2 times, most recently from a3aca32 to 78844d2 Compare June 28, 2026 04:20
@adsharma adsharma merged commit f8b6c88 into main Jun 28, 2026
12 checks passed
@adsharma adsharma deleted the skip_arrow_sort2 branch June 28, 2026 04:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant