
Conversation

@tillrohrmann
Contributor

@tillrohrmann tillrohrmann commented Jul 10, 2019

What is the purpose of the change

This PR is based on #9043.

This commit adds a new type of slot request which can be issued to the SlotPoolImpl.
The batch slot request is intended for batch jobs which can be executed with a single
slot (having at least one slot for every requested resource profile equivalence class).
Usually, a job which fulfills this criterion must not contain a pipelined shuffle.

The new batch slot request differs from a normal slot request in the following aspects:

  • Batch slot requests don't time out as long as the SlotPool contains at least one allocated slot
    which can fulfill the pending slot request
  • Batch slot requests don't react to the failAllocation signal from the ResourceManager
  • Batch slot requests don't fail if the slot request to the ResourceManager fails
In order to time out batch slot requests which cannot be fulfilled with an allocated slot,
the SlotPoolImpl schedules a periodic task which checks for this condition. If a pending
request cannot be fulfilled, it is marked as unfulfillable and the current timestamp is
recorded. If the request cannot be marked as fulfillable again before the batch slot timeout
is exceeded, the slot request will be timed out.
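
For illustration, the periodic check works roughly like the following sketch. All names (pendingBatchRequests, markUnfulfillable, etc.) are illustrative assumptions, not the actual SlotPoolImpl code:

```java
// Sketch only; field and method names are assumptions.
private void checkBatchSlotTimeout() {
    final long now = clock.relativeTimeMillis();
    for (PendingRequest request : pendingBatchRequests) {
        boolean fulfillable = allocatedSlots.stream().anyMatch(
            slot -> slot.getResourceProfile().isMatching(request.getResourceProfile()));
        if (fulfillable) {
            request.markFulfillable();
        } else {
            // records the timestamp only on the first fulfillable -> unfulfillable transition
            request.markUnfulfillable(now);
            if (now - request.getUnfulfillableSince() >= batchSlotTimeout.toMilliseconds()) {
                timeoutPendingSlotRequest(request.getSlotRequestId());
            }
        }
    }
}
```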

A batch slot is requested by calling SlotPool#requestNewAllocatedBatchSlot.
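
For illustration only, a call site might look like this; the exact parameter list is an assumption:

```java
// Hypothetical call site; the exact signature of
// requestNewAllocatedBatchSlot is an assumption.
CompletableFuture<PhysicalSlot> slotFuture =
    slotPool.requestNewAllocatedBatchSlot(
        new SlotRequestId(),
        ResourceProfile.UNKNOWN);
```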

cc @xintongsong @StephanEwen

Verifying this change

Added SlotPoolBatchSlotRequestTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Jul 10, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 1de0f63 (Tue Aug 06 15:40:46 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@xintongsong
Contributor

Hi @tillrohrmann, thanks for opening this PR.

I have one concern about the way we handle batch slot requests.

  • Batch slot requests don't time out as long as the SlotPool contains at least one allocated slot
    which can fulfill the pending slot request
  • Batch slot requests don't react to the failAllocation signal from the ResourceManager
  • Batch slot requests don't fail if the slot request to the ResourceManager fails

From the above description, it seems to me that you are assuming a batch request that is not fulfilled in the first place can only wait for another slot in the slot pool to be freed. I'm not sure about this.

I think we need to consider the possibility that a pending slot request in the slot pool can also be satisfied by the resource manager after the initial slot request to the RM failed.

  • When there are multiple jobs running in the cluster, there might be resources taken by other jobs becoming available. While users may not often run multiple streaming jobs in one cluster, it is probably not that rare for batch jobs.
  • If we think of fine-grained resource profiles, we might be able to regenerate new slots from the resources returned by existing slots. E.g., if we have 1 slot with 100MB and 2 slots with 50MB managed memory in the slot pool, and lots of tasks that require 100MB managed memory to run, there is a chance that once the tasks in the two 50MB slots finish, the two slots are freed and the resource manager is then able to allocate another 100MB slot from the task executor.

Therefore, I would suggest making the slot pool retry requesting slots from the resource manager for batch slot requests before they time out, roughly as sketched below.
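
A sketch of the suggestion, purely illustrative (all names are assumptions, not actual Flink code):

```java
// Sketch of the suggested retry loop; not actual Flink code.
private void retryPendingBatchRequests() {
    for (PendingRequest request : pendingBatchRequests) {
        if (!request.hasOutstandingResourceManagerRequest()) {
            // re-issue the request that previously failed at the RM
            requestSlotFromResourceManager(request);
        }
    }
    // re-check periodically until the batch slot timeout fires
    scheduleRunAsync(this::retryPendingBatchRequests, retryInterval);
}
```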

@tillrohrmann
Contributor Author

Hi @xintongsong, you are right that we don't retry slot requests from within the SlotPool. However, we do so in case of a failover.

I'm actually not so sure whether the SlotPool should retry the requests. One could also argue that the SlotManager should simply not fail the slot requests if it thinks that it can eventually satisfy them. In both cases you need to define a timeout, though, and it just happens that in the current implementation we don't retry.

Moreover, I think that this retry logic would be an optimization of the existing logic and, thus, not strictly release critical. Since the batch request is anyway only a band-aid/temporary bridge, I would like to avoid adding even more logic which we need to rework later on just before the feature freeze.

Please object if you think that this feature is a release blocker and needs to be added.

@StephanEwen
Contributor

I agree with Till here. The logic is not yet perfect, but should be an improvement over the current state.

Under fine-grained recovery, the current state would lead to failure of a task and individual recovery, re-triggering a request to the RM. That is good, but the downside is that it uses up recovery attempts. I think it is tricky for users to understand that we rely on failure/recovery to re-request resources. It makes retry attempts meaningless and leads users to debug jobs (because they see unexpected failures) when really nothing is wrong.

With this change here, we don't rely on failure/recovery any more, but do not re-trigger timed out requests within a stage. It may hence be that a stage does not optimally use its resources. Requests come again in the next stage.

Like Till suggested, for 1.10, we should consider a different model. Requests from the SlotPool to the RM should not time out (unless there is an actual failure), and resources that appear at the RM should make it to the SlotPool. Letting the SlotPool periodically request resources seems like a workaround to me.

@tillrohrmann
Contributor Author

Another thing which deserves discussion is how we handle failed slot requests to the ResourceManager. With this PR, batch requests will simply ignore the failed requests to the ResourceManager and rely on the batchSlotTimeout to fail the request eventually.

However, with #8740, we fail the ResourceManager slot request if we know that we cannot fulfill the requested ResourceProfile. In this case, we won't react to this signal and instead wait for the timeout. If the contract is that a slot request to the ResourceManager only fails if the ResourceManager is sure that it can never fulfill the request (not just at the moment the request arrived), then we could use this signal to immediately fail the slot request.

If we decide to do this, then we should do it as a follow up to this PR.

@xintongsong
Contributor

Sorry to bring it back here.

With this change here, we don't rely on failure/recovery any more, but do not re-trigger timed out requests within a stage. It may hence be that a stage does not optimally use its resources. Requests come again in the next stage.

I'm worried that it is not only a matter of whether we use the resources optimally; it may also cause jobs to fail to execute even when the total resources of the cluster are actually enough.

Consider two vertices, A (parallelism 2, 100MB managed memory) and B (parallelism 1, 200MB managed memory), and a cluster with one TM with 200MB managed memory in total. Ideally, we would expect the two tasks of vertex A to run on the TM concurrently, followed by the one task of vertex B. However, if we first request two 100MB slots, the RM cannot allocate any more 200MB slots for B, and there is no 200MB slot in the slot pool either. As a result, the slot request for B fails, leading to the job failure.

Please correct me if I am wrong. I'm not very sure whether the failure of the slot request for B would cause the job to fail, or whether this can be resolved by fine-grained recovery.

@zhuzhurk
Contributor

Hi Xintong, I think you are right that this improvement cannot handle the case you describe. However, fine-grained recovery can work as a fallback: it uses re-scheduling as a retry for resources. In this way, B will eventually be assigned the resources that are released from A and returned to the RM.
As Stephan mentioned, the failover way can be annoying to Flink users, and Till's PR targets improving this by reducing task failovers caused by slot allocation timeouts. It works for most cases, although not all (like the one you mentioned).

@flinkbot
Collaborator

CI report for commit 5b0aa0e: FAILURE

@xintongsong
Contributor

Thank you for the clarification, @zhuzhurk.

If fine-grained recovery can work as a fallback, I think we can eventually get jobs in the scenario I described above to work. As long as we don't fail jobs that should be able to run, this retry is an optimization that we don't necessarily need to do for this version.

And @tillrohrmann, I want to make one more clarification.

However, with #8740, we fail the ResourceManager slot request if we know that we cannot fulfill the requested ResourceProfile. In this case, we won't react to this signal and instead wait for the timeout. If the contract is that a slot request to the ResourceManager only fails if the ResourceManager is sure that it can never fulfill the request (not just at the moment the request arrived), then we could use this signal to immediately fail the slot request.

With #8740, I think we currently fail the ResourceManager slot request if it cannot be fulfilled, not only at the moment the request arrives. To be more specific, the logic is to fail a slot request that can be fulfilled neither by a registered slot (after the startup period, for standalone) nor by a pending slot that can still be allocated (for Yarn/Mesos). In other words, we fail slot requests that can, to the best of our knowledge, never be fulfilled. Roughly, it is the predicate sketched below.
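
In pseudo-Java, the described check amounts to something like this (illustrative names only, not the actual SlotManager code):

```java
// Illustrative sketch; names are assumptions, not the SlotManager API.
boolean canEverBeFulfilled(ResourceProfile requested) {
    // fulfilled by a slot already registered by some TaskExecutor, or ...
    return registeredSlots.stream().anyMatch(slot -> slot.isMatching(requested))
        // ... by a pending slot that can still be allocated (Yarn/Mesos)
        || pendingSlots.stream().anyMatch(slot -> slot.isMatching(requested));
}
```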

I would prefer the current way (fail requests that can never be fulfilled), while @StephanEwen said we could also consider the other way (fail requests that cannot be fulfilled at the moment). What do you think?

@tillrohrmann
Contributor Author

Yes, I think fine-grained recovery should effectively re-trigger the request as part of its failover. This is not nice, but should do the trick.

For the problem with differently sized slots which originate from the same TM, this might actually be an argument to not include the change which allows sharing managed memory between slots in this release, @StephanEwen. It seems to me that not all implications are clear at the moment. Being able to dynamically size the slots complicates the slot allocation protocol further, because with this change requests might become fulfillable depending on what one releases.

@StephanEwen left a comment
Contributor

Looks very good all in all.

Some minor comments in line.

As a follow-up, it might make sense to start using Duration/Deadline in the SlotPool instead of long values. That makes it more explicit and avoids the danger of accidentally confusing millis/nanos. That is orthogonal to this change, though.
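
For example, a sketch with java.time (illustrative values only):

```java
import java.time.Duration;
import java.time.Instant;

// Duration/Instant make the time unit explicit; a bare long invites
// millis/nanos confusion. Values below are illustrative.
Duration batchSlotTimeout = Duration.ofSeconds(30);
Instant unfulfillableSince = Instant.now();

boolean timedOut =
    Duration.between(unfulfillableSince, Instant.now())
        .compareTo(batchSlotTimeout) >= 0;
```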


releaseTaskManager(slotPool, directMainThreadExecutor, taskManagerResourceId);

clock.advanceTime(1L, TimeUnit.MILLISECONDS);
Contributor

This line can probably be removed from the test.

Contributor Author

Good point, will remove.


try {
slotFuture.get();
fail("Expected TimeoutFuture.");
Contributor

That error message seems a bit off.

Contributor Author

True, will correct it.

@StephanEwen
Contributor

+1 to merge this

This commit ensures that slot requests enqueued at the SlotPoolImpl are completed
in the order in which they were requested. Respecting the original request order
prevents deadlock situations where dependent requests get completed first.

This closes apache#9043.
Move testFailingAllocationFailsPendingSlotRequests from SlotPoolImplTest to
SlotPoolPendingRequestFailureTest.
…Request to SlotPoolPendingRequestFailureTest
This commit adds a new type of slot request which can be issued to the SlotPoolImpl.
The batch slot request is intended for batch jobs which can be executed with a single
slot (having at least one slot for every requested resource profile equivalence class).
Usually, a job which fulfills this criterion must not contain a pipelined shuffle.

The new batch slot request differs from a normal slot request in the following aspects:

* Batch slot requests don't time out as long as the SlotPool contains at least one allocated slot
which can fulfill the pending slot request
* Batch slot requests don't react to the failAllocation signal from the ResourceManager
* Batch slot requests don't fail if the slot request to the ResourceManager fails

In order to time out batch slot requests which cannot be fulfilled with an allocated slot,
the SlotPoolImpl schedules a periodic task which checks for this condition. If a pending
request cannot be fulfilled, it is marked as unfulfillable and the current timestamp is
recorded. If the request cannot be marked as fulfillable again before the batch slot timeout
is exceeded, the slot request will be timed out.

A batch slot is requested by calling SlotPool#requestNewAllocatedBatchSlot.

Add SlotPool#requestNewAllocatedBatchSlot

This closes apache#9058.
In order to not clutter the production implementation with testing methods, this
commit introduces the TestingSlotPoolImpl and moves the trigger timeout methods
and the convenience constructor to this class.
@tillrohrmann
Contributor Author

Thanks for the review @StephanEwen. I've addressed your comments. Merging once Travis gives the green light.

@flinkbot
Collaborator

flinkbot commented Jul 11, 2019

CI report:

@tillrohrmann tillrohrmann deleted the FLINK-13166 branch July 12, 2019 06:09
