Prevent re-creating MasterContext when result is available [5.1.z] (#21076)

The issue is the following:

- when a job is finalized, a JobResult is created (1) and after that the
  MasterContext is deleted (2) from the masterContexts map
- JobCoordinationService.startJobIfNotStartedOrCompleted, called from
  JobCoordinationService.doScanJobs, does its work in the same order: it
  checks the job result (3) and then calls putIfAbsent for the master
  context (4)

So this order of actions is possible: 3, 1, 2, 4.
At this point we have a MasterContext for a job that already has a
JobResult, and the job's original MasterContext has already been removed.
This is then handled by completeMasterContextIfJobAlreadyCompleted (5).

The problem arises when an action using
JobCoordinationService#callWithJob runs between actions 4 and 5. In the
case of this test failure it returns empty metrics, because it collects
metrics using MasterJobContext#collectMetrics and doesn't use the
already stored metrics (it actually just returns an empty list, because
the re-created MasterContext has job state NOT_RUNNING and an empty list
is the initial value for jobMetrics).
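
As an illustration only, here is a minimal, self-contained sketch of the
3, 1, 2, 4 interleaving; the maps and class names below are simplified
stand-ins, not the actual Hazelcast classes:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MasterContextRaceSketch {
    // Simplified stand-ins for the job-result store and the masterContexts map.
    static final ConcurrentMap<Long, String> jobResults = new ConcurrentHashMap<>();
    static final ConcurrentMap<Long, Object> masterContexts = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        long jobId = 42L;
        masterContexts.put(jobId, new Object());              // the job's original MasterContext

        // Interleaving 3, 1, 2, 4 (simulated sequentially for clarity):
        boolean noResultYet = jobResults.get(jobId) == null;  // (3) scan thread: check job result

        jobResults.put(jobId, "done");                        // (1) finalizing thread: create JobResult
        masterContexts.remove(jobId);                         // (2) finalizing thread: delete MasterContext

        if (noResultYet) {
            masterContexts.putIfAbsent(jobId, new Object());  // (4) scan thread: re-create MasterContext
        }

        // A stale MasterContext now exists for a job that already has a JobResult.
        System.out.println("JobResult: " + jobResults.get(jobId)
                + ", MasterContext present: " + masterContexts.containsKey(jobId));
    }
}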

We check the JobResult while holding the lock to avoid this scenario:

- We find no job result
- Another thread creates the result and removes the master context in
  completeJob
- We re-create the master context below

The removal of the MasterContext also happens while holding the lock.
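
For illustration, a simplified sketch of the pattern the fix applies
(the class and method names here are hypothetical, not the actual
JobCoordinationService code):

import java.util.HashMap;
import java.util.Map;

class LockedJobResultCheckSketch {
    private final Object lock = new Object();
    private final Map<Long, String> jobResults = new HashMap<>();
    private final Map<Long, Object> masterContexts = new HashMap<>();

    // Corresponds to the guarded part of startJobIfNotStartedOrCompleted:
    // the result check and the context creation are atomic w.r.t. complete().
    Object startIfNotCompleted(long jobId) {
        synchronized (lock) {
            if (jobResults.get(jobId) != null) {
                return null; // already completed, do not re-create the context
            }
            return masterContexts.computeIfAbsent(jobId, id -> new Object());
        }
    }

    // Corresponds to completeJob: the result is stored first, and the context
    // removal happens under the same lock, so it can no longer slip in between
    // the check and the re-creation above.
    void complete(long jobId, String result) {
        jobResults.put(jobId, result);
        synchronized (lock) {
            masterContexts.remove(jobId);
        }
    }
}

Because the result is written before the context is removed, a scan that
runs after the removal necessarily sees the result, and a scan that runs
before it simply finds the still-present context.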

Fixes #19946
Fixes #20277
HZ-997

Backport of #21048 for 5.1.z

Co-authored-by: Viliam Durina <viliam@hazelcast.com>
frant-hartm and viliam-durina committed Mar 29, 2022
1 parent a4619cf commit f99016f
Showing 1 changed file with 26 additions and 13 deletions.
@@ -943,7 +943,7 @@ CompletableFuture<Void> completeJob(MasterContext masterContext, Throwable error
? masterContext.jobContext().jobMetrics()
: null;
jobRepository.completeJob(masterContext, jobMetrics, error, completionTime);
if (masterContexts.remove(masterContext.jobId(), masterContext)) {
if (removeMasterContext(masterContext)) {
completeObservables(masterContext.jobRecord().getOwnedObservables(), error);
logger.fine(masterContext.jobIdString() + " is completed");
(error == null ? jobCompletedSuccessfully : jobCompletedWithFailure).inc();
@@ -960,6 +960,12 @@ CompletableFuture<Void> completeJob(MasterContext masterContext, Throwable error
});
}

private boolean removeMasterContext(MasterContext masterContext) {
synchronized (lock) {
return masterContexts.remove(masterContext.jobId(), masterContext);
}
}

/**
* Schedules a restart task that will be run in future for the given job
*/
@@ -1102,15 +1108,20 @@ private CompletableFuture<Void> startJobIfNotStartedOrCompleted(
) {
// the order of operations is important.
long jobId = jobRecord.getJobId();
JobResult jobResult = jobRepository.getJobResult(jobId);
if (jobResult != null) {
logger.fine("Not starting job " + idToString(jobId) + ", already has result: " + jobResult);
return jobResult.asCompletableFuture();
}

MasterContext masterContext;
MasterContext oldMasterContext;
synchronized (lock) {
// We check the JobResult while holding the lock to avoid this scenario:
// 1. We find no job result
// 2. Another thread creates the result and removes the master context in completeJob
// 3. We re-create the master context below
JobResult jobResult = jobRepository.getJobResult(jobId);
if (jobResult != null) {
logger.fine("Not starting job " + idToString(jobId) + ", already has result: " + jobResult);
return jobResult.asCompletableFuture();
}

checkOperationalState();

masterContext = createMasterContext(jobRecord, jobExecutionRecord);
@@ -1121,11 +1132,10 @@ private CompletableFuture<Void> startJobIfNotStartedOrCompleted(
return oldMasterContext.jobContext().jobCompletionFuture();
}

// If job is not currently running, it might be that it just completed.
// Since we've put the MasterContext into the masterContexts map, someone else could
// have joined to the job in the meantime so we should notify its future.
if (completeMasterContextIfJobAlreadyCompleted(masterContext)) {
return masterContext.jobContext().jobCompletionFuture();
assert jobRepository.getJobResult(jobId) == null : "jobResult should not exist at this point";

if (finalizeJobIfAutoScalingOff(masterContext)) {
return masterContext.jobContext().jobCompletionFuture();
}

if (jobExecutionRecord.isSuspended()) {
@@ -1146,16 +1156,19 @@ private boolean completeMasterContextIfJobAlreadyCompleted(MasterContext masterC
logger.fine("Completing master context for " + masterContext.jobIdString()
+ " since already completed with result: " + jobResult);
masterContext.jobContext().setFinalResult(jobResult.getFailureAsThrowable());
return masterContexts.remove(jobId, masterContext);
return removeMasterContext(masterContext);
}

return finalizeJobIfAutoScalingOff(masterContext);
}

private boolean finalizeJobIfAutoScalingOff(MasterContext masterContext) {
if (!masterContext.jobConfig().isAutoScaling() && masterContext.jobExecutionRecord().executed()) {
logger.info("Suspending or failing " + masterContext.jobIdString()
+ " since auto-restart is disabled and the job has been executed before");
masterContext.jobContext().finalizeJob(new TopologyChangedException());
return true;
}

return false;
}

