Skip to content

Commit

Permalink
Fix ProxyPublishConsumeTlsTest (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
nkurihar authored and merlimat committed Sep 29, 2017
1 parent 6010db0 commit fe28023
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 91 deletions.
Expand Up @@ -131,7 +131,10 @@ protected final void init() throws Exception {
protected final void internalCleanup() throws Exception { protected final void internalCleanup() throws Exception {
try { try {
admin.close(); admin.close();
pulsarClient.close(); // There are some test cases where pulsarClient is not initialized.
if (pulsarClient != null) {
pulsarClient.close();
}
pulsar.close(); pulsar.close();
mockBookKeeper.reallyShutdow(); mockBookKeeper.reallyShutdow();
mockZookKeeper.shutdown(); mockZookKeeper.shutdown();
Expand Down
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.mockito.Mockito.spy;

import java.net.URI;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

public class TlsProducerConsumerBase extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(TlsProducerConsumerBase.class);

protected final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
protected final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
protected final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";

@BeforeMethod
@Override
protected void setup() throws Exception {

// TLS configuration for Broker
internalSetUpForBroker();

// Start Broker
super.init();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

protected void internalSetUpForBroker() throws Exception {
conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setClusterName("use");
}

protected void internalSetUpForClient() throws Exception {
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
clientConf.setUseTls(true);
String lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString();
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
}

protected void internalSetUpForNamespace() throws Exception {
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
clientConf.setUseTls(true);
admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
admin.clusters().createCluster("use",
new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT,
"pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");
}
}
Expand Up @@ -18,94 +18,37 @@
*/ */
package org.apache.pulsar.client.api; package org.apache.pulsar.client.api;


import static org.mockito.Mockito.spy;

import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.junit.Assert; import org.junit.Assert;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;


import com.google.common.collect.Lists; public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
import com.google.common.collect.Sets; private static final Logger log = LoggerFactory.getLogger(TlsProducerConsumerTest.class);

public class TlsProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class);

private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";

@BeforeMethod
@Override
protected void setup() throws Exception {

conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

conf.setClusterName("use");

super.init();
}

protected final void internalSetupForTls() throws Exception {

org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
clientConf.setUseTls(true);

admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
String lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString();
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}


/** /**
* verifies that messages whose size is larger than 2^14 bytes (max size of single TLS chunk) can be * verifies that messages whose size is larger than 2^14 bytes (max size of single TLS chunk) can be
* produced/consumed * produced/consumed
* *
* @throws Exception * @throws Exception
*/ */
@Test @Test(timeOut = 30000)
public void testTlsLargeSizeMessage() throws Exception { public void testTlsLargeSizeMessage() throws Exception {
log.info("-- Starting {} test --", methodName); log.info("-- Starting {} test --", methodName);


final int MESSAGE_SIZE = 16 * 1024 + 1; final int MESSAGE_SIZE = 16 * 1024 + 1;
log.info("-- message size --", MESSAGE_SIZE); log.info("-- message size --", MESSAGE_SIZE);


internalSetupForTls(); internalSetUpForClient();

internalSetUpForNamespace();
admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");


ConsumerConfiguration conf = new ConsumerConfiguration(); ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive); conf.setSubscriptionType(SubscriptionType.Exclusive);
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name", Consumer consumer = pulsarClient
conf); .subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name", conf);


ProducerConfiguration producerConf = new ProducerConfiguration(); ProducerConfiguration producerConf = new ProducerConfiguration();


Expand Down
Expand Up @@ -23,19 +23,14 @@
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;


import java.net.URI; import java.net.URI;
import java.security.KeyManagementException; import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;

import org.apache.bookkeeper.test.PortManager; import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.client.api.TlsProducerConsumerTest; import org.apache.pulsar.client.api.TlsProducerConsumerBase;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.websocket.WebSocketService; import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.ProxyServer; import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration; import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
Expand All @@ -51,23 +46,18 @@
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;


import io.netty.handler.ssl.util.InsecureTrustManagerFactory; public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase {

public class ProxyPublishConsumeTls extends TlsProducerConsumerTest {
protected String methodName; protected String methodName;
private int port; private int port;
private int tlsPort; private int tlsPort;
private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";


private ProxyServer proxyServer; private ProxyServer proxyServer;
private WebSocketService service; private WebSocketService service;


@BeforeMethod @BeforeMethod
public void setup() throws Exception { public void setup() throws Exception {
super.setup(); super.setup();

super.internalSetUpForNamespace();
this.internalSetupForTls();


port = PortManager.nextFreePort(); port = PortManager.nextFreePort();
tlsPort = PortManager.nextFreePort(); tlsPort = PortManager.nextFreePort();
Expand All @@ -77,6 +67,7 @@ public void setup() throws Exception {
config.setTlsEnabled(true); config.setTlsEnabled(true);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
config.setClusterName("use"); config.setClusterName("use");
config.setGlobalZookeeperServers("dummy-zk-servers"); config.setGlobalZookeeperServers("dummy-zk-servers");
service = spy(new WebSocketService(config)); service = spy(new WebSocketService(config));
Expand All @@ -95,20 +86,17 @@ protected void cleanup() throws Exception {


} }


@Test(timeOut=10000) @Test(timeOut = 30000)
public void socketTest() throws InterruptedException, NoSuchAlgorithmException, KeyManagementException { public void socketTest() throws InterruptedException, GeneralSecurityException {
String consumerUri = "wss://localhost:" + tlsPort + "/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub"; String consumerUri =
"wss://localhost:" + tlsPort + "/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
String producerUri = "wss://localhost:" + tlsPort + "/ws/producer/persistent/my-property/use/my-ns/my-topic/"; String producerUri = "wss://localhost:" + tlsPort + "/ws/producer/persistent/my-property/use/my-ns/my-topic/";
URI consumeUri = URI.create(consumerUri); URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri); URI produceUri = URI.create(producerUri);


KeyManager[] keyManagers = null;
TrustManager[] trustManagers = InsecureTrustManagerFactory.INSTANCE.getTrustManagers();
SSLContext sslCtx = SSLContext.getInstance("TLS");
sslCtx.init(keyManagers, trustManagers, new SecureRandom());

SslContextFactory sslContextFactory = new SslContextFactory(); SslContextFactory sslContextFactory = new SslContextFactory();
sslContextFactory.setSslContext(sslCtx); sslContextFactory.setSslContext(SecurityUtility
.createSslContext(false, SecurityUtility.loadCertificatesFromPemFile(TLS_TRUST_CERT_FILE_PATH)));


WebSocketClient consumeClient = new WebSocketClient(sslContextFactory); WebSocketClient consumeClient = new WebSocketClient(sslContextFactory);
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
Expand All @@ -134,6 +122,7 @@ public void socketTest() throws InterruptedException, NoSuchAlgorithmException,
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer()); Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} catch (Throwable t) { } catch (Throwable t) {
log.error(t.getMessage()); log.error(t.getMessage());
Assert.fail(t.getMessage());
} finally { } finally {
ExecutorService executor = newFixedThreadPool(1); ExecutorService executor = newFixedThreadPool(1);
try { try {
Expand All @@ -153,5 +142,5 @@ public void socketTest() throws InterruptedException, NoSuchAlgorithmException,
} }
} }


private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTls.class); private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTlsTest.class);
} }

0 comments on commit fe28023

Please sign in to comment.