Skip to content

Commit

Permalink
Enable specification of TLS Protocol Versions and Cipher Suites (#1225)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Asher committed Feb 13, 2018
1 parent 7dd64d0 commit b990674
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 15 deletions.
Expand Up @@ -180,7 +180,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String tlsTrustCertsFilePath = "";
// Accept untrusted TLS certificate from client
private boolean tlsAllowInsecureConnection = false;

// Specify the tls protocols the broker will use to negotiate during TLS Handshake.
// Example:- [TLSv1.2, TLSv1.1, TLSv1]
private Set<String> tlsProtocols = Sets.newTreeSet();
// Specify the tls cipher the broker will use to negotiate during TLS Handshake.
// Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
private Set<String> tlsCiphers = Sets.newTreeSet();

/***** --- Authentication --- ****/
// Enable authentication
private boolean authenticationEnabled = false;
Expand Down Expand Up @@ -1400,4 +1406,20 @@ public boolean authenticateOriginalAuthData() {
public void setAuthenticateOriginalAuthData(boolean authenticateOriginalAuthData) {
this.authenticateOriginalAuthData = authenticateOriginalAuthData;
}

public Set<String> getTlsProtocols() {
return tlsProtocols;
}

public void setTlsProtocols(Set<String> tlsProtocols) {
this.tlsProtocols = tlsProtocols;
}

public Set<String> getTlsCiphers() {
return tlsCiphers;
}

public void setTlsCiphers(Set<String> tlsCiphers) {
this.tlsCiphers = tlsCiphers;
}
}
Expand Up @@ -50,7 +50,10 @@ public PulsarChannelInitializer(BrokerService brokerService, ServiceConfiguratio
@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (enableTLS) {
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath());
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(
serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(),
serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath(),
serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols());
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
}

Expand Down
Expand Up @@ -253,13 +253,13 @@ public void close() {
}
};

public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTime)
public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
throws Exception {
for (int i = 0; i < retryCount; i++) {
if (predicate.test(null) || i == (retryCount - 1)) {
break;
}
Thread.sleep(intSleepTime + (intSleepTime * i));
Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}

Expand Down
Expand Up @@ -564,6 +564,10 @@ private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
SSLSession sslSession = null;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
if (log.isDebugEnabled()) {
log.debug("Verifying HostName for {}, Cipher {}, Protocols {}", hostname, sslSession.getCipherSuite(),
sslSession.getProtocol());
}
return hostnameVerifier.verify(hostname, sslSession);
}
return false;
Expand Down
Expand Up @@ -37,6 +37,8 @@
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
Expand Down Expand Up @@ -95,12 +97,21 @@ public static SslContext createNettySslContextForClient(boolean allowInsecureCon
}

public static SslContext createNettySslContextForServer(boolean allowInsecureConnection, String trustCertsFilePath,
String certFilePath, String keyFilePath)
String certFilePath, String keyFilePath, Set<String> ciphers, Set<String> protocols)
throws GeneralSecurityException, SSLException, FileNotFoundException {
X509Certificate[] certificates = loadCertificatesFromPemFile(certFilePath);
PrivateKey privateKey = loadPrivateKeyFromPemFile(keyFilePath);

SslContextBuilder builder = SslContextBuilder.forServer(privateKey, (X509Certificate[]) certificates);
if (ciphers != null && ciphers.size() > 0) {
builder.ciphers(ciphers);
}

if (protocols != null && protocols.size() > 0) {
String[] protocolsArray = new String[protocols.size()];
builder.protocols(protocols.toArray(protocolsArray));
}

if (allowInsecureConnection) {
builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
} else {
Expand Down
Expand Up @@ -51,7 +51,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
if (enableTLS) {
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(
serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(),
serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath());
serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath(),
serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols());
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
Expand Down
Expand Up @@ -82,7 +82,13 @@ public class ServiceConfig implements PulsarConfiguration {
private String tlsTrustCertsFilePath = "";
// Accept untrusted TLS certificate from client
private boolean tlsAllowInsecureConnection = false;

// Specify the tls protocols the broker will use to negotiate during TLS Handshake.
// Example:- [TLSv1.2, TLSv1.1, TLSv1]
private Set<String> tlsProtocols = Sets.newTreeSet();
// Specify the tls cipher the broker will use to negotiate during TLS Handshake.
// Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
private Set<String> tlsCiphers = Sets.newTreeSet();

private Properties properties = new Properties();

public String getZookeeperServers() {
Expand Down Expand Up @@ -244,4 +250,20 @@ public Properties getProperties() {
public void setProperties(Properties properties) {
this.properties = properties;
}

public Set<String> getTlsProtocols() {
return tlsProtocols;
}

public void setTlsProtocols(Set<String> tlsProtocols) {
this.tlsProtocols = tlsProtocols;
}

public Set<String> getTlsCiphers() {
return tlsCiphers;
}

public void setTlsCiphers(Set<String> tlsCiphers) {
this.tlsCiphers = tlsCiphers;
}
}
Expand Up @@ -87,7 +87,13 @@ public class ProxyConfiguration implements PulsarConfiguration {
private boolean tlsAllowInsecureConnection = false;
// Validates hostname when proxy creates tls connection with broker
private boolean tlsHostnameVerificationEnabled = false;

// Specify the tls protocols the broker will use to negotiate during TLS Handshake.
// Example:- [TLSv1.2, TLSv1.1, TLSv1]
private Set<String> tlsProtocols = Sets.newTreeSet();
// Specify the tls cipher the broker will use to negotiate during TLS Handshake.
// Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
private Set<String> tlsCiphers = Sets.newTreeSet();

private Properties properties = new Properties();

public boolean forwardAuthorizationCredentials() {
Expand Down Expand Up @@ -289,4 +295,20 @@ public Properties getProperties() {
public void setProperties(Properties properties) {
this.properties = properties;
}

public Set<String> getTlsProtocols() {
return tlsProtocols;
}

public void setTlsProtocols(Set<String> tlsProtocols) {
this.tlsProtocols = tlsProtocols;
}

public Set<String> getTlsCiphers() {
return tlsCiphers;
}

public void setTlsCiphers(Set<String> tlsCiphers) {
this.tlsCiphers = tlsCiphers;
}
}
Expand Up @@ -49,7 +49,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
if (enableTLS) {
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(true /* to allow InsecureConnection */,
serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
serviceConfig.getTlsKeyFilePath());
serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols());
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
}

Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -75,10 +76,65 @@ public class ProxyWithProxyAuthorizationTest extends ProducerConsumerBase {
private ProxyConfiguration proxyConfig = new ProxyConfiguration();

@DataProvider(name = "hostnameVerification")
public Object[][] codecProvider() {
public Object[][] hostnameVerificationCodecProvider() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@DataProvider(name = "protocolsCiphersProvider")
public Object[][] protocolsCiphersProviderCodecProvider() {
// Test using defaults
Set<String> ciphers_1 = Sets.newTreeSet();
Set<String> protocols_1 = Sets.newTreeSet();

// Test explicitly specifying protocols defaults
Set<String> ciphers_2 = Sets.newTreeSet();
Set<String> protocols_2 = Sets.newTreeSet();
protocols_2.add("TLSv1.2");
protocols_2.add("TLSv1.1");
protocols_2.add("TLSv1");

// Test for invalid ciphers
Set<String> ciphers_3 = Sets.newTreeSet();
Set<String> protocols_3 = Sets.newTreeSet();
ciphers_3.add("INVALID_PROTOCOL");

// Incorrect Config since TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 was introduced in TLSv1.2
Set<String> ciphers_4 = Sets.newTreeSet();
Set<String> protocols_4 = Sets.newTreeSet();
ciphers_4.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
protocols_4.add("TLSv1.1");

// Incorrect Config since TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 was introduced in TLSv1.2
Set<String> ciphers_5 = Sets.newTreeSet();
Set<String> protocols_5 = Sets.newTreeSet();
ciphers_5.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
protocols_5.add("TLSv1");

// Correct Config
Set<String> ciphers_6 = Sets.newTreeSet();
Set<String> protocols_6 = Sets.newTreeSet();
ciphers_6.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
protocols_6.add("TLSv1.2");

// In correct config - JDK 8 doesn't support TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
Set<String> ciphers_7 = Sets.newTreeSet();
Set<String> protocols_7 = Sets.newTreeSet();
protocols_7.add("TLSv1.2");
ciphers_7.add("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384");

// Correct config - Atlease one of the Cipher Suite is supported
Set<String> ciphers_8 = Sets.newTreeSet();
Set<String> protocols_8 = Sets.newTreeSet();
protocols_8.add("TLSv1.2");
ciphers_8.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
ciphers_8.add("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384");

return new Object[][] { { ciphers_1, protocols_1, Boolean.FALSE }, { ciphers_2, protocols_2, Boolean.FALSE },
{ ciphers_3, protocols_3, Boolean.TRUE }, { ciphers_4, protocols_4, Boolean.TRUE },
{ ciphers_5, protocols_5, Boolean.TRUE }, { ciphers_6, protocols_6, Boolean.FALSE },
{ ciphers_7, protocols_7, Boolean.TRUE }, { ciphers_8, protocols_8, Boolean.FALSE }};
}

@BeforeMethod
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -162,15 +218,14 @@ void startProxy() throws Exception {
* @throws Exception
*/
@Test
public void textProxyAuthorization() throws Exception {
public void testProxyAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
createAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
// create a client which connects to proxy over tls and pass authData
ClientConfiguration clientConf = new ClientConfiguration();
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientConf);
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, new ClientConfiguration());

String namespaceName = "my-property/proxy-authorization/my-ns";

Expand Down Expand Up @@ -215,7 +270,7 @@ public void textProxyAuthorization() throws Exception {
}

@Test(dataProvider = "hostnameVerification")
public void textTlsHostVerificationProxyToClient(boolean hostnameVerificationEnabled) throws Exception {
public void testTlsHostVerificationProxyToClient(boolean hostnameVerificationEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
Expand Down Expand Up @@ -266,7 +321,7 @@ public void textTlsHostVerificationProxyToClient(boolean hostnameVerificationEna
* @throws Exception
*/
@Test(dataProvider = "hostnameVerification")
public void textTlsHostVerificationProxyToBroker(boolean hostnameVerificationEnabled) throws Exception {
public void testTlsHostVerificationProxyToBroker(boolean hostnameVerificationEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

proxyConfig.setTlsHostnameVerificationEnabled(hostnameVerificationEnabled);
Expand Down Expand Up @@ -306,6 +361,85 @@ public void textTlsHostVerificationProxyToBroker(boolean hostnameVerificationEna
log.info("-- Exiting {} test --", methodName);
}

/*
* This test verifies whether the Client and Proxy honor the protocols and ciphers specified.
* Details description of test cases can be found in protocolsCiphersProviderCodecProvider
*/
@Test(dataProvider = "protocolsCiphersProvider", timeOut=5000)
public void tlsCiphersAndProtocols(Set<String> tlsCiphers, Set<String> tlsProtocols, boolean expectFailure) throws Exception {
log.info("-- Starting {} test --", methodName);
String namespaceName = "my-property/proxy-authorization/my-ns";
createAdminClient();

admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);

admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));

ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setAuthorizationEnabled(false);
proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
proxyConfig.setBrokerServiceURLTLS("pulsar://localhost:" + BROKER_PORT_TLS);

proxyConfig.setServicePort(PortManager.nextFreePort());
proxyConfig.setServicePortTls(PortManager.nextFreePort());
proxyConfig.setWebServicePort(PortManager.nextFreePort());
proxyConfig.setWebServicePortTls(PortManager.nextFreePort());
proxyConfig.setTlsEnabledInProxy(true);
proxyConfig.setTlsEnabledWithBroker(true);

// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
proxyConfig.setTlsTrustCertsFilePath(TLS_PROXY_TRUST_CERT_FILE_PATH);

proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_PROXY_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_PROXY_KEY_FILE_PATH);

Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
proxyConfig.setAuthenticationProviders(providers);
proxyConfig.setTlsProtocols(tlsProtocols);
proxyConfig.setTlsCiphers(tlsCiphers);
ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig));
proxyService.start();
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically((test) -> {
try {
return admin.namespaces().getPermissions(namespaceName).containsKey("Proxy")
&& admin.namespaces().getPermissions(namespaceName).containsKey("Client");
} catch (PulsarAdminException e) {
return false;
}
}, 3, 1000);
try {

final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
ClientConfiguration clientConf = new ClientConfiguration();
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientConf);
Consumer consumer = proxyClient.subscribe("persistent://my-property/proxy-authorization/my-ns/my-topic1",
"my-subscriber-name", new ConsumerConfiguration());

if (expectFailure) {
Assert.fail("Failure expected for this test case");
}
consumer.close();
proxyClient.close();
} catch (Exception ex) {
if (!expectFailure) {
Assert.fail("This test case should not fail");
}
}
admin.close();
log.info("-- Exiting {} test --", methodName);
}

protected final void createAdminClient() throws Exception {
Map<String, String> authParams = Maps.newHashMap();
authParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH);
Expand Down

0 comments on commit b990674

Please sign in to comment.