[FLINK-21136] Adjust timeouts for reactive mode #15159

Closed

Conversation

rmetzger
Contributor

What is the purpose of the change

Adjust timeout behavior of adaptive scheduler according to the discussion in Jira.

Verifying this change

Covered by new and existing tests.

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit ddeb93a (Thu Mar 11 19:31:44 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Mar 11, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@rmetzger
Contributor Author

Reported CI failure: https://issues.apache.org/jira/browse/FLINK-21728

@tillrohrmann tillrohrmann self-assigned this Mar 12, 2021
Contributor

@tillrohrmann tillrohrmann left a comment


Thanks for creating this PR @rmetzger. The overall changes look good to me. I had a few comments. Please take a look.

Description.builder()
        .text(
                "The resource stabilization timeout defines the time the JobManager will wait "
                        + "if fewer than the required resources are available, but sufficient resources for execution are there."
Contributor

Suggested change
+ "if fewer than the required resources are available, but sufficient resources for execution are there."
+ "if fewer than the required resources are available, but sufficient resources for execution are there. "
(The suggestion only adds a trailing space before the closing quote, so that the following sentence of the description does not run into this one.)

Comment on lines 54 to 67
public static void configureClusterForReactiveMode(Configuration configuration) {
    LOG.info("Modifying Cluster configuration for reactive mode");

    if (!configuration.contains(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT)) {
        // Configure adaptive scheduler to schedule job even if desired resources are not
        // available (but sufficient resources)
        configuration.set(
                JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(0));
    }
    if (!configuration.contains(JobManagerOptions.RESOURCE_WAIT_TIMEOUT)) {
        // configure adaptive scheduler to wait forever for TaskManagers to register
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(-1));
    }
}
Contributor

I am not saying that we have to change it but I wanted to make an observation about this kind of pattern (changing the Configuration so that the right configuration values are being picked up later):

The problem I see with this pattern is that it decouples the instantiation of the required components from their configuration. Consequently, we as the developers have to remember that at every entrypoint which might use the AdaptiveScheduler we have to call ReactiveModeUtils.configureClusterForReactiveMode (here we have the ClusterEntrypoint and the MiniCluster). If we forget it, the reactive mode might not really work.

If these context-sensitive configuration option overrides were moved closer to the instantiation of the components, it would be easier to reuse the "reactive mode" in another setting. E.g., when the DefaultJobManagerRunnerFactory sees that we are running a job in reactive mode, it could read RESOURCE_STABILIZATION_TIMEOUT and RESOURCE_WAIT_TIMEOUT with different default values (0 and -1), which are then passed to the factory of the AdaptiveScheduler.
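To make the observation concrete, here is a hypothetical sketch (not code from this PR; the AdaptiveSchedulerFactory constructor shown is assumed) of resolving the reactive-mode defaults right where the scheduler factory is created:

// Hypothetical sketch: resolve the reactive-mode defaults next to the component
// that needs them, instead of mutating the cluster Configuration at the entrypoint.
static AdaptiveSchedulerFactory createAdaptiveSchedulerFactory(Configuration configuration) {
    final boolean reactive =
            configuration.get(JobManagerOptions.SCHEDULER_MODE) == SchedulerExecutionMode.REACTIVE;

    final Duration resourceWaitTimeout =
            configuration
                    .getOptional(JobManagerOptions.RESOURCE_WAIT_TIMEOUT)
                    .orElse(
                            reactive
                                    ? Duration.ofMillis(-1)
                                    : JobManagerOptions.RESOURCE_WAIT_TIMEOUT.defaultValue());

    final Duration resourceStabilizationTimeout =
            configuration
                    .getOptional(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT)
                    .orElse(
                            reactive
                                    ? Duration.ZERO
                                    : JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT.defaultValue());

    // Constructor signature assumed for illustration only.
    return new AdaptiveSchedulerFactory(resourceWaitTimeout, resourceStabilizationTimeout);
}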

Contributor

I think the key observation is that if we keep related things closer together, then it is easier to reuse them and it is easier to maintain them because they are not spread all over the place.

Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Duration> RESOURCE_STABILIZATION_TIMEOUT =
Contributor

I think we need to document the different default values when using the reactive mode. Also for RESOURCE_WAIT_TIMEOUT.

Comment on lines 108 to 169
private void checkDesiredOrSufficientResourcesAvailable() {
    if (context.hasDesiredResources(desiredResources)) {
        createExecutionGraphWithAvailableResources();
        return;
    }

    if (context.hasSufficientResources()) {
        if (isResourceStabilizationDeadlineOverdue()) {
            createExecutionGraphWithAvailableResources();
        } else {
            // schedule next resource check
            context.runIfState(
                    this,
                    this::checkDesiredOrSufficientResourcesAvailable,
                    resourceStabilizationDeadline.timeLeft());
        }
    }
}
Contributor

I think the implementation of the resource stabilization timeout differs from the description of the option. According to the description of the option I would have expected that the timeout only starts after we have acquired enough resources to run the job. With the current implementation we already start the stabilization timeout when we enter the state.

Comment on lines 199 to 213
if (scheduledRunnable.getExpectedState() == wfr
        && !scheduledRunnable.getDelay().equals(initialResourceTimeout)
        && !scheduledRunnable.getDelay().equals(Duration.ZERO)) {
    scheduledRunnable.runAction();
Contributor

This looks a bit hacky. Would it make sense to add something like an "advance time" operation which runs all the runnables that are eligible for running? Right now we effectively test a scenario which is not possible, because we leave the 0-delay runnables out.

@rmetzger
Contributor Author

Thanks a lot for your feedback. I believe I've addressed all items!

.withDescription(
        Description.builder()
                .text(
                        "The resource stabilization timeout defines the time the JobManager will wait "
Contributor

I found it quite difficult to grasp what exactly this option does, and would suggest a re-phrasing:

The time the JobManager will wait to acquire further resources, once the minimum set of resources required for the execution of a job has been acquired. Once [...]

Maybe also expand the documentation along the lines of (a lower value results in the job starting up faster, while a higher value ...)

Contributor

I like Chesnay's proposal.

@rmetzger rmetzger force-pushed the FLINK-21136-v3-timeout-behavior branch from 30724c1 to 57d9b95 Compare March 16, 2021 12:18
Contributor

@tillrohrmann tillrohrmann left a comment


Thanks for updating this PR @rmetzger. I had a few comments; please take a look. One comment concerns the resetting of the sufficient-resource timeout. The other concerns the way we test timeouts. Maybe we need to introduce a test time and utilities to run scheduled actions up to a certain point (the current test time). This might make things a bit easier.

<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-wait-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted), while decreasing this value reduces downtime of a job (provided that enough slots are available to still run the job).</td>
<td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted), while decreasing this value reduces downtime of a job (provided that enough slots are available to still run the job).<br />Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.<br />If <span markdown="span">`scheduler-mode`</span> is configured to <span markdown="span">`REACTIVE`</span>, this configuration value will default to a negative value to disable the resource timeout.</td>
Contributor

Is it still correct to say that decreasing this value will reduce the downtime of jobs? If we have sufficient resources, then jobmanager.adaptive-scheduler.resource-stabilization-timeout should be the decisive factor for how long the system waits to restart the job.

<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the required resources are available, but sufficient resources for execution are there. Once this timeout has passed, the job will start executing with the available resources, or fail, if the resources are not sufficient. The timeout starts as soon as there are sufficient resources available for execution.<br />If <span markdown="span">`scheduler-mode`</span> is configured to <span markdown="span">`REACTIVE`</span>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
Contributor

Maybe: The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available.

<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the required resources are available, but sufficient resources for execution are there. Once this timeout has passed, the job will start executing with the available resources, or fail, if the resources are not sufficient. The timeout starts as soon as there are sufficient resources available for execution.<br />If <span markdown="span">`scheduler-mode`</span> is configured to <span markdown="span">`REACTIVE`</span>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
Contributor

Why are we failing if there are not sufficient resources available? I thought that this timeout would only be triggered if we have sufficient resources?

Contributor Author

I thought that we cannot guarantee sufficient resources, even if SlotAllocator.determineParallelism says so, because we could lose slots between "sufficient resources" and "start scheduling".

Contributor

Yes, this can happen, but to me the description reads a bit as if, once we have seen that the resources are sufficient, we trigger the timeout and then, no matter what, try to schedule the job when the timeout fires. Instead, the system should still check whether it has sufficient resources and only try to schedule the job if this is still true.

Contributor Author

Thanks for the clarification.

.withDescription(
        Description.builder()
                .text(
                        "The resource stabilization timeout defines the time the JobManager will wait "
Contributor

I like Chesnay's proposal.

Comment on lines 203 to 212
Duration initialResourceAllocationTimeout =
        returnValueOrReplaceDefaultIfReactiveMode(
                configuration,
                JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
                Duration.ofMillis(-1));
Duration resourceStabilizationTimeout =
        returnValueOrReplaceDefaultIfReactiveMode(
                configuration,
                JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT,
                Duration.ZERO);
Contributor

Would it be simpler to define the defaults based on configuration.get(JobManagerOptions.SCHEDULER_MODE) here and then simply call configuration.getOptional(option).orElseGet(() -> defaultValue) where defaultValue has been specified before?
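For illustration, the pattern being suggested could look roughly like this for one of the two options (a sketch, not necessarily what was merged; the second option would follow the same shape):

// Sketch of the suggested simplification: compute the default once based on the
// scheduler mode, then fall back to it only if the user did not set the option.
final Duration defaultInitialResourceAllocationTimeout =
        configuration.get(JobManagerOptions.SCHEDULER_MODE) == SchedulerExecutionMode.REACTIVE
                ? Duration.ofMillis(-1)
                : JobManagerOptions.RESOURCE_WAIT_TIMEOUT.defaultValue();

final Duration initialResourceAllocationTimeout =
        configuration
                .getOptional(JobManagerOptions.RESOURCE_WAIT_TIMEOUT)
                .orElseGet(() -> defaultInitialResourceAllocationTimeout);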

this::checkDesiredOrSufficientResourcesAvailable,
deadline.timeLeft());
}
}
Contributor

Shouldn't we clear the resource stabilization deadline if we no longer have sufficient resources so that we can start a new deadline once we have sufficient resources again?
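A hedged sketch of the behaviour being asked about (field and helper names partly assumed; not necessarily the merged code): the deadline is started only when sufficient resources are first observed, and cleared again as soon as the resources drop below "sufficient", so that a later stabilization phase starts with a fresh timeout.

private Deadline resourceStabilizationDeadline; // null while resources are not sufficient

private void checkDesiredOrSufficientResourcesAvailable() {
    if (context.hasDesiredResources(desiredResources)) {
        createExecutionGraphWithAvailableResources();
        return;
    }

    if (context.hasSufficientResources()) {
        if (resourceStabilizationDeadline == null) {
            // sufficient resources just appeared: start the stabilization timeout now
            resourceStabilizationDeadline = Deadline.fromNow(resourceStabilizationTimeout);
        }
        if (resourceStabilizationDeadline.isOverdue()) {
            createExecutionGraphWithAvailableResources();
        } else {
            // schedule the next resource check for when the deadline would elapse
            context.runIfState(
                    this,
                    this::checkDesiredOrSufficientResourcesAvailable,
                    resourceStabilizationDeadline.timeLeft());
        }
    } else {
        // resources dropped below "sufficient": clear the deadline so a new one
        // is started once sufficient resources are available again
        resourceStabilizationDeadline = null;
    }
}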

Comment on lines 210 to 212
if (scheduledRunnable.getExpectedState() == wfr
        && !scheduledRunnable.getDelay().equals(initialResourceTimeout)
        && !scheduledRunnable.getDelay().equals(Duration.ZERO)) {
Contributor

Why do we need these filters here?

// sufficient resources available
ctx.setHasDesiredResources(() -> false);
ctx.setHasSufficientResources(() -> true);
wfr.setTestDeadline(Deadline.fromNow(Duration.ofDays(600)));
Contributor

This leaks quite a few implementation details. If we need to go to these lengths, that is usually an indicator that the class is not easily testable.

executeAllScheduledRunnables(ctx, wfr);
}
}

Contributor

I think we are lacking a test where we have sufficient resources, then lose them, and then restart the timeout once we have sufficient resources again (i.e. testing that the sufficient-resource timeout can be restarted).

assertNoStateTransitionsAfterExecutingRunnables(ctx, wfr);

// advance time to now
wfr.setTestDeadline(Deadline.now());
Contributor

For these timeout tests, it might make sense to introduce a Clock for the global test time. This Clock could be used by the state as well as the test context. Then we could advance the clock, which would automatically run all runnables that are eligible now. That way we might be able to express the tests a bit more naturally.
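As an illustration only (this is not the PR's test utility, and all names are made up), such a shared test clock could look like this:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a manually advanced clock shared by the state under test
// and the test context. advance() runs every action whose due time has been
// reached; actions scheduled while running are picked up by the next advance().
final class ManualTestClock {

    private static final class Scheduled {
        final long dueAtMillis;
        final Runnable action;

        Scheduled(long dueAtMillis, Runnable action) {
            this.dueAtMillis = dueAtMillis;
            this.action = action;
        }
    }

    private final List<Scheduled> scheduled = new ArrayList<>();
    private long nowMillis;

    void schedule(Runnable action, Duration delay) {
        scheduled.add(new Scheduled(nowMillis + delay.toMillis(), action));
    }

    void advance(Duration duration) {
        nowMillis += duration.toMillis();
        // iterate over a snapshot so actions scheduled during execution are deferred
        for (Scheduled task : new ArrayList<>(scheduled)) {
            if (task.dueAtMillis <= nowMillis) {
                scheduled.remove(task);
                task.action.run();
            }
        }
    }
}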

@rmetzger rmetzger force-pushed the FLINK-21136-v3-timeout-behavior branch from 28bf753 to 989abd4 Compare March 22, 2021 06:23
@rmetzger rmetzger force-pushed the FLINK-21136-v3-timeout-behavior branch from 989abd4 to 6651b8a Compare March 22, 2021 07:21
@rmetzger
Contributor Author

Thanks a lot for your review! I addressed all your comments (however skipping optional comments for the sake of time). I also rebased this change onto the latest changes on master.

Contributor

@tillrohrmann tillrohrmann left a comment


Thanks for updating this PR @rmetzger. The changes look good to me. I also like the new testing utilities and how you test for the timeouts. I think it is clearer that way now :-)

Comment on lines +348 to +357
Iterable<ScheduledTask<Void>> copyOfScheduledTasks = new ArrayList<>(scheduledTasks);
Iterator<ScheduledTask<Void>> scheduledTaskIterator = copyOfScheduledTasks.iterator();
while (scheduledTaskIterator.hasNext()) {
    ScheduledTask<Void> scheduledTask = scheduledTaskIterator.next();
    if (scheduledTask.getDelay(TimeUnit.MILLISECONDS) <= untilDelay) {
        scheduledTask.execute();
        if (!scheduledTask.isPeriodic()) {
            scheduledTaskIterator.remove();
        }
    }
Contributor

I think this implementation has the problem that we are forgetting to execute tasks which are created as part of other tasks because we copy scheduledTasks.

Contributor Author

But they will be executed with the next call to runScheduledTasks(). I only want to execute tasks which are scheduled up to now.
Immediately executing newly scheduled tasks can lead to loops, for example WaitingForResources.checkDesiredOrSufficientResourcesAvailable() schedules itself in some circumstances.

But there was actually a flaw in my code: the removal mechanism doesn't work as intended, because it removes elements from the copy. My intention was to execute non-periodic tasks only once. I've addressed this now in the code.
If you disagree with any of these changes, I'm happy to address them in a hotfix alongside another pull request or in a separate PR.

Contributor

untilDelay should define an upper bound for the tasks to execute, provided there is no task with a delay of 0 which enqueues itself over and over again.

The problem with this implementation is that it might simulate an interleaving which cannot happen with a proper executor.
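A sketch of how the helper could behave once removal happens on the real collection and only tasks already scheduled at call time are run (assumed shape, not necessarily the merged code):

// Sketch: execute only tasks that were already scheduled when runScheduledTasks()
// was called and whose delay lies within untilDelay; non-periodic tasks are
// removed from the real backing collection, not from the snapshot.
public void runScheduledTasks(long untilDelay) {
    final List<ScheduledTask<Void>> snapshot = new ArrayList<>(scheduledTasks);
    for (ScheduledTask<Void> scheduledTask : snapshot) {
        if (scheduledTask.getDelay(TimeUnit.MILLISECONDS) <= untilDelay) {
            scheduledTask.execute();
            if (!scheduledTask.isPeriodic()) {
                // remove from the original collection so the task runs at most once
                scheduledTasks.remove(scheduledTask);
            }
        }
    }
}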

rmetzger added a commit to rmetzger/flink that referenced this pull request Mar 22, 2021
rmetzger added a commit to rmetzger/flink that referenced this pull request Mar 23, 2021
rmetzger added a commit to rmetzger/flink that referenced this pull request Mar 23, 2021
@rmetzger rmetzger closed this in 9635730 Mar 23, 2021