diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index bf1f35a53..4d51ba583 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -332,6 +332,8 @@ e.g. "DAT-42".
Before you send your pull request, make sure that you have a unit test that failed before
the fix and succeeds after.
+You will also need to sign the [DataStax Contribution License Agreement](https://cla.datastax.com/).
+
If your pull requests addresses a Jira issue, add the corresponding entry to the
[changelog](./changelog/README.md), in the following format:
@@ -372,3 +374,23 @@ where the changes should be squashed:
We require at least one approval by a peer reviewer.
When the pull request is finally approved, squash all commits together to have a cleaner history
before merging. It is OK to delete the branch after merging.
+
+# Building from Source
+
+If you would like to build `dsbulk` from source, you will need the following:
+
+* Java 8. Later versions of Java may be able to run `dsbulk`, but 8 is required for building.
+* Maven 3.x.
+
+From the root directory, run `mvn install`. This basic build verifies that everything compiles
+and that all the tests pass. To build the _.tar.gz_ or _.zip_ archive that can be distributed to
+end users, run `mvn install -Prelease`. The resulting archives will appear in
+_distribution/target_; extract one of them to run your updated code against your own test
+cases.
+
+## Building Documentation
+
+Note that the build process automatically updates certain documentation files. For example, the
+documents in the [manual](./manual) are generated from the config templates for
+[dsbulk options](./workflow/commons/src/main/resources/dsbulk-reference.conf) and
+[driver options](./workflow/commons/src/main/resources/driver-reference.conf).
diff --git a/README.md b/README.md
index 1026750ea..17c95fd4c 100644
--- a/README.md
+++ b/README.md
@@ -298,6 +298,11 @@ dsbulk load -h '"host1.com:9042","host2.com:9042"'
`dsbulk load -url ~/export.csv -k ks1 -t table1 -escape '\"'`
+* Load table `table1` in keyspace `ks1` from an AWS S3 URL, passing the `region` and `profile` as
+ [query parameters in the URL](./url/README.md):
+
+ `dsbulk load -k ks1 -t table1 -url 's3://bucket-name/key?region=us-west-1&profile=bucket-profile'`
+
## Unload Examples
Unloading is simply the inverse of loading and due to the symmetry, many settings are
diff --git a/bom/pom.xml b/bom/pom.xml
index 132d04091..56e0a915e 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -375,6 +375,13 @@
jansi
${jansi.version}
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>bom</artifactId>
+        <version>${aws.sdk.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
diff --git a/changelog/README.md b/changelog/README.md
index c903a4059..9ba6ea00e 100644
--- a/changelog/README.md
+++ b/changelog/README.md
@@ -1,7 +1,12 @@
## Changelog
+## 1.11.0
+
+- [improvement] BULK-22: Add ability to read from `s3://` URLs (#398).
+
## 1.10.0
+- [improvement] Upgrade driver to 4.14.1.
- [improvement] [#423](https://github.com/datastax/dsbulk/issues/423): Upgrade driver to 4.14.1, and other DSBulk dependencies.
- [bug] [#419](https://github.com/datastax/dsbulk/issues/419): Fix wrong ANSI mode option name.
- [bug] [#425](https://github.com/datastax/dsbulk/issues/425): Don't print count results if read result counter is null.
diff --git a/config/src/main/java/com/datastax/oss/dsbulk/config/ConfigUtils.java b/config/src/main/java/com/datastax/oss/dsbulk/config/ConfigUtils.java
index b373f80bc..28225d4f0 100644
--- a/config/src/main/java/com/datastax/oss/dsbulk/config/ConfigUtils.java
+++ b/config/src/main/java/com/datastax/oss/dsbulk/config/ConfigUtils.java
@@ -44,9 +44,13 @@
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ConfigUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConfigUtils.class);
+
private static final Pattern THREADS_PATTERN =
Pattern.compile("(.+)\\s*C", Pattern.CASE_INSENSITIVE);
@@ -697,6 +701,10 @@ public static List<URL> getURLsFromFile(Path urlfile) throws IOException {
String.format("%s: Expecting valid filepath or URL, got '%s'", urlfile, path), e);
}
}
+ if (result.size() > 100_000) {
+ LOGGER.warn(
+ "Your urlfile contains more than 100,000 URLs. Consider splitting it into smaller files to avoid out-of-memory errors.");
+ }
return result;
}
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 501320046..68f13af93 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -218,7 +218,7 @@
META-INF/io.netty.versions.properties
-
+
diff --git a/manual/application.template.conf b/manual/application.template.conf
index a0b988872..0f3e1923d 100644
--- a/manual/application.template.conf
+++ b/manual/application.template.conf
@@ -1823,6 +1823,21 @@ dsbulk {
# Default value: true
#runner.promptForPasswords = true
+ ################################################################################################
+ # Settings applicable for reading from AWS S3 URLs.
+ ################################################################################################
+
+ # The size (count) of the S3Client cache. Since each S3 URL
+ # must contain the credentials for the target bucket, we cache
+ # the clients to prevent rebuilding the same client over and over.
+ # The default size of 20 is totally arbitrary, as we generally
+ # expect that most S3 URLs in a given batch will be using the
+ # same credentials, meaning the cache will really only ever
+ # contain one entry.
+ # Type: number
+ # Default value: 20
+ #s3.clientCacheSize = 20
+
################################################################################################
# Settings applicable for the count workflow, ignored otherwise.
################################################################################################
diff --git a/manual/settings.md b/manual/settings.md
index d021f9e2b..0e6807e23 100644
--- a/manual/settings.md
+++ b/manual/settings.md
@@ -1,4 +1,4 @@
-# DataStax Bulk Loader v1.10.0-SNAPSHOT Options
+# DataStax Bulk Loader v1.11.0-SNAPSHOT Options
*NOTE:* The long options described here can be persisted in `conf/application.conf` and thus permanently override defaults and avoid specifying options on the command line.
@@ -18,6 +18,7 @@ A template configuration file can be found [here](./application.template.conf).
Log Settings
Monitoring Settings
Runner Settings
+S3 Settings
Stats Settings
Driver Settings
@@ -1567,6 +1568,23 @@ Prompting from passwords require interactive shells; if the standard input is no
Default: **true**.
+
+## S3 Settings
+
+Settings applicable for reading from AWS S3 URLs.
+
+#### --s3.clientCacheSize<br />--dsbulk.s3.clientCacheSize _<number>_
+
+The size (count) of the S3Client cache. Since each S3 URL
+must contain the credentials for the target bucket, we cache
+the clients to prevent rebuilding the same client over and over.
+The default size of 20 is totally arbitrary, as we generally
+expect that most S3 URLs in a given batch will be using the
+same credentials, meaning the cache will really only ever
+contain one entry.
+
+Default: **20**.
+
## Stats Settings
@@ -2013,3 +2031,4 @@ The node-level metrics to enable. Available metrics are:
- errors.connection.auth
Default: **[]**.
+
diff --git a/pom.xml b/pom.xml
index 8e3b72654..c70122e3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@
1.9.13
20220320
1.18
+    <aws.sdk.version>2.17.121</aws.sdk.version>
0.15.0
5.8.2
diff --git a/runner/src/main/java/com/datastax/oss/dsbulk/runner/DataStaxBulkLoader.java b/runner/src/main/java/com/datastax/oss/dsbulk/runner/DataStaxBulkLoader.java
index eb493f8f9..e7ef64f5e 100644
--- a/runner/src/main/java/com/datastax/oss/dsbulk/runner/DataStaxBulkLoader.java
+++ b/runner/src/main/java/com/datastax/oss/dsbulk/runner/DataStaxBulkLoader.java
@@ -62,6 +62,7 @@ public ExitStatus run() {
CommandLineParser parser = new CommandLineParser(args);
ParsedCommandLine result = parser.parse();
Config config = result.getConfig();
+ BulkLoaderURLStreamHandlerFactory.setConfig(config);
workflow = result.getWorkflowProvider().newWorkflow(config);
WorkflowThread workflowThread = new WorkflowThread(workflow);
diff --git a/url/README.md b/url/README.md
index 10bdcd23a..14b9065e9 100644
--- a/url/README.md
+++ b/url/README.md
@@ -3,4 +3,11 @@
This module contains URL utilities for DSBulk, among which:
1. DSBulk's `BulkLoaderURLStreamHandlerFactory`, which is DSBulk's default factory for URL handlers;
-3. A URL stream handler for reading / writing to standard input / output.
+2. A URL stream handler for reading / writing to standard input / output.
+3. A URL stream handler for reading from AWS S3 URLs.
+ 1. Every S3 URL must contain the proper query parameters from which an `S3Client` can be built. These parameters are:
+ 1. `region` (required): The AWS region, such as `us-west-1`.
+ 2. `profile` (optional, preferred): The profile to use to provide credentials. See [the AWS SDK credentials documentation](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html) for more information.
+ 3. `accessKeyId` and `secretAccessKey` (optional, discouraged): if you don't have a profile set up, you can pass raw credentials with this less-secure method. Both parameters are required if either is provided.
+ 2. If only the `region` is provided, DSBulk will fall back to the default AWS credentials provider, which handles role-based credentials.
+ 3. To prevent unnecessary client re-creation when using many URLs from a `urlfile`, `S3Client`s are cached by the query parameters. The size of the cache is controlled by the `dsbulk.s3.clientCacheSize` option (default: 20).
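
To make the handler contract above concrete, here is a minimal Java sketch of reading an `s3://` URL through DSBulk's URL machinery, mirroring what the runner does (install the factory, then pass it the parsed config). The bucket, key, region, and profile values are hypothetical, and the read only succeeds if those AWS resources and credentials actually exist:

```java
import com.datastax.oss.dsbulk.url.BulkLoaderURLStreamHandlerFactory;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class S3UrlReadSketch {
  public static void main(String[] args) throws Exception {
    // Register DSBulk's URL stream handlers; this must happen before setConfig().
    BulkLoaderURLStreamHandlerFactory.install();
    // A minimal config; if dsbulk.s3.clientCacheSize is absent, the S3 provider
    // falls back to its built-in default of 20.
    Config config = ConfigFactory.parseString("dsbulk.s3.clientCacheSize = 20");
    BulkLoaderURLStreamHandlerFactory.setConfig(config);

    // Hypothetical bucket, key, region, and profile.
    URL url = new URL("s3://my-bucket/exports/data.csv?region=us-west-1&profile=my-profile");
    try (BufferedReader reader =
        new BufferedReader(
            new InputStreamReader(url.openConnection().getInputStream(), StandardCharsets.UTF_8))) {
      // Print the first few lines of the S3 object.
      reader.lines().limit(5).forEach(System.out::println);
    }
  }
}
```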
diff --git a/url/pom.xml b/url/pom.xml
index b75582ce1..061b32902 100644
--- a/url/pom.xml
+++ b/url/pom.xml
@@ -46,6 +46,22 @@
org.slf4j
slf4j-api
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>url-connection-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe</groupId>
+      <artifactId>config</artifactId>
+    </dependency>
org.junit.jupiter
junit-jupiter-engine
@@ -66,6 +82,11 @@
logback-classic
test
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+      <scope>test</scope>
+    </dependency>
com.github.spotbugs
spotbugs-annotations
diff --git a/url/src/main/java/com/datastax/oss/dsbulk/url/BulkLoaderURLStreamHandlerFactory.java b/url/src/main/java/com/datastax/oss/dsbulk/url/BulkLoaderURLStreamHandlerFactory.java
index f0649e507..eccd11e75 100644
--- a/url/src/main/java/com/datastax/oss/dsbulk/url/BulkLoaderURLStreamHandlerFactory.java
+++ b/url/src/main/java/com/datastax/oss/dsbulk/url/BulkLoaderURLStreamHandlerFactory.java
@@ -17,6 +17,7 @@
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
+import com.typesafe.config.Config;
import java.net.URL;
import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
@@ -57,8 +58,18 @@ public static void install() {
}
}
+ public static void setConfig(Config config) {
+ if (!INSTALLED.get()) {
+ throw new IllegalStateException(
+ "You must install the URL stream handler factories before setting the config.");
+ }
+ INSTANCE.config = config;
+ }
+
 private final ImmutableList<URLStreamHandlerProvider> providers;
+ private Config config;
+
private BulkLoaderURLStreamHandlerFactory() {
// IMPORTANT: the discovery must be done *before* this factory is installed,
// otherwise the discovery may result in infinite recursion.
@@ -78,7 +89,8 @@ public URLStreamHandler createURLStreamHandler(String protocol) {
URLStreamHandler handler = null;
if (protocol != null) {
for (URLStreamHandlerProvider provider : providers) {
- Optional<URLStreamHandler> maybeHandler = provider.maybeCreateURLStreamHandler(protocol);
+ Optional<URLStreamHandler> maybeHandler =
+ provider.maybeCreateURLStreamHandler(protocol, config);
if (maybeHandler.isPresent()) {
handler = maybeHandler.get();
break;
diff --git a/url/src/main/java/com/datastax/oss/dsbulk/url/S3URLStreamHandler.java b/url/src/main/java/com/datastax/oss/dsbulk/url/S3URLStreamHandler.java
new file mode 100644
index 000000000..9d550da9e
--- /dev/null
+++ b/url/src/main/java/com/datastax/oss/dsbulk/url/S3URLStreamHandler.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.dsbulk.url;
+
+import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLDecoder;
+import java.net.URLStreamHandler;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.utils.StringUtils;
+
+/** A {@link URLStreamHandler} for reading from AWS S3 URLs. */
+public class S3URLStreamHandler extends URLStreamHandler {
+
+ private static final String REGION = "region";
+ private static final String PROFILE = "profile";
+ private static final String ACCESS_KEY_ID = "accessKeyId";
+ private static final String SECRET_ACCESS_KEY = "secretAccessKey";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(S3URLStreamHandler.class);
+
+ private final Cache<S3ClientInfo, S3Client> s3ClientCache;
+
+ S3URLStreamHandler(int s3ClientCacheSize) {
+ this.s3ClientCache = Caffeine.newBuilder().maximumSize(s3ClientCacheSize).build();
+ }
+
+ @Override
+ protected URLConnection openConnection(URL url) {
+ return new S3Connection(url, s3ClientCache);
+ }
+
+ @VisibleForTesting
+ static class S3Connection extends URLConnection {
+
+ private final Cache<S3ClientInfo, S3Client> s3ClientCache;
+
+ @Override
+ public void connect() {
+ // Nothing to see here...
+ }
+
+ S3Connection(URL url, Cache<S3ClientInfo, S3Client> s3ClientCache) {
+ super(url);
+ this.s3ClientCache = s3ClientCache;
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ String bucket = url.getHost();
+ String key = url.getPath().substring(1); // Strip leading '/'.
+ LOGGER.debug("Getting S3 input stream for object '{}' in bucket '{}'...", key, bucket);
+ GetObjectRequest getObjectRequest =
+ GetObjectRequest.builder().bucket(bucket).key(key).build();
+ String query = url.getQuery();
+ if (StringUtils.isBlank(query)) {
+ throw new IllegalArgumentException(
+ "You must provide S3 client credentials in the URL query parameters.");
+ }
+ S3ClientInfo s3ClientInfo = new S3ClientInfo(query);
+ S3Client s3Client = s3ClientCache.get(s3ClientInfo, this::getS3Client);
+ return getInputStream(s3Client, getObjectRequest);
+ }
+
+ @VisibleForTesting
+ InputStream getInputStream(S3Client s3Client, GetObjectRequest getObjectRequest) {
+ return s3Client.getObjectAsBytes(getObjectRequest).asInputStream();
+ }
+
+ @VisibleForTesting
+ S3Client getS3Client(S3ClientInfo s3ClientInfo) {
+ SdkHttpClient httpClient = UrlConnectionHttpClient.builder().build();
+ S3ClientBuilder builder =
+ S3Client.builder().httpClient(httpClient).region(Region.of(s3ClientInfo.getRegion()));
+
+ String profile = s3ClientInfo.getProfile();
+ String accessKeyId = s3ClientInfo.getAccessKeyId();
+ String secretAccessKey = s3ClientInfo.getSecretAccessKey();
+ if (!StringUtils.isBlank(profile)) {
+ LOGGER.info("Using AWS profile {} to connect to S3.", profile);
+ builder.credentialsProvider(ProfileCredentialsProvider.create(profile));
+ } else if (accessKeyId != null || secretAccessKey != null) {
+ LOGGER.warn("Using access key ID and secret access key to connect to S3.");
+ LOGGER.warn(
+ "This is considered an insecure way to pass credentials. Please use a 'profile' instead.");
+ if (accessKeyId == null || secretAccessKey == null) {
+ throw new IllegalArgumentException(
+ "Both 'accessKeyId' and 'secretAccessKey' must be present if either one is provided.");
+ }
+ builder.credentialsProvider(() -> AwsBasicCredentials.create(accessKeyId, secretAccessKey));
+ } else {
+ LOGGER.info("Using default credentials provider to connect to S3.");
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ throw new UnsupportedOperationException("Writing to S3 has not yet been implemented.");
+ }
+ }
+
+ @VisibleForTesting
+ static class S3ClientInfo {
+
+ private final String region;
+ private final String profile;
+ private final String accessKeyId;
+ private final String secretAccessKey;
+
+ S3ClientInfo(String query) {
+ Map<String, List<String>> parameters;
+ try {
+ parameters = parseParameters(query);
+ } catch (UnsupportedEncodingException e) {
+ // This should never happen, since everyone should support UTF-8.
+ throw new IllegalArgumentException("UTF-8 encoding was not found on your system.", e);
+ }
+ region = getQueryParam(parameters, REGION);
+ if (StringUtils.isBlank(region)) {
+ throw new IllegalArgumentException("You must supply an AWS 'region' parameter on S3 URls.");
+ }
+ profile = getQueryParam(parameters, PROFILE);
+ accessKeyId = getQueryParam(parameters, ACCESS_KEY_ID);
+ secretAccessKey = getQueryParam(parameters, SECRET_ACCESS_KEY);
+ }
+
+ // Borrowed from
+ // https://stackoverflow.com/questions/13592236/parse-a-uri-string-into-name-value-collection
+ private static Map<String, List<String>> parseParameters(String query)
+ throws UnsupportedEncodingException {
+ final Map<String, List<String>> queryPairs = new LinkedHashMap<>();
+ final String[] pairs = query.split("&", 0);
+ for (String pair : pairs) {
+ final int idx = pair.indexOf("=");
+ final String key = idx > 0 ? URLDecoder.decode(pair.substring(0, idx), "UTF-8") : pair;
+ queryPairs.computeIfAbsent(key, k -> new ArrayList<>());
+ final String value =
+ idx > 0 && pair.length() > idx + 1
+ ? URLDecoder.decode(pair.substring(idx + 1), "UTF-8")
+ : null;
+ queryPairs.get(key).add(value);
+ }
+ return queryPairs;
+ }
+
+ private static String getQueryParam(Map<String, List<String>> parameters, String key) {
+ if (parameters.containsKey(key)) {
+ List<String> values = parameters.get(key);
+ if (values != null && !values.isEmpty()) {
+ return values.get(0);
+ }
+ }
+ return null;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public String getProfile() {
+ return profile;
+ }
+
+ public String getAccessKeyId() {
+ return accessKeyId;
+ }
+
+ public String getSecretAccessKey() {
+ return secretAccessKey;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ S3ClientInfo that = (S3ClientInfo) o;
+ return region.equals(that.region)
+ && Objects.equals(profile, that.profile)
+ && Objects.equals(accessKeyId, that.accessKeyId)
+ && Objects.equals(secretAccessKey, that.secretAccessKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(region, profile, accessKeyId, secretAccessKey);
+ }
+ }
+}
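
Since the client cache described in the comments above is keyed by `S3ClientInfo`, two URLs whose query strings carry the same credentials resolve to the same cache entry. A small same-package sketch of that equality contract (the class is package-private and `@VisibleForTesting`), using hypothetical parameter values:

```java
package com.datastax.oss.dsbulk.url;

import com.datastax.oss.dsbulk.url.S3URLStreamHandler.S3ClientInfo;

// Sketch: identical credential query strings yield equal cache keys, so the
// handler's Caffeine cache reuses a single S3Client for both URLs.
class S3ClientInfoCacheKeySketch {
  public static void main(String[] args) {
    S3ClientInfo first = new S3ClientInfo("region=us-west-1&profile=my-profile");
    S3ClientInfo second = new S3ClientInfo("region=us-west-1&profile=my-profile");
    System.out.println(first.equals(second));                  // true -> same cache entry
    System.out.println(first.hashCode() == second.hashCode()); // true
  }
}
```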
diff --git a/url/src/main/java/com/datastax/oss/dsbulk/url/S3URLStreamHandlerProvider.java b/url/src/main/java/com/datastax/oss/dsbulk/url/S3URLStreamHandlerProvider.java
new file mode 100644
index 000000000..2a749f192
--- /dev/null
+++ b/url/src/main/java/com/datastax/oss/dsbulk/url/S3URLStreamHandlerProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.dsbulk.url;
+
+import com.typesafe.config.Config;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import java.net.URLStreamHandler;
+import java.util.Optional;
+
+public class S3URLStreamHandlerProvider implements URLStreamHandlerProvider {
+
+ private static final String S3CLIENT_CACHE_SIZE_PATH = "dsbulk.s3.clientCacheSize";
+ private static final int DEFAULT_S3CLIENT_CACHE_SIZE = 20; // Totally arbitrary default.
+
+ /** The protocol for AWS S3 URLs. I.e., URLs beginning with {@code s3://} */
+ public static final String S3_STREAM_PROTOCOL = "s3";
+
+ @Override
+ @NonNull
+ public Optional<URLStreamHandler> maybeCreateURLStreamHandler(
+ @NonNull String protocol, Config config) {
+ if (S3_STREAM_PROTOCOL.equalsIgnoreCase(protocol)) {
+ int s3ClientCacheSize =
+ config.hasPath(S3CLIENT_CACHE_SIZE_PATH)
+ ? config.getInt(S3CLIENT_CACHE_SIZE_PATH)
+ : DEFAULT_S3CLIENT_CACHE_SIZE;
+ return Optional.of(new S3URLStreamHandler(s3ClientCacheSize));
+ }
+ return Optional.empty();
+ }
+}
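
A short sketch of the provider contract, assuming a config built with Typesafe's `ConfigFactory`: the `s3` protocol (case-insensitive) yields a handler sized from `dsbulk.s3.clientCacheSize`, while any other protocol yields an empty `Optional`:

```java
package com.datastax.oss.dsbulk.url;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.net.URLStreamHandler;
import java.util.Optional;

class S3ProviderSketch {
  public static void main(String[] args) {
    S3URLStreamHandlerProvider provider = new S3URLStreamHandlerProvider();
    Config config = ConfigFactory.parseString("dsbulk.s3.clientCacheSize = 5");

    Optional<URLStreamHandler> s3Handler = provider.maybeCreateURLStreamHandler("S3", config);
    Optional<URLStreamHandler> other = provider.maybeCreateURLStreamHandler("http", config);

    System.out.println(s3Handler.isPresent()); // true: handler built with cache size 5
    System.out.println(other.isPresent());     // false: not an s3 protocol
  }
}
```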
diff --git a/url/src/main/java/com/datastax/oss/dsbulk/url/StdinStdoutURLStreamHandlerProvider.java b/url/src/main/java/com/datastax/oss/dsbulk/url/StdinStdoutURLStreamHandlerProvider.java
index 88718e09f..76dd8d2e9 100644
--- a/url/src/main/java/com/datastax/oss/dsbulk/url/StdinStdoutURLStreamHandlerProvider.java
+++ b/url/src/main/java/com/datastax/oss/dsbulk/url/StdinStdoutURLStreamHandlerProvider.java
@@ -15,6 +15,7 @@
*/
package com.datastax.oss.dsbulk.url;
+import com.typesafe.config.Config;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.URLStreamHandler;
import java.util.Optional;
@@ -29,7 +30,8 @@ public class StdinStdoutURLStreamHandlerProvider implements URLStreamHandlerProv
@Override
@NonNull
- public Optional<URLStreamHandler> maybeCreateURLStreamHandler(@NonNull String protocol) {
+ public Optional<URLStreamHandler> maybeCreateURLStreamHandler(
+ @NonNull String protocol, Config config) {
if (STANDARD_STREAM_PROTOCOL.equalsIgnoreCase(protocol)) {
return Optional.of(new StdinStdoutURLStreamHandler());
}
diff --git a/url/src/main/java/com/datastax/oss/dsbulk/url/URLStreamHandlerProvider.java b/url/src/main/java/com/datastax/oss/dsbulk/url/URLStreamHandlerProvider.java
index b29c643bc..56f3f1c72 100644
--- a/url/src/main/java/com/datastax/oss/dsbulk/url/URLStreamHandlerProvider.java
+++ b/url/src/main/java/com/datastax/oss/dsbulk/url/URLStreamHandlerProvider.java
@@ -15,6 +15,7 @@
*/
package com.datastax.oss.dsbulk.url;
+import com.typesafe.config.Config;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.URLStreamHandler;
import java.util.Optional;
@@ -35,8 +36,9 @@ public interface URLStreamHandlerProvider {
* empty.
*
* @param protocol The protocol to create a handler for.
+ * @param config The DSBulk config.
* @return The created handler, or empty if the protocol is not supported.
*/
@NonNull
- Optional<URLStreamHandler> maybeCreateURLStreamHandler(@NonNull String protocol);
+ Optional<URLStreamHandler> maybeCreateURLStreamHandler(@NonNull String protocol, Config config);
}
diff --git a/url/src/main/resources/META-INF/services/com.datastax.oss.dsbulk.url.URLStreamHandlerProvider b/url/src/main/resources/META-INF/services/com.datastax.oss.dsbulk.url.URLStreamHandlerProvider
index acf401d39..305fb8389 100644
--- a/url/src/main/resources/META-INF/services/com.datastax.oss.dsbulk.url.URLStreamHandlerProvider
+++ b/url/src/main/resources/META-INF/services/com.datastax.oss.dsbulk.url.URLStreamHandlerProvider
@@ -1 +1,2 @@
-com.datastax.oss.dsbulk.url.StdinStdoutURLStreamHandlerProvider
\ No newline at end of file
+com.datastax.oss.dsbulk.url.StdinStdoutURLStreamHandlerProvider
+com.datastax.oss.dsbulk.url.S3URLStreamHandlerProvider
\ No newline at end of file
diff --git a/url/src/test/java/com/datastax/oss/dsbulk/url/BulkLoaderURLStreamHandlerFactoryTest.java b/url/src/test/java/com/datastax/oss/dsbulk/url/BulkLoaderURLStreamHandlerFactoryTest.java
index d014976e1..80838873c 100644
--- a/url/src/test/java/com/datastax/oss/dsbulk/url/BulkLoaderURLStreamHandlerFactoryTest.java
+++ b/url/src/test/java/com/datastax/oss/dsbulk/url/BulkLoaderURLStreamHandlerFactoryTest.java
@@ -16,19 +16,35 @@
package com.datastax.oss.dsbulk.url;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import com.typesafe.config.Config;
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+@TestMethodOrder(OrderAnnotation.class)
class BulkLoaderURLStreamHandlerFactoryTest {
@Test
void should_handle_installed_handlers() {
+ Config config = mock(Config.class);
+ when(config.hasPath("dsbulk.s3.clientCacheSize")).thenReturn(true);
+ when(config.getInt("dsbulk.s3.clientCacheSize")).thenReturn(25);
+
+ BulkLoaderURLStreamHandlerFactory.install();
+ BulkLoaderURLStreamHandlerFactory.setConfig(config);
BulkLoaderURLStreamHandlerFactory factory = BulkLoaderURLStreamHandlerFactory.INSTANCE;
+
assertThat(factory.createURLStreamHandler("std"))
.isNotNull()
.isInstanceOf(StdinStdoutURLStreamHandler.class);
assertThat(factory.createURLStreamHandler("STD"))
.isNotNull()
.isInstanceOf(StdinStdoutURLStreamHandler.class);
+ assertThat(factory.createURLStreamHandler("s3"))
+ .isNotNull()
+ .isInstanceOf(S3URLStreamHandler.class);
}
}
diff --git a/url/src/test/java/com/datastax/oss/dsbulk/url/S3URLStreamHandlerProviderTest.java b/url/src/test/java/com/datastax/oss/dsbulk/url/S3URLStreamHandlerProviderTest.java
new file mode 100644
index 000000000..d6f4fb354
--- /dev/null
+++ b/url/src/test/java/com/datastax/oss/dsbulk/url/S3URLStreamHandlerProviderTest.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.dsbulk.url;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.typesafe.config.Config;
+import org.junit.jupiter.api.Test;
+
+class S3URLStreamHandlerProviderTest {
+
+ @Test
+ void should_handle_s3_protocol() {
+ Config config = mock(Config.class);
+ when(config.hasPath("dsbulk.s3.clientCacheSize")).thenReturn(true);
+ when(config.getInt("dsbulk.s3.clientCacheSize")).thenReturn(25);
+
+ S3URLStreamHandlerProvider provider = new S3URLStreamHandlerProvider();
+
+ assertThat(provider.maybeCreateURLStreamHandler("s3", config))
+ .isNotNull()
+ .containsInstanceOf(S3URLStreamHandler.class);
+ }
+}
diff --git a/url/src/test/java/com/datastax/oss/dsbulk/url/S3URLStreamHandlerTest.java b/url/src/test/java/com/datastax/oss/dsbulk/url/S3URLStreamHandlerTest.java
new file mode 100644
index 000000000..471646efd
--- /dev/null
+++ b/url/src/test/java/com/datastax/oss/dsbulk/url/S3URLStreamHandlerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.dsbulk.url;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.datastax.oss.dsbulk.url.S3URLStreamHandler.S3ClientInfo;
+import com.datastax.oss.dsbulk.url.S3URLStreamHandler.S3Connection;
+import com.typesafe.config.Config;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Answer;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+
+class S3URLStreamHandlerTest {
+
+ @Mock private Config config;
+ @Mock private InputStream mockInputStream;
+
+ private AutoCloseable mocks;
+
+ @BeforeEach
+ void setup() {
+ mocks = MockitoAnnotations.openMocks(this);
+
+ when(config.hasPath("dsbulk.s3.clientCacheSize")).thenReturn(true);
+ when(config.getInt("dsbulk.s3.clientCacheSize")).thenReturn(2);
+
+ BulkLoaderURLStreamHandlerFactory.install();
+ BulkLoaderURLStreamHandlerFactory.setConfig(config);
+ // Now, calling url.openConnection() will use the right handler.
+ }
+
+ @AfterEach
+ void clean_up() throws Exception {
+ mocks.close();
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "s3://test-bucket/test-key,You must provide S3 client credentials in the URL query parameters.",
+ "s3://test-bucket/test-key?profile=default,You must supply an AWS 'region' parameter on S3 URls.",
+ "s3://test-bucket/test-key?region=us-west-1&accessKeyId=secret,Both 'accessKeyId' and 'secretAccessKey' must be present if either one is provided.",
+ "s3://test-bucket/test-key?region=us-west-1&secretAccessKey=secret,Both 'accessKeyId' and 'secretAccessKey' must be present if either one is provided."
+ })
+ void should_require_query_parameters(String s3Url, String errorMessage) throws IOException {
+ URL url = new URL(s3Url);
+ S3Connection connection = spy((S3Connection) url.openConnection());
+
+ doReturn(mockInputStream).when(connection).getInputStream(any(), any());
+
+ Throwable t = catchThrowable(connection::getInputStream);
+
+ assertThat(t).isNotNull().isInstanceOf(IllegalArgumentException.class).hasMessage(errorMessage);
+ }
+
+ @ParameterizedTest
+ @ValueSource(
+ strings = {
+ // This one uses a profile.
+ "s3://test-bucket/test-key?region=us-west-1&profile=nameNoOneWouldActuallyGiveTheirProfile",
+ // This one uses the default credentials provider.
+ "s3://test-bucket/test-key?region=us-west-1",
+ // This one uses both secret parameters.
+ "s3://test-bucket/test-key?region=us-west-1&accessKeyId=secret&secretAccessKey=alsoSecret"
+ })
+ void should_provide_input_stream_when_parameters_are_correct(String s3Url) throws IOException {
+ URL url = new URL(s3Url);
+ S3Connection connection = spy((S3Connection) url.openConnection());
+
+ doReturn(mockInputStream).when(connection).getInputStream(any(), any());
+
+ assertThat(connection.getInputStream()).isNotNull();
+ }
+
+ @Test
+ void should_cache_clients() throws IOException {
+ URL url1 = new URL("s3://test-bucket/test-key-1?region=us-west-1&test=should_cache");
+ S3Connection connection1 = spy((S3Connection) url1.openConnection());
+ URL url2 = new URL("s3://test-bucket/test-key-2?region=us-west-1&test=should_cache");
+ S3Connection connection2 = spy((S3Connection) url2.openConnection());
+
+ S3Client mockClient = mock(S3Client.class);
+ when(mockClient.getObjectAsBytes(any(GetObjectRequest.class)))
+ .thenAnswer(
+ (Answer<ResponseBytes<GetObjectResponse>>)
+ invocation -> {
+ GetObjectResponse response = GetObjectResponse.builder().build();
+ byte[] bytes = new byte[] {};
+ InputStream is = new ByteArrayInputStream(bytes);
+ return ResponseBytes.fromInputStream(response, is);
+ });
+ doReturn(mockClient).when(connection1).getS3Client(any());
+
+ InputStream stream1 = connection1.getInputStream();
+ InputStream stream2 = connection2.getInputStream();
+
+ assertThat(stream1).isNotSameAs(stream2); // Two different URLs produce different streams.
+ verify(mockClient, times(2)).getObjectAsBytes(any(GetObjectRequest.class));
+ verify(connection1)
+ .getS3Client(
+ new S3ClientInfo(
+ "region=us-west-1&test=should_cache")); // We got the client for one connection...
+ verify(connection2, never()).getS3Client(any()); // ... but not the second connection.
+ }
+
+ @Test
+ void should_not_support_writing_to_s3() throws IOException {
+ URL url = new URL("s3://test-bucket/test-key");
+ S3Connection connection = spy((S3Connection) url.openConnection());
+
+ Throwable t = catchThrowable(connection::getOutputStream);
+
+ assertThat(t)
+ .isNotNull()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Writing to S3 has not yet been implemented.");
+ }
+}
diff --git a/url/src/test/java/com/datastax/oss/dsbulk/url/StdinStdoutURLStreamHandlerProviderTest.java b/url/src/test/java/com/datastax/oss/dsbulk/url/StdinStdoutURLStreamHandlerProviderTest.java
index df31860f9..7922ef6f0 100644
--- a/url/src/test/java/com/datastax/oss/dsbulk/url/StdinStdoutURLStreamHandlerProviderTest.java
+++ b/url/src/test/java/com/datastax/oss/dsbulk/url/StdinStdoutURLStreamHandlerProviderTest.java
@@ -24,10 +24,10 @@ class StdinStdoutURLStreamHandlerProviderTest {
@Test
void should_handle_std_protocol() {
StdinStdoutURLStreamHandlerProvider provider = new StdinStdoutURLStreamHandlerProvider();
- assertThat(provider.maybeCreateURLStreamHandler("std"))
+ assertThat(provider.maybeCreateURLStreamHandler("std", null))
.isNotNull()
.containsInstanceOf(StdinStdoutURLStreamHandler.class);
- assertThat(provider.maybeCreateURLStreamHandler("STD"))
+ assertThat(provider.maybeCreateURLStreamHandler("STD", null))
.isNotNull()
.containsInstanceOf(StdinStdoutURLStreamHandler.class);
}
diff --git a/workflow/commons/src/main/resources/dsbulk-reference.conf b/workflow/commons/src/main/resources/dsbulk-reference.conf
index c8fda7077..ea65b652d 100644
--- a/workflow/commons/src/main/resources/dsbulk-reference.conf
+++ b/workflow/commons/src/main/resources/dsbulk-reference.conf
@@ -1073,6 +1073,18 @@ dsbulk {
}
+ # Settings applicable for reading from AWS S3 URLs.
+ s3 {
+ # The size (count) of the S3Client cache. Since each S3 URL
+ # must contain the credentials for the target bucket, we cache
+ # the clients to prevent rebuilding the same client over and over.
+ # The default size of 20 is totally arbitrary, as we generally
+ # expect that most S3 URLs in a given batch will be using the
+ # same credentials, meaning the cache will really only ever
+ # contain one entry.
+ clientCacheSize = 20
+ }
+
# This group of settings is purely internal and is the interface for
# DSBulk's infrastructure to customize how some settings are exposed to the user.
#