[FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool #5091

Merged
merged 7 commits into from Dec 14, 2017

Conversation

tillrohrmann
Contributor

What is the purpose of the change

This commit adds support for queued scheduling with slot sharing to the
SlotPool. The idea of slot sharing is that multiple tasks can run in the
same slot. Queued scheduling means that a slot request need not be
completed right away but can be fulfilled at a later point in time. This
makes it possible to start new TaskExecutors when no slots are left.
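The queued-scheduling idea can be sketched in a few lines. This is a hypothetical, heavily simplified model (class and method names `QueuedSlotPool`, `requestSlot`, `offerSlot` are illustrative, not the real SlotPool API): the request returns a future immediately, and the future is completed later when a slot, e.g. from a newly started TaskExecutor, is offered.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of queued scheduling: a slot request is answered with
// a future right away; the future is completed once a slot becomes available.
class QueuedSlotPool {
    private final Queue<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();

    // The caller receives a (possibly uncompleted) future instead of a slot.
    CompletableFuture<String> requestSlot() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.add(future);
        return future;
    }

    // Called when a TaskExecutor offers a slot; fulfills the oldest request.
    void offerSlot(String slot) {
        CompletableFuture<String> pending = pendingRequests.poll();
        if (pending != null) {
            pending.complete(slot);
        }
    }
}
```

The decoupling of request and fulfillment is what allows the RM to start new TaskExecutors in the meantime.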

The main component responsible for the management of shared slots is the
SlotSharingManager. The SlotSharingManager internally maintains a tree-like
structure which stores the SlotContext future of the underlying
AllocatedSlot. Whenever this future completes, any pending LogicalSlot
instantiations are executed and sent to the slot requesters.

A shared slot is represented by a MultiTaskSlot which can harbour multiple
TaskSlots. A TaskSlot can either be a MultiTaskSlot or a SingleTaskSlot.

In order to represent co-location constraints, we first obtain a root
MultiTaskSlot and then allocate a nested MultiTaskSlot in which the
co-located tasks are allocated. The corresponding SlotRequestID is assigned
to the CoLocationConstraint in order to make the TaskSlot retrievable for
other tasks assigned to the same CoLocationConstraint.
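The tree structure described above can be sketched with simplified stand-in classes (the real MultiTaskSlot/SingleTaskSlot additionally carry futures, request ids, and release logic, all omitted here): inner MultiTaskSlot nodes hold children, which are either further MultiTaskSlots (e.g. for co-location) or SingleTaskSlot leaves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the TaskSlot tree described above.
abstract class TaskSlot {
    final String name;
    TaskSlot(String name) { this.name = name; }
    abstract int countLeaves();
}

// Leaf node: holds exactly one task.
class SingleTaskSlot extends TaskSlot {
    SingleTaskSlot(String name) { super(name); }
    @Override int countLeaves() { return 1; }
}

// Inner node: a shared slot that can contain further TaskSlots,
// e.g. a nested MultiTaskSlot for a co-location constraint.
class MultiTaskSlot extends TaskSlot {
    private final List<TaskSlot> children = new ArrayList<>();
    MultiTaskSlot(String name) { super(name); }
    <T extends TaskSlot> T add(T child) { children.add(child); return child; }
    @Override int countLeaves() {
        int n = 0;
        for (TaskSlot child : children) { n += child.countLeaves(); }
        return n;
    }
}
```

A co-location constraint then corresponds to a nested MultiTaskSlot under the root, with one SingleTaskSlot leaf per co-located task.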

This PR also moves the SlotPool components to o.a.f.runtime.jobmaster.slotpool.

This PR is based on #5090

Brief change log

  • Add SlotSharingManager to manage shared slots
  • Rework SlotPool to use SlotSharingManager
  • Add SlotPool#allocateMultiTaskSlot to allocate a shared slot
  • Add SlotPool#allocateCoLocatedMultiTaskSlot to allocate a co-located slot
  • Move SlotPool components to o.a.f.runtime.jobmaster.slotpool

Verifying this change

  • Port SchedulerSlotSharingTest, SchedulerIsolatedTasksTest and
    ScheduleWithCoLocationHintTest to run with SlotPool
  • Add SlotSharingManagerTest, SlotPoolSlotSharingTest and
    SlotPoolCoLocationTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

CC: @GJL

tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Dec 1, 2017
…ing to SlotPool

This commit adds support for queued scheduling with slot sharing to the
SlotPool. The idea of slot sharing is that multiple tasks can run in the
same slot. Moreover, queued scheduling means that a slot request must not
be completed right away but at a later point in time. This allows to
start new TaskExecutors in case that there are no more slots left.

The main component responsible for the management of shared slots is the
SlotSharingManager. The SlotSharingManager maintains internally a tree-like
structure which stores the SlotContext future of the underlying
AllocatedSlot. Whenever this future is completed potentially pending
LogicalSlot instantiations are executed and sent to the slot requester.

A shared slot is represented by a MultiTaskSlot which can harbour multiple
TaskSlots. A TaskSlot can either be a MultiTaskSlot or a SingleTaskSlot.

In order to represent co-location constraints, we first obtain a root
MultiTaskSlot and then allocate a nested MultiTaskSlot in which the
co-located tasks are allocated. The corresponding SlotRequestID is assigned
to the CoLocationConstraint in order to make the TaskSlot retrievable for
other tasks assigned to the same CoLocationConstraint.

Port SchedulerSlotSharingTest, SchedulerIsolatedTasksTest and
ScheduleWithCoLocationHintTest to run with SlotPool.

Restructure SlotPool components.

Add SlotSharingManagerTest, SlotPoolSlotSharingTest and
SlotPoolCoLocationTest.

This closes apache#5091.
tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Dec 3, 2017
…ing to SlotPool

(commit message identical to the Dec 1 commit above)
Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> currentReleaseSlotConsumer = this.releaseSlotConsumer;

if (currentReleaseSlotConsumer != null) {
currentReleaseSlotConsumer.accept(Tuple3.of(slotRequestId, slotSharingGroupId, cause ));
Member

nit: whitespace after cause

... cause	));

Contributor Author

Will fix it.

fail("Scheduler accepted dead instance");
}
catch (IllegalArgumentException e) {
// stimmt (German for "that's right")
Member

😃

assertFalse(i2.isAlive());
assertFalse(i3.isAlive());
}
catch (Exception e) {
Member

@GJL GJL Dec 7, 2017

Better propagate the exception but I guess this file was just copy pasted.

Contributor Author

Yup, but I'll remove it.

}

private void progressToNextElement() {
while(baseIterator.hasNext() && ! currentIterator.hasNext()) {
Member

@GJL GJL Dec 7, 2017

nit: Missing space between while and (. Excess space after !.

Contributor Author

Good catch. Will correct it.

import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
Member

nit: wrong import order (not sorted lexicographically)

import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;

items should appear before LogicalSlot

Contributor Author

Will correct it.

*/
@VisibleForTesting
public Collection<MultiTaskSlot> getResolvedRootSlots() {
ResolvedRootSlotValues vs = resolvedMultiTaskSlotValues;
Member

Since this is only for testing, do we really need to cache the instance in resolvedMultiTaskSlotValues? It is not obvious that resolvedMultiTaskSlotValues is only used for testing. How about just

	public Collection<MultiTaskSlot> getResolvedRootSlots() {
		return new ResolvedRootSlotValues();
	}

Contributor Author

Yes this is better. Will change it.


private final SlotSharingGroupId slotSharingGroupId;

// needed to release allocated slots after a complete multi task slot hierarchy has been released
Member

nit: All fields are commented with non-javadoc comments. Normally comments on fields are also done in Javadoc style, e.g., SlotPool. Javadoc comments on fields are displayed by IntelliJ (Ctrl + J).

Contributor Author

good point. Will change it.

final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);

if (resolvedRootNode != null) {
final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
Member

In theory the signature of this method allows concurrent modifications on resolvedRootSlots. Maybe a comment why this cannot happen would be nice; perhaps with @GuardedBy.
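A minimal sketch of the documentation pattern the reviewer suggests; the real @GuardedBy annotation lives in javax.annotation.concurrent (JSR-305) and is shown here only as a comment so the example needs no extra dependency. All names are hypothetical, loosely modeled on the resolvedRootSlots map in this diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: the @GuardedBy("lock") annotation (shown as a comment)
// documents which monitor must be held when accessing the field.
class GuardedSlots {
    private final Object lock = new Object();

    // @GuardedBy("lock")
    private final Map<String, Set<String>> resolvedRootSlots = new HashMap<>();

    void markResolved(String groupId, String slotId) {
        synchronized (lock) {
            resolvedRootSlots
                .computeIfAbsent(groupId, k -> new HashSet<>())
                .add(slotId);
        }
    }

    int resolvedCount(String groupId) {
        synchronized (lock) {
            return resolvedRootSlots.getOrDefault(groupId, new HashSet<>()).size();
        }
    }
}
```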

Contributor Author

True. I'll add the necessary synchronization because it's not guaranteed that the future completion won't change in the future.

@@ -32,6 +34,20 @@
*/
public interface LogicalSlot {

Payload TERMINATED_PAYLOAD = new Payload() {

private final CompletableFuture<?> COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null);
Member

nit: COMPLETED_TERMINATION_FUTURE should be camel cased because it is not actually a constant (not static).

// sanity check
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));

final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
Member

nit: variable name should be leaf

https://www.dict.cc/?s=leaf

Contributor Author

Yup, we wouldn't want the single task slot to leave, after all.

this,
providerAndOwner));

final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
Member

The variable name is confusing. multiTaskSlotFuture is not of type Future.

Contributor Author

True, will rename it.


Payload TERMINATED_PAYLOAD = new Payload() {

private final CompletableFuture<?> COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null);
Member

@GJL GJL Dec 7, 2017

nit: COMPLETED_TERMINATION_FUTURE should be camel cased because it is not actually a constant (not static).

Contributor Author

Good point. Will fix it.

// lock the co-location constraint once we have obtained the allocated slot
coLocationSlot.getSlotContextFuture().whenComplete(
(SlotContext slotContext, Throwable throwable) -> {
if (throwable == null) {
Member

Is it acceptable to swallow the Throwable here?

Contributor Author

I will log the throwable here.


multiTaskSlotFuture = slotSharingManager.createRootSlot(
multiTaskSlotRequestId,
futureSlot.thenApply(Function.identity()),
Member

nit: If you change the signature to

public MultiTaskSlot createRootSlot(
			SlotRequestId slotRequestId,
			CompletableFuture<? extends SlotContext> slotContextFuture,
			SlotRequestId allocatedSlotRequestId) 

you won't need this trick to satisfy the compiler (PECS rule). Unfortunately more changes in the code would be needed, e.g., in MultiTaskSlot.
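The PECS point can be illustrated with minimal stand-in types (`SlotContext` and `AllocatedSlot` below are hypothetical simplifications of the real Flink classes, and `Pecs` is an illustrative holder): with an invariant parameter type, a `CompletableFuture<AllocatedSlot>` needs the `thenApply(Function.identity())` trick to be widened, whereas a `? extends` wildcard accepts it directly because the future only produces values.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-ins for the real Flink types.
interface SlotContext {}
class AllocatedSlot implements SlotContext {}

class Pecs {
    // Invariant signature: accepts only exactly CompletableFuture<SlotContext>.
    static CompletableFuture<SlotContext> exact(CompletableFuture<SlotContext> f) {
        return f;
    }

    // Wildcard signature (PECS: the future is a producer of SlotContext),
    // so a CompletableFuture<AllocatedSlot> can be passed without any trick.
    static CompletableFuture<SlotContext> wildcard(CompletableFuture<? extends SlotContext> f) {
        return f.thenApply(sc -> sc); // upcast the result to SlotContext
    }
}
```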

Contributor Author

Good point. Will change it.

*
* <p>The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
* {@link SingleTaskSlot}. The former class represents inner nodes which can contain
* a number of other {@link TaskSlot} and the latter class represents the leave nodes.
Member

@GJL GJL Dec 8, 2017

nit: leaf nodes

Contributor Author

good catch.

Locality locality) {
Preconditions.checkState(!super.contains(groupId));

final SingleTaskSlot leave = new SingleTaskSlot(
Member

nit: leaf

Contributor Author

will change.

*/
public class SlotSharingManagerTest extends TestLogger {

private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
Member

Should be SLOT_SHARING_GROUP_ID since it is a constant.

Contributor Author

True.

@Test
public void testRootSlotRelease() throws ExecutionException, InterruptedException {
final CompletableFuture<SlotRequestId> slotReleasedFuture = new CompletableFuture<>();
final TestingAllocatedSlotActions testingAllocatedSlotActions = new TestingAllocatedSlotActions();
Member

@GJL GJL Dec 8, 2017

You already have an instance of TestingAllocatedSlotActions defined in line 57.

Contributor Author

Indeed. Will change it.


private static final DummySlotOwner slotOwner = new DummySlotOwner();

private static final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
Member

This instance is mutable... should not be static

Contributor Author

Will make it non static.


public ScheduledUnit(
Execution task,
JobVertexID jobVertexId,
Contributor

We can get the JobVertexID from the Execution. Do we need this in the constructor?

Contributor Author

Yes because task can be null. Will add the missing @Nullable annotation.

null,
new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
}
return multiTaskSlotLocality;
Contributor

If multiTaskSlotLocality != null, multiTaskSlotLocality.getLocality() != LOCAL, and slotAndLocality == null,
it will return multiTaskSlotLocality without LOCAL locality. Is this expected?

Contributor Author

Yes, because no slot with LOCAL locality could be found. The alternative would be to ask the ResourceManager, but it would only give you a slot with UNKNOWN locality because the request is not yet completed.
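The preference order discussed in this thread can be sketched as a small decision function. This is a hypothetical simplification (the names `SlotChoice` and `pick` are illustrative): a LOCAL shared slot wins, an existing non-local shared slot is still preferred over asking the ResourceManager, which can only offer UNKNOWN locality.

```java
// Locality values mirroring the ones discussed above.
enum Locality { LOCAL, NON_LOCAL, UNKNOWN }

class SlotChoice {
    // Returns the locality of the slot that would be picked, given the
    // locality of an existing shared slot (or null if none exists).
    static Locality pick(Locality existingSharedSlotLocality) {
        if (existingSharedSlotLocality == Locality.LOCAL) {
            return Locality.LOCAL;                 // best case: local shared slot
        }
        if (existingSharedSlotLocality != null) {
            return existingSharedSlotLocality;     // non-local beats a fresh RM slot
        }
        return Locality.UNKNOWN;                   // fall back to the ResourceManager
    }
}
```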

The cluster entrypoints start the ResourceManager with the web interface URL.
This URL is used to set the correct tracking URL in Yarn when registering the
Yarn application.

This closes apache#5128.
This commit introduces the SlotContext, an abstraction that allows the SimpleSlot
to obtain the slot information needed to communicate with the TaskManager without
relying on the AllocatedSlot, which is now only used by the SlotPool.

This closes apache#5088.
Before, logical slots like the SimpleSlot and SharedSlot were associated with the
actually allocated slot via the AllocationID. This, however, was sub-optimal because
allocated slots can be re-used to also fulfill other slot requests (logical slots).
Therefore, we should bind the logical slots to the right id with the right lifecycle,
which is the slot request id.

This closes apache#5089.
Not only check for a slot request with the right allocation id but also check
whether we can fulfill other pending slot requests with an unclaimed offered
slot before adding it to the list of available slots.

This closes apache#5090.
tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Dec 14, 2017
…ing to SlotPool

(commit message identical to the Dec 1 commit above)
@tillrohrmann
Contributor Author

Thanks for the review @GJL and @ifndef-SleePy. Travis passed locally. Merging this PR.

@asfgit asfgit merged commit 917fbcb into apache:master Dec 14, 2017
@tillrohrmann tillrohrmann deleted the slotPoolSlots branch February 13, 2018 09:11