Skip to content

Commit

Permalink
Nicer buffer handling (#93491)
Browse files Browse the repository at this point in the history
Some optimisations that I found when reusing searchable snapshot code elsewhere:
* Add an efficient input stream -> byte buffer path that avoids allocations + copies for heap buffers, this is non-trivial in its effects IMO
  * Also at least avoid allocations and use existing thread-local buffer when doing input stream -> direct bb
  * move `readFully` to lower level streams class to enable this
* Use same thread local direct byte buffer for frozen and caching index input instead of constantly allocating new heap buffers and writing those to disk inefficiently
  • Loading branch information
original-brownbear committed Feb 6, 2023
1 parent e837ff7 commit f2760c6
Show file tree
Hide file tree
Showing 19 changed files with 147 additions and 121 deletions.
57 changes: 57 additions & 0 deletions libs/core/src/main/java/org/elasticsearch/core/Streams.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* Simple utility methods for file and stream copying. All copy methods close all affected streams when done.
Expand Down Expand Up @@ -78,6 +79,62 @@ public static long copy(final InputStream in, final OutputStream out) throws IOE
return copy(in, out, LOCAL_BUFFER.get(), true);
}

/**
 * Read up to {@code count} bytes from {@code input} and store them into {@code buffer}.
 * The buffer's position will be incremented by the number of bytes read from the stream.
 * @param input stream to read from
 * @param buffer buffer to read into
 * @param count maximum number of bytes to read
 * @return number of bytes read from the stream
 * @throws IOException in case of I/O errors
 */
public static int read(InputStream input, ByteBuffer buffer, int count) throws IOException {
    if (buffer.hasArray()) {
        // heap buffer: read straight into the backing array, avoiding an intermediate copy
        return readToHeapBuffer(input, buffer, count);
    }
    // direct buffer: no accessible backing array, stage through a thread-local heap buffer
    return readToDirectBuffer(input, buffer, count);
}

/**
 * Reads up to {@code count} bytes directly into the backing array of a heap-based
 * {@code buffer}, advancing the buffer's position by the number of bytes read.
 */
private static int readToHeapBuffer(InputStream input, ByteBuffer buffer, int count) throws IOException {
    final int startPos = buffer.position();
    // write straight into the backing array; no intermediate copy needed for heap buffers
    final int bytesRead = readFully(input, buffer.array(), buffer.arrayOffset() + startPos, count);
    if (bytesRead > 0) {
        // move the position past the bytes we just placed in the backing array
        buffer.position(startPos + bytesRead);
    }
    return bytesRead;
}

/**
 * Reads up to {@code count} bytes into a direct (non-heap) {@code ByteBuffer}.
 * Bytes are staged through the shared thread-local heap buffer because a direct
 * buffer exposes no backing array to read into.
 */
private static int readToDirectBuffer(InputStream input, ByteBuffer b, int count) throws IOException {
    final byte[] scratch = LOCAL_BUFFER.get();
    int copied = 0;
    for (int chunk; copied < count; copied += chunk) {
        chunk = input.read(scratch, 0, Math.min(count - copied, scratch.length));
        if (chunk == -1) {
            // stream exhausted before the requested count was reached
            break;
        }
        b.put(scratch, 0, chunk);
    }
    return copied;
}

/**
 * Reads as many bytes as possible from {@code reader} into {@code dest}, stopping only
 * once the array is full or the stream is exhausted.
 * @return the number of bytes actually read, possibly fewer than {@code dest.length}
 * @throws IOException in case of I/O errors
 */
public static int readFully(InputStream reader, byte[] dest) throws IOException {
    return readFully(reader, dest, 0, dest.length);
}

/**
 * Reads up to {@code len} bytes from {@code reader} into {@code dest} starting at
 * {@code offset}, retrying short reads until {@code len} bytes were read or the stream
 * is exhausted.
 * @return the number of bytes actually read, possibly fewer than {@code len}
 * @throws IOException in case of I/O errors
 */
public static int readFully(InputStream reader, byte[] dest, int offset, int len) throws IOException {
    int total = 0;
    while (total < len) {
        final int n = reader.read(dest, offset + total, len - total);
        if (n < 0) {
            // end of stream before len bytes were available
            break;
        }
        total += n;
    }
    return total;
}

/**
* Wraps an {@link OutputStream} such that it's {@code close} method becomes a noop
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ public void testWriteBlobWithRetries() throws Exception {

if (randomBoolean()) {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.max(1, bytes.length - 1))]);
org.elasticsearch.core.Streams.readFully(
exchange.getRequestBody(),
new byte[randomIntBetween(1, Math.max(1, bytes.length - 1))]
);
} else {
Streams.readFully(exchange.getRequestBody());
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import org.elasticsearch.common.blobstore.support.BlobMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;

import java.io.ByteArrayInputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ public void testWriteBlobWithRetries() throws Exception {
}
if (randomBoolean()) {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.max(1, bytes.length - 1))]);
org.elasticsearch.core.Streams.readFully(
exchange.getRequestBody(),
new byte[randomIntBetween(1, Math.max(1, bytes.length - 1))]
);
} else {
Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
Expand All @@ -241,7 +244,7 @@ public void testWriteBlobWithReadTimeouts() {
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
if (randomBoolean()) {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]);
org.elasticsearch.core.Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]);
} else {
Streams.readFully(exchange.getRequestBody());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,10 @@ public void testWriteBlobWithRetries() throws Exception {

if (randomBoolean()) {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.max(1, bytes.length - 1))]);
org.elasticsearch.core.Streams.readFully(
exchange.getRequestBody(),
new byte[randomIntBetween(1, Math.max(1, bytes.length - 1))]
);
} else {
Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(
Expand Down Expand Up @@ -222,7 +225,7 @@ public void testWriteBlobWithReadTimeouts() {
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> {
if (randomBoolean()) {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]);
org.elasticsearch.core.Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]);
} else {
Streams.readFully(exchange.getRequestBody());
}
Expand Down Expand Up @@ -317,7 +320,10 @@ public void testWriteLargeBlob() throws Exception {
// sends an error back or let the request time out
if (useTimeout == false) {
if (randomBoolean() && contentLength > 0) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.toIntExact(contentLength - 1))]);
org.elasticsearch.core.Streams.readFully(
exchange.getRequestBody(),
new byte[randomIntBetween(1, Math.toIntExact(contentLength - 1))]
);
} else {
Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(
Expand Down Expand Up @@ -412,7 +418,10 @@ public void testWriteLargeBlobStreaming() throws Exception {
// sends an error back or let the request time out
if (useTimeout == false) {
if (randomBoolean() && contentLength > 0) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.toIntExact(contentLength - 1))]);
org.elasticsearch.core.Streams.readFully(
exchange.getRequestBody(),
new byte[randomIntBetween(1, Math.toIntExact(contentLength - 1))]
);
} else {
Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.http.ssl.SSLContexts;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Streams;
import org.elasticsearch.rest.RestStatus;

import java.io.Closeable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.CoreMatchers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

import org.elasticsearch.Assertions;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Streams;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
Expand Down
16 changes: 0 additions & 16 deletions server/src/main/java/org/elasticsearch/common/io/Streams.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,22 +136,6 @@ public static String copyToString(Reader in) throws IOException {
return out.toString();
}

public static int readFully(InputStream reader, byte[] dest) throws IOException {
return readFully(reader, dest, 0, dest.length);
}

public static int readFully(InputStream reader, byte[] dest, int offset, int len) throws IOException {
int read = 0;
while (read < len) {
final int r = reader.read(dest, offset + read, len - read);
if (r == -1) {
break;
}
read += r;
}
return read;
}

/**
* Fully consumes the input stream, throwing the bytes away. Returns the number of bytes consumed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

package org.elasticsearch.common.io.stream;

import org.elasticsearch.common.io.Streams;
import org.elasticsearch.core.Streams;

import java.io.EOFException;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -55,6 +54,7 @@
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.NodeEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,13 @@ private int getAvailable() throws IOException {
final int footerLen = CodecUtil.footerLength();
if (bufferCount == 0) {
// first read, fill the buffer
bufferCount = Streams.readFully(in, buffer, 0, buffer.length);
bufferCount = org.elasticsearch.core.Streams.readFully(in, buffer, 0, buffer.length);
} else if (bufferPos == bufferCount - footerLen) {
// crc and discard all but the last 16 bytes in the buffer that might be the footer bytes
assert bufferCount >= footerLen;
crc32.update(buffer, 0, bufferPos);
System.arraycopy(buffer, bufferPos, buffer, 0, footerLen);
bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen);
bufferCount = footerLen + org.elasticsearch.core.Streams.readFully(in, buffer, footerLen, buffer.length - footerLen);
bufferPos = 0;
}
// bytes in the buffer minus 16 bytes that could be the footer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.blobstore.support.BlobMetadata;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Streams;
import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.support.BlobMetadata;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CharArrays;
import org.elasticsearch.core.Streams;
import org.elasticsearch.xpack.core.security.SecurityField;
import org.elasticsearch.xpack.core.watcher.WatcherField;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,15 @@ public Tuple<Long, Long> prefetchPart(final int part) throws IOException {
range.end(),
cacheFileReference
);

final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, range.length()))];

final ByteBuffer copyBuffer = writeBuffer.get();
long totalBytesRead = 0L;
final AtomicLong totalBytesWritten = new AtomicLong();
long remainingBytes = range.length();
final long startTimeNanos = stats.currentTimeNanos();
try (InputStream input = openInputStreamFromBlobStore(range.start(), range.length())) {
while (remainingBytes > 0L) {
assert totalBytesRead + remainingBytes == range.length();
copyBuffer.clear();
final int bytesRead = readSafe(input, copyBuffer, range.start(), range.end(), remainingBytes, cacheFileReference);

// The range to prewarm in cache
Expand All @@ -182,8 +181,11 @@ public Tuple<Long, Long> prefetchPart(final int part) throws IOException {
// noinspection UnnecessaryLocalVariable
final ByteRange rangeToRead = rangeToWrite;
cacheFile.populateAndRead(rangeToWrite, rangeToRead, (channel) -> bytesRead, (channel, start, end, progressUpdater) -> {
final ByteBuffer byteBuffer = ByteBuffer.wrap(copyBuffer, toIntBytes(start - readStart), toIntBytes(end - start));
final int writtenBytes = positionalWrite(channel, start, byteBuffer);
final int writtenBytes = positionalWrite(
channel,
start,
copyBuffer.slice(toIntBytes(start - readStart), toIntBytes(end - start))
);
logger.trace(
"prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written",
start,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Streams;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.xpack.searchablesnapshots.store.IndexInputStats;

Expand Down Expand Up @@ -63,7 +64,6 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
private StreamForSequentialReads streamForSequentialReads;
private long sequentialReadSize;
private static final long NO_SEQUENTIAL_READ_OPTIMIZATION = 0L;
private static final int COPY_BUFFER_SIZE = 8192;

public DirectBlobContainerIndexInput(
String name,
Expand Down Expand Up @@ -312,17 +312,9 @@ private InputStream openBlobStream(int part, long pos, long length) throws IOExc
*/
// Reads up to `length` bytes from the stream into `b`, invoking `onEOF` if the stream
// ended before `length` bytes could be read.
private static int readFully(InputStream inputStream, final ByteBuffer b, int length, CheckedRunnable<IOException> onEOF)
    throws IOException {
    // Streams.read copies until either `length` bytes were transferred or the stream is
    // exhausted, so a short count here means end-of-stream was hit.
    int totalRead = Streams.read(inputStream, b, length);
    if (totalRead < length) {
        onEOF.run();
    }
    // NOTE(review): when length == 0 this returns -1 rather than 0; presumably callers
    // never request zero bytes — confirm before relying on that edge case.
    return totalRead > 0 ? totalRead : -1;
}
Expand Down

0 comments on commit f2760c6

Please sign in to comment.