Skip to content

Commit

Permalink
Reuse FrozenIndexInput.writeCacheFile method (#70545)
Browse files Browse the repository at this point in the history
Makes it easier for subsequent PRs that change the core write logic of FrozenIndexInput as there is just a single write path
  • Loading branch information
ywelsch committed Mar 18, 2021
1 parent fa544bd commit de82200
Showing 1 changed file with 35 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,30 +229,14 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
(channel, channelPos, relativePos, len, progressUpdater) -> {
assert len <= cachedBlob.to() - cachedBlob.from();
final long startTimeNanos = stats.currentTimeNanos();
final BytesRefIterator iterator = cachedBlob.bytes()
.slice(toIntBytes(relativePos), toIntBytes(len))
.iterator();
long writePosition = channelPos;
long bytesCopied = 0L;
BytesRef current;
while ((current = iterator.next()) != null) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length);
while (byteBuffer.remaining() > 0) {
final long bytesWritten = positionalWrite(channel, writePosition, byteBuffer);
bytesCopied += bytesWritten;
writePosition += bytesWritten;
progressUpdater.accept(bytesCopied);
}
}
long channelTo = channelPos + len;
assert writePosition == channelTo : writePosition + " vs " + channelTo;
final long endTimeNanos = stats.currentTimeNanos();
stats.addCachedBytesWritten(len, endTimeNanos - startTimeNanos);
logger.trace(
"copied bytes [{}-{}] of file [{}] from cache index to disk",
writeCacheFile(
channel,
cachedBlob.bytes().streamInput(),
channelPos,
relativePos,
relativePos + len,
fileName
len,
progressUpdater,
startTimeNanos
);
},
directory.cacheFetchAsyncExecutor()
Expand Down Expand Up @@ -309,14 +293,14 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
luceneByteBufLock,
stopAsyncReads
),
(channel, channelPos, relativePos, len, progressUpdater) -> this.writeCacheFile(
channel,
channelPos,
relativePos,
len,
rangeToWrite.start(),
progressUpdater
),
(channel, channelPos, relativePos, len, progressUpdater) -> {
final long startTimeNanos = stats.currentTimeNanos();
final long streamStartPosition = rangeToWrite.start() + relativePos + compoundFileOffset;

try (InputStream input = openInputStreamFromBlobStore(streamStartPosition, len)) {
this.writeCacheFile(channel, input, channelPos, relativePos, len, progressUpdater, startTimeNanos);
}
},
directory.cacheFetchAsyncExecutor()
);

Expand Down Expand Up @@ -589,17 +573,17 @@ private int readCacheFile(

private void writeCacheFile(
final SharedBytes.IO fc,
long fileChannelPos,
long relativePos,
long length,
long logicalPos,
final Consumer<Long> progressUpdater
final InputStream input,
final long fileChannelPos,
final long relativePos,
final long length,
final Consumer<Long> progressUpdater,
final long startTimeNanos
) throws IOException {
assert assertCurrentThreadMayWriteCacheFile();
logger.trace(
"{}: writing logical {} channel {} pos {} length {} (details: {})",
"{}: writing channel {} pos {} length {} (details: {})",
fileInfo.physicalName(),
logicalPos,
fileChannelPos,
relativePos,
length,
Expand All @@ -611,19 +595,21 @@ private void writeCacheFile(

long bytesCopied = 0L;
long remaining = length;
final long startTimeNanos = stats.currentTimeNanos();
try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos + compoundFileOffset, length)) {
while (remaining > 0L) {
final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile);
positionalWrite(fc, fileChannelPos + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead));
bytesCopied += bytesRead;
remaining -= bytesRead;
progressUpdater.accept(bytesCopied);
while (remaining > 0L) {
final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile);
final ByteBuffer byteBuffer = ByteBuffer.wrap(copyBuffer, 0, bytesRead);
int writePosition = 0;
while (byteBuffer.remaining() > 0) {
final long bytesWritten = positionalWrite(fc, fileChannelPos + bytesCopied + writePosition, byteBuffer);
writePosition += bytesWritten;
}
final long endTimeNanos = stats.currentTimeNanos();
assert bytesCopied == length;
stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);
bytesCopied += bytesRead;
remaining -= bytesRead;
progressUpdater.accept(bytesCopied);
}
final long endTimeNanos = stats.currentTimeNanos();
assert bytesCopied == length;
stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);
}

@Override
Expand Down

0 comments on commit de82200

Please sign in to comment.