merge: #8799
8799: Correctly truncate a job activation batch if it will not fit in the dispatcher r=npepinpe a=npepinpe

## Description

This PR fixes a bug where we would sometimes try to activate a job batch that was too big to be written into the dispatcher: once framed and aligned, its size would exceed the `maxFragmentLength`. The fix itself is simple: the responsibility of deciding whether more jobs can be added to the batch is delegated all the way down to the dispatcher, with each layer in between adding its own framing. To simplify the code and testing, the responsibility of collecting jobs into a batch was extracted into a dedicated class, `JobBatchCollector`.
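
To illustrate the delegation idea, here is a minimal sketch. The types, constructors, and framing overheads below are hypothetical, not the actual Zeebe writer or dispatcher classes; only the `canWriteEventOfLength(int)` question itself comes from this PR.

```java
// Hypothetical sketch of the delegation chain: each layer answers "does an event of this
// length still fit?" by adding its own framing overhead and asking the layer below, until
// the dispatcher compares the fully framed length against its maxFragmentLength.
interface LengthAwareWriter {
  boolean canWriteEventOfLength(int eventLength);
}

final class FramingWriter implements LengthAwareWriter {
  private static final int FRAME_HEADER_LENGTH = 12; // illustrative per-layer overhead

  private final LengthAwareWriter delegate;

  FramingWriter(final LengthAwareWriter delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean canWriteEventOfLength(final int eventLength) {
    // account for this layer's framing before forwarding the question
    return delegate.canWriteEventOfLength(eventLength + FRAME_HEADER_LENGTH);
  }
}

final class DispatcherWriter implements LengthAwareWriter {
  private final int maxFragmentLength;

  DispatcherWriter(final int maxFragmentLength) {
    this.maxFragmentLength = maxFragmentLength;
  }

  @Override
  public boolean canWriteEventOfLength(final int eventLength) {
    // final authority: the framed and aligned event must fit into a single fragment
    return eventLength <= maxFragmentLength;
  }
}
```

In the actual change, the writer proxies simply forward the call (see the `canWriteEventOfLength`/`getMaxEventLength` overrides in the diff below).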

The new API, `TypedEventWriter#canWriteEventOfLength(int)`, could potentially be reused in other places where we want to prevent writing batches that are too large. However, this is a bit brittle, since it requires the code in the engine to properly compute beforehand how the record will grow when you modify it. For example, in the `JobBatchCollector`, to calculate the length of the `TypedRecord<JobBatchRecordValue> record` after adding a given `JobRecordValue` to it, we need the following:

```java
// the expected length is based on the current record's length plus the length of the job
// record we would add to the batch, the number of bytes taken by the additional job key,
// and one byte required per job key for its type header. If we ever add more per job, this
// computation should be updated accordingly.
final var jobRecordLength = jobRecord.getLength();
final var expectedEventLength = record.getLength() + jobRecordLength + Long.BYTES + 1;
```
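
For completeness, here is a minimal sketch of how that computed length would then gate whether another job is appended, assuming the collector receives the size check as an `IntPredicate` (as suggested by the `stateWriter::canWriteEventOfLength` method reference in the diff); the `BatchSizeGuard` helper below is purely illustrative, not the actual `JobBatchCollector` code.

```java
import java.util.function.IntPredicate;

// Illustrative helper (not part of the PR): encapsulates the "will one more job still fit?"
// decision based on the expected event length computed above.
final class BatchSizeGuard {
  private final IntPredicate canWriteEventOfLength;

  BatchSizeGuard(final IntPredicate canWriteEventOfLength) {
    this.canWriteEventOfLength = canWriteEventOfLength;
  }

  /** Returns true if the batch, grown by one more job, would still fit into the dispatcher. */
  boolean canAppendJob(final int currentBatchLength, final int jobRecordLength) {
    // current batch + job record + job key (a long) + one byte for the key's type header
    final int expectedEventLength = currentBatchLength + jobRecordLength + Long.BYTES + 1;
    return canWriteEventOfLength.test(expectedEventLength);
  }
}
```

When the check fails, the collector would mark the batch as truncated and stop collecting, which is what feeds the `TooLargeJob` incident path when not even a single job fits.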

Not extremely complicated, but it could be error-prone. A better solution, chunking the follow-up events, is not something we can sanely do in between KRs, unfortunately, and it would most likely carry a high risk, so it wouldn't be something we can backport. For now, I think this is an OK compromise.

NOTE: this PR depends on #8797 and #8798. Wait for these to be merged before reviewing!

## Related issues

closes #5525



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
zeebe-bors-cloud[bot] and npepinpe committed Feb 23, 2022
2 parents 9bc1c8d + fe996c2 commit bd35278
Showing 17 changed files with 635 additions and 138 deletions.
@@ -65,7 +65,6 @@ public static TypedRecordProcessors createEngineProcessors(

final LogStream stream = processingContext.getLogStream();
final int partitionId = stream.getPartitionId();
final int maxFragmentSize = processingContext.getMaxFragmentSize();

final var variablesState = zeebeState.getVariableState();
final var expressionProcessor =
@@ -128,7 +127,6 @@ public static TypedRecordProcessors createEngineProcessors(
zeebeState,
onJobsAvailableCallback,
eventPublicationBehavior,
maxFragmentSize,
writers,
jobMetrics,
eventTriggerBehavior);
@@ -39,6 +39,16 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
writer.appendFollowUpEvent(key, intent, value);
}

@Override
public boolean canWriteEventOfLength(final int eventLength) {
return writer.canWriteEventOfLength(eventLength);
}

@Override
public int getMaxEventLength() {
return writer.getMaxEventLength();
}

@Override
public void appendNewCommand(final Intent intent, final RecordValue value) {
writer.appendNewCommand(intent, value);
@@ -105,7 +105,7 @@ public long getRequestId() {
}

@Override
public long getLength() {
public int getLength() {
return 0;
}

@@ -10,6 +10,7 @@
import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString;

import io.camunda.zeebe.engine.metrics.JobMetrics;
import io.camunda.zeebe.engine.processing.job.JobBatchCollector.TooLargeJob;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
@@ -18,13 +19,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.JobState;
import io.camunda.zeebe.engine.state.immutable.VariableState;
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
import io.camunda.zeebe.msgpack.value.DocumentValue;
import io.camunda.zeebe.msgpack.value.LongValue;
import io.camunda.zeebe.msgpack.value.StringValue;
import io.camunda.zeebe.msgpack.value.ValueArray;
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
@@ -33,46 +28,32 @@
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.value.ErrorType;
import io.camunda.zeebe.util.ByteValue;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import io.camunda.zeebe.util.Either;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ObjectHashSet;
import org.agrona.concurrent.UnsafeBuffer;

public final class JobBatchActivateProcessor implements TypedRecordProcessor<JobBatchRecord> {

private final StateWriter stateWriter;
private final VariableState variableState;
private final TypedRejectionWriter rejectionWriter;
private final TypedResponseWriter responseWriter;

private final JobState jobState;
private final JobBatchCollector jobBatchCollector;
private final KeyGenerator keyGenerator;
private final long maxRecordLength;
private final long maxJobBatchLength;

private final ObjectHashSet<DirectBuffer> variableNames = new ObjectHashSet<>();
private final JobMetrics jobMetrics;

public JobBatchActivateProcessor(
final Writers writers,
final ZeebeState state,
final KeyGenerator keyGenerator,
final long maxRecordLength,
final JobMetrics jobMetrics) {

stateWriter = writers.state();
rejectionWriter = writers.rejection();
responseWriter = writers.response();
jobBatchCollector =
new JobBatchCollector(
state.getJobState(), state.getVariableState(), stateWriter::canWriteEventOfLength);

jobState = state.getJobState();
variableState = state.getVariableState();
this.keyGenerator = keyGenerator;

this.maxRecordLength = maxRecordLength;
maxJobBatchLength = maxRecordLength - Long.BYTES;
this.jobMetrics = jobMetrics;
}

@@ -97,95 +78,21 @@ private boolean isValid(final JobBatchRecord record) {

private void activateJobs(final TypedRecord<JobBatchRecord> record) {
final JobBatchRecord value = record.getValue();

final long jobBatchKey = keyGenerator.nextKey();

final AtomicInteger amount = new AtomicInteger(value.getMaxJobsToActivate());
collectJobsToActivate(record, amount);

stateWriter.appendFollowUpEvent(jobBatchKey, JobBatchIntent.ACTIVATED, value);
responseWriter.writeEventOnCommand(jobBatchKey, JobBatchIntent.ACTIVATED, value, record);

final var activatedJobsCount = record.getValue().getJobKeys().size();
jobMetrics.jobActivated(value.getType(), activatedJobsCount);
}

private void collectJobsToActivate(
final TypedRecord<JobBatchRecord> record, final AtomicInteger amount) {
final JobBatchRecord value = record.getValue();
final ValueArray<JobRecord> jobIterator = value.jobs();
final ValueArray<LongValue> jobKeyIterator = value.jobKeys();

// collect jobs for activation
variableNames.clear();
final ValueArray<StringValue> jobBatchVariables = value.variables();

jobBatchVariables.forEach(
v -> {
final MutableDirectBuffer nameCopy = new UnsafeBuffer(new byte[v.getValue().capacity()]);
nameCopy.putBytes(0, v.getValue(), 0, v.getValue().capacity());
variableNames.add(nameCopy);
});

jobState.forEachActivatableJobs(
value.getTypeBuffer(),
(key, jobRecord) -> {
int remainingAmount = amount.get();
final long deadline = record.getTimestamp() + value.getTimeout();
jobRecord.setDeadline(deadline).setWorker(value.getWorkerBuffer());

// fetch and set variables, required here to already have the full size of the job record
final long elementInstanceKey = jobRecord.getElementInstanceKey();
if (elementInstanceKey >= 0) {
final DirectBuffer variables = collectVariables(variableNames, elementInstanceKey);
jobRecord.setVariables(variables);
} else {
jobRecord.setVariables(DocumentValue.EMPTY_DOCUMENT);
}

if (remainingAmount >= 0
&& (record.getLength() + jobRecord.getLength()) <= maxJobBatchLength) {

remainingAmount = amount.decrementAndGet();
jobKeyIterator.add().setValue(key);
final JobRecord arrayValueJob = jobIterator.add();

// clone job record since buffer is reused during iteration
final ExpandableArrayBuffer buffer = new ExpandableArrayBuffer(jobRecord.getLength());
jobRecord.write(buffer, 0);
arrayValueJob.wrap(buffer);
} else {
value.setTruncated(true);

if (value.getJobs().isEmpty()) {
raiseIncidentJobTooLargeForMessageSize(key, jobRecord);
}

return false;
}

return remainingAmount > 0;
});
}
final Either<TooLargeJob, Integer> result = jobBatchCollector.collectJobs(record);
final var activatedJobCount = result.getOrElse(0);
result.ifLeft(
largeJob -> raiseIncidentJobTooLargeForMessageSize(largeJob.key(), largeJob.record()));

private DirectBuffer collectVariables(
final Collection<DirectBuffer> variableNames, final long elementInstanceKey) {
final DirectBuffer variables;
if (variableNames.isEmpty()) {
variables = variableState.getVariablesAsDocument(elementInstanceKey);
} else {
variables = variableState.getVariablesAsDocument(elementInstanceKey, variableNames);
}
return variables;
activateJobBatch(record, value, jobBatchKey, activatedJobCount);
}

private void rejectCommand(final TypedRecord<JobBatchRecord> record) {
final RejectionType rejectionType;
final String rejectionReason;

final JobBatchRecord value = record.getValue();

final String format = "Expected to activate job batch with %s to be %s, but it was %s";
final var format = "Expected to activate job batch with %s to be %s, but it was %s";

if (value.getMaxJobsToActivate() < 1) {
rejectionType = RejectionType.INVALID_ARGUMENT;
@@ -212,18 +119,25 @@ private void rejectCommand(final TypedRecord<JobBatchRecord> record) {
responseWriter.writeRejectionOnCommand(record, rejectionType, rejectionReason);
}

private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) {

final String messageSize = ByteValue.prettyPrint(maxRecordLength);
private void activateJobBatch(
final TypedRecord<JobBatchRecord> record,
final JobBatchRecord value,
final long jobBatchKey,
final Integer activatedCount) {
stateWriter.appendFollowUpEvent(jobBatchKey, JobBatchIntent.ACTIVATED, value);
responseWriter.writeEventOnCommand(jobBatchKey, JobBatchIntent.ACTIVATED, value, record);
jobMetrics.jobActivated(value.getType(), activatedCount);
}

private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) {
final String messageSize = ByteValue.prettyPrint(stateWriter.getMaxEventLength());
final DirectBuffer incidentMessage =
wrapString(
String.format(
"The job with key '%s' can not be activated because it is larger than the configured message size (%s). "
+ "Try to reduce the size by reducing the number of fetched variables or modifying the variable values.",
jobKey, messageSize));

final IncidentRecord incidentEvent =
final var incidentEvent =
new IncidentRecord()
.setErrorType(ErrorType.MESSAGE_SIZE_EXCEEDED)
.setErrorMessage(incidentMessage)