Skip to content

Commit

Permalink
merge: #13035
Browse files Browse the repository at this point in the history
13035: fix(engine): don't mutate state when checking for timed out jobs r=korthout a=oleschoenburg

This PR closes #12797, ensuring that job deadlines are reliably cleaned up by event appliers. This is necessary so that the job timeout checker does not have to mutate state.

With 91da1e8, we fix the job yield and recur processors to produce events with the current job state. Previously, these propagated the job record from the command into new follow-up events which may overwrite already persisted changes to a job. 
With 330ce0b, it is now clear when deadlines are set and removed. This follows the [rules we discussed](#12797 (comment)), essentially adding a deadline on activation and removing whe the job is no longer activated or deleted.
Finally, cfdef6d is removing the illegal state modification from the checker. With the previous changes we can be sure that deadlines are cleaned up reliably and don't need to do redundant cleanup in the checker anymore.


Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and lenaschoenburg committed Jun 12, 2023
2 parents 34338c2 + cfdef6d commit a63aaa3
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void processRecord(final TypedRecord<JobRecord> record) {
final JobState.State state = jobState.getState(jobKey);

if (state == State.FAILED) {
final JobRecord recurredJob = record.getValue();
final JobRecord recurredJob = jobState.getJob(jobKey);

stateWriter.appendFollowUpEvent(jobKey, JobIntent.RECURRED_AFTER_BACKOFF, recurredJob);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.stream.api.records.TypedRecord;

public final class JobTimeOutProcessor implements TypedRecordProcessor<JobRecord> {
Expand All @@ -44,32 +45,29 @@ public JobTimeOutProcessor(

@Override
public void processRecord(final TypedRecord<JobRecord> record) {
final long jobKey = record.getKey();
final JobState.State state = jobState.getState(jobKey);
final var jobKey = record.getKey();
final var job = jobState.getJob(jobKey);
final var state = jobState.getState(jobKey);

if (state == State.ACTIVATED) {
final JobRecord timedOutJob = record.getValue();
final var now = ActorClock.currentTimeMillis();
final var deadline = job.getDeadline();
final var hasTimedOut = now > deadline;

stateWriter.appendFollowUpEvent(jobKey, JobIntent.TIMED_OUT, timedOutJob);
jobMetrics.jobTimedOut(timedOutJob.getType());

jobActivationBehavior.publishWork(jobKey, timedOutJob);
if (state == State.ACTIVATED && hasTimedOut) {
stateWriter.appendFollowUpEvent(jobKey, JobIntent.TIMED_OUT, job);
jobMetrics.jobTimedOut(job.getType());
jobActivationBehavior.publishWork(jobKey, job);
} else {
final String textState;

switch (state) {
case ACTIVATABLE:
textState = "it must be activated first";
break;
case FAILED:
textState = "it is marked as failed";
break;
default:
textState = "no such job was found";
break;
}
final var reason =
switch (state) {
case ACTIVATED -> "it has not timed out";
case ACTIVATABLE -> "it must be activated first";
case FAILED -> "it is marked as failed and is not activated";
case ERROR_THROWN -> "it has thrown an error and is not activated";
case NOT_FOUND -> "no such job was found";
};

final String errorMessage = String.format(NOT_ACTIVATED_JOB_MESSAGE, jobKey, textState);
final String errorMessage = String.format(NOT_ACTIVATED_JOB_MESSAGE, jobKey, reason);
rejectionWriter.appendRejection(record, RejectionType.NOT_FOUND, errorMessage);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void processRecord(final TypedRecord<JobRecord> record) {
.check(state, jobKey)
.ifRightOrLeft(
ok -> {
final JobRecord yieldedJob = record.getValue();
final JobRecord yieldedJob = jobState.getJob(jobKey);

stateWriter.appendFollowUpEvent(jobKey, JobIntent.YIELDED, yieldedJob);
jobActivationBehavior.notifyJobAvailableAsSideEffect(yieldedJob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

public interface JobState {

void forEachTimedOutEntry(long upperBound, BiFunction<Long, JobRecord, Boolean> callback);
void forEachTimedOutEntry(long upperBound, BiPredicate<Long, JobRecord> callback);

boolean exists(long jobKey);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ public void activate(final long key, final JobRecord record) {

makeJobNotActivatable(type);

deadlineKey.wrapLong(deadline);
deadlinesColumnFamily.insert(deadlineJobKey, DbNil.INSTANCE);
addJobDeadline(key, deadline);
}

@Override
Expand All @@ -137,7 +136,6 @@ public void timeout(final long key, final JobRecord record) {
EnsureUtil.ensureGreaterThan("deadline", deadline, 0);

updateJob(key, record, State.ACTIVATABLE);
removeJobDeadline(deadline);
}

@Override
Expand Down Expand Up @@ -165,7 +163,6 @@ public void throwError(final long key, final JobRecord updatedValue) {
@Override
public void delete(final long key, final JobRecord record) {
final DirectBuffer type = record.getTypeBuffer();
final long deadline = record.getDeadline();

jobKey.wrapLong(key);
jobsColumnFamily.deleteExisting(jobKey);
Expand All @@ -174,7 +171,7 @@ public void delete(final long key, final JobRecord record) {

makeJobNotActivatable(type);

removeJobDeadline(deadline);
removeJobDeadline(key, record.getDeadline());
}

@Override
Expand Down Expand Up @@ -222,7 +219,6 @@ private void createJob(final long key, final JobRecord record, final DirectBuffe

private void updateJob(final long key, final JobRecord updatedValue, final State newState) {
final DirectBuffer type = updatedValue.getTypeBuffer();
final long deadline = updatedValue.getDeadline();

validateParameters(type);

Expand All @@ -234,8 +230,12 @@ private void updateJob(final long key, final JobRecord updatedValue, final State
makeJobActivatable(type, key);
}

if (deadline > 0) {
removeJobDeadline(deadline);
if (newState != State.ACTIVATED) {
// This only works because none of the events actually remove the deadline from the job
// record.
// If, say on job failure, the deadline is removed or reset to 0, then we would need to look
// at the current state of the job to determine what deadline to remove.
removeJobDeadline(key, updatedValue.getDeadline());
}
}

Expand All @@ -245,15 +245,14 @@ private void validateParameters(final DirectBuffer type) {

@Override
public void forEachTimedOutEntry(
final long upperBound, final BiFunction<Long, JobRecord, Boolean> callback) {
final long upperBound, final BiPredicate<Long, JobRecord> callback) {
deadlinesColumnFamily.whileTrue(
(key, value) -> {
final long deadline = key.first().getValue();
final boolean isDue = deadline < upperBound;
if (isDue) {
final long jobKey1 = key.second().inner().getValue();
return visitJob(
jobKey1, callback::apply, () -> deadlinesColumnFamily.deleteExisting(key));
return visitJob(jobKey1, callback, () -> {});
}
return false;
});
Expand Down Expand Up @@ -379,8 +378,19 @@ private void makeJobNotActivatable(final DirectBuffer type) {
activatableColumnFamily.deleteIfExists(typeJobKey);
}

private void removeJobDeadline(final long deadline) {
deadlineKey.wrapLong(deadline);
deadlinesColumnFamily.deleteIfExists(deadlineJobKey);
private void addJobDeadline(final long job, final long deadline) {
if (deadline > 0) {
jobKey.wrapLong(job);
deadlineKey.wrapLong(deadline);
deadlinesColumnFamily.insert(deadlineJobKey, DbNil.INSTANCE);
}
}

private void removeJobDeadline(final long job, final long deadline) {
if (deadline > 0) {
jobKey.wrapLong(job);
deadlineKey.wrapLong(deadline);
deadlinesColumnFamily.deleteIfExists(deadlineJobKey);
}
}
}

0 comments on commit a63aaa3

Please sign in to comment.