-
Notifications
You must be signed in to change notification settings - Fork 351
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Asynced the standard backend execute/recover. #1947
Conversation
5316dd8
to
3eee601
Compare
@@ -105,7 +105,19 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta | |||
* | |||
* @return the execution handle for the job. | |||
*/ | |||
def execute(): ExecutionHandle | |||
def execute(): ExecutionHandle = { | |||
throw new NotImplementedError(s"Neither execute() nor executeAsync() implemented by $getClass") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this purport to know that executeAsync
hasn't been implemented?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These sync versions are only ever called by the default async implementations. If the async is overridden, ok. If the sync is overridden, the default async calls the sync, so this is ok too. If neither async nor sync is overridden, the default async calls this default sync and you get an exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's to stop something else from calling the default sync version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not much. These methods should be overridden but not invoked. I could make them protected, but this is inside an actor so no one else has a reference to this object outside subclasses, specs, and akka.
The only callers to the async method, that by default just calls the sync method, should be the StandardAsyncExecutorActor itself in its executeOrRecover implementation.
* Returns the run status for the job. | ||
* | ||
* @param handle The handle of the running job. | ||
* @return The status of the job. | ||
*/ | ||
def pollStatus(handle: StandardAsyncPendingExecutionHandle): StandardAsyncRunStatus = { | ||
throw new NotImplementedError(s"pollStatus nor pollStatusAsync not implemented by $getClass") | ||
throw new NotImplementedError(s"Neither pollStatus nor pollStatusAsync implemented by $getClass") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar question here
tellMetadata(Map(CallMetadataKeys.JobId -> handle.pendingJob.jobId)) | ||
/* | ||
NOTE: Because of the async nature of the scala futures, there is a point in time where we have submitted this or | ||
the prior runnable to the thread pool this actor doesn't know the job id for aborting. The these runnables are |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The these => These
jobLogger.info(s"job id: ${handle.pendingJob.jobId}") | ||
tellMetadata(Map(CallMetadataKeys.JobId -> handle.pendingJob.jobId)) | ||
/* | ||
NOTE: Because of the async nature of the scala futures, there is a point in time where we have submitted this or |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the scala futures => Scala Futures
3eee601
to
cae1eee
Compare
Added an extra akka message check just in case we miss the abort message when our scala future eventually runs.
cae1eee
to
7716a07
Compare
Added an extra akka message check just in case we miss the abort message when our scala future eventually runs.