Skip to content
Permalink
Browse files
[NO ISSUE][ACTIVE] Account for force stop while suspending
- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- When a failure happens while trying to suspend ingestion,
  we will force stop the active job. If the job completes
  ungracefully, we set the listener state to TEMPORARILY_FAILED.
  However, since force to stop only waits for STOPPED state,
  the thread waiting for ingestion to be suspended will wait
  forever. This change accounts for such case and makes
  the force stop waits for TEMPORARILY_FAILED too.

Change-Id: Ib33f191be2b84d97a08e3bc6d607b0edbf35bed1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13144
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
  • Loading branch information
mhubail committed Sep 10, 2021
1 parent 8550e06 commit eed8714ae56bd61656750bf543181e7dd68c26c1
Showing 1 changed file with 5 additions and 3 deletions.
@@ -195,7 +195,7 @@ private boolean allPartitionsRegisteredAndNotCancelling() {
@SuppressWarnings("unchecked")
protected void finish(ActiveEvent event) throws HyracksDataException {
if (LOGGER.isEnabled(level)) {
LOGGER.log(level, "the job " + jobId + " finished");
LOGGER.log(level, "the job {} finished", jobId);
}
JobId lastJobId = jobId;
if (numRegistered != numDeRegistered) {
@@ -208,7 +208,7 @@ protected void finish(ActiveEvent event) throws HyracksDataException {
JobStatus jobStatus = status.getLeft();
List<Exception> exceptions = status.getRight();
if (LOGGER.isEnabled(level)) {
LOGGER.log(level, "The job finished with status: " + jobStatus);
LOGGER.log(level, "The job finished with status: {}", jobStatus);
}
if (!jobSuccessfullyTerminated(jobStatus)) {
jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
@@ -440,8 +440,9 @@ protected synchronized void doRecover(MetadataProvider metadataProvider) throws

private void cancelJob(Throwable th) {
cancelJobSafely(metadataProvider, th);
// we can come here due to a failure while in suspending state
final WaitForStateSubscriber cancelSubscriber =
new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED));
new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED, ActivityState.TEMPORARILY_FAILED));
final Span span = Span.start(2, TimeUnit.MINUTES);
InvokeUtil.doUninterruptibly(() -> {
if (!cancelSubscriber.sync(span)) {
@@ -491,6 +492,7 @@ protected synchronized void doStop(MetadataProvider metadataProvider, long timeo
forceStop(subscriber, ie);
Thread.currentThread().interrupt();
} catch (Throwable e) {
LOGGER.error("forcing active job stop due to", e);
forceStop(subscriber, e);
} finally {
Thread.currentThread().setName(nameBefore);

0 comments on commit eed8714

Please sign in to comment.