Skip to content

Commit

Permalink
chore(engine): apply event sourcing to message publish processor
Browse files Browse the repository at this point in the history
* use the state writer in the message publish processor to apply the state changes
* extend the ES templates for the new record properties
* disable the replay filter to restore the key generator for migrated processors
  • Loading branch information
saig0 committed Feb 18, 2021
1 parent acc7b3e commit df249f6
Show file tree
Hide file tree
Showing 27 changed files with 499 additions and 112 deletions.
Expand Up @@ -84,26 +84,34 @@ public long triggerStartEvent(
workflowKey, newElementInstanceKey, elementId, variables);

if (triggered) {

final var workflowInstanceKey = keyGenerator.nextKey();
final var eventOccurredKey = keyGenerator.nextKey();

eventOccurredRecord
.setBpmnElementType(BpmnElementType.START_EVENT)
.setWorkflowKey(workflowKey)
.setWorkflowInstanceKey(workflowInstanceKey)
.setElementId(elementId);

streamWriter.appendFollowUpEvent(
eventOccurredKey, WorkflowInstanceIntent.EVENT_OCCURRED, eventOccurredRecord);

activateStartEvent(streamWriter, workflowKey, workflowInstanceKey, elementId);
return workflowInstanceKey;

} else {
return -1L;
}
}

public void activateStartEvent(
final TypedStreamWriter streamWriter,
final long workflowKey,
final long workflowInstanceKey,
final DirectBuffer elementId) {

final var eventOccurredKey = keyGenerator.nextKey();

eventOccurredRecord
.setBpmnElementType(BpmnElementType.START_EVENT)
.setWorkflowKey(workflowKey)
.setWorkflowInstanceKey(workflowInstanceKey)
.setElementId(elementId);

// TODO (saig0): create the workflow instance by writing an ACTIVATE command (#6184)
streamWriter.appendFollowUpEvent(
eventOccurredKey, WorkflowInstanceIntent.EVENT_OCCURRED, eventOccurredRecord);
}

private boolean isEventSubprocess(final ExecutableFlowElement catchEvent) {
return catchEvent instanceof ExecutableStartEvent
&& ((ExecutableStartEvent) catchEvent).getEventSubProcess() != null;
Expand Down
Expand Up @@ -42,13 +42,14 @@ public static void addMessageProcessors(
.onCommand(
ValueType.MESSAGE,
MessageIntent.PUBLISH,
new PublishMessageProcessor(
new MessagePublishProcessor(
messageState,
subscriptionState,
startEventSubscriptionState,
eventScopeInstanceState,
subscriptionCommandSender,
keyGenerator))
keyGenerator,
writers))
.onCommand(
ValueType.MESSAGE, MessageIntent.EXPIRE, new MessageExpireProcessor(writers.state()))
.onCommand(
Expand Down
Expand Up @@ -14,52 +14,61 @@
import io.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.zeebe.engine.state.KeyGenerator;
import io.zeebe.engine.state.immutable.MessageStartEventSubscriptionState;
import io.zeebe.engine.state.message.Message;
import io.zeebe.engine.state.immutable.MessageState;
import io.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.zeebe.engine.state.mutable.MutableEventScopeInstanceState;
import io.zeebe.engine.state.mutable.MutableMessageState;
import io.zeebe.engine.state.mutable.MutableMessageSubscriptionState;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.impl.record.value.message.MessageStartEventSubscriptionRecord;
import io.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.intent.MessageIntent;
import io.zeebe.util.sched.clock.ActorClock;
import io.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import java.util.function.Consumer;

public final class PublishMessageProcessor implements TypedRecordProcessor<MessageRecord> {
public final class MessagePublishProcessor implements TypedRecordProcessor<MessageRecord> {

private static final String ALREADY_PUBLISHED_MESSAGE =
"Expected to publish a new message with id '%s', but a message with that id was already published";

private final MutableMessageState messageState;
private final MutableMessageSubscriptionState subscriptionState;
private final MessageState messageState;
private final MessageSubscriptionState subscriptionState;
private final MessageStartEventSubscriptionState startEventSubscriptionState;
private final SubscriptionCommandSender commandSender;
private final KeyGenerator keyGenerator;
private final EventHandle eventHandle;
private final StateWriter stateWriter;

private final EventHandle eventHandle;
private final Subscriptions correlatingSubscriptions = new Subscriptions();

private TypedResponseWriter responseWriter;
private MessageRecord messageRecord;
private long messageKey;

public PublishMessageProcessor(
public MessagePublishProcessor(
final MutableMessageState messageState,
final MutableMessageSubscriptionState subscriptionState,
final MessageStartEventSubscriptionState startEventSubscriptionState,
final MutableEventScopeInstanceState scopeEventInstanceState,
final MutableEventScopeInstanceState eventScopeInstanceState,
final SubscriptionCommandSender commandSender,
final KeyGenerator keyGenerator) {
final KeyGenerator keyGenerator,
final Writers writers) {
this.messageState = messageState;
this.subscriptionState = subscriptionState;
this.startEventSubscriptionState = startEventSubscriptionState;
this.commandSender = commandSender;
this.keyGenerator = keyGenerator;
stateWriter = writers.state();

eventHandle = new EventHandle(keyGenerator, scopeEventInstanceState);
eventHandle = new EventHandle(keyGenerator, eventScopeInstanceState);
}

@Override
Expand Down Expand Up @@ -97,7 +106,10 @@ private void handleNewMessage(
final Consumer<SideEffectProducer> sideEffect) {
messageKey = keyGenerator.nextKey();

streamWriter.appendFollowUpEvent(messageKey, MessageIntent.PUBLISHED, command.getValue());
// calculate the deadline based on the command's timestamp
messageRecord.setDeadline(command.getTimestamp() + messageRecord.getTimeToLive());

stateWriter.appendFollowUpEvent(messageKey, MessageIntent.PUBLISHED, command.getValue());
responseWriter.writeEventOnCommand(
messageKey, MessageIntent.PUBLISHED, command.getValue(), command);

Expand All @@ -106,17 +118,9 @@ private void handleNewMessage(

sideEffect.accept(this::sendCorrelateCommand);

if (messageRecord.getTimeToLive() > 0L) {
final Message message = newMessage(messageKey, messageRecord, command.getTimestamp());
messageState.put(message);

// avoid correlating this message to the workflow again
correlatingSubscriptions.visitBpmnProcessIds(
bpmnProcessId -> messageState.putMessageCorrelation(messageKey, bpmnProcessId));

} else {
// don't need to add the message to the store - it can not be correlated afterwards
streamWriter.appendFollowUpEvent(messageKey, MessageIntent.EXPIRED, messageRecord);
if (messageRecord.getTimeToLive() <= 0L) {
// avoid that the message can be correlated again by writing the EXPIRED event as a follow-up
stateWriter.appendFollowUpEvent(messageKey, MessageIntent.EXPIRED, messageRecord);
}
}

Expand All @@ -132,11 +136,21 @@ private void correlateToSubscriptions(final long messageKey, final MessageRecord

correlatingSubscriptions.add(subscription);

subscriptionState.updateToCorrelatingState(
subscription,
message.getVariablesBuffer(),
ActorClock.currentTimeMillis(),
messageKey);
// TODO (saig0): reuse the subscription record in the state (#6180)
final var messageSubscriptionRecord =
new MessageSubscriptionRecord()
.setBpmnProcessId(subscription.getBpmnProcessId())
.setWorkflowInstanceKey(subscription.getWorkflowInstanceKey())
.setElementInstanceKey(subscription.getElementInstanceKey())
.setMessageName(subscription.getMessageName())
.setMessageKey(messageKey)
.setCorrelationKey(subscription.getCorrelationKey())
.setVariables(message.getVariablesBuffer())
.setCloseOnCorrelate(subscription.shouldCloseOnCorrelate());

// TODO (saig0): the subscription should have a key (#2805)
stateWriter.appendFollowUpEvent(
-1L, MessageSubscriptionIntent.CORRELATING, messageSubscriptionRecord);
}

return true;
Expand All @@ -159,25 +173,31 @@ private void correlateToMessageStartEvents(
|| !messageState.existActiveWorkflowInstance(
bpmnProcessIdBuffer, correlationKeyBuffer))) {

final var workflowInstanceKey =
eventHandle.triggerStartEvent(
streamWriter,
subscription.getWorkflowKey(),
subscription.getStartEventIdBuffer(),
messageRecord.getVariablesBuffer());

if (workflowInstanceKey > 0) {
correlatingSubscriptions.add(subscription);

if (correlationKeyBuffer.capacity() > 0) {
// lock the workflow for this correlation key
// - other messages with same correlation key are not correlated to this workflow
// until the created instance is ended
messageState.putActiveWorkflowInstance(bpmnProcessIdBuffer, correlationKeyBuffer);
messageState.putWorkflowInstanceCorrelationKey(
workflowInstanceKey, correlationKeyBuffer);
}
}
correlatingSubscriptions.add(subscription);

final var workflowInstanceKey = keyGenerator.nextKey();

// TODO (saig0): reuse the subscription record in the state (#6183)
final var subscriptionRecord =
new MessageStartEventSubscriptionRecord()
.setWorkflowKey(subscription.getWorkflowKey())
.setBpmnProcessId(subscription.getBpmnProcessIdBuffer())
.setStartEventId(subscription.getStartEventIdBuffer())
.setWorkflowInstanceKey(workflowInstanceKey)
.setMessageName(subscription.getMessageNameBuffer())
.setMessageKey(messageKey)
.setCorrelationKey(correlationKeyBuffer)
.setVariables(messageRecord.getVariablesBuffer());

// TODO (saig0): the subscription should have a key (#2805)
stateWriter.appendFollowUpEvent(
-1L, MessageStartEventSubscriptionIntent.CORRELATED, subscriptionRecord);

eventHandle.activateStartEvent(
streamWriter,
subscription.getWorkflowKey(),
workflowInstanceKey,
subscription.getStartEventIdBuffer());
}
});
}
Expand All @@ -196,18 +216,6 @@ private boolean sendCorrelateCommand() {
messageRecord.getVariablesBuffer(),
messageRecord.getCorrelationKeyBuffer()));

return success ? responseWriter.flush() : false;
}

private Message newMessage(
final long messageKey, final MessageRecord messageRecord, final long publishedTimestamp) {
return new Message(
messageKey,
messageRecord.getNameBuffer(),
messageRecord.getCorrelationKeyBuffer(),
messageRecord.getVariablesBuffer(),
messageRecord.getMessageIdBuffer(),
messageRecord.getTimeToLive(),
publishedTimestamp + messageRecord.getTimeToLive());
return success && responseWriter.flush();
}
}
Expand Up @@ -45,7 +45,9 @@ private boolean sendCommand(final MessageSubscription subscription) {
subscription.getCorrelationKey());

if (success) {
subscriptionState.updateSentTimeInTransaction(subscription, ActorClock.currentTimeMillis());
// TODO (saig0): the state change of the sent time should be reflected by a record (#6364)
final var sentTime = ActorClock.currentTimeMillis();
subscriptionState.updateSentTimeInTransaction(subscription, sentTime);
}

return success;
Expand Down
Expand Up @@ -9,7 +9,8 @@

import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.MessageIntent;
import io.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.zeebe.protocol.record.value.BpmnElementType;
import java.util.ArrayList;
import java.util.EnumMap;
Expand Down Expand Up @@ -39,11 +40,14 @@ record -> {

MIGRATED_VALUE_TYPES.put(ValueType.ERROR, MIGRATED);
MIGRATED_VALUE_TYPES.put(ValueType.WORKFLOW, MIGRATED);
MIGRATED_VALUE_TYPES.put(ValueType.MESSAGE, MIGRATED);

MIGRATED_VALUE_TYPES.put(
ValueType.MESSAGE_SUBSCRIPTION,
record -> record.getIntent() == MessageSubscriptionIntent.CORRELATING);
MIGRATED_VALUE_TYPES.put(
ValueType.MESSAGE,
record ->
record.getIntent() == MessageIntent.EXPIRE
|| record.getIntent() == MessageIntent.EXPIRED);
ValueType.MESSAGE_START_EVENT_SUBSCRIPTION,
record -> record.getIntent() == MessageStartEventSubscriptionIntent.CORRELATED);
}

private MigratedStreamProcessors() {}
Expand Down
Expand Up @@ -113,7 +113,12 @@ public final class ReProcessingStateMachine {
private final RecordProcessorMap recordProcessorMap;

private final EventFilter eventFilter =
new MetadataEventFilter(new RecordProtocolVersionFilter().and(REPLAY_FILTER));
new MetadataEventFilter(
new RecordProtocolVersionFilter()
// TODO (saig0): enable the replay filter after all stream processors are migrated (#6202)
// until then, we need to restore the key generator for already migrated processors
// .and(REPLAY_FILTER)
);

private final LogStreamReader logStreamReader;
private final ReprocessingStreamWriter reprocessingStreamWriter = new ReprocessingStreamWriter();
Expand Down
Expand Up @@ -15,6 +15,8 @@
import io.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.zeebe.protocol.record.intent.Intent;
import io.zeebe.protocol.record.intent.MessageIntent;
import io.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.zeebe.protocol.record.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.record.intent.WorkflowIntent;
import java.util.HashMap;
Expand Down Expand Up @@ -48,7 +50,18 @@ public EventAppliers(final ZeebeState state) {
register(WorkflowIntent.CREATED, new WorkflowCreatedApplier(state));
register(DeploymentDistributionIntent.DISTRIBUTING, new DeploymentDistributionApplier(state));

register(MessageIntent.PUBLISHED, new MessagePublishedApplier(state.getMessageState()));
register(MessageIntent.EXPIRED, new MessageExpiredApplier(state.getMessageState()));

register(
MessageSubscriptionIntent.CORRELATING,
new MessageSubscriptionCorrelatingApplier(
state.getMessageSubscriptionState(), state.getMessageState()));

register(
MessageStartEventSubscriptionIntent.CORRELATED,
new MessageStartEventSubscriptionCorrelatedApplier(
state.getMessageState(), state.getEventScopeInstanceState()));
}

private <I extends Intent> void register(final I intent, final TypedEventApplier<I, ?> applier) {
Expand Down
@@ -0,0 +1,40 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.engine.state.appliers;

import io.zeebe.engine.state.TypedEventApplier;
import io.zeebe.engine.state.message.Message;
import io.zeebe.engine.state.mutable.MutableMessageState;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.record.intent.MessageIntent;

public final class MessagePublishedApplier
implements TypedEventApplier<MessageIntent, MessageRecord> {

private final MutableMessageState messageState;

public MessagePublishedApplier(final MutableMessageState messageState) {
this.messageState = messageState;
}

@Override
public void applyState(final long key, final MessageRecord value) {
// TODO (saig0): reuse the message record in the state
final var message =
new Message(
key,
value.getNameBuffer(),
value.getCorrelationKeyBuffer(),
value.getVariablesBuffer(),
value.getMessageIdBuffer(),
value.getTimeToLive(),
value.getDeadline());

messageState.put(message);
}
}

0 comments on commit df249f6

Please sign in to comment.