Skip to content
This repository has been archived by the owner on May 16, 2023. It is now read-only.

Commit

Permalink
switch from s3 ListObjectsV2 to ListObjects (#1821)
Browse files Browse the repository at this point in the history
* switch from s3 ListObjectsV2 to ListObjects

* Update tests accordingly with the approach used

* introduce 'delimiter' on s3 ListObjectsRequest

* fix javadoc

Co-authored-by: Gut Ioan <ioan.gut@sap.com>
  • Loading branch information
hilmarf and ioangut authored May 4, 2022
1 parent 188aa4f commit b03357a
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 55 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<feign.version>11.8</feign.version>
<commons-math3.version>3.6.1</commons-math3.version>
<bouncycastle.version>1.70</bouncycastle.version>
<awssdk-s3.version>2.17.103</awssdk-s3.version>
<awssdk-s3.version>2.17.182</awssdk-s3.version>
<io-netty.version>4.1.76.Final</io-netty.version>
<guava.version>31.1-jre</guava.version>
<jackson.version>2.13.2</jackson.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@


package app.coronawarn.server.services.distribution.objectstore;

import app.coronawarn.server.services.distribution.config.DistributionServiceConfig;
import app.coronawarn.server.services.distribution.objectstore.client.ObjectStoreClient;
import app.coronawarn.server.services.distribution.objectstore.client.ObjectStoreClient.HeaderKey;
import app.coronawarn.server.services.distribution.objectstore.client.S3Object;
import app.coronawarn.server.services.distribution.objectstore.publish.LocalFile;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -93,6 +92,8 @@ public void putObject(LocalFile localFile, int maxAge) {
* Deletes objects in the object store, based on the given prefix (folder structure).
*
* @param prefix the prefix, e.g. my/folder/
*
* @see #getAllObjectsWithPrefix(String)
*/
public void deleteObjectsWithPrefix(String prefix) {
List<String> toDelete = getObjectsWithPrefix(prefix)
Expand All @@ -104,6 +105,11 @@ public void deleteObjectsWithPrefix(String prefix) {
this.client.removeObjects(bucket, toDelete);
}

public void deleteObject(S3Object toDelete) {
logger.info("Deleting {}", toDelete);
this.client.removeObjects(bucket, Collections.singletonList(toDelete.getObjectName()));
}

/**
* Fetches the list of objects in the store with the given prefix.
*
Expand All @@ -114,6 +120,10 @@ public List<S3Object> getObjectsWithPrefix(String prefix) {
return client.getObjects(bucket, prefix);
}

public List<S3Object> getAllObjectsWithPrefix(String prefix) {
return client.getObjects(bucket, prefix, null);
}

private Map<HeaderKey, String> createHeaders(int maxAge, LocalFile file) {
EnumMap<HeaderKey, String> headers = new EnumMap<>(Map.of(HeaderKey.CACHE_CONTROL, "public,max-age=" + maxAge));
if (this.isSetPublicReadAclOnPutObject) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,12 @@ public void applyTraceTimeWarningHourRetentionPolicy(long retentionDays) {

/**
* Java stream do not support checked exceptions within streams. This helper method rethrows them as unchecked
* expressions, so they can be passed up to the Retention Policy.
* expressions, so they can be passed up to the Retention Policy.<br />
* <strong>Attention:</strong> this first queries all the objects from S3 with the same prefix!
*
* @param s3Object the S3 object, that should be deleted.
*
* @see ObjectStoreAccess#deleteObjectsWithPrefix(String)
*/
public void deleteS3Object(S3Object s3Object) {
try {
Expand All @@ -139,6 +142,20 @@ public void deleteS3Object(S3Object s3Object) {
}
}

/**
* Java stream do not support checked exceptions within streams. This helper method rethrows them as unchecked
* expressions, so they can be passed up to the Retention Policy.
*
* @param s3Object the S3 object, that should be deleted.
*/
public void deleteSingleS3Object(S3Object s3Object) {
try {
objectStoreAccess.deleteObject(s3Object);
} catch (ObjectStoreOperationFailedException e) {
failedObjectStoreOperationsCounter.incrementAndCheckThreshold(e);
}
}

private boolean isDiagnosisKeyFilePathOnHourFolder(S3Object s3Object) {
Matcher matcher = hourPathPattern.matcher(s3Object.getObjectName());
return matcher.matches();
Expand Down Expand Up @@ -178,8 +195,8 @@ private String getTraceTimeWarningPrefix(String country) {
* Delete the whole folder {@link #dccRevocationDirectory}.
*/
public void deleteDccRevocationDir() {
final Collection<S3Object> s3Objects = objectStoreAccess.getObjectsWithPrefix(dccRevocationDirectory);
final Collection<S3Object> s3Objects = objectStoreAccess.getAllObjectsWithPrefix(dccRevocationDirectory);
logger.info("Deleting {} dccRevocationDirectory files", s3Objects.size());
s3Objects.forEach(this::deleteS3Object);
s3Objects.forEach(this::deleteSingleS3Object);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


package app.coronawarn.server.services.distribution.objectstore.client;

import app.coronawarn.server.services.distribution.statistics.exceptions.NotModifiedException;
Expand All @@ -23,6 +21,8 @@ public interface ObjectStoreClient {
*/
List<S3Object> getObjects(String bucket, String prefix);

List<S3Object> getObjects(final String bucket, final String prefix, final String delimiter);

JsonFile getSingleObjectContent(String bucket, String key);

JsonFile getSingleObjectContent(String bucket, String key, String ifNotETag) throws NotModifiedException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


package app.coronawarn.server.services.distribution.objectstore.client;

import app.coronawarn.server.services.distribution.config.DistributionServiceConfig;
Expand All @@ -26,10 +24,11 @@ public class ObjectStorePublishingConfig {

@Bean(name = "publish-s3")
public ObjectStoreClient createObjectStoreClient(DistributionServiceConfig distributionServiceConfig) {
return createClient(distributionServiceConfig.getObjectStore());
return createClient(distributionServiceConfig.getObjectStore(),
distributionServiceConfig.getDccRevocation().getDccListPath());
}

private ObjectStoreClient createClient(ObjectStore objectStore) {
private ObjectStoreClient createClient(final ObjectStore objectStore, final String dccListPath) {
AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(
AwsBasicCredentials.create(objectStore.getAccessKey(), objectStore.getSecretKey()));
String endpoint = removeTrailingSlash(objectStore.getEndpoint()) + ":" + objectStore.getPort();
Expand All @@ -38,7 +37,7 @@ private ObjectStoreClient createClient(ObjectStore objectStore) {
.region(DEFAULT_REGION)
.endpointOverride(URI.create(endpoint))
.credentialsProvider(credentialsProvider)
.build());
.build(), dccListPath);
}

private String removeTrailingSlash(String string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.lang.Boolean.TRUE;
import static java.util.stream.Collectors.toList;
import static software.amazon.awssdk.services.s3.model.ListObjectsRequest.builder;

import app.coronawarn.server.services.distribution.statistics.exceptions.NotModifiedException;
import app.coronawarn.server.services.distribution.statistics.file.JsonFile;
Expand Down Expand Up @@ -29,6 +30,9 @@
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest.Builder;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
Expand All @@ -47,15 +51,38 @@ public class S3ClientWrapper implements ObjectStoreClient {

private final S3Client s3Client;

/**
* Default value is coming from: 'services.distribution.dcc-revocation.dcc-revocation-directory' which is currently:
* 'dcc-rl'.
*/
private final String dccListPath;

/**
* Standard constructor for reading analytics/statistics data from OBS.
*
* @param s3Client delegator
*/
public S3ClientWrapper(S3Client s3Client) {
this.s3Client = s3Client;
this.dccListPath = null;
}

/**
* Constructor which sets a default {@link Builder#delimiter(String)}.
*
* @param s3Client delegator
* @param dccListPath - {@link #dccListPath}.
*/
public S3ClientWrapper(final S3Client s3Client, final String dccListPath) {
this.s3Client = s3Client;
this.dccListPath = dccListPath;
}

@Override
public boolean bucketExists(String bucketName) {
try {
// using S3Client.listObjectsV2 instead of S3Client.listBuckets/headBucket in order to limit required permissions
s3Client.listObjectsV2(ListObjectsV2Request.builder().bucket(bucketName).maxKeys(1).build());
// using S3Client.listObjects instead of S3Client.listBuckets/headBucket in order to limit required permissions
s3Client.listObjects(ListObjectsRequest.builder().bucket(bucketName).maxKeys(1).build());
return true;
} catch (NoSuchBucketException e) {
return false;
Expand All @@ -72,7 +99,8 @@ private JsonFile fromResponse(ResponseInputStream<GetObjectResponse> response) {
@Retryable(
value = SdkException.class,
maxAttemptsExpression = "${services.distribution.objectstore.retry-attempts}",
backoff = @Backoff(delayExpression = "${services.distribution.objectstore.retry-backoff}"))
backoff = @Backoff(
delayExpression = "${services.distribution.objectstore.retry-backoff}"))
public JsonFile getSingleObjectContent(String bucket, String key) {
GetObjectRequest request = GetObjectRequest.builder()
.bucket(bucket)
Expand All @@ -87,7 +115,8 @@ public JsonFile getSingleObjectContent(String bucket, String key) {
value = SdkException.class,
exclude = NotModifiedException.class,
maxAttemptsExpression = "${services.distribution.objectstore.retry-attempts}",
backoff = @Backoff(delayExpression = "${services.distribution.objectstore.retry-backoff}"))
backoff = @Backoff(
delayExpression = "${services.distribution.objectstore.retry-backoff}"))
public JsonFile getSingleObjectContent(String bucket, String key, String ifNotETag) throws NotModifiedException {
try {
GetObjectRequest request = GetObjectRequest.builder()
Expand All @@ -106,24 +135,61 @@ public JsonFile getSingleObjectContent(String bucket, String key, String ifNotET
}
}

/**
* Calls {@link #getSingleObjectContent(String, String, String)} but with {@link #dccListPath} as last parameter -
* aka. delimiter.
*/
@Override
@Retryable(
value = SdkException.class,
maxAttemptsExpression = "${services.distribution.objectstore.retry-attempts}",
backoff = @Backoff(delayExpression = "${services.distribution.objectstore.retry-backoff}"))
backoff = @Backoff(
delayExpression = "${services.distribution.objectstore.retry-backoff}"))
public List<S3Object> getObjects(String bucket, String prefix) {
return getObjects(bucket, prefix, dccListPath);
}

/**
* Pass in <code>null</code> as delimiter, when you really want to receive ALL existing {@link S3Objetcs}.
*/
@Override
@Retryable(
value = SdkException.class,
maxAttemptsExpression = "${services.distribution.objectstore.retry-attempts}",
backoff = @Backoff(
delayExpression = "${services.distribution.objectstore.retry-backoff}"))
public List<S3Object> getObjects(String bucket, String prefix, String delimiter) {
logRetryStatus("object download");
List<S3Object> allS3Objects = new ArrayList<>();
String continuationToken = null;

String marker = null;
do {
ListObjectsV2Request request =
ListObjectsV2Request.builder().prefix(prefix).bucket(bucket).continuationToken(continuationToken).build();
ListObjectsV2Response response = s3Client.listObjectsV2(request);
final ListObjectsRequest request = builder().prefix(prefix).bucket(bucket).marker(marker).delimiter(delimiter)
.build();
final ListObjectsResponse response = s3Client.listObjects(request);
marker = TRUE.equals(response.isTruncated()) ? response.nextMarker() : null;
if (TRUE.equals(response.isTruncated()) && marker == null) {
// the zenko/cloudserver during the tests doesn't support the old API as it's the case for OBS at TSI
return tryWithV2(bucket, prefix, delimiter);
}
response.contents().stream()
.map(s3Object -> buildS3Object(s3Object, bucket))
.forEach(allS3Objects::add);
} while (marker != null);

return allS3Objects;
}

private List<S3Object> tryWithV2(final String bucket, final String prefix, final String delimiter) {
logger.warn("using GET Bucket (List Objects) Version 2!?!");
final List<S3Object> allS3Objects = new ArrayList<>();
String continuationToken = null;
do {
final ListObjectsV2Request request = ListObjectsV2Request.builder().prefix(prefix).bucket(bucket)
.continuationToken(continuationToken).delimiter(delimiter)
.build();
final ListObjectsV2Response response = s3Client.listObjectsV2(request);
continuationToken = TRUE.equals(response.isTruncated()) ? response.nextContinuationToken() : null;
response.contents().stream().map(s3Object -> buildS3Object(s3Object, bucket)).forEach(allS3Objects::add);
} while (continuationToken != null);

return allS3Objects;
Expand All @@ -138,7 +204,8 @@ public List<S3Object> skipReadOperation(Throwable cause) {
@Retryable(
value = SdkException.class,
maxAttemptsExpression = "${services.distribution.objectstore.retry-attempts}",
backoff = @Backoff(delayExpression = "${services.distribution.objectstore.retry-backoff}"))
backoff = @Backoff(
delayExpression = "${services.distribution.objectstore.retry-backoff}"))
public void putObject(String bucket, String objectName, Path filePath, Map<HeaderKey, String> headers) {
logRetryStatus("object upload");
var requestBuilder = PutObjectRequest.builder().bucket(bucket).key(objectName);
Expand All @@ -160,9 +227,12 @@ public void putObject(String bucket, String objectName, Path filePath, Map<Heade
}

@Override
@Retryable(value = {SdkException.class, ObjectStoreOperationFailedException.class},
@Retryable(
value = { SdkException.class,
ObjectStoreOperationFailedException.class },
maxAttemptsExpression = "${services.distribution.objectstore.retry-attempts}",
backoff = @Backoff(delayExpression = "${services.distribution.objectstore.retry-backoff}"))
backoff = @Backoff(
delayExpression = "${services.distribution.objectstore.retry-backoff}"))
public void removeObjects(String bucket, List<String> objectNames) {
if (objectNames.isEmpty()) {
return;
Expand Down Expand Up @@ -202,8 +272,7 @@ private void skipModifyingOperation(Throwable cause) {

/**
* Fetches the CWA Hash for the given S3Object. Unfortunately, this is necessary for the AWS SDK, as it does not
* support fetching metadata within the {@link ListObjectsV2Request}.<br> MinIO actually does support this, so when
* they release 7.0.3, we can remove this code here.
* support fetching metadata within the {@link ListObjectsRequest}.
*
* @param s3Object the S3Object to fetch the CWA hash for
* @param bucket the target bucket
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


package app.coronawarn.server.services.distribution.objectstore.client;

import java.util.Objects;
Expand Down Expand Up @@ -30,7 +28,7 @@ public S3Object(String objectName) {
* Constructs a new S3Object for the given object name.
*
* @param objectName the target object name
* @param cwaHash the checksum for that file
* @param cwaHash the checksum for that file
*/
public S3Object(String objectName, String cwaHash) {
this(objectName);
Expand Down Expand Up @@ -72,4 +70,9 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(objectName, cwaHash);
}

@Override
public String toString() {
return getObjectName();
}
}
Loading

0 comments on commit b03357a

Please sign in to comment.