diff --git a/engine/src/main/java/io/camunda/zeebe/engine/api/ReadonlyStreamProcessorContext.java b/engine/src/main/java/io/camunda/zeebe/engine/api/ReadonlyStreamProcessorContext.java index 05f881ee2117..723bf88c935b 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/api/ReadonlyStreamProcessorContext.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/api/ReadonlyStreamProcessorContext.java @@ -7,18 +7,10 @@ */ package io.camunda.zeebe.engine.api; -import io.camunda.zeebe.logstreams.log.LogStream; - public interface ReadonlyStreamProcessorContext { ProcessingScheduleService getScheduleService(); - /** - * @return the logstream, on which the processor runs - */ - @Deprecated // only used in EngineRule; TODO remove this - LogStream getLogStream(); - /** * Returns the partition ID * diff --git a/engine/src/main/java/io/camunda/zeebe/streamprocessor/ProcessingScheduleServiceImpl.java b/engine/src/main/java/io/camunda/zeebe/streamprocessor/ProcessingScheduleServiceImpl.java index 06c551737847..31f9a17263bf 100644 --- a/engine/src/main/java/io/camunda/zeebe/streamprocessor/ProcessingScheduleServiceImpl.java +++ b/engine/src/main/java/io/camunda/zeebe/streamprocessor/ProcessingScheduleServiceImpl.java @@ -81,7 +81,7 @@ private void useActorControl(final Runnable task) { LOG.debug("ProcessingScheduleService hasn't been opened yet, ignore scheduled task."); return; } - actorControl.submit(task); + task.run(); } public ActorFuture open(final ActorControl control) { diff --git a/engine/src/main/java/io/camunda/zeebe/streamprocessor/StreamProcessorContext.java b/engine/src/main/java/io/camunda/zeebe/streamprocessor/StreamProcessorContext.java index 19d214ab970e..c5de95c58bfb 100644 --- a/engine/src/main/java/io/camunda/zeebe/streamprocessor/StreamProcessorContext.java +++ b/engine/src/main/java/io/camunda/zeebe/streamprocessor/StreamProcessorContext.java @@ -71,16 +71,15 @@ public ProcessingScheduleService getScheduleService() { return processingScheduleService; } - @Override - public LogStream getLogStream() { - return logStream; - } - @Override public int getPartitionId() { return getLogStream().getPartitionId(); } + public LogStream getLogStream() { + return logStream; + } + public MutableLastProcessedPositionState getLastProcessedPositionState() { return lastProcessedPositionState; } diff --git a/engine/src/test/java/io/camunda/zeebe/engine/util/EngineRule.java b/engine/src/test/java/io/camunda/zeebe/engine/util/EngineRule.java index b6f7ef6eb677..3a818f29d129 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/util/EngineRule.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/util/EngineRule.java @@ -32,11 +32,11 @@ import io.camunda.zeebe.engine.util.client.ProcessInstanceClient; import io.camunda.zeebe.engine.util.client.PublishMessageClient; import io.camunda.zeebe.engine.util.client.VariableClient; -import io.camunda.zeebe.logstreams.log.LogStream; import io.camunda.zeebe.logstreams.log.LogStreamReader; import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter; import io.camunda.zeebe.logstreams.log.LoggedEvent; import io.camunda.zeebe.logstreams.util.ListLogStorage; +import io.camunda.zeebe.logstreams.util.SynchronousLogStream; import io.camunda.zeebe.model.bpmn.Bpmn; import io.camunda.zeebe.protocol.Protocol; import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter; @@ -69,6 +69,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -195,7 +197,9 @@ private void startProcessors() { partitionId, interPartitionCommandSender), jobsAvailableCallback, featureFlags) - .withListener(new ProcessingExporterTransistor()) + .withListener( + new ProcessingExporterTransistor( + environmentRule.getLogStream(partitionId))) .withListener(reprocessingCompletedListener), Optional.of( new StreamProcessorListener() { @@ -423,28 +427,43 @@ private static class ProcessingExporterTransistor implements StreamProcessorLife private LogStreamReader logStreamReader; private TypedRecordImpl typedEvent; + private final SynchronousLogStream synchronousLogStream; + private final ExecutorService executorService; + + public ProcessingExporterTransistor(final SynchronousLogStream synchronousLogStream) { + this.synchronousLogStream = synchronousLogStream; + executorService = Executors.newSingleThreadExecutor(); + } @Override public void onRecovered(final ReadonlyStreamProcessorContext context) { - final int partitionId = context.getPartitionId(); - typedEvent = new TypedRecordImpl(partitionId); - final var scheduleService = context.getScheduleService(); - - final LogStream logStream = context.getLogStream(); - logStream.registerRecordAvailableListener( - () -> scheduleService.runDelayed(Duration.ZERO, this::onNewEventCommitted)); - logStream - .newLogStreamReader() - .onComplete( - ((reader, throwable) -> { - if (throwable == null) { - logStreamReader = reader; - onNewEventCommitted(); - } - })); + executorService.submit( + () -> { + final int partitionId = context.getPartitionId(); + typedEvent = new TypedRecordImpl(partitionId); + final var asyncLogStream = synchronousLogStream.getAsyncLogStream(); + asyncLogStream.registerRecordAvailableListener(this::onNewEventCommitted); + logStreamReader = asyncLogStream.newLogStreamReader().join(); + exportEvents(); + }); + } + + @Override + public void onClose() { + executorService.shutdownNow(); } private void onNewEventCommitted() { + // this is called from outside (LogStream), so we need to enqueue the task + if (executorService.isShutdown()) { + return; + } + + executorService.submit(this::exportEvents); + } + + private void exportEvents() { + // we need to skip until onRecovered happened if (logStreamReader == null) { return; } diff --git a/engine/src/test/java/io/camunda/zeebe/engine/util/StreamProcessingComposite.java b/engine/src/test/java/io/camunda/zeebe/engine/util/StreamProcessingComposite.java index fb34563c5798..84dae5a54138 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/util/StreamProcessingComposite.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/util/StreamProcessingComposite.java @@ -15,6 +15,7 @@ import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors; import io.camunda.zeebe.engine.state.mutable.MutableZeebeState; import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter; +import io.camunda.zeebe.logstreams.util.SynchronousLogStream; import io.camunda.zeebe.msgpack.UnpackedObject; import io.camunda.zeebe.protocol.record.RecordType; import io.camunda.zeebe.protocol.record.intent.Intent; @@ -56,6 +57,10 @@ public LogStreamRecordWriter getLogStreamRecordWriter(final int partitionId) { return streams.getLogStreamRecordWriter(logName); } + public SynchronousLogStream getLogStream(final int partitionId) { + return streams.getLogStream(getLogName(partitionId)); + } + public LogStreamRecordWriter newLogStreamRecordWriter(final int partitionId) { final String logName = getLogName(partitionId); return streams.newLogStreamRecordWriter(logName); diff --git a/engine/src/test/java/io/camunda/zeebe/engine/util/StreamProcessorRule.java b/engine/src/test/java/io/camunda/zeebe/engine/util/StreamProcessorRule.java index ed2478656465..a4511ff08f55 100755 --- a/engine/src/test/java/io/camunda/zeebe/engine/util/StreamProcessorRule.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/util/StreamProcessorRule.java @@ -192,6 +192,10 @@ public StreamProcessor getStreamProcessor(final int partitionId) { return streamProcessingComposite.getStreamProcessor(partitionId); } + public SynchronousLogStream getLogStream(final int partitionId) { + return streamProcessingComposite.getLogStream(partitionId); + } + public CommandResponseWriter getCommandResponseWriter() { return streams.getMockedResponseWriter(); } diff --git a/engine/src/test/java/io/camunda/zeebe/streamprocessor/ProcessingScheduleServiceIntegrationTest.java b/engine/src/test/java/io/camunda/zeebe/streamprocessor/ProcessingScheduleServiceIntegrationTest.java deleted file mode 100644 index f5499197ba04..000000000000 --- a/engine/src/test/java/io/camunda/zeebe/streamprocessor/ProcessingScheduleServiceIntegrationTest.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under - * one or more contributor license agreements. See the NOTICE file distributed - * with this work for additional information regarding copyright ownership. - * Licensed under the Zeebe Community License 1.1. You may not use this file - * except in compliance with the Zeebe Community License 1.1. - */ -package io.camunda.zeebe.streamprocessor; - -import static io.camunda.zeebe.engine.util.RecordToWrite.command; -import static io.camunda.zeebe.engine.util.RecordToWrite.event; -import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ACTIVATE_ELEMENT; -import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ELEMENT_ACTIVATING; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; - -import io.camunda.zeebe.engine.api.EmptyProcessingResult; -import io.camunda.zeebe.engine.api.ProcessingResult; -import io.camunda.zeebe.engine.api.ProcessingResultBuilder; -import io.camunda.zeebe.engine.api.ProcessingScheduleService; -import io.camunda.zeebe.engine.api.RecordProcessor; -import io.camunda.zeebe.engine.api.RecordProcessorContext; -import io.camunda.zeebe.engine.api.Task; -import io.camunda.zeebe.engine.api.TaskResult; -import io.camunda.zeebe.engine.api.TaskResultBuilder; -import io.camunda.zeebe.engine.api.TypedRecord; -import io.camunda.zeebe.engine.api.records.RecordBatch; -import io.camunda.zeebe.engine.util.Records; -import io.camunda.zeebe.engine.util.StreamPlatform; -import io.camunda.zeebe.engine.util.StreamPlatformExtension; -import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord; -import io.camunda.zeebe.protocol.record.ValueType; -import io.camunda.zeebe.scheduler.clock.ControlledActorClock; -import java.time.Duration; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mockito; -import org.mockito.verification.VerificationWithTimeout; - -@ExtendWith(StreamPlatformExtension.class) -public class ProcessingScheduleServiceIntegrationTest { - - private static final long TIMEOUT_MILLIS = 2_000L; - private static final VerificationWithTimeout TIMEOUT = timeout(TIMEOUT_MILLIS); - - private static final ProcessInstanceRecord RECORD = Records.processInstance(1); - - @SuppressWarnings("unused") // injected by the extension - private StreamPlatform streamPlatform; - - @SuppressWarnings("unused") // injected by the extension - private ControlledActorClock clock; - - private DummyProcessor dummyProcessor; - - @BeforeEach - public void before() { - dummyProcessor = new DummyProcessor(); - } - - @AfterEach - public void clean() { - dummyProcessor.continueReplay(); - streamPlatform = null; - } - - @Test - public void shouldExecuteScheduledTask() { - // given - streamPlatform.withRecordProcessors(List.of(dummyProcessor)).startStreamProcessor(); - final var mockedTask = spy(new DummyTask()); - - // when - dummyProcessor.scheduleService.runDelayed(Duration.ZERO, mockedTask); - - // then - verify(mockedTask, TIMEOUT).execute(any()); - } - - @Test - public void shouldNotExecuteScheduledTaskIfOnReplay() { - // given - dummyProcessor.blockReplay(); - streamPlatform.writeBatch( - command().processInstance(ACTIVATE_ELEMENT, RECORD), - event().processInstance(ELEMENT_ACTIVATING, RECORD).causedBy(0)); - streamPlatform - .withRecordProcessors(List.of(dummyProcessor)) - .startStreamProcessorNotAwaitOpening(); - final var mockedTask = spy(new DummyTask()); - - // when - dummyProcessor.scheduleService.runDelayed(Duration.ZERO, mockedTask); - - // then - verify(mockedTask, never()).execute(any()); - } - - @Test - public void shouldNotExecuteTaskWhichAreScheduledDuringReplay() { - // given - final var processor = spy(dummyProcessor); - processor.blockReplay(); - streamPlatform.writeBatch( - command().processInstance(ACTIVATE_ELEMENT, RECORD), - event().processInstance(ELEMENT_ACTIVATING, RECORD).causedBy(0)); - streamPlatform.withRecordProcessors(List.of(processor)).startStreamProcessorNotAwaitOpening(); - final var mockedTask = spy(new DummyTask()); - - // when - processor.scheduleService.runDelayed(Duration.ZERO, mockedTask); - processor.continueReplay(); - - // then - verify(processor, TIMEOUT).init(any()); - verify(processor, TIMEOUT).replay(any()); - verify(mockedTask, never()).execute(any()); - } - - @Test - public void shouldNotExecuteScheduledTaskIfSuspended() { - // given - streamPlatform.withRecordProcessors(List.of(dummyProcessor)).startStreamProcessor(); - streamPlatform.pauseProcessing(); - final var mockedTask = spy(new DummyTask()); - - // when - dummyProcessor.scheduleService.runDelayed(Duration.ZERO, mockedTask); - - // then - verify(mockedTask, never()).execute(any()); - } - - @Test - public void shouldExecuteScheduledTaskAfterResumed() { - // given - streamPlatform.withRecordProcessors(List.of(dummyProcessor)).startStreamProcessor(); - streamPlatform.pauseProcessing(); - final var mockedTask = spy(new DummyTask()); - - // when - dummyProcessor.scheduleService.runDelayed(Duration.ZERO, mockedTask); - streamPlatform.resumeProcessing(); - - // then - verify(mockedTask, TIMEOUT).execute(any()); - } - - @Test - public void shouldWriteRecordAfterTaskWasExecuted() { - // given - final var dummyProcessorSpy = spy(dummyProcessor); - streamPlatform.withRecordProcessors(List.of(dummyProcessorSpy)).startStreamProcessor(); - - // when - dummyProcessorSpy.scheduleService.runDelayed( - Duration.ZERO, - (builder) -> { - builder.appendCommandRecord(1, ACTIVATE_ELEMENT, RECORD); - return builder.build(); - }); - - // then - verify(dummyProcessorSpy, TIMEOUT) - .process(Mockito.argThat(record -> record.getKey() == 1), any()); - } - - @Test - public void shouldScheduleOnFixedRate() { - // given - final var dummyProcessorSpy = spy(dummyProcessor); - streamPlatform.withRecordProcessors(List.of(dummyProcessorSpy)).startStreamProcessor(); - - // when - dummyProcessorSpy.scheduleService.runAtFixedRate( - Duration.ofMillis(100), - (builder) -> { - builder.appendCommandRecord(1, ACTIVATE_ELEMENT, RECORD); - return builder.build(); - }); - - // then - verify(dummyProcessorSpy, TIMEOUT.times(5)) - .process(Mockito.argThat(record -> record.getKey() == 1), any()); - } - - private static final class DummyTask implements Task { - @Override - public TaskResult execute(final TaskResultBuilder taskResultBuilder) { - return RecordBatch::empty; - } - } - - private static final class DummyProcessor implements RecordProcessor { - - private ProcessingScheduleService scheduleService; - private CountDownLatch replayLatch; - - @Override - public void init(final RecordProcessorContext recordProcessorContext) { - scheduleService = recordProcessorContext.getScheduleService(); - } - - @Override - public boolean accepts(final ValueType valueType) { - return true; - } - - @Override - public void replay(final TypedRecord record) { - if (replayLatch != null) { - try { - replayLatch.await(); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - @Override - public ProcessingResult process( - final TypedRecord record, final ProcessingResultBuilder processingResultBuilder) { - return EmptyProcessingResult.INSTANCE; - } - - @Override - public ProcessingResult onProcessingError( - final Throwable processingException, - final TypedRecord record, - final ProcessingResultBuilder processingResultBuilder) { - return EmptyProcessingResult.INSTANCE; - } - - public void blockReplay() { - replayLatch = new CountDownLatch(1); - } - - public void continueReplay() { - if (replayLatch != null) { - replayLatch.countDown(); - } - } - } -}