
Makes schedule task async#18272

Merged
xiangfu0 merged 6 commits into apache:master from
noob-se7en:make_schedule_task_async
Apr 26, 2026

Conversation

@noob-se7en
Contributor

@noob-se7en noob-se7en commented Apr 21, 2026

Problem
Schedule task holds a coarse synchronized lock that blocks any new task generation while a task-generation run is live. The synchronized scheduleTasks also blocks the Grizzly controller API threads when a task is scheduled via the API, making the Pinot controller UI unresponsive.

Solution
Focusing on a quick short-term solution here: adding a table-level and a cluster-level flag to control whether scheduleTasks is blocking or non-blocking.
The long-term fix will be to remove the synchronized block from scheduleTasks and make the distributedLock inside scheduleTasks granular at the table-task level.

Note: For this change to work, the enableDistributedLocking config needs to be enabled.
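For reference, a sketch of how the two flags might be set, using the config keys named in this PR's commit notes (`controller.task.concurrentSchedulingEnabled`, the per-table `TableTaskConfig.concurrentSchedulingEnabled` override, and the `controller.task.enableDistributedLocking` prerequisite). The exact table-config JSON shape shown here is illustrative:

```properties
# controller.conf — cluster-level default (false unless explicitly enabled)
controller.task.concurrentSchedulingEnabled=true
# prerequisite: distributed locking must be on for the concurrent path to be safe
controller.task.enableDistributedLocking=true
```

```json
{
  "tableName": "myTable",
  "task": {
    "concurrentSchedulingEnabled": true
  }
}
```

A table-level value, when present, overrides the cluster default (null = inherit).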

noob-se7en and others added 4 commits April 21, 2026 11:17
Introduces an opt-in concurrent scheduling path in PinotTaskManager so that
task generation for different tables can run in parallel instead of
serializing on a single controller-wide `synchronized(this)`.

- New cluster flag `controller.task.concurrentSchedulingEnabled` (default
  false) and per-table override `TableTaskConfig.concurrentSchedulingEnabled`
  (null = inherit).
- `scheduleTasks(TaskSchedulingContext)` is no longer `synchronized`; a
  dispatcher picks between the legacy `synchronized(this)`-wrapped path and
  a concurrent path that uses per-table JVM `ReentrantLock`s before the
  distributed ZK lock. Both paths share the same body in `doScheduleTasks`.
- Per-table JVM locks are acquired in sorted order to avoid deadlock and
  cleaned up in `cleanUpCronTaskSchedulerForTable` to prevent map growth.
- Warn at startup if the concurrent default is enabled without distributed
  locking, since ad-hoc `createTask` still takes `synchronized(this)` only.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
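The sorted-order lock acquisition described above is a standard deadlock-avoidance pattern: if every caller takes per-table locks in the same global order, two schedulers touching overlapping table sets cannot deadlock. A minimal, self-contained sketch — not Pinot's actual code, and note this per-table JVM locking was later removed on this branch in favor of the ZK lock alone:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class SortedTableLocks {
  // One JVM lock per table, created lazily. In the commit this map was cleaned
  // up in cleanUpCronTaskSchedulerForTable to prevent unbounded growth.
  private final Map<String, ReentrantLock> _tableJvmLocks = new ConcurrentHashMap<>();

  // Locks are always taken in lexicographic table-name order.
  List<String> lockOrder(List<String> tableNames) {
    List<String> sorted = new ArrayList<>(tableNames);
    sorted.sort(null); // natural ordering
    return sorted;
  }

  void runWithTableLocks(List<String> tableNames, Runnable body) {
    List<ReentrantLock> acquired = new ArrayList<>();
    try {
      for (String table : lockOrder(tableNames)) {
        ReentrantLock lock = _tableJvmLocks.computeIfAbsent(table, t -> new ReentrantLock());
        lock.lock();
        acquired.add(lock);
      }
      body.run();
    } finally {
      // Release in reverse acquisition order.
      for (int i = acquired.size() - 1; i >= 0; i--) {
        acquired.get(i).unlock();
      }
    }
  }
}
```

The JVM locks here only coordinate threads on one controller; the distributed ZK lock still handles cross-controller coordination.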
The concurrent scheduling path now relies solely on the distributed ZK
lock for same-table coordination. If two concurrent scheduleTasks calls
target the same table, one acquires the ZK lock and the other skips
generation with a lock-contention error and retries on the next cron
fire — acceptable because the operator cluster always runs with
distributed locking enabled.

- Removed `_tableJvmLocks` map, ReentrantLock import, and sort/acquire/
  release block inside `doScheduleTasks`.
- Simplified `doScheduleTasks` signature (dropped the flag parameter).
- Updated the startup warning to reflect that the concurrent path
  requires distributed locking for correctness.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
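The skip-on-contention behavior this commit describes can be sketched with a plain `ReentrantLock` standing in for the distributed ZK lock — Pinot's real lock API and error messages are not shown here; names and return values are illustrative:

```java
import java.util.concurrent.locks.ReentrantLock;

class TryLockSchedule {
  // Try to acquire the table's lock; on contention, record an error and let the
  // next cron fire retry instead of blocking.
  static String scheduleForTable(String table, ReentrantLock tableLock, Runnable generate) {
    if (!tableLock.tryLock()) {
      // Another scheduleTasks call holds the lock for this table: skip this run.
      return "Failed to acquire task lock for table: " + table + "; will retry on next cron fire";
    }
    try {
      generate.run();
      return "scheduled";
    } finally {
      tableLock.unlock();
    }
  }
}
```

This is acceptable, per the commit message, because the operator cluster always runs with distributed locking enabled, so a skipped run is simply retried later.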
The earlier concurrent-scheduling refactor inadvertently stripped the
`synchronized` modifier from two `@Deprecated(forRemoval = true)` wrappers
(`scheduleTasks(List<String>, boolean, String)` and
`scheduleTask(String, List<String>, String)`). Restore it so the deprecated
surface stays byte-for-byte unchanged against master, matching the six other
deprecated wrappers in this class. The canonical
`scheduleTasks(TaskSchedulingContext)` continues to handle concurrent dispatch.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Covers the two dispatch helpers introduced on this branch:
`PinotTaskManager#shouldUseConcurrentPath` and `#resolveConcurrentScheduling`.
Both are widened from `private` to package-private with `@VisibleForTesting`
so the tests can exercise them directly without booting a controller.

- `TableTaskConfigTest` (pinot-spi): single/two-arg constructor defaults,
  explicit true/false, JSON round-trip including omitted and explicit-null.
- `ControllerConfTest`: default and override of
  `isPinotTaskManagerConcurrentSchedulingEnabled`.
- `PinotTaskManagerConcurrentSchedulingTest`: table flag vs. cluster
  default precedence, per-table opt-out forcing legacy, database-scope
  expansion, all-tables scope, missing TableConfig skipped, empty-scope
  fallback to cluster default.

Also corrects a stale Javadoc on `TableTaskConfig#getConcurrentSchedulingEnabled`
that still described the per-table JVM lock dropped in 443d4da.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@noob-se7en noob-se7en added the enhancement (Improvement to existing functionality), ingestion (Related to data ingestion pipeline), minion (Related to Pinot Minion task framework), and configuration (Config changes: addition/deletion/change in behavior) labels Apr 21, 2026
Boots a controller, registers a rendezvous-style task generator, and
fires two parallel scheduleTasks calls against two tables. When both
tables opt into concurrent scheduling via TableTaskConfig the generator
observes maxInFlight == 2; with the cluster default (false) the legacy
path serializes them and maxInFlight stays at 1.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
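The rendezvous measurement can be sketched in isolation: each generator call bumps an in-flight counter and waits briefly at a latch, so overlapping calls observe maxInFlight == 2 while serialized calls stay at 1. This is illustrative, not the actual integration-test code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MaxInFlightProbe {
  private final AtomicInteger _inFlight = new AtomicInteger();
  private final AtomicInteger _maxInFlight = new AtomicInteger();
  private final CountDownLatch _rendezvous;

  MaxInFlightProbe(int parties) {
    _rendezvous = new CountDownLatch(parties);
  }

  // Called by each "task generator" invocation.
  void generate() throws InterruptedException {
    int now = _inFlight.incrementAndGet();
    _maxInFlight.accumulateAndGet(now, Math::max);
    _rendezvous.countDown();
    // Wait until all parties are in flight; times out if calls are serialized,
    // in which case maxInFlight never exceeds 1.
    _rendezvous.await(500, TimeUnit.MILLISECONDS);
    _inFlight.decrementAndGet();
  }

  int maxInFlight() {
    return _maxInFlight.get();
  }
}
```

With concurrent scheduling enabled, both parallel scheduleTasks calls reach the latch together and the probe reads 2; on the legacy serialized path the first call times out at the latch before the second starts.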
@codecov-commenter

codecov-commenter commented Apr 21, 2026

Codecov Report

❌ Patch coverage is 94.87179% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.62%. Comparing base (1d64a8e) to head (f5aeb37).
⚠️ Report is 38 commits behind head on master.

Files with missing lines Patch % Lines
...controller/helix/core/minion/PinotTaskManager.java 93.93% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18272      +/-   ##
============================================
+ Coverage     63.60%   63.62%   +0.01%     
+ Complexity     1660     1659       -1     
============================================
  Files          3246     3246              
  Lines        197514   197552      +38     
  Branches      30578    30589      +11     
============================================
+ Hits         125633   125686      +53     
+ Misses        61835    61824      -11     
+ Partials      10046    10042       -4     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.59% <94.87%> (+0.02%) ⬆️
java-21 63.58% <94.87%> (+<0.01%) ⬆️
temurin 63.62% <94.87%> (+0.01%) ⬆️
unittests 63.61% <94.87%> (+0.01%) ⬆️
unittests1 55.59% <100.00%> (+0.01%) ⬆️
unittests2 35.06% <94.87%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown.

@noob-se7en noob-se7en requested a review from shounakmk219 April 21, 2026 08:48
}
checkedAnyTable = true;
if (!resolveConcurrentScheduling(tableConfig)) {
return false;
Contributor Author

Note: To avoid over-engineering and splitting TaskSchedulingContext, we just return false here for the entire batch. scheduleTasks called by the cron scheduler only sends one table per batch, so this is fine for cron-scheduled tables.

@noob-se7en noob-se7en changed the title from "Make schedule task async" to "Makes schedule task async" Apr 21, 2026
Contributor

@xiangfu0 xiangfu0 left a comment

My main concern is that many large minion task generations may just blow up the controller.
Shall we consider:

  1. limiting the concurrency
  2. syncing per task type or per table?

@noob-se7en
Contributor Author

My main concern is that many large minion task generations may just blow up the controller. Shall we consider:

  1. limiting the concurrency
  2. syncing per task type or per table?

  1. Quartz uses a fixed thread pool (default SimpleThreadPool, threadCount=10), so at most ~10 scheduleTasks calls run concurrently from cron firings, regardless of how many tables the controller leads. Excess triggers queue inside Quartz. We can make this configurable if needed.
  2. The distributed ZK lock still serializes same-table generation across controllers (preventing duplicate work), since the lock is table-level.
  3. Opt-in: the flag is off by cluster default and can be disabled per table, so existing behavior is unchanged unless it is explicitly turned on.

If the manual schedule API is called, generation happens in a for-loop where generateTask is called synchronously.
Hence the manual API path is unchanged by this PR: scheduleTask() still iterates enabledTableConfigs sequentially and calls taskGenerator.generateTasks() one table at a time on a single Grizzly thread.
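For context on point 1, Quartz's thread pool is configured via quartz.properties; these are standard Quartz keys (not Pinot-specific settings), shown with the defaults mentioned above:

```properties
# quartz.properties — SimpleThreadPool caps how many cron-fired
# scheduleTasks calls can run at once on a single controller.
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
```

Triggers that fire while all worker threads are busy queue inside Quartz rather than spawning new threads.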

Contributor

@xiangfu0 xiangfu0 left a comment

Found one high-signal concurrency issue; see inline comment.

public synchronized Map<String, TaskSchedulingInfo> scheduleTasks(TaskSchedulingContext context) {
public Map<String, TaskSchedulingInfo> scheduleTasks(TaskSchedulingContext context) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
if (shouldUseConcurrentPath(context)) {
Contributor

This can enter the unlocked path when a table opts in via TableTaskConfig.concurrentSchedulingEnabled while controller.task.enableDistributedLocking is still false: acquireTaskLock then returns null, so same-table scheduled runs and ad-hoc createTask no longer share any mutex. That can double-generate or submit minion tasks for the same table/window. Gate the concurrent path on distributed locking being enabled, or keep a per-table JVM lock when the distributed lock manager is absent.

Contributor Author

This is expected. The prerequisite for this change is that enableDistributedLocking is enabled.

Collaborator

Agree with Xiang on having per-table local locking to avoid concurrent task generations, especially when a user has configured an aggressive schedule and the same controller ends up getting multiple triggers while older generations are still active.

Collaborator

Also, if we decide to have a lock, let's have it on table + task type as well, to avoid dropping schedules of different task types from the same table.

Contributor Author

Agree with Xiang on having per-table local locking to avoid concurrent task generations, especially when a user has configured an aggressive schedule and the same controller ends up getting multiple triggers while older generations are still active.

Why do we need a table-local lock when we have a distributed table lock?

Collaborator

This is for the case where distributed locking is not enabled

Contributor Author

This feature doesn't make sense if the distributed lock is disabled. I think we are conflating the short-term and long-term solutions; please re-check the PR description.

Collaborator

@shounakmk219 shounakmk219 left a comment

Let's also call out in the description (and in a code comment) that when this is enabled along with distributed locking, it may drop task generations of different task types from the same table if their schedules/triggers coincide, because the distributed lock is held at the table level only.

// The concurrent path relies on the distributed ZK lock to coordinate same-table task
// generation (and to mutually exclude with ad-hoc createTask, which still takes
// synchronized(this)). Running without distributed locking leaves those races unprotected.
LOGGER.warn("Concurrent task scheduling is enabled but distributed locking is disabled. "
Collaborator

Should we set _clusterConcurrentSchedulingEnabled to false in this case?

Contributor Author

I initially thought about it, but didn't do it because it would be an anti-pattern: the config is enabled explicitly but the code then disables it internally. That gives the wrong signal to the user.

Collaborator

I think we can call it out as a dependent feature which needs distributed locking to be enabled to allow concurrent task generations.

Contributor Author

Already called it out in the description.
We have the distributed lock enabled by default, and I will just set my table with concurrentScheduling enabled.

}
// If at least one table was inspected and none opted out, use concurrent path. Otherwise (no
// tables in scope) fall back to the cluster default so the decision is deterministic.
return checkedAnyTable || _clusterConcurrentSchedulingEnabled;
Collaborator

Promote the check on _clusterConcurrentSchedulingEnabled earlier in the method to avoid the bunch of ZK calls made here, as when _clusterConcurrentSchedulingEnabled is true the table checks are redundant anyway.

Contributor Author

@noob-se7en noob-se7en Apr 23, 2026

The table check is not redundant; the table flag has higher priority than the cluster-level flag.
Only if the table flag is not present in the table config is the cluster flag consulted.
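A minimal sketch of this precedence (illustrative, not the actual `resolveConcurrentScheduling` body): a non-null table flag wins; otherwise the cluster default applies.

```java
class ConcurrentSchedulingResolver {
  // tableFlag comes from TableTaskConfig.concurrentSchedulingEnabled
  // (null = not set, i.e. inherit the cluster default).
  static boolean resolve(Boolean tableFlag, boolean clusterDefault) {
    return tableFlag != null ? tableFlag : clusterDefault;
  }
}
```

This is why the cluster flag alone cannot short-circuit the decision: a table may explicitly opt out even when the cluster default is true.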

Collaborator

Oh I see, we do everything serially if any one of the tables does not opt into concurrency.

@noob-se7en noob-se7en requested a review from shounakmk219 April 23, 2026 09:38
Collaborator

@shounakmk219 shounakmk219 left a comment

This feature is safe to use only when distributed locking is enabled or on a cluster where tables do not have frequent cron schedules. With these limitations this short term fix is good enough to unblock concurrent task generations.

…k_async

# Conflicts:
#	pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@noob-se7en noob-se7en requested a review from xiangfu0 April 24, 2026 19:15
@noob-se7en
Contributor Author

This feature is safe to use only when distributed locking is enabled or on a cluster where tables do not have frequent cron schedules. With these limitations this short term fix is good enough to unblock concurrent task generations.

Thanks Shounak. @xiangfu0 @swaminathanmanish can you please help with merging the PR?

@xiangfu0 xiangfu0 merged commit 1127eda into apache:master Apr 26, 2026
31 of 32 checks passed
@xiangfu0
Contributor

Opened docs follow-up PR: pinot-contrib/pinot-docs#772

raghavyadav01 pushed a commit that referenced this pull request May 7, 2026
…tension (#18431)

* Make concurrent-scheduling dispatch helpers protected for subclass extension

Bumps PinotTaskManager#shouldUseConcurrentPath and #resolveConcurrentScheduling
from package-private @VisibleForTesting to protected @VisibleForTesting so that
subclasses in other packages can integrate with the concurrent scheduling path
introduced in #18272.

In particular, this lets a subclass override resolveConcurrentScheduling to
plug in a different per-table policy (for example, defaulting concurrent
scheduling to true for specific task types) and have its override invoked from
the parent's shouldUseConcurrentPath. Without this change, a different-package
subclass cannot polymorphically override the package-private resolver — Java
treats the same-named method as hidden rather than overridden — so the
extension hook is effectively unreachable.

No behavioral change. The methods retain @VisibleForTesting; existing same-
package tests continue to work unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Removes visibleForTesting

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
