Skip to content

Commit

Permalink
Skip Redundant Writes and Fsyncs in TranslogWriter (#77674) (#82146)
Browse files Browse the repository at this point in the history
There are a number of situations where we don't write anything to the
channel so we can skip the empty write and fsync to save some syscalls.
  • Loading branch information
original-brownbear committed Dec 30, 2021
1 parent 8b2cc60 commit 11fc718
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.common.io;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.threadpool.ThreadPool;

import java.nio.ByteBuffer;
Expand All @@ -20,20 +21,33 @@ public class DiskIoBufferPool {
ByteSizeValue.parseBytesSizeValue(System.getProperty("es.disk_io.direct.buffer.size", "64KB"), "es.disk_io.direct.buffer.size")
.getBytes()
);
public static final int HEAP_BUFFER_SIZE = 8 * 1024;

private static final ThreadLocal<ByteBuffer> ioBufferPool = ThreadLocal.withInitial(() -> {
// placeholder to cache the fact that a thread does not work with cached direct IO buffers in #ioBufferPool
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

// protected for testing
protected static final ThreadLocal<ByteBuffer> ioBufferPool = ThreadLocal.withInitial(() -> {
if (isWriteOrFlushThread()) {
return ByteBuffer.allocateDirect(BUFFER_SIZE);
} else {
return ByteBuffer.allocate(HEAP_BUFFER_SIZE);
return EMPTY_BUFFER;
}
});

public static ByteBuffer getIoBuffer() {
public static final DiskIoBufferPool INSTANCE = new DiskIoBufferPool();

protected DiskIoBufferPool() {}

/**
* @return thread-local cached direct byte buffer if we are on a thread that supports caching direct buffers or null otherwise
*/
@Nullable
public ByteBuffer maybeGetDirectIOBuffer() {
ByteBuffer ioBuffer = ioBufferPool.get();
ioBuffer.clear();
return ioBuffer;
if (ioBuffer == EMPTY_BUFFER) {
return null;
}
return ioBuffer.clear();
}

private static boolean isWriteOrFlushThread() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.DiskIoBufferPool;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -110,6 +111,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
// the list of translog readers is guaranteed to be in order of translog generation
private final List<TranslogReader> readers = new ArrayList<>();
private final BigArrays bigArrays;
private final DiskIoBufferPool diskIoBufferPool;
protected final ReleasableLock readLock;
protected final ReleasableLock writeLock;
private final Path location;
Expand Down Expand Up @@ -159,6 +161,7 @@ public Translog(
this.deletionPolicy = deletionPolicy;
this.translogUUID = translogUUID;
bigArrays = config.getBigArrays();
diskIoBufferPool = config.getDiskIoBufferPool();
ReadWriteLock rwl = new ReentrantReadWriteLock();
readLock = new ReleasableLock(rwl.readLock());
writeLock = new ReleasableLock(rwl.writeLock());
Expand Down Expand Up @@ -549,7 +552,8 @@ TranslogWriter createWriter(
primaryTermSupplier.getAsLong(),
tragedy,
persistedSequenceNumberConsumer,
bigArrays
bigArrays,
diskIoBufferPool
);
} catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
Expand Down Expand Up @@ -1972,7 +1976,8 @@ public static String createEmptyTranslog(
primaryTerm,
new TragicExceptionHolder(),
seqNo -> { throw new UnsupportedOperationException(); },
BigArrays.NON_RECYCLING_INSTANCE
BigArrays.NON_RECYCLING_INSTANCE,
DiskIoBufferPool.INSTANCE
);
writer.close();
return uuid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.index.translog;

import org.elasticsearch.common.io.DiskIoBufferPool;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
Expand All @@ -26,6 +27,7 @@ public final class TranslogConfig {
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
public static final ByteSizeValue EMPTY_TRANSLOG_BUFFER_SIZE = new ByteSizeValue(10, ByteSizeUnit.BYTES);
private final BigArrays bigArrays;
private final DiskIoBufferPool diskIoBufferPool;
private final IndexSettings indexSettings;
private final ShardId shardId;
private final Path translogPath;
Expand All @@ -39,15 +41,23 @@ public final class TranslogConfig {
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
*/
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays) {
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE);
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, DiskIoBufferPool.INSTANCE);
}

TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, ByteSizeValue bufferSize) {
TranslogConfig(
ShardId shardId,
Path translogPath,
IndexSettings indexSettings,
BigArrays bigArrays,
ByteSizeValue bufferSize,
DiskIoBufferPool diskIoBufferPool
) {
this.bufferSize = bufferSize;
this.indexSettings = indexSettings;
this.shardId = shardId;
this.translogPath = translogPath;
this.bigArrays = bigArrays;
this.diskIoBufferPool = diskIoBufferPool;
}

/**
Expand Down Expand Up @@ -84,4 +94,12 @@ public Path getTranslogPath() {
public ByteSizeValue getBufferSize() {
return bufferSize;
}

/**
* {@link DiskIoBufferPool} for this engine. Used to allow custom pools in tests, always returns
* {@link DiskIoBufferPool#INSTANCE} in production.
*/
public DiskIoBufferPool getDiskIoBufferPool() {
return diskIoBufferPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {

private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;

private final DiskIoBufferPool diskIoBufferPool;

private TranslogWriter(
final ShardId shardId,
final Checkpoint initialCheckpoint,
Expand All @@ -95,7 +97,8 @@ private TranslogWriter(
TranslogHeader header,
final TragicExceptionHolder tragedy,
final LongConsumer persistedSequenceNumberConsumer,
final BigArrays bigArrays
final BigArrays bigArrays,
final DiskIoBufferPool diskIoBufferPool
) throws IOException {
super(initialCheckpoint.generation, channel, path, header);
assert initialCheckpoint.offset == channel.position()
Expand All @@ -119,6 +122,7 @@ private TranslogWriter(
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
this.bigArrays = bigArrays;
this.diskIoBufferPool = diskIoBufferPool;
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
this.tragedy = tragedy;
}
Expand All @@ -137,7 +141,8 @@ public static TranslogWriter create(
final long primaryTerm,
TragicExceptionHolder tragedy,
final LongConsumer persistedSequenceNumberConsumer,
final BigArrays bigArrays
final BigArrays bigArrays,
DiskIoBufferPool diskIoBufferPool
) throws IOException {
final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME);

Expand Down Expand Up @@ -178,7 +183,8 @@ public static TranslogWriter create(
header,
tragedy,
persistedSequenceNumberConsumer,
bigArrays
bigArrays,
diskIoBufferPool
);
} catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
Expand Down Expand Up @@ -461,8 +467,12 @@ final boolean syncUpTo(long offset) throws IOException {
ensureOpen();
checkpointToSync = getCheckpoint();
toWrite = pollOpsToWrite();
flushedSequenceNumbers = nonFsyncedSequenceNumbers;
nonFsyncedSequenceNumbers = new LongArrayList(64);
if (nonFsyncedSequenceNumbers.isEmpty()) {
flushedSequenceNumbers = null;
} else {
flushedSequenceNumbers = nonFsyncedSequenceNumbers;
nonFsyncedSequenceNumbers = new LongArrayList(64);
}
}

try {
Expand All @@ -472,17 +482,23 @@ final boolean syncUpTo(long offset) throws IOException {
closeWithTragicEvent(ex);
throw ex;
}
assert channel.position() == checkpointToSync.offset;
}
// now do the actual fsync outside of the synchronized block such that
// we can continue writing to the buffer etc.
try {
channel.force(false);
assert lastSyncedCheckpoint.offset != checkpointToSync.offset || toWrite.length() == 0;
if (lastSyncedCheckpoint.offset != checkpointToSync.offset) {
channel.force(false);
}
writeCheckpoint(checkpointChannel, checkpointPath, checkpointToSync);
} catch (final Exception ex) {
closeWithTragicEvent(ex);
throw ex;
}
flushedSequenceNumbers.forEach((LongProcedure) persistedSequenceNumberConsumer::accept);
if (flushedSequenceNumbers != null) {
flushedSequenceNumbers.forEach((LongProcedure) persistedSequenceNumberConsumer::accept);
}
assert lastSyncedCheckpoint.offset <= checkpointToSync.offset
: "illegal state: " + lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset;
lastSyncedCheckpoint = checkpointToSync; // write protected by syncLock
Expand Down Expand Up @@ -521,8 +537,20 @@ private synchronized ReleasableBytesReference pollOpsToWrite() {
private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException {
try (ReleasableBytesReference toClose = toWrite) {
assert writeLock.isHeldByCurrentThread();
ByteBuffer ioBuffer = DiskIoBufferPool.getIoBuffer();

final int length = toWrite.length();
if (length == 0) {
return;
}
ByteBuffer ioBuffer = diskIoBufferPool.maybeGetDirectIOBuffer();
if (ioBuffer == null) {
// not using a direct buffer for writes from the current thread so just write without copying to the io buffer
BytesRefIterator iterator = toWrite.iterator();
BytesRef current;
while ((current = iterator.next()) != null) {
Channels.writeToChannel(current.bytes, current.offset, current.length, channel);
}
return;
}
BytesRefIterator iterator = toWrite.iterator();
BytesRef current;
while ((current = iterator.next()) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ private Tuple<List<TranslogReader>, TranslogWriter> createReadersAndWriter() thr
randomNonNegativeLong(),
new TragicExceptionHolder(),
seqNo -> {},
BigArrays.NON_RECYCLING_INSTANCE
BigArrays.NON_RECYCLING_INSTANCE,
TranslogTests.RANDOMIZING_IO_BUFFERS
);
writer = Mockito.spy(writer);
byte[] bytes = new byte[4];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.DiskIoBufferPool;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
Expand Down Expand Up @@ -142,6 +144,27 @@
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
public class TranslogTests extends ESTestCase {

public static final DiskIoBufferPool RANDOMIZING_IO_BUFFERS = new DiskIoBufferPool() {
@Override
public ByteBuffer maybeGetDirectIOBuffer() {
// null out thread-local to be able to test that the correct buffer is used when called repeatedly from the same thread
ioBufferPool.remove();
final String currentThreadName = Thread.currentThread().getName();
try {
final boolean useWriteThread = randomBoolean();
Thread.currentThread().setName(useWriteThread ? "[" + ThreadPool.Names.WRITE + "] thread" : "not-a-write-thread");
final ByteBuffer buffer = super.maybeGetDirectIOBuffer();
if (useWriteThread) {
assertTrue(buffer.isDirect());
} else {
assertNull(buffer);
}
return buffer;
} finally {
Thread.currentThread().setName(currentThreadName);
}
}
};
protected final ShardId shardId = new ShardId("index", "_na_", 1);

protected Translog translog;
Expand Down Expand Up @@ -261,7 +284,14 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting
);

final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings);
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize);
return new TranslogConfig(
shardId,
path,
indexSettings,
NON_RECYCLING_INSTANCE,
bufferSize,
randomBoolean() ? DiskIoBufferPool.INSTANCE : RANDOMIZING_IO_BUFFERS
);
}

private Location addToTranslogAndList(Translog translog, List<Translog.Operation> list, Translog.Operation op) throws IOException {
Expand Down Expand Up @@ -1367,7 +1397,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
temp.getTranslogPath(),
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB)
new ByteSizeValue(1, ByteSizeUnit.KB),
randomBoolean() ? DiskIoBufferPool.INSTANCE : RANDOMIZING_IO_BUFFERS
);

final Set<Long> persistedSeqNos = new HashSet<>();
Expand Down

0 comments on commit 11fc718

Please sign in to comment.