Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(engine): thread-safe transient subscription state #13072

Merged
merged 4 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.mutable.MutablePendingMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.PendingMessageSubscriptionState;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import java.time.Duration;
Expand All @@ -21,14 +21,14 @@ public final class MessageObserver implements StreamProcessorLifecycleAware {

private final SubscriptionCommandSender subscriptionCommandSender;
private final MessageState messageState;
private final MutablePendingMessageSubscriptionState pendingState;
private final PendingMessageSubscriptionState pendingState;
private final int messagesTtlCheckerBatchLimit;
private final Duration messagesTtlCheckerInterval;
private final boolean enableMessageTtlCheckerAsync;

public MessageObserver(
final MessageState messageState,
final MutablePendingMessageSubscriptionState pendingState,
final PendingMessageSubscriptionState pendingState,
final SubscriptionCommandSender subscriptionCommandSender,
final Duration messagesTtlCheckerInterval,
final int messagesTtlCheckerBatchLimit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,28 @@
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.state.immutable.PendingMessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.MessageSubscription;
import io.camunda.zeebe.engine.state.mutable.MutablePendingMessageSubscriptionState;
import io.camunda.zeebe.scheduler.clock.ActorClock;

public final class PendingMessageSubscriptionChecker implements Runnable {
private final SubscriptionCommandSender commandSender;
private final MutablePendingMessageSubscriptionState transientState;
private final PendingMessageSubscriptionState state;

private final long subscriptionTimeout;

public PendingMessageSubscriptionChecker(
final SubscriptionCommandSender commandSender,
final MutablePendingMessageSubscriptionState transientState,
final PendingMessageSubscriptionState state,
final long subscriptionTimeout) {
this.commandSender = commandSender;
this.transientState = transientState;
this.state = state;
this.subscriptionTimeout = subscriptionTimeout;
}

@Override
public void run() {
transientState.visitSubscriptionBefore(
ActorClock.currentTimeMillis() - subscriptionTimeout, this::sendCommand);
state.visitPending(ActorClock.currentTimeMillis() - subscriptionTimeout, this::sendCommand);
}

private boolean sendCommand(final MessageSubscription subscription) {
Expand All @@ -47,7 +46,7 @@ final var record = subscription.getRecord();

// TODO (saig0): the state change of the sent time should be reflected by a record (#6364)
final var sentTime = ActorClock.currentTimeMillis();
transientState.updateCommandSentTime(subscription.getRecord(), sentTime);
state.onSent(record, sentTime);

return true; // to continue visiting
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.state.immutable.PendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.mutable.MutablePendingProcessMessageSubscriptionState;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
Expand All @@ -23,15 +23,15 @@ public final class PendingProcessMessageSubscriptionChecker
private static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(30);

private final SubscriptionCommandSender commandSender;
private final MutablePendingProcessMessageSubscriptionState pendingState;
private final PendingProcessMessageSubscriptionState pendingState;
private final long subscriptionTimeoutInMillis;

private ProcessingScheduleService scheduleService;
private boolean schouldRescheduleTimer = false;

public PendingProcessMessageSubscriptionChecker(
final SubscriptionCommandSender commandSender,
final MutablePendingProcessMessageSubscriptionState pendingState) {
final PendingProcessMessageSubscriptionState pendingState) {
this.commandSender = commandSender;
this.pendingState = pendingState;
subscriptionTimeoutInMillis = SUBSCRIPTION_TIMEOUT.toMillis();
Expand Down Expand Up @@ -76,7 +76,7 @@ private void cancelTimer() {
}

private void checkPendingSubscriptions() {
pendingState.visitSubscriptionBefore(
pendingState.visitPending(
ActorClock.currentTimeMillis() - subscriptionTimeoutInMillis, this::sendPendingCommand);
rescheduleTimer();
}
Expand All @@ -90,7 +90,7 @@ private boolean sendPendingCommand(final ProcessMessageSubscription subscription
}

final var sentTime = ActorClock.currentTimeMillis();
pendingState.updateSentTime(subscription.getRecord(), sentTime);
pendingState.onSent(subscription.getRecord(), sentTime);

return true; // to continue visiting
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import io.camunda.zeebe.engine.state.deployment.DbDeploymentState;
import io.camunda.zeebe.engine.state.deployment.DbProcessState;
import io.camunda.zeebe.engine.state.distribution.DbDistributionState;
import io.camunda.zeebe.engine.state.immutable.PendingMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.PendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.instance.DbElementInstanceState;
import io.camunda.zeebe.engine.state.instance.DbEventScopeInstanceState;
import io.camunda.zeebe.engine.state.instance.DbIncidentState;
Expand All @@ -37,8 +39,6 @@
import io.camunda.zeebe.engine.state.mutable.MutableMessageState;
import io.camunda.zeebe.engine.state.mutable.MutableMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableMigrationState;
import io.camunda.zeebe.engine.state.mutable.MutablePendingMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutablePendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
Expand Down Expand Up @@ -205,12 +205,12 @@ public MutableMigrationState getMigrationState() {
}

@Override
public MutablePendingMessageSubscriptionState getPendingMessageSubscriptionState() {
public PendingMessageSubscriptionState getPendingMessageSubscriptionState() {
return messageSubscriptionState;
}

@Override
public MutablePendingProcessMessageSubscriptionState getPendingProcessMessageSubscriptionState() {
public PendingProcessMessageSubscriptionState getPendingProcessMessageSubscriptionState() {
return processMessageSubscriptionState;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.immutable;

import io.camunda.zeebe.engine.state.immutable.MessageSubscriptionState.MessageSubscriptionVisitor;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;

public interface PendingMessageSubscriptionState {

/**
* Visits all pending message subscriptions where a command hasn't been sent out since a given
* deadline. The visitor is called for each subscription, from the oldest to the newest.
*/
void visitPending(final long deadline, final MessageSubscriptionVisitor visitor);

/**
* Should be called when a pending subscription is sent out. This is used to keep track of the
* last time a command was sent out for a subscription. Freshly sent-out subscriptions are not
* visited by {@link #visitPending(long, MessageSubscriptionVisitor)}.
*/
void onSent(final long elementInstance, final String messageName, final long timestampMs);

default void onSent(final MessageSubscriptionRecord subscription, final long timestampMs) {
onSent(subscription.getElementInstanceKey(), subscription.getMessageName(), timestampMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.mutable;
package io.camunda.zeebe.engine.state.immutable;

import io.camunda.zeebe.engine.state.immutable.ProcessMessageSubscriptionState.ProcessMessageSubscriptionVisitor;
import io.camunda.zeebe.protocol.impl.record.value.message.ProcessMessageSubscriptionRecord;
Expand All @@ -16,9 +16,18 @@
* during opening or closing of a process message subscription. This state is not persisted to disk
* and needs to be recovered after restart
*/
public interface MutablePendingProcessMessageSubscriptionState {
public interface PendingProcessMessageSubscriptionState {

void visitSubscriptionBefore(long deadline, ProcessMessageSubscriptionVisitor visitor);
/**
* Visits all pending process message subscriptions where a command hasn't been sent out since a
* given deadline. The visitor is called for each subscription, from the oldest to the newest.
*/
void visitPending(long deadline, ProcessMessageSubscriptionVisitor visitor);

void updateSentTime(ProcessMessageSubscriptionRecord record, long commandSentTime);
/**
* Should be called when a pending subscription is sent out. This is used to keep track of the
* last time a command was sent out for a subscription. Freshly sent-out subscriptions are not
* visited by {@link #visitPending(long, ProcessMessageSubscriptionVisitor)}.
*/
void onSent(ProcessMessageSubscriptionRecord record, long timestampMs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public interface ProcessingState extends StreamProcessorLifecycleAware {

DistributionState getDistributionState();

PendingMessageSubscriptionState getPendingMessageSubscriptionState();

PendingProcessMessageSubscriptionState getPendingProcessMessageSubscriptionState();

int getPartitionId();

boolean isEmpty(final ZbColumnFamilies column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@
import io.camunda.zeebe.db.impl.DbLong;
import io.camunda.zeebe.db.impl.DbNil;
import io.camunda.zeebe.db.impl.DbString;
import io.camunda.zeebe.engine.state.immutable.PendingMessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.TransientPendingSubscriptionState.PendingSubscription;
import io.camunda.zeebe.engine.state.mutable.MutableMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutablePendingMessageSubscriptionState;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.util.buffer.BufferUtil;
import org.agrona.DirectBuffer;

public final class DbMessageSubscriptionState
implements MutableMessageSubscriptionState,
MutablePendingMessageSubscriptionState,
PendingMessageSubscriptionState,
StreamProcessorLifecycleAware {

// (elementInstanceKey, messageName) => MessageSubscription
Expand All @@ -44,8 +46,8 @@ public final class DbMessageSubscriptionState
private final ColumnFamily<DbCompositeKey<DbCompositeKey<DbString, DbString>, DbLong>, DbNil>
messageNameAndCorrelationKeyColumnFamily;

private final PendingMessageSubscriptionState transientState =
new PendingMessageSubscriptionState(this);
private final TransientPendingSubscriptionState transientState =
new TransientPendingSubscriptionState();

public DbMessageSubscriptionState(
final ZeebeDb<ZbColumnFamilies> zeebeDb, final TransactionContext transactionContext) {
Expand Down Expand Up @@ -78,7 +80,9 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) {
subscriptionColumnFamily.forEach(
subscription -> {
if (subscription.isCorrelating()) {
transientState.add(subscription.getRecord());
transientState.add(
new PendingSubscription(elementInstanceKey.getValue(), messageName.toString()),
ActorClock.currentTimeMillis());
}
});
}
Expand Down Expand Up @@ -151,13 +155,20 @@ public void updateToCorrelatingState(final MessageSubscriptionRecord record) {

updateCorrelatingFlag(subscription, true);

transientState.add(record);
transientState.add(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 Although the implementation of add and update are equivalent, semantically, it might make more sense to use update here.

new PendingSubscription(
subscription.getRecord().getElementInstanceKey(),
subscription.getRecord().getMessageName()),
ActorClock.currentTimeMillis());
}

@Override
public void updateToCorrelatedState(final MessageSubscription subscription) {
updateCorrelatingFlag(subscription, false);
transientState.remove(subscription.getRecord());
transientState.remove(
new PendingSubscription(
subscription.getRecord().getElementInstanceKey(),
subscription.getRecord().getMessageName()));
}

@Override
Expand All @@ -184,7 +195,8 @@ final var record = subscription.getRecord();
correlationKey.wrapBuffer(record.getCorrelationKeyBuffer());
messageNameAndCorrelationKeyColumnFamily.deleteExisting(nameCorrelationAndElementInstanceKey);

transientState.remove(subscription.getRecord());
transientState.remove(
new PendingSubscription(elementInstanceKey.getValue(), messageName.toString()));
}

private void updateCorrelatingFlag(
Expand Down Expand Up @@ -213,13 +225,20 @@ private Boolean visitMessageSubscription(
}

@Override
public void visitSubscriptionBefore(
final long deadline, final MessageSubscriptionVisitor visitor) {
transientState.visitSubscriptionBefore(deadline, visitor);
public void visitPending(final long deadline, final MessageSubscriptionVisitor visitor) {
for (final var pendingSubscription : transientState.entriesBefore(deadline)) {
final var subscription =
get(
pendingSubscription.elementInstanceKey(),
BufferUtil.wrapString(pendingSubscription.messageName()));

visitor.visit(subscription);
}
}

@Override
public void updateCommandSentTime(final MessageSubscriptionRecord record, final long sentTime) {
transientState.updateCommandSentTime(record, sentTime);
public void onSent(
final long elementInstanceKey, final String messageName, final long timestampMs) {
transientState.update(new PendingSubscription(elementInstanceKey, messageName), timestampMs);
}
}