Skip to content

Commit

Permalink
Set S3 endpoint region and support OCI object storage
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Fixes #14133

### Why are the changes needed?

Fixes the issue where the S3 client cannot determine the endpoint region, and
adds support for OCI object storage.

### Does this PR introduce any user facing changes?
Yes. Adds a new property key, `alluxio.underfs.s3.endpoint.region`, for setting the S3 endpoint region.

pr-link: #14134
change-id: cid-1dd77cd700ffbecc6055ef4eeca43a7488921598
  • Loading branch information
LuQQiu committed Oct 6, 2021
1 parent 283cfd1 commit 36a4d5c
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 14 deletions.
8 changes: 8 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Expand Up @@ -949,6 +949,13 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
// Region paired with the custom S3 endpoint (UNDERFS_S3_ENDPOINT). Optional: it has no default
// value, so when it is not set the S3 UFS has to work the region out from the endpoint itself.
public static final PropertyKey UNDERFS_S3_ENDPOINT_REGION =
    new Builder(Name.UNDERFS_S3_ENDPOINT_REGION)
        .setDescription("Optionally, set the S3 endpoint region. If not provided, "
            + "the region is inferred from the endpoint URI or left as null")
        .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
        .setScope(Scope.SERVER)
        .build();
public static final PropertyKey UNDERFS_S3_OWNER_ID_TO_USERNAME_MAPPING =
new Builder(Name.UNDERFS_S3_OWNER_ID_TO_USERNAME_MAPPING)
.setDescription("Optionally, specify a preset s3 canonical id to Alluxio username "
Expand Down Expand Up @@ -5613,6 +5620,7 @@ public static final class Name {
public static final String UNDERFS_S3_DISABLE_DNS_BUCKETS =
"alluxio.underfs.s3.disable.dns.buckets";
public static final String UNDERFS_S3_ENDPOINT = "alluxio.underfs.s3.endpoint";
public static final String UNDERFS_S3_ENDPOINT_REGION = "alluxio.underfs.s3.endpoint.region";
public static final String UNDERFS_S3_OWNER_ID_TO_USERNAME_MAPPING =
"alluxio.underfs.s3.owner.id.to.username.mapping";
public static final String UNDERFS_S3_PROXY_HOST = "alluxio.underfs.s3.proxy.host";
Expand Down
17 changes: 16 additions & 1 deletion docs/en/ufs/S3.md
Expand Up @@ -174,11 +174,26 @@ to include:

```
alluxio.underfs.s3.endpoint=<S3_ENDPOINT>
alluxio.underfs.s3.endpoint.region=<S3_ENDPOINT_REGION>
```

For these parameters, replace `<S3_ENDPOINT>` with the hostname and port of your S3 service, e.g.,
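Replace `<S3_ENDPOINT>` with the hostname and port of your S3 service, e.g.,
`http://localhost:9000`, and `<S3_ENDPOINT_REGION>` with the region served by that endpoint. If the region is not set, Alluxio tries to infer it from the endpoint URI. Only use these parameters if you are using a provider other than `s3.amazonaws.com`.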

### Connecting to Oracle Cloud Infrastructure (OCI) object storage

To use a region other than your home region, set both the endpoint and the endpoint region:
```
alluxio.underfs.s3.endpoint=<S3_ENDPOINT>
alluxio.underfs.s3.endpoint.region=<S3_ENDPOINT_REGION>
```

All OCI object storage regions require path-style access (`PathStyleAccess`), so also set:
```
alluxio.underfs.s3.disable.dns.buckets=true
alluxio.underfs.s3.inherit.acl=false
```

### Using v2 S3 Signatures

Some S3 service providers only support v2 signatures. For these S3 providers, you can enforce using
Expand Down
Expand Up @@ -34,9 +34,11 @@
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.internal.Mimetypes;
import com.amazonaws.services.s3.internal.ServiceUtils;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
Expand All @@ -51,7 +53,9 @@
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.util.AwsHostNameUtils;
import com.amazonaws.util.Base64;
import com.amazonaws.util.RuntimeHttpUtils;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -63,6 +67,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
Expand All @@ -88,8 +93,10 @@ public class S3AUnderFileSystem extends ObjectUnderFileSystem {
/** Default owner of objects if owner cannot be determined. */
private static final String DEFAULT_OWNER = "";

private static final String S3_SERVICE_NAME = "s3";

/** AWS-SDK S3 client. */
private final AmazonS3Client mClient;
private final AmazonS3 mClient;

/** Bucket name of user's configured Alluxio bucket. */
private final String mBucketName;
Expand Down Expand Up @@ -199,32 +206,72 @@ public static S3AUnderFileSystem createInstance(AlluxioURI uri,
clientConf.setSignerOverride(conf.get(PropertyKey.UNDERFS_S3_SIGNER_ALGORITHM));
}

AmazonS3Client amazonS3Client = new AmazonS3Client(credentials, clientConf);
AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder
.standard()
.withCredentials(credentials)
.withClientConfiguration(clientConf);

// Set a custom endpoint.
if (conf.isSet(PropertyKey.UNDERFS_S3_ENDPOINT)) {
amazonS3Client.setEndpoint(conf.get(PropertyKey.UNDERFS_S3_ENDPOINT));
if (conf.getBoolean(PropertyKey.UNDERFS_S3_DISABLE_DNS_BUCKETS)) {
clientBuilder.withPathStyleAccessEnabled(true);
}

// Disable DNS style buckets, this enables path style requests.
if (Boolean.parseBoolean(conf.get(PropertyKey.UNDERFS_S3_DISABLE_DNS_BUCKETS))) {
S3ClientOptions clientOptions = S3ClientOptions.builder().setPathStyleAccess(true).build();
amazonS3Client.setS3ClientOptions(clientOptions);
AwsClientBuilder.EndpointConfiguration endpointConfiguration
= createEndpointConfiguration(conf, clientConf);
if (endpointConfiguration != null) {
clientBuilder.withEndpointConfiguration(endpointConfiguration);
}

AmazonS3 amazonS3Client = clientBuilder.build();

ExecutorService service = ExecutorServiceFactories
.fixedThreadPool("alluxio-s3-transfer-manager-worker",
numTransferThreads).create();

TransferManager transferManager = TransferManagerBuilder.standard()
.withS3Client(amazonS3Client).withExecutorFactory(() -> service)
.withS3Client(clientBuilder.build()).withExecutorFactory(() -> service)
.withMultipartCopyThreshold(MULTIPART_COPY_THRESHOLD)
.build();

return new S3AUnderFileSystem(uri, amazonS3Client, bucketName,
service, transferManager, conf, streamingUploadEnabled);
}

/**
 * Creates the endpoint configuration for the S3 client from the Alluxio configuration.
 *
 * The region is chosen in this order: the value of
 * {@link PropertyKey#UNDERFS_S3_ENDPOINT_REGION} if it is set; {@code null} if the endpoint is
 * the standard S3 endpoint; otherwise whatever {@link AwsHostNameUtils#parseRegion} extracts
 * from the endpoint host (which may also be {@code null}).
 *
 * @param conf the Alluxio conf
 * @param clientConf the AWS client conf, used when converting the endpoint string into a URI
 * @return the endpoint configuration, or {@code null} if no endpoint is configured (unset or
 *         blank), in which case the default S3 endpoint should be used
 */
@Nullable
private static AwsClientBuilder.EndpointConfiguration createEndpointConfiguration(
    UnderFileSystemConfiguration conf, ClientConfiguration clientConf) {
  String endpoint = conf.isSet(PropertyKey.UNDERFS_S3_ENDPOINT)
      ? conf.get(PropertyKey.UNDERFS_S3_ENDPOINT) : null;
  // A blank endpoint carries no host to build a URI or parse a region from, so treat it the
  // same as an unset endpoint rather than failing while creating the client.
  if (endpoint == null || endpoint.trim().isEmpty()) {
    LOG.debug("No endpoint configuration generated, using default s3 endpoint");
    return null;
  }
  final URI epr = RuntimeHttpUtils.toUri(endpoint, clientConf);
  LOG.debug("Creating endpoint configuration for {}", epr);

  String region;
  if (conf.isSet(PropertyKey.UNDERFS_S3_ENDPOINT_REGION)) {
    // An explicitly configured region always wins over anything inferred from the endpoint.
    region = conf.get(PropertyKey.UNDERFS_S3_ENDPOINT_REGION);
  } else if (ServiceUtils.isS3USStandardEndpoint(endpoint)) {
    // endpoint is standard s3 endpoint with default region, no need to set region
    LOG.debug("Standard s3 endpoint, declare region as null");
    region = null;
  } else {
    LOG.debug("Parsing region fom non-standard s3 endpoint");
    region = AwsHostNameUtils.parseRegion(epr.getHost(), S3_SERVICE_NAME);
  }
  LOG.debug("Region for endpoint {}, URI {} is determined as {}",
      endpoint, epr, region);
  return new AwsClientBuilder.EndpointConfiguration(endpoint, region);
}

/**
* Constructor for {@link S3AUnderFileSystem}.
*
Expand All @@ -236,7 +283,7 @@ public static S3AUnderFileSystem createInstance(AlluxioURI uri,
* @param conf configuration for this S3A ufs
* @param streamingUploadEnabled whether streaming upload is enabled
*/
protected S3AUnderFileSystem(AlluxioURI uri, AmazonS3Client amazonS3Client, String bucketName,
protected S3AUnderFileSystem(AlluxioURI uri, AmazonS3 amazonS3Client, String bucketName,
ExecutorService executor, TransferManager transferManager, UnderFileSystemConfiguration conf,
boolean streamingUploadEnabled) {
super(uri, conf);
Expand Down

0 comments on commit 36a4d5c

Please sign in to comment.