Skip to content

Commit

Permalink
Minor cleanups FrozenIndexInput (#93309)
Browse files Browse the repository at this point in the history
Some random finds while working with this code. We shouldn't use a Consumer<Long> instead of a LongConsumer
as we never pass `null` to the consumer.
Also, way simplified the locking around the Lucene `Bytebuffer b` to simplify the code and technically make it
a little faster/less-contenting as well.
Plus, made use of modern Java's buffer slicing to simplify the slicing of the Lucene buffer.
  • Loading branch information
original-brownbear committed Feb 2, 2023
1 parent f4b7335 commit 03f8ea5
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -894,7 +894,7 @@ public interface RangeAvailableHandler {

@FunctionalInterface
public interface RangeMissingHandler {
void fillCacheRange(SharedBytes.IO channel, long channelPos, long relativePos, long length, Consumer<Long> progressUpdater)
void fillCacheRange(SharedBytes.IO channel, long channelPos, long relativePos, long length, LongConsumer progressUpdater)
throws IOException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

public class CacheFile {

Expand Down Expand Up @@ -334,7 +334,7 @@ public interface RangeAvailableHandler {

@FunctionalInterface
public interface RangeMissingHandler {
void fillCacheRange(FileChannel channel, long from, long to, Consumer<Long> progressUpdater) throws IOException;
void fillCacheRange(FileChannel channel, long from, long to, LongConsumer progressUpdater) throws IOException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.concurrent.Semaphore;
import java.util.function.LongConsumer;

import static org.elasticsearch.blobcache.BlobCacheUtils.toIntBytes;
import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -113,22 +112,12 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
final int length = b.remaining();
final int originalByteBufPosition = b.position();

final ReentrantReadWriteLock luceneByteBufLock = new ReentrantReadWriteLock();
final AtomicBoolean stopAsyncReads = new AtomicBoolean();
// Runnable that, when called, ensures that async callbacks (such as those used by readCacheFile) are not
// Semaphore that, when all permits are acquired, ensures that async callbacks (such as those used by readCacheFile) are not
// accessing the byte buffer anymore that was passed to readWithoutBlobCache
// In particular, it's important to call this method before adapting the ByteBuffer's offset
final Runnable preventAsyncBufferChanges = () -> {
luceneByteBufLock.writeLock().lock();
try {
stopAsyncReads.set(true);
} finally {
luceneByteBufLock.writeLock().unlock();
}
};

// In particular, it's important to acquire all permits before adapting the ByteBuffer's offset
final Semaphore luceneByteBufPermits = new Semaphore(Integer.MAX_VALUE);
boolean bufferWriteLocked = false;
logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this);

try {
final ByteRange startRangeToWrite = computeRange(position);
final ByteRange endRangeToWrite = computeRange(position + length - 1);
Expand All @@ -149,8 +138,7 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
len,
b,
rangeToRead.start(),
luceneByteBufLock,
stopAsyncReads
luceneByteBufPermits
),
(channel, channelPos, relativePos, len, progressUpdater) -> {
final long startTimeNanos = stats.currentTimeNanos();
Expand All @@ -163,12 +151,15 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
);
assert bytesRead == length : bytesRead + " vs " + length;
assert luceneByteBufLock.getReadHoldCount() == 0;
assert luceneByteBufPermits.availablePermits() == Integer.MAX_VALUE;

preventAsyncBufferChanges.run();
luceneByteBufPermits.acquire(Integer.MAX_VALUE);
bufferWriteLocked = true;
b.position(originalByteBufPosition + bytesRead); // mark all bytes as accounted for
} finally {
preventAsyncBufferChanges.run();
if (bufferWriteLocked == false) {
luceneByteBufPermits.acquire(Integer.MAX_VALUE);
}
}
}

Expand All @@ -188,8 +179,7 @@ private int readCacheFile(
long length,
final ByteBuffer buffer,
long logicalPos,
ReentrantReadWriteLock luceneByteBufLock,
AtomicBoolean stopAsyncReads
Semaphore luceneByteBufPermits
) throws IOException {
logger.trace(
"{}: reading cached {} logical {} channel {} pos {} length {} (details: {})",
Expand All @@ -205,21 +195,10 @@ private int readCacheFile(
return 0;
}
final int bytesRead;
if (luceneByteBufLock.readLock().tryLock()) {
if (luceneByteBufPermits.tryAcquire()) {
try {
boolean shouldStopReading = stopAsyncReads.get();
if (shouldStopReading) {
// return fake response
return Math.toIntExact(length);
}
// create slice that is positioned to read the given values
final ByteBuffer dup = buffer.duplicate();
final int newPosition = dup.position() + Math.toIntExact(relativePos);
assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit();
assert newPosition + length <= buffer.limit()
: "oldpos " + dup.position() + " newpos " + newPosition + " length " + length + " limit " + buffer.limit();
dup.position(newPosition);
dup.limit(newPosition + Math.toIntExact(length));
final ByteBuffer dup = buffer.slice(buffer.position() + Math.toIntExact(relativePos), Math.toIntExact(length));
bytesRead = fc.read(dup, channelPos);
if (bytesRead == -1) {
throw new EOFException(
Expand All @@ -233,7 +212,7 @@ private int readCacheFile(
);
}
} finally {
luceneByteBufLock.readLock().unlock();
luceneByteBufPermits.release();
}
} else {
// return fake response
Expand Down Expand Up @@ -263,7 +242,7 @@ private void writeCacheFile(
final long fileChannelPos,
final long relativePos,
final long length,
final Consumer<Long> progressUpdater,
final LongConsumer progressUpdater,
final long startTimeNanos
) throws IOException {
assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

import static org.elasticsearch.blobcache.BlobCacheUtils.toIntBytes;
import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -287,7 +287,7 @@ protected int readCacheFile(final FileChannel fc, final long position, final Byt
return bytesRead;
}

protected void writeCacheFile(final FileChannel fc, final long start, final long end, final Consumer<Long> progressUpdater)
protected void writeCacheFile(final FileChannel fc, final long start, final long end, final LongConsumer progressUpdater)
throws IOException {
assert assertFileChannelOpen(fc);
assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
Expand Down

0 comments on commit 03f8ea5

Please sign in to comment.