From 1c0299e949e52ff633f75acb3f0b7e5a29441c67 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Wed, 16 Feb 2022 15:49:30 +0100 Subject: [PATCH 1/6] refactor(engine): use integers for event/record length Use `int` type instead of `long` for events/records. Under the hood, we always use `int` and not `long`, simply widening the values to return longs. It seemed unnecessary in the end, especially since we always deal with the `ByteBuffer` type in the end, which only allows buffers with up to `Integer.MAX_VALUE` length. (cherry picked from commit a53fb8563fb56ed9065207c5e9e9c085281ee46d) --- .../engine/processing/incident/IncidentRecordWrapper.java | 2 +- .../engine/processing/streamprocessor/TypedEventImpl.java | 4 ++-- .../zeebe/engine/processing/streamprocessor/TypedRecord.java | 4 +++- .../java/io/camunda/zeebe/engine/util/MockTypedRecord.java | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/IncidentRecordWrapper.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/IncidentRecordWrapper.java index d35d21631fc5..16687d216237 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/IncidentRecordWrapper.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/IncidentRecordWrapper.java @@ -105,7 +105,7 @@ public long getRequestId() { } @Override - public long getLength() { + public int getLength() { return 0; } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedEventImpl.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedEventImpl.java index fe424ac3abfb..f1d1c0628e50 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedEventImpl.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedEventImpl.java @@ -110,8 +110,8 @@ public long getRequestId() { @Override @JsonIgnore - public long getLength() { - return (long) metadata.getLength() + value.getLength(); + public int getLength() { + return metadata.getLength() + value.getLength(); } @Override diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecord.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecord.java index 1211ab247350..3a98a3d02e9d 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecord.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecord.java @@ -13,15 +13,17 @@ public interface TypedRecord extends Record { + @Override long getKey(); + @Override T getValue(); int getRequestStreamId(); long getRequestId(); - long getLength(); + int getLength(); default boolean hasRequestMetadata() { return getRequestId() != RecordMetadataEncoder.requestIdNullValue() diff --git a/engine/src/test/java/io/camunda/zeebe/engine/util/MockTypedRecord.java b/engine/src/test/java/io/camunda/zeebe/engine/util/MockTypedRecord.java index 8a086fba6ba8..da700401100a 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/util/MockTypedRecord.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/util/MockTypedRecord.java @@ -56,7 +56,7 @@ public long getRequestId() { } @Override - public long getLength() { + public int getLength() { return metadata.getLength() + value.getLength(); } From 40fdf393dd262e30607206b10acdcca8d1eb0450 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Wed, 16 Feb 2022 15:51:34 +0100 Subject: [PATCH 2/6] refactor(engine): reuse state extension where applicable Replaces creating the state manually in the `VariableBehaviorTest` and reuses the extension instead. (cherry picked from commit 829b5929c4cef9b951133711ef9a2b0d29b99f50) --- .../variable/VariableBehaviorTest.java | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/variable/VariableBehaviorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/variable/VariableBehaviorTest.java index a029ac0bf4f4..fbdb97229222 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/variable/VariableBehaviorTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/variable/VariableBehaviorTest.java @@ -9,58 +9,47 @@ import static org.assertj.core.api.Assertions.assertThat; -import io.camunda.zeebe.db.ZeebeDb; import io.camunda.zeebe.engine.processing.streamprocessor.writers.EventApplyingStateWriter; -import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; -import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory; -import io.camunda.zeebe.engine.state.ZbColumnFamilies; -import io.camunda.zeebe.engine.state.ZeebeDbState; import io.camunda.zeebe.engine.state.appliers.EventAppliers; import io.camunda.zeebe.engine.state.immutable.VariableState; import io.camunda.zeebe.engine.state.mutable.MutableVariableState; import io.camunda.zeebe.engine.state.mutable.MutableZeebeState; import io.camunda.zeebe.engine.util.RecordingTypedEventWriter; import io.camunda.zeebe.engine.util.RecordingTypedEventWriter.RecordedEvent; +import io.camunda.zeebe.engine.util.ZeebeStateExtension; import io.camunda.zeebe.protocol.record.intent.VariableIntent; import io.camunda.zeebe.protocol.record.value.VariableRecordValue; import io.camunda.zeebe.protocol.record.value.VariableRecordValueAssert; import io.camunda.zeebe.test.util.MsgPackUtil; import io.camunda.zeebe.util.buffer.BufferUtil; -import java.io.File; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.agrona.CloseHelper; import org.agrona.DirectBuffer; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ZeebeStateExtension.class) final class VariableBehaviorTest { private final RecordingTypedEventWriter eventWriter = new RecordingTypedEventWriter(); - private ZeebeDb db; + @SuppressWarnings("unused") // injected by the extension + private MutableZeebeState zeebeState; + private MutableVariableState state; private VariableBehavior behavior; @BeforeEach - void beforeEach(final @TempDir File directory) { - db = DefaultZeebeDbFactory.defaultFactory().createDb(directory); - final MutableZeebeState zeebeState = new ZeebeDbState(db, db.createContext()); - final StateWriter stateWriter = + void beforeEach() { + final var stateWriter = new EventApplyingStateWriter(eventWriter, new EventAppliers(zeebeState)); state = zeebeState.getVariableState(); behavior = new VariableBehavior(state, stateWriter, zeebeState.getKeyGenerator()); } - @AfterEach - void afterEach() { - CloseHelper.close(db); - } - @Test void shouldMergeLocalDocument() { // given From 2982144fa7ec11e12119cb21f0eebf7b230a4a08 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Wed, 16 Feb 2022 15:52:39 +0100 Subject: [PATCH 3/6] refactor(engine): extend TypedEventWriter to deal with sizes Adds size probing capabilities to the `TypedEventWriter`, namely one `#canWriteEventOfLength(int)` which checks if an additional event of the given length could be written. This simply delgates to the batch writer under the hood the normal case, with some sane overrides for other implementations. Additionally delegates to the writer to report/determine the `maxEventLength` instead of getting it from the `ProcessingContext`. The reasoning for both is that these are writer specific properties: it's the writer's responsibility to know what it can write, and whether it can write it or not. (cherry picked from commit 3d8af7be77c0db28c60a1608fbd6d6c3f3a8863d) --- .../bpmn/behavior/TypedStreamWriterProxy.java | 10 +++++++ .../writers/EventApplyingStateWriter.java | 10 +++++++ .../writers/NoopTypedStreamWriter.java | 5 ++++ .../writers/TypedEventWriter.java | 26 +++++++++++++++++++ .../writers/TypedStreamWriterImpl.java | 25 ++++++++++++++++++ .../StreamProcessorHealthTest.java | 5 ++++ .../util/RecordingTypedEventWriter.java | 5 ++++ 7 files changed, 86 insertions(+) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/TypedStreamWriterProxy.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/TypedStreamWriterProxy.java index b94c7485cee3..d0d4a7ea4a87 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/TypedStreamWriterProxy.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/TypedStreamWriterProxy.java @@ -39,6 +39,16 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor writer.appendFollowUpEvent(key, intent, value); } + @Override + public boolean canWriteEventOfLength(final int eventLength) { + return writer.canWriteEventOfLength(eventLength); + } + + @Override + public int getMaxEventLength() { + return writer.getMaxEventLength(); + } + @Override public void appendNewCommand(final Intent intent, final RecordValue value) { writer.appendNewCommand(intent, value); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/EventApplyingStateWriter.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/EventApplyingStateWriter.java index c405ff86e0f3..e72c0ff55f9c 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/EventApplyingStateWriter.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/EventApplyingStateWriter.java @@ -35,4 +35,14 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor eventWriter.appendFollowUpEvent(key, intent, value); eventApplier.applyState(key, intent, value); } + + @Override + public boolean canWriteEventOfLength(final int eventLength) { + return eventWriter.canWriteEventOfLength(eventLength); + } + + @Override + public int getMaxEventLength() { + return eventWriter.getMaxEventLength(); + } } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/NoopTypedStreamWriter.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/NoopTypedStreamWriter.java index 51a8889af618..73518dbe3d36 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/NoopTypedStreamWriter.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/NoopTypedStreamWriter.java @@ -32,6 +32,11 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor // no op implementation } + @Override + public int getMaxEventLength() { + return Integer.MAX_VALUE; + } + @Override public void appendNewCommand(final Intent intent, final RecordValue value) { // no op implementation diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedEventWriter.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedEventWriter.java index ec26a0cfca38..133a09ace567 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedEventWriter.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedEventWriter.java @@ -13,4 +13,30 @@ public interface TypedEventWriter { void appendFollowUpEvent(long key, Intent intent, RecordValue value); + + /** + * Use this to know whether you can write an event of this length. + * + *

Example: + * + *

{@code
+   * final TypedEventWriter writer;
+   * // ... assign the writer
+   * final TypedRecord record;
+   * // ... assign record
+   * if (!writer.canWriteEventOfLength(record.getLength())) {
+   *   // raise an incident or some such
+   *   return;
+   * }
+   * }
+ * + * @param eventLength the length of the event that will be written + * @return true if an event of length {@code eventLength} can be written + */ + default boolean canWriteEventOfLength(final int eventLength) { + return eventLength <= getMaxEventLength(); + } + + /** @return the maximum event length */ + int getMaxEventLength(); } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedStreamWriterImpl.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedStreamWriterImpl.java index be4b2d7db2d3..bef8cc17a074 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedStreamWriterImpl.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedStreamWriterImpl.java @@ -129,4 +129,29 @@ public void configureSourceContext(final long sourceRecordPosition) { public void appendFollowUpEvent(final long key, final Intent intent, final RecordValue value) { appendRecord(key, RecordType.EVENT, intent, value); } + + /** + * Use this to know whether you can add an event of the given length to the underlying batch + * writer. + * + * @param eventLength the length of the event that will be added to the batch + * @return true if an event of length {@code eventLength} can be added to this batch such that it + * can later be written + */ + @Override + public boolean canWriteEventOfLength(final int eventLength) { + return batchWriter.canWriteAdditionalEvent(eventLength); + } + + /** + * This is not actually accurate, as the frame length needs to also be aligned by the same amount + * of bytes as the batch. However, this would break concerns here, i.e. the writer here would have + * to become Dispatcher aware. + * + * @return an approximate value of the max fragment length + */ + @Override + public int getMaxEventLength() { + return batchWriter.getMaxFragmentLength(); + } } diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest.java index 82f18948cd91..12bae892d884 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest.java @@ -218,6 +218,11 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor } } + @Override + public int getMaxEventLength() { + return Integer.MAX_VALUE; + } + @Override public void appendNewCommand(final Intent intent, final RecordValue value) {} diff --git a/engine/src/test/java/io/camunda/zeebe/engine/util/RecordingTypedEventWriter.java b/engine/src/test/java/io/camunda/zeebe/engine/util/RecordingTypedEventWriter.java index 4651beacd121..da84a51880b3 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/util/RecordingTypedEventWriter.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/util/RecordingTypedEventWriter.java @@ -31,6 +31,11 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor events.add(new RecordedEvent<>(key, intent, value)); } + @Override + public int getMaxEventLength() { + return Integer.MAX_VALUE; + } + public static final class RecordedEvent { public final long key; From ea2cb5923a3b25e9a6555dcae75a062a1bdbc5eb Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Wed, 16 Feb 2022 15:55:54 +0100 Subject: [PATCH 4/6] refactor(engine): extract job batch collection into a behavior Extracts collecting activatable jobs for a job batch activation request into a specific class. This allows more narrowed, simpler testing of the behavior. Additionally, the behavior makes use of the new `TypedEventWriter#canWriteEventOfLength(int)` API. (cherry picked from commit 0c1985f93ce7c2ca39b5722bfae35d22e618f8f2) --- .../engine/processing/EngineProcessors.java | 2 - .../job/JobBatchActivateProcessor.java | 134 ++----- .../processing/job/JobBatchCollector.java | 169 +++++++++ .../processing/job/JobEventProcessors.java | 3 +- .../processing/job/JobBatchCollectorTest.java | 340 ++++++++++++++++++ 5 files changed, 534 insertions(+), 114 deletions(-) create mode 100644 engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java create mode 100644 engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java index 6c564e882cf8..e645e33a69d8 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java @@ -65,7 +65,6 @@ public static TypedRecordProcessors createEngineProcessors( final LogStream stream = processingContext.getLogStream(); final int partitionId = stream.getPartitionId(); - final int maxFragmentSize = processingContext.getMaxFragmentSize(); final var variablesState = zeebeState.getVariableState(); final var expressionProcessor = @@ -128,7 +127,6 @@ public static TypedRecordProcessors createEngineProcessors( zeebeState, onJobsAvailableCallback, eventPublicationBehavior, - maxFragmentSize, writers, jobMetrics, eventTriggerBehavior); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java index 6f7e2ba42650..055cb68a20b3 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java @@ -10,6 +10,7 @@ import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString; import io.camunda.zeebe.engine.metrics.JobMetrics; +import io.camunda.zeebe.engine.processing.job.JobBatchCollector.LargeJob; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; @@ -18,13 +19,7 @@ import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.engine.state.KeyGenerator; -import io.camunda.zeebe.engine.state.immutable.JobState; -import io.camunda.zeebe.engine.state.immutable.VariableState; import io.camunda.zeebe.engine.state.immutable.ZeebeState; -import io.camunda.zeebe.msgpack.value.DocumentValue; -import io.camunda.zeebe.msgpack.value.LongValue; -import io.camunda.zeebe.msgpack.value.StringValue; -import io.camunda.zeebe.msgpack.value.ValueArray; import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord; import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord; import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord; @@ -33,46 +28,32 @@ import io.camunda.zeebe.protocol.record.intent.JobBatchIntent; import io.camunda.zeebe.protocol.record.value.ErrorType; import io.camunda.zeebe.util.ByteValue; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicInteger; +import io.camunda.zeebe.util.Either; import org.agrona.DirectBuffer; -import org.agrona.ExpandableArrayBuffer; -import org.agrona.MutableDirectBuffer; -import org.agrona.collections.ObjectHashSet; -import org.agrona.concurrent.UnsafeBuffer; public final class JobBatchActivateProcessor implements TypedRecordProcessor { private final StateWriter stateWriter; - private final VariableState variableState; private final TypedRejectionWriter rejectionWriter; private final TypedResponseWriter responseWriter; - - private final JobState jobState; + private final JobBatchCollector jobBatchCollector; private final KeyGenerator keyGenerator; - private final long maxRecordLength; - private final long maxJobBatchLength; - - private final ObjectHashSet variableNames = new ObjectHashSet<>(); private final JobMetrics jobMetrics; public JobBatchActivateProcessor( final Writers writers, final ZeebeState state, final KeyGenerator keyGenerator, - final long maxRecordLength, final JobMetrics jobMetrics) { stateWriter = writers.state(); rejectionWriter = writers.rejection(); responseWriter = writers.response(); + jobBatchCollector = + new JobBatchCollector( + state.getJobState(), state.getVariableState(), stateWriter::canWriteEventOfLength); - jobState = state.getJobState(); - variableState = state.getVariableState(); this.keyGenerator = keyGenerator; - - this.maxRecordLength = maxRecordLength; - maxJobBatchLength = maxRecordLength - Long.BYTES; this.jobMetrics = jobMetrics; } @@ -97,95 +78,21 @@ private boolean isValid(final JobBatchRecord record) { private void activateJobs(final TypedRecord record) { final JobBatchRecord value = record.getValue(); - final long jobBatchKey = keyGenerator.nextKey(); - final AtomicInteger amount = new AtomicInteger(value.getMaxJobsToActivate()); - collectJobsToActivate(record, amount); - - stateWriter.appendFollowUpEvent(jobBatchKey, JobBatchIntent.ACTIVATED, value); - responseWriter.writeEventOnCommand(jobBatchKey, JobBatchIntent.ACTIVATED, value, record); - - final var activatedJobsCount = record.getValue().getJobKeys().size(); - jobMetrics.jobActivated(value.getType(), activatedJobsCount); - } - - private void collectJobsToActivate( - final TypedRecord record, final AtomicInteger amount) { - final JobBatchRecord value = record.getValue(); - final ValueArray jobIterator = value.jobs(); - final ValueArray jobKeyIterator = value.jobKeys(); - - // collect jobs for activation - variableNames.clear(); - final ValueArray jobBatchVariables = value.variables(); - - jobBatchVariables.forEach( - v -> { - final MutableDirectBuffer nameCopy = new UnsafeBuffer(new byte[v.getValue().capacity()]); - nameCopy.putBytes(0, v.getValue(), 0, v.getValue().capacity()); - variableNames.add(nameCopy); - }); - - jobState.forEachActivatableJobs( - value.getTypeBuffer(), - (key, jobRecord) -> { - int remainingAmount = amount.get(); - final long deadline = record.getTimestamp() + value.getTimeout(); - jobRecord.setDeadline(deadline).setWorker(value.getWorkerBuffer()); - - // fetch and set variables, required here to already have the full size of the job record - final long elementInstanceKey = jobRecord.getElementInstanceKey(); - if (elementInstanceKey >= 0) { - final DirectBuffer variables = collectVariables(variableNames, elementInstanceKey); - jobRecord.setVariables(variables); - } else { - jobRecord.setVariables(DocumentValue.EMPTY_DOCUMENT); - } - - if (remainingAmount >= 0 - && (record.getLength() + jobRecord.getLength()) <= maxJobBatchLength) { - - remainingAmount = amount.decrementAndGet(); - jobKeyIterator.add().setValue(key); - final JobRecord arrayValueJob = jobIterator.add(); - - // clone job record since buffer is reused during iteration - final ExpandableArrayBuffer buffer = new ExpandableArrayBuffer(jobRecord.getLength()); - jobRecord.write(buffer, 0); - arrayValueJob.wrap(buffer); - } else { - value.setTruncated(true); - - if (value.getJobs().isEmpty()) { - raiseIncidentJobTooLargeForMessageSize(key, jobRecord); - } - - return false; - } - - return remainingAmount > 0; - }); - } + final Either result = jobBatchCollector.collectJobs(record); + final var activatedJobCount = result.getOrElse(0); + result.ifLeft( + largeJob -> raiseIncidentJobTooLargeForMessageSize(largeJob.key(), largeJob.record())); - private DirectBuffer collectVariables( - final Collection variableNames, final long elementInstanceKey) { - final DirectBuffer variables; - if (variableNames.isEmpty()) { - variables = variableState.getVariablesAsDocument(elementInstanceKey); - } else { - variables = variableState.getVariablesAsDocument(elementInstanceKey, variableNames); - } - return variables; + activateJobBatch(record, value, jobBatchKey, activatedJobCount); } private void rejectCommand(final TypedRecord record) { final RejectionType rejectionType; final String rejectionReason; - final JobBatchRecord value = record.getValue(); - - final String format = "Expected to activate job batch with %s to be %s, but it was %s"; + final var format = "Expected to activate job batch with %s to be %s, but it was %s"; if (value.getMaxJobsToActivate() < 1) { rejectionType = RejectionType.INVALID_ARGUMENT; @@ -212,18 +119,25 @@ private void rejectCommand(final TypedRecord record) { responseWriter.writeRejectionOnCommand(record, rejectionType, rejectionReason); } - private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) { - - final String messageSize = ByteValue.prettyPrint(maxRecordLength); + private void activateJobBatch( + final TypedRecord record, + final JobBatchRecord value, + final long jobBatchKey, + final Integer activatedCount) { + stateWriter.appendFollowUpEvent(jobBatchKey, JobBatchIntent.ACTIVATED, value); + responseWriter.writeEventOnCommand(jobBatchKey, JobBatchIntent.ACTIVATED, value, record); + jobMetrics.jobActivated(value.getType(), activatedCount); + } + private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) { + final String messageSize = ByteValue.prettyPrint(stateWriter.getMaxEventLength()); final DirectBuffer incidentMessage = wrapString( String.format( "The job with key '%s' can not be activated because it is larger than the configured message size (%s). " + "Try to reduce the size by reducing the number of fetched variables or modifying the variable values.", jobKey, messageSize)); - - final IncidentRecord incidentEvent = + final var incidentEvent = new IncidentRecord() .setErrorType(ErrorType.MESSAGE_SIZE_EXCEEDED) .setErrorMessage(incidentMessage) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java new file mode 100644 index 000000000000..d8ebeaf280a4 --- /dev/null +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java @@ -0,0 +1,169 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ +package io.camunda.zeebe.engine.processing.job; + +import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord; +import io.camunda.zeebe.engine.state.immutable.JobState; +import io.camunda.zeebe.engine.state.immutable.VariableState; +import io.camunda.zeebe.msgpack.value.DocumentValue; +import io.camunda.zeebe.msgpack.value.LongValue; +import io.camunda.zeebe.msgpack.value.StringValue; +import io.camunda.zeebe.msgpack.value.ValueArray; +import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord; +import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord; +import io.camunda.zeebe.util.Either; +import io.camunda.zeebe.util.buffer.BufferUtil; +import java.util.Collection; +import java.util.function.Predicate; +import org.agrona.DirectBuffer; +import org.agrona.ExpandableArrayBuffer; +import org.agrona.collections.MutableInteger; +import org.agrona.collections.MutableReference; +import org.agrona.collections.ObjectHashSet; + +/** + * Collects jobs to be activated as part of a {@link JobBatchRecord}. Activate-able jobs are read + * from the {@link JobState}, resolving and setting their variables from the {@link VariableState}, + * and added to the given batch record. + */ +final class JobBatchCollector { + private final ObjectHashSet variableNames = new ObjectHashSet<>(); + + private final JobState jobState; + private final VariableState variableState; + private final Predicate canWriteEventOfLength; + + /** + * @param jobState the state from which jobs are collected + * @param variableState the state from which variables are resolved and collected + * @param canWriteEventOfLength a predicate which should return whether the resulting {@link + * TypedRecord} containing the {@link JobBatchRecord} will be writable or not. The predicate + * takes in the size of the record, and should return true if it can write such a record, and + * false otherwise + */ + JobBatchCollector( + final JobState jobState, + final VariableState variableState, + final Predicate canWriteEventOfLength) { + this.jobState = jobState; + this.variableState = variableState; + this.canWriteEventOfLength = canWriteEventOfLength; + } + + /** + * Collects jobs to be added to the given {@code record}. The jobs and their keys are added + * directly to the given record. + * + *

This method will fail only if it could not activate anything because the batch would be too + * large, but there was at least one job to activate. On failure, it will return that job and its + * key. On success, it will return the amount of jobs activated. + * + * @param record the batch activate command; jobs and their keys will be added directly into it + * @return the amount of activated jobs on success, or a job which was too large to activate + */ + Either collectJobs(final TypedRecord record) { + final JobBatchRecord value = record.getValue(); + final ValueArray jobIterator = value.jobs(); + final ValueArray jobKeyIterator = value.jobKeys(); + final Collection requestedVariables = collectVariableNames(value); + final var maxActivatedCount = value.getMaxJobsToActivate(); + final var activatedCount = new MutableInteger(0); + final var jobCopyBuffer = new ExpandableArrayBuffer(); + final var unwritableJob = new MutableReference(); + + jobState.forEachActivatableJobs( + value.getTypeBuffer(), + (key, jobRecord) -> { + // fill in the job record properties first in order to accurately estimate its size before + // adding it to the batch + final var deadline = record.getTimestamp() + value.getTimeout(); + jobRecord.setDeadline(deadline).setWorker(value.getWorkerBuffer()); + setJobVariables(requestedVariables, jobRecord, jobRecord.getElementInstanceKey()); + + // the expected length is based on the current record's length plus the length of the job + // record we would add to the batch, the number of bytes taken by the additional job key, + // as well as one byte required per job key for its type header. if we ever add more, this + // should be updated accordingly. + final var jobRecordLength = jobRecord.getLength(); + final var expectedEventLength = record.getLength() + jobRecordLength + Long.BYTES + 1; + if (activatedCount.value <= maxActivatedCount + && canWriteEventOfLength.test(expectedEventLength)) { + appendJobToBatch(jobIterator, jobKeyIterator, jobCopyBuffer, key, jobRecord); + activatedCount.increment(); + } else { + // if no jobs were activated, then the current job is simply too large, and we cannot + // activate it + if (activatedCount.value == 0) { + unwritableJob.set(new LargeJob(key, jobRecord)); + } + + value.setTruncated(true); + return false; + } + + return activatedCount.value < maxActivatedCount; + }); + + if (unwritableJob.ref != null) { + return Either.left(unwritableJob.ref); + } + + return Either.right(activatedCount.value); + } + + private void setJobVariables( + final Collection requestedVariables, + final JobRecord jobRecord, + final long elementInstanceKey) { + if (elementInstanceKey >= 0) { + final DirectBuffer variables = collectVariables(requestedVariables, elementInstanceKey); + jobRecord.setVariables(variables); + } else { + jobRecord.setVariables(DocumentValue.EMPTY_DOCUMENT); + } + } + + private void appendJobToBatch( + final ValueArray jobIterator, + final ValueArray jobKeyIterator, + final ExpandableArrayBuffer jobCopyBuffer, + final Long key, + final JobRecord jobRecord) { + jobKeyIterator.add().setValue(key); + final JobRecord arrayValueJob = jobIterator.add(); + + // clone job record since buffer is reused during iteration + jobRecord.write(jobCopyBuffer, 0); + arrayValueJob.wrap(jobCopyBuffer, 0, jobRecord.getLength()); + } + + private Collection collectVariableNames(final JobBatchRecord batchRecord) { + final ValueArray requestedVariables = batchRecord.variables(); + + variableNames.clear(); + requestedVariables.forEach( + variable -> variableNames.add(BufferUtil.cloneBuffer(variable.getValue()))); + + return variableNames; + } + + private DirectBuffer collectVariables( + final Collection variableNames, final long elementInstanceKey) { + final DirectBuffer variables; + + if (variableNames.isEmpty()) { + variables = variableState.getVariablesAsDocument(elementInstanceKey); + } else { + variables = variableState.getVariablesAsDocument(elementInstanceKey, variableNames); + } + + return variables; + } + + record LargeJob(long key, JobRecord record) {} +} diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java index 2d115a3ce2ce..86c45cd2183e 100755 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java @@ -28,7 +28,6 @@ public static void addJobProcessors( final MutableZeebeState zeebeState, final Consumer onJobsAvailableCallback, final BpmnEventPublicationBehavior eventPublicationBehavior, - final int maxRecordSize, final Writers writers, final JobMetrics jobMetrics, final EventTriggerBehavior eventTriggerBehavior) { @@ -70,7 +69,7 @@ ValueType.JOB, JobIntent.UPDATE_RETRIES, new JobUpdateRetriesProcessor(zeebeStat ValueType.JOB_BATCH, JobBatchIntent.ACTIVATE, new JobBatchActivateProcessor( - writers, zeebeState, zeebeState.getKeyGenerator(), maxRecordSize, jobMetrics)) + writers, zeebeState, zeebeState.getKeyGenerator(), jobMetrics)) .withListener(new JobTimeoutTrigger(jobState)) .withListener(jobBackoffChecker) .withListener( diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java new file mode 100644 index 000000000000..607ab62ef2af --- /dev/null +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java @@ -0,0 +1,340 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ +package io.camunda.zeebe.engine.processing.job; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.camunda.zeebe.engine.processing.job.JobBatchCollector.LargeJob; +import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord; +import io.camunda.zeebe.engine.state.mutable.MutableZeebeState; +import io.camunda.zeebe.engine.util.MockTypedRecord; +import io.camunda.zeebe.engine.util.ZeebeStateExtension; +import io.camunda.zeebe.protocol.impl.record.RecordMetadata; +import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord; +import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord; +import io.camunda.zeebe.protocol.record.RecordType; +import io.camunda.zeebe.protocol.record.RecordValueWithVariablesAssert; +import io.camunda.zeebe.protocol.record.ValueType; +import io.camunda.zeebe.protocol.record.intent.JobBatchIntent; +import io.camunda.zeebe.protocol.record.value.JobBatchRecordValueAssert; +import io.camunda.zeebe.protocol.record.value.JobRecordValue; +import io.camunda.zeebe.protocol.record.value.JobRecordValueAssert; +import io.camunda.zeebe.test.util.MsgPackUtil; +import io.camunda.zeebe.test.util.asserts.EitherAssert; +import io.camunda.zeebe.util.Either; +import io.camunda.zeebe.util.buffer.BufferUtil; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import org.agrona.DirectBuffer; +import org.agrona.collections.MutableReference; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ZeebeStateExtension.class) +final class JobBatchCollectorTest { + private static final String JOB_TYPE = "job"; + + private final RecordLengthEvaluator lengthEvaluator = new RecordLengthEvaluator(); + + @SuppressWarnings("unused") // injected by the extension + private MutableZeebeState state; + + private JobBatchCollector collector; + + @BeforeEach + void beforeEach() { + collector = + new JobBatchCollector(state.getJobState(), state.getVariableState(), lengthEvaluator); + } + + @Test + void shouldTruncateBatchIfNoMoreCanBeWritten() { + // given + final long variableScopeKey = state.getKeyGenerator().nextKey(); + final TypedRecord record = createRecord(); + final List jobs = Arrays.asList(createJob(variableScopeKey), createJob(variableScopeKey)); + final var toggle = new AtomicBoolean(true); + + // when - set up the evaluator to only accept the first job + lengthEvaluator.canWriteEventOfLength = (length) -> toggle.getAndSet(false); + final Either result = collector.collectJobs(record); + + // then + final JobBatchRecord batchRecord = record.getValue(); + EitherAssert.assertThat(result) + .as("should have activated only one job successfully") + .right() + .isEqualTo(1); + JobBatchRecordValueAssert.assertThat(batchRecord).hasOnlyJobKeys(jobs.get(0).key).isTruncated(); + } + + @Test + void shouldReturnLargeJobIfFirstJobCannotBeWritten() { + // given + final long variableScopeKey = state.getKeyGenerator().nextKey(); + final TypedRecord record = createRecord(); + final List jobs = Arrays.asList(createJob(variableScopeKey), createJob(variableScopeKey)); + + // when - set up the evaluator to accept no jobs + lengthEvaluator.canWriteEventOfLength = (length) -> false; + final Either result = collector.collectJobs(record); + + // then + final JobBatchRecord batchRecord = record.getValue(); + EitherAssert.assertThat(result) + .as("should return excessively large job") + .left() + .hasFieldOrPropertyWithValue("key", jobs.get(0).key); + JobBatchRecordValueAssert.assertThat(batchRecord).hasNoJobKeys().hasNoJobs().isTruncated(); + } + + @Test + void shouldCollectJobsWithVariables() { + // given - multiple jobs to ensure variables are collected based on the scope + final TypedRecord record = createRecord(); + final long firstScopeKey = state.getKeyGenerator().nextKey(); + final long secondScopeKey = state.getKeyGenerator().nextKey(); + final Map firstJobVariables = Map.of("foo", "bar", "baz", "buz"); + final Map secondJobVariables = Map.of("fizz", "buzz"); + createJobWithVariables(firstScopeKey, firstJobVariables); + createJobWithVariables(secondScopeKey, secondJobVariables); + + // when + collector.collectJobs(record); + + // then + final JobBatchRecord batchRecord = record.getValue(); + JobBatchRecordValueAssert.assertThat(batchRecord) + .satisfies( + batch -> { + final List activatedJobs = batch.getJobs(); + RecordValueWithVariablesAssert.assertThat(activatedJobs.get(0)) + .hasVariables(firstJobVariables); + RecordValueWithVariablesAssert.assertThat(activatedJobs.get(1)) + .hasVariables(secondJobVariables); + }); + } + + @Test + void shouldAppendJobKeyToBatchRecord() { + // given - multiple jobs to ensure variables are collected based on the scope + final TypedRecord record = createRecord(); + final long scopeKey = state.getKeyGenerator().nextKey(); + final List jobs = Arrays.asList(createJob(scopeKey), createJob(scopeKey)); + + // when + collector.collectJobs(record); + + // then + final JobBatchRecord batchRecord = record.getValue(); + JobBatchRecordValueAssert.assertThat(batchRecord).hasJobKeys(jobs.get(0).key, jobs.get(1).key); + } + + @Test + void shouldActivateUpToMaxJobs() { + // given + final TypedRecord record = createRecord(); + final long scopeKey = state.getKeyGenerator().nextKey(); + final List jobs = Arrays.asList(createJob(scopeKey), createJob(scopeKey)); + record.getValue().setMaxJobsToActivate(1); + + // when + final Either result = collector.collectJobs(record); + + // then + final JobBatchRecord batchRecord = record.getValue(); + EitherAssert.assertThat(result).as("should collect only the first job").right().isEqualTo(1); + JobBatchRecordValueAssert.assertThat(batchRecord).hasJobKeys(jobs.get(0).key).isNotTruncated(); + } + + @Test + void shouldSetDeadlineOnActivation() { + // given + final TypedRecord record = createRecord(); + final long scopeKey = state.getKeyGenerator().nextKey(); + final long expectedDeadline = record.getTimestamp() + record.getValue().getTimeout(); + createJob(scopeKey); + createJob(scopeKey); + + // when + collector.collectJobs(record); + + // then + final JobBatchRecord batchRecord = record.getValue(); + JobBatchRecordValueAssert.assertThat(batchRecord) + .satisfies( + batch -> { + final List activatedJobs = batch.getJobs(); + JobRecordValueAssert.assertThat(activatedJobs.get(0)) + .as("first activated job has the expected deadline") + .hasDeadline(expectedDeadline); + JobRecordValueAssert.assertThat(activatedJobs.get(1)) + .as("second activated job has the expected deadline") + .hasDeadline(expectedDeadline); + }); + } + + @Test + void shouldSetWorkerOnActivation() { + // given + final TypedRecord record = createRecord(); + final long scopeKey = state.getKeyGenerator().nextKey(); + final String expectedWorker = "foo"; + createJob(scopeKey); + createJob(scopeKey); + record.getValue().setWorker(expectedWorker); + + // when + collector.collectJobs(record); + + // then + final JobBatchRecord batchRecord = record.getValue(); + JobBatchRecordValueAssert.assertThat(batchRecord) + .satisfies( + batch -> { + final List activatedJobs = batch.getJobs(); + JobRecordValueAssert.assertThat(activatedJobs.get(0)) + .as("first activated job has the expected worker") + .hasWorker(expectedWorker); + JobRecordValueAssert.assertThat(activatedJobs.get(1)) + .as("second activated job has the expected worker") + .hasWorker(expectedWorker); + }); + } + + @Test + void shouldFetchOnlyRequestedVariables() { + // given + final TypedRecord record = createRecord(); + final long firstScopeKey = state.getKeyGenerator().nextKey(); + final long secondScopeKey = state.getKeyGenerator().nextKey(); + final Map firstJobVariables = Map.of("foo", "bar", "baz", "buz"); + final Map secondJobVariables = Map.of("fizz", "buzz"); + createJobWithVariables(firstScopeKey, firstJobVariables); + createJobWithVariables(secondScopeKey, secondJobVariables); + record.getValue().variables().add().wrap(BufferUtil.wrapString("foo")); + + // when + collector.collectJobs(record); + + // then + final JobBatchRecord batchRecord = record.getValue(); + JobBatchRecordValueAssert.assertThat(batchRecord) + .satisfies( + batch -> { + final List activatedJobs = batch.getJobs(); + RecordValueWithVariablesAssert.assertThat(activatedJobs.get(0)) + .hasVariables(Map.of("foo", "bar")); + RecordValueWithVariablesAssert.assertThat(activatedJobs.get(1)) + .hasVariables(Collections.emptyMap()); + }); + } + + /** + * This is specifically a regression test for #5525. It's possible for this test to become + * outdated if we ever change how records are serialized, variables packed, etc. But it's a + * best-effort solution to make sure that if we do, we are forced to double-check and ensure we're + * correctly estimating the size of the record before writing it. + * + *

Long term, the writer should be able to cope with arbitrarily large batches, but until then + * we're stuck with this workaround for a better UX. + */ + @Test + void shouldEstimateLengthCorrectly() { + // given - multiple jobs to ensure variables are collected based on the scope + final TypedRecord record = createRecord(); + final long scopeKey = state.getKeyGenerator().nextKey(); + final Map variables = Map.of("foo", "bar"); + final MutableReference estimatedLength = new MutableReference<>(); + final int initialLength = record.getLength(); + createJobWithVariables(scopeKey, variables); + + // when + lengthEvaluator.canWriteEventOfLength = + length -> { + estimatedLength.set(length); + return true; + }; + collector.collectJobs(record); + + // then + // the expected length is then the length of the initial record + the length of the activated + // job + the length of a key (long) and one byte for its list header + final var activatedJob = (JobRecord) record.getValue().getJobs().get(0); + final int expectedLength = initialLength + activatedJob.getLength() + 9; + assertThat(estimatedLength.ref).isEqualTo(expectedLength); + } + + private TypedRecord createRecord() { + final RecordMetadata metadata = + new RecordMetadata() + .recordType(RecordType.COMMAND) + .intent(JobBatchIntent.ACTIVATE) + .valueType(ValueType.JOB_BATCH); + final var batchRecord = + new JobBatchRecord() + .setTimeout(Duration.ofSeconds(10).toMillis()) + .setMaxJobsToActivate(10) + .setType(JOB_TYPE) + .setWorker("test"); + + return new MockTypedRecord<>(state.getKeyGenerator().nextKey(), metadata, batchRecord); + } + + private Job createJob(final long variableScopeKey) { + final var jobRecord = + new JobRecord() + .setBpmnProcessId("process") + .setElementId("element") + .setElementInstanceKey(variableScopeKey) + .setType(JOB_TYPE); + final long jobKey = state.getKeyGenerator().nextKey(); + + state.getJobState().create(jobKey, jobRecord); + return new Job(jobKey, jobRecord); + } + + private void createJobWithVariables( + final long variableScopeKey, final Map variables) { + setVariables(variableScopeKey, variables); + createJob(variableScopeKey); + } + + private void setVariables(final long variableScopeKey, final Map variables) { + final var variableState = state.getVariableState(); + variables.forEach( + (key, value) -> + variableState.setVariableLocal( + variableScopeKey, + variableScopeKey, + variableScopeKey, + BufferUtil.wrapString(key), + packString(value))); + } + + private DirectBuffer packString(final String value) { + return MsgPackUtil.encodeMsgPack(b -> b.packString(value)); + } + + private static final class RecordLengthEvaluator implements Predicate { + private Predicate canWriteEventOfLength = length -> true; + + @Override + public boolean test(final Integer length) { + return canWriteEventOfLength.test(length); + } + } + + private record Job(long key, JobRecord job) {} +} From 9faf3fbd01c57af49f8998d68d6d5de045c651b0 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Wed, 23 Feb 2022 13:46:19 +0100 Subject: [PATCH 5/6] refactor(engine): rename LargeJob to TooLargeJob Renames record type `LargeJob` to `TooLargeJob` to better reflect the fact that it is excessively large. (cherry picked from commit fe996c207af46d82eb6b5c68b473d4404afe2c7b) --- .../engine/processing/job/JobBatchActivateProcessor.java | 4 ++-- .../zeebe/engine/processing/job/JobBatchCollector.java | 8 ++++---- .../engine/processing/job/JobBatchCollectorTest.java | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java index 055cb68a20b3..7aa1b46855db 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java @@ -10,7 +10,7 @@ import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString; import io.camunda.zeebe.engine.metrics.JobMetrics; -import io.camunda.zeebe.engine.processing.job.JobBatchCollector.LargeJob; +import io.camunda.zeebe.engine.processing.job.JobBatchCollector.TooLargeJob; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; @@ -80,7 +80,7 @@ private void activateJobs(final TypedRecord record) { final JobBatchRecord value = record.getValue(); final long jobBatchKey = keyGenerator.nextKey(); - final Either result = jobBatchCollector.collectJobs(record); + final Either result = jobBatchCollector.collectJobs(record); final var activatedJobCount = result.getOrElse(0); result.ifLeft( largeJob -> raiseIncidentJobTooLargeForMessageSize(largeJob.key(), largeJob.record())); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java index d8ebeaf280a4..0d5e1e3bf656 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java @@ -66,7 +66,7 @@ final class JobBatchCollector { * @param record the batch activate command; jobs and their keys will be added directly into it * @return the amount of activated jobs on success, or a job which was too large to activate */ - Either collectJobs(final TypedRecord record) { + Either collectJobs(final TypedRecord record) { final JobBatchRecord value = record.getValue(); final ValueArray jobIterator = value.jobs(); final ValueArray jobKeyIterator = value.jobKeys(); @@ -74,7 +74,7 @@ Either collectJobs(final TypedRecord record) final var maxActivatedCount = value.getMaxJobsToActivate(); final var activatedCount = new MutableInteger(0); final var jobCopyBuffer = new ExpandableArrayBuffer(); - final var unwritableJob = new MutableReference(); + final var unwritableJob = new MutableReference(); jobState.forEachActivatableJobs( value.getTypeBuffer(), @@ -99,7 +99,7 @@ Either collectJobs(final TypedRecord record) // if no jobs were activated, then the current job is simply too large, and we cannot // activate it if (activatedCount.value == 0) { - unwritableJob.set(new LargeJob(key, jobRecord)); + unwritableJob.set(new TooLargeJob(key, jobRecord)); } value.setTruncated(true); @@ -165,5 +165,5 @@ private DirectBuffer collectVariables( return variables; } - record LargeJob(long key, JobRecord record) {} + record TooLargeJob(long key, JobRecord record) {} } diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java index 607ab62ef2af..b3c5866c02a2 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java @@ -9,7 +9,7 @@ import static org.assertj.core.api.Assertions.assertThat; -import io.camunda.zeebe.engine.processing.job.JobBatchCollector.LargeJob; +import io.camunda.zeebe.engine.processing.job.JobBatchCollector.TooLargeJob; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord; import io.camunda.zeebe.engine.state.mutable.MutableZeebeState; import io.camunda.zeebe.engine.util.MockTypedRecord; @@ -68,7 +68,7 @@ void shouldTruncateBatchIfNoMoreCanBeWritten() { // when - set up the evaluator to only accept the first job lengthEvaluator.canWriteEventOfLength = (length) -> toggle.getAndSet(false); - final Either result = collector.collectJobs(record); + final Either result = collector.collectJobs(record); // then final JobBatchRecord batchRecord = record.getValue(); @@ -88,7 +88,7 @@ void shouldReturnLargeJobIfFirstJobCannotBeWritten() { // when - set up the evaluator to accept no jobs lengthEvaluator.canWriteEventOfLength = (length) -> false; - final Either result = collector.collectJobs(record); + final Either result = collector.collectJobs(record); // then final JobBatchRecord batchRecord = record.getValue(); @@ -150,7 +150,7 @@ void shouldActivateUpToMaxJobs() { record.getValue().setMaxJobsToActivate(1); // when - final Either result = collector.collectJobs(record); + final Either result = collector.collectJobs(record); // then final JobBatchRecord batchRecord = record.getValue(); From 39f2ef7903b3d349e941926128385cb5aa353698 Mon Sep 17 00:00:00 2001 From: Nicolas Pepin-Perreault Date: Wed, 23 Feb 2022 18:30:16 +0100 Subject: [PATCH 6/6] refactor(engine): do not use record on Java 11 --- .../processing/job/JobBatchCollector.java | 18 ++++++++++++++- .../processing/job/JobBatchCollectorTest.java | 22 +++++++++---------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java index 0d5e1e3bf656..eee9b59cec65 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java @@ -165,5 +165,21 @@ private DirectBuffer collectVariables( return variables; } - record TooLargeJob(long key, JobRecord record) {} + static final class TooLargeJob { + private final long key; + private final JobRecord job; + + private TooLargeJob(final long key, final JobRecord job) { + this.key = key; + this.job = job; + } + + long key() { + return key; + } + + JobRecord record() { + return job; + } + } } diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java index b3c5866c02a2..27607010f9ba 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java @@ -63,7 +63,7 @@ void shouldTruncateBatchIfNoMoreCanBeWritten() { // given final long variableScopeKey = state.getKeyGenerator().nextKey(); final TypedRecord record = createRecord(); - final List jobs = Arrays.asList(createJob(variableScopeKey), createJob(variableScopeKey)); + final List jobs = Arrays.asList(createJob(variableScopeKey), createJob(variableScopeKey)); final var toggle = new AtomicBoolean(true); // when - set up the evaluator to only accept the first job @@ -76,7 +76,7 @@ void shouldTruncateBatchIfNoMoreCanBeWritten() { .as("should have activated only one job successfully") .right() .isEqualTo(1); - JobBatchRecordValueAssert.assertThat(batchRecord).hasOnlyJobKeys(jobs.get(0).key).isTruncated(); + JobBatchRecordValueAssert.assertThat(batchRecord).hasOnlyJobKeys(jobs.get(0)).isTruncated(); } @Test @@ -84,7 +84,7 @@ void shouldReturnLargeJobIfFirstJobCannotBeWritten() { // given final long variableScopeKey = state.getKeyGenerator().nextKey(); final TypedRecord record = createRecord(); - final List jobs = Arrays.asList(createJob(variableScopeKey), createJob(variableScopeKey)); + final List jobs = Arrays.asList(createJob(variableScopeKey), createJob(variableScopeKey)); // when - set up the evaluator to accept no jobs lengthEvaluator.canWriteEventOfLength = (length) -> false; @@ -95,7 +95,7 @@ void shouldReturnLargeJobIfFirstJobCannotBeWritten() { EitherAssert.assertThat(result) .as("should return excessively large job") .left() - .hasFieldOrPropertyWithValue("key", jobs.get(0).key); + .hasFieldOrPropertyWithValue("key", jobs.get(0)); JobBatchRecordValueAssert.assertThat(batchRecord).hasNoJobKeys().hasNoJobs().isTruncated(); } @@ -131,14 +131,14 @@ void shouldAppendJobKeyToBatchRecord() { // given - multiple jobs to ensure variables are collected based on the scope final TypedRecord record = createRecord(); final long scopeKey = state.getKeyGenerator().nextKey(); - final List jobs = Arrays.asList(createJob(scopeKey), createJob(scopeKey)); + final List jobs = Arrays.asList(createJob(scopeKey), createJob(scopeKey)); // when collector.collectJobs(record); // then final JobBatchRecord batchRecord = record.getValue(); - JobBatchRecordValueAssert.assertThat(batchRecord).hasJobKeys(jobs.get(0).key, jobs.get(1).key); + JobBatchRecordValueAssert.assertThat(batchRecord).hasJobKeys(jobs.get(0), jobs.get(1)); } @Test @@ -146,7 +146,7 @@ void shouldActivateUpToMaxJobs() { // given final TypedRecord record = createRecord(); final long scopeKey = state.getKeyGenerator().nextKey(); - final List jobs = Arrays.asList(createJob(scopeKey), createJob(scopeKey)); + final List jobs = Arrays.asList(createJob(scopeKey), createJob(scopeKey)); record.getValue().setMaxJobsToActivate(1); // when @@ -155,7 +155,7 @@ void shouldActivateUpToMaxJobs() { // then final JobBatchRecord batchRecord = record.getValue(); EitherAssert.assertThat(result).as("should collect only the first job").right().isEqualTo(1); - JobBatchRecordValueAssert.assertThat(batchRecord).hasJobKeys(jobs.get(0).key).isNotTruncated(); + JobBatchRecordValueAssert.assertThat(batchRecord).hasJobKeys(jobs.get(0)).isNotTruncated(); } @Test @@ -292,7 +292,7 @@ private TypedRecord createRecord() { return new MockTypedRecord<>(state.getKeyGenerator().nextKey(), metadata, batchRecord); } - private Job createJob(final long variableScopeKey) { + private long createJob(final long variableScopeKey) { final var jobRecord = new JobRecord() .setBpmnProcessId("process") @@ -302,7 +302,7 @@ private Job createJob(final long variableScopeKey) { final long jobKey = state.getKeyGenerator().nextKey(); state.getJobState().create(jobKey, jobRecord); - return new Job(jobKey, jobRecord); + return jobKey; } private void createJobWithVariables( @@ -335,6 +335,4 @@ public boolean test(final Integer length) { return canWriteEventOfLength.test(length); } } - - private record Job(long key, JobRecord job) {} }