Skip to content

[core] Parallelize GlobalIndexEvaluator across multiple BTree fields#7951

Merged
JingsongLi merged 19 commits into
apache:masterfrom
JingsongLi:btree_multi_thread
May 25, 2026
Merged

[core] Parallelize GlobalIndexEvaluator across multiple BTree fields#7951
JingsongLi merged 19 commits into
apache:masterfrom
JingsongLi:btree_multi_thread

Conversation

@JingsongLi
Copy link
Copy Markdown
Contributor

When a compound predicate (AND/OR) involves multiple fields, each field's BTree index lookup is now submitted to a thread pool and executed in parallel. This reduces scan latency for multi-field global index queries.

Key changes:

  • GlobalIndexEvaluator accepts an optional ExecutorService for parallel child predicate evaluation in visitParallel, with sequential fallback.
  • GlobalIndexScanner creates an isolated evaluator thread pool to avoid deadlocks with the existing UnionGlobalIndexReader executor.
  • Python side uses ThreadPoolExecutor with threading.Lock for thread-safe reader cache access.
  • Added GlobalIndexEvaluatorTest for both Java and Python.

When a compound predicate (AND/OR) involves multiple fields, each
field's BTree index lookup is now submitted to a thread pool and
executed in parallel. This reduces scan latency for multi-field
global index queries.

Key changes:
- GlobalIndexEvaluator accepts an optional ExecutorService for parallel
  child predicate evaluation in visitParallel, with sequential fallback.
- GlobalIndexScanner creates an isolated evaluator thread pool to avoid
  deadlocks with the existing UnionGlobalIndexReader executor.
- Python side uses ThreadPoolExecutor with threading.Lock for
  thread-safe reader cache access.
- Added GlobalIndexEvaluatorTest for both Java and Python.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the improvement. The idea of separating the evaluator executor from the existing UnionGlobalIndexReader executor makes sense, and the cache was also made thread-safe.

I think there is one blocking concurrency issue to fix before merge:

visitParallel recursively submits every compound child to the same bounded executor and then waits on Future#get. However, PredicateBuilder.and/or builds compound predicates as a binary tree, not a flat n-ary node. For example and(a, b, c) becomes and(and(a, b), c). With global-index.thread-num = 2, the top-level and submits two tasks. One worker evaluates the nested and(a, b), enters visitParallel again, submits a and b to the same executor, and then waits for them. If the other top-level child is still occupying the second worker (or blocks on IO), the nested tasks cannot start and the worker waits on work queued behind itself. The same pattern also exists in the Python implementation with ThreadPoolExecutor(max_workers=...) and nested Predicate(method='and'/'or', ...).

This is not just theoretical because the scanner uses a fixed-size pool from global-index.thread-num, and the new tests only cover flat two-child predicates. Could you either flatten same-kind compound predicates before parallel submission / only parallelize leaf-level work, or avoid recursively submitting from executor worker threads? Please also add a regression test with a nested predicate and a small pool (e.g. 2 threads) for both Java and Python.

…allel evaluation

Flatten same-kind compound predicates before parallel submission to
prevent deadlock when nested binary-tree predicates (e.g. and(and(a,b),c))
recursively submit tasks to the same bounded executor.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick update. Flattening same-kind compound predicates fixes the and(and(a,b),c) / or(or(a,b),c) case I mentioned, and the added Java/Python regression tests cover that path.

However, I think the recursive executor deadlock can still happen for mixed compound predicates. For example:

and(or(a, b), or(c, d))

With global-index.thread-num = 2, the top-level AND submits the two OR children. Both worker threads can then enter _visit_or_parallel / visitParallel, submit their own leaf tasks to the same bounded executor, and block waiting for them. Since both workers are already blocked inside nested compound evaluation, the queued leaf tasks cannot start.

The current same-kind flattening does not change this shape because the children are OR under AND (or AND under OR). Could you add a mixed nested regression test like AND(OR(a,b), OR(c,d)) with a 2-thread executor for both Java and Python? I think the implementation should avoid recursively submitting compound predicates to the same bounded executor, e.g. flatten/evaluate into leaf-level tasks or make nested compound evaluation run sequentially inside worker tasks.

…ation

Compound children (e.g. OR under AND) submitted to the executor now
evaluate their own children sequentially, preventing recursive pool
submissions that cause deadlock with bounded thread pools.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this fixes the mixed one-level case (AND(OR(a,b), OR(a,c))) and the added regression is good.

I think there is still one remaining recursive-submit path for deeper mixed trees. evaluateChildSequentially only forces the immediate compound child to visitSequential, but visitSequential still calls predicate1.visit(this) for each grandchild. If one of those grandchildren is another compound with more than one child, it will go back through visit(CompoundPredicate) and enter visitParallel again on the same executor worker.

For example, this shape can still recursively submit from both top-level workers:

AND(OR(AND(a,b), c), OR(AND(d,e), f))

Top-level AND submits the two OR children. Each worker evaluates its OR sequentially, but when it reaches AND(a,b) / AND(d,e), those grandchildren call visitParallel again and wait for tasks queued to the same bounded pool.

Could we make the sequential path truly recursive, e.g. pass an allowParallel flag / add evaluateSequentially(Predicate) so any compound below a worker task is evaluated without submitting again? Please also add a deeper mixed regression test for Java and Python, not only one-level AND(OR(...), OR(...)).

…ested deadlock

Replace evaluateChildSequentially with evaluateWithoutParallel that
recursively evaluates the entire subtree without ever submitting to the
executor. This prevents deadlock for arbitrarily deep mixed compound
predicates like AND(OR(AND(a,b),c), OR(AND(d,e),f)).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. The new evaluateWithoutParallel / _evaluate_without_parallel path addresses the recursive-submit deadlock I raised earlier.

I found one remaining blocking concurrency issue before this can be approved: visitParallel still submits every flattened child predicate independently, but visit(LeafPredicate) fetches readers from indexReadersCache by fieldId. If the predicate contains multiple leaves on the same field, e.g. a very common range filter:

AND(a > 10, a < 20)

then both evaluator tasks will call the same cached GlobalIndexReader instances for field a concurrently. The Python version has the same shape; the cache lock only protects cache population, not concurrent use of the cached readers.

I do not think those readers are safe to share concurrently. For example, the BTree reader goes through SstFileReader / BlockCache; BlockCache keeps a plain HashMap and reads from a shared SeekableInputStream via seek(offset) followed by readFully(...). Two concurrent range queries on the same BTree reader can interleave those seeks/reads and corrupt the bytes being read. LazyField / GlobalIndexResult also does not add synchronization, and visit(LeafPredicate) calls results().isEmpty(), so the read is actually triggered during evaluation.

Could we make parallelism field-aware here? For example, group flattened children by referenced field id/name and evaluate each same-field group sequentially in a single task, or otherwise guard/clone reader access so the same field's cached readers are never used concurrently. This would still preserve the intended parallelism across different BTree fields. Please also add a Java and Python regression test for same-field predicates (for example AND(a > ..., a < ...) or a detecting reader that fails on concurrent access to the same field).

…der access

Group flattened children by field before parallel submission so that
predicates referencing the same field are evaluated sequentially within
a single task. This prevents concurrent access to non-thread-safe
GlobalIndexReader instances while preserving parallelism across fields.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. Grouping direct leaf children by field fixes the simple same-field shape such as AND(a=1, a=2, a=3), and the added concurrency test covers that case.

I think the same reader-sharing issue can still happen when the duplicated field is inside mixed compound children. groupByField only groups children that are direct LeafPredicates; any compound child is put into its own singleton group and submitted as an independent task. For example:

AND(OR(a = 1, b = 2), OR(a = 3, c = 4))

After same-kind flattening at the top-level AND, the two children are still the two OR subtrees, so they become two separate tasks. Each task evaluates its subtree via evaluateWithoutParallel, but both can still reach leaf predicates on field a at the same time and use the same cached readers for a. The Python implementation has the same pattern in _group_by_field.

Interestingly, the existing mixed-nested regression already has this shape (AND(OR(a, b), OR(a, c))), but it uses a simple stub reader, so it does not detect concurrent access to the same field.

Could we make the grouping based on the full field set of each child subtree rather than only direct leaf children, and avoid submitting two groups concurrently when their field sets overlap? Alternatively, only parallelize field-disjoint leaf/subtree groups. Please also extend the concurrency-detecting Java/Python test to cover a nested mixed case like AND(OR(a=1, b=2), OR(a=3, c=4)), where the reader for a would fail if accessed concurrently.

…urrent reader access

The field-based grouping now collects all referenced fields from the
entire subtree of each child (not just direct leaf children). Children
whose field sets overlap are merged into the same sequential group,
ensuring non-thread-safe readers are never accessed concurrently even
in mixed nested predicates like AND(OR(a=1,b=2), OR(a=3,c=4)).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the overlapping-field grouping now addresses the nested mixed same-field concurrency case, and the new Java/Python tests cover the shape I mentioned.

I found one smaller remaining Java regression in the field collection logic. collectFieldsRecursive currently does:

fields.add(((LeafPredicate) predicate).fieldName());

LeafPredicate.fieldName() only works for FieldTransform leaves because it delegates to fieldRefOptional().get(). However Paimon predicates can also be leaves with other transforms, for example CastTransform, LowerTransform / other StringTransforms, or NullTransform for always-true/false predicates. Before this PR, GlobalIndexEvaluator.visit(LeafPredicate) handled such non-field leaves gracefully by returning Optional.empty() when fieldRefOptional() was absent. With the new grouping step, a compound predicate containing such a leaf can now fail before evaluation with NoSuchElementException.

Could we collect fields from leaf transforms instead of calling fieldName() directly? For example, use the existing LeafPredicate.fieldNames() / PredicateVisitor.collectFieldNames(...), or check fieldRefOptional() and leave the set empty when absent. Please also add a small regression test with a transformed/non-field leaf combined with another predicate, to ensure the evaluator keeps returning unsupported/empty instead of throwing.

Use fieldRefOptional() instead of fieldName() in collectFieldsRecursive
to avoid NoSuchElementException for predicates with NullTransform or
other non-field transforms (e.g. alwaysTrue/alwaysFalse).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the Java non-field leaf issue is fixed now, and the Java GlobalIndexEvaluatorTest passes locally for me.

I found one remaining test/build issue on the Python side: the newly added paimon-python/pypaimon/tests/global_index_evaluator_test.py imports IntType from pypaimon.schema.data_types, but that module does not define IntType. The existing Python code/tests use AtomicType("INT") instead.

Locally this fails immediately with:

ImportError: cannot import name 'IntType' from 'pypaimon.schema.data_types'

Could you change the test to import AtomicType and build fields with AtomicType("INT") (or otherwise add the missing type alias if that is intended)? After that I think this should be ready from my side, pending CI.

… IntType

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates. The latest version addresses my previous concerns:

  • recursive executor submission is avoided by evaluating worker-side subtrees without further parallel submission;
  • same-field reader sharing is avoided by grouping overlapping field sets, including nested mixed predicates;
  • non-field leaves no longer fail during field collection;
  • the Python test import issue is fixed.

I also verified the targeted tests locally:

mvn -pl paimon-common -DskipITs -Dcheckstyle.skip -Drat.skip=true -Dspotless.check.skip=true -Dfmt.skip=true -Dtest=GlobalIndexEvaluatorTest test
PYTHONPATH=/tmp/paimon-pr7951-work5/paimon-python python -m unittest pypaimon.tests.global_index_evaluator_test

Both passed. +1, pending the remaining CI jobs.

JingsongLi and others added 3 commits May 25, 2026 15:12
Remove the separate ManifestReadThreadPool executor from
UnionGlobalIndexReader. The evaluator's field-based grouping already
provides cross-field parallelism, and evaluateWithoutParallel prevents
nested submissions. Shard reads within a field now run sequentially
inside each worker task, eliminating the need for two thread pools.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…eld locks

Replace the field-grouping parallel approach with a simpler CompletableFuture
composition that maximizes parallelism across different fields while using
per-field synchronized locks to ensure thread-safe reader access.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The visitor pattern is no longer needed since evaluation is driven by
the internal visitAsync dispatch rather than predicate.visit(this).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@JingsongLi JingsongLi force-pushed the btree_multi_thread branch from 5b196bd to b2fcbe5 Compare May 25, 2026 07:55
JingsongLi and others added 3 commits May 25, 2026 16:00
Each GlobalIndexReader within a leaf predicate now runs in its own
future with a per-reader lock, enabling concurrent execution across
readers for the same field while protecting individual reader state.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ntation

Each reader is now evaluated in its own future with a per-reader lock,
matching the Java implementation for maximum parallelism.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Simplify locking by synchronizing on the reader instance directly,
eliminating the readerLocks map. Also fix Python race condition in
reader lock creation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@JingsongLi JingsongLi force-pushed the btree_multi_thread branch from 114e9a1 to da89369 Compare May 25, 2026 08:10
jerry-024

This comment was marked as spam.

throw (Error) e.getCause();
}
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] InterruptedException swallowed — interrupt flag not restored

CompletableFuture.get() throws InterruptedException when the calling thread is interrupted. The current catch (Exception e) swallows it and wraps in RuntimeException without restoring the interrupt flag. This breaks upstream cancel/shutdown semantics.

Suggest:

} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted during index evaluation", e);
} catch (ExecutionException e) {
    if (e.getCause() instanceof RuntimeException) {
        throw (RuntimeException) e.getCause();
    }
    if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
    }
    throw new RuntimeException(e.getCause());
}

This is not a style issue — it affects upper-layer cancellation and shutdown semantics.

}
result.add(child);
}
return result;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] Unbounded recursion depth — not just flattenChildren

flattenChildren() recurses on same-type nesting (AND(AND(AND(...)))) and can overflow the stack with deeply nested predicates from crafted SQL.

However, mixed-type nesting (AND(OR(AND(OR(...))))) also recurses unboundedly through the visitAsync() → visitCompoundAsync() → visitAsync() path. Converting only flattenChildren to iterative fixes the same-type case, but for complete protection a depth limit or fully iterative approach is needed across both paths.

Suggest:

  1. Convert flattenChildren to iterative (Deque-based)
  2. Add a global depth guard (e.g., MAX_PREDICATE_DEPTH = 1000) that throws on exceeding — covering both flatten and visitAsync recursion

if compound_result.is_empty():
return compound_result

return compound_result
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] Undocumented semantic change: or_and_ for multi-reader leaf combination

The base Python code combined multiple readers of the same field using or_ (union). This PR changes it to and_ (intersection), which matches the Java side (Java base always used and).

This looks like a fix for a pre-existing Python/Java inconsistency, but the PR description only mentions parallelization — it doesn't call out this semantic correction. Please:

  1. Explicitly note this as a bug fix in the PR description
  2. Add a multi-reader test that verifies the AND semantics (multiple readers per field returning overlapping results → intersection expected)

JingsongLi and others added 2 commits May 25, 2026 17:13
…eader lock

BitmapFileIndex$Reader uses non-thread-safe SeekableInputStream and
HashMap internally. The previous code only synchronized the visit()
call but left lazy result materialization (triggered by results())
outside the lock, causing concurrent seek/read corruption (EOFException,
NegativeArraySizeException) when multiple predicates reference the
same field.

Fix: call results() inside synchronized(reader) / with lock to ensure
all I/O completes before releasing the lock. Added regression tests
for both Java and Python that verify lazy suppliers are never
materialized concurrently.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… multi-reader AND test

- Split catch(Exception) into catch(InterruptedException) + catch(ExecutionException)
  with Thread.currentThread().interrupt() restoration
- Convert recursive flattenChildren to iterative Deque-based implementation
  to prevent StackOverflowError on deeply nested same-kind predicates
- Add MAX_PREDICATE_DEPTH=1000 guard for mixed-type recursion in visitAsync
- Add testMultipleReadersPerFieldCombinedWithAnd verifying AND semantics
  when multiple index readers exist for the same field
- Apply all corresponding fixes to Python implementation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@JingsongLi JingsongLi merged commit b26aa9f into apache:master May 25, 2026
17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants