Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport stable/1.3] fix(log/stream): ensure the appender future always gets completed #8628

Merged
1 commit merged into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.dispatcher.Dispatcher;
import io.camunda.zeebe.dispatcher.Dispatchers;
import io.camunda.zeebe.dispatcher.Subscription;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
Expand Down Expand Up @@ -135,11 +136,29 @@ public ActorFuture<Void> closeAsync() {

@Override
protected void handleFailure(final Throwable failure) {
onFailure(failure);
}

private void onFailure(final Throwable failure) {
LOG.error(
"Unexpected error in Log Stream {} in phase {}.",
getName(),
actor.getLifecyclePhase(),
failure);

if (appenderFuture != null && !appenderFuture.isDone()) {
// the appender future is not done yet ->
// log stream was currently trying to
// open the log storage appender but did
// not succeed yet
appenderFuture.completeExceptionally(failure);
}

if (failure instanceof UnrecoverableException) {
onUnrecoverableFailure(HealthReport.dead(this).withIssue(failure));
} else {
onFailure(HealthReport.unhealthy(this).withIssue(failure));
}

super.handleFailure(failure);
}

@Override
Expand All @@ -159,25 +178,23 @@ public ActorFuture<LogStreamReader> newLogStreamReader() {

@Override
public ActorFuture<LogStreamRecordWriter> newLogStreamRecordWriter() {
// this should be replaced after refactoring the actor control
if (actor.isClosed()) {
return CompletableActorFuture.completedExceptionally(new RuntimeException("Actor is closed"));
}

final var writerFuture = new CompletableActorFuture<LogStreamRecordWriter>();
actor.run(() -> createWriter(writerFuture, LogStreamWriterImpl::new));
return writerFuture;
return newLogStreamWriter(LogStreamWriterImpl::new);
}

@Override
public ActorFuture<LogStreamBatchWriter> newLogStreamBatchWriter() {
return newLogStreamWriter(LogStreamBatchWriterImpl::new);
}

private <T extends LogStreamWriter> ActorFuture<T> newLogStreamWriter(
final WriterCreator<T> logStreamCreator) {
// this should be replaced after refactoring the actor control
if (actor.isClosed()) {
return CompletableActorFuture.completedExceptionally(new RuntimeException("Actor is closed"));
}

final var writerFuture = new CompletableActorFuture<LogStreamBatchWriter>();
actor.run(() -> createWriter(writerFuture, LogStreamBatchWriterImpl::new));
final var writerFuture = new CompletableActorFuture<T>();
actor.run(() -> createWriter(writerFuture, logStreamCreator));
return writerFuture;
}

Expand Down Expand Up @@ -208,13 +225,13 @@ private LogStreamReader createLogStreamReader() {

private <T extends LogStreamWriter> void createWriter(
final CompletableActorFuture<T> writerFuture, final WriterCreator<T> creator) {

final var onOpenAppenderConsumer = onOpenAppender(writerFuture, creator);

if (appender != null) {
writerFuture.complete(creator.create(partitionId, writeBuffer));
} else if (appenderFuture != null) {
appenderFuture.onComplete(onOpenAppender(writerFuture, creator));
onOpenAppenderConsumer.accept(appender, null);
} else {
appenderFuture = new CompletableActorFuture<>();
openAppender().onComplete(onOpenAppender(writerFuture, creator));
openAppender().onComplete(onOpenAppenderConsumer);
}
}

Expand All @@ -231,17 +248,29 @@ private <T extends LogStreamWriter> BiConsumer<LogStorageAppender, Throwable> on

private ActorFuture<Void> closeAppender() {
final var closeAppenderFuture = new CompletableActorFuture<Void>();
if (appender == null) {
closeAppenderFuture.complete(null);
return closeAppenderFuture;
}

appenderFuture = null;
LOG.info("Close appender for log stream {}", logName);

final var toCloseAppender = appender;
final var toCloseWriteBuffer = writeBuffer;
final var toCompleteExceptionallyAppenderFuture = appenderFuture;

appender = null;
writeBuffer = null;
appenderFuture = null;

if (toCompleteExceptionallyAppenderFuture != null
&& !toCompleteExceptionallyAppenderFuture.isDone()) {
// while opening the appender, a close signal is received
toCompleteExceptionallyAppenderFuture.completeExceptionally(
new LogStorageAppenderClosedException());
}

if (toCloseAppender == null) {
closeAppenderFuture.complete(null);
return closeAppenderFuture;
}

toCloseAppender
.closeAsync()
.onComplete(
Expand All @@ -252,81 +281,77 @@ private ActorFuture<Void> closeAppender() {
closeAppenderFuture.completeExceptionally(t);
}
});

return closeAppenderFuture;
}

private ActorFuture<LogStorageAppender> openAppender() {
try {
tryOpenAppender();
} catch (final Exception error) {
onOpenAppenderFailed(error);
if (appenderFuture != null) {
return appenderFuture;
}

appenderFuture = new CompletableActorFuture<>();
actor.run(
() -> {
final var initialPosition = getWriteBuffersInitialPosition();
writeBuffer = createAndScheduleWriteBuffer(initialPosition);

writeBuffer
.openSubscriptionAsync(APPENDER_SUBSCRIPTION_NAME)
.onComplete(
(subscription, throwable) -> {
if (throwable == null) {
createAndScheduleLogStorageAppender(subscription)
.onComplete(
(v, t) -> {
if (t != null) {
onFailure(t);
} else {
appenderFuture.complete(appender);
appender.addFailureListener(this);
}
});
} else {
onFailure(throwable);
}
});
});

return appenderFuture;
}

private void tryOpenAppender() {
final long lastPosition;
try {
lastPosition = getLastPosition();
} catch (final UnrecoverableException error) {
LOG.error("Unexpected error when opening appender", error);
onUnrecoverableFailure(HealthReport.dead(this).withIssue(error));
appenderFuture.completeExceptionally(error);
return;
}
private long getWriteBuffersInitialPosition() {
final var lastPosition = getLastCommittedPosition();
long initialPosition = 1;

final long initialPosition;
if (lastPosition > 0) {
initialPosition = lastPosition + 1;
} else {
initialPosition = 1;
}

writeBuffer =
Dispatchers.create(buildActorName(nodeId, "dispatcher", partitionId))
.maxFragmentLength(maxFrameLength)
.initialPosition(initialPosition)
.name(logName + "-write-buffer")
.actorSchedulingService(actorSchedulingService)
.build();
return initialPosition;
}

writeBuffer
.openSubscriptionAsync(APPENDER_SUBSCRIPTION_NAME)
.onComplete(
(subscription, throwable) -> {
if (throwable == null) {
appender =
new LogStorageAppender(
buildActorName(nodeId, "LogAppender", partitionId),
partitionId,
logStorage,
subscription,
maxFrameLength);

actorSchedulingService
.submitActor(appender)
.onComplete(
(v, t) -> {
if (t != null) {
onOpenAppenderFailed(t);
} else {
appenderFuture.complete(appender);
appender.addFailureListener(this);
}
});
} else {
onOpenAppenderFailed(throwable);
}
});
private Dispatcher createAndScheduleWriteBuffer(final long initialPosition) {
return Dispatchers.create(buildActorName(nodeId, "dispatcher", partitionId))
.maxFragmentLength(maxFrameLength)
.initialPosition(initialPosition)
.name(logName + "-write-buffer")
.actorSchedulingService(actorSchedulingService)
.build();
}

private void onOpenAppenderFailed(final Throwable error) {
LOG.error("Unexpected error when opening appender", error);
appenderFuture.completeExceptionally(error);
onFailure(HealthReport.unhealthy(this).withIssue(error));
private ActorFuture<Void> createAndScheduleLogStorageAppender(final Subscription subscription) {
appender =
new LogStorageAppender(
buildActorName(nodeId, "LogAppender", partitionId),
partitionId,
logStorage,
subscription,
maxFrameLength);
return actorSchedulingService.submitActor(appender);
}

private long getLastPosition() {
private long getLastCommittedPosition() {
try (final LogStorageReader storageReader = logStorage.newReader();
final LogStreamReader logReader = new LogStreamReaderImpl(storageReader)) {
return logReader.seekToEnd();
Expand Down Expand Up @@ -382,4 +407,10 @@ private interface WriterCreator<T extends LogStreamWriter> {

T create(int partitionId, Dispatcher dispatcher);
}

private static final class LogStorageAppenderClosedException extends RuntimeException {
private LogStorageAppenderClosedException() {
super("LogStorageAppender was closed before opening succeeded");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class LogStreamErrorTest {

@Parameterized.Parameter(0)
public Throwable logStorageException;

@Rule public ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();
final LogStorage mockLogStorage = mock(LogStorage.class);
private SyncLogStream logStream;

@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
{new RuntimeException("reader cannot be created")}, {new Error("reader cannot be created")}
};
}

@Before
public void setup() {
logStream =
Expand All @@ -35,7 +48,7 @@ public void setup() {
.withActorSchedulingService(actorSchedulerRule.get())
.build();

when(mockLogStorage.newReader()).thenThrow(new RuntimeException("reader cannot be created"));
when(mockLogStorage.newReader()).thenThrow(logStorageException);
}

@After
Expand Down