Skip to content

Commit

Permalink
Retry GCS Resumable Upload on Error 410 (#45963) (#51419)
Browse files Browse the repository at this point in the history
A resumable upload session can fail with a 410 error and should
be retried in that case. I added up to two retries, resetting the
given `InputStream` as the retry mechanism, since the same
approach is already used by the AWS S3 SDK and relied upon
by the S3 repository implementation.

Related GCS documentation:
https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
  • Loading branch information
original-brownbear committed Jan 24, 2020
1 parent a388cf7 commit 853e33a
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
Expand All @@ -53,16 +57,19 @@
import java.util.Map;
import java.util.stream.Collectors;

import static java.net.HttpURLConnection.HTTP_GONE;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;

class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore {

private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);

// The recommended maximum size of a blob that should be uploaded in a single
// request. Larger files should be uploaded over multiple requests (this is
// called "resumable upload")
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;

private final String bucketName;
private final String clientName;
Expand Down Expand Up @@ -213,35 +220,53 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
try {
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } :
new Storage.BlobWriteOption[0];
final WriteChannel writeChannel = SocketAccess
// We retry 410 GONE errors to cover the unlikely but possible scenario where a resumable upload session becomes broken and
// needs to be restarted from scratch. Given how unlikely a 410 error should be according to SLAs we retry only twice.
assert inputStream.markSupported();
inputStream.mark(Integer.MAX_VALUE);
StorageException storageException = null;
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
for (int retry = 0; retry < 3; ++retry) {
try {
final WriteChannel writeChannel = SocketAccess
.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
return writeChannel.isOpen();
}
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
return writeChannel.isOpen();
}

@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}
@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
}

@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
}
}));
return;
} catch (final StorageException se) {
final int errorCode = se.getCode();
if (errorCode == HTTP_GONE) {
logger.warn(() -> new ParameterizedMessage("Retrying broken resumable upload session for blob {}", blobInfo), se);
storageException = ExceptionsHelper.useOrSuppress(storageException, se);
inputStream.reset();
continue;
} else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
}));
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
if (storageException != null) {
se.addSuppressed(storageException);
}
throw se;
}
throw se;
}
assert storageException != null;
throw storageException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,21 @@

package org.elasticsearch.repositories.gcs;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;

import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -37,10 +46,35 @@ protected BlobStore newBlobStore() {
final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
try {
when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
} catch (final Exception e) {
throw new RuntimeException(e);
}
return new GoogleCloudStorageBlobStore(bucketName, clientName, storageService);
}

// Writes a blob one byte larger than LARGE_BLOB_THRESHOLD_BYTE_SIZE — per the
// threshold's documentation this size is uploaded via the resumable-upload path —
// then reads it back and verifies the contents round-trip intact. Optionally
// overwrites the blob first to check that reads observe the latest contents.
public void testWriteReadLarge() throws IOException {
try(BlobStore store = newBlobStore()) {
final BlobContainer container = store.blobContainer(new BlobPath());
// one byte over the threshold so the write exceeds the single-request limit
byte[] data = randomBytes(GoogleCloudStorageBlobStore.LARGE_BLOB_THRESHOLD_BYTE_SIZE + 1);
writeBlob(container, "foobar", new BytesArray(data), randomBoolean());
if (randomBoolean()) {
// override file, to check if we get latest contents
random().nextBytes(data);
writeBlob(container, "foobar", new BytesArray(data), false);
}
try (InputStream stream = container.readBlob("foobar")) {
BytesRefBuilder target = new BytesRefBuilder();
// read back in randomly sized chunks at random buffer offsets to
// exercise partial reads of the stream
while (target.length() < data.length) {
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
int offset = scaledRandomIntBetween(0, buffer.length - 1);
int read = stream.read(buffer, offset, buffer.length - offset);
target.append(new BytesRef(buffer, offset, read));
}
// full length recovered and bytes match what was written
assertEquals(data.length, target.length());
assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected GoogleCloudStorageService createStorageService() {
public static class MockGoogleCloudStorageService extends GoogleCloudStorageService {
@Override
public Storage client(String clientName) {
return new MockStorage(BUCKET, blobs);
return new MockStorage(BUCKET, blobs, random());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected BlobStore newBlobStore() {
final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
try {
when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
} catch (final Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testDeprecatedSettings() throws Exception {
new GoogleCloudStorageService() {
@Override
public Storage client(String clientName) throws IOException {
return new MockStorage("test", new ConcurrentHashMap<>());
return new MockStorage("test", new ConcurrentHashMap<>(), random());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.StorageRpcOptionUtils;
import com.google.cloud.storage.StorageTestUtils;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
Expand All @@ -52,6 +54,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -63,10 +67,12 @@
*/
class MockStorage implements Storage {

private final Random random;
private final String bucketName;
private final ConcurrentMap<String, byte[]> blobs;

MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs) {
MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs, final Random random) {
this.random = random;
this.bucketName = Objects.requireNonNull(bucket);
this.blobs = Objects.requireNonNull(blobs);
}
Expand Down Expand Up @@ -228,12 +234,16 @@ public boolean isOpen() {
return null;
}

private final Set<BlobInfo> simulated410s = ConcurrentCollections.newConcurrentSet();

@Override
public WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
if (bucketName.equals(blobInfo.getBucket())) {
final ByteArrayOutputStream output = new ByteArrayOutputStream();
return new WriteChannel() {

private volatile boolean failed;

final WritableByteChannel writableByteChannel = Channels.newChannel(output);

@Override
Expand All @@ -248,6 +258,11 @@ public RestorableState<WriteChannel> capture() {

@Override
public int write(ByteBuffer src) throws IOException {
// Only fail a blob once on a 410 error since the error is so unlikely in practice
if (simulated410s.add(blobInfo) && random.nextBoolean()) {
failed = true;
throw new StorageException(HttpURLConnection.HTTP_GONE, "Simulated lost resumeable upload session");
}
return writableByteChannel.write(src);
}

Expand All @@ -259,13 +274,15 @@ public boolean isOpen() {
@Override
public void close() {
IOUtils.closeWhileHandlingException(writableByteChannel);
if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
if (existingBytes != null) {
throw new StorageException(412, "Blob already exists");
if (failed == false) {
if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
if (existingBytes != null) {
throw new StorageException(412, "Blob already exists");
}
} else {
blobs.put(blobInfo.getName(), output.toByteArray());
}
} else {
blobs.put(blobInfo.getName(), output.toByteArray());
}
}
};
Expand Down

0 comments on commit 853e33a

Please sign in to comment.