Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beef up Translog testing with random channel exceptions #18997

Merged
merged 5 commits into from Jun 21, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -56,7 +56,9 @@ public T get() {
*/
synchronized (InjectorImpl.class) {
if (instance == null) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove?

instance = creator.get();
System.out.println("------------ " + instance.getClass());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was faster :)

}
}
}
Expand Down
@@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
* only for testing until we have a disk-full FileSystem
*/
@FunctionalInterface
interface ChannelFactory {
default FileChannel open(Path path) throws IOException {
return open(path, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
}

FileChannel open(Path path, OpenOption... options) throws IOException;
}
Expand Up @@ -82,8 +82,8 @@ public static Checkpoint read(Path path) throws IOException {
}
}

public static void write(Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException {
try (FileChannel channel = FileChannel.open(checkpointFile, options)) {
public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException {
try (FileChannel channel = factory.open(checkpointFile, options)) {
checkpoint.write(channel);
channel.force(false);
}
Expand Down
Expand Up @@ -200,7 +200,7 @@ public Translog(TranslogConfig config, TranslogGeneration translogGeneration) th
Files.createDirectories(location);
final long generation = 1;
Checkpoint checkpoint = new Checkpoint(0, 0, generation);
Checkpoint.write(location.resolve(CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
Checkpoint.write(getChannelFactory()::open, location.resolve(CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we don't need the open method reference.

current = createWriter(generation);
this.lastCommittedTranslogFileGeneration = NOT_SET_GENERATION;

Expand Down Expand Up @@ -1313,8 +1313,8 @@ int getNumOpenViews() {
return outstandingViews.size();
}

TranslogWriter.ChannelFactory getChannelFactory() {
return TranslogWriter.ChannelFactory.DEFAULT;
ChannelFactory getChannelFactory() {
return FileChannel::open;
}

/**
Expand Down
Expand Up @@ -49,6 +49,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
public static final int VERSION = VERSION_CHECKPOINTS;

private final ShardId shardId;
private final ChannelFactory channelFactory;
/* the offset in bytes that was written when the file was last synced*/
private volatile long lastSyncedOffset;
/* the number of translog operations written to this file */
Expand All @@ -64,9 +65,10 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// lock order synchronized(syncLock) -> synchronized(this)
private final Object syncLock = new Object();

public TranslogWriter(ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException {
public TranslogWriter(ChannelFactory channelFactory, ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException {
super(generation, channel, path, channel.position());
this.shardId = shardId;
this.channelFactory = channelFactory;
this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
this.lastSyncedOffset = channel.position();
totalOffset = lastSyncedOffset;
Expand All @@ -92,8 +94,8 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f
out.writeInt(ref.length);
out.writeBytes(ref.bytes, ref.offset, ref.length);
channel.force(true);
writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE);
final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, channel, file, bufferSize);
writeCheckpoint(channelFactory, headerLength, 0, file.getParent(), fileGeneration);
final TranslogWriter writer = new TranslogWriter(channelFactory, shardId, fileGeneration, channel, file, bufferSize);
return writer;
} catch (Throwable throwable) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
Expand Down Expand Up @@ -254,7 +256,7 @@ public boolean syncUpTo(long offset) throws IOException {
// we can continue writing to the buffer etc.
try {
channel.force(false);
writeCheckpoint(offsetToSync, opsCounter, path.getParent(), generation, StandardOpenOption.WRITE);
writeCheckpoint(channelFactory, offsetToSync, opsCounter, path.getParent(), generation);
} catch (Throwable ex) {
closeWithTragicEvent(ex);
throw ex;
Expand Down Expand Up @@ -286,20 +288,10 @@ protected void readBytes(ByteBuffer targetBuffer, long position) throws IOExcept
Channels.readFromFileChannelWithEofException(channel, position, targetBuffer);
}

private static void writeCheckpoint(long syncPosition, int numOperations, Path translogFile, long generation, OpenOption... options) throws IOException {
private static void writeCheckpoint(ChannelFactory channelFactory, long syncPosition, int numOperations, Path translogFile, long generation) throws IOException {
final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation);
Checkpoint.write(checkpointFile, checkpoint, options);
}

static class ChannelFactory {

static final ChannelFactory DEFAULT = new ChannelFactory();

// only for testing until we have a disk-full FileSystem
public FileChannel open(Path file) throws IOException {
return FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
}
Checkpoint.write(channelFactory::open, checkpointFile, checkpoint, StandardOpenOption.WRITE);
}

protected final void ensureOpen() {
Expand Down
Expand Up @@ -60,7 +60,9 @@
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -1125,13 +1127,13 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException {
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Checkpoint corrupted = new Checkpoint(0, 0, 0);
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
try (Translog translog = new Translog(config, translogGeneration)) {
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
}
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
try (Translog translog = new Translog(config, translogGeneration)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
Expand Down Expand Up @@ -1564,22 +1566,20 @@ public void onceFailedFailAlways() {
private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException, Translog.TranslogGeneration generation) throws IOException {
return new Translog(config, generation) {
@Override
TranslogWriter.ChannelFactory getChannelFactory() {
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();

return new TranslogWriter.ChannelFactory() {
@Override
public FileChannel open(Path file) throws IOException {
FileChannel channel = factory.open(file);
boolean success = false;
try {
ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel);
success = true;
return throwingFileChannel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(channel);
}
ChannelFactory getChannelFactory() {
final ChannelFactory factory = super.getChannelFactory();

return (file, openOption) -> {
FileChannel channel = factory.open(file, openOption);
boolean success = false;
try {
ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, file.getFileName().endsWith(".ckp") // don't do partial writes for checkpoints we rely on the fact that the 20bytes are written as an atomic operation
? false : paritalWrites, throwUnknownException, channel);
success = true;
return throwingFileChannel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(channel);
}
}
};
Expand Down Expand Up @@ -1845,16 +1845,21 @@ public void testWithRandomException() throws IOException {
}
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
// failed - that's ok, we didn't even create it
} catch (IOException ex) {
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
}
// now randomly open this failing tlog again just to make sure we can also recover from failing during recovery
if (randomBoolean()) {
try {
IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generation));
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
// failed - that's ok, we didn't even create it
} catch (IOException ex) {
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
}
}

fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file
try (Translog translog = new Translog(config, generation)) {
Translog.Snapshot snapshot = translog.newSnapshot();
assertEquals(syncedDocs.size(), snapshot.totalOperations());
Expand All @@ -1866,4 +1871,28 @@ public void testWithRandomException() throws IOException {
}
}
}

public void testCheckpointOnDiskFull() throws IOException {
Checkpoint checkpoint = new Checkpoint(randomLong(), randomInt(), randomLong());
Path tempDir = createTempDir();
Checkpoint.write(FileChannel::open, tempDir.resolve("foo.cpk"), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
Checkpoint checkpoint2 = new Checkpoint(randomLong(), randomInt(), randomLong());

try {
Checkpoint.write((p, o) -> {
FileChannel open = FileChannel.open(p, o);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we also want to sometime fail before we open the file? I get that will not make a file linger around but it will be good to so nothing else fails, right?

FailSwitch failSwitch = new FailSwitch();
failSwitch.failNever();
ThrowingFileChannel channel = new ThrowingFileChannel(failSwitch, false, false, open);
failSwitch.failAlways();
return channel;

}, tempDir.resolve("foo.cpk"), checkpoint2, StandardOpenOption.WRITE);
fail("should have failed earlier");
} catch (MockDirectoryWrapper.FakeIOException ex) {
//fine
}
Checkpoint read = Checkpoint.read(tempDir.resolve("foo.cpk"));
assertEquals(read, checkpoint);
}
}