Skip to content

Commit

Permalink
Extract aligned write logic from searchable snapshots into blob cache…
Browse files Browse the repository at this point in the history
… module (#93592)

This is very neatly reusable elsewhere. Unfortunately, an isolated test for the method
is rather hard to build. We should look into refactoring this further to enable that.
There's good reason existing tests all go through the cache service or concrete frozen
input it seems.
  • Loading branch information
original-brownbear committed Feb 8, 2023
1 parent af8fccf commit 37fc695
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.AbstractRefCounted;
Expand All @@ -19,16 +20,27 @@
import org.elasticsearch.xpack.searchablesnapshots.preallocate.Preallocate;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

public class SharedBytes extends AbstractRefCounted {

/**
* Thread local direct byte buffer to aggregate multiple positional writes to the cache file.
*/
public static final int MAX_BYTES_PER_WRITE = StrictMath.toIntExact(
ByteSizeValue.parseBytesSizeValue(
System.getProperty("es.searchable.snapshot.shared_cache.write_buffer.size", "2m"),
"es.searchable.snapshot.shared_cache.write_buffer.size"
).getBytes()
);
private static final Logger logger = LogManager.getLogger(SharedBytes.class);

public static int PAGE_SIZE = 4096;
Expand Down Expand Up @@ -96,6 +108,62 @@ public static Path findCacheSnapshotCacheFilePath(NodeEnvironment environment, l
}
}

/**
* Copy {@code length} bytes from {@code input} to {@code fc}, only doing writes aligned along {@link #PAGE_SIZE}.
*
* @param fc output cache file reference
* @param input stream to read from
* @param fileChannelPos position in {@code fc} to write to
* @param relativePos relative position in the Lucene file the is read from {@code input}
* @param length number of bytes to copy
* @param progressUpdater callback to invoke with the number of copied bytes as they are copied
* @param buf bytebuffer to use for writing
* @param cacheFile object that describes the cached file, only used in logging and exception throwing as context information
* @throws IOException on failure
*/
public static void copyToCacheFileAligned(
IO fc,
InputStream input,
long fileChannelPos,
long relativePos,
long length,
LongConsumer progressUpdater,
ByteBuffer buf,
final Object cacheFile
) throws IOException {
long bytesCopied = 0L;
long remaining = length;
while (remaining > 0L) {
final int bytesRead = BlobCacheUtils.readSafe(input, buf, relativePos, remaining, cacheFile);
if (buf.hasRemaining()) {
break;
}
long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied, buf);
bytesCopied += bytesWritten;
progressUpdater.accept(bytesCopied);
remaining -= bytesRead;
}
if (remaining > 0) {
// ensure that last write is aligned on 4k boundaries (= page size)
final int remainder = buf.position() % PAGE_SIZE;
final int adjustment = remainder == 0 ? 0 : PAGE_SIZE - remainder;
buf.position(buf.position() + adjustment);
long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied, buf);
bytesCopied += bytesWritten;
final long adjustedBytesCopied = bytesCopied - adjustment; // adjust to not break RangeFileTracker
assert adjustedBytesCopied == length;
progressUpdater.accept(adjustedBytesCopied);
}
}

private static int positionalWrite(IO fc, long start, ByteBuffer byteBuffer) throws IOException {
byteBuffer.flip();
int written = fc.write(byteBuffer, start);
assert byteBuffer.hasRemaining() == false;
byteBuffer.clear();
return written;
}

@Override
protected void closeInternal() {
try {
Expand All @@ -107,7 +175,7 @@ protected void closeInternal() {

private final Map<Integer, IO> ios = ConcurrentCollections.newConcurrentMap();

IO getFileChannel(int sharedBytesPos) {
public IO getFileChannel(int sharedBytesPos) {
assert fileChannel != null;
return ios.compute(sharedBytesPos, (p, io) -> {
if (io == null || io.tryIncRef() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import java.util.function.LongConsumer;

public class FrozenIndexInput extends MetadataCachingIndexInput {

Expand Down Expand Up @@ -137,10 +136,28 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
),
(channel, channelPos, relativePos, len, progressUpdater) -> {
final long startTimeNanos = stats.currentTimeNanos();
final long streamStartPosition = rangeToWrite.start() + relativePos;

try (InputStream input = openInputStreamFromBlobStore(streamStartPosition, len)) {
writeCacheFile(channel, input, channelPos, relativePos, len, progressUpdater, startTimeNanos);
try (InputStream input = openInputStreamFromBlobStore(rangeToWrite.start() + relativePos, len)) {
assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
logger.trace(
"{}: writing channel {} pos {} length {} (details: {})",
fileInfo.physicalName(),
channelPos,
relativePos,
len,
cacheFile
);
SharedBytes.copyToCacheFileAligned(
channel,
input,
channelPos,
relativePos,
len,
progressUpdater,
writeBuffer.get().clear(),
cacheFile
);
final long endTimeNanos = stats.currentTimeNanos();
stats.addCachedBytesWritten(len, endTimeNanos - startTimeNanos);
}
},
SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
Expand All @@ -158,15 +175,6 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
}
}

private static int positionalWrite(SharedBytes.IO fc, long start, ByteBuffer byteBuffer) throws IOException {
assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
byteBuffer.flip();
int written = fc.write(byteBuffer, start);
assert byteBuffer.hasRemaining() == false;
byteBuffer.clear();
return written;
}

private int readCacheFile(
final SharedBytes.IO fc,
long channelPos,
Expand Down Expand Up @@ -209,55 +217,6 @@ private int readCacheFile(
return bytesRead;
}

private void writeCacheFile(
final SharedBytes.IO fc,
final InputStream input,
final long fileChannelPos,
final long relativePos,
final long length,
final LongConsumer progressUpdater,
final long startTimeNanos
) throws IOException {
assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
logger.trace(
"{}: writing channel {} pos {} length {} (details: {})",
fileInfo.physicalName(),
fileChannelPos,
relativePos,
length,
cacheFile
);
final long end = relativePos + length;
logger.trace("writing range [{}-{}] to cache file [{}]", relativePos, end, cacheFile);

long bytesCopied = 0L;
long remaining = length;
final ByteBuffer buf = writeBuffer.get().clear();
while (remaining > 0L) {
final int bytesRead = BlobCacheUtils.readSafe(input, buf, relativePos, remaining, cacheFile);
if (buf.hasRemaining()) {
break;
}
long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied, buf);
bytesCopied += bytesWritten;
progressUpdater.accept(bytesCopied);
remaining -= bytesRead;
}
if (remaining > 0) {
// ensure that last write is aligned on 4k boundaries (= page size)
final int remainder = buf.position() % SharedBytes.PAGE_SIZE;
final int adjustment = remainder == 0 ? 0 : SharedBytes.PAGE_SIZE - remainder;
buf.position(buf.position() + adjustment);
long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied, buf);
bytesCopied += bytesWritten;
final long adjustedBytesCopied = bytesCopied - adjustment; // adjust to not break RangeFileTracker
assert adjustedBytesCopied == length;
progressUpdater.accept(adjustedBytesCopied);
}
final long endTimeNanos = stats.currentTimeNanos();
stats.addCachedBytesWritten(length, endTimeNanos - startTimeNanos);
}

@Override
public FrozenIndexInput clone() {
return (FrozenIndexInput) super.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
Expand All @@ -45,6 +44,7 @@

import static org.elasticsearch.blobcache.BlobCacheUtils.throwEOF;
import static org.elasticsearch.blobcache.BlobCacheUtils.toIntBytes;
import static org.elasticsearch.blobcache.shared.SharedBytes.MAX_BYTES_PER_WRITE;
import static org.elasticsearch.core.Strings.format;

/**
Expand All @@ -53,16 +53,6 @@
*/
public abstract class MetadataCachingIndexInput extends BaseSearchableSnapshotIndexInput {

/**
* Thread local direct byte buffer to aggregate multiple positional writes to the cache file.
*/
protected static final int MAX_BYTES_PER_WRITE = StrictMath.toIntExact(
ByteSizeValue.parseBytesSizeValue(
System.getProperty("es.searchable.snapshot.shared_cache.write_buffer.size", "2m"),
"es.searchable.snapshot.shared_cache.write_buffer.size"
).getBytes()
);

protected static final ThreadLocal<ByteBuffer> writeBuffer = ThreadLocal.withInitial(
() -> ByteBuffer.allocateDirect(MAX_BYTES_PER_WRITE)
);
Expand Down

0 comments on commit 37fc695

Please sign in to comment.