Skip to content

Commit

Permalink
refactor(engine): simplify pending process message subscription state
Browse files Browse the repository at this point in the history
The previous class hierarchy was a bit confusing. With these changes
here, the `DbProcessMessageSubscriptionState` is responsible for all
state changes. When necessary, it reads from and writes to the transient
state directly.

This also adjusts names and package paths to indicate that reading and
writing transient state is not a mutable operation.
  • Loading branch information
lenaschoenburg committed Jun 12, 2023
1 parent d928ba2 commit f8a7dc4
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.state.immutable.PendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.mutable.MutablePendingProcessMessageSubscriptionState;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
Expand All @@ -23,15 +23,15 @@ public final class PendingProcessMessageSubscriptionChecker
private static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(30);

private final SubscriptionCommandSender commandSender;
private final MutablePendingProcessMessageSubscriptionState pendingState;
private final PendingProcessMessageSubscriptionState pendingState;
private final long subscriptionTimeoutInMillis;

private ProcessingScheduleService scheduleService;
private boolean schouldRescheduleTimer = false;

public PendingProcessMessageSubscriptionChecker(
final SubscriptionCommandSender commandSender,
final MutablePendingProcessMessageSubscriptionState pendingState) {
final PendingProcessMessageSubscriptionState pendingState) {
this.commandSender = commandSender;
this.pendingState = pendingState;
subscriptionTimeoutInMillis = SUBSCRIPTION_TIMEOUT.toMillis();
Expand Down Expand Up @@ -76,7 +76,7 @@ private void cancelTimer() {
}

private void checkPendingSubscriptions() {
pendingState.visitSubscriptionBefore(
pendingState.visitPending(
ActorClock.currentTimeMillis() - subscriptionTimeoutInMillis, this::sendPendingCommand);
rescheduleTimer();
}
Expand All @@ -90,7 +90,7 @@ private boolean sendPendingCommand(final ProcessMessageSubscription subscription
}

final var sentTime = ActorClock.currentTimeMillis();
pendingState.updateSentTime(subscription.getRecord(), sentTime);
pendingState.onSent(subscription.getRecord(), sentTime);

return true; // to continue visiting
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.camunda.zeebe.engine.state.deployment.DbProcessState;
import io.camunda.zeebe.engine.state.distribution.DbDistributionState;
import io.camunda.zeebe.engine.state.immutable.PendingMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.PendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.instance.DbElementInstanceState;
import io.camunda.zeebe.engine.state.instance.DbEventScopeInstanceState;
import io.camunda.zeebe.engine.state.instance.DbIncidentState;
Expand All @@ -38,7 +39,6 @@
import io.camunda.zeebe.engine.state.mutable.MutableMessageState;
import io.camunda.zeebe.engine.state.mutable.MutableMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableMigrationState;
import io.camunda.zeebe.engine.state.mutable.MutablePendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
Expand Down Expand Up @@ -210,7 +210,7 @@ public PendingMessageSubscriptionState getPendingMessageSubscriptionState() {
}

@Override
public MutablePendingProcessMessageSubscriptionState getPendingProcessMessageSubscriptionState() {
public PendingProcessMessageSubscriptionState getPendingProcessMessageSubscriptionState() {
return processMessageSubscriptionState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.mutable;
package io.camunda.zeebe.engine.state.immutable;

import io.camunda.zeebe.engine.state.immutable.ProcessMessageSubscriptionState.ProcessMessageSubscriptionVisitor;
import io.camunda.zeebe.protocol.impl.record.value.message.ProcessMessageSubscriptionRecord;
Expand All @@ -16,9 +16,18 @@
* during opening or closing of a process message subscription. This state is not persisted to disk
* and needs to be recovered after restart
*/
public interface MutablePendingProcessMessageSubscriptionState {
public interface PendingProcessMessageSubscriptionState {

void visitSubscriptionBefore(long deadline, ProcessMessageSubscriptionVisitor visitor);
/**
* Visits all pending process message subscriptions where a command hasn't been sent out since a
* given deadline. The visitor is called for each subscription, from the oldest to the newest.
*/
void visitPending(long deadline, ProcessMessageSubscriptionVisitor visitor);

void updateSentTime(ProcessMessageSubscriptionRecord record, long commandSentTime);
/**
* Should be called when a pending subscription is sent out. This is used to keep track of the
* last time a command was sent out for a subscription. Freshly sent-out subscriptions are not
* visited by {@link #visitPending(long, ProcessMessageSubscriptionVisitor)}.
*/
void onSent(ProcessMessageSubscriptionRecord record, long timestampMs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public interface ProcessingState extends StreamProcessorLifecycleAware {

PendingMessageSubscriptionState getPendingMessageSubscriptionState();

PendingProcessMessageSubscriptionState getPendingProcessMessageSubscriptionState();

int getPartitionId();

boolean isEmpty(final ZbColumnFamilies column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@
import io.camunda.zeebe.db.impl.DbCompositeKey;
import io.camunda.zeebe.db.impl.DbLong;
import io.camunda.zeebe.db.impl.DbString;
import io.camunda.zeebe.engine.state.mutable.MutablePendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.PendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.message.TransientPendingSubscriptionState.PendingSubscription;
import io.camunda.zeebe.engine.state.mutable.MutableProcessMessageSubscriptionState;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.impl.record.value.message.ProcessMessageSubscriptionRecord;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import io.camunda.zeebe.stream.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.stream.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;

public final class DbProcessMessageSubscriptionState
implements MutableProcessMessageSubscriptionState,
MutablePendingProcessMessageSubscriptionState,
PendingProcessMessageSubscriptionState,
StreamProcessorLifecycleAware {

// (elementInstanceKey, messageName) => ProcessMessageSubscription
Expand All @@ -35,8 +38,8 @@ public final class DbProcessMessageSubscriptionState
private final ColumnFamily<DbCompositeKey<DbLong, DbString>, ProcessMessageSubscription>
subscriptionColumnFamily;

private final PendingProcessMessageSubscriptionState transientState =
new PendingProcessMessageSubscriptionState(this);
private final TransientPendingSubscriptionState transientState =
new TransientPendingSubscriptionState();

public DbProcessMessageSubscriptionState(
final ZeebeDb<ZbColumnFamilies> zeebeDb, final TransactionContext transactionContext) {
Expand All @@ -58,7 +61,11 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) {
subscriptionColumnFamily.forEach(
subscription -> {
if (subscription.isOpening() || subscription.isClosing()) {
transientState.add(subscription.getRecord());
transientState.add(
new PendingSubscription(
subscription.getRecord().getElementInstanceKey(),
subscription.getRecord().getMessageName()),
ActorClock.currentTimeMillis());
}
});
}
Expand All @@ -72,25 +79,32 @@ public void put(final long key, final ProcessMessageSubscriptionRecord record) {

subscriptionColumnFamily.insert(elementKeyAndMessageName, processMessageSubscription);

transientState.add(record);
transientState.add(
new PendingSubscription(record.getElementInstanceKey(), record.getMessageName()),
ActorClock.currentTimeMillis());
}

@Override
public void updateToOpeningState(final ProcessMessageSubscriptionRecord record) {
update(record, s -> s.setRecord(record).setOpening());
transientState.add(record);
transientState.add(
new PendingSubscription(record.getElementInstanceKey(), record.getMessageName()),
ActorClock.currentTimeMillis());
}

@Override
public void updateToOpenedState(final ProcessMessageSubscriptionRecord record) {
update(record, s -> s.setRecord(record).setOpened());
transientState.remove(record);
transientState.remove(
new PendingSubscription(record.getElementInstanceKey(), record.getMessageName()));
}

@Override
public void updateToClosingState(final ProcessMessageSubscriptionRecord record) {
update(record, s -> s.setRecord(record).setClosing());
transientState.add(record);
transientState.add(
new PendingSubscription(record.getElementInstanceKey(), record.getMessageName()),
ActorClock.currentTimeMillis());
}

@Override
Expand Down Expand Up @@ -133,16 +147,23 @@ public boolean existSubscriptionForElementInstance(
}

@Override
public void visitSubscriptionBefore(
final long deadline, final ProcessMessageSubscriptionVisitor visitor) {
public void visitPending(final long deadline, final ProcessMessageSubscriptionVisitor visitor) {

transientState.visitSubscriptionBefore(deadline, visitor);
for (final var pendingSubscription : transientState.entriesBefore(deadline)) {
final var subscription =
getSubscription(
pendingSubscription.elementInstanceKey(),
BufferUtil.wrapString(pendingSubscription.messageName()));

visitor.visit(subscription);
}
}

@Override
public void updateSentTime(
final ProcessMessageSubscriptionRecord record, final long commandSentTime) {
transientState.updateSentTime(record, commandSentTime);
public void onSent(final ProcessMessageSubscriptionRecord record, final long timestampMs) {
transientState.update(
new PendingSubscription(record.getElementInstanceKey(), record.getMessageName()),
timestampMs);
}

private void update(
Expand Down Expand Up @@ -175,7 +196,10 @@ private void remove(final ProcessMessageSubscription subscription) {

subscriptionColumnFamily.deleteExisting(elementKeyAndMessageName);

transientState.remove(subscription.getRecord());
transientState.remove(
new PendingSubscription(
subscription.getRecord().getElementInstanceKey(),
subscription.getRecord().getMessageName()));
}

private void wrapSubscriptionKeys(final long elementInstanceKey, final DirectBuffer messageName) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.util.stream.Collectors;

/**
* This class is used by {@link PendingProcessMessageSubscriptionState} and {@link
* This class is used by {@link DbProcessMessageSubscriptionState} and {@link
* DbMessageSubscriptionState} to keep track of pending (process) message subscriptions. {@link
* PendingSubscription PendingSubscriptions} are added with a last sent time. The time can be
* updated with {@link #update(PendingSubscription, long)}. Pending subscriptions are retrieved with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import io.camunda.zeebe.engine.state.deployment.PersistedDecision;
import io.camunda.zeebe.engine.state.deployment.PersistedDecisionRequirements;
import io.camunda.zeebe.engine.state.immutable.PendingMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.PendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableElementInstanceState;
import io.camunda.zeebe.engine.state.mutable.MutableEventScopeInstanceState;
import io.camunda.zeebe.engine.state.mutable.MutableMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableMigrationState;
import io.camunda.zeebe.engine.state.mutable.MutablePendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessMessageSubscriptionState;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.impl.record.value.message.ProcessMessageSubscriptionRecord;
Expand Down Expand Up @@ -191,7 +191,7 @@ public void migrateMessageSubscriptionSentTime(
@Override
public void migrateProcessMessageSubscriptionSentTime(
final MutableProcessMessageSubscriptionState persistentState,
final MutablePendingProcessMessageSubscriptionState transientState) {
final PendingProcessMessageSubscriptionState transientState) {

processSubscriptionSentTimeColumnFamily.forEach(
(key, value) -> {
Expand All @@ -214,12 +214,12 @@ final var record = processMessageSubscription.getRecord();
// explicit call to put(..). This has the desired side-effect that the subscription
// is added to transient state
persistentState.updateToOpeningState(exclusiveCopy);
transientState.updateSentTime(exclusiveCopy, sentTime);
transientState.onSent(exclusiveCopy, sentTime);
} else if (processMessageSubscription.isClosing()) {
// explicit call to updateToClosingState(..). This has the desired side-effect that
// the subscription is added to transient state
persistentState.updateToClosingState(exclusiveCopy);
transientState.updateSentTime(exclusiveCopy, sentTime);
transientState.onSent(exclusiveCopy, sentTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.engine.state.mutable;

import io.camunda.zeebe.engine.state.immutable.PendingMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.PendingProcessMessageSubscriptionState;

public interface MutableMigrationState {

Expand All @@ -17,7 +18,7 @@ void migrateMessageSubscriptionSentTime(

void migrateProcessMessageSubscriptionSentTime(
final MutableProcessMessageSubscriptionState persistentSate,
final MutablePendingProcessMessageSubscriptionState transientState);
final PendingProcessMessageSubscriptionState transientState);

void migrateTemporaryVariables(
final MutableEventScopeInstanceState eventScopeInstanceState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package io.camunda.zeebe.engine.state.mutable;

import io.camunda.zeebe.engine.state.immutable.PendingProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.ProcessingState;
import io.camunda.zeebe.stream.api.state.KeyGenerator;

Expand Down Expand Up @@ -63,7 +62,5 @@ public interface MutableProcessingState extends ProcessingState {

MutableMigrationState getMigrationState();

PendingProcessMessageSubscriptionState getPendingProcessMessageSubscriptionState();

KeyGenerator getKeyGenerator();
}

0 comments on commit f8a7dc4

Please sign in to comment.