diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java
index 6c564e882cf8..e645e33a69d8 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java
@@ -65,7 +65,6 @@ public static TypedRecordProcessors createEngineProcessors(
     final LogStream stream = processingContext.getLogStream();
     final int partitionId = stream.getPartitionId();
 
-    final int maxFragmentSize = processingContext.getMaxFragmentSize();
     final var variablesState = zeebeState.getVariableState();
     final var expressionProcessor =
@@ -128,7 +127,6 @@ public static TypedRecordProcessors createEngineProcessors(
         zeebeState,
         onJobsAvailableCallback,
         eventPublicationBehavior,
-        maxFragmentSize,
         writers,
         jobMetrics,
         eventTriggerBehavior);
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/TypedStreamWriterProxy.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/TypedStreamWriterProxy.java
index b94c7485cee3..d0d4a7ea4a87 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/TypedStreamWriterProxy.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/TypedStreamWriterProxy.java
@@ -39,6 +39,16 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
     writer.appendFollowUpEvent(key, intent, value);
   }
 
+  @Override
+  public boolean canWriteEventOfLength(final int eventLength) {
+    return writer.canWriteEventOfLength(eventLength);
+  }
+
+  @Override
+  public int getMaxEventLength() {
+    return writer.getMaxEventLength();
+  }
+
   @Override
   public void appendNewCommand(final Intent intent, final RecordValue value) {
     writer.appendNewCommand(intent, value);
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/IncidentRecordWrapper.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/IncidentRecordWrapper.java
index d35d21631fc5..16687d216237 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/IncidentRecordWrapper.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/IncidentRecordWrapper.java
@@ -105,7 +105,7 @@ public long getRequestId() {
   }
 
   @Override
-  public long getLength() {
+  public int getLength() {
     return 0;
   }
 
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java
index 6f7e2ba42650..7aa1b46855db 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchActivateProcessor.java
@@ -10,6 +10,7 @@
 import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString;
 
 import io.camunda.zeebe.engine.metrics.JobMetrics;
+import io.camunda.zeebe.engine.processing.job.JobBatchCollector.TooLargeJob;
 import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
 import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
 import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
@@ -18,13 +19,7 @@
 import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
 import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
 import io.camunda.zeebe.engine.state.KeyGenerator;
-import io.camunda.zeebe.engine.state.immutable.JobState;
-import io.camunda.zeebe.engine.state.immutable.VariableState;
 import io.camunda.zeebe.engine.state.immutable.ZeebeState;
-import io.camunda.zeebe.msgpack.value.DocumentValue;
-import io.camunda.zeebe.msgpack.value.LongValue;
-import io.camunda.zeebe.msgpack.value.StringValue;
-import io.camunda.zeebe.msgpack.value.ValueArray;
 import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
 import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
 import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
@@ -33,46 +28,32 @@
 import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
 import io.camunda.zeebe.protocol.record.value.ErrorType;
 import io.camunda.zeebe.util.ByteValue;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
+import io.camunda.zeebe.util.Either;
 import org.agrona.DirectBuffer;
-import org.agrona.ExpandableArrayBuffer;
-import org.agrona.MutableDirectBuffer;
-import org.agrona.collections.ObjectHashSet;
-import org.agrona.concurrent.UnsafeBuffer;
 
 public final class JobBatchActivateProcessor implements TypedRecordProcessor<JobBatchRecord> {
 
   private final StateWriter stateWriter;
-  private final VariableState variableState;
   private final TypedRejectionWriter rejectionWriter;
   private final TypedResponseWriter responseWriter;
-
-  private final JobState jobState;
+  private final JobBatchCollector jobBatchCollector;
   private final KeyGenerator keyGenerator;
-  private final long maxRecordLength;
-  private final long maxJobBatchLength;
-
-  private final ObjectHashSet<DirectBuffer> variableNames = new ObjectHashSet<>();
   private final JobMetrics jobMetrics;
 
   public JobBatchActivateProcessor(
       final Writers writers,
       final ZeebeState state,
       final KeyGenerator keyGenerator,
-      final long maxRecordLength,
       final JobMetrics jobMetrics) {
 
     stateWriter = writers.state();
     rejectionWriter = writers.rejection();
     responseWriter = writers.response();
+    jobBatchCollector =
+        new JobBatchCollector(
+            state.getJobState(), state.getVariableState(), stateWriter::canWriteEventOfLength);
 
-    jobState = state.getJobState();
-    variableState = state.getVariableState();
     this.keyGenerator = keyGenerator;
-
-    this.maxRecordLength = maxRecordLength;
-    maxJobBatchLength = maxRecordLength - Long.BYTES;
     this.jobMetrics = jobMetrics;
   }
@@ -97,95 +78,21 @@ private boolean isValid(final JobBatchRecord record) {
 
   private void activateJobs(final TypedRecord<JobBatchRecord> record) {
     final JobBatchRecord value = record.getValue();
-
     final long jobBatchKey = keyGenerator.nextKey();
-    final AtomicInteger amount = new AtomicInteger(value.getMaxJobsToActivate());
-
-    collectJobsToActivate(record, amount);
-
-    stateWriter.appendFollowUpEvent(jobBatchKey, JobBatchIntent.ACTIVATED, value);
-    responseWriter.writeEventOnCommand(jobBatchKey, JobBatchIntent.ACTIVATED, value, record);
-
-    final var activatedJobsCount = record.getValue().getJobKeys().size();
-    jobMetrics.jobActivated(value.getType(), activatedJobsCount);
-  }
-
-  private void collectJobsToActivate(
-      final TypedRecord<JobBatchRecord> record, final AtomicInteger amount) {
-    final JobBatchRecord value = record.getValue();
-    final ValueArray<JobRecord> jobIterator = value.jobs();
-    final ValueArray<LongValue> jobKeyIterator = value.jobKeys();
-
-    // collect jobs for activation
-    variableNames.clear();
-    final ValueArray<StringValue> jobBatchVariables = value.variables();
-
-    jobBatchVariables.forEach(
-        v -> {
-          final MutableDirectBuffer nameCopy = new UnsafeBuffer(new byte[v.getValue().capacity()]);
-          nameCopy.putBytes(0, v.getValue(), 0, v.getValue().capacity());
-          variableNames.add(nameCopy);
-        });
-
-    jobState.forEachActivatableJobs(
-        value.getTypeBuffer(),
-        (key, jobRecord) -> {
-          int remainingAmount = amount.get();
-          final long deadline = record.getTimestamp() + value.getTimeout();
-          jobRecord.setDeadline(deadline).setWorker(value.getWorkerBuffer());
-
-          // fetch and set variables, required here to already have the full size of the job record
-          final long elementInstanceKey = jobRecord.getElementInstanceKey();
-          if (elementInstanceKey >= 0) {
-            final DirectBuffer variables = collectVariables(variableNames, elementInstanceKey);
-            jobRecord.setVariables(variables);
-          } else {
-            jobRecord.setVariables(DocumentValue.EMPTY_DOCUMENT);
-          }
-
-          if (remainingAmount >= 0
-              && (record.getLength() + jobRecord.getLength()) <= maxJobBatchLength) {
-
-            remainingAmount = amount.decrementAndGet();
-            jobKeyIterator.add().setValue(key);
-            final JobRecord arrayValueJob = jobIterator.add();
-
-            // clone job record since buffer is reused during iteration
-            final ExpandableArrayBuffer buffer = new ExpandableArrayBuffer(jobRecord.getLength());
-            jobRecord.write(buffer, 0);
-            arrayValueJob.wrap(buffer);
-          } else {
-            value.setTruncated(true);
-
-            if (value.getJobs().isEmpty()) {
-              raiseIncidentJobTooLargeForMessageSize(key, jobRecord);
-            }
-
-            return false;
-          }
-
-          return remainingAmount > 0;
-        });
-  }
+    final Either<TooLargeJob, Integer> result = jobBatchCollector.collectJobs(record);
+    final var activatedJobCount = result.getOrElse(0);
+    result.ifLeft(
+        largeJob -> raiseIncidentJobTooLargeForMessageSize(largeJob.key(), largeJob.record()));
 
-  private DirectBuffer collectVariables(
-      final Collection<DirectBuffer> variableNames, final long elementInstanceKey) {
-    final DirectBuffer variables;
-    if (variableNames.isEmpty()) {
-      variables = variableState.getVariablesAsDocument(elementInstanceKey);
-    } else {
-      variables = variableState.getVariablesAsDocument(elementInstanceKey, variableNames);
-    }
-    return variables;
+    activateJobBatch(record, value, jobBatchKey, activatedJobCount);
   }
 
   private void rejectCommand(final TypedRecord<JobBatchRecord> record) {
     final RejectionType rejectionType;
     final String rejectionReason;
-
     final JobBatchRecord value = record.getValue();
-
-    final String format = "Expected to activate job batch with %s to be %s, but it was %s";
+    final var format = "Expected to activate job batch with %s to be %s, but it was %s";
 
     if (value.getMaxJobsToActivate() < 1) {
       rejectionType = RejectionType.INVALID_ARGUMENT;
@@ -212,18 +119,25 @@ private void rejectCommand(final TypedRecord<JobBatchRecord> record) {
     responseWriter.writeRejectionOnCommand(record, rejectionType, rejectionReason);
   }
 
-  private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) {
-
-    final String messageSize = ByteValue.prettyPrint(maxRecordLength);
+  private void activateJobBatch(
+      final TypedRecord<JobBatchRecord> record,
+      final JobBatchRecord value,
+      final long jobBatchKey,
+      final Integer activatedCount) {
+    stateWriter.appendFollowUpEvent(jobBatchKey, JobBatchIntent.ACTIVATED, value);
+    responseWriter.writeEventOnCommand(jobBatchKey, JobBatchIntent.ACTIVATED, value, record);
+    jobMetrics.jobActivated(value.getType(), activatedCount);
+  }
 
+  private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) {
+    final String messageSize = ByteValue.prettyPrint(stateWriter.getMaxEventLength());
     final DirectBuffer incidentMessage =
         wrapString(
             String.format(
                 "The job with key '%s' can not be activated because it is larger than the configured message size (%s). "
                     + "Try to reduce the size by reducing the number of fetched variables or modifying the variable values.",
                 jobKey, messageSize));
-
-    final IncidentRecord incidentEvent =
+    final var incidentEvent =
         new IncidentRecord()
             .setErrorType(ErrorType.MESSAGE_SIZE_EXCEEDED)
             .setErrorMessage(incidentMessage)
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java
new file mode 100644
index 000000000000..eee9b59cec65
--- /dev/null
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobBatchCollector.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
+ * one or more contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright ownership.
+ * Licensed under the Zeebe Community License 1.1. You may not use this file
+ * except in compliance with the Zeebe Community License 1.1.
+ */
+package io.camunda.zeebe.engine.processing.job;
+
+import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
+import io.camunda.zeebe.engine.state.immutable.JobState;
+import io.camunda.zeebe.engine.state.immutable.VariableState;
+import io.camunda.zeebe.msgpack.value.DocumentValue;
+import io.camunda.zeebe.msgpack.value.LongValue;
+import io.camunda.zeebe.msgpack.value.StringValue;
+import io.camunda.zeebe.msgpack.value.ValueArray;
+import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
+import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
+import io.camunda.zeebe.util.Either;
+import io.camunda.zeebe.util.buffer.BufferUtil;
+import java.util.Collection;
+import java.util.function.Predicate;
+import org.agrona.DirectBuffer;
+import org.agrona.ExpandableArrayBuffer;
+import org.agrona.collections.MutableInteger;
+import org.agrona.collections.MutableReference;
+import org.agrona.collections.ObjectHashSet;
+
+/**
+ * Collects jobs to be activated as part of a {@link JobBatchRecord}. Activate-able jobs are read
+ * from the {@link JobState}, resolving and setting their variables from the {@link VariableState},
+ * and added to the given batch record.
+ */
+final class JobBatchCollector {
+  private final ObjectHashSet<DirectBuffer> variableNames = new ObjectHashSet<>();
+
+  private final JobState jobState;
+  private final VariableState variableState;
+  private final Predicate<Integer> canWriteEventOfLength;
+
+  /**
+   * @param jobState the state from which jobs are collected
+   * @param variableState the state from which variables are resolved and collected
+   * @param canWriteEventOfLength a predicate which should return whether the resulting {@link
+   *     TypedRecord} containing the {@link JobBatchRecord} will be writable or not. The predicate
+   *     takes in the size of the record, and should return true if it can write such a record, and
+   *     false otherwise
+   */
+  JobBatchCollector(
+      final JobState jobState,
+      final VariableState variableState,
+      final Predicate<Integer> canWriteEventOfLength) {
+    this.jobState = jobState;
+    this.variableState = variableState;
+    this.canWriteEventOfLength = canWriteEventOfLength;
+  }
+
+  /**
+   * Collects jobs to be added to the given {@code record}. The jobs and their keys are added
+   * directly to the given record.
+   *
+   * <p>This method will fail only if it could not activate anything because the batch would be too
+   * large, but there was at least one job to activate. On failure, it will return that job and its
+   * key. On success, it will return the amount of jobs activated.
+   *
+   * @param record the batch activate command; jobs and their keys will be added directly into it
+   * @return the amount of activated jobs on success, or a job which was too large to activate
+   */
+  Either<TooLargeJob, Integer> collectJobs(final TypedRecord<JobBatchRecord> record) {
+    final JobBatchRecord value = record.getValue();
+    final ValueArray<JobRecord> jobIterator = value.jobs();
+    final ValueArray<LongValue> jobKeyIterator = value.jobKeys();
+    final Collection<DirectBuffer> requestedVariables = collectVariableNames(value);
+    final var maxActivatedCount = value.getMaxJobsToActivate();
+    final var activatedCount = new MutableInteger(0);
+    final var jobCopyBuffer = new ExpandableArrayBuffer();
+    final var unwritableJob = new MutableReference<TooLargeJob>();
+
+    jobState.forEachActivatableJobs(
+        value.getTypeBuffer(),
+        (key, jobRecord) -> {
+          // fill in the job record properties first in order to accurately estimate its size before
+          // adding it to the batch
+          final var deadline = record.getTimestamp() + value.getTimeout();
+          jobRecord.setDeadline(deadline).setWorker(value.getWorkerBuffer());
+          setJobVariables(requestedVariables, jobRecord, jobRecord.getElementInstanceKey());
+
+          // the expected length is based on the current record's length plus the length of the job
+          // record we would add to the batch, the number of bytes taken by the additional job key,
+          // as well as one byte required per job key for its type header. if we ever add more, this
+          // should be updated accordingly.
+          final var jobRecordLength = jobRecord.getLength();
+          final var expectedEventLength = record.getLength() + jobRecordLength + Long.BYTES + 1;
+          if (activatedCount.value <= maxActivatedCount
+              && canWriteEventOfLength.test(expectedEventLength)) {
+            appendJobToBatch(jobIterator, jobKeyIterator, jobCopyBuffer, key, jobRecord);
+            activatedCount.increment();
+          } else {
+            // if no jobs were activated, then the current job is simply too large, and we cannot
+            // activate it
+            if (activatedCount.value == 0) {
+              unwritableJob.set(new TooLargeJob(key, jobRecord));
+            }
+
+            value.setTruncated(true);
+            return false;
+          }
+
+          return activatedCount.value < maxActivatedCount;
+        });
+
+    if (unwritableJob.ref != null) {
+      return Either.left(unwritableJob.ref);
+    }
+
+    return Either.right(activatedCount.value);
+  }
+
+  private void setJobVariables(
+      final Collection<DirectBuffer> requestedVariables,
+      final JobRecord jobRecord,
+      final long elementInstanceKey) {
+    if (elementInstanceKey >= 0) {
+      final DirectBuffer variables = collectVariables(requestedVariables, elementInstanceKey);
+      jobRecord.setVariables(variables);
+    } else {
+      jobRecord.setVariables(DocumentValue.EMPTY_DOCUMENT);
+    }
+  }
+
+  private void appendJobToBatch(
+      final ValueArray<JobRecord> jobIterator,
+      final ValueArray<LongValue> jobKeyIterator,
+      final ExpandableArrayBuffer jobCopyBuffer,
+      final Long key,
+      final JobRecord jobRecord) {
+    jobKeyIterator.add().setValue(key);
+    final JobRecord arrayValueJob = jobIterator.add();
+
+    // clone job record since buffer is reused during iteration
+    jobRecord.write(jobCopyBuffer, 0);
+    arrayValueJob.wrap(jobCopyBuffer, 0, jobRecord.getLength());
+  }
+
+  private Collection<DirectBuffer> collectVariableNames(final JobBatchRecord batchRecord) {
+    final ValueArray<StringValue> requestedVariables = batchRecord.variables();
+
+    variableNames.clear();
+    requestedVariables.forEach(
+        variable -> variableNames.add(BufferUtil.cloneBuffer(variable.getValue())));
+
+    return variableNames;
+  }
+
+  private DirectBuffer collectVariables(
+      final Collection<DirectBuffer> variableNames, final long elementInstanceKey) {
+    final DirectBuffer variables;
+
+    if (variableNames.isEmpty()) {
+      variables = variableState.getVariablesAsDocument(elementInstanceKey);
+    } else {
+      variables = variableState.getVariablesAsDocument(elementInstanceKey, variableNames);
+    }
+
+    return variables;
+  }
+
+  static final class TooLargeJob {
+    private final long key;
+    private final JobRecord job;
+
+    private TooLargeJob(final long key, final JobRecord job) {
+      this.key = key;
+      this.job = job;
+    }
+
+    long key() {
+      return key;
+    }
+
+    JobRecord record() {
+      return job;
+    }
+  }
+}
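The size check inside `collectJobs` above is plain arithmetic: every appended job grows the eventual `ACTIVATED` event by the serialized job record plus nine bytes of key overhead. A minimal sketch of that estimate, pulled out of the collector for illustration (the standalone helper below is hypothetical and not part of this patch; only the `Long.BYTES + 1` breakdown is taken from the code above):

```java
// Hypothetical helper mirroring the inline estimate in JobBatchCollector#collectJobs.
static int estimateBatchLength(final int currentRecordLength, final int jobRecordLength) {
  // the batch grows by the serialized job record, plus Long.BYTES (8) for the
  // job key and 1 extra byte for the key's header - 9 bytes of overhead per job
  return currentRecordLength + jobRecordLength + Long.BYTES + 1;
}
```

The same nine bytes show up as the literal `+ 9` asserted in `shouldEstimateLengthCorrectly` in the new test file further down.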
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java
index 2d115a3ce2ce..86c45cd2183e 100755
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java
@@ -28,7 +28,6 @@ public static void addJobProcessors(
       final MutableZeebeState zeebeState,
       final Consumer<String> onJobsAvailableCallback,
       final BpmnEventPublicationBehavior eventPublicationBehavior,
-      final int maxRecordSize,
       final Writers writers,
       final JobMetrics jobMetrics,
       final EventTriggerBehavior eventTriggerBehavior) {
@@ -70,7 +69,7 @@ ValueType.JOB, JobIntent.UPDATE_RETRIES, new JobUpdateRetriesProcessor(zeebeStat
             ValueType.JOB_BATCH,
             JobBatchIntent.ACTIVATE,
             new JobBatchActivateProcessor(
-                writers, zeebeState, zeebeState.getKeyGenerator(), maxRecordSize, jobMetrics))
+                writers, zeebeState, zeebeState.getKeyGenerator(), jobMetrics))
         .withListener(new JobTimeoutTrigger(jobState))
         .withListener(jobBackoffChecker)
         .withListener(
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedEventImpl.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedEventImpl.java
index fe424ac3abfb..f1d1c0628e50 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedEventImpl.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedEventImpl.java
@@ -110,8 +110,8 @@ public long getRequestId() {
 
   @Override
   @JsonIgnore
-  public long getLength() {
-    return (long) metadata.getLength() + value.getLength();
+  public int getLength() {
+    return metadata.getLength() + value.getLength();
   }
 
   @Override
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecord.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecord.java
index 1211ab247350..3a98a3d02e9d 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecord.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecord.java
@@ -13,15 +13,17 @@
 
 public interface TypedRecord<T extends RecordValue> extends Record<T> {
 
+  @Override
   long getKey();
 
+  @Override
   T getValue();
 
   int getRequestStreamId();
 
   long getRequestId();
 
-  long getLength();
+  int getLength();
 
   default boolean hasRequestMetadata() {
     return getRequestId() != RecordMetadataEncoder.requestIdNullValue()
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/EventApplyingStateWriter.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/EventApplyingStateWriter.java
index c405ff86e0f3..e72c0ff55f9c 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/EventApplyingStateWriter.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/EventApplyingStateWriter.java
@@ -35,4 +35,14 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
     eventWriter.appendFollowUpEvent(key, intent, value);
     eventApplier.applyState(key, intent, value);
   }
+
+  @Override
+  public boolean canWriteEventOfLength(final int eventLength) {
+    return eventWriter.canWriteEventOfLength(eventLength);
+  }
+
+  @Override
+  public int getMaxEventLength() {
+    return eventWriter.getMaxEventLength();
+  }
 }
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/NoopTypedStreamWriter.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/NoopTypedStreamWriter.java
index 51a8889af618..73518dbe3d36 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/NoopTypedStreamWriter.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/NoopTypedStreamWriter.java
@@ -32,6 +32,11 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
     // no op implementation
   }
 
+  @Override
+  public int getMaxEventLength() {
+    return Integer.MAX_VALUE;
+  }
+
   @Override
   public void appendNewCommand(final Intent intent, final RecordValue value) {
     // no op implementation
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedEventWriter.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedEventWriter.java
index ec26a0cfca38..133a09ace567 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedEventWriter.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedEventWriter.java
@@ -13,4 +13,30 @@ public interface TypedEventWriter {
 
   void appendFollowUpEvent(long key, Intent intent, RecordValue value);
+
+  /**
+   * Use this to know whether you can write an event of this length.
+   *
+   * <p>Example:
+   *
+   * <pre>{@code
+   * final TypedEventWriter writer;
+   * // ... assign the writer
+   * final TypedRecord<?> record;
+   * // ... assign record
+   * if (!writer.canWriteEventOfLength(record.getLength())) {
+   *   // raise an incident or some such
+   *   return;
+   * }
+   * }</pre>
+   *
+   * @param eventLength the length of the event that will be written
+   * @return true if an event of length {@code eventLength} can be written
+   */
+  default boolean canWriteEventOfLength(final int eventLength) {
+    return eventLength <= getMaxEventLength();
+  }
+
+  /** @return the maximum event length */
+  int getMaxEventLength();
 }
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedStreamWriterImpl.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedStreamWriterImpl.java
index be4b2d7db2d3..bef8cc17a074 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedStreamWriterImpl.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/TypedStreamWriterImpl.java
@@ -129,4 +129,29 @@ public void configureSourceContext(final long sourceRecordPosition) {
   public void appendFollowUpEvent(final long key, final Intent intent, final RecordValue value) {
     appendRecord(key, RecordType.EVENT, intent, value);
   }
+
+  /**
+   * Use this to know whether you can add an event of the given length to the underlying batch
+   * writer.
+   *
+   * @param eventLength the length of the event that will be added to the batch
+   * @return true if an event of length {@code eventLength} can be added to this batch such that it
+   *     can later be written
+   */
+  @Override
+  public boolean canWriteEventOfLength(final int eventLength) {
+    return batchWriter.canWriteAdditionalEvent(eventLength);
+  }
+
+  /**
+   * This is not actually accurate, as the frame length needs to also be aligned by the same amount
+   * of bytes as the batch. However, this would break separation of concerns here, i.e. the writer
+   * would have to become Dispatcher aware.
+   *
+   * @return an approximate value of the max fragment length
+   */
+  @Override
+  public int getMaxEventLength() {
+    return batchWriter.getMaxFragmentLength();
+  }
 }
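Since `canWriteEventOfLength` is a `default` method on `TypedEventWriter`, implementations only have to provide `getMaxEventLength`; the no-op and recording writers elsewhere in this patch do exactly that by returning `Integer.MAX_VALUE`. A sketch of a bounded variant for tests (hypothetical class, assuming only the interface shown above):

```java
// Hypothetical test double: accepts events only up to a fixed length.
final class BoundedTestEventWriter implements TypedEventWriter {

  private final int maxEventLength;

  BoundedTestEventWriter(final int maxEventLength) {
    this.maxEventLength = maxEventLength;
  }

  @Override
  public void appendFollowUpEvent(final long key, final Intent intent, final RecordValue value) {
    // a real test double would record the event here; omitted in this sketch
  }

  @Override
  public int getMaxEventLength() {
    // the inherited canWriteEventOfLength(n) then evaluates n <= maxEventLength
    return maxEventLength;
  }
}
```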
diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java
new file mode 100644
index 000000000000..27607010f9ba
--- /dev/null
+++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/job/JobBatchCollectorTest.java
@@ -0,0 +1,338 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
+ * one or more contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright ownership.
+ * Licensed under the Zeebe Community License 1.1. You may not use this file
+ * except in compliance with the Zeebe Community License 1.1.
+ */
+package io.camunda.zeebe.engine.processing.job;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.camunda.zeebe.engine.processing.job.JobBatchCollector.TooLargeJob;
+import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
+import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
+import io.camunda.zeebe.engine.util.MockTypedRecord;
+import io.camunda.zeebe.engine.util.ZeebeStateExtension;
+import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
+import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
+import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
+import io.camunda.zeebe.protocol.record.RecordType;
+import io.camunda.zeebe.protocol.record.RecordValueWithVariablesAssert;
+import io.camunda.zeebe.protocol.record.ValueType;
+import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
+import io.camunda.zeebe.protocol.record.value.JobBatchRecordValueAssert;
+import io.camunda.zeebe.protocol.record.value.JobRecordValue;
+import io.camunda.zeebe.protocol.record.value.JobRecordValueAssert;
+import io.camunda.zeebe.test.util.MsgPackUtil;
+import io.camunda.zeebe.test.util.asserts.EitherAssert;
+import io.camunda.zeebe.util.Either;
+import io.camunda.zeebe.util.buffer.BufferUtil;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import org.agrona.DirectBuffer;
+import org.agrona.collections.MutableReference;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ZeebeStateExtension.class)
+final class JobBatchCollectorTest {
+  private static final String JOB_TYPE = "job";
+
+  private final RecordLengthEvaluator lengthEvaluator = new RecordLengthEvaluator();
+
+  @SuppressWarnings("unused") // injected by the extension
+  private MutableZeebeState state;
+
+  private JobBatchCollector collector;
+
+  @BeforeEach
+  void beforeEach() {
+    collector =
+        new JobBatchCollector(state.getJobState(), state.getVariableState(), lengthEvaluator);
+  }
+
+  @Test
+  void shouldTruncateBatchIfNoMoreCanBeWritten() {
+    // given
+    final long variableScopeKey = state.getKeyGenerator().nextKey();
+    final TypedRecord<JobBatchRecord> record = createRecord();
+    final List<Long> jobs =
+        Arrays.asList(createJob(variableScopeKey), createJob(variableScopeKey));
+    final var toggle = new AtomicBoolean(true);
+
+    // when - set up the evaluator to only accept the first job
+    lengthEvaluator.canWriteEventOfLength = (length) -> toggle.getAndSet(false);
+    final Either<TooLargeJob, Integer> result = collector.collectJobs(record);
+
+    // then
+    final JobBatchRecord batchRecord = record.getValue();
+    EitherAssert.assertThat(result)
+        .as("should have activated only one job successfully")
+        .right()
+        .isEqualTo(1);
+    JobBatchRecordValueAssert.assertThat(batchRecord).hasOnlyJobKeys(jobs.get(0)).isTruncated();
+  }
+
+  @Test
+  void shouldReturnLargeJobIfFirstJobCannotBeWritten() {
+    // given
+    final long variableScopeKey = state.getKeyGenerator().nextKey();
+    final TypedRecord<JobBatchRecord> record = createRecord();
+    final List<Long> jobs =
+        Arrays.asList(createJob(variableScopeKey), createJob(variableScopeKey));
+
+    // when - set up the evaluator to accept no jobs
+    lengthEvaluator.canWriteEventOfLength = (length) -> false;
+    final Either<TooLargeJob, Integer> result = collector.collectJobs(record);
+
+    // then
+    final JobBatchRecord batchRecord = record.getValue();
+    EitherAssert.assertThat(result)
+        .as("should return excessively large job")
+        .left()
+        .hasFieldOrPropertyWithValue("key", jobs.get(0));
+    JobBatchRecordValueAssert.assertThat(batchRecord).hasNoJobKeys().hasNoJobs().isTruncated();
+  }
+
+  @Test
+  void shouldCollectJobsWithVariables() {
+    // given - multiple jobs to ensure variables are collected based on the scope
+    final TypedRecord<JobBatchRecord> record = createRecord();
+    final long firstScopeKey = state.getKeyGenerator().nextKey();
+    final long secondScopeKey = state.getKeyGenerator().nextKey();
+    final Map<String, String> firstJobVariables = Map.of("foo", "bar", "baz", "buz");
+    final Map<String, String> secondJobVariables = Map.of("fizz", "buzz");
+    createJobWithVariables(firstScopeKey, firstJobVariables);
+    createJobWithVariables(secondScopeKey, secondJobVariables);
+
+    // when
+    collector.collectJobs(record);
+
+    // then
+    final JobBatchRecord batchRecord = record.getValue();
+    JobBatchRecordValueAssert.assertThat(batchRecord)
+        .satisfies(
+            batch -> {
+              final List<JobRecordValue> activatedJobs = batch.getJobs();
+              RecordValueWithVariablesAssert.assertThat(activatedJobs.get(0))
+                  .hasVariables(firstJobVariables);
+              RecordValueWithVariablesAssert.assertThat(activatedJobs.get(1))
+                  .hasVariables(secondJobVariables);
+            });
+  }
+
+  @Test
+  void shouldAppendJobKeyToBatchRecord() {
+    // given - multiple jobs to ensure variables are collected based on the scope
+    final TypedRecord<JobBatchRecord> record = createRecord();
+    final long scopeKey = state.getKeyGenerator().nextKey();
+    final List<Long> jobs = Arrays.asList(createJob(scopeKey), createJob(scopeKey));
+
+    // when
+    collector.collectJobs(record);
+
+    // then
+    final JobBatchRecord batchRecord = record.getValue();
+    JobBatchRecordValueAssert.assertThat(batchRecord).hasJobKeys(jobs.get(0), jobs.get(1));
+  }
+
+  @Test
+  void shouldActivateUpToMaxJobs() {
+    // given
+    final TypedRecord<JobBatchRecord> record = createRecord();
+    final long scopeKey = state.getKeyGenerator().nextKey();
+    final List<Long> jobs = Arrays.asList(createJob(scopeKey), createJob(scopeKey));
+    record.getValue().setMaxJobsToActivate(1);
+
+    // when
+    final Either<TooLargeJob, Integer> result = collector.collectJobs(record);
+
+    // then
+    final JobBatchRecord batchRecord = record.getValue();
+    EitherAssert.assertThat(result).as("should collect only the first job").right().isEqualTo(1);
+    JobBatchRecordValueAssert.assertThat(batchRecord).hasJobKeys(jobs.get(0)).isNotTruncated();
+  }
+
+  @Test
+  void shouldSetDeadlineOnActivation() {
+    // given
+    final TypedRecord<JobBatchRecord> record = createRecord();
+    final long scopeKey = state.getKeyGenerator().nextKey();
+    final long expectedDeadline = record.getTimestamp() + record.getValue().getTimeout();
+    createJob(scopeKey);
+    createJob(scopeKey);
+
+    // when
+    collector.collectJobs(record);
+
+    // then
+    final JobBatchRecord batchRecord = record.getValue();
+    JobBatchRecordValueAssert.assertThat(batchRecord)
+        .satisfies(
+            batch -> {
+              final List<JobRecordValue> activatedJobs = batch.getJobs();
+              JobRecordValueAssert.assertThat(activatedJobs.get(0))
+                  .as("first activated job has the expected deadline")
+                  .hasDeadline(expectedDeadline);
+              JobRecordValueAssert.assertThat(activatedJobs.get(1))
+                  .as("second activated job has the expected deadline")
+                  .hasDeadline(expectedDeadline);
+            });
+  }
+
+  @Test
+  void shouldSetWorkerOnActivation() {
+    // given
+    final TypedRecord<JobBatchRecord> record = createRecord();
+    final long scopeKey = state.getKeyGenerator().nextKey();
+    final String expectedWorker = "foo";
+    createJob(scopeKey);
+    createJob(scopeKey);
+    record.getValue().setWorker(expectedWorker);
+
+    // when
+    collector.collectJobs(record);
+
+    // then
+    final JobBatchRecord batchRecord = record.getValue();
+    JobBatchRecordValueAssert.assertThat(batchRecord)
+        .satisfies(
+            batch -> {
+              final List<JobRecordValue> activatedJobs = batch.getJobs();
+              JobRecordValueAssert.assertThat(activatedJobs.get(0))
+                  .as("first activated job has the expected worker")
+                  .hasWorker(expectedWorker);
+              JobRecordValueAssert.assertThat(activatedJobs.get(1))
+                  .as("second activated job has the expected worker")
+                  .hasWorker(expectedWorker);
+            });
+  }
+
+  @Test
+  void shouldFetchOnlyRequestedVariables() {
+    // given
+    final TypedRecord<JobBatchRecord> record = createRecord();
+    final long firstScopeKey = state.getKeyGenerator().nextKey();
+    final long secondScopeKey = state.getKeyGenerator().nextKey();
+    final Map<String, String> firstJobVariables = Map.of("foo", "bar", "baz", "buz");
+    final Map<String, String> secondJobVariables = Map.of("fizz", "buzz");
+    createJobWithVariables(firstScopeKey, firstJobVariables);
+    createJobWithVariables(secondScopeKey, secondJobVariables);
+    record.getValue().variables().add().wrap(BufferUtil.wrapString("foo"));
+
+    // when
+    collector.collectJobs(record);
+
+    // then
+    final JobBatchRecord batchRecord = record.getValue();
+    JobBatchRecordValueAssert.assertThat(batchRecord)
+        .satisfies(
+            batch -> {
+              final List<JobRecordValue> activatedJobs = batch.getJobs();
+              RecordValueWithVariablesAssert.assertThat(activatedJobs.get(0))
+                  .hasVariables(Map.of("foo", "bar"));
+              RecordValueWithVariablesAssert.assertThat(activatedJobs.get(1))
+                  .hasVariables(Collections.emptyMap());
+            });
+  }
+
+  /**
+   * This is specifically a regression test for #5525. It's possible for this test to become
+   * outdated if we ever change how records are serialized, variables packed, etc. But it's a
+   * best-effort solution to make sure that if we do, we are forced to double-check and ensure we're
+   * correctly estimating the size of the record before writing it.
+   *
+   * <p>Long term, the writer should be able to cope with arbitrarily large batches, but until then
+   * we're stuck with this workaround for a better UX.
+   */
+  @Test
+  void shouldEstimateLengthCorrectly() {
+    // given - multiple jobs to ensure variables are collected based on the scope
+    final TypedRecord<JobBatchRecord> record = createRecord();
+    final long scopeKey = state.getKeyGenerator().nextKey();
+    final Map<String, String> variables = Map.of("foo", "bar");
+    final MutableReference<Integer> estimatedLength = new MutableReference<>();
+    final int initialLength = record.getLength();
+    createJobWithVariables(scopeKey, variables);
+
+    // when
+    lengthEvaluator.canWriteEventOfLength =
+        length -> {
+          estimatedLength.set(length);
+          return true;
+        };
+    collector.collectJobs(record);
+
+    // then
+    // the expected length is then the length of the initial record + the length of the activated
+    // job + the length of a key (long) and one byte for its list header
+    final var activatedJob = (JobRecord) record.getValue().getJobs().get(0);
+    final int expectedLength = initialLength + activatedJob.getLength() + 9;
+    assertThat(estimatedLength.ref).isEqualTo(expectedLength);
+  }
+
+  private TypedRecord<JobBatchRecord> createRecord() {
+    final RecordMetadata metadata =
+        new RecordMetadata()
+            .recordType(RecordType.COMMAND)
+            .intent(JobBatchIntent.ACTIVATE)
+            .valueType(ValueType.JOB_BATCH);
+    final var batchRecord =
+        new JobBatchRecord()
+            .setTimeout(Duration.ofSeconds(10).toMillis())
+            .setMaxJobsToActivate(10)
+            .setType(JOB_TYPE)
+            .setWorker("test");
+
+    return new MockTypedRecord<>(state.getKeyGenerator().nextKey(), metadata, batchRecord);
+  }
+
+  private long createJob(final long variableScopeKey) {
+    final var jobRecord =
+        new JobRecord()
+            .setBpmnProcessId("process")
+            .setElementId("element")
+            .setElementInstanceKey(variableScopeKey)
+            .setType(JOB_TYPE);
+    final long jobKey = state.getKeyGenerator().nextKey();
+
+    state.getJobState().create(jobKey, jobRecord);
+    return jobKey;
+  }
+
+  private void createJobWithVariables(
+      final long variableScopeKey, final Map<String, String> variables) {
+    setVariables(variableScopeKey, variables);
+    createJob(variableScopeKey);
+  }
+
+  private void setVariables(final long variableScopeKey, final Map<String, String> variables) {
+    final var variableState = state.getVariableState();
+    variables.forEach(
+        (key, value) ->
+            variableState.setVariableLocal(
+                variableScopeKey,
+                variableScopeKey,
+                variableScopeKey,
+                BufferUtil.wrapString(key),
+                packString(value)));
+  }
+
+  private DirectBuffer packString(final String value) {
+    return MsgPackUtil.encodeMsgPack(b -> b.packString(value));
+  }
+
+  private static final class RecordLengthEvaluator implements Predicate<Integer> {
+    private Predicate<Integer> canWriteEventOfLength = length -> true;
+
+    @Override
+    public boolean test(final Integer length) {
+      return canWriteEventOfLength.test(length);
+    }
+  }
+}
diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest.java
index 82f18948cd91..12bae892d884 100644
--- a/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest.java
+++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest.java
@@ -218,6 +218,11 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
       }
     }
 
+    @Override
+    public int getMaxEventLength() {
+      return Integer.MAX_VALUE;
+    }
+
     @Override
     public void appendNewCommand(final Intent intent, final RecordValue value) {}
 
diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/variable/VariableBehaviorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/variable/VariableBehaviorTest.java
index a029ac0bf4f4..fbdb97229222 100644
--- a/engine/src/test/java/io/camunda/zeebe/engine/processing/variable/VariableBehaviorTest.java
+++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/variable/VariableBehaviorTest.java
@@ -9,58 +9,47 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import io.camunda.zeebe.db.ZeebeDb;
 import io.camunda.zeebe.engine.processing.streamprocessor.writers.EventApplyingStateWriter;
-import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
-import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
-import io.camunda.zeebe.engine.state.ZbColumnFamilies;
-import io.camunda.zeebe.engine.state.ZeebeDbState;
 import io.camunda.zeebe.engine.state.appliers.EventAppliers;
 import io.camunda.zeebe.engine.state.immutable.VariableState;
 import io.camunda.zeebe.engine.state.mutable.MutableVariableState;
 import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
 import io.camunda.zeebe.engine.util.RecordingTypedEventWriter;
 import io.camunda.zeebe.engine.util.RecordingTypedEventWriter.RecordedEvent;
+import io.camunda.zeebe.engine.util.ZeebeStateExtension;
 import io.camunda.zeebe.protocol.record.intent.VariableIntent;
 import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
 import io.camunda.zeebe.protocol.record.value.VariableRecordValueAssert;
 import io.camunda.zeebe.test.util.MsgPackUtil;
 import io.camunda.zeebe.util.buffer.BufferUtil;
-import java.io.File;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import org.agrona.CloseHelper;
 import org.agrona.DirectBuffer;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(ZeebeStateExtension.class)
 final class VariableBehaviorTest {
 
   private final RecordingTypedEventWriter eventWriter = new RecordingTypedEventWriter();
 
-  private ZeebeDb<ZbColumnFamilies> db;
+  @SuppressWarnings("unused") // injected by the extension
+  private MutableZeebeState zeebeState;
+
   private MutableVariableState state;
   private VariableBehavior behavior;
 
   @BeforeEach
-  void beforeEach(final @TempDir File directory) {
-    db = DefaultZeebeDbFactory.defaultFactory().createDb(directory);
-    final MutableZeebeState zeebeState = new ZeebeDbState(db, db.createContext());
-
-    final StateWriter stateWriter =
+  void beforeEach() {
+    final var stateWriter =
         new EventApplyingStateWriter(eventWriter, new EventAppliers(zeebeState));
 
     state = zeebeState.getVariableState();
     behavior = new VariableBehavior(state, stateWriter, zeebeState.getKeyGenerator());
   }
 
-  @AfterEach
-  void afterEach() {
-    CloseHelper.close(db);
-  }
-
   @Test
   void shouldMergeLocalDocument() {
     // given
diff --git a/engine/src/test/java/io/camunda/zeebe/engine/util/MockTypedRecord.java b/engine/src/test/java/io/camunda/zeebe/engine/util/MockTypedRecord.java
index 8a086fba6ba8..da700401100a 100644
--- a/engine/src/test/java/io/camunda/zeebe/engine/util/MockTypedRecord.java
+++ b/engine/src/test/java/io/camunda/zeebe/engine/util/MockTypedRecord.java
@@ -56,7 +56,7 @@ public long getRequestId() {
   }
 
   @Override
-  public long getLength() {
+  public int getLength() {
     return metadata.getLength() + value.getLength();
   }
 
diff --git a/engine/src/test/java/io/camunda/zeebe/engine/util/RecordingTypedEventWriter.java b/engine/src/test/java/io/camunda/zeebe/engine/util/RecordingTypedEventWriter.java
index 4651beacd121..da84a51880b3 100644
--- a/engine/src/test/java/io/camunda/zeebe/engine/util/RecordingTypedEventWriter.java
+++ b/engine/src/test/java/io/camunda/zeebe/engine/util/RecordingTypedEventWriter.java
@@ -31,6 +31,11 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
     events.add(new RecordedEvent<>(key, intent, value));
   }
 
+  @Override
+  public int getMaxEventLength() {
+    return Integer.MAX_VALUE;
+  }
+
   public static final class RecordedEvent<T extends RecordValue> {
 
     public final long key;
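Taken together, the new methods let a processor check a record against the log's size limit before writing it, instead of failing afterwards; the job batch activation above is the first caller, but the guard generalizes. A rough sketch of the pattern (hypothetical helper; `StateWriter`, `Intent`, and `RecordValue` are the types from this patch):

```java
// Hypothetical guard: write the follow-up event only if it fits, otherwise let
// the caller fall back, e.g. by raising a MESSAGE_SIZE_EXCEEDED incident as
// JobBatchActivateProcessor does above.
static void appendGuarded(
    final StateWriter stateWriter,
    final long key,
    final Intent intent,
    final RecordValue value,
    final int expectedLength,
    final Runnable onTooLarge) {
  if (!stateWriter.canWriteEventOfLength(expectedLength)) {
    onTooLarge.run();
    return;
  }
  stateWriter.appendFollowUpEvent(key, intent, value);
}
```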