Formalize and Streamline Buffer Sizes used by Repositories (#59771) (#60052)

Due to complicated access checks (reads and writes execute in their own access context) on some repositories (GCS, Azure, HDFS), using a hard-coded buffer size of 4k for restores was needlessly inefficient.
By the same token, using stream copying with the default 8k buffer size for blob writes was inefficient as well.

We also had dedicated, undocumented buffer-size settings for the HDFS and FS repositories, both defaulting to a 100k buffer. We had no such setting for e.g. GCS, which used only an 8k read buffer, needlessly small for reading from a raw `URLConnection`.

This commit adds an undocumented setting that sets the default buffer size to `128k` for all repositories. It removes the wasteful allocation of such a large buffer for small writes and reads in the HDFS and FS repositories (i.e. it still uses a smaller buffer when writing metadata) but uses the large buffer for restores and for uploading segment blobs.

This should speed up Azure and GCS restores and snapshots in a non-trivial way and also save some memory when reading small blobs on FS and HDFS repositories.
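
The core pattern the change applies in the FS, HDFS and GCS write paths can be sketched as below. This is a minimal, self-contained illustration, not the repository code itself: the class and method names and the `128k` constant are assumptions made for the example, and the real value comes from the (undocumented) repository setting.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class CopyBufferSketch {

    // Assumed default, mirroring the 128k described above.
    static final int DEFAULT_BUFFER_SIZE = 128 * 1024;

    // Cap the copy buffer at the blob size: a few-hundred-byte metadata blob gets a
    // buffer of exactly its own size, while segment blobs and restores get the full buffer.
    static int copyBufferSize(long blobSize, int maxBufferSize) {
        if (blobSize <= 0) {
            return 1; // avoid a zero-length buffer, which would stall the copy loop
        }
        return blobSize < maxBufferSize ? Math.toIntExact(blobSize) : maxBufferSize;
    }

    // Plain stream copy through the capped buffer, analogous to Streams.copy(in, out, buffer).
    static long copy(InputStream in, OutputStream out, long blobSize, int maxBufferSize) throws IOException {
        final byte[] buffer = new byte[copyBufferSize(blobSize, maxBufferSize)];
        long copied = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            copied += read;
        }
        return copied;
    }
}
```

The same capping expression appears in the FsBlobContainer, HdfsBlobContainer and GoogleCloudStorageBlobStore hunks below.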
original-brownbear committed Jul 22, 2020
1 parent 6b1c841 commit f26e03d
Showing 15 changed files with 77 additions and 72 deletions.
@@ -43,9 +43,9 @@
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.internal.io.Streams;

import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -101,16 +101,19 @@ class GoogleCloudStorageBlobStore implements BlobStore {
private final String repositoryName;
private final GoogleCloudStorageService storageService;
private final GoogleCloudStorageOperationsStats stats;
private final int bufferSize;

GoogleCloudStorageBlobStore(String bucketName,
String clientName,
String repositoryName,
GoogleCloudStorageService storageService) {
GoogleCloudStorageService storageService,
int bufferSize) {
this.bucketName = bucketName;
this.clientName = clientName;
this.repositoryName = repositoryName;
this.storageService = storageService;
this.stats = new GoogleCloudStorageOperationsStats(bucketName);
this.bufferSize = bufferSize;
if (doesBucketExist(bucketName) == false) {
throw new BlobStoreException("Bucket [" + bucketName + "] does not exist");
}
@@ -126,7 +129,7 @@ public BlobContainer blobContainer(BlobPath path) {
}

@Override
public void close() throws IOException {
public void close() {
storageService.closeRepositoryClient(repositoryName);
}

@@ -252,7 +255,7 @@ InputStream readBlob(String blobName, long position, long length) throws IOException
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
if (blobSize > getLargeBlobThresholdInBytes()) {
writeBlobResumable(blobInfo, inputStream, failIfAlreadyExists);
writeBlobResumable(blobInfo, inputStream, blobSize, failIfAlreadyExists);
} else {
writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists);
}
@@ -269,13 +272,16 @@ long getLargeBlobThresholdInBytes() {
* https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
* @param blobInfo the info for the blob to be uploaded
* @param inputStream the stream containing the blob data
* @param size expected size of the blob to be written
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long size, boolean failIfAlreadyExists)
throws IOException {
// We retry 410 GONE errors to cover the unlikely but possible scenario where a resumable upload session becomes broken and
// needs to be restarted from scratch. Given how unlikely a 410 error should be according to SLAs we retry only twice.
assert inputStream.markSupported();
inputStream.mark(Integer.MAX_VALUE);
final byte[] buffer = new byte[size < bufferSize ? Math.toIntExact(size) : bufferSize];
StorageException storageException = null;
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
@@ -303,8 +309,7 @@ public boolean isOpen() {
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}

}));
}), buffer);
// We don't track this operation on the http layer as
// we do with the GET/LIST operations since this operations
// can trigger multiple underlying http requests but only one
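As a rough illustration of the resumable-upload change above: a sketch assuming the google-cloud-storage client API, with the class and method names invented for the example; the 410 GONE retry loop, statistics tracking and the privileged socket-access wrapper used by the plugin are omitted. The stream is copied into the upload's `WriteChannel` through a buffer capped by the same rule.

```java
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;

final class ResumableUploadSketch {

    // Resumable uploads are only used above the large-blob threshold, so blobSize > 0 here.
    static void upload(Storage storage, BlobInfo blobInfo, InputStream inputStream,
                       long blobSize, int bufferSize, Storage.BlobWriteOption... writeOptions) throws IOException {
        final WriteChannel writeChannel = storage.writer(blobInfo, writeOptions);
        // Buffer capped at the blob size, as in the diff above.
        final byte[] buffer = new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize];
        try (OutputStream out = Channels.newOutputStream(writeChannel)) {
            int read;
            while ((read = inputStream.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        } // closing the stream closes the channel and finalizes the upload
    }
}
```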
@@ -98,7 +98,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {

@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService);
return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bufferSize);
}

@Override
@@ -169,7 +169,8 @@ StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clien
}))
);

final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, "repo", service);
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, "repo", service,
randomIntBetween(1, 8) * 1024);
httpContexts.forEach(httpContext -> httpServer.removeContext(httpContext));

return new GoogleCloudStorageBlobContainer(BlobPath.cleanPath(), blobStore);
@@ -87,7 +87,8 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Exception
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
when(storageService.client(any(String.class), any(String.class), any(GoogleCloudStorageOperationsStats.class))).thenReturn(storage);

try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", "repo", storageService)) {
try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", "repo", storageService,
randomIntBetween(1, 8) * 1024)) {
final BlobContainer container = store.blobContainer(new BlobPath());

IOException e = expectThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(blobs));
@@ -265,7 +265,8 @@ public Map<String, Repository.Factory> getRepositories(Environment env, NamedXCo
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(
metadata.settings().get("bucket"), "test", metadata.name(), storageService) {
metadata.settings().get("bucket"), "test", metadata.name(), storageService,
randomIntBetween(1, 8) * 1024) {
@Override
long getLargeBlobThresholdInBytes() {
return ByteSizeUnit.MB.toBytes(1);
@@ -20,6 +20,7 @@

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.CreateOpts;
@@ -111,23 +112,19 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public InputStream readBlob(String blobName, long position, long length) throws IOException {
public InputStream readBlob(String blobName, long position, long length) {
throw new UnsupportedOperationException();
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
final EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
: EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
store.execute((Operation<Void>) fileContext -> {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
: EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
try (FSDataOutputStream stream = fileContext.create(blob, flags, CreateOpts.bufferSize(bufferSize))) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
try {
writeToPath(inputStream, blobSize, fileContext, blob, flags);
} catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
}
@@ -138,17 +135,10 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
final String tempBlob = FsBlobContainer.tempBlobName(blobName);
final Path tempBlobPath = new Path(path, tempBlob);
final Path blob = new Path(path, blobName);
store.execute((Operation<Void>) fileContext -> {
final Path tempBlobPath = new Path(path, tempBlob);
try (FSDataOutputStream stream = fileContext.create(
tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK), CreateOpts.bufferSize(bufferSize))) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
}
final Path blob = new Path(path, blobName);
writeToPath(inputStream, blobSize, fileContext, tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK));
try {
fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE);
} catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
@@ -158,6 +148,17 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
});
}

private void writeToPath(InputStream inputStream, long blobSize, FileContext fileContext, Path blobPath,
EnumSet<CreateFlag> createFlags) throws IOException {
final byte[] buffer = new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize];
try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags, CreateOpts.bufferSize(buffer.length))) {
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
}
}

@Override
public Map<String, BlobMetadata> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path,
@@ -36,7 +36,6 @@
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
@@ -64,10 +63,6 @@ public final class HdfsRepository extends BlobStoreRepository {
private final URI uri;
private final String pathSetting;

// buffer size passed to HDFS read/write methods
// TODO: why 100KB?
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);

public HdfsRepository(
final RepositoryMetadata metadata,
final Environment environment,
@@ -128,8 +123,6 @@ private HdfsBlobStore createBlobstore(URI uri, String path, Settings repositoryS
Class<?> ret = hadoopConfiguration.getClass(configKey, null, FailoverProxyProvider.class);
boolean haEnabled = ret != null;

int bufferSize = repositorySettings.getAsBytesSize("buffer_size", DEFAULT_BUFFER_SIZE).bytesAsInt();

// Create the filecontext with our user information
// This will correctly configure the filecontext to have our UGI as its internal user.
FileContext fileContext = ugi.doAs((PrivilegedAction<FileContext>) () -> {
@@ -105,7 +105,7 @@ public void testReadOnly() throws Exception {
Path tempDir = createTempDir();
Path path = tempDir.resolve("bar");

try (FsBlobStore store = new FsBlobStore(Settings.EMPTY, path, true)) {
try (FsBlobStore store = new FsBlobStore(randomIntBetween(1, 8) * 1024, path, true)) {
assertFalse(Files.exists(path));
BlobPath blobPath = BlobPath.cleanPath().add("foo");
store.blobContainer(blobPath);
@@ -116,7 +116,7 @@ public void testReadOnly() throws Exception {
assertFalse(Files.exists(storePath));
}

try (FsBlobStore store = new FsBlobStore(Settings.EMPTY, path, false)) {
try (FsBlobStore store = new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false)) {
assertTrue(Files.exists(path));
BlobPath blobPath = BlobPath.cleanPath().add("foo");
BlobContainer container = store.blobContainer(blobPath);
@@ -26,10 +26,9 @@
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.io.Streams;

import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -144,10 +143,6 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException
IOUtils.rm(blobNames.stream().map(path::resolve).toArray(Path[]::new));
}

private InputStream bufferedInputStream(InputStream inputStream) {
return new BufferedInputStream(inputStream, blobStore.bufferSizeInBytes());
}

@Override
public boolean blobExists(String blobName) {
return Files.exists(path.resolve(blobName));
@@ -157,7 +152,7 @@ public boolean blobExists(String blobName) {
public InputStream readBlob(String name) throws IOException {
final Path resolvedPath = path.resolve(name);
try {
return bufferedInputStream(Files.newInputStream(resolvedPath));
return Files.newInputStream(resolvedPath);
} catch (FileNotFoundException fnfe) {
throw new NoSuchFileException("[" + name + "] blob not found");
}
@@ -170,7 +165,7 @@ public InputStream readBlob(String blobName, long position, long length) throws IOException
channel.position(position);
}
assert channel.position() == position;
return bufferedInputStream(org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length));
return org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length);
}

@Override
@@ -185,10 +180,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
deleteBlobsIgnoringIfNotExists(Collections.singletonList(blobName));
}
final Path file = path.resolve(blobName);
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
Streams.copy(inputStream, outputStream);
}
IOUtils.fsync(file, false);
writeToPath(inputStream, file, blobSize);
IOUtils.fsync(path, true);
}

@@ -198,10 +190,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream
final String tempBlob = tempBlobName(blobName);
final Path tempBlobPath = path.resolve(tempBlob);
try {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
Streams.copy(inputStream, outputStream);
}
IOUtils.fsync(tempBlobPath, false);
writeToPath(inputStream, tempBlobPath, blobSize);
moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists);
} catch (IOException ex) {
try {
@@ -215,6 +204,14 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream
}
}

private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
final int bufferSize = blobStore.bufferSizeInBytes();
Streams.copy(inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]);
}
IOUtils.fsync(tempBlobPath, false);
}

public void moveBlobAtomic(final String sourceBlobName, final String targetBlobName, final boolean failIfAlreadyExists)
throws IOException {
final Path sourceBlobPath = path.resolve(sourceBlobName);
@@ -23,9 +23,6 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

import java.io.IOException;
import java.nio.file.Files;
@@ -39,14 +36,13 @@ public class FsBlobStore implements BlobStore {

private final boolean readOnly;

public FsBlobStore(Settings settings, Path path, boolean readonly) throws IOException {
public FsBlobStore(int bufferSizeInBytes, Path path, boolean readonly) throws IOException {
this.path = path;
this.readOnly = readonly;
if (this.readOnly == false) {
Files.createDirectories(path);
}
this.bufferSizeInBytes = (int) settings.getAsBytesSize("repositories.fs.buffer_size",
new ByteSizeValue(100, ByteSizeUnit.KB)).getBytes();
this.bufferSizeInBytes = bufferSizeInBytes;
}

@Override
