Feature / AWS alternate credentials mechanism (#215)
* Support multiple credentials mechanisms for S3 storage in the platform

* Support multiple credentials mechanisms for S3 storage in the runtime

* Update documentation for S3 storage
martin-traverse committed Dec 6, 2022
1 parent 05bd5d3 commit 2d68a49
Showing 5 changed files with 103 additions and 48 deletions.
35 changes: 20 additions & 15 deletions doc/deployment/storage.rst
@@ -13,29 +13,24 @@ For instructions on setting up local storage, see the
AWS S3 Storage
--------------

You will need to set up an S3 bucket, and an IAM user with permissions to access that bucket.
These are the permissions that need to be assigned to the bucket.
You will need to set up an S3 bucket and suitable role permissions. The role can be managed
by AWS, or explicitly by creating an IAM user or group assigned to the role.

Permissions can be managed at the bucket level or via role policies; either way, you will need
policy statements granting at minimum these permissions:

.. code-block:: json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ListObjectsInBucket",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::<aws_account_id>:user/<iam_user>"
},
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::<bucket_name>"
},
{
"Sid": "AllObjectActions",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::<aws_account_id>:user/<iam_user>"
},
"Action": [
"s3:*Object",
"s3:*ObjectAttributes"
@@ -50,11 +45,15 @@ To install the AWS storage plugin, download the plugins package from the latest
will find a folder for the AWS storage plugin; copy the contents of this folder into the *plugins*
folder of your TRAC D.A.P. installation.

You will then be able to configure an S3 storage instance in your platform configuration. The region,
bucket name and access key properties are required.
You will then be able to configure an S3 storage instance in your platform configuration. Region and bucket name
are required. The *prefix* property is optional; if specified, it is used as a prefix for all objects stored
in the bucket. TRAC follows the convention of using path-like object keys, so forward slashes can be used in the
prefix if desired.

The *path* property is optional, if specified it will be used as a prefix for all objects stored in the bucket.
TRAC follows the convention of using path-like object keys, so backslashes can be used in the path prefix if desired.
Credentials can be supplied in several different ways. For deployments where all the TRAC services are running
in AWS, the *default* mechanism allows permissions to be managed using AWS roles without the need for further
configuration in TRAC. This is the preferred mechanism for deployments where it is available. If the TRAC
services are not running in AWS, the *static* mechanism can be used to authenticate with an access key.

.. code-block:: yaml
@@ -67,6 +66,12 @@ TRAC follows the convention of using path-like object keys, so backslashes can b
properties:
region: <aws_region>
bucket: <aws_bucket_name>
path: <storage_prefix>
prefix: <storage_prefix>
# For credentials supplied automatically by AWS assigned roles
credentials: default
# Or for credentials assigned to an explicit IAM user
credentials: static
accessKeyId: <aws_access_key_id>
secretAccessKey: <aws_secret_access_key>
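
The *default* mechanism relies on the AWS SDK's standard credential chain (environment variables, the shared
credentials file, or an instance / container role). As an illustration only, and not part of the TRAC
configuration above, the following Python sketch uses boto3's default chain to confirm that the ambient
credentials can reach a bucket; the helper name, bucket name and prefix are placeholders.

.. code-block:: python

    # Illustration only: verify the AWS default credential chain can reach the bucket.
    # The bucket name and prefix below are placeholders, not values from this commit.
    import boto3
    import botocore.exceptions

    def check_bucket_access(bucket: str, prefix: str = "") -> bool:
        # No explicit keys are passed, mirroring the "default" credentials mechanism:
        # boto3 resolves credentials from env vars, shared config or an attached role.
        s3 = boto3.client("s3")
        try:
            s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
            return True
        except botocore.exceptions.ClientError as error:
            print(f"Access check failed: {error}")
            return False

    if __name__ == "__main__":
        check_bucket_access("<aws_bucket_name>", "<storage_prefix>")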
@@ -19,12 +19,11 @@
import io.netty.channel.EventLoopGroup;
import org.finos.tracdap.common.concurrent.IExecutionContext;
import org.finos.tracdap.common.data.IDataContext;
import org.finos.tracdap.common.exception.EStartup;
import org.finos.tracdap.common.exception.ETracInternal;
import org.finos.tracdap.common.storage.*;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.credentials.*;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
@@ -56,8 +55,11 @@ public class S3ObjectStorage implements IFileStorage {

public static final String REGION_PROPERTY = "region";
public static final String BUCKET_PROPERTY = "bucket";
public static final String PATH_PROPERTY = "path";
public static final String PREFIX_PROPERTY = "prefix";

public static final String CREDENTIALS_PROPERTY = "credentials";
public static final String CREDENTIALS_DEFAULT = "default";
public static final String CREDENTIALS_STATIC = "static";
public static final String ACCESS_KEY_ID_PROPERTY = "accessKeyId";
public static final String SECRET_ACCESS_KEY_PROPERTY = "secretAccessKey";

@@ -66,8 +68,8 @@ public class S3ObjectStorage implements IFileStorage {
private final String storageKey;
private final Region region;
private final String bucket;
private final StoragePath rootPath;
private final AwsCredentials credentials;
private final StoragePath prefix;
private final AwsCredentialsProvider credentials;

private final StorageErrors errors;

@@ -78,21 +80,46 @@ public S3ObjectStorage(Properties properties) {

this.storageKey = properties.getProperty(IStorageManager.PROP_STORAGE_KEY);

var bucket = properties.getProperty(BUCKET_PROPERTY);
var path = properties.getProperty(PATH_PROPERTY);
var region = properties.getProperty(REGION_PROPERTY);

var accessKeyId = properties.getProperty(ACCESS_KEY_ID_PROPERTY);
var secretAccessKey = properties.getProperty(SECRET_ACCESS_KEY_PROPERTY);
var bucket = properties.getProperty(BUCKET_PROPERTY);
var prefix = properties.getProperty(PREFIX_PROPERTY);

this.region = Region.of(region);
this.bucket = bucket;
this.rootPath = path != null ? StoragePath.forPath(path) : StoragePath.root();
this.credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
this.prefix = prefix != null ? StoragePath.forPath(prefix) : StoragePath.root();

this.credentials = setupCredentials(properties);

this.errors = new S3StorageErrors(storageKey, log);
}

private AwsCredentialsProvider setupCredentials(Properties properties) {

var mechanism = properties.containsKey(CREDENTIALS_PROPERTY)
? properties.getProperty(CREDENTIALS_PROPERTY)
: CREDENTIALS_DEFAULT;

if (CREDENTIALS_DEFAULT.equalsIgnoreCase(mechanism)) {
log.info("Using [{}] credentials mechanism", CREDENTIALS_DEFAULT);
return DefaultCredentialsProvider.create();
}

if (CREDENTIALS_STATIC.equalsIgnoreCase(mechanism)) {

var accessKeyId = properties.getProperty(ACCESS_KEY_ID_PROPERTY);
var secretAccessKey = properties.getProperty(SECRET_ACCESS_KEY_PROPERTY);
var credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);

log.info("Using [{}] credentials mechanism, access key id = [{}]", CREDENTIALS_STATIC, accessKeyId);

return StaticCredentialsProvider.create(credentials);
}

var message = String.format("Unrecognised credentials mechanism: [%s]", mechanism);
log.error(message);
throw new EStartup(message);
}

@Override
public void start(EventLoopGroup eventLoopGroup) {

@@ -126,12 +153,12 @@ public void start(EventLoopGroup eventLoopGroup) {

this.client = S3AsyncClient.builder()
.region(region)
.credentialsProvider(StaticCredentialsProvider.create(credentials))
.credentialsProvider(credentials)
.httpClientBuilder(httpClient)
.asyncConfiguration(async)
.build();

log.info("Created S3 storage, region = [{}], bucket = [{}], root path = [{}]", region, bucket, rootPath);
log.info("Created S3 storage, region = [{}], bucket = [{}], prefix = [{}]", region, bucket, prefix);
}

@Override
@@ -300,7 +327,7 @@ private DirStat lsResult(String dirObjectKey, ListObjectsResponse response) {
log.info(response.commonPrefixes().toString());

var stats = new ArrayList<FileStat>();
var rootPrefix = rootPath.toString();
var rootPrefix = prefix.toString();

// First entry should always be the directory being listed
if (response.contents().isEmpty()) {
@@ -440,15 +467,15 @@ private String resolvePath(String requestedPath, boolean allowRootDir, String op
if (storagePath.startsWith(".."))
throw errors.explicitError(STORAGE_PATH_OUTSIDE_ROOT, requestedPath, operationName);

var absolutePath = rootPath.resolve(storagePath).normalize();
var absolutePath = prefix.resolve(storagePath).normalize();

if (!rootPath.contains(absolutePath))
if (!prefix.contains(absolutePath))
throw errors.explicitError(STORAGE_PATH_OUTSIDE_ROOT, requestedPath, operationName);

if (absolutePath.equals(rootPath) && !allowRootDir)
if (absolutePath.equals(prefix) && !allowRootDir)
throw errors.explicitError(STORAGE_PATH_IS_ROOT, requestedPath, operationName);

log.info("root: {}, requested: {}, absolute: {}", rootPath, requestedPath, absolutePath);
log.info("root: {}, requested: {}, absolute: {}", prefix, requestedPath, absolutePath);

// For bucket storage, do not use "/" for the root path
// Otherwise everything gets put in a folder called "/"
@@ -16,7 +16,6 @@

package org.finos.tracdap.plugins.aws.storage;

import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutor;
@@ -71,7 +70,7 @@ void setup() {
execContext = new ExecutionContext(new DefaultEventExecutor(new DefaultThreadFactory("t-events")));
dataContext = new DataContext(execContext.eventLoopExecutor(), new RootAllocator());

storageProps.put(S3ObjectStorage.PATH_PROPERTY, testDir);
storageProps.put(S3ObjectStorage.PREFIX_PROPERTY, testDir);
storage = new S3ObjectStorage(storageProps);
storage.start(elg);
}
@@ -16,7 +16,6 @@

package org.finos.tracdap.plugins.aws.storage;

import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.finos.tracdap.common.concurrent.ExecutionContext;
@@ -73,7 +72,7 @@ void setup() {
execContext = new ExecutionContext(new DefaultEventExecutor(new DefaultThreadFactory("t-events")));
dataContext = new DataContext(execContext.eventLoopExecutor(), new RootAllocator());

storageProps.put(S3ObjectStorage.PATH_PROPERTY, testDir);
storageProps.put(S3ObjectStorage.PREFIX_PROPERTY, testDir);
storage = new S3ObjectStorage(storageProps);
storage.start(elg);
}
43 changes: 34 additions & 9 deletions tracdap-runtime/python/src/tracdap/rt/_plugins/aws_storage.py
@@ -43,7 +43,11 @@ class S3ObjectStorage(IFileStorage):

REGION_PROPERTY = "region"
BUCKET_PROPERTY = "bucket"
PATH_PROPERTY = "path"
PREFIX_PROPERTY = "prefix"

CREDENTIALS_PROPERTY = "credentials"
CREDENTIALS_DEFAULT = "default"
CREDENTIALS_STATIC = "static"

ACCESS_KEY_ID_PROPERTY = "accessKeyId"
SECRET_ACCESS_KEY_PROPERTY = "secretAccessKey"
@@ -57,16 +61,37 @@ def __init__(self, config: _cfg.PluginConfig, options: dict = None):

self._region = config.properties[self.REGION_PROPERTY]
self._bucket = config.properties[self.BUCKET_PROPERTY]
self._rootPath = config.properties[self.PATH_PROPERTY] if self.PATH_PROPERTY in config.properties else ""
self._prefix = config.properties[self.PREFIX_PROPERTY] if self.PREFIX_PROPERTY in config.properties else ""

access_key_id = config.properties[self.ACCESS_KEY_ID_PROPERTY]
secret_access_key = config.properties[self.SECRET_ACCESS_KEY_PROPERTY]
credentials_params = self.setup_credentials(config.properties)

self.__client = boto3.client(
service_name="s3",
region_name=self._region,
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key)
**credentials_params)

def setup_credentials(self, properties: dict):

mechanism = properties[self.CREDENTIALS_PROPERTY] if self.CREDENTIALS_PROPERTY in properties else self.CREDENTIALS_DEFAULT

if mechanism.lower() == self.CREDENTIALS_DEFAULT:
self._log.info(f"Using [{self.CREDENTIALS_DEFAULT}] credentials mechanism")
return {}

if mechanism.lower() == self.CREDENTIALS_STATIC:

access_key_id = properties[self.ACCESS_KEY_ID_PROPERTY]
secret_access_key = properties[self.SECRET_ACCESS_KEY_PROPERTY]

self._log.info(
f"Using [{self.CREDENTIALS_STATIC}] credentials mechanism, " +
f"access key id = [{access_key_id}]")

return {"aws_access_key_id": access_key_id, "aws_secret_access_key": secret_access_key}

message = f"Unrecognised credentials mechanism: [{mechanism}]"
self._log.error(message)
raise _ex.EStartup(message)
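
# Illustration (not part of this commit): how the dict returned by setup_credentials
# is expanded into the boto3 client call in __init__ above.
#
#   "default" mechanism -> {}   (boto3 falls back to its standard credential chain)
#   "static"  mechanism -> {"aws_access_key_id": ..., "aws_secret_access_key": ...}
#
# So boto3.client(service_name="s3", region_name=self._region, **{}) is equivalent
# to calling boto3.client(service_name="s3", region_name=self._region).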

def exists(self, storage_path: str) -> bool:

@@ -246,10 +271,10 @@ def write_text_stream(self, storage_path: str, encoding: str = 'utf-8', overwrit

def _resolve_path(self, storage_path: str) -> str:

if self._rootPath is None or self._rootPath.strip() == "":
if self._prefix is None or self._prefix.strip() == "":
return storage_path

separator = "" if self._rootPath.endswith("/") else "/"
full_path = self._rootPath + separator + storage_path
separator = "" if self._prefix.endswith("/") else "/"
full_path = self._prefix + separator + storage_path

return full_path[1:] if full_path.startswith("/") else full_path
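
For illustration, a standalone sketch of the prefix resolution performed by _resolve_path above; the
function name and sample values are hypothetical and not part of this commit.

def resolve_key(prefix: str, storage_path: str) -> str:
    # Mirrors _resolve_path: join the storage-relative path onto the configured prefix
    if prefix is None or prefix.strip() == "":
        return storage_path
    separator = "" if prefix.endswith("/") else "/"
    full_path = prefix + separator + storage_path
    # S3 object keys do not start with "/", so a leading slash is stripped
    return full_path[1:] if full_path.startswith("/") else full_path

assert resolve_key("", "data/file.csv") == "data/file.csv"
assert resolve_key("tenant-a", "data/file.csv") == "tenant-a/data/file.csv"
assert resolve_key("/tenant-a/", "data/file.csv") == "tenant-a/data/file.csv"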
