Skip to content

Commit

Permalink
fix(engine): cleanup orphaned job timeouts and backoffs on migration
Browse files Browse the repository at this point in the history
After an update from a previous version, both backoff and deadline
column families might contain entries without a corresponding job or
multiple entries for a single job. Before fixing #12797 and #13041,
these were cleaned up ad-hoc whenever they were found. This is no longer
the case because we now prevent the creation of duplicated entries and
always cleanup properly.
This adds two necessary migrations that remove orphaned entries that
were left by a previous version. The migrations run once and walk
through all deadline and backoff entries, removing those without a job
and duplicates which don't match the current job state.

(cherry picked from commit 1d82e6e)
  • Loading branch information
lenaschoenburg committed Aug 15, 2023
1 parent f8accc3 commit 9035f23
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,34 @@ public JobRecord updateJobRetries(final long jobKey, final int retries) {
return job;
}

@Override
public void cleanupTimeoutsWithoutJob() {
deadlinesColumnFamily.whileTrue(
(key, value) -> {
final var jobKey = key.second().inner();
final var deadline = key.first().getValue();
final var job = jobsColumnFamily.get(jobKey);
if (job == null || job.getRecord().getDeadline() != deadline) {
deadlinesColumnFamily.deleteExisting(key);
}
return true;
});
}

@Override
public void cleanupBackoffsWithoutJobs() {
backoffColumnFamily.whileTrue(
(key, value) -> {
final var jobKey = key.second().inner();
final var backoff = key.first().getValue();
final var job = jobsColumnFamily.get(jobKey);
if (job == null || job.getRecord().getRetryBackoff() != backoff) {
backoffColumnFamily.deleteExisting(key);
}
return true;
});
}

private void createJob(final long key, final JobRecord record, final DirectBuffer type) {
createJobRecord(key, record);
initializeJobState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ public class DbMigratorImpl implements DbMigrator {
List.of(
new ProcessMessageSubscriptionSentTimeMigration(),
new MessageSubscriptionSentTimeMigration(),
new TemporaryVariableMigration());
new TemporaryVariableMigration(),
new JobTimeoutCleanup(),
new JobBackoffCleanup());

// Be mindful of https://github.com/camunda/zeebe/issues/7248. In particular, that issue
// should be solved first, before adding any migration that can take a long time

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.migration;

import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;

public class JobBackoffCleanup implements MigrationTask {

@Override
public String getIdentifier() {
return getClass().getSimpleName();
}

@Override
public void runMigration(final MutableZeebeState processingState) {
processingState.getJobState().cleanupBackoffsWithoutJobs();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.migration;

import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;

public class JobTimeoutCleanup implements MigrationTask {

@Override
public String getIdentifier() {
return getClass().getSimpleName();
}

@Override
public void runMigration(final MutableZeebeState processingState) {
processingState.getJobState().cleanupTimeoutsWithoutJobs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ public interface MutableJobState extends JobState {
void resolve(long key, JobRecord updatedValue);

JobRecord updateJobRetries(long jobKey, int retries);

void cleanupTimeoutsWithoutJob();

void cleanupBackoffsWithoutJobs();
}

0 comments on commit 9035f23

Please sign in to comment.