Skip to content

Commit

Permalink
Feature / S3 storage custom endpoints (#244)
Browse files Browse the repository at this point in the history
* Allow setting endpoint for S3 storage in the data service

* Make region and endpoint optional in platform configuration for S3 endpoints

* Add "endpoint" option for S3 storage in the runtime
  • Loading branch information
martin-traverse committed Dec 16, 2022
1 parent a4010c5 commit 22b84ef
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Properties;
Expand All @@ -53,9 +54,10 @@

public class S3ObjectStorage implements IFileStorage {

public static final String REGION_PROPERTY = "region";
public static final String BUCKET_PROPERTY = "bucket";
public static final String PREFIX_PROPERTY = "prefix";
public static final String REGION_PROPERTY = "region";
public static final String ENDPOINT_PROPERTY = "endpoint";

public static final String CREDENTIALS_PROPERTY = "credentials";
public static final String CREDENTIALS_DEFAULT = "default";
Expand All @@ -66,10 +68,11 @@ public class S3ObjectStorage implements IFileStorage {
private final Logger log = LoggerFactory.getLogger(getClass());

private final String storageKey;
private final Region region;
private final AwsCredentialsProvider credentials;
private final String bucket;
private final StoragePath prefix;
private final AwsCredentialsProvider credentials;
private final Region region;
private final URI endpoint;

private final StorageErrors errors;

Expand All @@ -80,13 +83,15 @@ public S3ObjectStorage(Properties properties) {

this.storageKey = properties.getProperty(IStorageManager.PROP_STORAGE_KEY);

var region = properties.getProperty(REGION_PROPERTY);
var bucket = properties.getProperty(BUCKET_PROPERTY);
var prefix = properties.getProperty(PREFIX_PROPERTY);
var region = properties.getProperty(REGION_PROPERTY);
var endpoint = properties.getProperty(ENDPOINT_PROPERTY);

this.region = Region.of(region);
this.bucket = bucket;
this.prefix = prefix != null ? StoragePath.forPath(prefix) : StoragePath.root();
this.prefix = prefix != null && !prefix.isBlank() ? StoragePath.forPath(prefix) : StoragePath.root();
this.region = region != null && !region.isBlank() ? Region.of(region) : null;
this.endpoint = endpoint != null && !endpoint.isBlank() ? URI.create(endpoint) : null;

this.credentials = setupCredentials(properties);

Expand Down Expand Up @@ -151,14 +156,20 @@ public void start(EventLoopGroup eventLoopGroup) {
.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, Runnable::run)
.build();

this.client = S3AsyncClient.builder()
.region(region)
.credentialsProvider(credentials)
var clientBuilder = S3AsyncClient.builder()
.httpClientBuilder(httpClient)
.asyncConfiguration(async)
.build();
.credentialsProvider(credentials);

if (region != null)
clientBuilder.region(region);

if (endpoint != null)
clientBuilder.endpointOverride(endpoint);

this.client = clientBuilder.build();

log.info("Created S3 storage, region = [{}], bucket = [{}], prefix = [{}]", region, bucket, prefix);
log.info("Created S3 storage, bucket = [{}], prefix = [{}]", bucket, prefix);
}

@Override
Expand Down
21 changes: 15 additions & 6 deletions tracdap-runtime/python/src/tracdap/rt/_plugins/aws_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ class S3ObjectStorage(IFileStorage):

# https://arrow.apache.org/docs/python/filesystems.html

REGION_PROPERTY = "region"
BUCKET_PROPERTY = "bucket"
PREFIX_PROPERTY = "prefix"
REGION_PROPERTY = "region"
ENDPOINT_PROPERTY = "endpoint"

CREDENTIALS_PROPERTY = "credentials"
CREDENTIALS_DEFAULT = "default"
Expand All @@ -59,16 +60,24 @@ def __init__(self, config: _cfg.PluginConfig, options: dict = None):
self._config = config
self._options = options

self._region = config.properties[self.REGION_PROPERTY]
self._bucket = config.properties[self.BUCKET_PROPERTY]
self._prefix = config.properties[self.PREFIX_PROPERTY] if self.PREFIX_PROPERTY in config.properties else ""
self._region = config.properties[self.REGION_PROPERTY] if self.REGION_PROPERTY in config.properties else None
self._endpoint = config.properties[self.ENDPOINT_PROPERTY] if self.ENDPOINT_PROPERTY in config.properties else None

credentials_params = self.setup_credentials(config.properties)

self.__client = boto3.client(
service_name="s3",
region_name=self._region,
**credentials_params)
client_args = {
"service_name": "s3",
**credentials_params}

if self._region is not None:
client_args["region_name"] = self._region

if self._endpoint is not None:
client_args["endpoint_url"] = self._endpoint

self.__client = boto3.client(**client_args)

def setup_credentials(self, properties: dict):

Expand Down

0 comments on commit 22b84ef

Please sign in to comment.