diff --git a/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/EventBasedGatewayValidator.java b/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/EventBasedGatewayValidator.java new file mode 100644 index 000000000000..fcd4c30bea7a --- /dev/null +++ b/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/EventBasedGatewayValidator.java @@ -0,0 +1,85 @@ +/* + * Copyright © 2017 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.model.bpmn.validation.zeebe; + +import io.zeebe.model.bpmn.instance.EventBasedGateway; +import io.zeebe.model.bpmn.instance.EventDefinition; +import io.zeebe.model.bpmn.instance.FlowNode; +import io.zeebe.model.bpmn.instance.IntermediateCatchEvent; +import io.zeebe.model.bpmn.instance.MessageEventDefinition; +import io.zeebe.model.bpmn.instance.SequenceFlow; +import io.zeebe.model.bpmn.instance.TimerEventDefinition; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import org.camunda.bpm.model.xml.validation.ModelElementValidator; +import org.camunda.bpm.model.xml.validation.ValidationResultCollector; + +public class EventBasedGatewayValidator implements ModelElementValidator { + + private static final List> SUPPORTED_EVENTS = + Arrays.asList(TimerEventDefinition.class, MessageEventDefinition.class); + + private static final String ERROR_UNSUPPORTED_TARGET_NODE = + "Event-based gateway must not have an outgoing sequence flow to other elements than message/timer intermediate catch events."; + + @Override + public Class getElementType() { + return EventBasedGateway.class; + } + + @Override + public void validate( + EventBasedGateway element, ValidationResultCollector validationResultCollector) { + + final Collection outgoingSequenceFlows = element.getOutgoing(); + + if (outgoingSequenceFlows.size() < 2) { + validationResultCollector.addError( + 0, "Event-based gateway must have at least 2 outgoing sequence flows."); + } + + final boolean isValid = + outgoingSequenceFlows.stream().allMatch(this::isValidOutgoingSequenceFlow); + if (!isValid) { + validationResultCollector.addError(0, ERROR_UNSUPPORTED_TARGET_NODE); + } + } + + private boolean isValidOutgoingSequenceFlow(SequenceFlow flow) { + final FlowNode targetNode = flow.getTarget(); + + if (targetNode instanceof IntermediateCatchEvent) { + return isValidEvent((IntermediateCatchEvent) targetNode); + } else { + return false; + } + } + + private boolean isValidEvent(final IntermediateCatchEvent event) { + final Collection eventDefinitions = event.getEventDefinitions(); + + if (eventDefinitions.size() != 1) { + return false; + + } else { + final EventDefinition eventDefinition = eventDefinitions.iterator().next(); + return SUPPORTED_EVENTS + .stream() + .anyMatch(e -> e.isAssignableFrom(eventDefinition.getClass())); + } + } +} diff --git a/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/FlowElementValidator.java b/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/FlowElementValidator.java index c63905700334..42301634afa8 100644 --- a/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/FlowElementValidator.java +++ b/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/FlowElementValidator.java @@ -17,6 +17,7 @@ import io.zeebe.model.bpmn.instance.BoundaryEvent; import io.zeebe.model.bpmn.instance.EndEvent; +import io.zeebe.model.bpmn.instance.EventBasedGateway; import io.zeebe.model.bpmn.instance.ExclusiveGateway; import io.zeebe.model.bpmn.instance.FlowElement; import io.zeebe.model.bpmn.instance.IntermediateCatchEvent; @@ -38,6 +39,7 @@ public class FlowElementValidator implements ModelElementValidator static { SUPPORTED_ELEMENT_TYPES.add(BoundaryEvent.class); SUPPORTED_ELEMENT_TYPES.add(EndEvent.class); + SUPPORTED_ELEMENT_TYPES.add(EventBasedGateway.class); SUPPORTED_ELEMENT_TYPES.add(ExclusiveGateway.class); SUPPORTED_ELEMENT_TYPES.add(IntermediateCatchEvent.class); SUPPORTED_ELEMENT_TYPES.add(ParallelGateway.class); diff --git a/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/ZeebeDesignTimeValidators.java b/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/ZeebeDesignTimeValidators.java index 24d14b629ece..484aa6566382 100644 --- a/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/ZeebeDesignTimeValidators.java +++ b/bpmn-model/src/main/java/io/zeebe/model/bpmn/validation/zeebe/ZeebeDesignTimeValidators.java @@ -31,16 +31,17 @@ public class ZeebeDesignTimeValidators { VALIDATORS.add(new DefinitionsValidator()); VALIDATORS.add(new EndEventValidator()); VALIDATORS.add(new EventDefinitionValidator()); + VALIDATORS.add(new EventBasedGatewayValidator()); VALIDATORS.add(new ExclusiveGatewayValidator()); VALIDATORS.add(new FlowElementValidator()); VALIDATORS.add(new FlowNodeValidator()); + VALIDATORS.add(new IntermediateCatchEventValidator()); VALIDATORS.add(new MessageValidator()); VALIDATORS.add(new ProcessValidator()); VALIDATORS.add(new SequenceFlowValidator()); VALIDATORS.add(new ServiceTaskValidator()); VALIDATORS.add(new ReceiveTaskValidator()); VALIDATORS.add(new StartEventValidator()); - VALIDATORS.add(new IntermediateCatchEventValidator()); VALIDATORS.add(new SubProcessValidator()); VALIDATORS.add(new ZeebeTaskDefinitionValidator()); VALIDATORS.add(new ZeebeIoMappingValidator()); diff --git a/bpmn-model/src/test/java/io/zeebe/model/bpmn/validation/ZeebeEventBasedGatewayValidationTest.java b/bpmn-model/src/test/java/io/zeebe/model/bpmn/validation/ZeebeEventBasedGatewayValidationTest.java new file mode 100644 index 000000000000..548732542c51 --- /dev/null +++ b/bpmn-model/src/test/java/io/zeebe/model/bpmn/validation/ZeebeEventBasedGatewayValidationTest.java @@ -0,0 +1,59 @@ +/* + * Copyright © 2017 camunda services GmbH (info@camunda.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zeebe.model.bpmn.validation; + +import static io.zeebe.model.bpmn.validation.ExpectedValidationResult.expect; +import static java.util.Collections.singletonList; + +import io.zeebe.model.bpmn.Bpmn; +import io.zeebe.model.bpmn.instance.EventBasedGateway; +import org.junit.runners.Parameterized.Parameters; + +public class ZeebeEventBasedGatewayValidationTest extends AbstractZeebeValidationTest { + + @Parameters(name = "{index}: {1}") + public static Object[][] parameters() { + return new Object[][] { + { + Bpmn.createExecutableProcess("process") + .startEvent() + .eventBasedGateway() + .intermediateCatchEvent() + .timerWithDuration("PT1M") + .done(), + singletonList( + expect( + EventBasedGateway.class, + "Event-based gateway must have at least 2 outgoing sequence flows.")) + }, + { + Bpmn.createExecutableProcess("process") + .startEvent() + .eventBasedGateway() + .receiveTask() + .message(m -> m.name("this").zeebeCorrelationKey("$.foo")) + .moveToLastGateway() + .receiveTask() + .message(m -> m.name("that").zeebeCorrelationKey("$.foo")) + .done(), + singletonList( + expect( + EventBasedGateway.class, + "Event-based gateway must not have an outgoing sequence flow to other elements than message/timer intermediate catch events.")) + } + }; + } +} diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/BpmnStep.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/BpmnStep.java index bae047581fb1..36730c415a0d 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/BpmnStep.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/BpmnStep.java @@ -18,53 +18,44 @@ package io.zeebe.broker.workflow.model; public enum BpmnStep { - NONE, - // exactly one outgoing sequence flow - TAKE_SEQUENCE_FLOW, - - // end event, no outgoing sequence flow - CONSUME_TOKEN, - - // xor-gateway - EXCLUSIVE_SPLIT, - - // parallel gateway - PARALLEL_SPLIT, - PARALLEL_MERGE, + // flow element container (process, sub process) + TRIGGER_START_EVENT, + TRIGGER_END_EVENT, + COMPLETE_PROCESS, + TERMINATE_CONTAINED_INSTANCES, // flow node START_FLOW_NODE, ACTIVATE_FLOW_NODE, COMPLETE_FLOW_NODE, TERMINATE_FLOW_NODE, + PROPAGATE_TERMINATION, + + CONSUME_TOKEN, + TAKE_SEQUENCE_FLOW, // activity ACTIVATE_ACTIVITY, COMPLETE_ACTIVITY, TERMINATE_ACTIVITY, - // boundary events - TRIGGER_BOUNDARY_EVENT, - + // service task CREATE_JOB, + TERMINATE_JOB_TASK, + // exclusive gateway ACTIVATE_GATEWAY, + EXCLUSIVE_SPLIT, - SUBSCRIBE_TO_INTERMEDIATE_MESSAGE, - - CREATE_TIMER, - - TRIGGER_END_EVENT, - TRIGGER_START_EVENT, + // parallel gateway + PARALLEL_SPLIT, + PARALLEL_MERGE, - TERMINATE_CONTAINED_INSTANCES, - TERMINATE_JOB_TASK, - TERMINATE_INTERMEDIATE_MESSAGE, - TERMINATE_TIMER, - TERMINATE_ELEMENT, - PROPAGATE_TERMINATION, + // event-based gateway + TRIGGER_EVENT_BASED_GATEWAY, - CANCEL_PROCESS, - COMPLETE_PROCESS, + // events + SUBSCRIBE_TO_EVENTS, + TRIGGER_EVENT, } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableCatchEvent.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableCatchEvent.java new file mode 100644 index 000000000000..0a3b6146d4ad --- /dev/null +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableCatchEvent.java @@ -0,0 +1,31 @@ +/* + * Zeebe Broker Core + * Copyright © 2017 camunda services GmbH (info@camunda.com) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.zeebe.broker.workflow.model.element; + +import java.time.Duration; + +public interface ExecutableCatchEvent extends ExecutableFlowElement { + + boolean isTimer(); + + boolean isMessage(); + + ExecutableMessage getMessage(); + + Duration getDuration(); +} diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableMessageCatchElement.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableCatchEventSupplier.java similarity index 84% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableMessageCatchElement.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableCatchEventSupplier.java index 620323effa6c..c8120390a575 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableMessageCatchElement.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableCatchEventSupplier.java @@ -17,7 +17,9 @@ */ package io.zeebe.broker.workflow.model.element; -public interface ExecutableMessageCatchElement extends ExecutableFlowElement { +import java.util.List; - ExecutableMessage getMessage(); +public interface ExecutableCatchEventSupplier extends ExecutableFlowElement { + + List getEvents(); } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/message/TerminateIntermediateMessageHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableEventBasedGateway.java similarity index 51% rename from broker-core/src/main/java/io/zeebe/broker/workflow/processor/message/TerminateIntermediateMessageHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableEventBasedGateway.java index 896a518cf60c..40eb26f7b886 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/message/TerminateIntermediateMessageHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableEventBasedGateway.java @@ -15,22 +15,25 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.processor.message; +package io.zeebe.broker.workflow.model.element; -import io.zeebe.broker.logstreams.state.ZeebeState; -import io.zeebe.broker.workflow.model.element.ExecutableIntermediateCatchElement; -import io.zeebe.broker.workflow.processor.BpmnStepContext; -import io.zeebe.broker.workflow.processor.flownode.TerminateFlowNodeHandler; +import java.util.List; -public class TerminateIntermediateMessageHandler - extends TerminateFlowNodeHandler { +public class ExecutableEventBasedGateway extends ExecutableFlowNode + implements ExecutableCatchEventSupplier { - public TerminateIntermediateMessageHandler(final ZeebeState zeebeState) { - super(zeebeState.getIncidentState()); + private List events; + + public ExecutableEventBasedGateway(String id) { + super(id); } @Override - protected void terminate(BpmnStepContext context) { - context.getCatchEventOutput().unsubscribeFromMessageEvents(context); + public List getEvents() { + return events; + } + + public void setEvents(List events) { + this.events = events; } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableIntermediateCatchElement.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableIntermediateCatchElement.java index e1e5d5bc9ef6..7716362d833b 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableIntermediateCatchElement.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableIntermediateCatchElement.java @@ -18,9 +18,13 @@ package io.zeebe.broker.workflow.model.element; import java.time.Duration; +import java.util.Collections; +import java.util.List; public class ExecutableIntermediateCatchElement extends ExecutableFlowNode - implements ExecutableMessageCatchElement { + implements ExecutableCatchEvent, ExecutableCatchEventSupplier { + + private final List events = Collections.singletonList(this); private ExecutableMessage message; private Duration duration; @@ -38,6 +42,7 @@ public void setMessage(ExecutableMessage message) { this.message = message; } + @Override public Duration getDuration() { return duration; } @@ -45,4 +50,19 @@ public Duration getDuration() { public void setDuration(Duration duration) { this.duration = duration; } + + @Override + public boolean isTimer() { + return duration != null; + } + + @Override + public boolean isMessage() { + return message != null; + } + + @Override + public List getEvents() { + return events; + } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableReceiveTask.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableReceiveTask.java index 8ca3e2593fcc..c0483fa3aa34 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableReceiveTask.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/element/ExecutableReceiveTask.java @@ -17,8 +17,14 @@ */ package io.zeebe.broker.workflow.model.element; +import java.time.Duration; +import java.util.Collections; +import java.util.List; + public class ExecutableReceiveTask extends ExecutableActivity - implements ExecutableMessageCatchElement { + implements ExecutableCatchEvent, ExecutableCatchEventSupplier { + + private final List events = Collections.singletonList(this); public ExecutableReceiveTask(String id) { super(id); @@ -34,4 +40,24 @@ public ExecutableMessage getMessage() { public void setMessage(ExecutableMessage message) { this.message = message; } + + @Override + public boolean isTimer() { + return false; + } + + @Override + public boolean isMessage() { + return true; + } + + @Override + public Duration getDuration() { + return null; + } + + @Override + public List getEvents() { + return events; + } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/BpmnTransformer.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/BpmnTransformer.java index ed4c7850f538..5146306960eb 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/BpmnTransformer.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/BpmnTransformer.java @@ -18,22 +18,23 @@ package io.zeebe.broker.workflow.model.transformation; import io.zeebe.broker.workflow.model.element.ExecutableWorkflow; -import io.zeebe.broker.workflow.model.transformation.handler.ActivityHandler; -import io.zeebe.broker.workflow.model.transformation.handler.BoundaryEventHandler; -import io.zeebe.broker.workflow.model.transformation.handler.CreateWorkflowHandler; -import io.zeebe.broker.workflow.model.transformation.handler.EndEventHandler; -import io.zeebe.broker.workflow.model.transformation.handler.ExclusiveGatewayHandler; -import io.zeebe.broker.workflow.model.transformation.handler.FlowElementHandler; -import io.zeebe.broker.workflow.model.transformation.handler.FlowNodeHandler; -import io.zeebe.broker.workflow.model.transformation.handler.IntermediateCatchEventHandler; -import io.zeebe.broker.workflow.model.transformation.handler.MessageHandler; -import io.zeebe.broker.workflow.model.transformation.handler.ParallelGatewayHandler; -import io.zeebe.broker.workflow.model.transformation.handler.ProcessHandler; -import io.zeebe.broker.workflow.model.transformation.handler.ReceiveTaskHandler; -import io.zeebe.broker.workflow.model.transformation.handler.SequenceFlowHandler; -import io.zeebe.broker.workflow.model.transformation.handler.ServiceTaskHandler; -import io.zeebe.broker.workflow.model.transformation.handler.StartEventHandler; -import io.zeebe.broker.workflow.model.transformation.handler.SubProcessHandler; +import io.zeebe.broker.workflow.model.transformation.transformer.ActivityTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.BoundaryEventTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.ContextProcessTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.EndEventTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.EventBasedGatewayTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.ExclusiveGatewayTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.FlowElementInstantiationTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.FlowNodeTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.IntermediateCatchEventTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.MessageTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.ParallelGatewayTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.ProcessTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.ReceiveTaskTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.SequenceFlowTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.ServiceTaskTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.StartEventTransformer; +import io.zeebe.broker.workflow.model.transformation.transformer.SubProcessTransformer; import io.zeebe.model.bpmn.BpmnModelInstance; import io.zeebe.model.bpmn.traversal.ModelWalker; import io.zeebe.msgpack.jsonpath.JsonPathQueryCompiler; @@ -51,28 +52,37 @@ public class BpmnTransformer { */ private final TransformationVisitor step2Visitor; + /* + * Step 3: Modify elements based on the context + */ + private final TransformationVisitor step3Visitor; + private final JsonPathQueryCompiler jsonPathQueryCompiler = new JsonPathQueryCompiler(); public BpmnTransformer() { this.step1Visitor = new TransformationVisitor(); - step1Visitor.registerHandler(new FlowElementHandler()); - step1Visitor.registerHandler(new CreateWorkflowHandler()); - step1Visitor.registerHandler(new MessageHandler()); + step1Visitor.registerHandler(new FlowElementInstantiationTransformer()); + step1Visitor.registerHandler(new MessageTransformer()); + step1Visitor.registerHandler(new ProcessTransformer()); this.step2Visitor = new TransformationVisitor(); - step2Visitor.registerHandler(new ActivityHandler()); - step2Visitor.registerHandler(new BoundaryEventHandler()); - step2Visitor.registerHandler(new EndEventHandler()); - step2Visitor.registerHandler(new ExclusiveGatewayHandler()); - step2Visitor.registerHandler(new FlowNodeHandler()); - step2Visitor.registerHandler(new IntermediateCatchEventHandler()); - step2Visitor.registerHandler(new ParallelGatewayHandler()); - step2Visitor.registerHandler(new ProcessHandler()); - step2Visitor.registerHandler(new SequenceFlowHandler()); - step2Visitor.registerHandler(new ServiceTaskHandler()); - step2Visitor.registerHandler(new ReceiveTaskHandler()); - step2Visitor.registerHandler(new StartEventHandler()); - step2Visitor.registerHandler(new SubProcessHandler()); + step2Visitor.registerHandler(new ActivityTransformer()); + step2Visitor.registerHandler(new BoundaryEventTransformer()); + step2Visitor.registerHandler(new ContextProcessTransformer()); + step2Visitor.registerHandler(new EndEventTransformer()); + step2Visitor.registerHandler(new ExclusiveGatewayTransformer()); + step2Visitor.registerHandler(new FlowNodeTransformer()); + step2Visitor.registerHandler(new IntermediateCatchEventTransformer()); + step2Visitor.registerHandler(new ParallelGatewayTransformer()); + step2Visitor.registerHandler(new SequenceFlowTransformer()); + step2Visitor.registerHandler(new ServiceTaskTransformer()); + step2Visitor.registerHandler(new ReceiveTaskTransformer()); + step2Visitor.registerHandler(new StartEventTransformer()); + step2Visitor.registerHandler(new SubProcessTransformer()); + + this.step3Visitor = new TransformationVisitor(); + step3Visitor.registerHandler(new ContextProcessTransformer()); + step3Visitor.registerHandler(new EventBasedGatewayTransformer()); } public List transformDefinitions(BpmnModelInstance modelInstance) { @@ -86,6 +96,9 @@ public List transformDefinitions(BpmnModelInstance modelInst step2Visitor.setContext(context); walker.walk(step2Visitor); + step3Visitor.setContext(context); + walker.walk(step3Visitor); + return context.getWorkflows(); } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ActivityHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ActivityTransformer.java similarity index 93% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ActivityHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ActivityTransformer.java index a4a47cb9fe9a..b03afa39e86a 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ActivityHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ActivityTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.BpmnStep; import io.zeebe.broker.workflow.model.element.ExecutableActivity; @@ -25,7 +25,7 @@ import io.zeebe.model.bpmn.instance.Activity; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class ActivityHandler implements ModelElementTransformer { +public class ActivityTransformer implements ModelElementTransformer { @Override public Class getType() { return Activity.class; diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/BoundaryEventHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/BoundaryEventTransformer.java similarity index 95% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/BoundaryEventHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/BoundaryEventTransformer.java index b996628982c7..18a06dced0f3 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/BoundaryEventHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/BoundaryEventTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.BpmnStep; import io.zeebe.broker.workflow.model.element.ExecutableActivity; @@ -33,7 +33,7 @@ import io.zeebe.protocol.intent.WorkflowInstanceIntent; import java.time.Duration; -public class BoundaryEventHandler implements ModelElementTransformer { +public class BoundaryEventTransformer implements ModelElementTransformer { @Override public Class getType() { return BoundaryEvent.class; @@ -50,7 +50,7 @@ public void transform(BoundaryEvent event, TransformContext context) { attachToActivity(event, workflow, element); element.bindLifecycleState( - WorkflowInstanceIntent.CATCH_EVENT_TRIGGERING, BpmnStep.TRIGGER_BOUNDARY_EVENT); + WorkflowInstanceIntent.CATCH_EVENT_TRIGGERING, BpmnStep.TRIGGER_EVENT); element.bindLifecycleState( WorkflowInstanceIntent.CATCH_EVENT_TRIGGERED, context.getCurrentFlowNodeOutgoingStep()); } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/CreateWorkflowHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ContextProcessTransformer.java similarity index 81% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/CreateWorkflowHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ContextProcessTransformer.java index 81daada07570..a1a566ab44ab 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/CreateWorkflowHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ContextProcessTransformer.java @@ -15,14 +15,14 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.element.ExecutableWorkflow; import io.zeebe.broker.workflow.model.transformation.ModelElementTransformer; import io.zeebe.broker.workflow.model.transformation.TransformContext; import io.zeebe.model.bpmn.instance.Process; -public class CreateWorkflowHandler implements ModelElementTransformer { +public class ContextProcessTransformer implements ModelElementTransformer { @Override public Class getType() { @@ -31,10 +31,7 @@ public Class getType() { @Override public void transform(Process element, TransformContext context) { - - final String id = element.getId(); - final ExecutableWorkflow workflow = new ExecutableWorkflow(id); - context.addWorkflow(workflow); + final ExecutableWorkflow workflow = context.getWorkflow(element.getId()); context.setCurrentWorkflow(workflow); } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/EndEventHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/EndEventTransformer.java similarity index 92% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/EndEventHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/EndEventTransformer.java index ce63fb2133e3..c213fc63355d 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/EndEventHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/EndEventTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; import io.zeebe.broker.workflow.model.element.ExecutableWorkflow; @@ -24,7 +24,7 @@ import io.zeebe.model.bpmn.instance.EndEvent; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class EndEventHandler implements ModelElementTransformer { +public class EndEventTransformer implements ModelElementTransformer { @Override public Class getType() { diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/EventBasedGatewayTransformer.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/EventBasedGatewayTransformer.java new file mode 100644 index 000000000000..a067ac08f056 --- /dev/null +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/EventBasedGatewayTransformer.java @@ -0,0 +1,78 @@ +/* + * Zeebe Broker Core + * Copyright © 2017 camunda services GmbH (info@camunda.com) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.zeebe.broker.workflow.model.transformation.transformer; + +import io.zeebe.broker.workflow.model.BpmnStep; +import io.zeebe.broker.workflow.model.element.ExecutableEventBasedGateway; +import io.zeebe.broker.workflow.model.element.ExecutableIntermediateCatchElement; +import io.zeebe.broker.workflow.model.element.ExecutableWorkflow; +import io.zeebe.broker.workflow.model.transformation.ModelElementTransformer; +import io.zeebe.broker.workflow.model.transformation.TransformContext; +import io.zeebe.model.bpmn.instance.EventBasedGateway; +import io.zeebe.protocol.intent.WorkflowInstanceIntent; +import java.util.List; +import java.util.stream.Collectors; + +public class EventBasedGatewayTransformer implements ModelElementTransformer { + + @Override + public Class getType() { + return EventBasedGateway.class; + } + + @Override + public void transform(EventBasedGateway element, TransformContext context) { + final ExecutableWorkflow workflow = context.getCurrentWorkflow(); + final ExecutableEventBasedGateway gateway = + workflow.getElementById(element.getId(), ExecutableEventBasedGateway.class); + + final List connectedEvents = + getConnectedCatchEvents(gateway); + gateway.setEvents(connectedEvents); + + bindLifecycle(element, gateway, context); + + // configure the lifecycle of the connected events + connectedEvents.forEach(event -> bindLifecycle(event, context)); + } + + private List getConnectedCatchEvents( + final ExecutableEventBasedGateway gateway) { + return gateway + .getOutgoing() + .stream() + .map(e -> (ExecutableIntermediateCatchElement) e.getTarget()) + .collect(Collectors.toList()); + } + + private void bindLifecycle( + EventBasedGateway element, + final ExecutableEventBasedGateway gateway, + TransformContext context) { + + gateway.bindLifecycleState( + WorkflowInstanceIntent.GATEWAY_ACTIVATED, BpmnStep.SUBSCRIBE_TO_EVENTS); + } + + private void bindLifecycle(ExecutableIntermediateCatchElement event, TransformContext context) { + event.bindLifecycleState( + WorkflowInstanceIntent.CATCH_EVENT_TRIGGERING, BpmnStep.TRIGGER_EVENT_BASED_GATEWAY); + event.bindLifecycleState( + WorkflowInstanceIntent.CATCH_EVENT_TRIGGERED, context.getCurrentFlowNodeOutgoingStep()); + } +} diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ExclusiveGatewayHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ExclusiveGatewayTransformer.java similarity index 95% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ExclusiveGatewayHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ExclusiveGatewayTransformer.java index 3c32ac2ae744..c1fa66f96b4c 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ExclusiveGatewayHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ExclusiveGatewayTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.BpmnStep; import io.zeebe.broker.workflow.model.element.ExecutableExclusiveGateway; @@ -28,7 +28,7 @@ import io.zeebe.protocol.intent.WorkflowInstanceIntent; import java.util.Collection; -public class ExclusiveGatewayHandler implements ModelElementTransformer { +public class ExclusiveGatewayTransformer implements ModelElementTransformer { @Override public Class getType() { diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/FlowElementHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/FlowElementInstantiationTransformer.java similarity index 88% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/FlowElementHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/FlowElementInstantiationTransformer.java index 1b3d427d5949..a6da5c81a460 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/FlowElementHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/FlowElementInstantiationTransformer.java @@ -15,11 +15,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.element.AbstractFlowElement; import io.zeebe.broker.workflow.model.element.ExecutableActivity; import io.zeebe.broker.workflow.model.element.ExecutableBoundaryEvent; +import io.zeebe.broker.workflow.model.element.ExecutableEventBasedGateway; import io.zeebe.broker.workflow.model.element.ExecutableExclusiveGateway; import io.zeebe.broker.workflow.model.element.ExecutableFlowElementContainer; import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; @@ -33,6 +34,7 @@ import io.zeebe.model.bpmn.instance.Activity; import io.zeebe.model.bpmn.instance.BoundaryEvent; import io.zeebe.model.bpmn.instance.EndEvent; +import io.zeebe.model.bpmn.instance.EventBasedGateway; import io.zeebe.model.bpmn.instance.ExclusiveGateway; import io.zeebe.model.bpmn.instance.FlowElement; import io.zeebe.model.bpmn.instance.IntermediateCatchEvent; @@ -46,7 +48,7 @@ import java.util.Map; import java.util.function.Function; -public class FlowElementHandler implements ModelElementTransformer { +public class FlowElementInstantiationTransformer implements ModelElementTransformer { private static final Map, Function> ELEMENT_FACTORIES; @@ -56,6 +58,7 @@ public class FlowElementHandler implements ModelElementTransformer ELEMENT_FACTORIES.put(Activity.class, ExecutableActivity::new); ELEMENT_FACTORIES.put(BoundaryEvent.class, ExecutableBoundaryEvent::new); ELEMENT_FACTORIES.put(EndEvent.class, ExecutableFlowNode::new); + ELEMENT_FACTORIES.put(EventBasedGateway.class, ExecutableEventBasedGateway::new); ELEMENT_FACTORIES.put(ExclusiveGateway.class, ExecutableExclusiveGateway::new); ELEMENT_FACTORIES.put(IntermediateCatchEvent.class, ExecutableIntermediateCatchElement::new); ELEMENT_FACTORIES.put(ParallelGateway.class, ExecutableFlowNode::new); @@ -77,6 +80,10 @@ public void transform(FlowElement element, TransformContext context) { final Class elemenType = element.getElementType().getInstanceType(); final Function elementFactory = ELEMENT_FACTORIES.get(elemenType); + if (elementFactory == null) { + throw new IllegalStateException("no transformer found for element type: " + elemenType); + } + final AbstractFlowElement executableElement = elementFactory.apply(element.getId()); workflow.addFlowElement(executableElement); diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/FlowNodeHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/FlowNodeTransformer.java similarity index 95% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/FlowNodeHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/FlowNodeTransformer.java index e023f868a455..e5ce012b25ea 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/FlowNodeHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/FlowNodeTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.BpmnStep; import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; @@ -30,7 +30,7 @@ import io.zeebe.msgpack.mapping.MappingBuilder; import java.util.Collection; -public class FlowNodeHandler implements ModelElementTransformer { +public class FlowNodeTransformer implements ModelElementTransformer { private final MappingBuilder mappingBuilder = new MappingBuilder(); diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/IntermediateCatchEventHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/IntermediateCatchEventTransformer.java similarity index 89% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/IntermediateCatchEventHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/IntermediateCatchEventTransformer.java index 571162deecf6..c786f1743d76 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/IntermediateCatchEventHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/IntermediateCatchEventTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.BpmnStep; import io.zeebe.broker.workflow.model.element.ExecutableIntermediateCatchElement; @@ -31,7 +31,7 @@ import io.zeebe.protocol.intent.WorkflowInstanceIntent; import java.time.Duration; -public class IntermediateCatchEventHandler +public class IntermediateCatchEventTransformer implements ModelElementTransformer { @Override @@ -47,9 +47,6 @@ public void transform(IntermediateCatchEvent element, TransformContext context) workflow.getElementById(element.getId(), ExecutableIntermediateCatchElement.class); final EventDefinition eventDefinition = element.getEventDefinitions().iterator().next(); - - bindDefaultLifecycle(context, executableElement); - if (eventDefinition instanceof MessageEventDefinition) { transformMessageEventDefinition( context, executableElement, (MessageEventDefinition) eventDefinition); @@ -57,6 +54,8 @@ public void transform(IntermediateCatchEvent element, TransformContext context) } else if (eventDefinition instanceof TimerEventDefinition) { transformTimerEventDefinition(executableElement, (TimerEventDefinition) eventDefinition); } + + bindLifecycle(context, executableElement); } private void transformMessageEventDefinition( @@ -67,11 +66,6 @@ private void transformMessageEventDefinition( final Message message = messageEventDefinition.getMessage(); final ExecutableMessage executableMessage = context.getMessage(message.getId()); executableElement.setMessage(executableMessage); - - executableElement.bindLifecycleState( - WorkflowInstanceIntent.ELEMENT_ACTIVATED, BpmnStep.SUBSCRIBE_TO_INTERMEDIATE_MESSAGE); - executableElement.bindLifecycleState( - WorkflowInstanceIntent.ELEMENT_TERMINATING, BpmnStep.TERMINATE_INTERMEDIATE_MESSAGE); } private void transformTimerEventDefinition( @@ -81,23 +75,22 @@ private void transformTimerEventDefinition( final String timeDuration = timerEventDefinition.getTimeDuration().getTextContent(); final Duration duration = Duration.parse(timeDuration); executableElement.setDuration(duration); - - executableElement.bindLifecycleState( - WorkflowInstanceIntent.ELEMENT_ACTIVATED, BpmnStep.CREATE_TIMER); - executableElement.bindLifecycleState( - WorkflowInstanceIntent.ELEMENT_TERMINATING, BpmnStep.TERMINATE_TIMER); } - private void bindDefaultLifecycle( + private void bindLifecycle( TransformContext context, ExecutableIntermediateCatchElement executableElement) { executableElement.bindLifecycleState( WorkflowInstanceIntent.ELEMENT_READY, BpmnStep.ACTIVATE_FLOW_NODE); + executableElement.bindLifecycleState( + WorkflowInstanceIntent.ELEMENT_ACTIVATED, BpmnStep.SUBSCRIBE_TO_EVENTS); executableElement.bindLifecycleState( WorkflowInstanceIntent.ELEMENT_COMPLETING, BpmnStep.COMPLETE_FLOW_NODE); executableElement.bindLifecycleState( WorkflowInstanceIntent.ELEMENT_COMPLETED, context.getCurrentFlowNodeOutgoingStep()); + executableElement.bindLifecycleState( + WorkflowInstanceIntent.ELEMENT_TERMINATING, BpmnStep.TERMINATE_ACTIVITY); executableElement.bindLifecycleState( WorkflowInstanceIntent.ELEMENT_TERMINATED, BpmnStep.PROPAGATE_TERMINATION); } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/MessageHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/MessageTransformer.java similarity index 93% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/MessageHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/MessageTransformer.java index 7c64ed98ce3a..fed84be6457b 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/MessageHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/MessageTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.element.ExecutableMessage; import io.zeebe.broker.workflow.model.transformation.ModelElementTransformer; @@ -26,7 +26,7 @@ import io.zeebe.msgpack.jsonpath.JsonPathQueryCompiler; import io.zeebe.util.buffer.BufferUtil; -public class MessageHandler implements ModelElementTransformer { +public class MessageTransformer implements ModelElementTransformer { @Override public Class getType() { diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ParallelGatewayHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ParallelGatewayTransformer.java similarity index 91% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ParallelGatewayHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ParallelGatewayTransformer.java index 9617285aa141..5f16311108f7 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ParallelGatewayHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ParallelGatewayTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; import io.zeebe.broker.workflow.model.element.ExecutableWorkflow; @@ -24,7 +24,7 @@ import io.zeebe.model.bpmn.instance.ParallelGateway; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class ParallelGatewayHandler implements ModelElementTransformer { +public class ParallelGatewayTransformer implements ModelElementTransformer { @Override public Class getType() { diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ProcessHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ProcessTransformer.java similarity index 86% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ProcessHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ProcessTransformer.java index 2a15bcf33f98..ed2d36725212 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ProcessHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ProcessTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.BpmnStep; import io.zeebe.broker.workflow.model.element.ExecutableWorkflow; @@ -24,7 +24,7 @@ import io.zeebe.model.bpmn.instance.Process; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class ProcessHandler implements ModelElementTransformer { +public class ProcessTransformer implements ModelElementTransformer { @Override public Class getType() { @@ -33,7 +33,10 @@ public Class getType() { @Override public void transform(Process element, TransformContext context) { - final ExecutableWorkflow workflow = context.getWorkflow(element.getId()); + + final String id = element.getId(); + final ExecutableWorkflow workflow = new ExecutableWorkflow(id); + context.addWorkflow(workflow); context.setCurrentWorkflow(workflow); workflow.bindLifecycleState(WorkflowInstanceIntent.ELEMENT_READY, BpmnStep.ACTIVATE_FLOW_NODE); diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ReceiveTaskHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ReceiveTaskTransformer.java similarity index 92% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ReceiveTaskHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ReceiveTaskTransformer.java index cd7ef2b8fb15..4e80bbb18e86 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ReceiveTaskHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ReceiveTaskTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.BpmnStep; import io.zeebe.broker.workflow.model.element.ExecutableMessage; @@ -27,7 +27,7 @@ import io.zeebe.model.bpmn.instance.ReceiveTask; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class ReceiveTaskHandler implements ModelElementTransformer { +public class ReceiveTaskTransformer implements ModelElementTransformer { @Override public Class getType() { @@ -54,6 +54,6 @@ public void transform(ReceiveTask element, TransformContext context) { private void bindLifecycle( TransformContext context, final ExecutableReceiveTask executableElement) { executableElement.bindLifecycleState( - WorkflowInstanceIntent.ELEMENT_ACTIVATED, BpmnStep.SUBSCRIBE_TO_INTERMEDIATE_MESSAGE); + WorkflowInstanceIntent.ELEMENT_ACTIVATED, BpmnStep.SUBSCRIBE_TO_EVENTS); } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/SequenceFlowHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/SequenceFlowTransformer.java similarity index 93% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/SequenceFlowHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/SequenceFlowTransformer.java index 493b7df71dd4..a8f0ec32d8ac 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/SequenceFlowHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/SequenceFlowTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.BpmnStep; import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; @@ -26,6 +26,7 @@ import io.zeebe.model.bpmn.instance.Activity; import io.zeebe.model.bpmn.instance.ConditionExpression; import io.zeebe.model.bpmn.instance.EndEvent; +import io.zeebe.model.bpmn.instance.EventBasedGateway; import io.zeebe.model.bpmn.instance.ExclusiveGateway; import io.zeebe.model.bpmn.instance.FlowNode; import io.zeebe.model.bpmn.instance.IntermediateCatchEvent; @@ -35,7 +36,7 @@ import io.zeebe.msgpack.el.JsonConditionFactory; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class SequenceFlowHandler implements ModelElementTransformer { +public class SequenceFlowTransformer implements ModelElementTransformer { @Override public Class getType() { @@ -61,16 +62,20 @@ private void bindLifecycle( if (target instanceof Activity || target instanceof IntermediateCatchEvent) { step = BpmnStep.START_FLOW_NODE; - } else if (target instanceof ExclusiveGateway) { + + } else if (target instanceof ExclusiveGateway || target instanceof EventBasedGateway) { step = BpmnStep.ACTIVATE_GATEWAY; + } else if (target instanceof ParallelGateway) { if (target.getIncoming().size() == 1) { step = BpmnStep.ACTIVATE_GATEWAY; } else { step = BpmnStep.PARALLEL_MERGE; } + } else if (target instanceof EndEvent) { step = BpmnStep.TRIGGER_END_EVENT; + } else { throw new RuntimeException("Unsupported element"); } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ServiceTaskHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ServiceTaskTransformer.java similarity index 96% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ServiceTaskHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ServiceTaskTransformer.java index 9eca08c95375..16d1ad8ab1e3 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/ServiceTaskHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ServiceTaskTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import static io.zeebe.util.buffer.BufferUtil.wrapString; @@ -36,7 +36,7 @@ import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.UnsafeBuffer; -public class ServiceTaskHandler implements ModelElementTransformer { +public class ServiceTaskTransformer implements ModelElementTransformer { private static final int INITIAL_SIZE_KEY_VALUE_PAIR = 128; diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/StartEventHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/StartEventTransformer.java similarity index 93% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/StartEventHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/StartEventTransformer.java index f6cfd1be1897..d65f47bc040f 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/StartEventHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/StartEventTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.element.ExecutableFlowElementContainer; import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; @@ -26,7 +26,7 @@ import io.zeebe.model.bpmn.instance.StartEvent; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class StartEventHandler implements ModelElementTransformer { +public class StartEventTransformer implements ModelElementTransformer { @Override public Class getType() { diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/SubProcessHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/SubProcessTransformer.java similarity index 92% rename from broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/SubProcessHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/SubProcessTransformer.java index 2fac30c3e208..084c7ea0b404 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/handler/SubProcessHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/SubProcessTransformer.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.model.transformation.handler; +package io.zeebe.broker.workflow.model.transformation.transformer; import io.zeebe.broker.workflow.model.BpmnStep; import io.zeebe.broker.workflow.model.element.ExecutableFlowElementContainer; @@ -25,7 +25,7 @@ import io.zeebe.model.bpmn.instance.SubProcess; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class SubProcessHandler implements ModelElementTransformer { +public class SubProcessTransformer implements ModelElementTransformer { @Override public Class getType() { diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/BpmnStepHandlers.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/BpmnStepHandlers.java index 7244235c5488..34b898cbe2e8 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/BpmnStepHandlers.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/BpmnStepHandlers.java @@ -21,93 +21,87 @@ import io.zeebe.broker.logstreams.state.ZeebeState; import io.zeebe.broker.workflow.model.BpmnStep; import io.zeebe.broker.workflow.model.element.ExecutableFlowElement; +import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; import io.zeebe.broker.workflow.processor.activity.ActivateActivityHandler; import io.zeebe.broker.workflow.processor.activity.CompleteActivityHandler; import io.zeebe.broker.workflow.processor.activity.TerminateActivityHandler; -import io.zeebe.broker.workflow.processor.boundary.TriggerBoundaryEventHandler; +import io.zeebe.broker.workflow.processor.event.SubscribeEventHandler; +import io.zeebe.broker.workflow.processor.event.TriggerEventHandler; import io.zeebe.broker.workflow.processor.flownode.ActivateFlowNodeHandler; import io.zeebe.broker.workflow.processor.flownode.CompleteFlowNodeHandler; import io.zeebe.broker.workflow.processor.flownode.ConsumeTokenHandler; import io.zeebe.broker.workflow.processor.flownode.PropagateTerminationHandler; -import io.zeebe.broker.workflow.processor.flownode.TakeSequenceFlowHandler; -import io.zeebe.broker.workflow.processor.flownode.TerminateFlowNodeHandler; import io.zeebe.broker.workflow.processor.gateway.ExclusiveSplitHandler; import io.zeebe.broker.workflow.processor.gateway.ParallelSplitHandler; -import io.zeebe.broker.workflow.processor.message.MessageCatchElementHandler; -import io.zeebe.broker.workflow.processor.message.TerminateIntermediateMessageHandler; +import io.zeebe.broker.workflow.processor.gateway.TriggerEventBasedGatewayHandler; import io.zeebe.broker.workflow.processor.process.CompleteProcessHandler; -import io.zeebe.broker.workflow.processor.sequenceflow.ActivateGatewayHandler; import io.zeebe.broker.workflow.processor.sequenceflow.ParallelMergeHandler; import io.zeebe.broker.workflow.processor.sequenceflow.StartFlowNodeHandler; -import io.zeebe.broker.workflow.processor.sequenceflow.TriggerEndEventHandler; +import io.zeebe.broker.workflow.processor.sequenceflow.TakeSequenceFlowHandler; import io.zeebe.broker.workflow.processor.servicetask.CreateJobHandler; import io.zeebe.broker.workflow.processor.servicetask.TerminateServiceTaskHandler; import io.zeebe.broker.workflow.processor.subprocess.TerminateContainedElementsHandler; import io.zeebe.broker.workflow.processor.subprocess.TriggerStartEventHandler; -import io.zeebe.broker.workflow.processor.timer.CreateTimerHandler; -import io.zeebe.broker.workflow.processor.timer.TerminateTimerHandler; import io.zeebe.broker.workflow.state.WorkflowState; import io.zeebe.protocol.intent.WorkflowInstanceIntent; import java.util.EnumMap; import java.util.Map; public class BpmnStepHandlers { - private final Map stepHandlers = new EnumMap<>(BpmnStep.class); + private final Map> stepHandlers = new EnumMap<>(BpmnStep.class); public BpmnStepHandlers(WorkflowState workflowState, ZeebeState zeebeState) { final IncidentState incidentState = zeebeState.getIncidentState(); + // flow element container (process, sub process) + stepHandlers.put(BpmnStep.TRIGGER_START_EVENT, new TriggerStartEventHandler()); + stepHandlers.put(BpmnStep.COMPLETE_PROCESS, new CompleteProcessHandler()); + stepHandlers.put( + BpmnStep.TERMINATE_CONTAINED_INSTANCES, new TerminateContainedElementsHandler(zeebeState)); + + // flow nodes + stepHandlers.put( + BpmnStep.ACTIVATE_FLOW_NODE, new ActivateFlowNodeHandler()); + stepHandlers.put(BpmnStep.COMPLETE_FLOW_NODE, new CompleteFlowNodeHandler()); + stepHandlers.put(BpmnStep.TAKE_SEQUENCE_FLOW, new TakeSequenceFlowHandler()); + stepHandlers.put(BpmnStep.CONSUME_TOKEN, new ConsumeTokenHandler()); + stepHandlers.put(BpmnStep.PROPAGATE_TERMINATION, new PropagateTerminationHandler()); + // activity stepHandlers.put(BpmnStep.ACTIVATE_ACTIVITY, new ActivateActivityHandler()); stepHandlers.put(BpmnStep.COMPLETE_ACTIVITY, new CompleteActivityHandler()); stepHandlers.put(BpmnStep.TERMINATE_ACTIVITY, new TerminateActivityHandler(incidentState)); - // boundary events - stepHandlers.put(BpmnStep.TRIGGER_BOUNDARY_EVENT, new TriggerBoundaryEventHandler()); - // service task stepHandlers.put(BpmnStep.CREATE_JOB, new CreateJobHandler()); + stepHandlers.put(BpmnStep.TERMINATE_JOB_TASK, new TerminateServiceTaskHandler(zeebeState)); // exclusive gateway stepHandlers.put(BpmnStep.EXCLUSIVE_SPLIT, new ExclusiveSplitHandler()); - // flow nodes - stepHandlers.put(BpmnStep.CONSUME_TOKEN, new ConsumeTokenHandler()); - stepHandlers.put(BpmnStep.START_FLOW_NODE, new StartFlowNodeHandler()); - stepHandlers.put(BpmnStep.ACTIVATE_FLOW_NODE, new ActivateFlowNodeHandler()); - stepHandlers.put(BpmnStep.COMPLETE_FLOW_NODE, new CompleteFlowNodeHandler()); - stepHandlers.put(BpmnStep.TERMINATE_ELEMENT, new TerminateFlowNodeHandler(incidentState)); - - // sequence flow - stepHandlers.put(BpmnStep.TAKE_SEQUENCE_FLOW, new TakeSequenceFlowHandler()); - stepHandlers.put(BpmnStep.ACTIVATE_GATEWAY, new ActivateGatewayHandler()); - stepHandlers.put(BpmnStep.TRIGGER_END_EVENT, new TriggerEndEventHandler()); - stepHandlers.put(BpmnStep.PARALLEL_MERGE, new ParallelMergeHandler(workflowState)); - - // flow element container (process, sub process) - stepHandlers.put(BpmnStep.TRIGGER_START_EVENT, new TriggerStartEventHandler()); - // parallel gateway stepHandlers.put(BpmnStep.PARALLEL_SPLIT, new ParallelSplitHandler()); - // termination - stepHandlers.put(BpmnStep.TERMINATE_JOB_TASK, new TerminateServiceTaskHandler(zeebeState)); - stepHandlers.put(BpmnStep.TERMINATE_TIMER, new TerminateTimerHandler(zeebeState)); - stepHandlers.put( - BpmnStep.TERMINATE_INTERMEDIATE_MESSAGE, - new TerminateIntermediateMessageHandler(zeebeState)); - stepHandlers.put( - BpmnStep.TERMINATE_CONTAINED_INSTANCES, new TerminateContainedElementsHandler(zeebeState)); - stepHandlers.put(BpmnStep.PROPAGATE_TERMINATION, new PropagateTerminationHandler()); + // event-based gateway + stepHandlers.put(BpmnStep.TRIGGER_EVENT_BASED_GATEWAY, new TriggerEventBasedGatewayHandler()); - // intermediate catch event - stepHandlers.put(BpmnStep.SUBSCRIBE_TO_INTERMEDIATE_MESSAGE, new MessageCatchElementHandler()); - stepHandlers.put(BpmnStep.CREATE_TIMER, new CreateTimerHandler()); + // events + stepHandlers.put(BpmnStep.SUBSCRIBE_TO_EVENTS, new SubscribeEventHandler()); + stepHandlers.put(BpmnStep.TRIGGER_EVENT, new TriggerEventHandler()); - // process - stepHandlers.put(BpmnStep.COMPLETE_PROCESS, new CompleteProcessHandler()); + // sequence flow + stepHandlers.put( + BpmnStep.START_FLOW_NODE, new StartFlowNodeHandler(WorkflowInstanceIntent.ELEMENT_READY)); + stepHandlers.put( + BpmnStep.TRIGGER_END_EVENT, + new StartFlowNodeHandler(WorkflowInstanceIntent.END_EVENT_OCCURRED)); + stepHandlers.put( + BpmnStep.ACTIVATE_GATEWAY, + new StartFlowNodeHandler(WorkflowInstanceIntent.GATEWAY_ACTIVATED)); + stepHandlers.put(BpmnStep.PARALLEL_MERGE, new ParallelMergeHandler(workflowState)); } + @SuppressWarnings({"unchecked", "rawtypes"}) public void handle(BpmnStepContext context) { final ExecutableFlowElement flowElement = context.getElement(); final WorkflowInstanceIntent state = diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/CatchEventOutput.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/CatchEventOutput.java index 4c4cb2900f3d..ab44427083b2 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/CatchEventOutput.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/CatchEventOutput.java @@ -24,9 +24,8 @@ import io.zeebe.broker.logstreams.state.ZeebeState; import io.zeebe.broker.subscription.command.SubscriptionCommandSender; import io.zeebe.broker.workflow.data.TimerRecord; -import io.zeebe.broker.workflow.model.element.ExecutableIntermediateCatchElement; +import io.zeebe.broker.workflow.model.element.ExecutableCatchEvent; import io.zeebe.broker.workflow.model.element.ExecutableMessage; -import io.zeebe.broker.workflow.model.element.ExecutableMessageCatchElement; import io.zeebe.broker.workflow.processor.boundary.BoundaryEventHelper; import io.zeebe.broker.workflow.state.ElementInstance; import io.zeebe.broker.workflow.state.EventTrigger; @@ -53,22 +52,20 @@ public CatchEventOutput(ZeebeState state, SubscriptionCommandSender subscription this.subscriptionCommandSender = subscriptionCommandSender; } - public void unsubscribeFromCatchEvents(BpmnStepContext context) { + public void unsubscribeFromCatchEvents(long elementInstanceKey, BpmnStepContext context) { // at the moment, the way the state is handled we don't need specific event information to // unsubscribe from an event trigger, but once messages are supported it will be necessary. - unsubscribeFromTimerEvents( - context.getElementInstance().getKey(), context.getOutput().getStreamWriter()); - unsubscribeFromMessageEvents(context); + unsubscribeFromTimerEvents(elementInstanceKey, context.getOutput().getStreamWriter()); + unsubscribeFromMessageEvents(elementInstanceKey, context); } public void subscribeToCatchEvents( - BpmnStepContext context, final List events) { - - for (final ExecutableIntermediateCatchElement event : events) { - if (event.getDuration() != null) { + BpmnStepContext context, final List events) { + for (final ExecutableCatchEvent event : events) { + if (event.isTimer()) { subscribeToTimerEvent( - context.getElementInstance(), event, context.getOutput().getStreamWriter()); - } else if (event.getMessage() != null) { + context.getRecord().getKey(), event, context.getOutput().getStreamWriter()); + } else if (event.isMessage()) { subscribeToMessageEvent(context, event); } } @@ -90,12 +87,12 @@ public void triggerBoundaryEventFromInterruptedElement( private final TimerRecord timerRecord = new TimerRecord(); public void subscribeToTimerEvent( - ElementInstance element, ExecutableIntermediateCatchElement event, TypedStreamWriter writer) { + long elementInstanceKey, ExecutableCatchEvent event, TypedStreamWriter writer) { final Duration duration = event.getDuration(); final long dueDate = ActorClock.currentTimeMillis() + duration.toMillis(); timerRecord - .setElementInstanceKey(element.getKey()) + .setElementInstanceKey(elementInstanceKey) .setDueDate(dueDate) .setHandlerNodeId(event.getId()); writer.appendNewCommand(TimerIntent.CREATE, timerRecord); @@ -122,8 +119,7 @@ public void unsubscribeFromTimerEvents(long elementInstanceKey, TypedStreamWrite private final MsgPackQueryProcessor queryProcessor = new MsgPackQueryProcessor(); private WorkflowInstanceSubscription subscription = new WorkflowInstanceSubscription(); - public void subscribeToMessageEvent( - BpmnStepContext context, ExecutableMessageCatchElement handler) { + public void subscribeToMessageEvent(BpmnStepContext context, ExecutableCatchEvent handler) { final ExecutableMessage message = handler.getMessage(); final DirectBuffer extractedKey = extractCorrelationKey(context, message); @@ -132,7 +128,7 @@ public void subscribeToMessageEvent( } final long workflowInstanceKey = context.getValue().getWorkflowInstanceKey(); - final long elementInstanceKey = context.getElementInstance().getKey(); + final long elementInstanceKey = context.getRecord().getKey(); final DirectBuffer messageName = cloneBuffer(message.getMessageName()); final DirectBuffer correlationKey = cloneBuffer(extractedKey); @@ -152,12 +148,11 @@ public void subscribeToMessageEvent( workflowInstanceKey, elementInstanceKey, messageName, correlationKey)); } - public void unsubscribeFromMessageEvents(BpmnStepContext context) { + public void unsubscribeFromMessageEvents(long elementInstanceKey, BpmnStepContext context) { state .getWorkflowInstanceSubscriptionState() .visitElementSubscriptions( - context.getElementInstance().getKey(), - sub -> unsubscribeFromMessageEvent(context, sub)); + elementInstanceKey, sub -> unsubscribeFromMessageEvent(context, sub)); } private boolean unsubscribeFromMessageEvent( diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/WorkflowEventProcessors.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/WorkflowEventProcessors.java index 69a8ebb6f2cf..ccb6c23bf891 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/WorkflowEventProcessors.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/WorkflowEventProcessors.java @@ -35,9 +35,22 @@ import io.zeebe.protocol.intent.TimerIntent; import io.zeebe.protocol.intent.WorkflowInstanceIntent; import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent; +import java.util.Arrays; +import java.util.List; public class WorkflowEventProcessors { + private static final List WORKFLOW_INSTANCE_COMMANDS = + Arrays.asList( + WorkflowInstanceIntent.CREATE, + WorkflowInstanceIntent.CANCEL, + WorkflowInstanceIntent.UPDATE_PAYLOAD); + + private static boolean isWorkflowInstanceEvent(WorkflowInstanceIntent intent) { + return !WORKFLOW_INSTANCE_COMMANDS.contains(intent) + && intent != WorkflowInstanceIntent.PAYLOAD_UPDATED; + } + public static BpmnStepProcessor addWorkflowProcessors( TypedEventStreamProcessorBuilder typedProcessorBuilder, ZeebeState zeebeState, @@ -75,63 +88,20 @@ private static void addWorkflowInstanceCommandProcessor( final WorkflowInstanceCommandProcessor commandProcessor = new WorkflowInstanceCommandProcessor(workflowEngineState); - builder - .onCommand(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.CREATE, commandProcessor) - .onCommand(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.CANCEL, commandProcessor) - .onCommand( - ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.UPDATE_PAYLOAD, commandProcessor); + WORKFLOW_INSTANCE_COMMANDS.forEach( + intent -> builder.onCommand(ValueType.WORKFLOW_INSTANCE, intent, commandProcessor)); } private static void addBpmnStepProcessor( TypedEventStreamProcessorBuilder streamProcessorBuilder, BpmnStepProcessor bpmnStepProcessor) { - streamProcessorBuilder - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN, - bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_READY, bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.ELEMENT_ACTIVATED, - bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.ELEMENT_COMPLETING, - bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.START_EVENT_OCCURRED, - bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.END_EVENT_OCCURRED, - bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.GATEWAY_ACTIVATED, - bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.ELEMENT_COMPLETED, - bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.ELEMENT_TERMINATING, - bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.ELEMENT_TERMINATED, - bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.CATCH_EVENT_TRIGGERING, - bpmnStepProcessor) - .onEvent( - ValueType.WORKFLOW_INSTANCE, - WorkflowInstanceIntent.CATCH_EVENT_TRIGGERED, - bpmnStepProcessor); + + Arrays.stream(WorkflowInstanceIntent.values()) + .filter(WorkflowEventProcessors::isWorkflowInstanceEvent) + .forEach( + intent -> + streamProcessorBuilder.onEvent( + ValueType.WORKFLOW_INSTANCE, intent, bpmnStepProcessor)); } private static void addMessageStreamProcessors( diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/activity/CompleteActivityHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/activity/CompleteActivityHandler.java index 6be382d8bb74..c206a2e00838 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/activity/CompleteActivityHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/activity/CompleteActivityHandler.java @@ -17,14 +17,14 @@ */ package io.zeebe.broker.workflow.processor.activity; -import io.zeebe.broker.workflow.model.element.ExecutableActivity; +import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; import io.zeebe.broker.workflow.processor.BpmnStepContext; import io.zeebe.broker.workflow.processor.flownode.CompleteFlowNodeHandler; -public class CompleteActivityHandler extends CompleteFlowNodeHandler { +public class CompleteActivityHandler extends CompleteFlowNodeHandler { @Override - public void complete(BpmnStepContext context) { + public void complete(BpmnStepContext context) { super.complete(context); - context.getCatchEventOutput().unsubscribeFromCatchEvents(context); + context.getCatchEventOutput().unsubscribeFromCatchEvents(context.getRecord().getKey(), context); } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/activity/TerminateActivityHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/activity/TerminateActivityHandler.java index 4759a9ef694e..cb9f42364d85 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/activity/TerminateActivityHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/activity/TerminateActivityHandler.java @@ -18,20 +18,19 @@ package io.zeebe.broker.workflow.processor.activity; import io.zeebe.broker.incident.processor.IncidentState; -import io.zeebe.broker.workflow.model.element.ExecutableActivity; +import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; import io.zeebe.broker.workflow.processor.BpmnStepContext; import io.zeebe.broker.workflow.processor.flownode.TerminateFlowNodeHandler; -public class TerminateActivityHandler - extends TerminateFlowNodeHandler { +public class TerminateActivityHandler extends TerminateFlowNodeHandler { public TerminateActivityHandler(IncidentState incidentState) { super(incidentState); } @Override - protected void terminate(BpmnStepContext context) { + protected void terminate(BpmnStepContext context) { super.terminate(context); - context.getCatchEventOutput().unsubscribeFromCatchEvents(context); + context.getCatchEventOutput().unsubscribeFromCatchEvents(context.getRecord().getKey(), context); } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/timer/CreateTimerHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/event/SubscribeEventHandler.java similarity index 57% rename from broker-core/src/main/java/io/zeebe/broker/workflow/processor/timer/CreateTimerHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/processor/event/SubscribeEventHandler.java index e2fe1d66861b..18ce22498bd4 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/timer/CreateTimerHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/event/SubscribeEventHandler.java @@ -15,20 +15,23 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.processor.timer; +package io.zeebe.broker.workflow.processor.event; -import io.zeebe.broker.workflow.model.element.ExecutableIntermediateCatchElement; +import io.zeebe.broker.workflow.model.element.ExecutableCatchEventSupplier; import io.zeebe.broker.workflow.processor.BpmnStepContext; import io.zeebe.broker.workflow.processor.BpmnStepHandler; -public class CreateTimerHandler implements BpmnStepHandler { +public class SubscribeEventHandler implements BpmnStepHandler { + @Override - public void handle(BpmnStepContext context) { - context - .getCatchEventOutput() - .subscribeToTimerEvent( - context.getElementInstance(), - context.getElement(), - context.getOutput().getStreamWriter()); + public void handle(final BpmnStepContext context) { + final ExecutableCatchEventSupplier supplier = context.getElement(); + + context.getCatchEventOutput().subscribeToCatchEvents(context, supplier.getEvents()); + + // TODO ignore intermediate catch events because they are treater as activities - #1698 + if (context.getElementInstance() == null) { + context.getOutput().deferEvent(context.getRecord()); + } } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/boundary/TriggerBoundaryEventHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/event/TriggerEventHandler.java similarity index 83% rename from broker-core/src/main/java/io/zeebe/broker/workflow/processor/boundary/TriggerBoundaryEventHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/processor/event/TriggerEventHandler.java index 7ed26bb494f2..b655103489e3 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/boundary/TriggerBoundaryEventHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/event/TriggerEventHandler.java @@ -15,9 +15,9 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.processor.boundary; +package io.zeebe.broker.workflow.processor.event; -import io.zeebe.broker.workflow.model.element.ExecutableBoundaryEvent; +import io.zeebe.broker.workflow.model.element.ExecutableIntermediateCatchElement; import io.zeebe.broker.workflow.processor.BpmnStepContext; import io.zeebe.broker.workflow.processor.BpmnStepHandler; import io.zeebe.broker.workflow.processor.flownode.IOMappingHelper; @@ -25,11 +25,11 @@ import io.zeebe.protocol.impl.record.value.incident.ErrorType; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class TriggerBoundaryEventHandler implements BpmnStepHandler { +public class TriggerEventHandler implements BpmnStepHandler { private final IOMappingHelper ioMappingHelper = new IOMappingHelper(); @Override - public void handle(BpmnStepContext context) { + public void handle(BpmnStepContext context) { try { ioMappingHelper.applyOutputMappings(context); diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/message/MessageCatchElementHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/event/UnsubscribeEventHandler.java similarity index 68% rename from broker-core/src/main/java/io/zeebe/broker/workflow/processor/message/MessageCatchElementHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/processor/event/UnsubscribeEventHandler.java index 95828fdd251b..c986a9761d77 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/message/MessageCatchElementHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/event/UnsubscribeEventHandler.java @@ -15,15 +15,16 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.processor.message; +package io.zeebe.broker.workflow.processor.event; -import io.zeebe.broker.workflow.model.element.ExecutableMessageCatchElement; +import io.zeebe.broker.workflow.model.element.ExecutableCatchEventSupplier; import io.zeebe.broker.workflow.processor.BpmnStepContext; import io.zeebe.broker.workflow.processor.BpmnStepHandler; -public class MessageCatchElementHandler implements BpmnStepHandler { +public class UnsubscribeEventHandler implements BpmnStepHandler { + @Override - public void handle(final BpmnStepContext context) { - context.getCatchEventOutput().subscribeToMessageEvent(context, context.getElement()); + public void handle(final BpmnStepContext context) { + context.getCatchEventOutput().unsubscribeFromCatchEvents(context.getRecord().getKey(), context); } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/CompleteFlowNodeHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/CompleteFlowNodeHandler.java index bb7ec22146e2..2dc856c97743 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/CompleteFlowNodeHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/CompleteFlowNodeHandler.java @@ -24,11 +24,11 @@ import io.zeebe.protocol.impl.record.value.incident.ErrorType; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class CompleteFlowNodeHandler implements BpmnStepHandler { +public class CompleteFlowNodeHandler implements BpmnStepHandler { private final IOMappingHelper ioMappingHelper = new IOMappingHelper(); @Override - public void handle(BpmnStepContext context) { + public void handle(BpmnStepContext context) { try { ioMappingHelper.applyOutputMappings(context); @@ -50,5 +50,5 @@ public void handle(BpmnStepContext context) { * * @param context current processor context */ - public void complete(BpmnStepContext context) {} + public void complete(BpmnStepContext context) {} } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/ConsumeTokenHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/ConsumeTokenHandler.java index 8c3563162b8f..9a09c15bae4b 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/ConsumeTokenHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/ConsumeTokenHandler.java @@ -34,6 +34,9 @@ public void handle(BpmnStepContext context) { final ElementInstance scopeInstance = context.getFlowScopeInstance(); final WorkflowInstanceRecord scopeInstanceValue = scopeInstance.getValue(); + assert scopeInstance.getNumberOfActiveExecutionPaths() >= 0 + : "number of active execution paths is negative"; + if (scopeInstance.getNumberOfActiveExecutionPaths() == 0) { scopeInstanceValue.setPayload(value.getPayload()); diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/TerminateFlowNodeHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/TerminateFlowNodeHandler.java index 6f58d90cabaf..8c321bd994ea 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/TerminateFlowNodeHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/TerminateFlowNodeHandler.java @@ -18,7 +18,7 @@ package io.zeebe.broker.workflow.processor.flownode; import io.zeebe.broker.incident.processor.IncidentState; -import io.zeebe.broker.workflow.model.element.ExecutableFlowElement; +import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; import io.zeebe.broker.workflow.processor.BpmnStepContext; import io.zeebe.broker.workflow.processor.BpmnStepHandler; import io.zeebe.broker.workflow.processor.EventOutput; @@ -26,18 +26,17 @@ import io.zeebe.protocol.impl.record.value.incident.IncidentRecord; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -public class TerminateFlowNodeHandler - implements BpmnStepHandler { +public class TerminateFlowNodeHandler implements BpmnStepHandler { protected final IncidentState incidentState; - private BpmnStepContext context; + private BpmnStepContext context; public TerminateFlowNodeHandler(IncidentState incidentState) { this.incidentState = incidentState; } @Override - public void handle(BpmnStepContext context) { + public void handle(BpmnStepContext context) { this.context = context; final EventOutput output = context.getOutput(); final ElementInstance elementInstance = context.getElementInstance(); @@ -63,7 +62,7 @@ public void handle(BpmnStepContext context) { * * @param context current processor context */ - protected void terminate(BpmnStepContext context) {} + protected void terminate(BpmnStepContext context) {} private void resolveExistingIncident(IncidentRecord incidentRecord, long incidentKey) { context.getOutput().appendResolvedIncidentEvent(incidentKey, incidentRecord); diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/gateway/TriggerEventBasedGatewayHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/gateway/TriggerEventBasedGatewayHandler.java new file mode 100644 index 000000000000..3768b9e1a7cd --- /dev/null +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/gateway/TriggerEventBasedGatewayHandler.java @@ -0,0 +1,50 @@ +/* + * Zeebe Broker Core + * Copyright © 2017 camunda services GmbH (info@camunda.com) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.zeebe.broker.workflow.processor.gateway; + +import io.zeebe.broker.workflow.model.element.ExecutableIntermediateCatchElement; +import io.zeebe.broker.workflow.processor.BpmnStepContext; +import io.zeebe.broker.workflow.processor.BpmnStepHandler; +import io.zeebe.broker.workflow.processor.flownode.IOMappingHelper; +import io.zeebe.msgpack.mapping.MappingException; +import io.zeebe.protocol.impl.record.value.incident.ErrorType; +import io.zeebe.protocol.intent.WorkflowInstanceIntent; + +public class TriggerEventBasedGatewayHandler + implements BpmnStepHandler { + private final IOMappingHelper ioMappingHelper = new IOMappingHelper(); + + @Override + public void handle(BpmnStepContext context) { + + context.getCatchEventOutput().unsubscribeFromCatchEvents(context.getRecord().getKey(), context); + + try { + ioMappingHelper.applyOutputMappings(context); + + context + .getOutput() + .appendFollowUpEvent( + context.getRecord().getKey(), + WorkflowInstanceIntent.CATCH_EVENT_TRIGGERED, + context.getValue()); + } catch (MappingException e) { + context.raiseIncident(ErrorType.IO_MAPPING_ERROR, e.getMessage()); + } + } +} diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/ActivateGatewayHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/ActivateGatewayHandler.java deleted file mode 100644 index 71ba9ed03d9c..000000000000 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/ActivateGatewayHandler.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Zeebe Broker Core - * Copyright © 2017 camunda services GmbH (info@camunda.com) - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package io.zeebe.broker.workflow.processor.sequenceflow; - -import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; -import io.zeebe.broker.workflow.model.element.ExecutableSequenceFlow; -import io.zeebe.broker.workflow.processor.BpmnStepContext; -import io.zeebe.broker.workflow.processor.BpmnStepHandler; -import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord; -import io.zeebe.protocol.intent.WorkflowInstanceIntent; - -public class ActivateGatewayHandler implements BpmnStepHandler { - - @Override - public void handle(BpmnStepContext context) { - - final ExecutableSequenceFlow sequenceFlow = context.getElement(); - final ExecutableFlowNode targetNode = sequenceFlow.getTarget(); - - final WorkflowInstanceRecord value = context.getValue(); - value.setElementId(targetNode.getId()); - - context.getOutput().appendNewEvent(WorkflowInstanceIntent.GATEWAY_ACTIVATED, value); - } -} diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/StartFlowNodeHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/StartFlowNodeHandler.java index a10311375b5f..dfe9802ca784 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/StartFlowNodeHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/StartFlowNodeHandler.java @@ -26,6 +26,12 @@ public class StartFlowNodeHandler implements BpmnStepHandler { + private final WorkflowInstanceIntent nodeIntent; + + public StartFlowNodeHandler(WorkflowInstanceIntent nodeIntent) { + this.nodeIntent = nodeIntent; + } + @Override public void handle(BpmnStepContext context) { final ExecutableSequenceFlow sequenceFlow = context.getElement(); @@ -34,6 +40,6 @@ public void handle(BpmnStepContext context) { final WorkflowInstanceRecord value = context.getValue(); value.setElementId(targetNode.getId()); - context.getOutput().appendNewEvent(WorkflowInstanceIntent.ELEMENT_READY, value); + context.getOutput().appendNewEvent(nodeIntent, value); } } diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/TakeSequenceFlowHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/TakeSequenceFlowHandler.java similarity index 96% rename from broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/TakeSequenceFlowHandler.java rename to broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/TakeSequenceFlowHandler.java index b99ad41a18ae..686e5e8dc4a4 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/flownode/TakeSequenceFlowHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/TakeSequenceFlowHandler.java @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package io.zeebe.broker.workflow.processor.flownode; +package io.zeebe.broker.workflow.processor.sequenceflow; import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; import io.zeebe.broker.workflow.model.element.ExecutableSequenceFlow; diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/TriggerEndEventHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/TriggerEndEventHandler.java deleted file mode 100644 index 011b7d3b4f3f..000000000000 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/sequenceflow/TriggerEndEventHandler.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Zeebe Broker Core - * Copyright © 2017 camunda services GmbH (info@camunda.com) - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package io.zeebe.broker.workflow.processor.sequenceflow; - -import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; -import io.zeebe.broker.workflow.model.element.ExecutableSequenceFlow; -import io.zeebe.broker.workflow.processor.BpmnStepContext; -import io.zeebe.broker.workflow.processor.BpmnStepHandler; -import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord; -import io.zeebe.protocol.intent.WorkflowInstanceIntent; - -public class TriggerEndEventHandler implements BpmnStepHandler { - - @Override - public void handle(BpmnStepContext context) { - - final ExecutableSequenceFlow sequenceFlow = context.getElement(); - final ExecutableFlowNode targetNode = sequenceFlow.getTarget(); - - final WorkflowInstanceRecord value = context.getValue(); - value.setElementId(targetNode.getId()); - - context.getOutput().appendNewEvent(WorkflowInstanceIntent.END_EVENT_OCCURRED, value); - } -} diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/servicetask/TerminateServiceTaskHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/servicetask/TerminateServiceTaskHandler.java index 0a9c3f02a5b3..012a7f163b32 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/servicetask/TerminateServiceTaskHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/servicetask/TerminateServiceTaskHandler.java @@ -20,7 +20,7 @@ import io.zeebe.broker.incident.processor.IncidentState; import io.zeebe.broker.job.JobState; import io.zeebe.broker.logstreams.state.ZeebeState; -import io.zeebe.broker.workflow.model.element.ExecutableServiceTask; +import io.zeebe.broker.workflow.model.element.ExecutableFlowNode; import io.zeebe.broker.workflow.processor.BpmnStepContext; import io.zeebe.broker.workflow.processor.activity.TerminateActivityHandler; import io.zeebe.broker.workflow.state.ElementInstance; @@ -28,7 +28,7 @@ import io.zeebe.protocol.impl.record.value.job.JobRecord; import io.zeebe.protocol.intent.JobIntent; -public class TerminateServiceTaskHandler extends TerminateActivityHandler { +public class TerminateServiceTaskHandler extends TerminateActivityHandler { private final JobState jobState; public TerminateServiceTaskHandler(ZeebeState zeebeState) { @@ -37,7 +37,7 @@ public TerminateServiceTaskHandler(ZeebeState zeebeState) { } @Override - protected void terminate(BpmnStepContext context) { + protected void terminate(BpmnStepContext context) { super.terminate(context); final ElementInstance elementInstance = context.getElementInstance(); @@ -55,8 +55,7 @@ protected void terminate(BpmnStepContext context) { } } - public void resolveExistingJobIncident( - long jobKey, BpmnStepContext context) { + public void resolveExistingJobIncident(long jobKey, BpmnStepContext context) { final long jobIncidentKey = incidentState.getJobIncidentKey(jobKey); final boolean hasIncident = jobIncidentKey != IncidentState.MISSING_INCIDENT; diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/subprocess/TerminateContainedElementsHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/subprocess/TerminateContainedElementsHandler.java index 2240e5457b2a..0db67b3b49d7 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/subprocess/TerminateContainedElementsHandler.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/subprocess/TerminateContainedElementsHandler.java @@ -25,6 +25,7 @@ import io.zeebe.broker.workflow.processor.EventOutput; import io.zeebe.broker.workflow.state.ElementInstance; import io.zeebe.broker.workflow.state.ElementInstanceState; +import io.zeebe.broker.workflow.state.IndexedRecord; import io.zeebe.broker.workflow.state.WorkflowState; import io.zeebe.protocol.impl.record.value.incident.IncidentRecord; import io.zeebe.protocol.intent.WorkflowInstanceIntent; @@ -48,11 +49,18 @@ public void handle(BpmnStepContext context) { final ElementInstance elementInstance = context.getElementInstance(); final EventOutput output = context.getOutput(); final ElementInstanceState elementInstanceState = workflowState.getElementInstanceState(); - final List children = - elementInstanceState.getChildren(elementInstance.getKey()); - context.getCatchEventOutput().unsubscribeFromCatchEvents(context); + context.getCatchEventOutput().unsubscribeFromCatchEvents(elementInstance.getKey(), context); + + final List deferredTokens = + elementInstanceState.getDeferredTokens(elementInstance.getKey()); + for (IndexedRecord deferedToken : deferredTokens) { + context.getCatchEventOutput().unsubscribeFromCatchEvents(deferedToken.getKey(), context); + output.consumeDeferredEvent(elementInstance.getKey(), deferedToken.getKey()); + } + final List children = + elementInstanceState.getChildren(elementInstance.getKey()); if (children.isEmpty()) { if (elementInstance.isInterrupted()) { context diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/timer/TerminateTimerHandler.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/timer/TerminateTimerHandler.java deleted file mode 100644 index e55e34a40700..000000000000 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/timer/TerminateTimerHandler.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Zeebe Broker Core - * Copyright © 2017 camunda services GmbH (info@camunda.com) - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package io.zeebe.broker.workflow.processor.timer; - -import io.zeebe.broker.logstreams.state.ZeebeState; -import io.zeebe.broker.workflow.model.element.ExecutableIntermediateCatchElement; -import io.zeebe.broker.workflow.processor.BpmnStepContext; -import io.zeebe.broker.workflow.processor.flownode.TerminateFlowNodeHandler; - -public class TerminateTimerHandler - extends TerminateFlowNodeHandler { - - public TerminateTimerHandler(ZeebeState zeebeState) { - super(zeebeState.getIncidentState()); - } - - @Override - protected void terminate(BpmnStepContext context) { - context - .getCatchEventOutput() - .unsubscribeFromTimerEvents( - context.getElementInstance().getKey(), context.getOutput().getStreamWriter()); - } -} diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/timer/TriggerTimerProcessor.java b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/timer/TriggerTimerProcessor.java index 94795d6113db..a2081dc27be8 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/processor/timer/TriggerTimerProcessor.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/processor/timer/TriggerTimerProcessor.java @@ -27,9 +27,12 @@ import io.zeebe.broker.workflow.processor.boundary.BoundaryEventHelper; import io.zeebe.broker.workflow.state.ElementInstance; import io.zeebe.broker.workflow.state.ElementInstanceState; +import io.zeebe.broker.workflow.state.StoredRecord; +import io.zeebe.broker.workflow.state.StoredRecord.Purpose; import io.zeebe.broker.workflow.state.TimerInstance; import io.zeebe.broker.workflow.state.WorkflowState; import io.zeebe.protocol.clientapi.RejectionType; +import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord; import io.zeebe.protocol.intent.TimerIntent; import io.zeebe.protocol.intent.WorkflowInstanceIntent; @@ -63,6 +66,8 @@ public void processRecord( streamWriter.appendFollowUpEvent(record.getKey(), TimerIntent.TRIGGERED, timer); + // TODO handler trigger events in a uniform way - #1699 + if (elementInstance != null && elementInstance.getState() == WorkflowInstanceIntent.ELEMENT_ACTIVATED) { if (boundaryEventHelper.shouldTriggerBoundaryEvent( @@ -78,6 +83,18 @@ public void processRecord( } elementInstanceState.flushDirtyState(); + + } else { + final StoredRecord tokenEvent = elementInstanceState.getTokenEvent(elementInstanceKey); + + if (tokenEvent != null && tokenEvent.getPurpose() == Purpose.DEFERRED_TOKEN) { + // continue at an event-based gateway + final WorkflowInstanceRecord deferedRecord = tokenEvent.getRecord().getValue(); + deferedRecord.setPayload(EMPTY_DOCUMENT).setElementId(timerRecord.getHandlerNodeId()); + + streamWriter.appendFollowUpEvent( + tokenEvent.getKey(), WorkflowInstanceIntent.CATCH_EVENT_TRIGGERING, deferedRecord); + } } workflowState.getTimerState().remove(timerInstance); diff --git a/broker-core/src/test/java/io/zeebe/broker/workflow/gateway/EventbasedGatewayTest.java b/broker-core/src/test/java/io/zeebe/broker/workflow/gateway/EventbasedGatewayTest.java new file mode 100644 index 000000000000..507c437e8a35 --- /dev/null +++ b/broker-core/src/test/java/io/zeebe/broker/workflow/gateway/EventbasedGatewayTest.java @@ -0,0 +1,196 @@ +/* + * Zeebe Broker Core + * Copyright © 2017 camunda services GmbH (info@camunda.com) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.zeebe.broker.workflow.gateway; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.tuple; + +import io.zeebe.broker.test.EmbeddedBrokerRule; +import io.zeebe.exporter.record.Assertions; +import io.zeebe.exporter.record.Record; +import io.zeebe.exporter.record.value.TimerRecordValue; +import io.zeebe.exporter.record.value.WorkflowInstanceRecordValue; +import io.zeebe.model.bpmn.Bpmn; +import io.zeebe.model.bpmn.BpmnModelInstance; +import io.zeebe.protocol.intent.TimerIntent; +import io.zeebe.protocol.intent.WorkflowInstanceIntent; +import io.zeebe.test.broker.protocol.clientapi.ClientApiRule; +import io.zeebe.test.broker.protocol.clientapi.PartitionTestClient; +import io.zeebe.test.util.record.RecordingExporter; +import java.time.Duration; +import java.util.List; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; + +public class EventbasedGatewayTest { + + private static final String PROCESS_ID = "process"; + + private static final BpmnModelInstance WORKFLOW_WITH_TIMERS = + Bpmn.createExecutableProcess(PROCESS_ID) + .startEvent("start") + .eventBasedGateway() + .id("gateway") + .intermediateCatchEvent("timer-1", c -> c.timerWithDuration("PT0.1S")) + .sequenceFlowId("to-end1") + .endEvent("end1") + .moveToLastGateway() + .intermediateCatchEvent("timer-2", c -> c.timerWithDuration("PT10S")) + .sequenceFlowId("to-end2") + .endEvent("end2") + .done(); + + public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(); + public ClientApiRule apiRule = new ClientApiRule(brokerRule::getClientAddress); + + @Rule public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(apiRule); + + private PartitionTestClient testClient; + + @Before + public void init() { + testClient = apiRule.partitionClient(); + } + + @Test + public void testLifecycle() { + // given + testClient.deploy(WORKFLOW_WITH_TIMERS); + + testClient.createWorkflowInstance(PROCESS_ID); + + assertThat(RecordingExporter.timerRecords(TimerIntent.CREATED).limit(2).exists()); + + // when + brokerRule.getClock().addTime(Duration.ofSeconds(1)); + + // then + assertThat( + RecordingExporter.workflowInstanceRecords() + .skipUntil( + r -> r.getMetadata().getIntent() == WorkflowInstanceIntent.GATEWAY_ACTIVATED) + .limitToWorkflowInstanceCompleted()) + .extracting(r -> tuple(r.getValue().getElementId(), r.getMetadata().getIntent())) + .containsExactly( + tuple("gateway", WorkflowInstanceIntent.GATEWAY_ACTIVATED), + tuple("timer-1", WorkflowInstanceIntent.CATCH_EVENT_TRIGGERING), + tuple("timer-1", WorkflowInstanceIntent.CATCH_EVENT_TRIGGERED), + tuple("to-end1", WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN), + tuple("end1", WorkflowInstanceIntent.END_EVENT_OCCURRED), + tuple(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_COMPLETING), + tuple(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_COMPLETED)); + } + + @Test + public void shouldCreateTimers() { + // given + testClient.deploy(WORKFLOW_WITH_TIMERS); + + // when + testClient.createWorkflowInstance(PROCESS_ID); + + // then + final Record gatewayEvent = + RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.GATEWAY_ACTIVATED) + .getFirst(); + + final List> timerEvents = + RecordingExporter.timerRecords(TimerIntent.CREATED).limit(2).asList(); + + assertThat(timerEvents) + .hasSize(2) + .extracting( + r -> tuple(r.getValue().getHandlerFlowNodeId(), r.getValue().getElementInstanceKey())) + .contains(tuple("timer-1", gatewayEvent.getKey()), tuple("timer-2", gatewayEvent.getKey())); + } + + @Test + public void shouldTriggerFirstEvent() { + // given + testClient.deploy(WORKFLOW_WITH_TIMERS); + + testClient.createWorkflowInstance(PROCESS_ID); + + final Record gatewayEvent = + RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.GATEWAY_ACTIVATED) + .getFirst(); + + // when + brokerRule.getClock().addTime(Duration.ofSeconds(1)); + + // then + final Record triggeredEvent = + RecordingExporter.timerRecords(TimerIntent.TRIGGERED).getFirst(); + Assertions.assertThat(triggeredEvent.getValue()) + .hasElementInstanceKey(gatewayEvent.getKey()) + .hasHandlerFlowNodeId("timer-1"); + + assertThat( + RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.END_EVENT_OCCURRED) + .withElementId("end1") + .exists()) + .isTrue(); + } + + @Test + public void shouldCancelSubscriptionsAfterFirstEventIsTriggered() { + // given + testClient.deploy(WORKFLOW_WITH_TIMERS); + + testClient.createWorkflowInstance(PROCESS_ID); + + assertThat(RecordingExporter.timerRecords(TimerIntent.CREATED).limit(2).exists()); + + // when + brokerRule.getClock().addTime(Duration.ofSeconds(1)); + + // then + assertThat( + RecordingExporter.timerRecords(TimerIntent.TRIGGERED) + .withHandlerNodeId("timer-1") + .exists()) + .isTrue(); + + assertThat( + RecordingExporter.timerRecords(TimerIntent.CANCELED) + .withHandlerNodeId("timer-2") + .exists()) + .isTrue(); + } + + @Test + public void shouldCancelSubscriptionsWhenScopeIsTerminated() { + // given + testClient.deploy(WORKFLOW_WITH_TIMERS); + + final long workflowInstanceKey = testClient.createWorkflowInstance(PROCESS_ID); + + assertThat(RecordingExporter.timerRecords(TimerIntent.CREATED).limit(2).exists()); + + // when + testClient.cancelWorkflowInstance(workflowInstanceKey); + + // then + assertThat(RecordingExporter.timerRecords(TimerIntent.CANCELED).limit(2)) + .extracting(r -> r.getValue().getHandlerFlowNodeId()) + .hasSize(2) + .contains("timer-1", "timer-2"); + } +} diff --git a/protocol/src/main/java/io/zeebe/protocol/intent/WorkflowInstanceIntent.java b/protocol/src/main/java/io/zeebe/protocol/intent/WorkflowInstanceIntent.java index 09b514e0e6c6..b22ace6540b0 100644 --- a/protocol/src/main/java/io/zeebe/protocol/intent/WorkflowInstanceIntent.java +++ b/protocol/src/main/java/io/zeebe/protocol/intent/WorkflowInstanceIntent.java @@ -17,9 +17,12 @@ public enum WorkflowInstanceIntent implements Intent { CREATE((short) 0), + START_EVENT_OCCURRED((short) 1), END_EVENT_OCCURRED((short) 2), + SEQUENCE_FLOW_TAKEN((short) 3), + GATEWAY_ACTIVATED((short) 4), ELEMENT_READY((short) 5),