BULK-22, #398: Support reading s3 urls #399

Merged
5 changes: 5 additions & 0 deletions README.md
@@ -277,6 +277,11 @@ dsbulk load -h '"host1.com:9042","host2.com:9042"'

`dsbulk load -url ~/export.csv -k ks1 -t table1 -escape '\"'`

* Load table `table1` in keyspace `ks1` from an AWS S3 URL, passing the `region` and `profile` as
[query parameters in the URL](./url/README.md):

`dsbulk load -k ks1 -t table1 -url 's3://bucket-name/key?region=us-west-1&profile=bucket-profile'`

## Unload Examples

Unloading is simply the inverse of loading and due to the symmetry, many settings are
5 changes: 5 additions & 0 deletions bom/pom.xml
@@ -310,6 +310,11 @@
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
4 changes: 1 addition & 3 deletions changelog/README.md
@@ -1,11 +1,9 @@
## Changelog

## 1.9.0

- [improvement] BULK-22: Add ability to read from `s3://` URLs (#398).
## 1.9.0 (in progress)

- [improvement] Upgrade driver to 4.14.0.
- [improvement] BULK-22: Add ability to read from `s3://` URLs (#398).

## 1.8.0

ConfigUtils.java
@@ -43,9 +43,13 @@
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfigUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigUtils.class);

private static final Pattern THREADS_PATTERN =
Pattern.compile("(.+)\\s*C", Pattern.CASE_INSENSITIVE);

@@ -645,6 +649,10 @@ public static List<URL> getURLsFromFile(Path urlfile) throws IOException {
String.format("%s: Expecting valid filepath or URL, got '%s'", urlfile, path), e);
}
}
if (result.size() > 100_000) {
LOGGER.warn(
"You seem to have a lot of URLs in your urlfile. Consider breaking it into pieces to avoid out-of-memory errors.");
}
return result;
}
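The urlfile handling above can be sketched in isolation. This is a simplified, hypothetical stand-in for DSBulk's `getURLsFromFile` (the class and method names here are illustrative, not DSBulk's actual API): each non-blank, non-comment line becomes one URL string, and a warning is emitted past a size threshold, mirroring the 100,000-entry check in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of urlfile handling: each non-blank, non-comment line
// becomes a URL string; a warning is printed past a size threshold.
public class UrlFileSketch {

    static final int WARN_THRESHOLD = 100_000;

    // Parses urlfile lines into URL strings, skipping blanks and '#' comments.
    public static List<String> parseLines(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            result.add(trimmed);
        }
        if (result.size() > WARN_THRESHOLD) {
            System.err.println(
                "You seem to have a lot of URLs in your urlfile. "
                    + "Consider breaking it into pieces to avoid out-of-memory errors.");
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> urls =
            parseLines(List.of("# comment", "", "s3://bucket/a.csv", "s3://bucket/b.csv"));
        System.out.println(urls); // [s3://bucket/a.csv, s3://bucket/b.csv]
    }
}
```

Note the warning is advisory only: the full list is still built and returned, which is why very large urlfiles can still exhaust memory.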

16 changes: 6 additions & 10 deletions manual/application.template.conf
@@ -1636,16 +1636,12 @@ dsbulk {
# Settings applicable for reading from AWS S3 URLs.
################################################################################################

# Which profile to use for AWS S3 credentials. See the [AWS SDK documentation](https://docs.aws.amazon.com/sdkref/latest/guide/file-format.html)
# for details on setting up profiles.
# Type: string
# Default value: ""
#s3.profile = ""

# Which AWS region to use.
# Type: string
# Default value: "us-east-1"
#s3.region = "us-east-1"
# The size (count) of the S3Client cache. Since each S3 URL
# must contain the credentials for the target bucket, we cache
# the clients to prevent rebuilding the same client over and over.
# Type: number
# Default value: 20
#s3.clientCacheSize = 20

Review comment: Whatever additional guidance gets included in settings.md, please add them here too. 🙏
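The bounded client cache this setting controls can be sketched with the JDK alone. The PR itself uses commons-collections4's `LRUMap` wrapped in `Collections.synchronizedMap`; the sketch below substitutes an access-ordered `LinkedHashMap`, and a plain `String` stands in for the `S3Client` value so the example is self-contained.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// JDK-only sketch of the bounded S3Client cache (the PR uses LRUMap from
// commons-collections4). Values are Strings standing in for S3Client.
public class ClientCacheSketch {

    // Creates a synchronized LRU map that evicts the least-recently-used
    // entry once the map grows past maxSize.
    public static <K, V> Map<K, V> lruCache(int maxSize) {
        return Collections.synchronizedMap(
            new LinkedHashMap<K, V>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                    return size() > maxSize;
                }
            });
    }

    public static void main(String[] args) {
        Map<String, String> cache = lruCache(2);
        cache.put("region=us-east-1", "client1");
        cache.put("region=us-west-1", "client2");
        cache.get("region=us-east-1");            // touch: us-east-1 is now most recent
        cache.put("region=eu-west-1", "client3"); // evicts least-recent us-west-1
        System.out.println(cache.keySet());
    }
}
```

With a default size of 20, eviction only ever happens when a urlfile mixes more than 20 distinct credential/query combinations, which the author notes below is unlikely in practice.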

################################################################################################
# Settings applicable for the count workflow, ignored otherwise.
15 changes: 5 additions & 10 deletions manual/settings.md
@@ -1435,18 +1435,13 @@ Default: **true**.

Settings applicable for reading from AWS S3 URLs.

#### --s3.profile<br />--dsbulk.s3.profile _&lt;string&gt;_
#### --s3.clientCacheSize<br />--dsbulk.s3.clientCacheSize _&lt;number&gt;_

Which profile to use for AWS S3 credentials. See the [AWS SDK documentation](https://docs.aws.amazon.com/sdkref/latest/guide/file-format.html) for details on setting up profiles.
The size (count) of the S3Client cache. Since each S3 URL
must contain the credentials for the target bucket, we cache
the clients to prevent rebuilding the same client over and over.

Default: **&lt;unspecified&gt;**.

#### --s3.region<br />--dsbulk.s3.region _&lt;string&gt;_

Which AWS region to use.

Default: **"us-east-1"**.
Default: **20**.

Review comment: I'd like to request that we put in additional guidance here. It's not readily obvious to me why 20 is a good default.

For example, what happens if we set the cache size to 100? or 5? Thoughts? 🙂

Reply (Contributor Author): I pretty much just picked an arbitrary number. 🤷‍♂️ In our use case, we're using the same credentials for all the URLs, so the cache only ever has one entry. I suspect using the same credentials for all the URLs will be the usual use case, and it would be a bit of a pain (and somewhat unexpected) to supply more than 20 different sets of credentials in a single urlfile. I suppose that is actually an argument for a cache size of 1. 🙃

I'm happy to add a sentence explaining this if you want, though. Do note, however, that this file (settings.md) and application.template.conf are automatically generated from dsbulk-reference.conf, so the same explanatory statement will be shared between the three files.

<a name="stats"></a>
## Stats Settings
1 change: 1 addition & 0 deletions pom.xml
@@ -76,6 +76,7 @@
<jackson.version>2.13.1</jackson.version>
<univocity.version>2.9.1</univocity.version>
<commons-compress.version>1.21</commons-compress.version>
<commons-collections4.version>4.4</commons-collections4.version>
<compress.zstd.version>1.5.2-1</compress.zstd.version>
<compress.xz.version>1.9</compress.xz.version>
<compress.brotli.version>0.1.2</compress.brotli.version>
9 changes: 8 additions & 1 deletion url/README.md
@@ -3,4 +3,11 @@
This module contains URL utilities for DSBulk, among which:

1. DSBulk's `BulkLoaderURLStreamHandlerFactory`, which is DSBulk's default factory for URL handlers;
3. A URL stream handler for reading / writing to standard input / output.
2. A URL stream handler for reading / writing to standard input / output.
3. A URL stream handler for reading from AWS S3 URLs.
1. Every S3 URL must contain the proper query parameters from which an `S3Client` can be built. These parameters are:
1. `region` (required): The AWS region, such as `us-west-1`.
2. `profile` (optional, preferred): The profile to use to provide credentials. See [the AWS SDK credentials documentation](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html) for more information.
3. `accessKeyId` and `secretKeyId` (optional, discouraged): In case you don't have a profile set up, you can use this less-secure method. Both parameters are required if you choose this.
2. If only the `region` is provided, DSBulk will fall back to the default AWS credentials provider, which handles role-based credentials.
3. To prevent unnecessary client re-creation when using many URLs from a `urlfile`, `S3Client`s are cached by the query parameters. The size of the cache is controlled by the `dsbulk.s3.clientCacheSize` option (default: 20).
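The parameter rules documented above can be expressed as a small validation sketch. This is a hypothetical stand-in that mirrors the documented contract (region required; profile preferred; the access-key pair must come together), not DSBulk's actual classes; the query parsing follows the same split-on-`&`/split-on-`=` approach the PR uses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the documented S3 URL parameter rules: 'region'
// is required, and 'accessKeyId'/'secretAccessKey' must appear as a pair.
public class S3UrlParamsSketch {

    // Parses a raw query string ("k1=v1&k2=v2") into a map, dropping
    // parameters with blank values, then enforces the documented rules.
    public static Map<String, String> validate(String query) {
        Map<String, String> params = new HashMap<>();
        for (String param : query.split("&")) {
            String[] parts = param.split("=", 2);
            if (parts.length == 2 && !parts[1].trim().isEmpty()) {
                params.put(parts[0], parts[1]);
            }
        }
        if (!params.containsKey("region")) {
            throw new IllegalArgumentException(
                "You must supply an AWS 'region' parameter on S3 URLs.");
        }
        boolean hasAccessKey = params.containsKey("accessKeyId");
        boolean hasSecretKey = params.containsKey("secretAccessKey");
        if (hasAccessKey != hasSecretKey) {
            throw new IllegalArgumentException(
                "Both 'accessKeyId' and 'secretAccessKey' must be present if either one is provided.");
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = validate("region=us-west-1&profile=bucket-profile");
        System.out.println(params.get("region") + " / " + params.get("profile"));
    }
}
```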
9 changes: 9 additions & 0 deletions url/pom.xml
@@ -54,6 +54,10 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>url-connection-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
@@ -78,6 +82,11 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
S3URLStreamHandler.java
@@ -15,44 +15,62 @@
*/
package com.datastax.oss.dsbulk.url;

import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.collections4.map.LRUMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.utils.StringUtils;

/** A {@link URLStreamHandler} for reading from AWS S3 URLs. */
public class S3URLStreamHandler extends URLStreamHandler {

private static final String REGION = "region";
private static final String PROFILE = "profile";
private static final String ACCESS_KEY_ID = "accessKeyId";
private static final String SECRET_ACCESS_KEY = "secretAccessKey";

private static final Logger LOGGER = LoggerFactory.getLogger(S3URLStreamHandler.class);

private final S3Client s3Client;
private final Map<String, S3Client> s3ClientCache;

S3URLStreamHandler(S3Client s3Client) {
this.s3Client = s3Client;
S3URLStreamHandler(int s3ClientCacheSize) {
this.s3ClientCache = Collections.synchronizedMap(new LRUMap<>(s3ClientCacheSize));
}

@Override
protected URLConnection openConnection(URL url) {
return new S3Connection(url, s3Client);
return new S3Connection(url, s3ClientCache);
}

private static class S3Connection extends URLConnection {
@VisibleForTesting
static class S3Connection extends URLConnection {

private final S3Client s3Client;
private final Map<String, S3Client> s3ClientCache;

@Override
public void connect() {
// Nothing to see here...
}

S3Connection(URL url, S3Client s3Client) {
S3Connection(URL url, Map<String, S3Client> s3ClientCache) {
super(url);
this.s3Client = s3Client;
this.s3ClientCache = s3ClientCache;
}

@Override
@@ -62,9 +80,67 @@ public InputStream getInputStream() {
LOGGER.debug("Getting S3 input stream for object '{}' in bucket '{}'...", key, bucket);
GetObjectRequest getObjectRequest =
GetObjectRequest.builder().bucket(bucket).key(key).build();
String query = url.getQuery();
if (query == null) {
throw new IllegalArgumentException(
"You must provide S3 client credentials in the URL query parameters.");
}
S3Client s3Client = s3ClientCache.computeIfAbsent(query, this::getS3Client);
return getInputStream(s3Client, getObjectRequest);
}

@VisibleForTesting
InputStream getInputStream(S3Client s3Client, GetObjectRequest getObjectRequest) {
return s3Client.getObjectAsBytes(getObjectRequest).asInputStream();
}

@VisibleForTesting
S3Client getS3Client(String query) {
Map<String, String> parameters = parseParameters(query);
String region;
if (parameters.containsKey(REGION)) {
region = parameters.get(REGION);
} else {
throw new IllegalArgumentException("You must supply an AWS 'region' parameter on S3 URLs.");
}

SdkHttpClient httpClient = UrlConnectionHttpClient.builder().build();
S3ClientBuilder builder = S3Client.builder().httpClient(httpClient).region(Region.of(region));

String profile = parameters.getOrDefault(PROFILE, null);
String accessKeyId = parameters.getOrDefault(ACCESS_KEY_ID, null);
String secretAccessKey = parameters.getOrDefault(SECRET_ACCESS_KEY, null);
if (!StringUtils.isBlank(profile)) {
LOGGER.info("Using AWS profile {} to connect to S3.", profile);
builder.credentialsProvider(ProfileCredentialsProvider.create(profile));
} else if (accessKeyId != null || secretAccessKey != null) {
LOGGER.warn("Using access key ID and secret access key to connect to S3.");
LOGGER.warn(
"This is considered an insecure way to pass credentials. Please use a 'profile' instead.");
if (accessKeyId == null || secretAccessKey == null) {
throw new IllegalArgumentException(
"Both 'accessKeyId' and 'secretAccessKey' must be present if either one is provided.");
}
builder.credentialsProvider(() -> AwsBasicCredentials.create(accessKeyId, secretAccessKey));
} else {
LOGGER.info("Using default credentials provider to connect to S3.");
}

return builder.build();
}

private Map<String, String> parseParameters(String query) {
Map<String, String> parameters = new HashMap<>();
String[] paramList = query.split("&", 0);
for (String param : paramList) {
String[] parts = param.split("=", 0);
if (parts.length > 1 && !StringUtils.isBlank(parts[1])) {
parameters.put(parts[0], parts[1]);
}
}
return parameters;
}

@Override
public OutputStream getOutputStream() {
throw new UnsupportedOperationException("Writing to S3 has not yet been implemented.");
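The `computeIfAbsent` call in `getInputStream` above is what makes client reuse work: clients are keyed by the raw query string, so URLs sharing the same credentials reuse one `S3Client`. A minimal sketch of that step, with a build counter standing in for the (expensive) client construction:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the client-reuse step: clients are keyed by the raw query
// string, so repeated URLs with identical parameters build only one client.
public class ClientReuseSketch {

    public static final AtomicInteger BUILDS = new AtomicInteger();

    // Stand-in for building an S3Client from the query parameters.
    public static String buildClient(String query) {
        BUILDS.incrementAndGet();
        return "client-for-" + query;
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        String q = "region=us-west-1&profile=bucket-profile";
        cache.computeIfAbsent(q, ClientReuseSketch::buildClient);
        cache.computeIfAbsent(q, ClientReuseSketch::buildClient); // cache hit, no rebuild
        System.out.println(BUILDS.get()); // prints 1
    }
}
```

One consequence of keying on the raw query string is that `region=a&profile=b` and `profile=b&region=a` are distinct cache entries even though they describe the same client.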
S3URLStreamHandlerProvider.java
@@ -19,65 +19,25 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.URLStreamHandler;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.utils.StringUtils;

public class S3URLStreamHandlerProvider implements URLStreamHandlerProvider {

private static final String REGION_PATH = "dsbulk.s3.region";
private static final String PROFILE_PATH = "dsbulk.s3.profile";

private static final Logger LOGGER = LoggerFactory.getLogger(S3URLStreamHandlerProvider.class);
private static final String S3CLIENT_CACHE_SIZE_PATH = "dsbulk.s3.clientCacheSize";
private static final int DEFAULT_S3CLIENT_CACHE_SIZE = 20;

Review comment: In-line with my request above, could we add a one-line comment on why 20 is a good default please? 🙏


/** The protocol for AWS S3 URLs. I.e., URLs beginning with {@code s3://} */
public static final String S3_STREAM_PROTOCOL = "s3";

private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

private S3Client s3Client;

private void init(Config config) {
if (INITIALIZED.get()) {
LOGGER.warn("Ignoring additional attempts to initialize the S3URLStreamHandlerProvider.");
return;
}

String region;
if (config.hasPath(REGION_PATH)) {
region = config.getString(REGION_PATH);
} else {
throw new IllegalArgumentException("You must supply an AWS region to use S3 URLs.");
}
String profile = config.hasPath(PROFILE_PATH) ? config.getString(PROFILE_PATH) : null;

SdkHttpClient httpClient = UrlConnectionHttpClient.builder().build();
S3ClientBuilder builder = S3Client.builder().httpClient(httpClient).region(Region.of(region));

if (!StringUtils.isBlank(profile)) {
LOGGER.info("Using AWS profile {} to connect to S3.", profile);
builder.credentialsProvider(ProfileCredentialsProvider.create(profile));
} else {
LOGGER.info("Using default credentials provider to connect to S3.");
}

this.s3Client = builder.build();
}

@Override
@NonNull
public Optional<URLStreamHandler> maybeCreateURLStreamHandler(
@NonNull String protocol, Config config) {
if (S3_STREAM_PROTOCOL.equalsIgnoreCase(protocol)) {
init(config);
return Optional.of(new S3URLStreamHandler(s3Client));
int s3ClientCacheSize =
config.hasPath(S3CLIENT_CACHE_SIZE_PATH)
? config.getInt(S3CLIENT_CACHE_SIZE_PATH)
: DEFAULT_S3CLIENT_CACHE_SIZE;
return Optional.of(new S3URLStreamHandler(s3ClientCacheSize));
}
return Optional.empty();
}
URLStreamHandlerProvider.java
@@ -36,7 +36,7 @@ public interface URLStreamHandlerProvider {
* empty.
*
* @param protocol The protocol to create a handler for.
* @param config
* @param config The DSBulk config.
* @return The created handler, or empty if the protocol is not supported.
*/
@NonNull
BulkLoaderURLStreamHandlerFactoryTest.java
@@ -20,17 +20,18 @@
import static org.mockito.Mockito.when;

import com.typesafe.config.Config;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

@TestMethodOrder(OrderAnnotation.class)
class BulkLoaderURLStreamHandlerFactoryTest {

@Test
void should_handle_installed_handlers() {
Config config = mock(Config.class);
when(config.hasPath("dsbulk.s3.region")).thenReturn(true);
when(config.getString("dsbulk.s3.region")).thenReturn("us-west-1");
when(config.hasPath("dsbulk.s3.profile")).thenReturn(true);
when(config.getString("dsbulk.s3.profile")).thenReturn("profile");
when(config.hasPath("dsbulk.s3.clientCacheSize")).thenReturn(true);
when(config.getInt("dsbulk.s3.clientCacheSize")).thenReturn(25);

BulkLoaderURLStreamHandlerFactory.install();
BulkLoaderURLStreamHandlerFactory.setConfig(config);