Skip to content

Commit

Permalink
HADOOP-11171 Enable using a proxy server to connect to S3a. (Thomas D…
Browse files Browse the repository at this point in the history
…emoor via stevel)
  • Loading branch information
steveloughran committed Jan 17, 2015
1 parent c6c0f4e commit 2908fe4
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 2 deletions.
3 changes: 3 additions & 0 deletions hadoop-common-project/hadoop-common/CHANGES.txt
Expand Up @@ -488,6 +488,9 @@ Release 2.7.0 - UNRELEASED

HADOOP-11261 Set custom endpoint for S3A. (Thomas Demoor via stevel)

HADOOP-11171 Enable using a proxy server to connect to S3a.
(Thomas Demoor via stevel)

OPTIMIZATIONS

HADOOP-11323. WritableComparator#compare keeps reference to byte array.
Expand Down
Expand Up @@ -31,6 +31,13 @@ public class Constants {

//use a custom endpoint?
public static final String ENDPOINT = "fs.s3a.endpoint";
//connect to s3 through a proxy server?
public static final String PROXY_HOST = "fs.s3a.proxy.host";
public static final String PROXY_PORT = "fs.s3a.proxy.port";
public static final String PROXY_USERNAME = "fs.s3a.proxy.username";
public static final String PROXY_PASSWORD = "fs.s3a.proxy.password";
public static final String PROXY_DOMAIN = "fs.s3a.proxy.domain";
public static final String PROXY_WORKSTATION = "fs.s3a.proxy.workstation";

// number of times we should retry errors
public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
Expand Down
Expand Up @@ -169,13 +169,54 @@ public void initialize(URI name, Configuration conf) throws IOException {
ClientConfiguration awsConf = new ClientConfiguration();
awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS));
awsConf.setProtocol(conf.getBoolean(SECURE_CONNECTIONS,
DEFAULT_SECURE_CONNECTIONS) ? Protocol.HTTPS : Protocol.HTTP);
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
DEFAULT_SECURE_CONNECTIONS);
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES));
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT));

String proxyHost = conf.getTrimmed(PROXY_HOST,"");
int proxyPort = conf.getInt(PROXY_PORT, -1);
if (!proxyHost.isEmpty()) {
awsConf.setProxyHost(proxyHost);
if (proxyPort >= 0) {
awsConf.setProxyPort(proxyPort);
} else {
if (secureConnections) {
LOG.warn("Proxy host set without port. Using HTTPS default 443");
awsConf.setProxyPort(443);
} else {
LOG.warn("Proxy host set without port. Using HTTP default 80");
awsConf.setProxyPort(80);
}
}
String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
if ((proxyUsername == null) != (proxyPassword == null)) {
String msg = "Proxy error: " + PROXY_USERNAME + " or " +
PROXY_PASSWORD + " set without the other.";
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
awsConf.setProxyUsername(proxyUsername);
awsConf.setProxyPassword(proxyPassword);
awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
if (LOG.isDebugEnabled()) {
LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
"domain {} as workstation {}", awsConf.getProxyHost(),
awsConf.getProxyPort(), String.valueOf(awsConf.getProxyUsername()),
awsConf.getProxyPassword(), awsConf.getProxyDomain(),
awsConf.getProxyWorkstation());
}
} else if (proxyPort >= 0) {
String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
LOG.error(msg);
throw new IllegalArgumentException(msg);
}

s3 = new AmazonS3Client(credentials, awsConf);
String endPoint = conf.getTrimmed(ENDPOINT,"");
if (!endPoint.isEmpty()) {
Expand Down
Expand Up @@ -20,6 +20,7 @@

import com.amazonaws.services.s3.AmazonS3Client;
import org.apache.commons.lang.StringUtils;
import com.amazonaws.AmazonClientException;
import org.apache.hadoop.conf.Configuration;

import org.junit.Rule;
Expand Down Expand Up @@ -82,4 +83,98 @@ public void TestEndpoint() throws Exception {
endPointRegion, s3.getBucketLocation(fs.getUri().getHost()));
}
}

@Test
public void TestProxyConnection() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.setInt(Constants.PROXY_PORT, 1);
String proxy =
conf.get(Constants.PROXY_HOST) + ":" + conf.get(Constants.PROXY_PORT);
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server at " + proxy);
} catch (AmazonClientException e) {
if (!e.getMessage().contains(proxy + " refused")) {
throw e;
}
}
}

@Test
public void TestProxyPortWithoutHost() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.setInt(Constants.PROXY_PORT, 1);
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a proxy configuration error");
} catch (IllegalArgumentException e) {
String msg = e.toString();
if (!msg.contains(Constants.PROXY_HOST) &&
!msg.contains(Constants.PROXY_PORT)) {
throw e;
}
}
}

@Test
public void TestAutomaticProxyPortSelection() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.set(Constants.SECURE_CONNECTIONS, "true");
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server");
} catch (AmazonClientException e) {
if (!e.getMessage().contains("443")) {
throw e;
}
}
conf.set(Constants.SECURE_CONNECTIONS, "false");
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server");
} catch (AmazonClientException e) {
if (!e.getMessage().contains("80")) {
throw e;
}
}
}

@Test
public void TestUsernameInconsistentWithPassword() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.setInt(Constants.PROXY_PORT, 1);
conf.set(Constants.PROXY_USERNAME, "user");
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server");
} catch (IllegalArgumentException e) {
String msg = e.toString();
if (!msg.contains(Constants.PROXY_USERNAME) &&
!msg.contains(Constants.PROXY_PASSWORD)) {
throw e;
}
}
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.setInt(Constants.PROXY_PORT, 1);
conf.set(Constants.PROXY_PASSWORD, "password");
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server");
} catch (IllegalArgumentException e) {
String msg = e.toString();
if (!msg.contains(Constants.PROXY_USERNAME) &&
!msg.contains(Constants.PROXY_PASSWORD)) {
throw e;
}
}
}
}

0 comments on commit 2908fe4

Please sign in to comment.