Skip to content

Commit

Permalink
1. Fix websocket tls bug. (#11243)
Browse files Browse the repository at this point in the history
2. Add ProxyServiceTlsStarterTest.
3. Refactor ProxyServiceStarter to be more easy to test.
  • Loading branch information
Technoboy- committed Jul 7, 2021
1 parent 620ee49 commit 887d74f
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 11 deletions.
Expand Up @@ -28,6 +28,7 @@

import com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
Expand Down Expand Up @@ -87,6 +88,10 @@ public class ProxyServiceStarter {

private ProxyConfiguration config;

private ProxyService proxyService;

private WebServer server;

public ProxyServiceStarter(String[] args) throws Exception {
try {

Expand Down Expand Up @@ -156,17 +161,12 @@ public void start() throws Exception {
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(config));
// create proxy service
ProxyService proxyService = new ProxyService(config, authenticationService);
proxyService = new ProxyService(config, authenticationService);
// create a web-service
final WebServer server = new WebServer(config, authenticationService);
server = new WebServer(config, authenticationService);

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
proxyService.close();
server.stop();
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
}
close();
}));

proxyService.start();
Expand Down Expand Up @@ -195,6 +195,19 @@ public double get() {
server.start();
}

public void close() {
try {
if(proxyService != null) {
proxyService.close();
}
if(server != null) {
server.stop();
}
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
}
}

public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
Expand Down Expand Up @@ -233,7 +246,9 @@ public static void addWebServerHandlers(WebServer server,
if (config.isWebSocketServiceEnabled()) {
// add WebSocket servlet
// Use local broker address to avoid different IP address when using a VIP for service discovery
WebSocketService webSocketService = new WebSocketService(createClusterData(config), PulsarConfigurationLoader.convertFrom(config));
ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config);
serviceConfiguration.setBrokerClientTlsEnabled(config.isTlsEnabledWithBroker());
WebSocketService webSocketService = new WebSocketService(createClusterData(config), serviceConfiguration);
webSocketService.start();
final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
Expand Down
Expand Up @@ -48,12 +48,15 @@

public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {

static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};

private ProxyServiceStarter serviceStarter;

@Override
@BeforeClass
protected void setup() throws Exception {
internalSetup();
String[] args = new String[]{"-c", "./src/test/resources/proxy.conf"};
ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args);
serviceStarter = new ProxyServiceStarter(ARGS);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
serviceStarter.getConfig().setServicePort(Optional.of(11000));
Expand All @@ -65,6 +68,7 @@ protected void setup() throws Exception {
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
internalCleanup();
serviceStarter.close();
}

@Test
Expand Down
@@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;

import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;

import static org.apache.pulsar.proxy.server.ProxyServiceStarterTest.ARGS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {

private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
private ProxyServiceStarter serviceStarter;

@Override
@BeforeClass
protected void setup() throws Exception {
internalSetup();
serviceStarter = new ProxyServiceStarter(ARGS);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
serviceStarter.getConfig().setServicePortTls(Optional.of(11043));
serviceStarter.getConfig().setTlsEnabledWithBroker(true);
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
serviceStarter.getConfig().setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
serviceStarter.getConfig().setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
serviceStarter.start();
}

protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
this.conf.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
}

@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
internalCleanup();
serviceStarter.close();
}

@Test
public void testProducer() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar+ssl://localhost:11043")
.allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
.build();

@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic("persistent://sample/test/local/websocket-topic")
.create();

for (int i = 0; i < 10; i++) {
producer.send("test".getBytes());
}
}

@Test
public void testProduceAndConsumeMessageWithWebsocket() throws Exception {
HttpClient producerClient = new HttpClient();
WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
producerWebSocketClient.start();
MyWebSocket producerSocket = new MyWebSocket();
String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic";
Future<Session> producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri));

ProducerMessage produceRequest = new ProducerMessage();
produceRequest.setContext("context");
produceRequest.setPayload(Base64.getEncoder().encodeToString("my payload".getBytes()));

HttpClient consumerClient = new HttpClient();
WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
consumerWebSocketClient.start();
MyWebSocket consumerSocket = new MyWebSocket();
String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
Future<Session> consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri));
consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest));
assertTrue(consumerSocket.getResponse().contains("ping"));
ProducerMessage message = ObjectMapperFactory.getThreadLocal().readValue(consumerSocket.getResponse(), ProducerMessage.class);
assertEquals(new String(Base64.getDecoder().decode(message.getPayload())), "my payload");
}

@WebSocket
public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener {

ArrayBlockingQueue<String> incomingMessages = new ArrayBlockingQueue<>(10);

@Override
public void onWebSocketText(String message) {
incomingMessages.add(message);
}

@Override
public void onWebSocketClose(int i, String s) {
}

@Override
public void onWebSocketConnect(Session session) {
}

@Override
public void onWebSocketError(Throwable throwable) {
}

@Override
public void onWebSocketPing(ByteBuffer payload) {
}

@Override
public void onWebSocketPong(ByteBuffer payload) {
incomingMessages.add(BufferUtil.toDetailString(payload));
}

public String getResponse() throws InterruptedException {
return incomingMessages.take();
}
}

}

0 comments on commit 887d74f

Please sign in to comment.