@hvanhovell
Contributor

@hvanhovell hvanhovell commented Mar 10, 2017

What changes were proposed in this pull request?

It is sometimes useful to use multiple threads in a task to parallelize work. These threads might register completion/failure listeners to clean up when the task completes or fails. We currently cannot register such a callback and be sure that it will get called, because the context might already be in the process of invoking its callbacks when the callback gets registered.

This PR improves this by making sure that you cannot add a completion/failure listener from a different thread when the context is being marked as completed/failed in another thread. This is done by synchronizing these methods on the task context itself.

Failure listeners were already called only once. Completion listeners now follow the same pattern; this lifts the idempotency requirement for completion listeners and makes it easier to implement them. In some cases we can (accidentally) add a completion/failure listener after the fact; these listeners will be called immediately in order to make sure we can safely clean up after a task.

As a result of this change we could make the failure and completed flags non-volatile. The isCompleted() method now uses synchronization to ensure that updates are visible across threads.
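The behaviour described above can be sketched in a few lines. This is a minimal illustration, not Spark's `TaskContextImpl`: the names `SketchContext`, `addCompletionListener`, `markCompleted`, and `SketchContextDemo` are hypothetical, and listeners are modelled as plain functions. It assumes only what the description states: registration and completion synchronize on the context, each listener runs exactly once, and a listener added after completion runs immediately.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for a task context (not Spark's class).
class SketchContext {
  // Guarded by `this`; not @volatile because every access is synchronized.
  private var completed = false
  private val onCompleteCallbacks = new ArrayBuffer[SketchContext => Unit]

  def addCompletionListener(listener: SketchContext => Unit): this.type = synchronized {
    if (completed) {
      // Registered after the fact: run immediately so cleanup still happens.
      listener(this)
    } else {
      onCompleteCallbacks += listener
    }
    this
  }

  def markCompleted(): Unit = synchronized {
    // Only the first call invokes the listeners, so each runs exactly once.
    if (!completed) {
      completed = true
      onCompleteCallbacks.foreach(listener => listener(this))
    }
  }

  // Synchronized read so updates made by other threads are visible.
  def isCompleted: Boolean = synchronized { completed }
}

object SketchContextDemo {
  // Registers `n` listeners from `n` threads while the context is being
  // completed, and returns the total number of listener invocations.
  def run(n: Int): Int = {
    val ctx = new SketchContext
    val calls = new AtomicInteger(0)
    val threads = (1 to n).map { _ =>
      new Thread(() => ctx.addCompletionListener(_ => calls.incrementAndGet()))
    }
    threads.foreach(_.start())
    ctx.markCompleted()
    threads.foreach(_.join())
    calls.get()
  }
}
```

Whichever thread wins the race, every listener is either queued before completion or invoked on registration, so `SketchContextDemo.run(n)` should always count `n` invocations.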

How was this patch tested?

Added tests to TaskContextSuite that cover adding listeners to a completed/failed context.

@hvanhovell
Contributor Author

cc @rxin @sameeragarwal @zsxwing

@SparkQA

SparkQA commented Mar 10, 2017

Test build #74323 has finished for PR 17244 at commit d16ad88.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Mar 10, 2017

LGTM

@zsxwing
Member

zsxwing commented Mar 10, 2017

Using TaskContext.synchronized outside of TaskContext should not be encouraged. How about making TaskContext.addTaskCompletionListener check isCompleted internally? If the task is already completed, don't add the listener to the list; then either ignore the listener or call it immediately.

@SparkQA

SparkQA commented Mar 10, 2017

Test build #74327 has finished for PR 17244 at commit 535349d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -57,57 +68,75 @@ private[spark] class TaskContextImpl(
// Whether the task has failed.
@volatile private var failed: Boolean = false
Contributor

This need not be volatile anymore - given that it is updated and queried within a synchronized block.
We could revisit for completed too - though that would be an extension.

Contributor Author

If we drop the volatility, then we need to make isCompleted synchronized as well, to ensure safe publication.

Contributor

Yes, which is why I mentioned it as extension :-)
For failed, it is already valid to remove volatile

Contributor Author

Done.
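To illustrate the safe-publication point in this thread: once a flag is no longer `@volatile`, reads must go through the same lock as writes for another thread to be guaranteed to see the update. The sketch below is hypothetical (`GuardedFlag` and `GuardedFlagDemo` are illustration-only names, not Spark code) and shows a reader thread that polls a synchronized getter until it observes the write.

```scala
// Hypothetical flag holder; both accessors synchronize on `this`.
class GuardedFlag {
  private var completed = false // deliberately not @volatile

  def markCompleted(): Unit = synchronized { completed = true }

  // Without `synchronized` here (or @volatile on the field), a reader
  // thread would have no guarantee of ever seeing the update.
  def isCompleted: Boolean = synchronized { completed }
}

object GuardedFlagDemo {
  // Returns true if the reader thread observed the flag within 5 seconds.
  def run(): Boolean = {
    val flag = new GuardedFlag
    val reader = new Thread(() => {
      while (!flag.isCompleted) Thread.sleep(1)
    })
    reader.start()
    flag.markCompleted()
    reader.join(5000)
    !reader.isAlive
  }
}
```

Releasing the monitor in `markCompleted` happens-before the next acquisition in `isCompleted`, which is what makes the non-volatile field safe to read here.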

@GuardedBy("this")
override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
onCompleteCallbacks += listener
synchronized {
Contributor

nit: method synchronized instead of block ?

Contributor Author

Done.

@GuardedBy("this")
override def addTaskFailureListener(listener: TaskFailureListener): this.type = {
onFailureCallbacks += listener
synchronized {
Contributor

nit: method synchronized instead of block ?

Contributor Author

Done.

if (completed) {
listener.onTaskCompletion(this)
}
// Always add the listener because it is legal to call them multiple times.
Contributor

I did not realize this, interesting !

Contributor Author

Well, I was rather surprised about this, but the current code path seems to allow this.

Contributor Author

I have updated the doc in TaskContext to reflect this.

@SparkQA

SparkQA commented Mar 10, 2017

Test build #74335 has finished for PR 17244 at commit 12f947e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 12, 2017

Test build #74404 has finished for PR 17244 at commit 41448ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 12, 2017

Test build #74405 has finished for PR 17244 at commit f3b9b97.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Mar 12, 2017

LGTM. Would be great if other reviewers can also take a look.
+CC @zsxwing, @rxin

Member

@sameeragarwal sameeragarwal left a comment

minor nits, LGTM!

* Adds a (Java friendly) listener to be executed on task completion.
* This will be called in all situation - success, failure, or cancellation.
* This will be called in all situation - success, failure, or cancellation. Adding a listener
* to an already completed task will result in that listeners being called immediately.
Member

micro nit: s/listeners/listener here and below


/**
* Adds a listener to be executed on task failure.
* Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times.
Member

Why delete this? onTaskFailure can also be called multiple times right?

Contributor Author

This was disabled in #11504. So the comment does not make sense anymore.

context.addTaskCompletionListener(_ => invocations += 1)
assert(invocations == 1)
context.markTaskCompleted()
assert(invocations == 2)
Member

can we call context.markTaskCompleted() once again and assert invocations == 2 to have a test for idempotency?

@hvanhovell
Contributor Author

Ok, had a small discussion offline. It seems weird that we have different calling policies for failure and completion listeners. I am going to change the invocation of completion listeners to exactly once as well.

@SparkQA

SparkQA commented Mar 14, 2017

Test build #74466 has finished for PR 17244 at commit 4199619.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Mar 15, 2017

LGTM

@hvanhovell
Contributor Author

Thanks for the reviews! Merging to master.

@asfgit asfgit closed this in 9ff85be Mar 15, 2017
override def addTaskCompletionListener(listener: TaskCompletionListener)
: this.type = synchronized {
if (completed) {
listener.onTaskCompletion(this)
Contributor

shall we also try catch here?

Contributor

or call the invokeListeners

Contributor Author

@hvanhovell hvanhovell Mar 16, 2017

Why would we do that, if we are going to rethrow the exception anyway? The only difference is that it would be a TaskCompletionListenerException instead. Calling invokeListeners would also call already invoked listeners, which is what we are trying to avoid.

Contributor

invokeListeners takes a list of listeners, so we are able to only call this listener.

I think it's better to make these listeners consistent, i.e. throw TaskCompletionListenerException when a failure happens while calling a listener.
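The suggestion in this thread can be sketched as a helper that takes a list of listeners, so that a late-registered listener goes through the same error handling by being passed as a one-element list. Everything below is a hypothetical illustration: `ListenerInvoker.invokeAll` and `ListenerFailedException` are stand-ins and are not claimed to match the signatures of Spark's `invokeListeners` or `TaskCompletionListenerException`.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical wrapper exception that carries one message per failed listener.
class ListenerFailedException(val messages: Seq[String])
  extends RuntimeException(messages.mkString("; "))

object ListenerInvoker {
  // Calls every listener, even if an earlier one throws, then reports all
  // failures together as a single ListenerFailedException.
  def invokeAll[L](listeners: Seq[L])(call: L => Unit): Unit = {
    val errors = new ArrayBuffer[String]
    listeners.foreach { listener =>
      try {
        call(listener)
      } catch {
        case NonFatal(e) => errors += e.toString
      }
    }
    if (errors.nonEmpty) throw new ListenerFailedException(errors.toSeq)
  }
}
```

With a helper of this shape, the already-completed branch could call something like `ListenerInvoker.invokeAll(Seq(listener))(...)`, so callers see the same exception type whether the listener ran at completion or on late registration.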
