From 62378bea886a40e4ae409976940326e4b432c6fc Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 17 Jan 2022 14:53:14 +0100 Subject: [PATCH] fix(log/stream): ensure the appender future always gets completed --- .../logstreams/impl/log/LogStreamImpl.java | 189 ++++++++++-------- .../logstreams/log/LogStreamErrorTest.java | 15 +- 2 files changed, 124 insertions(+), 80 deletions(-) diff --git a/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamImpl.java b/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamImpl.java index 646128e9a44b..b201c102c330 100644 --- a/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamImpl.java +++ b/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamImpl.java @@ -9,6 +9,7 @@ import io.camunda.zeebe.dispatcher.Dispatcher; import io.camunda.zeebe.dispatcher.Dispatchers; +import io.camunda.zeebe.dispatcher.Subscription; import io.camunda.zeebe.logstreams.impl.Loggers; import io.camunda.zeebe.logstreams.log.LogRecordAwaiter; import io.camunda.zeebe.logstreams.log.LogStream; @@ -135,11 +136,29 @@ public ActorFuture closeAsync() { @Override protected void handleFailure(final Throwable failure) { + onFailure(failure); + } + + private void onFailure(final Throwable failure) { + LOG.error( + "Unexpected error in Log Stream {} in phase {}.", + getName(), + actor.getLifecyclePhase(), + failure); + + if (appenderFuture != null && !appenderFuture.isDone()) { + // the appender future is not done yet -> + // log stream was currently trying to + // open the log storage appender but did + // not succeed yet + appenderFuture.completeExceptionally(failure); + } + if (failure instanceof UnrecoverableException) { onUnrecoverableFailure(HealthReport.dead(this).withIssue(failure)); + } else { + onFailure(HealthReport.unhealthy(this).withIssue(failure)); } - - super.handleFailure(failure); } @Override @@ -159,25 +178,23 @@ public ActorFuture newLogStreamReader() { @Override public ActorFuture newLogStreamRecordWriter() { - // this should be replaced after refactoring the actor control - if (actor.isClosed()) { - return CompletableActorFuture.completedExceptionally(new RuntimeException("Actor is closed")); - } - - final var writerFuture = new CompletableActorFuture(); - actor.run(() -> createWriter(writerFuture, LogStreamWriterImpl::new)); - return writerFuture; + return newLogStreamWriter(LogStreamWriterImpl::new); } @Override public ActorFuture newLogStreamBatchWriter() { + return newLogStreamWriter(LogStreamBatchWriterImpl::new); + } + + private ActorFuture newLogStreamWriter( + final WriterCreator logStreamCreator) { // this should be replaced after refactoring the actor control if (actor.isClosed()) { return CompletableActorFuture.completedExceptionally(new RuntimeException("Actor is closed")); } - final var writerFuture = new CompletableActorFuture(); - actor.run(() -> createWriter(writerFuture, LogStreamBatchWriterImpl::new)); + final var writerFuture = new CompletableActorFuture(); + actor.run(() -> createWriter(writerFuture, logStreamCreator)); return writerFuture; } @@ -208,13 +225,13 @@ private LogStreamReader createLogStreamReader() { private void createWriter( final CompletableActorFuture writerFuture, final WriterCreator creator) { + + final var onOpenAppenderConsumer = onOpenAppender(writerFuture, creator); + if (appender != null) { - writerFuture.complete(creator.create(partitionId, writeBuffer)); - } else if (appenderFuture != null) { - appenderFuture.onComplete(onOpenAppender(writerFuture, creator)); + onOpenAppenderConsumer.accept(appender, null); } else { - appenderFuture = new CompletableActorFuture<>(); - openAppender().onComplete(onOpenAppender(writerFuture, creator)); + openAppender().onComplete(onOpenAppenderConsumer); } } @@ -231,17 +248,29 @@ private BiConsumer on private ActorFuture closeAppender() { final var closeAppenderFuture = new CompletableActorFuture(); - if (appender == null) { - closeAppenderFuture.complete(null); - return closeAppenderFuture; - } - appenderFuture = null; LOG.info("Close appender for log stream {}", logName); + final var toCloseAppender = appender; final var toCloseWriteBuffer = writeBuffer; + final var toCompleteExceptionallyAppenderFuture = appenderFuture; + appender = null; writeBuffer = null; + appenderFuture = null; + + if (toCompleteExceptionallyAppenderFuture != null + && !toCompleteExceptionallyAppenderFuture.isDone()) { + // while opening the appender, a close signal is received + toCompleteExceptionallyAppenderFuture.completeExceptionally( + new LogStorageAppenderClosedException()); + } + + if (toCloseAppender == null) { + closeAppenderFuture.complete(null); + return closeAppenderFuture; + } + toCloseAppender .closeAsync() .onComplete( @@ -252,81 +281,77 @@ private ActorFuture closeAppender() { closeAppenderFuture.completeExceptionally(t); } }); + return closeAppenderFuture; } private ActorFuture openAppender() { - try { - tryOpenAppender(); - } catch (final Exception error) { - onOpenAppenderFailed(error); + if (appenderFuture != null) { + return appenderFuture; } + + appenderFuture = new CompletableActorFuture<>(); + actor.run( + () -> { + final var initialPosition = getWriteBuffersInitialPosition(); + writeBuffer = createAndScheduleWriteBuffer(initialPosition); + + writeBuffer + .openSubscriptionAsync(APPENDER_SUBSCRIPTION_NAME) + .onComplete( + (subscription, throwable) -> { + if (throwable == null) { + createAndScheduleLogStorageAppender(subscription) + .onComplete( + (v, t) -> { + if (t != null) { + onFailure(t); + } else { + appenderFuture.complete(appender); + appender.addFailureListener(this); + } + }); + } else { + onFailure(throwable); + } + }); + }); + return appenderFuture; } - private void tryOpenAppender() { - final long lastPosition; - try { - lastPosition = getLastPosition(); - } catch (final UnrecoverableException error) { - LOG.error("Unexpected error when opening appender", error); - onUnrecoverableFailure(HealthReport.dead(this).withIssue(error)); - appenderFuture.completeExceptionally(error); - return; - } + private long getWriteBuffersInitialPosition() { + final var lastPosition = getLastCommittedPosition(); + long initialPosition = 1; - final long initialPosition; if (lastPosition > 0) { initialPosition = lastPosition + 1; - } else { - initialPosition = 1; } - writeBuffer = - Dispatchers.create(buildActorName(nodeId, "dispatcher", partitionId)) - .maxFragmentLength(maxFrameLength) - .initialPosition(initialPosition) - .name(logName + "-write-buffer") - .actorSchedulingService(actorSchedulingService) - .build(); + return initialPosition; + } - writeBuffer - .openSubscriptionAsync(APPENDER_SUBSCRIPTION_NAME) - .onComplete( - (subscription, throwable) -> { - if (throwable == null) { - appender = - new LogStorageAppender( - buildActorName(nodeId, "LogAppender", partitionId), - partitionId, - logStorage, - subscription, - maxFrameLength); - - actorSchedulingService - .submitActor(appender) - .onComplete( - (v, t) -> { - if (t != null) { - onOpenAppenderFailed(t); - } else { - appenderFuture.complete(appender); - appender.addFailureListener(this); - } - }); - } else { - onOpenAppenderFailed(throwable); - } - }); + private Dispatcher createAndScheduleWriteBuffer(final long initialPosition) { + return Dispatchers.create(buildActorName(nodeId, "dispatcher", partitionId)) + .maxFragmentLength(maxFrameLength) + .initialPosition(initialPosition) + .name(logName + "-write-buffer") + .actorSchedulingService(actorSchedulingService) + .build(); } - private void onOpenAppenderFailed(final Throwable error) { - LOG.error("Unexpected error when opening appender", error); - appenderFuture.completeExceptionally(error); - onFailure(HealthReport.unhealthy(this).withIssue(error)); + private ActorFuture createAndScheduleLogStorageAppender(final Subscription subscription) { + appender = + new LogStorageAppender( + buildActorName(nodeId, "LogAppender", partitionId), + partitionId, + logStorage, + subscription, + maxFrameLength); + return actorSchedulingService.submitActor(appender); } - private long getLastPosition() { + private long getLastCommittedPosition() { try (final LogStorageReader storageReader = logStorage.newReader(); final LogStreamReader logReader = new LogStreamReaderImpl(storageReader)) { return logReader.seekToEnd(); @@ -382,4 +407,10 @@ private interface WriterCreator { T create(int partitionId, Dispatcher dispatcher); } + + private static final class LogStorageAppenderClosedException extends RuntimeException { + private LogStorageAppenderClosedException() { + super("LogStorageAppender was closed before opening succeeded"); + } + } } diff --git a/logstreams/src/test/java/io/camunda/zeebe/logstreams/log/LogStreamErrorTest.java b/logstreams/src/test/java/io/camunda/zeebe/logstreams/log/LogStreamErrorTest.java index 6f0308f2f40a..cd1ff20a9b28 100644 --- a/logstreams/src/test/java/io/camunda/zeebe/logstreams/log/LogStreamErrorTest.java +++ b/logstreams/src/test/java/io/camunda/zeebe/logstreams/log/LogStreamErrorTest.java @@ -19,13 +19,26 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class LogStreamErrorTest { + @Parameterized.Parameter(0) + public Throwable logStorageException; + @Rule public ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(); final LogStorage mockLogStorage = mock(LogStorage.class); private SyncLogStream logStream; + @Parameterized.Parameters + public static Object[][] parameters() { + return new Object[][] { + {new RuntimeException("reader cannot be created")}, {new Error("reader cannot be created")} + }; + } + @Before public void setup() { logStream = @@ -35,7 +48,7 @@ public void setup() { .withActorSchedulingService(actorSchedulerRule.get()) .build(); - when(mockLogStorage.newReader()).thenThrow(new RuntimeException("reader cannot be created")); + when(mockLogStorage.newReader()).thenThrow(logStorageException); } @After