Skip to content

Commit

Permalink
Fix S3 Streaming Writes Ignoring Relative Paths for Large Writes (#76273
Browse files Browse the repository at this point in the history
) (#76284)

It's in the title, we were not accounting for relative paths at all
here and only saved by the fact that we mostly short-circuit to
non-streaming writes.
Extended testing to catch this case for S3 and would do a follow-up
to extend it for the other implementations as well.
  • Loading branch information
original-brownbear committed Aug 10, 2021
1 parent 90ba0c3 commit 2626ec8
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ protected Matcher<Object> readTimeoutExceptionMatcher() {
protected BlobContainer createBlobContainer(Integer maxRetries,
TimeValue readTimeout,
Boolean disableChunkedEncoding,
ByteSizeValue bufferSize) {
ByteSizeValue bufferSize,
BlobPath path) {
Settings.Builder settingsBuilder = Settings.builder();

if (maxRetries != null) {
Expand All @@ -91,7 +92,7 @@ protected BlobContainer createBlobContainer(Integer maxRetries,
final URLHttpClientSettings httpClientSettings = URLHttpClientSettings.fromSettings(settings);
URLBlobStore urlBlobStore =
new URLBlobStore(settings, new URL(getEndpointForServer()), factory.create(httpClientSettings), httpClientSettings);
return urlBlobStore.blobContainer(BlobPath.EMPTY);
return urlBlobStore.blobContainer(path == null ? BlobPath.EMPTY : path);
} catch (MalformedURLException e) {
throw new RuntimeException("Unable to create URLBlobStore", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ protected Class<? extends Exception> unresponsiveExceptionType() {
protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable ByteSizeValue bufferSize) {
final @Nullable ByteSizeValue bufferSize,
final @Nullable BlobPath path) {
final Settings.Builder clientSettings = Settings.builder();
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
clientSettings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace(client).getKey(), httpServerUrl());
Expand Down Expand Up @@ -181,7 +182,7 @@ public void testReadLargeBlobWithRetries() throws Exception {
exchange.close();
});

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
try (InputStream inputStream = blobContainer.readBlob("large_blob_retries")) {
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
}
Expand Down Expand Up @@ -218,7 +219,7 @@ public void testWriteBlobWithRetries() throws Exception {
}
}));

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false);
}
Expand All @@ -228,7 +229,7 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null);

// HTTP server does not send a response
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
Expand Down Expand Up @@ -356,7 +357,7 @@ public void testWriteLargeBlob() throws IOException {

final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;

final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null);
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null);
if (randomBoolean()) {
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
blobContainer.writeBlob("write_large_blob", stream, data.length, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void writeBlob(String blobName,
boolean failIfAlreadyExists,
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer) throws IOException {
final String absoluteBlobKey = buildKey(blobName);
try (AmazonS3Reference clientReference = blobStore.clientReference();
ChunkedBlobOutputStream<PartETag> out = new ChunkedBlobOutputStream<PartETag>(
blobStore.bigArrays(), blobStore.bufferSizeInBytes()) {
Expand All @@ -155,14 +156,14 @@ private void flushBuffer(boolean lastPart) throws IOException {
if (flushedBytes == 0L) {
assert lastPart == false : "use single part upload if there's only a single part";
uploadId.set(SocketAccess.doPrivileged(() ->
clientReference.client().initiateMultipartUpload(initiateMultiPartUpload(blobName)).getUploadId()));
clientReference.client().initiateMultipartUpload(initiateMultiPartUpload(absoluteBlobKey)).getUploadId()));
if (Strings.isEmpty(uploadId.get())) {
throw new IOException("Failed to initialize multipart upload " + blobName);
throw new IOException("Failed to initialize multipart upload " + absoluteBlobKey);
}
}
assert lastPart == false || successful : "must only write last part if successful";
final UploadPartRequest uploadRequest = createPartUploadRequest(
buffer.bytes().streamInput(), uploadId.get(), parts.size() + 1, blobName, buffer.size(), lastPart);
buffer.bytes().streamInput(), uploadId.get(), parts.size() + 1, absoluteBlobKey, buffer.size(), lastPart);
final UploadPartResult uploadResponse =
SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest));
finishPart(uploadResponse.getPartETag());
Expand All @@ -175,7 +176,7 @@ protected void onCompletion() throws IOException {
} else {
flushBuffer(true);
final CompleteMultipartUploadRequest complRequest =
new CompleteMultipartUploadRequest(blobStore.bucket(), blobName, uploadId.get(), parts);
new CompleteMultipartUploadRequest(blobStore.bucket(), absoluteBlobKey, uploadId.get(), parts);
complRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
}
Expand All @@ -184,7 +185,7 @@ protected void onCompletion() throws IOException {
@Override
protected void onFailure() {
if (Strings.hasText(uploadId.get())) {
abortMultiPartUpload(uploadId.get(), blobName);
abortMultiPartUpload(uploadId.get(), absoluteBlobKey);
}
}
}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ protected Class<? extends Exception> unresponsiveExceptionType() {

@Override
protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable ByteSizeValue bufferSize) {
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable ByteSizeValue bufferSize,
final @Nullable BlobPath path) {
final Settings.Builder clientSettings = Settings.builder();
final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);

Expand Down Expand Up @@ -123,7 +124,7 @@ protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE,
Settings.builder().put(S3Repository.CLIENT_NAME.getKey(), clientName).build());

return new S3BlobContainer(BlobPath.EMPTY, new S3BlobStore(service, "bucket",
return new S3BlobContainer(path == null ? BlobPath.EMPTY : path, new S3BlobStore(service, "bucket",
S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getDefault(Settings.EMPTY),
bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize,
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
Expand Down Expand Up @@ -172,7 +173,7 @@ public void testWriteBlobWithRetries() throws Exception {
}
});

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null);
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false);
}
Expand All @@ -182,7 +183,7 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null);

// HTTP server does not send a response
httpServer.createContext("/bucket/write_blob_timeout", exchange -> {
Expand Down Expand Up @@ -214,7 +215,7 @@ public void testWriteLargeBlob() throws Exception {
final boolean useTimeout = rarely();
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null);

final int parts = randomIntBetween(1, 5);
final long lastPartSize = randomLongBetween(10, 512);
Expand Down Expand Up @@ -302,7 +303,8 @@ public void testWriteLargeBlobStreaming() throws Exception {
final boolean useTimeout = rarely();
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);
final BlobContainer blobContainer =
createBlobContainer(null, readTimeout, true, bufferSize, randomBoolean() ? null : BlobPath.EMPTY.add("foo"));

final int parts = randomIntBetween(1, 5);
final long lastPartSize = randomLongBetween(10, 512);
Expand All @@ -314,7 +316,7 @@ public void testWriteLargeBlobStreaming() throws Exception {
final AtomicLong bytesReceived = new AtomicLong(0L);
final CountDown countDownComplete = new CountDown(nbErrors);

httpServer.createContext("/bucket/write_large_blob_streaming", exchange -> {
httpServer.createContext("/bucket/" + blobContainer.path().buildAsString() + "write_large_blob_streaming", exchange -> {
final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));

if ("POST".equals(exchange.getRequestMethod())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.sun.net.httpserver.HttpServer;
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpStatus;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -77,15 +78,16 @@ public void tearDown() throws Exception {
protected abstract BlobContainer createBlobContainer(@Nullable Integer maxRetries,
@Nullable TimeValue readTimeout,
@Nullable Boolean disableChunkedEncoding,
@Nullable ByteSizeValue bufferSize);
@Nullable ByteSizeValue bufferSize,
@Nullable BlobPath path);

protected org.hamcrest.Matcher<Object> readTimeoutExceptionMatcher() {
return either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class))
.or(instanceOf(RuntimeException.class));
}

public void testReadNonexistentBlobThrowsNoSuchFileException() {
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null);
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null);
final long position = randomLongBetween(0, MAX_RANGE_VAL);
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
final Exception exception = expectThrows(
Expand Down Expand Up @@ -131,7 +133,7 @@ public void testReadBlobWithRetries() throws Exception {
});

final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) {
final int readLimit;
final InputStream wrappedStream;
Expand Down Expand Up @@ -188,8 +190,8 @@ public void testReadRangeBlobWithRetries() throws Exception {
}
});

final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 500));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
final int position = randomIntBetween(0, bytes.length - 1);
final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) {
Expand Down Expand Up @@ -218,7 +220,7 @@ public void testReadRangeBlobWithRetries() throws Exception {
public void testReadBlobWithReadTimeouts() {
final int maxRetries = randomInt(5);
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);

// HTTP server does not send a response
httpServer.createContext(downloadStorageEndpoint("read_blob_unresponsive"), exchange -> {});
Expand Down Expand Up @@ -250,7 +252,7 @@ public void testReadBlobWithReadTimeouts() {

public void testReadBlobWithNoHttpResponse() {
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null);
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null);

// HTTP server closes connection immediately
httpServer.createContext(downloadStorageEndpoint("read_blob_no_response"), HttpExchange::close);
Expand All @@ -269,7 +271,7 @@ public void testReadBlobWithNoHttpResponse() {

public void testReadBlobWithPrematureConnectionClose() {
final int maxRetries = randomInt(20);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);

// HTTP server sends a partial response
final byte[] bytes = randomBlobContent(1);
Expand Down

0 comments on commit 2626ec8

Please sign in to comment.