diff --git a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ServiceTaskTransformer.java b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ServiceTaskTransformer.java index 16d1ad8ab1e3..3bad5f226854 100644 --- a/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ServiceTaskTransformer.java +++ b/broker-core/src/main/java/io/zeebe/broker/workflow/model/transformation/transformer/ServiceTaskTransformer.java @@ -17,6 +17,7 @@ */ package io.zeebe.broker.workflow.model.transformation.transformer; +import static io.zeebe.broker.Broker.LOG; import static io.zeebe.util.buffer.BufferUtil.wrapString; import io.zeebe.broker.workflow.model.BpmnStep; @@ -30,7 +31,8 @@ import io.zeebe.model.bpmn.instance.zeebe.ZeebeTaskHeaders; import io.zeebe.msgpack.spec.MsgPackWriter; import io.zeebe.protocol.intent.WorkflowInstanceIntent; -import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; import org.agrona.DirectBuffer; import org.agrona.ExpandableArrayBuffer; import org.agrona.MutableDirectBuffer; @@ -80,34 +82,56 @@ private void transformTaskHeaders(ServiceTask element, final ExecutableServiceTa final ZeebeTaskHeaders taskHeaders = element.getSingleExtensionElement(ZeebeTaskHeaders.class); if (taskHeaders != null) { - final DirectBuffer encodedHeaders = encode(taskHeaders); - serviceTask.setEncodedHeaders(encodedHeaders); + final List validHeaders = + taskHeaders + .getHeaders() + .stream() + .filter(this::isValidHeader) + .collect(Collectors.toList()); + + if (validHeaders.size() < taskHeaders.getHeaders().size()) { + LOG.warn( + "Ignoring invalid headers for task '{}'. Must have non-empty key and value.", + element.getName()); + } + + if (!validHeaders.isEmpty()) { + final DirectBuffer encodedHeaders = encode(validHeaders); + serviceTask.setEncodedHeaders(encodedHeaders); + } } } - private DirectBuffer encode(ZeebeTaskHeaders taskHeaders) { + private DirectBuffer encode(List taskHeaders) { final MutableDirectBuffer buffer = new UnsafeBuffer(0, 0); - final Collection headers = taskHeaders.getHeaders(); + final ExpandableArrayBuffer expandableBuffer = + new ExpandableArrayBuffer(INITIAL_SIZE_KEY_VALUE_PAIR * taskHeaders.size()); - if (!headers.isEmpty()) { - final ExpandableArrayBuffer expandableBuffer = - new ExpandableArrayBuffer(INITIAL_SIZE_KEY_VALUE_PAIR * headers.size()); - msgPackWriter.wrap(expandableBuffer, 0); - msgPackWriter.writeMapHeader(headers.size()); + msgPackWriter.wrap(expandableBuffer, 0); + msgPackWriter.writeMapHeader(taskHeaders.size()); - headers.forEach( - h -> { + taskHeaders.forEach( + h -> { + if (isValidHeader(h)) { final DirectBuffer key = wrapString(h.getKey()); msgPackWriter.writeString(key); final DirectBuffer value = wrapString(h.getValue()); msgPackWriter.writeString(value); - }); + } + }); - buffer.wrap(expandableBuffer.byteArray(), 0, msgPackWriter.getOffset()); - } + buffer.wrap(expandableBuffer.byteArray(), 0, msgPackWriter.getOffset()); return buffer; } + + private boolean isValidHeader(ZeebeHeader header) { + return header != null + && header.getValue() != null + && !header.getValue().isEmpty() + && header.getKey() != null + && !header.getKey().isEmpty(); + } } diff --git a/broker-core/src/test/java/io/zeebe/broker/workflow/activity/ActivityTest.java b/broker-core/src/test/java/io/zeebe/broker/workflow/activity/ActivityTest.java index d2e478f9d1e6..cf05e45f8d19 100644 --- a/broker-core/src/test/java/io/zeebe/broker/workflow/activity/ActivityTest.java +++ b/broker-core/src/test/java/io/zeebe/broker/workflow/activity/ActivityTest.java @@ -27,6 +27,7 @@ import io.zeebe.exporter.record.value.WorkflowInstanceRecordValue; import io.zeebe.model.bpmn.Bpmn; import io.zeebe.model.bpmn.BpmnModelInstance; +import io.zeebe.protocol.intent.DeploymentIntent; import io.zeebe.protocol.intent.JobIntent; import io.zeebe.protocol.intent.TimerIntent; import io.zeebe.protocol.intent.WorkflowInstanceIntent; @@ -167,6 +168,53 @@ public void shouldUnsubscribeFromBoundaryEventTriggersOnTerminating() { WorkflowInstanceIntent.ELEMENT_TERMINATING, WorkflowInstanceIntent.ELEMENT_TERMINATED); } + @Test + public void shouldIgnoreTaskHeadersIfEmpty() { + createWorkflowAndAssertIgnoredHeaders(""); + } + + @Test + public void shouldIgnoreTaskHeadersIfNull() { + createWorkflowAndAssertIgnoredHeaders(null); + } + + private void createWorkflowAndAssertIgnoredHeaders(String testValue) { + // given + final BpmnModelInstance model = + Bpmn.createExecutableProcess("process") + .startEvent("start") + .serviceTask("task1", b -> b.zeebeTaskType("type1").zeebeTaskHeader("key", testValue)) + .endEvent("end") + .moveToActivity("task1") + .serviceTask("task2", b -> b.zeebeTaskType("type2").zeebeTaskHeader(testValue, "value")) + .connectTo("end") + .moveToActivity("task1") + .serviceTask( + "task3", b -> b.zeebeTaskType("type3").zeebeTaskHeader(testValue, testValue)) + .connectTo("end") + .done(); + + // when + final long deploymentKey = testClient.deploy(model); + testClient.receiveFirstDeploymentEvent(DeploymentIntent.CREATED, deploymentKey); + testClient.createWorkflowInstance("process"); + + // then + final JobRecordValue firstJob = + testClient.receiveJobs().withType("type1").getFirst().getValue(); + assertThat(firstJob.getCustomHeaders()).isEmpty(); + testClient.completeJobOfType("type1"); + + final JobRecordValue secondJob = + testClient.receiveJobs().withType("type2").getFirst().getValue(); + assertThat(secondJob.getCustomHeaders()).isEmpty(); + testClient.completeJobOfType("type2"); + + final JobRecordValue thirdJob = + testClient.receiveJobs().withType("type3").getFirst().getValue(); + assertThat(thirdJob.getCustomHeaders()).isEmpty(); + } + private void shouldUnsubscribeFromBoundaryEventTrigger( WorkflowInstanceIntent leavingState, WorkflowInstanceIntent leftState) { // given