Skip to content

Commit

Permalink
merge: #8832
Browse files Browse the repository at this point in the history
8832: [Backport stable/1.3] Correctly truncate a job activation batch if it will not fit in the dispatcher r=npepinpe a=github-actions[bot]

# Description
Backport of #8799 to `stable/1.3`.

relates to #8797 #5525

Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-cloud[bot] and npepinpe committed Feb 23, 2022
2 parents d4088b8 + 39f2ef7 commit 5cc381c
Show file tree
Hide file tree
Showing 17 changed files with 649 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public static TypedRecordProcessors createEngineProcessors(

final LogStream stream = processingContext.getLogStream();
final int partitionId = stream.getPartitionId();
final int maxFragmentSize = processingContext.getMaxFragmentSize();

final var variablesState = zeebeState.getVariableState();
final var expressionProcessor =
Expand Down Expand Up @@ -128,7 +127,6 @@ public static TypedRecordProcessors createEngineProcessors(
zeebeState,
onJobsAvailableCallback,
eventPublicationBehavior,
maxFragmentSize,
writers,
jobMetrics,
eventTriggerBehavior);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
writer.appendFollowUpEvent(key, intent, value);
}

@Override
public boolean canWriteEventOfLength(final int eventLength) {
return writer.canWriteEventOfLength(eventLength);
}

@Override
public int getMaxEventLength() {
return writer.getMaxEventLength();
}

@Override
public void appendNewCommand(final Intent intent, final RecordValue value) {
writer.appendNewCommand(intent, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public long getRequestId() {
}

@Override
public long getLength() {
public int getLength() {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString;

import io.camunda.zeebe.engine.metrics.JobMetrics;
import io.camunda.zeebe.engine.processing.job.JobBatchCollector.TooLargeJob;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
Expand All @@ -18,13 +19,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.JobState;
import io.camunda.zeebe.engine.state.immutable.VariableState;
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
import io.camunda.zeebe.msgpack.value.DocumentValue;
import io.camunda.zeebe.msgpack.value.LongValue;
import io.camunda.zeebe.msgpack.value.StringValue;
import io.camunda.zeebe.msgpack.value.ValueArray;
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
Expand All @@ -33,46 +28,32 @@
import io.camunda.zeebe.protocol.record.intent.JobBatchIntent;
import io.camunda.zeebe.protocol.record.value.ErrorType;
import io.camunda.zeebe.util.ByteValue;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import io.camunda.zeebe.util.Either;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ObjectHashSet;
import org.agrona.concurrent.UnsafeBuffer;

public final class JobBatchActivateProcessor implements TypedRecordProcessor<JobBatchRecord> {

private final StateWriter stateWriter;
private final VariableState variableState;
private final TypedRejectionWriter rejectionWriter;
private final TypedResponseWriter responseWriter;

private final JobState jobState;
private final JobBatchCollector jobBatchCollector;
private final KeyGenerator keyGenerator;
private final long maxRecordLength;
private final long maxJobBatchLength;

private final ObjectHashSet<DirectBuffer> variableNames = new ObjectHashSet<>();
private final JobMetrics jobMetrics;

public JobBatchActivateProcessor(
final Writers writers,
final ZeebeState state,
final KeyGenerator keyGenerator,
final long maxRecordLength,
final JobMetrics jobMetrics) {

stateWriter = writers.state();
rejectionWriter = writers.rejection();
responseWriter = writers.response();
jobBatchCollector =
new JobBatchCollector(
state.getJobState(), state.getVariableState(), stateWriter::canWriteEventOfLength);

jobState = state.getJobState();
variableState = state.getVariableState();
this.keyGenerator = keyGenerator;

this.maxRecordLength = maxRecordLength;
maxJobBatchLength = maxRecordLength - Long.BYTES;
this.jobMetrics = jobMetrics;
}

Expand All @@ -97,95 +78,21 @@ private boolean isValid(final JobBatchRecord record) {

private void activateJobs(final TypedRecord<JobBatchRecord> record) {
final JobBatchRecord value = record.getValue();

final long jobBatchKey = keyGenerator.nextKey();

final AtomicInteger amount = new AtomicInteger(value.getMaxJobsToActivate());
collectJobsToActivate(record, amount);

stateWriter.appendFollowUpEvent(jobBatchKey, JobBatchIntent.ACTIVATED, value);
responseWriter.writeEventOnCommand(jobBatchKey, JobBatchIntent.ACTIVATED, value, record);

final var activatedJobsCount = record.getValue().getJobKeys().size();
jobMetrics.jobActivated(value.getType(), activatedJobsCount);
}

private void collectJobsToActivate(
final TypedRecord<JobBatchRecord> record, final AtomicInteger amount) {
final JobBatchRecord value = record.getValue();
final ValueArray<JobRecord> jobIterator = value.jobs();
final ValueArray<LongValue> jobKeyIterator = value.jobKeys();

// collect jobs for activation
variableNames.clear();
final ValueArray<StringValue> jobBatchVariables = value.variables();

jobBatchVariables.forEach(
v -> {
final MutableDirectBuffer nameCopy = new UnsafeBuffer(new byte[v.getValue().capacity()]);
nameCopy.putBytes(0, v.getValue(), 0, v.getValue().capacity());
variableNames.add(nameCopy);
});

jobState.forEachActivatableJobs(
value.getTypeBuffer(),
(key, jobRecord) -> {
int remainingAmount = amount.get();
final long deadline = record.getTimestamp() + value.getTimeout();
jobRecord.setDeadline(deadline).setWorker(value.getWorkerBuffer());

// fetch and set variables, required here to already have the full size of the job record
final long elementInstanceKey = jobRecord.getElementInstanceKey();
if (elementInstanceKey >= 0) {
final DirectBuffer variables = collectVariables(variableNames, elementInstanceKey);
jobRecord.setVariables(variables);
} else {
jobRecord.setVariables(DocumentValue.EMPTY_DOCUMENT);
}

if (remainingAmount >= 0
&& (record.getLength() + jobRecord.getLength()) <= maxJobBatchLength) {

remainingAmount = amount.decrementAndGet();
jobKeyIterator.add().setValue(key);
final JobRecord arrayValueJob = jobIterator.add();

// clone job record since buffer is reused during iteration
final ExpandableArrayBuffer buffer = new ExpandableArrayBuffer(jobRecord.getLength());
jobRecord.write(buffer, 0);
arrayValueJob.wrap(buffer);
} else {
value.setTruncated(true);

if (value.getJobs().isEmpty()) {
raiseIncidentJobTooLargeForMessageSize(key, jobRecord);
}

return false;
}

return remainingAmount > 0;
});
}
final Either<TooLargeJob, Integer> result = jobBatchCollector.collectJobs(record);
final var activatedJobCount = result.getOrElse(0);
result.ifLeft(
largeJob -> raiseIncidentJobTooLargeForMessageSize(largeJob.key(), largeJob.record()));

private DirectBuffer collectVariables(
final Collection<DirectBuffer> variableNames, final long elementInstanceKey) {
final DirectBuffer variables;
if (variableNames.isEmpty()) {
variables = variableState.getVariablesAsDocument(elementInstanceKey);
} else {
variables = variableState.getVariablesAsDocument(elementInstanceKey, variableNames);
}
return variables;
activateJobBatch(record, value, jobBatchKey, activatedJobCount);
}

private void rejectCommand(final TypedRecord<JobBatchRecord> record) {
final RejectionType rejectionType;
final String rejectionReason;

final JobBatchRecord value = record.getValue();

final String format = "Expected to activate job batch with %s to be %s, but it was %s";
final var format = "Expected to activate job batch with %s to be %s, but it was %s";

if (value.getMaxJobsToActivate() < 1) {
rejectionType = RejectionType.INVALID_ARGUMENT;
Expand All @@ -212,18 +119,25 @@ private void rejectCommand(final TypedRecord<JobBatchRecord> record) {
responseWriter.writeRejectionOnCommand(record, rejectionType, rejectionReason);
}

private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) {

final String messageSize = ByteValue.prettyPrint(maxRecordLength);
private void activateJobBatch(
final TypedRecord<JobBatchRecord> record,
final JobBatchRecord value,
final long jobBatchKey,
final Integer activatedCount) {
stateWriter.appendFollowUpEvent(jobBatchKey, JobBatchIntent.ACTIVATED, value);
responseWriter.writeEventOnCommand(jobBatchKey, JobBatchIntent.ACTIVATED, value, record);
jobMetrics.jobActivated(value.getType(), activatedCount);
}

private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) {
final String messageSize = ByteValue.prettyPrint(stateWriter.getMaxEventLength());
final DirectBuffer incidentMessage =
wrapString(
String.format(
"The job with key '%s' can not be activated because it is larger than the configured message size (%s). "
+ "Try to reduce the size by reducing the number of fetched variables or modifying the variable values.",
jobKey, messageSize));

final IncidentRecord incidentEvent =
final var incidentEvent =
new IncidentRecord()
.setErrorType(ErrorType.MESSAGE_SIZE_EXCEEDED)
.setErrorMessage(incidentMessage)
Expand Down

0 comments on commit 5cc381c

Please sign in to comment.