Skip to content

Conversation

tillrohrmann
Copy link
Contributor

What is the purpose of the change

This PR adds support for multi task slot TaskExecutors to Flink. Before it was recommended to start a Flink cluster with single slot TaskExecutors. Now also multi slot TaskExecutors can be configured and Flink won't allocate resources over-excessively.

Brief change log

  • Extend ResourceActions#allocateResource to return Collection<ResourceProfile> indicating the set of slots to expect
  • Store expected slots as PendingTaskManagerSlot
  • Use PendingTaskManagerSlot to fulfill PendingSlotRequest
  • Only ask for new resources if there are no more TaskManagerSlots and PendingTaskManagerSlots

Verifying this change

  • Added SlotManagerTest#: testRequestNewResources, testFailingAllocationReturnsPendingTaskManagerSlot, testPendingTaskManagerSlotCompletion, testRegistrationOfDifferentSlot, testOnlyFreeSlotsCanFulfillPendingTaskManagerSlot,

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

Copy link
Contributor

@azagrebin azagrebin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the nice feature @tillrohrmann ! I left some comments to consider minor improvements before merge.

this.workersInLaunch = new HashMap<>(8);
this.workersBeingReturned = new HashMap<>(8);

final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would create a method to create and return slotsPerWorker to unload constructor code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

// create the specific TM parameters from the resource profile and some defaults
MesosTaskManagerParameters params = new MesosTaskManagerParameters(
resourceProfile.getCpuCores() < 1.0 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
taskManagerParameters.cpus(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a copying of immutable MesosTaskManagerParameters. Is it on purpose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it is not on purpose. Will correct it.

this(0, 0);
}

PendingSlotBalance(int numPendingSlots, int numUnassignedSlots) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: @nonnegative

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove this class since it is a left-over from a previous implementation.

return pendingTaskManagerSlot;
}

public void assignPendingTaskManagerSlot(PendingTaskManagerSlot pendingTaskManagerSlotToAssign) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nonnull pendingTaskManagerSlotToAssign seems to make sense only then

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes will add it.

return resourceProfile;
}

public void assignPendingSlotRequest(PendingSlotRequest pendingSlotRequestToAssign) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also here @nonnull pendingSlotRequestToAssign

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true.

}

updateSlot(slotId, allocationId, jobId);
private void completePendingTaskManagerSlot(ResourceProfile resourceProfile) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this completePendingTaskManagerSlot some leftover? looks unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will remove it.

/**
* Immutable slot balance.
*/
final class ImmutablePendingSlotBalance extends PendingSlotBalance {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot find where these balance classes are used. Are they for future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No a left-over from a previous implementation. Will remove it.

slotManager.registerTaskManager(taskExecutorConnection, slotReport);

assertThat(slotManager.getNumberRegisteredSlots(), is(numberSlots - 1));
assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The slot request should be fulfilled now, right? Then we could also check:
assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think getNumberPendingTaskManagerSlots == 1 is an even stricter condition, because it requires that actually some of the pending task manager slots have been completed.

final ResourceActions resourceManagerActions = mock(ResourceActions.class);
final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
.setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the logic of some of this built TestingResourceActions repeat in some places, we could create some standard factory methods for them (maybe in future), e.g.:

resourceManagerActions = TestingResourceActionsBuilder.createAllocatingFixedSlotsResource(numOfSlotsPerWorker);
allocateResourceCalls = resourceManagerActions.getAllocationsCounter();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, one could add a helper function doing this.

// ------------------------------------------------------------------------

protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
final Collection<ResourceProfile> slots = new ArrayList<>(numSlots);
Copy link
Member

@GJL GJL Sep 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be enough to write:
Collections.nCopies(numSlots, ResourceProfile.ANY);

The returned List would also be immutable. At the moment there is no need to return a mutable list. I would even say it is not right that after calling Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile), one may potentially modify the state of the ResourceManager (by accidentally modifying the returned collection).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'll update the PR.

final PendingTaskManagerSlot pendingTaskManagerSlot;

if (allocationId == null) {
pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use exactly matching here but isMatching in other place? I think we should make the methods for finding pending task manager slots extendable so user can define their own matching algorithm according to need.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we use the ResourceProfile at the moment as a key for fulfilling pending task manager slots. Therefore, we need to find an exactly matching resource profile. In the future we could introduce an ID to identify newly started slots.

Think for example that you have two pending slots with resource profile (1,1) and (2,2). Now a slot with (2,2) is registered. You don't want to complete the pending task manager slot (1,1) (even though it would fulfill the requirements), because the slot (1,1), won't be able to complete the remaining pending task manager slot.

Making the matching algorithm pluggable should be a separate issue.

Extend ResourceActions interface to return a set of ResourceProfiles describing
the set of slots with which the new resource will be started. The SlotManager
stores them as PendingTaskManagerSlots which can be assigned to PendingSlotRequests.
Only if there are no more TaskManagerSlots and PendingTaskManagerSlots, the
SlotManager will request new resources from the ResourceManager.

This closes apache#6734.
@tillrohrmann
Copy link
Contributor Author

Thanks for the review @azagrebin, @GJL and @shuai-xu. Merging this PR once Travis gives green light.

@asfgit asfgit closed this in 771277b Sep 28, 2018
@tillrohrmann tillrohrmann deleted the multiSlotSupport branch September 28, 2018 09:40
Clarkkkkk pushed a commit to Clarkkkkk/flink that referenced this pull request Mar 7, 2019
Extend ResourceActions interface to return a set of ResourceProfiles describing
the set of slots with which the new resource will be started. The SlotManager
stores them as PendingTaskManagerSlots which can be assigned to PendingSlotRequests.
Only if there are no more TaskManagerSlots and PendingTaskManagerSlots, the
SlotManager will request new resources from the ResourceManager.

This closes apache#6734.

(cherry picked from commit 771277b)
@Myracle
Copy link
Contributor

Myracle commented Jan 6, 2020

@tillrohrmann Hello, I am confused with you code. Before your code, YarnResourceManager's requestYarnContainer will addContainerRequest only if pendingSlotRequests is bigger than pendingSlotAllocation, I think that Flink already won't allocate resources over-excessive when numberOfTaskSlots is bigger than 1. So I am confused what your code does compared with the previous code. Thank you very much.

@tillrohrmann
Copy link
Contributor Author

Hi @Myracle, the problem was that Flink did not take the number of pending slots into account (one TaskExecutor can be started with many slots) and instead only looked at the number of container requests (== number of TaskExecutors to be started). With this change, Flink does this properly.

@Myracle
Copy link
Contributor

Myracle commented Jan 6, 2020

@tillrohrmann Thank you for your reply. Before your change, Flink calculate pendingTaskManagerSlots(==numPendingContainerRequests * numberOfTaskSlots) in the class YarnResourceManager. Flink will only request containers from yarn when pendingTaskManagerSlots is bigger than pendingTaskManagerSlots. It means that Flink will request resources only when total requested slots can not satisfy current's requirement. There is another issue ([FLINK-9567][yarn] Before requesting new containers always check if it is required). Does FLINK-9567 resolve the same problem? What's the difference between your issue and FLINK-9567? Thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants