diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java index 8fdb21d..0f20286 100644 --- a/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java @@ -67,6 +67,11 @@ public interface PluginTask @ConfigDefault("null") public Optional getAccessKeyId(); + @Config("http_proxy") + @ConfigDefault("null") + public Optional getHttpProxy(); + public void setHttpProxy(Optional httpProxy); + @Config("incremental") @ConfigDefault("true") public boolean getIncremental(); @@ -144,9 +149,38 @@ protected ClientConfiguration getClientConfiguration(PluginTask task) clientConfig.setMaxErrorRetry(3); // SDK default: 3 clientConfig.setSocketTimeout(8*60*1000); // SDK default: 50*1000 + // set http proxy + if (task.getHttpProxy().isPresent()) { + setHttpProxyInAwsClient(clientConfig, task.getHttpProxy().get()); + } + return clientConfig; } + private void setHttpProxyInAwsClient(ClientConfiguration clientConfig, HttpProxy httpProxy) + { + // host + clientConfig.setProxyHost(httpProxy.getHost()); + + // port + if (httpProxy.getPort().isPresent()) { + clientConfig.setProxyPort(httpProxy.getPort().get()); + } + + // useHttps + clientConfig.setProtocol(httpProxy.useHttps() ? Protocol.HTTPS : Protocol.HTTP); + + // user + if (httpProxy.getUser().isPresent()) { + clientConfig.setProxyUsername(httpProxy.getUser().get()); + } + + // password + if (httpProxy.getPassword().isPresent()) { + clientConfig.setProxyPassword(httpProxy.getPassword().get()); + } + } + private FileList listFiles(PluginTask task) { try { diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/HttpProxy.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/HttpProxy.java new file mode 100644 index 0000000..e632d6d --- /dev/null +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/HttpProxy.java @@ -0,0 +1,60 @@ +package org.embulk.input.s3; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; + +/** + * HttpProxy is config unit for Input/Output plugins' configs. + * + * TODO + * This unit will be moved to embulk/embulk-plugin-units.git. + */ +public class HttpProxy +{ + private final String host; + private final Optional port; + private final boolean https; + private final Optional user; + private final Optional password; // TODO use SecretString + + @JsonCreator + public HttpProxy( + @JsonProperty("host") String host, + @JsonProperty("port") Optional port, + @JsonProperty("https") boolean https, + @JsonProperty("user") Optional user, + @JsonProperty("password") Optional password) + { + this.host = host; + this.port = port; + this.https = https; + this.user = user; + this.password = password; + } + + public String getHost() + { + return host; + } + + public Optional getPort() + { + return port; + } + + public boolean useHttps() + { + return https; + } + + public Optional getUser() + { + return user; + } + + public Optional getPassword() + { + return password; + } +} diff --git a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestHttpProxy.java b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestHttpProxy.java new file mode 100644 index 0000000..4146009 --- /dev/null +++ b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestHttpProxy.java @@ -0,0 +1,112 @@ +package org.embulk.input.s3; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import org.embulk.EmbulkTestRuntime; +import org.embulk.config.ConfigSource; +import org.embulk.input.s3.S3FileInputPlugin.S3PluginTask; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.net.URISyntaxException; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestHttpProxy +{ + @Rule + public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); + + private ConfigSource config; + + @Before + public void createResources() + { + config = runtime.getExec().newConfigSource(); + setupS3Config(config); + } + + @Test + public void checkDefaultHttpProxy() + { + ConfigSource conf = config.deepCopy(); + setupS3Config(conf); + S3PluginTask task = conf.loadConfig(S3PluginTask.class); + assertTrue(!task.getHttpProxy().isPresent()); + } + + @Test + public void checkHttpProxy() + { + { // specify host + String host = "my_host"; + Map httpProxyMap = ImmutableMap.of("host", host); + ConfigSource conf = config.deepCopy().set("http_proxy", httpProxyMap); + S3PluginTask task = conf.loadConfig(S3PluginTask.class); + + assertHttpProxy(new HttpProxy(host, Optional.absent(), false, Optional.absent(), Optional.absent()), + task.getHttpProxy().get()); + } + + { // specify host, port, use_ssl + String host = "my_host"; + int port = 8080; + boolean useSsl = true; + Map httpProxyMap = ImmutableMap.of( + "host", host, + "port", 8080, + "https", true); + ConfigSource conf = config.deepCopy().set("http_proxy", httpProxyMap); + S3PluginTask task = conf.loadConfig(S3PluginTask.class); + + assertHttpProxy(new HttpProxy(host, Optional.of(port), true, Optional.absent(), Optional.absent()), + task.getHttpProxy().get()); + } + + { // specify host, port, use_ssl, user, password + String host = "my_host"; + int port = 8080; + boolean useSsl = true; + String user = "my_user"; + String password = "my_pass"; + Map httpProxyMap = ImmutableMap.of( + "host", host, + "port", 8080, + "https", true, + "user", user, + "password", password); + ConfigSource conf = config.deepCopy().set("http_proxy", httpProxyMap); + S3PluginTask task = conf.loadConfig(S3PluginTask.class); + + assertHttpProxy(new HttpProxy(host, Optional.of(port), true, Optional.of(user), Optional.of(password)), + task.getHttpProxy().get()); + } + } + + private static void setupS3Config(ConfigSource config) + { + config.set("bucket", "my_bucket").set("path_prefix", "my_path_prefix"); + } + + private static void assertHttpProxy(HttpProxy expected, HttpProxy actual) + { + assertEquals(expected.getHost(), actual.getHost()); + assertEquals(expected.getPort().isPresent(), actual.getPort().isPresent()); + if (expected.getPort().isPresent()) { + assertEquals(expected.getPort().get(), actual.getPort().get()); + } + assertEquals(expected.useHttps(), actual.useHttps()); + assertEquals(expected.getUser().isPresent(), actual.getUser().isPresent()); + if (expected.getUser().isPresent()) { + assertEquals(expected.getUser().get(), actual.getUser().get()); + } + assertEquals(expected.getPassword().isPresent(), actual.getPassword().isPresent()); + if (expected.getPassword().isPresent()) { + assertEquals(expected.getPassword().get(), actual.getPassword().get()); + } + } +}