
[FLINK-12763][runtime] Fail job immediately if tasks’ resource needs can not be satisfied. #8740

Closed

Conversation

xintongsong
Contributor

@xintongsong xintongsong commented Jun 14, 2019

What is the purpose of the change

This pull request is based on #8704. It makes the SlotManager immediately fail slot requests whose resource profiles cannot be satisfied.

Brief change log

  • d4e9868: Convert the ResourceSpec of each job vertex to the ResourceProfile of the slot request, which means fine-grained resource profiles are actually used for scheduling.
  • ea27192: Hotfix for existing test cases that misuse unknown resource profiles in slot offers.
  • 611e460: Introduce a switch failUnfulfillableRequest and corresponding tests in SlotManager. When switched on, the SlotManager immediately fails pending and newly arriving slot requests that cannot be satisfied by any registered slot or any pending slot that can be allocated.
  • 740bc66: Resource managers on Yarn/Mesos check in startNewWorker whether the requested resource profile can be satisfied, according to the default configuration. New workers are not started if the requests cannot be satisfied.
  • 5c37636: Resource managers on Yarn/Mesos turn the failUnfulfillableRequest switch of SlotManager on right after starting, because they can decide from the configuration whether requested resource profiles can be fulfilled.
  • 27f9881: Resource managers of standalone clusters turn the failUnfulfillableRequest switch of SlotManager on after a start-up period, during which they expect task managers to register. Introduce a config option for this start-up period.
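As a rough illustration of the switch's intended behavior, here is a minimal, self-contained sketch. The class names and method shapes are simplified stand-ins, not the actual SlotManager code:

```java
// Sketch of the failUnfulfillableRequest switch: once enabled, a slot request
// that no registered or pending slot can ever satisfy is failed immediately,
// instead of waiting for the slot request timeout.
import java.util.ArrayList;
import java.util.List;

public class SlotManagerSketch {
    private final List<ResourceProfileStub> slots = new ArrayList<>();
    private boolean failUnfulfillableRequest = false;

    void registerSlot(ResourceProfileStub profile) {
        slots.add(profile);
    }

    void setFailUnfulfillableRequest(boolean enabled) {
        // In the real implementation, enabling this would also fail any
        // already-pending requests that have become unfulfillable.
        this.failUnfulfillableRequest = enabled;
    }

    /** Returns true if the request is accepted, false if failed immediately. */
    boolean registerSlotRequest(ResourceProfileStub requested) {
        boolean fulfillable = slots.stream().anyMatch(s -> s.isMatching(requested));
        if (failUnfulfillableRequest && !fulfillable) {
            return false; // the real code fails the request with an exception
        }
        return true; // fulfillable now, or allowed to stay pending
    }

    public static void main(String[] args) {
        SlotManagerSketch sm = new SlotManagerSketch();
        sm.registerSlot(new ResourceProfileStub(1.0, 1024));
        sm.setFailUnfulfillableRequest(true);
        System.out.println(sm.registerSlotRequest(new ResourceProfileStub(1.0, 512)));  // true
        System.out.println(sm.registerSlotRequest(new ResourceProfileStub(4.0, 8192))); // false
    }
}

// Minimal stand-in for Flink's ResourceProfile: CPU cores and memory only.
class ResourceProfileStub {
    final double cpuCores;
    final int memoryMB;

    ResourceProfileStub(double cpuCores, int memoryMB) {
        this.cpuCores = cpuCores;
        this.memoryMB = memoryMB;
    }

    // A slot matches a request if it offers at least the requested resources.
    boolean isMatching(ResourceProfileStub required) {
        return cpuCores >= required.cpuCores && memoryMB >= required.memoryMB;
    }
}
```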

Verifying this change

This change added tests and can be verified as follows:

  • Added SlotManagerTest#testFailUnfulfillableSlotRequests, which validates that the slot manager fails slot requests that cannot be fulfilled by either registered slots or pending slots that can be allocated, once the failUnfulfillableRequest switch is turned on.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Jun 14, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 27f9881 (Tue Aug 06 15:39:08 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

} else {
clusterInitTime = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
}


Parsing of clusterInitTime occurs in both StandaloneResourceManagerFactory and StandaloneResourceManagerWithUUIDFactory. I think it can be moved somewhere common, so that here we can get the parsed value directly.
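The suggested refactoring could look roughly like this. This is a hypothetical, self-contained sketch; the helper name, config keys, and plain-map configuration are illustrative stand-ins, not Flink's actual Configuration API:

```java
// Hypothetical sketch: parse clusterInitTime in one shared helper so that both
// StandaloneResourceManagerFactory variants can read the already-parsed value.
import java.util.HashMap;
import java.util.Map;

public class ClusterInitTimeHelperSketch {
    static final String CLUSTER_INIT_TIME_KEY = "resourcemanager.cluster-init-time";
    static final long SLOT_REQUEST_TIMEOUT_MS = 300_000L; // illustrative default

    /** Single place that applies the fallback to the slot request timeout. */
    static long getClusterInitTimeMillis(Map<String, Long> config) {
        long fallback = config.getOrDefault("slot.request.timeout", SLOT_REQUEST_TIMEOUT_MS);
        return config.getOrDefault(CLUSTER_INIT_TIME_KEY, fallback);
    }

    public static void main(String[] args) {
        Map<String, Long> config = new HashMap<>();
        // Not configured: falls back to the slot request timeout.
        System.out.println(getClusterInitTimeMillis(config)); // 300000
        config.put(CLUSTER_INIT_TIME_KEY, 60_000L);
        System.out.println(getClusterInitTimeMillis(config)); // 60000
    }
}
```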

* and failed. If not configured, {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT} will be used by default.
*/
public static final ConfigOption<Long> CLUSTER_INITIALIZATION_TIME = ConfigOptions
.key("resourcemanager.cluster-init-time")

I feel that cluster-init-time is too general. Do you think a name like resourcemanager.standalone.slot-resources-registration-timeout would be better?

@@ -133,7 +139,7 @@
private final FatalErrorHandler fatalErrorHandler;

/** The slot manager maintains the available slots. */
private final SlotManager slotManager;
protected final SlotManager slotManager;

I think it may be better to have a protected setAllowedSlotResourceProfiles method in the ResourceManager than making slotManager protected directly.

@@ -123,6 +125,9 @@
/** Release task executor only when each produced result partition is either consumed or failed. */
private final boolean waitResultConsumedBeforeRelease;

/** Resource profiles of slots that can be acquired. */
private final HashSet<ResourceProfile> allowedSlotResourceProfiles = new HashSet<>();

I think the name is a little ambiguous; it sounds like the resource profiles of the slots that are allowed to be created on the TM side. It may be better to use a name like availableSlotResourceProfiles. The comments and the name of setAllowedSlotResourceProfiles may also need to be modified accordingly.

Preconditions.checkArgument(
ResourceProfile.UNKNOWN.equals(resourceProfile),
"The YarnResourceManager does not support custom ResourceProfiles yet. It assumes that all containers have the same resources.");
// start new TM with default resource profile

starts

assertTrue(slotManager.registerSlotRequest(slotRequest1));
try {
slotManager.registerSlotRequest(slotRequest2);
} catch (Exception e) {

Catching SlotManagerException may be better.

assertTrue(slotManager.registerSlotRequest(slotRequest2));
}
}


I think we may also need to test the case where a pendingRequest is marked as failed when setAllowedSlotResourceProfiles is called.


private ResourceID taskExecutorResourceID;

private ResourceID resourceManagerResourceID;

resourceManagerResourceID is not used


private static final Time CLUSTER_INIT_TIME = Time.seconds(1L);

private static final long HEARTBEAT_TIMEOUT = 5000;

HEARTBEAT_TIMEOUT_IN_MILLS

*/
public class StandaloneResourceManagerTest extends TestLogger {

private static final Time TIMEOUT = Time.seconds(10L);

TIMEOUT seems too general; maybe RPC_TIMEOUT.


private HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);



Remove this empty line.

* limitations under the License.
*/

package org.apache.flink.runtime.resourcemanager.utils;

Why is this test located in the utils package? I think it belongs in org.apache.flink.runtime.resourcemanager.

slotReport,
TIMEOUT).get();

Thread.sleep(CLUSTER_INIT_TIME.toMilliseconds());

A slack time interval may be needed to ensure stability.

@xintongsong
Contributor Author

@gaoyunhaii Thanks again for the review. Your comments are addressed.

@xintongsong xintongsong force-pushed the fail-job-large-slotrequest branch 2 times, most recently from 36dfa37 to 0cca74c on June 19, 2019 02:39
@xintongsong
Contributor Author

@flinkbot attention @tillrohrmann

@StephanEwen
Contributor

I think that the problem you describe is a more general problem for the standalone resource manager.

In standalone mode, it can take a long time until the "not enough resources" exception comes for streaming jobs, and the "no matching slot" for batch jobs. So why don't we solve it in a more general way?

I like the idea of a "startup period" in which the standalone RM waits for a longer timeout for TMs (and thus slots) to appear, and after that period slot requests are failed immediately if no free slot is readily available. That idea has floated around for a bit, maybe it is time to go for it.

What I don't quite understand is the "mixed solution" in this PR that the startup period is used to discover what resource profiles are available. After that, requests still time out after a long time unless they request a resource profile that is incompatible with the ones seen during the startup period.

I think this may lead to strange behavior:

  • TaskManagers that register late might not get used. You can start larger TMs later, they register, but slot requests still fail.
  • A profile might be available during the startup period, but the TMs shut down later, and the slot requests cannot be fulfilled any more. But the requests take a long time, because the resource profile was a known profile.

All this becomes both easier and more consistent with a simple startup period for the StandaloneResourceManager. After that, all requests fail immediately unless a slot is directly available.

What do you think?

BTW: This would be a change we need to discuss on dev/user mailing lists, because it changes system behavior. Probably most users would agree that it is for the better, but nonetheless, we need to be transparent there.

@xintongsong
Contributor Author

I think you are right. I can see the problem of the current solution.

Just one question about the solution you proposed: what do we do for batch jobs?

  • Do we allow requests to stay pending and wait for other tasks to finish and release resources? If so, that means we need different behaviors for streaming/batch jobs, which requires the resource manager to be aware of the two job types.
  • Or do we always fail the requests and tell the JM why each request failed: because the requested resource is too large (probably shouldn't retry), or because there are temporarily no available slots (may retry later)? This might make some jobs starve, because before they retry the failed requests, resources may become available and then be taken by other requests that come later.

@xintongsong xintongsong force-pushed the fail-job-large-slotrequest branch 2 times, most recently from 0452d82 to 2767427 on July 4, 2019 16:15
@StephanEwen
Contributor

I think the difference between batch and streaming should not manifest in the ResourceManager.

It can manifest in the scheduler, so let's see if we can cover this there. What do you think about this approach:

  • When the scheduler requests slots from the SlotPool, it uses a timeout.

  • For streaming, that is finite (you want a "NotEnoughResourcesAvailable" exception rather soon).

  • For batch, it is infinite, because the same slots can be used one after another.

  • Failures from the ResourceManager to allocate a slot (timeout, whatsoever) only cancel the Future. But this is not propagated to the request from the scheduler.

  • Open issue: How to ensure that there is at least one slot of the relevant size

Long Term Approach

  • We want to change the SlotPool such that you set something like
    min: x slots of profile a and y slots of profile b
    preferred: k slots of profile a, i slots of profile b
  • That is also the way to grow resources before triggering scaling in streaming auto scaling
  • In streaming, when "NotEnoughResourcesAvailable" exception comes, then we trigger auto-scale-down

Short Term

  • Maybe we assume the minimum is always one
  • Slot pool requests do not fail as long as there is one slot of the desired resource profile.
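The short-term rule above (requests survive as long as one slot of the desired profile exists, even if occupied) could be sketched like this. This is an illustrative standalone sketch, not the actual SlotPool code:

```java
// Sketch: fail a slot request only if no slot of a matching profile exists at
// all; an occupied-but-matching slot keeps the request pending.
import java.util.ArrayList;
import java.util.List;

public class SlotPoolSketch {
    // Each slot: {cpuCores, memoryMB}; occupancy doesn't matter for this check.
    private final List<double[]> knownSlots = new ArrayList<>();

    void addSlot(double cpuCores, double memoryMB) {
        knownSlots.add(new double[]{cpuCores, memoryMB});
    }

    /** True if the request can never be fulfilled by any known slot. */
    boolean shouldFailRequest(double reqCpu, double reqMemMB) {
        for (double[] slot : knownSlots) {
            if (slot[0] >= reqCpu && slot[1] >= reqMemMB) {
                return false; // a matching slot exists; keep the request pending
            }
        }
        return true; // no slot of the desired profile anywhere: fail immediately
    }

    public static void main(String[] args) {
        SlotPoolSketch pool = new SlotPoolSketch();
        pool.addSlot(2.0, 4096); // currently occupied, but the profile matches
        System.out.println(pool.shouldFailRequest(1.0, 1024));  // false: keep waiting
        System.out.println(pool.shouldFailRequest(8.0, 16384)); // true: fail
    }
}
```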

@xintongsong
Contributor Author

@flinkbot attention @StephanEwen

Hi @StephanEwen, I've updated this PR, as well as its description. Please take a look at your convenience. Thank you.

@flinkbot
Collaborator

flinkbot commented Jul 6, 2019

CI report for commit 2767427: SUCCESS Build

xintongsong added a commit to xintongsong/flink that referenced this pull request Jul 7, 2019
@flinkbot
Collaborator

flinkbot commented Jul 7, 2019

CI report for commit 30ef9c3: FAILURE Build

xintongsong added a commit to xintongsong/flink that referenced this pull request Jul 7, 2019
…n slot offers.

The test cases should not use ResourceProfile#UNKNOWN in slot offers.
  - ResourceProfile#UNKNOWN is used for slot requests whose resource needs are not specified.
  - ResourceProfile#ANY is used for task manager slots which can match any slot request.
These cases haven't been failing because, so far, slot requests always have unknown resource profiles.
…ker when requested resource profile cannot be satisfied.
…ter that after this period StandaloneResourceManager set SlotManager to fail unfulfillable requests.
@xintongsong
Contributor Author

Rebased to master after #8704 is merged.

xintongsong added a commit to xintongsong/flink that referenced this pull request Jul 8, 2019
@flinkbot
Collaborator

flinkbot commented Jul 8, 2019

CI report for commit 27f9881: FAILURE Build

@StephanEwen
Contributor

StephanEwen commented Jul 9, 2019

Looks good overall. And thanks for splitting this into finer grained commits for the review - it helped a lot.

I think we may have a latent bug in the ResourceProfile - not introduced by this PR, but it may affect the code in this PR.

In isMatching, the ResourceProfile checks whether (required == UNKNOWN), which may not always be the case, even if required is equal to UNKNOWN. This probably needs to be UNKNOWN.equals(required).

Especially because the ResourceProfile is serializable, the object may get duplicated and no longer be the same instance.

@xintongsong
Contributor Author

xintongsong commented Jul 9, 2019

Thank you for the review, @StephanEwen.
I don't think this is a bug. If the required profile is equal to UNKNOWN, isMatching will return true anyway according to the comparison logic. The (required == UNKNOWN) check is only a shortcut to avoid comparing individual properties, because comparing object references is much less work than comparing the properties. If we used UNKNOWN.equals(required) here, it would not speed anything up.
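To illustrate the point: the reference check is only a fast path, and a duplicated (e.g. deserialized) unknown profile still matches via the full comparison. A simplified stand-in, not the actual Flink class:

```java
// Simplified stand-in for ResourceProfile: "unknown" requirements are modeled
// as negative values, which any concrete slot profile satisfies.
public class ResourceProfileSketch {
    static final ResourceProfileSketch UNKNOWN = new ResourceProfileSketch(-1.0, -1);

    final double cpuCores;
    final int memoryMB;

    ResourceProfileSketch(double cpuCores, int memoryMB) {
        this.cpuCores = cpuCores;
        this.memoryMB = memoryMB;
    }

    boolean isMatching(ResourceProfileSketch required) {
        if (required == UNKNOWN) {
            return true; // shortcut: same instance, skip the field comparison
        }
        // Full comparison: negative ("unknown") requirements are always satisfied.
        return cpuCores >= required.cpuCores && memoryMB >= required.memoryMB;
    }

    public static void main(String[] args) {
        ResourceProfileSketch slot = new ResourceProfileSketch(1.0, 1024);
        // A duplicated unknown instance, e.g. after deserialization:
        ResourceProfileSketch unknownCopy = new ResourceProfileSketch(-1.0, -1);
        System.out.println(slot.isMatching(UNKNOWN));     // true, via the shortcut
        System.out.println(slot.isMatching(unknownCopy)); // true, via full comparison
    }
}
```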

@StephanEwen
Contributor

True - UNKNOWN has negative requirements (I confused it with ANY, which has infinite requirements).
You are right, it should work, just not take the shortcut.

@xintongsong
Contributor Author

Manually merged by @StephanEwen in ea032ae.

@xintongsong xintongsong deleted the fail-job-large-slotrequest branch July 10, 2019 12:02