Skip to content

Conversation

@tillrohrmann
Copy link
Contributor

What is the purpose of the change

This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
it could happen that the ResourceManager sends requestSlots message before the TaskExecutor
registration was completed. Due to this, the TaskExecutor did not have all information it needed
to accept task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration time. Due to this, t
he SlotManager could already assign these slots to pending slot requests. With this commit, the
registration protocol changes such that the TaskExecutor first registers at the ResourceManager
and only after completing this step, it will announce the available slots to the SlotManager.

cc @GJL

Brief change log

  • Changed the TaskExecutor ResourceManager registration protocol to announce the available slots after the completion of the registration
  • Hardened the TaskExecutor#requestSlot to only accept the call if there is an established connection to a ResourceManager

Verifying this change

  • Added SlotManagerTest#testSlotRequestFailure
  • Added TaskExecutorTest#testIgnoringSlotRequestsIfNotRegistered, testReconnectionAttemptIfExplicitlyDisconnected, testInitialSlotReport and testInitialSlotReportFailure

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (n)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

…Executor

This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
it could happen that the ResourceManager sends requestSlots message before the TaskExecutor
registration was completed. Due to this, the TaskExecutor did not have all information it needed
to accept task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration time. Due to this, t
he SlotManager could already assign these slots to pending slot requests. With this commit, the
registration protocol changes such that the TaskExecutor first registers at the ResourceManager
and only after completing this step, it will announce the available slots to the SlotManager.
tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request May 24, 2018
…Executor

This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
it could happen that the ResourceManager sends requestSlots message before the TaskExecutor
registration was completed. Due to this, the TaskExecutor did not have all information it needed
to accept task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration time. Due to this, t
he SlotManager could already assign these slots to pending slot requests. With this commit, the
registration protocol changes such that the TaskExecutor first registers at the ResourceManager
and only after completing this step, it will announce the available slots to the SlotManager.

This closes apache#6067.
tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request May 24, 2018
…Executor

This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
it could happen that the ResourceManager sends requestSlots message before the TaskExecutor
registration was completed. Due to this, the TaskExecutor did not have all information it needed
to accept task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration time. Due to this, t
he SlotManager could already assign these slots to pending slot requests. With this commit, the
registration protocol changes such that the TaskExecutor first registers at the ResourceManager
and only after completing this step, it will announce the available slots to the SlotManager.

This closes apache#6067.

log.debug(message);
throw new SlotAllocationException(message);
throw new TaskManagerException(message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not return an exceptional future here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already talked about it offline.

ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
if (log.isDebugEnabled()) {
log.debug("Close ResourceManager connection {}.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering whether cause can get logged twice:

log.debug("Close ResourceManager connection {}.",
					resourceManagerResourceId, cause);
log.debug("Terminating registration attempts towards ResourceManager {}.",
						resourceManagerConnection.getTargetAddress(), cause);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already talked about it offline.


final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

assertThat(registrationQueue.isEmpty(), is(true));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: assertThat(registrationQueue, is(empty())); gives better failure messages. Now it's just a fancy way of writing assertTrue().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. Will change it.


final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are using spaces for indentation here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arrg, spaces.... How did they make it into this PR? Will get rid of them.

final BlockingQueue<ResourceID> registrationQueue = new ArrayBlockingQueue<>(1);

testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5 -> {
registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: spaces are used for indentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arrg, the same.

tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request May 24, 2018
…Executor

This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
it could happen that the ResourceManager sends requestSlots message before the TaskExecutor
registration was completed. Due to this, the TaskExecutor did not have all information it needed
to accept task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration time. Due to this, t
he SlotManager could already assign these slots to pending slot requests. With this commit, the
registration protocol changes such that the TaskExecutor first registers at the ResourceManager
and only after completing this step, it will announce the available slots to the SlotManager.

This closes apache#6067.
@asfgit asfgit closed this in 47dc699 May 24, 2018
asfgit pushed a commit that referenced this pull request May 24, 2018
…Executor

This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
it could happen that the ResourceManager sends requestSlots message before the TaskExecutor
registration was completed. Due to this, the TaskExecutor did not have all information it needed
to accept task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration time. Due to this, t
he SlotManager could already assign these slots to pending slot requests. With this commit, the
registration protocol changes such that the TaskExecutor first registers at the ResourceManager
and only after completing this step, it will announce the available slots to the SlotManager.

This closes #6067.
@tillrohrmann tillrohrmann deleted the fixTaskExecutorRegistration branch May 29, 2018 10:17
sampathBhat pushed a commit to sampathBhat/flink that referenced this pull request Jul 26, 2018
…Executor

This commit fixes a race condition between the TaskExecutor and the ResourceManager. Before,
it could happen that the ResourceManager sends requestSlots message before the TaskExecutor
registration was completed. Due to this, the TaskExecutor did not have all information it needed
to accept task submissions.

The problem was that the TaskExecutor sent the SlotReport at registration time. Due to this, t
he SlotManager could already assign these slots to pending slot requests. With this commit, the
registration protocol changes such that the TaskExecutor first registers at the ResourceManager
and only after completing this step, it will announce the available slots to the SlotManager.

This closes apache#6067.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants