Skip to content

Commit

Permalink
Ensure no circular reference in translog tragic exception (#55959)
Browse files Browse the repository at this point in the history
We generate a circular reference exception in translog in 6.8 in the
following scenario:

- The first rollGeneration hits "too many open files" exception when it's
copying a checkpoint file. We will set the tragic exception and close
the translog

- The second rollGeneration hits AlreadyClosedException as the current
writer is closed. We will suppress the ACE to the current tragic
exception. Unfortunately, this leads to a circular reference as ACE
already suppresses the tragic exception.

Other factors that help to manifest this bug:
- We do not fail the engine on AlreadyClosedException in flush
- We do not check for ensureOpen before rolling a new generation

Closes #55893
  • Loading branch information
dnhatn committed Apr 30, 2020
1 parent a7d01c2 commit cfd61e6
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public static boolean reThrowIfNotNull(@Nullable Throwable e) {
}

@SuppressWarnings("unchecked")
private static <T extends Throwable> Optional<T> unwrapCausesAndSuppressed(Throwable cause, Predicate<Throwable> predicate) {
public static <T extends Throwable> Optional<T> unwrapCausesAndSuppressed(Throwable cause, Predicate<Throwable> predicate) {
if (predicate.test(cause)) {
return Optional.of((T) cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1808,6 +1808,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
refresh("version_table_flush", SearcherScope.INTERNAL, true);
translog.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.elasticsearch.index.translog;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;

import java.util.concurrent.atomic.AtomicReference;

public class TragicExceptionHolder {
Expand All @@ -30,10 +33,15 @@ public class TragicExceptionHolder {
*/
public void setTragicException(Exception ex) {
assert ex != null;
if (tragedy.compareAndSet(null, ex) == false) {
if (tragedy.get() != ex) { // to ensure there is no self-suppression
tragedy.get().addSuppressed(ex);
}
if (tragedy.compareAndSet(null, ex)) {
return; // first exception
}
final Exception tragedy = this.tragedy.get();
// ensure no circular reference
if (ExceptionsHelper.unwrapCausesAndSuppressed(ex, e -> e == tragedy).isPresent()) {
assert ex == tragedy || ex instanceof AlreadyClosedException : new AssertionError("must be ACE or tragic exception", ex);
} else {
tragedy.addSuppressed(ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1621,10 +1621,9 @@ private static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, Lis
* @throws IOException if an I/O exception occurred during any file operations
*/
public void rollGeneration() throws IOException {
// make sure we move most of the data to disk outside of the writeLock
// in order to reduce the time the lock is held since it's blocking all threads
sync();
syncBeforeRollGeneration();
try (Releasable ignored = writeLock.acquire()) {
ensureOpen();
try {
final TranslogReader reader = current.closeIntoReader();
readers.add(reader);
Expand All @@ -1641,6 +1640,12 @@ public void rollGeneration() throws IOException {
}
}

void syncBeforeRollGeneration() throws IOException {
// make sure we move most of the data to disk outside of the writeLock
// in order to reduce the time the lock is held since it's blocking all threads
sync();
}

/**
* Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum
* required generation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,12 @@
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -3307,4 +3310,64 @@ public void testSyncConcurrently() throws Exception {
}
}
}

public void testEnsureNoCircularException() throws Exception {
final AtomicBoolean failedToSyncCheckpoint = new AtomicBoolean();
final ChannelFactory channelFactory = (file, openOption) -> {
final FileChannel channel = FileChannel.open(file, openOption);
return new FilterFileChannel(channel) {
@Override
public void force(boolean metaData) throws IOException {
if (failedToSyncCheckpoint.get()) {
throw new IOException("simulated");
}
super.force(metaData);
}
};
};
final TranslogConfig config = getTranslogConfig(createTempDir());
final String translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory, primaryTerm.get());
final Translog translog = new Translog(config, translogUUID, createTranslogDeletionPolicy(),
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get,
seqNo -> {}) {
@Override
ChannelFactory getChannelFactory() {
return channelFactory;
}

@Override
void syncBeforeRollGeneration() {
// make it a noop like the old versions
}
};
try {
translog.add(new Translog.Index("1", "_doc", 1, primaryTerm.get(), new byte[]{1}));
failedToSyncCheckpoint.set(true);
expectThrows(IOException.class, translog::rollGeneration);
final AlreadyClosedException alreadyClosedException = expectThrows(AlreadyClosedException.class, translog::rollGeneration);
if (hasCircularReference(alreadyClosedException)) {
throw new AssertionError("detect circular reference exception", alreadyClosedException);
}
} finally {
IOUtils.close(translog);
}
}

static boolean hasCircularReference(Exception cause) {
final Queue<Throwable> queue = new LinkedList<>();
queue.add(cause);
final Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
while (queue.isEmpty() == false) {
final Throwable current = queue.remove();
if (seen.add(current) == false) {
return true;
}
Collections.addAll(queue, current.getSuppressed());
if (current.getCause() != null) {
queue.add(current.getCause());
}
}
return false;
}
}

0 comments on commit cfd61e6

Please sign in to comment.