Extract and dry more IO logic out of searchable snapshots module (#93512)

Extract more of the IO logic into the blob cache module to make it reusable, and dry it up a little.
This makes small changes to the EOF exception message format in two cases, which only remove redundant information.
It also moves the no-op in-memory commit directory into the blob cache module for reuse elsewhere.
original-brownbear committed Feb 6, 2023
1 parent c1b0bf6 commit 6498bd1
Showing 9 changed files with 123 additions and 134 deletions.
BlobCacheUtils.java
@@ -7,7 +7,17 @@

package org.elasticsearch.blobcache;

import org.apache.lucene.store.IndexInput;
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.core.Streams;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import static org.elasticsearch.core.Strings.format;

public class BlobCacheUtils {

@@ -20,4 +30,56 @@ public class BlobCacheUtils {
public static int toIntBytes(long l) {
return ByteSizeUnit.BYTES.toIntBytes(l);
}

public static void throwEOF(long channelPos, long len, Object file) throws EOFException {
throw new EOFException(format("unexpected EOF reading [%d-%d] from %s", channelPos, channelPos + len, file));
}

public static void ensureSeek(long pos, IndexInput input) throws IOException {
final long length = input.length();
if (pos > length) {
throw new EOFException("Reading past end of file [position=" + pos + ", length=" + input.length() + "] for " + input);
} else if (pos < 0L) {
throw new IOException("Seeking to negative position [" + pos + "] for " + input);
}
}

public static ByteRange computeRange(long rangeSize, long position, long length) {
long start = (position / rangeSize) * rangeSize;
long end = Math.min(start + rangeSize, length);
return ByteRange.of(start, end);
}

public static void ensureSlice(String sliceName, long sliceOffset, long sliceLength, IndexInput input) {
if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > input.length()) {
throw new IllegalArgumentException(
"slice() "
+ sliceName
+ " out of bounds: offset="
+ sliceOffset
+ ",length="
+ sliceLength
+ ",fileLength="
+ input.length()
+ ": "
+ input
);
}
}

/**
* Perform a single {@code read()} from {@code inputStream} into {@code copyBuffer}, handling an EOF by throwing an {@link EOFException}
* rather than returning {@code -1}. Returns the number of bytes read, which is always positive.
*
* Most of its arguments are there simply to make the message of the {@link EOFException} more informative.
*/
public static int readSafe(InputStream inputStream, ByteBuffer copyBuffer, long rangeStart, long remaining, Object cacheFileReference)
throws IOException {
final int len = (remaining < copyBuffer.remaining()) ? toIntBytes(remaining) : copyBuffer.remaining();
final int bytesRead = Streams.read(inputStream, copyBuffer, len);
if (bytesRead <= 0) {
throwEOF(rangeStart, remaining, cacheFileReference);
}
return bytesRead;
}
}
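
Taken together, these helpers give every blob-store-backed IndexInput one shared implementation of bounds checks and EOF handling. Note that computeRange aligns a position onto its rangeSize-sized bucket: for example, rangeSize=32768 and position=100000 give ByteRange.of(98304, Math.min(131072, length)). As a rough sketch of how the pieces compose (RangeDrainer and drainRange are hypothetical names, not part of this commit):

import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.blobcache.common.ByteRange;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

class RangeDrainer {

    // Reads exactly range.length() bytes from the stream, buffer by buffer; readSafe turns a
    // premature end of stream into an EOFException that names the offending range and file.
    static long drainRange(InputStream in, ByteRange range, ByteBuffer copyBuffer, Object fileRef) throws IOException {
        long copied = 0L;
        long remaining = range.length();
        while (remaining > 0L) {
            copyBuffer.clear();
            final int bytesRead = BlobCacheUtils.readSafe(in, copyBuffer, range.start(), remaining, fileRef);
            // a real caller would flush copyBuffer here, e.g. into a cache file
            copied += bytesRead;
            remaining -= bytesRead;
        }
        return copied;
    }
}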
InMemoryNoOpCommitDirectory.java
@@ -4,7 +4,7 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.searchablesnapshots.store;
package org.elasticsearch.blobcache.store;

import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
@@ -33,7 +33,7 @@ public class InMemoryNoOpCommitDirectory extends FilterDirectory {
private final Directory realDirectory;
private final Set<String> deletedFiles = new CopyOnWriteArraySet<>();

InMemoryNoOpCommitDirectory(Directory realDirectory) {
public InMemoryNoOpCommitDirectory(Directory realDirectory) {
super(new ByteBuffersDirectory(NoLockFactory.INSTANCE));
this.realDirectory = realDirectory;
}
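
Making the constructor public means code outside the searchable snapshots module can now wrap a read-only directory so that Lucene commit writes land in an in-memory overlay instead of the immutable underlying store. A minimal sketch (the wrapper method is hypothetical):

import org.apache.lucene.store.Directory;
import org.elasticsearch.blobcache.store.InMemoryNoOpCommitDirectory;

class NoOpCommits {

    // Overlay Lucene's commit writes in memory on top of an immutable source directory.
    static Directory withInMemoryCommits(Directory realDirectory) {
        return new InMemoryNoOpCommitDirectory(realDirectory);
    }
}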
BlobCacheUtilsTests.java
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.blobcache;

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.test.ESTestCase;

import java.io.EOFException;
import java.nio.ByteBuffer;

public class BlobCacheUtilsTests extends ESTestCase {

public void testReadSafeThrows() {
final ByteBuffer buffer = ByteBuffer.allocate(randomIntBetween(1, 1025));
final int remaining = randomIntBetween(1, 1025);
expectThrows(EOFException.class, () -> BlobCacheUtils.readSafe(BytesArray.EMPTY.streamInput(), buffer, 0, remaining, null));
}
}
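
A companion test in the same style (hypothetical, not part of this commit) could cover the extracted seek bounds check; it assumes the org.elasticsearch.common.lucene.store.ByteArrayIndexInput test helper:

    // additionally assumes: import java.io.IOException;
    //                       import org.apache.lucene.store.IndexInput;
    //                       import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
    public void testEnsureSeekBounds() throws IOException {
        final byte[] bytes = new byte[randomIntBetween(1, 64)];
        try (IndexInput input = new ByteArrayIndexInput("test", bytes)) {
            BlobCacheUtils.ensureSeek(randomLongBetween(0, bytes.length), input); // in bounds, no exception
            expectThrows(EOFException.class, () -> BlobCacheUtils.ensureSeek(bytes.length + 1, input));
            expectThrows(IOException.class, () -> BlobCacheUtils.ensureSeek(-1, input));
        }
    }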
InMemoryNoOpCommitDirectoryTests.java
@@ -4,7 +4,7 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.searchablesnapshots.store;
package org.elasticsearch.blobcache.store;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
SearchableSnapshotDirectory.java
@@ -23,6 +23,7 @@
import org.elasticsearch.action.support.CountDownActionListener;
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.blobcache.shared.SharedBlobCacheService;
import org.elasticsearch.blobcache.store.InMemoryNoOpCommitDirectory;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.common.blobstore.BlobContainer;
CachedBlobContainerIndexInput.java
@@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

import static org.elasticsearch.blobcache.BlobCacheUtils.readSafe;
import static org.elasticsearch.blobcache.BlobCacheUtils.toIntBytes;

public class CachedBlobContainerIndexInput extends MetadataCachingIndexInput {
@@ -171,7 +172,7 @@ public Tuple<Long, Long> prefetchPart(final int part) throws IOException {
while (remainingBytes > 0L) {
assert totalBytesRead + remainingBytes == range.length();
copyBuffer.clear();
final int bytesRead = readSafe(input, copyBuffer, range.start(), range.end(), remainingBytes, cacheFileReference);
final int bytesRead = readSafe(input, copyBuffer, range.start(), remainingBytes, cacheFileReference);

// The range to prewarm in cache
final long readStart = range.start() + totalBytesRead;
DirectBlobContainerIndexInput.java
@@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.core.CheckedRunnable;
@@ -244,11 +245,7 @@ public void close() throws IOException {

@Override
protected void seekInternal(long pos) throws IOException {
if (pos > length()) {
throw new EOFException("Reading past end of file [position=" + pos + ", length=" + length() + "] for " + toString());
} else if (pos < 0L) {
throw new IOException("Seeking to negative position [" + pos + "] for " + toString());
}
BlobCacheUtils.ensureSeek(pos, this);
if (position != offset + pos) {
position = offset + pos;
closeStreamForSequentialReads();
@@ -267,28 +264,23 @@ public DirectBlobContainerIndexInput clone() {

@Override
public IndexInput slice(String sliceName, long offset, long length) throws IOException {
if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) {
final DirectBlobContainerIndexInput slice = new DirectBlobContainerIndexInput(
sliceName,
blobContainer,
fileInfo,
context,
stats,
position,
this.offset + offset,
length,
// Slices might not be closed when they are no longer needed, but we must always close streamForSequentialReads. The simple
// solution: do not optimize sequential reads on slices.
NO_SEQUENTIAL_READ_OPTIMIZATION
);
slice.isClone = true;
slice.seek(0L);
return slice;
} else {
throw new IllegalArgumentException(
"slice() " + sliceName + " out of bounds: offset=" + offset + ",length=" + length + ",fileLength=" + length() + ": " + this
);
}
BlobCacheUtils.ensureSlice(sliceName, offset, length, this);
final DirectBlobContainerIndexInput slice = new DirectBlobContainerIndexInput(
sliceName,
blobContainer,
fileInfo,
context,
stats,
position,
this.offset + offset,
length,
// Slices might not be closed when they are no longer needed, but we must always close streamForSequentialReads. The simple
// solution: do not optimize sequential reads on slices.
NO_SEQUENTIAL_READ_OPTIMIZATION
);
slice.isClone = true;
slice.seek(0L);
return slice;
}

@Override
FrozenIndexInput.java
@@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.IOContext;
import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.blobcache.shared.SharedBlobCacheService;
import org.elasticsearch.blobcache.shared.SharedBytes;
@@ -20,16 +21,12 @@
import org.elasticsearch.xpack.searchablesnapshots.store.IndexInputStats;
import org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.Semaphore;
import java.util.function.LongConsumer;

import static org.elasticsearch.core.Strings.format;

public class FrozenIndexInput extends MetadataCachingIndexInput {

private static final Logger logger = LogManager.getLogger(FrozenIndexInput.class);
@@ -115,7 +112,7 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
// In particular, it's important to acquire all permits before adapting the ByteBuffer's offset
final Semaphore luceneByteBufPermits = new Semaphore(Integer.MAX_VALUE);
boolean bufferWriteLocked = false;
logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this);
logger.trace("readInternal: read [{}-{}] from [{}]", position, position + length, this);
try {
final ByteRange startRangeToWrite = computeRange(position);
final ByteRange endRangeToWrite = computeRange(position + length - 1);
@@ -199,15 +196,7 @@ private int readCacheFile(
final ByteBuffer dup = buffer.slice(buffer.position() + Math.toIntExact(relativePos), Math.toIntExact(length));
bytesRead = fc.read(dup, channelPos);
if (bytesRead == -1) {
throw new EOFException(
String.format(
Locale.ROOT,
"unexpected EOF reading [%d-%d] from %s",
channelPos,
channelPos + dup.remaining(),
this.cacheFile
)
);
BlobCacheUtils.throwEOF(channelPos, dup.remaining(), this.cacheFile);
}
} finally {
luceneByteBufPermits.release();
@@ -239,13 +228,13 @@ private void writeCacheFile(
cacheFile
);
final long end = relativePos + length;
logger.trace(() -> format("writing range [%s-%s] to cache file [%s]", relativePos, end, cacheFile));
logger.trace("writing range [{}-{}] to cache file [{}]", relativePos, end, cacheFile);

long bytesCopied = 0L;
long remaining = length;
final ByteBuffer buf = writeBuffer.get().clear();
while (remaining > 0L) {
final int bytesRead = MetadataCachingIndexInput.readSafe(input, buf, relativePos, end, remaining, cacheFile);
final int bytesRead = BlobCacheUtils.readSafe(input, buf, relativePos, remaining, cacheFile);
if (buf.hasRemaining()) {
break;
}
