Skip to content

Commit

Permalink
Add SOCKS support (w/o authentication)
Browse files Browse the repository at this point in the history
fix #166
  • Loading branch information
costin committed Mar 15, 2014
1 parent 8ac7161 commit fb68fe1
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 16 deletions.
Expand Up @@ -94,7 +94,7 @@ public Response execute(Request request) throws EsHadoopProtocolException {
log.error(String.format("Node [%s] failed; " + (newNode ? "selected next node [" + currentUri + "]" : "no other nodes left - aborting..."), failed));

if (!newNode) {
throw new EsHadoopProtocolException("Out of nodes and retries; caught exception", ex);
throw new EsHadoopProtocolException("Connection error (check network and/or proxy settings) - out of nodes and retries", ex);
}
}
} while (newNode);
Expand Down
Expand Up @@ -44,7 +44,6 @@
import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.commons.httpclient.protocol.Protocol;
import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.Settings;
Expand Down Expand Up @@ -77,7 +76,7 @@ public class CommonsHttpTransport implements Transport, StatsAware {
private final HttpClient client;
private final Stats stats = new Stats();
private HttpConnection conn;

private String proxyInfo = "";

private static class ResponseInputStream extends DelegatingInputStream {

Expand Down Expand Up @@ -139,8 +138,8 @@ public boolean retryMethod(HttpMethod method, IOException exception, int executi
params.setSoTimeout((int) settings.getHttpTimeout());
HostConfiguration hostConfig = new HostConfiguration();

hostConfig = setupHttpProxy(settings, hostConfig);
hostConfig = setupSocksProxy(settings, hostConfig);
hostConfig = setupHttpProxy(settings, hostConfig);

try {
hostConfig.setHost(new URI(prefixUri(host), false));
Expand Down Expand Up @@ -172,14 +171,22 @@ private HostConfiguration setupHttpProxy(Settings settings, HostConfiguration ho

if (StringUtils.hasText(proxyHost)) {
hostConfig.setProxy(proxyHost, proxyPort);

log.info(String.format("Using HTTP proxy [%s:%s]", proxyHost, proxyPort));
proxyInfo = proxyInfo.concat(String.format("[HTTP proxy %s:%s]", proxyHost, proxyPort));

if (StringUtils.hasText(settings.getNetworkProxyHttpUser())) {
HttpState state = new HttpState();
state.setProxyCredentials(AuthScope.ANY, new UsernamePasswordCredentials(settings.getNetworkProxyHttpUser(), settings.getNetworkProxyHttpPass()));
client.setState(state);
}

if (log.isDebugEnabled()) {
if (StringUtils.hasText(settings.getNetworkProxyHttpUser())) {
log.debug(String.format("Using authenticated HTTP proxy [%s:%s]", proxyHost, proxyPort));
}
else {
log.debug(String.format("Using HTTP proxy [%s:%s]", proxyHost, proxyPort));
}
}
}

return hostConfig;
Expand All @@ -189,27 +196,51 @@ private HostConfiguration setupSocksProxy(Settings settings, HostConfiguration h
// set proxy settings
String proxyHost = null;
int proxyPort = -1;
String proxyUser = null;
String proxyPass = null;

if (settings.getNetworkHttpUseSystemProperties()) {
proxyHost = System.getProperty("socksProxyHost");
proxyPort = Integer.getInteger("socksProxyPort", -1);
proxyUser = System.getProperty("java.net.socks.username");
proxyPass = System.getProperty("java.net.socks.password");
}
if (StringUtils.hasText(settings.getNetworkProxySocksHost())) {
proxyHost = settings.getNetworkProxySocksHost();
}
if (settings.getNetworkProxySocksPort() > 0) {
proxyPort = settings.getNetworkProxySocksPort();
}
if (StringUtils.hasText(settings.getNetworkProxySocksUser())) {
proxyUser = settings.getNetworkProxySocksUser();
}
if (StringUtils.hasText(settings.getNetworkProxySocksPass())) {
proxyPass = settings.getNetworkProxySocksPass();
}

// we actually have a socks proxy, let's start the setup
if (StringUtils.hasText(proxyHost)) {
log.info(String.format("Using SOCKS proxy [%s:%s]", proxyHost, proxyPort));
proxyInfo = proxyInfo.concat(String.format("[SOCKS proxy %s:%s]", proxyHost, proxyPort));

hostConfig = new ProtocolAwareHostConfiguration(hostConfig);
if (log.isDebugEnabled()) {
if (StringUtils.hasText(proxyUser)) {
log.debug(String.format("Using authenticated SOCKS proxy [%s:%s]", proxyHost, proxyPort));
}
else {
log.debug(String.format("Using SOCKS proxy [%s:%s]", proxyHost, proxyPort));
}
}

ProtocolSocketFactory socksSocksFactory = new SocksSocketFactory(proxyHost, proxyPort);
Protocol protocol = new Protocol("http", socksSocksFactory, 80);
Protocol.registerProtocol("http", new Protocol("http", socksSocksFactory, 80));
hostConfig.setHost(proxyHost, proxyPort, protocol);
// NB: not really needed (see below that the protocol is reseted) but in place just in case
hostConfig = new ProtocolAwareHostConfiguration(hostConfig);
SocksSocketFactory socksSocksFactory = new SocksSocketFactory(proxyHost, proxyPort, proxyUser, proxyPass);
Protocol directHttp = Protocol.getProtocol("http");
Protocol proxiedHttp = new SocksProtocol(socksSocksFactory, directHttp);
hostConfig.setHost(proxyHost, proxyPort, proxiedHttp);
// NB: register the new protocol since when using absolute URIs, HttpClient#executeMethod will override the configuration (#387)
// NB: hence why the original/direct http protocol is saved - as otherwise the connection is not closed since it is considered different
// NB: (as the protocol identities don't match)
Protocol.registerProtocol("http", proxiedHttp);
}

return hostConfig;
Expand Down Expand Up @@ -260,17 +291,17 @@ public Response execute(Request request) throws IOException {
entityMethod.setContentChunked(false);
}

client.executeMethod(http);

// when tracing, log everything
if (log.isTraceEnabled()) {
log.trace(String.format("Tx [%s]@[%s][%s] w/ payload [%s]", request.method().name(), request.uri(), request.path(), request.body()));
log.trace(String.format("Tx %s[%s]@[%s][%s] w/ payload [%s]", proxyInfo, request.method().name(), request.uri(), request.path(), request.body()));
}

client.executeMethod(http);

if (log.isTraceEnabled()) {
Socket sk = ReflectionUtils.invoke(GET_SOCKET, conn, (Object[]) null);
String addr = sk.getLocalAddress().getHostAddress();
log.trace(String.format("Rx @[%s] [%s-%s] [%s]", addr, http.getStatusCode(), HttpStatus.getStatusText(http.getStatusCode()), http.getResponseBodyAsString()));
log.trace(String.format("Rx %s@[%s] [%s-%s] [%s]", proxyInfo, addr, http.getStatusCode(), HttpStatus.getStatusText(http.getStatusCode()), http.getResponseBodyAsString()));
}

return new SimpleResponse(http.getStatusCode(), new ResponseInputStream(http), request.uri());
Expand Down
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.rest.commonshttp;

import org.apache.commons.httpclient.protocol.Protocol;

/**
* Class used to make sure the SOCKS protocol object is behaving like the original one.
* Otherwise this leads to connections not being closed (as they are considered different).
*/
class SocksProtocol extends Protocol {

private final Protocol original;

SocksProtocol(SocksSocketFactory factory, Protocol original) {
super("http", factory, 80);
this.original = original;
}

public boolean equals(Object obj) {
return (obj instanceof SocksProtocol ? true : original.equals(obj));
}

public int hashCode() {
return original.hashCode();
}
}
Expand Up @@ -19,8 +19,10 @@
package org.elasticsearch.hadoop.rest.commonshttp;

import java.io.IOException;
import java.net.Authenticator;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.PasswordAuthentication;
import java.net.Proxy;
import java.net.Socket;
import java.net.SocketAddress;
Expand All @@ -29,15 +31,31 @@
import org.apache.commons.httpclient.ConnectTimeoutException;
import org.apache.commons.httpclient.params.HttpConnectionParams;
import org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory;
import org.elasticsearch.hadoop.util.StringUtils;

class SocksSocketFactory extends DefaultProtocolSocketFactory {

private final String socksHost;
private final int socksPort;

SocksSocketFactory(String socksHost, int socksPort) {
this(socksHost, socksPort, null, null);
}

SocksSocketFactory(String socksHost, int socksPort, final String user, final String pass) {
this.socksHost = socksHost;
this.socksPort = socksPort;

if (StringUtils.hasText(user)) {
final PasswordAuthentication auth = new PasswordAuthentication(user, pass.toCharArray());

Authenticator.setDefault(new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return auth;
}
});
}
}

public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params)
Expand Down

0 comments on commit fb68fe1

Please sign in to comment.