Skip to content

Commit

Permalink
JAMES-2806 ObjectStorage S3 retry save() on non-existing bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
trantienduchn committed Jul 8, 2019
1 parent 542b4cb commit b5fb2f7
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 25 deletions.
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
Expand All @@ -45,21 +46,26 @@
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;

import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.retry.PredefinedRetryPolicies;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Module;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.retry.Retry;

public class AwsS3ObjectStorage {

private static final Iterable<Module> JCLOUDS_MODULES = ImmutableSet.of(new SLF4JLoggingModule());
Expand All @@ -82,7 +88,7 @@ public class AwsS3ObjectStorage {

@Inject
@VisibleForTesting
AwsS3ObjectStorage() {
public AwsS3ObjectStorage() {
executorService = Executors.newFixedThreadPool(MAX_THREADS, NamedThreadFactory.withClassName(AwsS3ObjectStorage.class));
}

Expand Down Expand Up @@ -125,6 +131,12 @@ private ContextBuilder contextBuilder() {
}

private static class AwsS3BlobPutter implements BlobPutter {

private static final int NOT_FOUND_STATUS_CODE = 404;
private static final String BUCKET_NOT_FOUND_ERROR_CODE = "NoSuchBucket";
private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100);
private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);

private final AwsS3AuthConfiguration configuration;
private final ExecutorService executorService;

Expand All @@ -135,14 +147,14 @@ private static class AwsS3BlobPutter implements BlobPutter {

@Override
public void putDirectly(BucketName bucketName, Blob blob) {
writeFileAndAct(blob, (file) -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY));
writeFileAndAct(blob, (file) -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY).block());
}

@Override
public BlobId putAndComputeId(BucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
Consumer<File> putChangedBlob = (file) -> {
initialBlob.getMetadata().setName(blobIdSupplier.get().asString());
putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY);
putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY).block();
};
writeFileAndAct(initialBlob, putChangedBlob);
return blobIdSupplier.get();
Expand All @@ -163,30 +175,40 @@ private void writeFileAndAct(Blob blob, Consumer<File> putFile) {
}
}

private void putWithRetry(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) {
try {
put(bucketName, configuration, blob, file);
} catch (RuntimeException e) {
if (tried < MAX_RETRY_ON_EXCEPTION) {
putWithRetry(bucketName, configuration, blob, file, tried + 1);
} else {
throw e;
}
}
/**
 * Uploads {@code blob}'s file content to S3, creating the target bucket on demand.
 *
 * <p>The upload is published on the elastic scheduler. When the put fails because the bucket
 * does not exist yet (see {@link #needToCreateBucket(Throwable)}), the missing bucket is
 * created and the upload retried with exponential backoff, starting at FIRST_BACK_OFF and
 * capped at FOREVER, for at most MAX_RETRY_ON_EXCEPTION attempts. Any other failure is
 * propagated without retry.
 *
 * <p>NOTE(review): the {@code tried} parameter is no longer consulted — retry counting is
 * handled by the reactive chain. Presumably kept so existing call sites (passing FIRST_TRY)
 * stay unchanged; confirm before removing.
 */
private Mono<Void> putWithRetry(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) {
    // Throwing.runnable(...).sneakyThrow() lets put()'s checked InterruptedException escape
    // the Runnable without wrapping it in another exception type.
    return Mono.<Void>fromRunnable(Throwing.runnable(() -> put(bucketName, configuration, blob, file)).sneakyThrow())
        .publishOn(Schedulers.elastic())
        .retryWhen(Retry
            // Only the "bucket missing" case is retried; other errors fail fast.
            .<Void>onlyIf(retryContext -> needToCreateBucket(retryContext.exception()))
            .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
            .withBackoffScheduler(Schedulers.elastic())
            .retryMax(MAX_RETRY_ON_EXCEPTION)
            // Create the missing bucket before each retry attempt.
            .doOnRetry(retryContext -> createBucket(bucketName, configuration)));
}

private void put(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file) {
try {
PutObjectRequest request = new PutObjectRequest(bucketName.asString(),
blob.getMetadata().getName(),
file);

getTransferManager(configuration)
.upload(request)
.waitForUploadResult();
} catch (AmazonClientException | InterruptedException e) {
throw new RuntimeException(e);
/**
 * Synchronously uploads the given file as the blob's content into {@code bucketName}.
 *
 * <p>Blocks until the transfer completes via {@code waitForUploadResult()}.
 *
 * <p>NOTE(review): relies on {@code getTransferManager(configuration)} (not visible here) to
 * supply the TransferManager — confirm its lifecycle (shutdown/reuse) is handled there.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting for the upload
 */
private void put(BucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file) throws InterruptedException {
    String blobKey = blob.getMetadata().getName();
    PutObjectRequest uploadRequest = new PutObjectRequest(bucketName.asString(), blobKey, file);

    getTransferManager(configuration)
        .upload(uploadRequest)
        .waitForUploadResult();
}

/**
 * Creates the bucket named {@code bucketName} on the configured S3 endpoint.
 *
 * <p>Invoked from the retry chain when an upload failed with a "bucket missing" error.
 */
private void createBucket(BucketName bucketName, AwsS3AuthConfiguration configuration) {
    AmazonS3 s3Client = getS3Client(configuration, getClientConfiguration());
    s3Client.createBucket(bucketName.asString());
}

/**
 * Tells whether the given failure means the target bucket does not exist yet.
 *
 * <p>Matches an {@link AmazonS3Exception} carrying HTTP 404 together with the
 * "NoSuchBucket" error code; anything else (including null) is not retryable here.
 */
private boolean needToCreateBucket(Throwable th) {
    if (!(th instanceof AmazonS3Exception)) {
        return false;
    }

    AmazonS3Exception s3Exception = (AmazonS3Exception) th;
    return s3Exception.getStatusCode() == NOT_FOUND_STATUS_CODE
        && BUCKET_NOT_FOUND_ERROR_CODE.equals(s3Exception.getErrorCode());
}

private TransferManager getTransferManager(AwsS3AuthConfiguration configuration) {
Expand Down
@@ -0,0 +1,90 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.blob.objectstorage;

import java.util.UUID;

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketBlobStoreContract;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.api.MetricableBlobStore;
import org.apache.james.blob.api.MetricableBlobStoreContract;
import org.apache.james.blob.objectstorage.aws.AwsS3AuthConfiguration;
import org.apache.james.blob.objectstorage.aws.AwsS3ObjectStorage;
import org.apache.james.blob.objectstorage.aws.DockerAwsS3Container;
import org.apache.james.blob.objectstorage.aws.DockerAwsS3Extension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(DockerAwsS3Extension.class)
public class ObjectStorageBlobsDAOAWSTest implements MetricableBlobStoreContract, BucketBlobStoreContract {

    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();

    // Fresh random bucket per test run, so contract tests never collide across runs.
    private BucketName defaultBucketName;
    // Raw jclouds BlobStore handle, kept only so tearDown() can delete containers directly.
    private org.jclouds.blobstore.BlobStore blobStore;
    private ObjectStorageBlobsDAO objectStorageBlobsDAO;
    private AwsS3ObjectStorage awsS3ObjectStorage;
    private AwsS3AuthConfiguration configuration;
    // The BlobStore under test, wrapped with metrics as MetricableBlobStoreContract requires.
    private BlobStore testee;

    /**
     * Wires an ObjectStorageBlobsDAO against the Dockerized S3 endpoint, using the
     * AwsS3ObjectStorage blob putter so puts go through the retry-on-missing-bucket path.
     */
    @BeforeEach
    void setUp(DockerAwsS3Container dockerAwsS3) {
        awsS3ObjectStorage = new AwsS3ObjectStorage();
        defaultBucketName = BucketName.of(UUID.randomUUID().toString());
        configuration = AwsS3AuthConfiguration.builder()
            .endpoint(dockerAwsS3.getEndpoint())
            .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID)
            .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
            .build();

        ObjectStorageBlobsDAOBuilder.ReadyToBuild builder = ObjectStorageBlobsDAO
            .builder(configuration)
            .defaultBucketName(defaultBucketName)
            .blobIdFactory(BLOB_ID_FACTORY)
            .blobPutter(awsS3ObjectStorage.putBlob(configuration));

        blobStore = builder.getSupplier().get();
        objectStorageBlobsDAO = builder.build();
        // The default bucket must exist before the contract tests exercise it.
        objectStorageBlobsDAO.createBucket(defaultBucketName).block();
        testee = new MetricableBlobStore(metricsTestExtension.getMetricFactory(), objectStorageBlobsDAO);
    }

    /**
     * Drops both buckets a test may have touched and releases client resources.
     * NOTE(review): CUSTOM presumably comes from BucketBlobStoreContract — confirm it is
     * safe to delete even when a given test never created it.
     */
    @AfterEach
    void tearDown() {
        blobStore.deleteContainer(defaultBucketName.asString());
        blobStore.deleteContainer(CUSTOM.asString());
        blobStore.getContext().close();
        // Shuts down the executor owned by AwsS3ObjectStorage.
        awsS3ObjectStorage.tearDown();
    }

    @Override
    public BlobStore testee() {
        return testee;
    }

    @Override
    public BlobId.Factory blobIdFactory() {
        return BLOB_ID_FACTORY;
    }
}

0 comments on commit b5fb2f7

Please sign in to comment.