Skip to content

Commit

Permalink
merge: #12403
Browse files Browse the repository at this point in the history
12403: feat: run timer due date checker concurrently to processing r=oleschoenburg a=oleschoenburg

This follows a pattern we established for message TTL checking and introduces a new experimental feature flag
`zeebe.broker.experimental.features.enableTimerDueDateCheckerAsync`. When enabled, timer due dates are checked outside of the processing actor.
This ensures that a large number of expired timers does not block other processing and thus cause latency spikes.

Again, similar to message TTL checking, there is another experimental configuration that enables _yielding_ of the timer due date checker after a fixed amount of time has passed. This is conceptually similar to `ttlCheckerBatchLimit` in that it ensures that a single run of a scheduled task does not block for too long and produces only a limited number of new records.
Yielding continues to function the same, regardless of whether async checking is enabled or not.

closes #11594 

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and lenaschoenburg committed Apr 17, 2023
2 parents 692d6dc + 1093893 commit f402edc
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public final class FeatureFlagsCfg {
// Experimental feature toggles; each one starts from the matching value in DEFAULT_SETTINGS.
private boolean enableYieldingDueDateChecker = DEFAULT_SETTINGS.yieldingDueDateChecker();
private boolean enableActorMetrics = DEFAULT_SETTINGS.enableActorMetrics();
private boolean enableMessageTtlCheckerAsync = DEFAULT_SETTINGS.enableMessageTTLCheckerAsync();
// When enabled, due timers are checked asynchronously to the engine's stream processor.
private boolean enableTimerDueDateCheckerAsync =
DEFAULT_SETTINGS.enableTimerDueDateCheckerAsync();

public boolean isEnableYieldingDueDateChecker() {
return enableYieldingDueDateChecker;
Expand All @@ -58,9 +60,20 @@ public void setEnableMessageTtlCheckerAsync(final boolean enableMessageTtlChecke
this.enableMessageTtlCheckerAsync = enableMessageTtlCheckerAsync;
}

/**
 * Reports whether the timer due date checker is configured to run asynchronously.
 *
 * @return the current value of the {@code enableTimerDueDateCheckerAsync} flag
 */
public boolean isEnableTimerDueDateCheckerAsync() {
  return this.enableTimerDueDateCheckerAsync;
}

/**
 * Sets whether the timer due date checker should run asynchronously.
 *
 * @param enableTimerDueDateCheckerAsync the new value of the flag
 */
public void setEnableTimerDueDateCheckerAsync(final boolean enableTimerDueDateCheckerAsync) {
this.enableTimerDueDateCheckerAsync = enableTimerDueDateCheckerAsync;
}

/**
 * Builds a {@link FeatureFlags} value from the flags currently held by this configuration
 * object.
 *
 * <p>The arguments are positional, so they must stay in the same order as the components of the
 * {@code FeatureFlags} record.
 *
 * @return a new {@code FeatureFlags} instance carrying the configured values
 */
public FeatureFlags toFeatureFlags() {
return new FeatureFlags(
enableYieldingDueDateChecker, enableActorMetrics, enableMessageTtlCheckerAsync
enableYieldingDueDateChecker,
enableActorMetrics,
enableMessageTtlCheckerAsync,
enableTimerDueDateCheckerAsync
/*, enableFoo*/ );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,37 @@ void shouldSetEnableMessageTtlCheckerAsyncFromEnv() {
// then
assertThat(featureFlagsCfg.isEnableMessageTtlCheckerAsync()).isTrue();
}

/** With the {@code empty} configuration, the async timer due date checker flag must be off. */
@Test
void shouldDisableDueDateCheckerAsyncByDefault() {
  // when
  final var timerCheckerAsync =
      TestConfigReader.readConfig("empty", environment)
          .getExperimental()
          .getFeatures()
          .isEnableTimerDueDateCheckerAsync();

  // then
  assertThat(timerCheckerAsync).isFalse();
}

/** The {@code feature-flags-cfg} configuration turns the async timer due date checker on. */
@Test
void shouldSetEnableTimerDueDateCheckerAsyncFromConfig() {
  // when
  final var timerCheckerAsync =
      TestConfigReader.readConfig("feature-flags-cfg", environment)
          .getExperimental()
          .getFeatures()
          .isEnableTimerDueDateCheckerAsync();

  // then
  assertThat(timerCheckerAsync).isTrue();
}

/**
 * Verifies that the async timer due date checker flag can be switched on purely through an
 * environment override.
 */
@Test
void shouldSetEnableTimerDueDateCheckerAsyncFromEnv() {
  // given
  // the key has to name the flag under test; a mistyped key is silently ignored
  environment.put("zeebe.broker.experimental.features.enableTimerDueDateCheckerAsync", "true");

  // when
  // read the "empty" config, where the flag is off by default, so that only the environment
  // override can make the assertion below pass
  final BrokerCfg cfg = TestConfigReader.readConfig("empty", environment);
  final var featureFlagsCfg = cfg.getExperimental().getFeatures();

  // then
  assertThat(featureFlagsCfg.isEnableTimerDueDateCheckerAsync()).isTrue();
}
}
1 change: 1 addition & 0 deletions broker/src/test/resources/system/feature-flags-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ zeebe:
enableYieldingDueDateChecker: true
enableActorMetrics: true
enableMessageTTLCheckerAsync: true
enableTimerDueDateCheckerAsync: true
8 changes: 8 additions & 0 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -1056,3 +1056,11 @@
# enabling it in production.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_FEATURES_ENABLEMESSAGETTLCHECKERASYNC
# enableMessageTTLCheckerAsync: false

# While disabled, checking for due timers blocks all other executions that occur on the
# stream processor, including process execution and job activation/completion.
# When enabled, the Due Date Checker will run asynchronously to the Engine's stream processor.
# This helps improve throughput and process latency when there are a lot of timers.
# We recommend testing this feature in a non-production environment before enabling it in production.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_FEATURES_ENABLETIMERDUEDATECHECKERASYNC
# enableTimerDueDateCheckerAsync: false
8 changes: 8 additions & 0 deletions dist/src/main/config/broker.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -966,3 +966,11 @@
# enabling it in production.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_FEATURES_ENABLEMESSAGETTLCHECKERASYNC
# enableMessageTTLCheckerAsync: false

# While disabled, checking for due timers blocks all other executions that occur on the
# stream processor, including process execution and job activation/completion.
# When enabled, the Due Date Checker will run asynchronously to the Engine's stream processor.
# This helps improve throughput and process latency when there are a lot of timers.
# We recommend testing this feature in a non-production environment before enabling it in production.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_FEATURES_ENABLETIMERDUEDATECHECKERASYNC
# enableTimerDueDateCheckerAsync: false
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public static TypedRecordProcessors createEngineProcessors(
final var config = typedRecordProcessorContext.getConfig();

final DueDateTimerChecker timerChecker =
new DueDateTimerChecker(processingState.getTimerState(), featureFlags);
new DueDateTimerChecker(
typedRecordProcessorContext.getScheduledTaskDbState().getTimerState(), featureFlags);

final var jobMetrics = new JobMetrics(partitionId);
final var processEngineMetrics = new ProcessEngineMetrics(processingState.getPartitionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public JobBackoffChecker(final JobState jobState) {
backOffDueDateChecker =
new DueDateChecker(
BACKOFF_RESOLUTION,
false,
taskResultBuilder ->
jobState.findBackedOffJobs(
ActorClock.currentTimeMillis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService;
import io.camunda.zeebe.stream.api.scheduling.Task;
import io.camunda.zeebe.stream.api.scheduling.TaskResult;
import io.camunda.zeebe.stream.api.scheduling.TaskResultBuilder;
import java.time.Duration;
import java.util.function.Function;

public final class DueDateChecker implements StreamProcessorLifecycleAware {

private ProcessingScheduleService scheduleService;
private ScheduleDelayed scheduleService;
private final boolean scheduleAsync;

private boolean checkerRunning;
private boolean shouldRescheduleChecker;
Expand All @@ -30,8 +29,11 @@ public final class DueDateChecker implements StreamProcessorLifecycleAware {
private final TriggerEntitiesTask triggerEntitiesTask;

/**
 * Stores the given settings and creates the task that is handed to the schedule service.
 *
 * @param timerResolution stored as-is; NOTE(review): presumably the timer resolution in
 *     milliseconds used when computing the delay of the next run; confirm
 * @param scheduleAsync when {@code true}, {@code onRecovered} binds scheduling to {@code
 *     runDelayedAsync} of the schedule service, otherwise to {@code runDelayed}
 * @param nextDueDateFunction stored as {@code nextDueDateSupplier}; NOTE(review): presumably
 *     triggers due entities and returns the next due date; confirm against its callers
 */
public DueDateChecker(
final long timerResolution, final Function<TaskResultBuilder, Long> nextDueDateFunction) {
final long timerResolution,
final boolean scheduleAsync,
final Function<TaskResultBuilder, Long> nextDueDateFunction) {
this.timerResolution = timerResolution;
this.scheduleAsync = scheduleAsync;
nextDueDateSupplier = nextDueDateFunction;
triggerEntitiesTask = new TriggerEntitiesTask();
}
Expand Down Expand Up @@ -83,7 +85,13 @@ private Duration calculateDelayForNextRun(final long dueDate) {

@Override
public void onRecovered(final ReadonlyStreamProcessorContext processingContext) {
scheduleService = processingContext.getScheduleService();
final var scheduleService = processingContext.getScheduleService();
if (scheduleAsync) {
this.scheduleService = scheduleService::runDelayedAsync;
} else {
this.scheduleService = scheduleService::runDelayed;
}

shouldRescheduleChecker = true;
// check if timers are due after restart
scheduleTriggerEntitiesTask();
Expand Down Expand Up @@ -125,4 +133,20 @@ public TaskResult execute(final TaskResultBuilder taskResultBuilder) {
return taskResultBuilder.build();
}
}

/**
 * Single-method view of the delayed scheduling operations offered by {@link
 * io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService}. It lets the checker hold
 * either the sync or the async variant behind the same type.
 */
@FunctionalInterface
interface ScheduleDelayed {
  /**
   * Hands the task and its delay to the bound scheduling method, which is either {@link
   * io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService#runDelayed(Duration, Task)}
   * or {@link
   * io.camunda.zeebe.stream.api.scheduling.ProcessingScheduleService#runDelayedAsync(Duration,
   * Task)}.
   *
   * @param delay the delay passed on to the scheduling method
   * @param task the task passed on to the scheduling method
   */
  void runDelayed(Duration delay, Task task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public DueDateTimerChecker(
dueDateChecker =
new DueDateChecker(
TIMER_RESOLUTION,
featureFlags.enableTimerDueDateCheckerAsync(),
new TriggerTimersSideEffect(
timerInstanceState, ActorClock.current(), featureFlags.yieldingDueDateChecker()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,27 @@
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.immutable.TimerInstanceState;
import io.camunda.zeebe.engine.state.instance.DbTimerInstanceState;
import io.camunda.zeebe.engine.state.message.DbMessageState;
import io.camunda.zeebe.protocol.ZbColumnFamilies;

/**
 * Bundles the read-only state that scheduled tasks may access safely. Only the immutable state
 * interfaces are exposed.
 */
public final class ScheduledTaskDbState {
  private final MessageState messageState;
  private final TimerInstanceState timerInstanceState;

  /**
   * Creates the message and timer state on top of the given database and transaction context.
   *
   * @param zeebeDb the database passed to both state implementations
   * @param transactionContext the transaction context passed to both state implementations
   */
  public ScheduledTaskDbState(
      final ZeebeDb<ZbColumnFamilies> zeebeDb, final TransactionContext transactionContext) {
    messageState = new DbMessageState(zeebeDb, transactionContext);
    timerInstanceState = new DbTimerInstanceState(zeebeDb, transactionContext);
  }

  /**
   * @return the message state
   */
  public MessageState getMessageState() {
    return messageState;
  }

  /**
   * @return the timer instance state
   */
  public TimerInstanceState getTimerState() {
    return timerInstanceState;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class DueDateCheckerTest {
@Test
public void shouldNotScheduleTwoTasks() {
// given
final var dueDateChecker = new DueDateChecker(100, (builder) -> 0L);
final var dueDateChecker = new DueDateChecker(100, false, (builder) -> 0L);
final var mockContext = mock(ReadonlyStreamProcessorContext.class);
final var mockScheduleService = mock(ProcessingScheduleService.class);

Expand All @@ -48,7 +48,7 @@ public void shouldNotScheduleTwoTasks() {
@Test
public void shouldScheduleForAnEarlierTasks() {
// given
final var dueDateChecker = new DueDateChecker(100, (builder) -> 0L);
final var dueDateChecker = new DueDateChecker(100, false, (builder) -> 0L);
final var mockContext = mock(ReadonlyStreamProcessorContext.class);
final var mockScheduleService = mock(ProcessingScheduleService.class);

Expand Down
14 changes: 11 additions & 3 deletions util/src/main/java/io/camunda/zeebe/util/FeatureFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import org.apache.commons.lang3.builder.ToStringStyle;

public record FeatureFlags(
boolean yieldingDueDateChecker, boolean enableActorMetrics, boolean enableMessageTTLCheckerAsync
boolean yieldingDueDateChecker,
boolean enableActorMetrics,
boolean enableMessageTTLCheckerAsync,
boolean enableTimerDueDateCheckerAsync
/*, boolean foo*/ ) {

/* To add a new feature toggle, please follow these steps:
Expand Down Expand Up @@ -45,10 +48,14 @@ public record FeatureFlags(
private static final boolean ENABLE_ACTOR_METRICS = false;

private static final boolean ENABLE_MSG_TTL_CHECKER_ASYNC = false;
// Default for the enableTimerDueDateCheckerAsync component: async timer due date checking is off.
private static final boolean ENABLE_DUE_DATE_CHECKER_ASYNC = false;

/**
 * Creates the {@link FeatureFlags} instance made up of the default constants declared in this
 * record.
 *
 * <p>The arguments are positional and follow the order of the record's components.
 *
 * @return a new instance holding the default value of every flag
 */
public static FeatureFlags createDefault() {
return new FeatureFlags(
YIELDING_DUE_DATE_CHECKER, ENABLE_ACTOR_METRICS, ENABLE_MSG_TTL_CHECKER_ASYNC
YIELDING_DUE_DATE_CHECKER,
ENABLE_ACTOR_METRICS,
ENABLE_MSG_TTL_CHECKER_ASYNC,
ENABLE_DUE_DATE_CHECKER_ASYNC
/*, FOO_DEFAULT*/ );
}

Expand All @@ -61,7 +68,8 @@ public static FeatureFlags createDefaultForTests() {
return new FeatureFlags(
true, /* YIELDING_DUE_DATE_CHECKER*/
false, /* ENABLE_ACTOR_METRICS */
true /* ENABLE_MSG_TTL_CHECKER_ASYNC */
true, /* ENABLE_MSG_TTL_CHECKER_ASYNC */
true /* ENABLE_DUE_DATE_CHECKER_ASYNC */
/*, FOO_DEFAULT*/ );
}

Expand Down

0 comments on commit f402edc

Please sign in to comment.