merge: #10888
10888: Migrate the TypedStreamProcessorTest r=Zelldon a=Zelldon

## Description

* Rewrote the [shouldWriteSourceEventAndProducerOnBatch](dbc19f6) test and moved it to StreamProcessorTest
* Extended StreamProcessorTest:
  * Added a test for writing a rejection during error handling (the stubbing pattern is sketched after this list)
  * Split a test so that we verify separately whether processing can continue after an error and whether a response can be written on error
* [iterate over TypedStreamProcessorTest](f6aa792)
  * The test still makes sense from the engine perspective, since it lets us verify that commands which fail to process are automatically rejected.
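For readers skimming the diff below: the new error-handling tests rely on a common Mockito stubbing pattern, where the first `process` call throws, the error handler is expected to run exactly once, and processing continues. The following is a minimal, self-contained sketch of that pattern only; the `RecordProcessor` interface, the test class name, and the hand-rolled driver loop are hypothetical stand-ins, not the engine's actual API.

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.junit.Test;

public final class ErrorHandlingStubbingSketchTest {

  /** Hypothetical stand-in for the engine's record processor abstraction. */
  interface RecordProcessor {
    Object process(Object record, Object resultBuilder);

    Object onProcessingError(Throwable error, Object record, Object resultBuilder);
  }

  @Test
  public void shouldInvokeErrorHandlerOnceAndContinueProcessing() {
    // given: the first process call fails, the second one succeeds
    final RecordProcessor processor = mock(RecordProcessor.class);
    when(processor.process(any(), any()))
        .thenThrow(new RuntimeException("expected"))
        .thenReturn(new Object());

    // when: two records are handed to the processor; the simulated failure
    // is routed to the error handler instead of aborting the loop
    for (int i = 0; i < 2; i++) {
      try {
        processor.process(new Object(), new Object());
      } catch (final RuntimeException e) {
        processor.onProcessingError(e, new Object(), new Object());
      }
    }

    // then: both records were offered for processing and the error handler ran once
    verify(processor, times(2)).process(any(), any());
    verify(processor, times(1)).onProcessingError(any(), any(), any());
  }
}
```

In the real tests below, the stream platform drives the processor instead of a hand-rolled loop; only the stubbing and verification pattern carries over.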

## Related issues


Related to #10455



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
zeebe-bors-camunda[bot] and Zelldon committed Nov 3, 2022
2 parents c5e4901 + e9cf0a1 commit 7447921
Showing 2 changed files with 98 additions and 92 deletions.
@@ -9,22 +9,18 @@

import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.util.RecordStream;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.TestStreams;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
@@ -34,19 +30,17 @@
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.TestUtil;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.MockitoAnnotations;

public final class TypedStreamProcessorTest {
public final class EngineErrorHandlingTest {

private static final String STREAM_NAME = "foo";
protected SynchronousLogStream stream;

private final TemporaryFolder tempFolder = new TemporaryFolder();
private final AutoCloseableRule closeables = new AutoCloseableRule();
private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();
@@ -61,54 +55,16 @@ public final class TypedStreamProcessorTest {

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);

streams = new TestStreams(tempFolder, closeables, actorSchedulerRule.get());
mockCommandResponseWriter = streams.getMockedResponseWriter();
stream = streams.createLogStream(STREAM_NAME);
streams.createLogStream(STREAM_NAME);

final AtomicLong key = new AtomicLong();
keyGenerator = () -> key.getAndIncrement();
}

@Test
public void shouldWriteSourceEventAndProducerOnBatch() {
// given
streams.startStreamProcessor(
STREAM_NAME,
DefaultZeebeDbFactory.defaultFactory(),
(processingContext) ->
TypedRecordProcessors.processors(keyGenerator, processingContext.getWriters())
.onCommand(
ValueType.DEPLOYMENT,
DeploymentIntent.CREATE,
new BatchProcessor(processingContext.getWriters())));
final long firstEventPosition =
streams
.newRecord(STREAM_NAME)
.event(deployment("foo"))
.recordType(RecordType.COMMAND)
.intent(DeploymentIntent.CREATE)
.write();

// when
final LoggedEvent writtenEvent =
TestUtil.doRepeatedly(
() ->
streams
.events(STREAM_NAME)
.filter(
e -> Records.isEvent(e, ValueType.DEPLOYMENT, DeploymentIntent.CREATED))
.findFirst())
.until(o -> o.isPresent())
.get();

// then
assertThat(writtenEvent.getSourceEventPosition()).isEqualTo(firstEventPosition);
}

@Test
public void shouldSkipFailingEvent() {
public void shouldAutoRejectCommandOnProcessingFailure() {
// given
streams.startStreamProcessor(
STREAM_NAME,
@@ -119,20 +75,6 @@ public void shouldSkipFailingEvent() {
ValueType.DEPLOYMENT,
DeploymentIntent.CREATE,
new ErrorProneProcessor(processingContext.getWriters())));
final AtomicLong requestId = new AtomicLong(0);
final AtomicInteger requestStreamId = new AtomicInteger(0);

when(mockCommandResponseWriter.tryWriteResponse(anyInt(), anyLong()))
.then(
(invocationOnMock -> {
final int streamIdArg = invocationOnMock.getArgument(0);
final long requestIdArg = invocationOnMock.getArgument(1);

requestId.set(requestIdArg);
requestStreamId.set(streamIdArg);

return true;
}));

final long failingKey = keyGenerator.nextKey();
streams
@@ -170,10 +112,7 @@ public void shouldSkipFailingEvent() {
assertThat(writtenEvent.getSourceEventPosition()).isEqualTo(secondEventPosition);

// error response
verify(mockCommandResponseWriter).tryWriteResponse(anyInt(), anyLong());

assertThat(requestId.get()).isEqualTo(255L);
assertThat(requestStreamId.get()).isEqualTo(99);
verify(mockCommandResponseWriter).tryWriteResponse(eq(99), eq(255L));

final Record<DeploymentRecord> deploymentRejection =
new RecordStream(streams.events(STREAM_NAME))
@@ -186,7 +125,7 @@
assertThat(deploymentRejection.getRejectionType()).isEqualTo(RejectionType.PROCESSING_ERROR);
}

protected DeploymentRecord deployment(final String name) {
DeploymentRecord deployment(final String name) {
final DeploymentRecord event = new DeploymentRecord();
event.resources().add().setResource(wrapString("foo")).setResourceName(wrapString(name));
return event;
@@ -210,19 +149,4 @@ public void processRecord(final TypedRecord<DeploymentRecord> record) {
.appendFollowUpEvent(record.getKey(), DeploymentIntent.CREATED, record.getValue());
}
}

protected class BatchProcessor implements TypedRecordProcessor<DeploymentRecord> {

private final StateWriter stateWriter;

public BatchProcessor(final Writers writers) {
stateWriter = writers.state();
}

@Override
public void processRecord(final TypedRecord<DeploymentRecord> record) {
stateWriter.appendFollowUpEvent(
keyGenerator.nextKey(), DeploymentIntent.CREATED, record.getValue());
}
}
}
@@ -11,6 +11,7 @@
import static io.camunda.zeebe.engine.util.RecordToWrite.event;
import static io.camunda.zeebe.engine.util.RecordToWrite.rejection;
import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ACTIVATE_ELEMENT;
import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.COMPLETE_ELEMENT;
import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ELEMENT_ACTIVATING;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.awaitility.Awaitility.await;
@@ -37,6 +38,7 @@
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.state.processing.DbKeyGenerator;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
@@ -257,6 +259,39 @@ public void shouldWriteFollowUpEventsAndCommands() {
.isEqualTo(3));
}

@Test
public void shouldSetSourcePointerForFollowUpRecords() {
// given
final var defaultRecordProcessor = streamPlatform.getDefaultMockedRecordProcessor();
final var resultBuilder = new BufferedProcessingResultBuilder((c, v) -> true);
resultBuilder.appendRecordReturnEither(
1, RecordType.EVENT, ACTIVATE_ELEMENT, RejectionType.NULL_VAL, "", RECORD);
resultBuilder.appendRecordReturnEither(
2, RecordType.COMMAND, COMPLETE_ELEMENT, RejectionType.NULL_VAL, "", RECORD);

when(defaultRecordProcessor.process(any(), any()))
.thenReturn(resultBuilder.build())
.thenReturn(EmptyProcessingResult.INSTANCE);

streamPlatform.startStreamProcessor();

// when
streamPlatform.writeBatch(command().processInstance(ACTIVATE_ELEMENT, RECORD));

// then
verify(defaultRecordProcessor, TIMEOUT.times(2)).process(any(), any());

final var logStreamReader = streamPlatform.getLogStream().newLogStreamReader();
logStreamReader.seekToFirstEvent();
final var firstRecord = logStreamReader.next();
assertThat(firstRecord.getSourceEventPosition()).isEqualTo(-1);
final var firstRecordPosition = firstRecord.getPosition();
while (logStreamReader.hasNext()) {
final var followUpRecord = logStreamReader.next();
assertThat(followUpRecord.getSourceEventPosition()).isEqualTo(firstRecordPosition);
}
}

@Test
public void shouldExecutePostCommitTask() {
// given
@@ -520,9 +555,7 @@ public void shouldWriteResponse() {
public void shouldWriteResponseOnFailedEventProcessing() {
// given
final var defaultMockedRecordProcessor = streamPlatform.getDefaultMockedRecordProcessor();
when(defaultMockedRecordProcessor.process(any(), any()))
.thenThrow(new RuntimeException())
.thenReturn(EmptyProcessingResult.INSTANCE);
when(defaultMockedRecordProcessor.process(any(), any())).thenThrow(new RuntimeException());

final var resultBuilder = new BufferedProcessingResultBuilder((c, s) -> true);
resultBuilder.withResponse(
@@ -536,18 +569,15 @@ public void shouldWriteResponseOnFailedEventProcessing() {
1,
12);
when(defaultMockedRecordProcessor.onProcessingError(any(), any(), any()))
.thenReturn(resultBuilder.build())
.thenReturn(EmptyProcessingResult.INSTANCE);
.thenReturn(resultBuilder.build());

streamPlatform.startStreamProcessor();

// when
streamPlatform.writeBatch(
command().processInstance(ACTIVATE_ELEMENT, RECORD),
command().processInstance(ACTIVATE_ELEMENT, RECORD));
streamPlatform.writeBatch(command().processInstance(ACTIVATE_ELEMENT, RECORD));

// then
verify(defaultMockedRecordProcessor, TIMEOUT.times(2)).process(any(), any());
verify(defaultMockedRecordProcessor, TIMEOUT.times(1)).process(any(), any());
verify(defaultMockedRecordProcessor, TIMEOUT.times(1)).onProcessingError(any(), any(), any());

final var commandResponseWriter = streamPlatform.getMockCommandResponseWriter();
@@ -560,6 +590,58 @@ public void shouldWriteResponseOnFailedEventProcessing() {
verify(commandResponseWriter, TIMEOUT.times(1)).tryWriteResponse(anyInt(), anyLong());
}

@Test
public void shouldContinueProcessingAfterFailedProcessing() {
// given
final var defaultMockedRecordProcessor = streamPlatform.getDefaultMockedRecordProcessor();
when(defaultMockedRecordProcessor.process(any(), any()))
.thenThrow(new RuntimeException())
.thenReturn(EmptyProcessingResult.INSTANCE);

streamPlatform.startStreamProcessor();

// when
streamPlatform.writeBatch(
command().processInstance(ACTIVATE_ELEMENT, RECORD),
command().processInstance(ACTIVATE_ELEMENT, RECORD));

// then
verify(defaultMockedRecordProcessor, TIMEOUT.times(2)).process(any(), any());
verify(defaultMockedRecordProcessor, TIMEOUT.times(1)).onProcessingError(any(), any(), any());
}

@Test
public void shouldBeAbleToWriteRejectionOnErrorHandling() {
// given
final var defaultMockedRecordProcessor = streamPlatform.getDefaultMockedRecordProcessor();
when(defaultMockedRecordProcessor.process(any(), any())).thenThrow(new RuntimeException());

final var resultBuilder = new BufferedProcessingResultBuilder((c, s) -> true);
resultBuilder.appendRecordReturnEither(
1, RecordType.COMMAND_REJECTION, ACTIVATE_ELEMENT, RejectionType.NULL_VAL, "", RECORD);
when(defaultMockedRecordProcessor.onProcessingError(any(), any(), any()))
.thenReturn(resultBuilder.build());

streamPlatform.startStreamProcessor();

// when
streamPlatform.writeBatch(command().processInstance(ACTIVATE_ELEMENT, RECORD));

// then
verify(defaultMockedRecordProcessor, TIMEOUT.times(1)).process(any(), any());
verify(defaultMockedRecordProcessor, TIMEOUT.times(1)).onProcessingError(any(), any(), any());

final var logStreamReader = streamPlatform.getLogStream().newLogStreamReader();
logStreamReader.seekToFirstEvent();
logStreamReader.next(); // command
assertThat(logStreamReader.hasNext()).isTrue(); // rejection
final var record = logStreamReader.next();
final var recordMetadata = new RecordMetadata();
record.readMetadata(recordMetadata);
assertThat(recordMetadata.getRecordType()).isEqualTo(RecordType.COMMAND_REJECTION);
assertThat(record.getSourceEventPosition()).isEqualTo(1);
}

@Test
public void shouldInvokeOnProcessedListenerWhenReturnResult() {
// given
