Skip to content

Commit

Permalink
Support hostname verification on proxy to broker connection (#1214)
Browse files Browse the repository at this point in the history
* Support hostname verification on proxy to broker connection

* remove dep and rename config
  • Loading branch information
rdhabalia committed Feb 11, 2018
1 parent ccbfbf8 commit a27a1e2
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 9 deletions.
3 changes: 3 additions & 0 deletions conf/proxy.conf
Expand Up @@ -74,3 +74,6 @@ tlsCertificateFilePath=


# Path for the TLS private key file # Path for the TLS private key file
tlsKeyFilePath= tlsKeyFilePath=

# Validates hostname when proxy creates tls connection with broker
tlsHostnameVerificationEnabled=false
Expand Up @@ -24,6 +24,8 @@
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;


import javax.net.ssl.SSLSession;

import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.Commands;
Expand All @@ -37,16 +39,19 @@
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;


public class DirectProxyHandler { public class DirectProxyHandler {


Expand Down Expand Up @@ -104,7 +109,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
} }
ch.pipeline().addLast("frameDecoder", ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4)); new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
ch.pipeline().addLast(new ProxyBackendHandler()); ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config));
} }
}); });


Expand All @@ -124,7 +129,10 @@ protected void initChannel(SocketChannel ch) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
// Close the connection if the connection attempt has failed. // Close the connection if the connection attempt has failed.
inboundChannel.close(); inboundChannel.close();
return;
} }
final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline().get("proxyOutboundHandler");
cnx.setRemoteHostName(targetBroker.getHost());
}); });
} }


Expand All @@ -135,9 +143,17 @@ enum BackendState {
public class ProxyBackendHandler extends PulsarDecoder implements FutureListener<Void> { public class ProxyBackendHandler extends PulsarDecoder implements FutureListener<Void> {


private BackendState state = BackendState.Init; private BackendState state = BackendState.Init;
private String remoteHostName;
protected ChannelHandlerContext ctx;
private ProxyConfiguration config;

public ProxyBackendHandler(ProxyConfiguration config) {
this.config = config;
}


@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
// Send the Connect command to broker // Send the Connect command to broker
String authData = ""; String authData = "";
if (authentication.getAuthData().hasDataFromCommand()) { if (authentication.getAuthData().hasDataFromCommand()) {
Expand Down Expand Up @@ -195,6 +211,15 @@ protected void handleConnected(CommandConnected connected) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel); log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel);
} }

if (config.isTlsHostnameVerificationEnabled() && remoteHostName != null
&& !verifyTlsHostName(remoteHostName, ctx)) {
// close the connection if host-verification failed with the broker
log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), remoteHostName);
ctx.close();
return;
}

state = BackendState.HandshakeCompleted; state = BackendState.HandshakeCompleted;


inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion())).addListener(future -> { inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion())).addListener(future -> {
Expand All @@ -220,6 +245,21 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.warn("[{}] [{}] Caught exception: {}", inboundChannel, outboundChannel, cause.getMessage(), cause); log.warn("[{}] [{}] Caught exception: {}", inboundChannel, outboundChannel, cause.getMessage(), cause);
ctx.close(); ctx.close();
} }

public void setRemoteHostName(String remoteHostName) {
this.remoteHostName = remoteHostName;
}

private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");

SSLSession sslSession = null;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
return (new DefaultHostnameVerifier()).verify(hostname, sslSession);
}
return false;
}
} }


private static final Logger log = LoggerFactory.getLogger(DirectProxyHandler.class); private static final Logger log = LoggerFactory.getLogger(DirectProxyHandler.class);
Expand Down
Expand Up @@ -82,6 +82,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
private String tlsTrustCertsFilePath; private String tlsTrustCertsFilePath;
// Accept untrusted TLS certificate from client // Accept untrusted TLS certificate from client
private boolean tlsAllowInsecureConnection = false; private boolean tlsAllowInsecureConnection = false;
// Validates hostname when proxy creates tls connection with broker
private boolean tlsHostnameVerificationEnabled = false;


private Properties properties = new Properties(); private Properties properties = new Properties();


Expand Down Expand Up @@ -213,6 +215,14 @@ public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
this.tlsAllowInsecureConnection = tlsAllowInsecureConnection; this.tlsAllowInsecureConnection = tlsAllowInsecureConnection;
} }


public boolean isTlsHostnameVerificationEnabled() {
return tlsHostnameVerificationEnabled;
}

public void setTlsHostnameVerificationEnabled(boolean tlsHostnameVerificationEnabled) {
this.tlsHostnameVerificationEnabled = tlsHostnameVerificationEnabled;
}

public String getBrokerClientAuthenticationPlugin() { public String getBrokerClientAuthenticationPlugin() {
return brokerClientAuthenticationPlugin; return brokerClientAuthenticationPlugin;
} }
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -133,7 +134,6 @@ protected void setup() throws Exception {


proxyService = Mockito.spy(new ProxyService(proxyConfig)); proxyService = Mockito.spy(new ProxyService(proxyConfig));


proxyService.start();
} }


@AfterMethod @AfterMethod
Expand All @@ -143,9 +143,13 @@ protected void cleanup() throws Exception {
proxyService.close(); proxyService.close();
} }


void startProxy() throws Exception {
proxyService.start();
}

/** /**
* <pre> * <pre>
* It verifies e2e tls + Authentication + Authorization (client -> proxy -> broker> * It verifies e2e tls + Authentication + Authorization (client -> proxy -> broker)
* *
* 1. client connects to proxy over tls and pass auth-data * 1. client connects to proxy over tls and pass auth-data
* 2. proxy authenticate client and retrieve client-role * 2. proxy authenticate client and retrieve client-role
Expand All @@ -161,10 +165,12 @@ protected void cleanup() throws Exception {
public void textProxyAuthorization() throws Exception { public void textProxyAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName); log.info("-- Starting {} test --", methodName);


startProxy();
createAdminClient(); createAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls(); final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
// create a client which connects to proxy over tls and pass authData // create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, false); ClientConfiguration clientConf = new ClientConfiguration();
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientConf);


String namespaceName = "my-property/proxy-authorization/my-ns"; String namespaceName = "my-property/proxy-authorization/my-ns";


Expand Down Expand Up @@ -209,13 +215,68 @@ public void textProxyAuthorization() throws Exception {
} }


@Test(dataProvider = "hostnameVerification") @Test(dataProvider = "hostnameVerification")
public void textProxyAuthorizationTlsHostVerification(boolean hostnameVerificationEnabled) throws Exception { public void textTlsHostVerificationProxyToClient(boolean hostnameVerificationEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
createAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
// create a client which connects to proxy over tls and pass authData
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setTlsHostnameVerificationEnable(hostnameVerificationEnabled);
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientConf);

String namespaceName = "my-property/proxy-authorization/my-ns";

admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);

admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
try {
Consumer consumer = proxyClient.subscribe("persistent://my-property/proxy-authorization/my-ns/my-topic1",
"my-subscriber-name", conf);
if (hostnameVerificationEnabled) {
Assert.fail("Connection should be failed due to hostnameVerification enabled");
}
} catch (PulsarClientException e) {
if (!hostnameVerificationEnabled) {
Assert.fail("Consumer should be created because hostnameverification is disabled");
}
}

log.info("-- Exiting {} test --", methodName);
}

/**
* It verifies hostname verification at proxy when proxy tries to connect with broker. Proxy performs hostname
* verification when broker sends its certs over tls .
* <pre>
* 1. Broker sends certs back to proxy with CN="Broker" however, proxy tries to connect with hostname=localhost
* 2. so, client fails to create consumer if proxy is enabled with hostname verification
* </pre>
*
* @param hostnameVerificationEnabled
* @throws Exception
*/
@Test(dataProvider = "hostnameVerification")
public void textTlsHostVerificationProxyToBroker(boolean hostnameVerificationEnabled) throws Exception {
log.info("-- Starting {} test --", methodName); log.info("-- Starting {} test --", methodName);


proxyConfig.setTlsHostnameVerificationEnabled(hostnameVerificationEnabled);
startProxy();
createAdminClient(); createAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls(); final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
// create a client which connects to proxy over tls and pass authData // create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, hostnameVerificationEnabled); ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setOperationTimeout(1, TimeUnit.SECONDS);
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientConf);


String namespaceName = "my-property/proxy-authorization/my-ns"; String namespaceName = "my-property/proxy-authorization/my-ns";


Expand Down Expand Up @@ -261,19 +322,17 @@ protected final void createAdminClient() throws Exception {
admin = spy(new PulsarAdmin(brokerUrlTls, clientConf)); admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
} }


private PulsarClient createPulsarClient(String proxyServiceUrl, boolean hosnameVerificationEnabled) throws PulsarClientException { private PulsarClient createPulsarClient(String proxyServiceUrl, ClientConfiguration clientConf) throws PulsarClientException {
Map<String, String> authParams = Maps.newHashMap(); Map<String, String> authParams = Maps.newHashMap();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
Authentication authTls = new AuthenticationTls(); Authentication authTls = new AuthenticationTls();
authTls.configure(authParams); authTls.configure(authParams);
org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS); clientConf.setStatsInterval(0, TimeUnit.SECONDS);
clientConf.setTlsTrustCertsFilePath(TLS_CLIENT_TRUST_CERT_FILE_PATH); clientConf.setTlsTrustCertsFilePath(TLS_CLIENT_TRUST_CERT_FILE_PATH);
clientConf.setTlsAllowInsecureConnection(true); clientConf.setTlsAllowInsecureConnection(true);
clientConf.setAuthentication(authTls); clientConf.setAuthentication(authTls);
clientConf.setUseTls(true); clientConf.setUseTls(true);
clientConf.setTlsHostnameVerificationEnable(hosnameVerificationEnabled);
return PulsarClient.create(proxyServiceUrl, clientConf); return PulsarClient.create(proxyServiceUrl, clientConf);
} }
} }

0 comments on commit a27a1e2

Please sign in to comment.