Skip to content

Commit

Permalink
merge: #8628
Browse files Browse the repository at this point in the history
8628: [Backport stable/1.3] fix(log/stream): ensure the appender future always gets completed r=oleschoenburg a=romansmirnov

## Description

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

backports #8605
relates #7992



Co-authored-by: Roman <roman.smirnov@camunda.com>
  • Loading branch information
zeebe-bors-cloud[bot] and romansmirnov committed Jan 21, 2022
2 parents 9096ef7 + 62378be commit da9674e
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.dispatcher.Dispatcher;
import io.camunda.zeebe.dispatcher.Dispatchers;
import io.camunda.zeebe.dispatcher.Subscription;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
Expand Down Expand Up @@ -135,11 +136,29 @@ public ActorFuture<Void> closeAsync() {

@Override
protected void handleFailure(final Throwable failure) {
onFailure(failure);
}

private void onFailure(final Throwable failure) {
LOG.error(
"Unexpected error in Log Stream {} in phase {}.",
getName(),
actor.getLifecyclePhase(),
failure);

if (appenderFuture != null && !appenderFuture.isDone()) {
// the appender future is not done yet ->
// log stream was currently trying to
// open the log storage appender but did
// not succeed yet
appenderFuture.completeExceptionally(failure);
}

if (failure instanceof UnrecoverableException) {
onUnrecoverableFailure(HealthReport.dead(this).withIssue(failure));
} else {
onFailure(HealthReport.unhealthy(this).withIssue(failure));
}

super.handleFailure(failure);
}

@Override
Expand All @@ -159,25 +178,23 @@ public ActorFuture<LogStreamReader> newLogStreamReader() {

@Override
public ActorFuture<LogStreamRecordWriter> newLogStreamRecordWriter() {
// this should be replaced after refactoring the actor control
if (actor.isClosed()) {
return CompletableActorFuture.completedExceptionally(new RuntimeException("Actor is closed"));
}

final var writerFuture = new CompletableActorFuture<LogStreamRecordWriter>();
actor.run(() -> createWriter(writerFuture, LogStreamWriterImpl::new));
return writerFuture;
return newLogStreamWriter(LogStreamWriterImpl::new);
}

@Override
public ActorFuture<LogStreamBatchWriter> newLogStreamBatchWriter() {
return newLogStreamWriter(LogStreamBatchWriterImpl::new);
}

private <T extends LogStreamWriter> ActorFuture<T> newLogStreamWriter(
final WriterCreator<T> logStreamCreator) {
// this should be replaced after refactoring the actor control
if (actor.isClosed()) {
return CompletableActorFuture.completedExceptionally(new RuntimeException("Actor is closed"));
}

final var writerFuture = new CompletableActorFuture<LogStreamBatchWriter>();
actor.run(() -> createWriter(writerFuture, LogStreamBatchWriterImpl::new));
final var writerFuture = new CompletableActorFuture<T>();
actor.run(() -> createWriter(writerFuture, logStreamCreator));
return writerFuture;
}

Expand Down Expand Up @@ -208,13 +225,13 @@ private LogStreamReader createLogStreamReader() {

private <T extends LogStreamWriter> void createWriter(
final CompletableActorFuture<T> writerFuture, final WriterCreator<T> creator) {

final var onOpenAppenderConsumer = onOpenAppender(writerFuture, creator);

if (appender != null) {
writerFuture.complete(creator.create(partitionId, writeBuffer));
} else if (appenderFuture != null) {
appenderFuture.onComplete(onOpenAppender(writerFuture, creator));
onOpenAppenderConsumer.accept(appender, null);
} else {
appenderFuture = new CompletableActorFuture<>();
openAppender().onComplete(onOpenAppender(writerFuture, creator));
openAppender().onComplete(onOpenAppenderConsumer);
}
}

Expand All @@ -231,17 +248,29 @@ private <T extends LogStreamWriter> BiConsumer<LogStorageAppender, Throwable> on

private ActorFuture<Void> closeAppender() {
final var closeAppenderFuture = new CompletableActorFuture<Void>();
if (appender == null) {
closeAppenderFuture.complete(null);
return closeAppenderFuture;
}

appenderFuture = null;
LOG.info("Close appender for log stream {}", logName);

final var toCloseAppender = appender;
final var toCloseWriteBuffer = writeBuffer;
final var toCompleteExceptionallyAppenderFuture = appenderFuture;

appender = null;
writeBuffer = null;
appenderFuture = null;

if (toCompleteExceptionallyAppenderFuture != null
&& !toCompleteExceptionallyAppenderFuture.isDone()) {
// while opening the appender, a close signal is received
toCompleteExceptionallyAppenderFuture.completeExceptionally(
new LogStorageAppenderClosedException());
}

if (toCloseAppender == null) {
closeAppenderFuture.complete(null);
return closeAppenderFuture;
}

toCloseAppender
.closeAsync()
.onComplete(
Expand All @@ -252,81 +281,77 @@ private ActorFuture<Void> closeAppender() {
closeAppenderFuture.completeExceptionally(t);
}
});

return closeAppenderFuture;
}

private ActorFuture<LogStorageAppender> openAppender() {
try {
tryOpenAppender();
} catch (final Exception error) {
onOpenAppenderFailed(error);
if (appenderFuture != null) {
return appenderFuture;
}

appenderFuture = new CompletableActorFuture<>();
actor.run(
() -> {
final var initialPosition = getWriteBuffersInitialPosition();
writeBuffer = createAndScheduleWriteBuffer(initialPosition);

writeBuffer
.openSubscriptionAsync(APPENDER_SUBSCRIPTION_NAME)
.onComplete(
(subscription, throwable) -> {
if (throwable == null) {
createAndScheduleLogStorageAppender(subscription)
.onComplete(
(v, t) -> {
if (t != null) {
onFailure(t);
} else {
appenderFuture.complete(appender);
appender.addFailureListener(this);
}
});
} else {
onFailure(throwable);
}
});
});

return appenderFuture;
}

private void tryOpenAppender() {
final long lastPosition;
try {
lastPosition = getLastPosition();
} catch (final UnrecoverableException error) {
LOG.error("Unexpected error when opening appender", error);
onUnrecoverableFailure(HealthReport.dead(this).withIssue(error));
appenderFuture.completeExceptionally(error);
return;
}
private long getWriteBuffersInitialPosition() {
final var lastPosition = getLastCommittedPosition();
long initialPosition = 1;

final long initialPosition;
if (lastPosition > 0) {
initialPosition = lastPosition + 1;
} else {
initialPosition = 1;
}

writeBuffer =
Dispatchers.create(buildActorName(nodeId, "dispatcher", partitionId))
.maxFragmentLength(maxFrameLength)
.initialPosition(initialPosition)
.name(logName + "-write-buffer")
.actorSchedulingService(actorSchedulingService)
.build();
return initialPosition;
}

writeBuffer
.openSubscriptionAsync(APPENDER_SUBSCRIPTION_NAME)
.onComplete(
(subscription, throwable) -> {
if (throwable == null) {
appender =
new LogStorageAppender(
buildActorName(nodeId, "LogAppender", partitionId),
partitionId,
logStorage,
subscription,
maxFrameLength);

actorSchedulingService
.submitActor(appender)
.onComplete(
(v, t) -> {
if (t != null) {
onOpenAppenderFailed(t);
} else {
appenderFuture.complete(appender);
appender.addFailureListener(this);
}
});
} else {
onOpenAppenderFailed(throwable);
}
});
private Dispatcher createAndScheduleWriteBuffer(final long initialPosition) {
return Dispatchers.create(buildActorName(nodeId, "dispatcher", partitionId))
.maxFragmentLength(maxFrameLength)
.initialPosition(initialPosition)
.name(logName + "-write-buffer")
.actorSchedulingService(actorSchedulingService)
.build();
}

private void onOpenAppenderFailed(final Throwable error) {
LOG.error("Unexpected error when opening appender", error);
appenderFuture.completeExceptionally(error);
onFailure(HealthReport.unhealthy(this).withIssue(error));
private ActorFuture<Void> createAndScheduleLogStorageAppender(final Subscription subscription) {
appender =
new LogStorageAppender(
buildActorName(nodeId, "LogAppender", partitionId),
partitionId,
logStorage,
subscription,
maxFrameLength);
return actorSchedulingService.submitActor(appender);
}

private long getLastPosition() {
private long getLastCommittedPosition() {
try (final LogStorageReader storageReader = logStorage.newReader();
final LogStreamReader logReader = new LogStreamReaderImpl(storageReader)) {
return logReader.seekToEnd();
Expand Down Expand Up @@ -382,4 +407,10 @@ private interface WriterCreator<T extends LogStreamWriter> {

T create(int partitionId, Dispatcher dispatcher);
}

private static final class LogStorageAppenderClosedException extends RuntimeException {
private LogStorageAppenderClosedException() {
super("LogStorageAppender was closed before opening succeeded");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class LogStreamErrorTest {

@Parameterized.Parameter(0)
public Throwable logStorageException;

@Rule public ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();
final LogStorage mockLogStorage = mock(LogStorage.class);
private SyncLogStream logStream;

@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
{new RuntimeException("reader cannot be created")}, {new Error("reader cannot be created")}
};
}

@Before
public void setup() {
logStream =
Expand All @@ -35,7 +48,7 @@ public void setup() {
.withActorSchedulingService(actorSchedulerRule.get())
.build();

when(mockLogStorage.newReader()).thenThrow(new RuntimeException("reader cannot be created"));
when(mockLogStorage.newReader()).thenThrow(logStorageException);
}

@After
Expand Down

0 comments on commit da9674e

Please sign in to comment.