Skip to content

Commit

Permalink
test: introduce mock for lifecycle awareness
Browse files Browse the repository at this point in the history
  • Loading branch information
Zelldon committed Sep 29, 2022
1 parent 473dcc7 commit ae68802
Showing 1 changed file with 14 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.camunda.zeebe.db.ZeebeDb;
Expand All @@ -19,7 +21,6 @@
import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.EmptyProcessingResult;
import io.camunda.zeebe.engine.api.InterPartitionCommandSender;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
Expand All @@ -46,7 +47,6 @@
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand All @@ -73,6 +73,7 @@ public final class StreamPlatform {

private final WriteActor writeActor = new WriteActor();
private final ZeebeDbFactory zeebeDbFactory;
private StreamProcessorLifecycleAware mockProcessorLifecycleAware;

public StreamPlatform(
final Path dataDirectory,
Expand Down Expand Up @@ -183,19 +184,16 @@ public StreamProcessor startStreamProcessorNotAwaitOpening() {
return buildStreamProcessor(stream, false);
}

public StreamProcessorLifecycleAware getMockProcessorLifecycleAware() {
return mockProcessorLifecycleAware;
}

public StreamProcessor buildStreamProcessor(
final SynchronousLogStream stream, final boolean awaitOpening) {
final var storage = createRuntimeFolder(stream);
final var snapshot = storage.getParent().resolve(SNAPSHOT_FOLDER);

final var recoveredLatch = new CountDownLatch(1);
final var recoveredAwaiter =
new StreamProcessorLifecycleAware() {
@Override
public void onRecovered(final ReadonlyStreamProcessorContext context) {
recoveredLatch.countDown();
}
};
mockProcessorLifecycleAware = mock(StreamProcessorLifecycleAware.class);

final ZeebeDb<?> zeebeDb;
if (snapshotWasTaken) {
Expand All @@ -215,17 +213,13 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) {
.streamProcessorMode(streamProcessorMode)
.partitionCommandSender(mock(InterPartitionCommandSender.class));

builder.getLifecycleListeners().add(recoveredAwaiter);
builder.getLifecycleListeners().add(mockProcessorLifecycleAware);

final StreamProcessor streamProcessor = builder.build();
final var openFuture = streamProcessor.openAsync(false);

if (awaitOpening) { // and recovery
try {
recoveredLatch.await(15, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
Thread.interrupted();
}
verify(mockProcessorLifecycleAware, timeout(15 * 1000)).onRecovered(any());
}
openFuture.join(15, TimeUnit.SECONDS);

Expand Down Expand Up @@ -281,6 +275,10 @@ public long writeBatch(final RecordToWrite... recordsToWrite) {
return writeActor.submit(batchWriter::tryWrite).join();
}

public void closeStreamProcessor() throws Exception {
processorContext.close();
}

/** Used to run writes within an actor thread. */
private static final class WriteActor extends Actor {
public ActorFuture<Long> submit(final Callable<Long> write) {
Expand Down

0 comments on commit ae68802

Please sign in to comment.