
fix: harden ingesting autoscalers around task-count boundaries#19269

Merged
Fly-Style merged 6 commits into apache:master from Fly-Style:lag-based-autoscaler
Apr 15, 2026
Conversation

@Fly-Style (Contributor) commented Apr 7, 2026:

This PR:

  • fixes the lag-based autoscaler by using taskCount from ioConfig for scale-action calculations instead of activeTaskGroups;
  • hardens seekable-stream autoscalers when a supervisor is configured with a handwritten taskCount outside the allowed bounds. For both the cost-based and lag-based autoscalers, if the current taskCount is below taskCountMin or above taskCountMax, the scaler now returns the nearest valid boundary instead of using the out-of-range value as the scaling baseline. This keeps supervisors within configured limits and avoids inconsistent scaling decisions.

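The boundary-clamping rule described above can be sketched as follows. This is a minimal illustration with hypothetical names (`TaskCountClampSketch`, `clamp`), not Druid's actual classes or config accessors:

```java
// Minimal sketch of the boundary-clamping rule described above.
// All names here are illustrative, not Druid's actual API.
public class TaskCountClampSketch
{
  // Return the nearest valid boundary when taskCount is outside [min, max].
  static int clamp(int taskCount, int taskCountMin, int taskCountMax)
  {
    return Math.min(taskCountMax, Math.max(taskCountMin, taskCount));
  }

  public static void main(String[] args)
  {
    // A handwritten taskCount below the minimum is raised to taskCountMin.
    System.out.println(clamp(0, 2, 10));   // prints 2
    // A handwritten taskCount above the maximum is lowered to taskCountMax.
    System.out.println(clamp(50, 2, 10));  // prints 10
    // An in-range value is left untouched.
    System.out.println(clamp(5, 2, 10));   // prints 5
  }
}
```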
This PR has:

  • been self-reviewed.
  • a release note entry in the PR description.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.

@Fly-Style changed the title to "bug: lag-based autoscaler: use taskCount from ioConfig for scale action instead of activeTaskGroups" Apr 7, 2026
@Fly-Style changed the title to "fix: lag-based autoscaler: use taskCount from ioConfig for scale action instead of activeTaskGroups" Apr 7, 2026
@Fly-Style Fly-Style force-pushed the lag-based-autoscaler branch from 4977473 to 746c760 Compare April 7, 2026 12:36
@Fly-Style Fly-Style requested a review from jtuglu1 April 7, 2026 13:16
@amaechler (Contributor) left a comment:

🐻

@Fly-Style's comment was marked as outdated.

@Fly-Style changed the title to "fix: harden ingesting autoscalers around task-count boundaries" Apr 8, 2026
@Fly-Style (Contributor, Author) commented:

cc @zhangyue19921010


// If task count is out of bounds, scale to the configured boundary
// regardless of optimal task count, to get back to a safe state.
if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
Contributor:

Do we want to respect this isScaleActionAllowed if we're violating min/max task count bounds?

Contributor (Author):

That's a tricky thing, but my take here is: eventually we will scale (and by eventually I mean within minTriggerScaleActionFrequencyMillis ms), and it might be harmful to scale immediately.

I don't have a strong opinion here; I am open to removing isScaleActionAllowed() from the condition.

Contributor:

I think we can leave this as-is for now, maybe add a comment explaining the decision?

|| currentTaskCount > config.getTaskCountMax();
if (isTaskCountOutOfBounds) {
currentTaskCount = Math.min(config.getTaskCountMax(),
Math.max(config.getTaskCountMin(), supervisor.getIoConfig().getTaskCount()));
Contributor:

nit: use currentTaskCount

Contributor (Author):

Good catch! It is cleaner.


final int result = autoScaler.computeTaskCountForScaleAction();

Assert.assertEquals(
Contributor:

nit: Either mock computeOptimalTaskCount to return a value different from the clamped value (e.g. mock it to return -1 or taskCountMin - 1) and assert the boundary is returned, or use verify(autoScaler, never()).computeOptimalTaskCount(any()) to confirm the early-return path was taken.
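The second option the reviewer suggests (confirming the early-return path was taken) can be sketched in plain Java with a hand-rolled stub instead of Mockito. All class and method names below are illustrative stand-ins, not Druid's actual test or autoscaler code:

```java
// Sketch of verifying the early-return path with a hand-rolled stub
// instead of Mockito's verify(..., never()). Names are illustrative.
public class EarlyReturnPathSketch
{
  static class StubAutoScaler
  {
    final int taskCountMin = 2;
    final int taskCountMax = 10;
    int currentTaskCount;
    boolean optimalComputed = false;  // records whether the optimal path ran

    StubAutoScaler(int currentTaskCount)
    {
      this.currentTaskCount = currentTaskCount;
    }

    int computeOptimalTaskCount()
    {
      optimalComputed = true;  // stands in for the metric-driven computation
      return 7;
    }

    int computeTaskCountForScaleAction()
    {
      // Out-of-bounds task counts return the clamped boundary immediately.
      if (currentTaskCount < taskCountMin || currentTaskCount > taskCountMax) {
        return Math.min(taskCountMax, Math.max(taskCountMin, currentTaskCount));
      }
      return computeOptimalTaskCount();
    }
  }

  public static void main(String[] args)
  {
    StubAutoScaler scaler = new StubAutoScaler(0);  // below taskCountMin
    int result = scaler.computeTaskCountForScaleAction();
    // The boundary is returned and the optimal-count path never ran,
    // which is what verify(autoScaler, never()).computeOptimalTaskCount(any())
    // would assert in a Mockito-based test.
    System.out.println(result);                 // prints 2
    System.out.println(scaler.optimalComputed); // prints false
  }
}
```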

* @return Integer, target number of tasksCount. -1 means skip scale action.
*/
private int computeDesiredTaskCount(List<Long> lags)
int computeDesiredTaskCount(List<Long> lags)
Contributor:

Let's mark it with the proper @VisibleForTesting annotation

// regardless of optimal task count, to get back to a safe state.
if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
taskCount = currentTaskCount;
log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax());
Contributor:

Don't we want to set: lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();?

Contributor (Author):

Sure!

);

int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
Contributor:

We do not need to change this, no?
We can still reference the activeTaskGroupsCount and start clamping things below?

@Fly-Style (Contributor, Author) commented Apr 10, 2026:

I am still sure this is the correct approach: the autoscaler explicitly changes taskCount and operates with different task-count configurations.

Anyway, my aim in changing this was not only the change itself, but also to spark discussion.
@jtuglu1 WDYT about changing that?

@jtuglu1 (Contributor) commented Apr 10, 2026:

> I am still sure that is the correct approach - autoscaler explicitly changes taskCount and operates with different task counts configurations.

I'm fine with this switch.

I'm generally for using more static configs when doing any kind of algorithm, as it helps reduce the chance of races, increases determinism/debuggability, etc. I would imagine in some cases there can be a lot of skew between supervisor.getActiveTaskGroupsCount() and supervisor.getIoConfig().getTaskCount(), for example during roll-over. In other words, I'm fine with reading a potentially slightly stale value as long as it's generally consistent with the current world view.

@Fly-Style Fly-Style requested a review from jtuglu1 April 10, 2026 08:31
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
"Already at min task count"
)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
@jtuglu1 (Contributor) commented Apr 10, 2026:

I think we should try and move this min/max hit emission logic out of a specific auto-scaler implementation, and into the generic DynamicScaleActionNotice logic. All an auto-scaler implementation should do is tell what task count it recommends for a given supervisor. The supervisor has min/max task count configs, and should IMO attempt to apply the auto-scaler recommendation. This way, we don't need to keep worrying about maintaining observability parity between the auto-scaler implementations.

@Fly-Style (Contributor, Author) commented Apr 10, 2026:

Please file an issue for that; I personally do not have time right now to resolve it, but it absolutely makes sense!

@jtuglu1 (Contributor) commented Apr 10, 2026:

In that case, can we emit metrics on the cost-based auto-scaler when we are capped by min/max? To keep consistent with lag-based auto-scaler? These metrics are useful for operators who may have automated mechanisms to go and, say, bump the max task count.

Contributor (Author):

Will do! Thanks!

@jtuglu1 jtuglu1 self-requested a review April 14, 2026 16:32
Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin())
);
} else if (isAutoScalerAvailable) {
this.taskCount = taskCount != null ? taskCount : autoScalerConfig.getTaskCountMin();
Contributor:

nit: Configs.valueOrDefault
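The nit refers to a null-coalescing helper that replaces the ternary in the diff. Its behavior is presumably equivalent to the sketch below; `ValueOrDefaultSketch` is a local illustration, not Druid's actual Configs class:

```java
// Sketch of a null-coalescing helper equivalent to the ternary in the diff:
//   this.taskCount = taskCount != null ? taskCount : autoScalerConfig.getTaskCountMin();
// This is a local illustration, not Druid's actual Configs class.
public class ValueOrDefaultSketch
{
  static <T> T valueOrDefault(T value, T defaultValue)
  {
    return value != null ? value : defaultValue;
  }

  public static void main(String[] args)
  {
    Integer taskCount = null;
    int taskCountMin = 2;
    // A null taskCount falls back to the configured minimum.
    System.out.println(valueOrDefault(taskCount, taskCountMin)); // prints 2
    // A non-null taskCount is used as-is.
    System.out.println(valueOrDefault(7, taskCountMin));         // prints 7
  }
}
```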

if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
taskCount = currentTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax());
Contributor:

nit: let's include what we're scaling to. The other messages do.


// Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
// There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
@jtuglu1 (Contributor) commented Apr 14, 2026:

Do we want to make sure we are not clamping to the task-count min when we temporarily have no metrics (computeOptimalTaskCount returns -1)? Right now, I think we can trigger this condition because we're not checking for a non-negative value.

@Fly-Style (Contributor, Author) commented Apr 14, 2026:

Basically, in the code path where you have no metrics, you won't trigger it. See:

You compute final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
and only touch this value outside the code path where the
if (isScaleActionAllowed() && isTaskCountOutOfBounds) condition was checked.

If that ^^^ condition fires, the rest of the code path returns taskCount from the method.

@jtuglu1 (Contributor) commented Apr 14, 2026:

Will we still emit metrics for hitting minimum in that case?

Consider optimalTaskCount == -1 (no metrics case) and we cannot scale due to isScaleActionAllowed() == false. Wouldn't we hit this line? IMO, it would be cleaner to handle the -1 case separately.
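Handling the -1 case separately, as suggested here, could look like the sketch below. The names (`SentinelGuardSketch`, `desiredTaskCount`) are illustrative, not Druid's actual implementation:

```java
// Sketch of handling the no-metrics sentinel before any bounds clamping;
// names are illustrative, not Druid's actual implementation.
public class SentinelGuardSketch
{
  static final int SKIP_SCALE_ACTION = -1;

  static int desiredTaskCount(int optimalTaskCount, int taskCountMin, int taskCountMax)
  {
    // -1 means "no metrics yet, skip the scale action": return it untouched
    // rather than letting the clamp silently turn it into taskCountMin.
    if (optimalTaskCount == SKIP_SCALE_ACTION) {
      return SKIP_SCALE_ACTION;
    }
    return Math.min(taskCountMax, Math.max(taskCountMin, optimalTaskCount));
  }

  public static void main(String[] args)
  {
    System.out.println(desiredTaskCount(-1, 2, 10)); // prints -1 (skip, no metric emitted)
    System.out.println(desiredTaskCount(0, 2, 10));  // prints 2 (clamped to min)
  }
}
```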

@Fly-Style (Contributor, Author) commented Apr 14, 2026:

Ah, fair, but here's why I did it: we're emitting min/max with the scalingSkipReason dimension ... so it explicitly means 'no scaling was performed'.

Contributor:

This is kind of the reason I want to move this min/max check stuff into the shared code at some point so implementors don't need to care about this stuff (just emit the task count and let supervisor logic handle it). But, for now, I think we can just handle this -1 case separately.

@jtuglu1 (Contributor) commented Apr 14, 2026:

> Ah, fair, but why I did it: we're emitting min/max with scalingSkipReason dimension ... so it explicitly means 'no scaling was performed'.

Yes, but the metric has a different meaning here. IMO, we should not be emitting a metric if we explicitly don't want to scale or don't want to make a decision on scaling. The metric being emitted means we wanted to scale up/down to some value that's outside the bounds of our spec. The value being -1 implies something else.

Contributor (Author):

> The metric being emitted means we wanted to scale up/down to some value that's outside the bounds of our spec.

That won't happen in the case of the cost-based autoscaler (CBA): the algorithm has a hard limitation of picking potential values only within bounds.

@jtuglu1 (Contributor) commented Apr 14, 2026:

Ok, nevertheless, I'd still prefer to be defensive here and not assume anything about the future behavior of the algorithm or its outputs. IMO treating the scaling algorithm as a blackbox allows us to be more safe and provides a correctness layer against other bugs.

@jtuglu1 (Contributor) left a comment:

LGTM after CI passes. I think the comments RE not making assumptions about what the algorithm will return will make it easier to port the min/max bounds checking + metric emission logic to a shared piece of code later on (as well as defend against bugs/incorrect assumptions). They also help with readability (either assumptions explicitly documented or logical guards against stuff like -1 help the reader understand the code).

@Fly-Style Fly-Style merged commit 5f37b7b into apache:master Apr 15, 2026
63 of 64 checks passed
@Fly-Style Fly-Style deleted the lag-based-autoscaler branch April 15, 2026 10:30
@github-actions github-actions bot added this to the 38.0.0 milestone Apr 15, 2026
4 participants