Skip to content

Commit

Permalink
Merge pull request #15535 from s1monw/fail_on_any_tragic_event
Browse files Browse the repository at this point in the history
Check for tragic event on all kinds of exceptions not only ACE and IOException
  • Loading branch information
s1monw committed Dec 18, 2015
2 parents 2093ea5 + 5b991b9 commit ca7a4f9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ public Location add(Operation operation) throws IOException {
closeOnTragicEvent(ex);
throw ex;
} catch (Throwable e) {
closeOnTragicEvent(e);
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally {
Releasables.close(out.bytes());
Expand Down Expand Up @@ -500,7 +501,7 @@ public void sync() throws IOException {
if (closed.get() == false) {
current.sync();
}
} catch (AlreadyClosedException | IOException ex) {
} catch (Throwable ex) {
closeOnTragicEvent(ex);
throw ex;
}
Expand Down Expand Up @@ -533,7 +534,7 @@ public boolean ensureSynced(Location location) throws IOException {
ensureOpen();
return current.syncUpTo(location.translogLocation + location.size);
}
} catch (AlreadyClosedException | IOException ex) {
} catch (Throwable ex) {
closeOnTragicEvent(ex);
throw ex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import static org.hamcrest.Matchers.*;

Expand Down Expand Up @@ -1387,6 +1386,35 @@ public void testTranslogOpsCountIsCorrect() throws IOException {
}
}

public void testTragicEventCanBeAnyException() throws IOException {
Path tempDir = createTempDir();
final AtomicBoolean fail = new AtomicBoolean();
TranslogConfig config = getTranslogConfig(tempDir);
assumeFalse("this won't work if we sync on any op",config.isSyncOnEachOperation());
Translog translog = getFailableTranslog(fail, config, false, true);
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
fail.set(true);
try {
Translog.Location location = translog.add(new Translog.Index("test", "2", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
if (config.getType() == TranslogWriter.Type.BUFFERED) { // the buffered case will fail on the add if we exceed the buffer or will fail on the flush once we sync
if (randomBoolean()) {
translog.ensureSynced(location);
} else {
translog.sync();
}
}
//TODO once we have a mock FS that can simulate we can also fail on plain sync
fail("WTF");
} catch (UnknownException ex) {
// w00t
} catch (TranslogException ex) {
assertTrue(ex.getCause() instanceof UnknownException);
}
assertFalse(translog.isOpen());
assertTrue(translog.getTragicException() instanceof UnknownException);
}

public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
Path tempDir = createTempDir();
final AtomicBoolean fail = new AtomicBoolean(false);
Expand Down Expand Up @@ -1432,9 +1460,9 @@ protected void afterAdd() throws IOException {
}
boolean atLeastOneFailed = false;
for (Throwable ex : threadExceptions) {
assertTrue(ex.toString(), ex instanceof IOException || ex instanceof AlreadyClosedException);
if (ex != null) {
atLeastOneFailed = true;
break;
}
}
if (atLeastOneFailed == false) {
Expand Down Expand Up @@ -1477,8 +1505,11 @@ protected void afterAdd() throws IOException {
}
}
}

private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
return getFailableTranslog(fail, config, randomBoolean(), false);
}

private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException {
return new Translog(config) {
@Override
TranslogWriter.ChannelFactory getChannelFactory() {
Expand All @@ -1488,7 +1519,7 @@ TranslogWriter.ChannelFactory getChannelFactory() {
@Override
public FileChannel open(Path file) throws IOException {
FileChannel channel = factory.open(file);
return new ThrowingFileChannel(fail, randomBoolean(), channel);
return new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel);
}
};
}
Expand All @@ -1498,11 +1529,13 @@ public FileChannel open(Path file) throws IOException {
public static class ThrowingFileChannel extends FilterFileChannel {
private final AtomicBoolean fail;
private final boolean partialWrite;
private final boolean throwUnknownException;

public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, FileChannel delegate) {
public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, boolean throwUnknownException, FileChannel delegate) {
super(delegate);
this.fail = fail;
this.partialWrite = partialWrite;
this.throwUnknownException = throwUnknownException;
}

@Override
Expand All @@ -1519,19 +1552,27 @@ public int write(ByteBuffer src, long position) throws IOException {
public int write(ByteBuffer src) throws IOException {
if (fail.get()) {
if (partialWrite) {
if (src.limit() > 1) {
if (src.hasRemaining()) {
final int pos = src.position();
final int limit = src.limit();
src.limit(limit / 2);
src.limit(randomIntBetween(pos, limit));
super.write(src);
src.position(pos);
src.limit(limit);
src.position(pos);
throw new IOException("__FAKE__ no space left on device");
}
}
throw new MockDirectoryWrapper.FakeIOException();
if (throwUnknownException) {
throw new UnknownException();
} else {
throw new MockDirectoryWrapper.FakeIOException();
}
}
return super.write(src);
}
}

private static final class UnknownException extends RuntimeException {

}
}

0 comments on commit ca7a4f9

Please sign in to comment.