Skip to content

Commit

Permalink
[pulsar-common] add option that field can be empty (#3543)
Browse files Browse the repository at this point in the history
Revert "[pulsar-common] add option that field can be empty"

This reverts commit e1e62bbaf0448867d9ab6af1efb23a46e4de947a.

set optional param

fix tests

fix configs

fix test

fix test
  • Loading branch information
rdhabalia committed May 20, 2019
1 parent 6e51237 commit 93d3c5e
Show file tree
Hide file tree
Showing 61 changed files with 412 additions and 299 deletions.
Expand Up @@ -24,6 +24,7 @@
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -225,8 +226,8 @@ void testAuthentication() throws Exception {

ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(servicePort);
proxyConfig.setWebServicePort(webServicePort);
proxyConfig.setServicePort(Optional.of(servicePort));
proxyConfig.setWebServicePort(Optional.of(webServicePort));
proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT);
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
Expand Down
Expand Up @@ -109,22 +109,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_SERVER,
doc = "The port for serving binary protobuf requests"
)
private Integer brokerServicePort = 6650;
private Optional<Integer> brokerServicePort = Optional.of(6650);
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving tls secured binary protobuf requests"
)
private Integer brokerServicePortTls = null;
private Optional<Integer> brokerServicePortTls = Optional.empty();
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving http requests"
)
private Integer webServicePort = 8080;
private Optional<Integer> webServicePort = Optional.of(8080);
@FieldContext(
category = CATEGORY_SERVER,
doc = "The port for serving https requests"
)
private Integer webServicePortTls = null;
private Optional<Integer> webServicePortTls = Optional.empty();

@FieldContext(
category = CATEGORY_SERVER,
Expand Down Expand Up @@ -1274,18 +1274,18 @@ public int getBookkeeperHealthCheckIntervalSec() {
}

public Optional<Integer> getBrokerServicePort() {
return Optional.ofNullable(brokerServicePort);
return brokerServicePort;
}

public Optional<Integer> getBrokerServicePortTls() {
return Optional.ofNullable(brokerServicePortTls);
return brokerServicePortTls;
}

public Optional<Integer> getWebServicePort() {
return Optional.ofNullable(webServicePort);
return webServicePort;
}

public Optional<Integer> getWebServicePortTls() {
return Optional.ofNullable(webServicePortTls);
return webServicePortTls;
}
}
}
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.common.configuration.PulsarConfigurationLoader.isComplete;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -32,6 +33,7 @@
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Optional;
import java.util.Properties;

import org.apache.bookkeeper.client.api.DigestType;
Expand All @@ -44,10 +46,10 @@ public class MockConfiguration implements PulsarConfiguration {

private String zookeeperServers = "localhost:2181";
private String configurationStoreServers = "localhost:2184";
private int brokerServicePort = 7650;
private int brokerServicePortTls = 7651;
private int webServicePort = 9080;
private int webServicePortTls = 9443;
private Optional<Integer> brokerServicePort = Optional.of(7650);
private Optional<Integer> brokerServicePortTls = Optional.of(7651);
private Optional<Integer> webServicePort = Optional.of(9080);
private Optional<Integer> webServicePortTls = Optional.of(9443);
private int notExistFieldInServiceConfig = 0;

@Override
Expand Down Expand Up @@ -102,6 +104,9 @@ public void testPulsarConfiguraitonLoadingStream() throws Exception {
printWriter.println("brokerClientAuthenticationParameters=role:my-role");
printWriter.println("superUserRoles=appid1,appid2");
printWriter.println("brokerServicePort=7777");
printWriter.println("brokerServicePortTls=8777");
printWriter.println("webServicePort=");
printWriter.println("webServicePortTls=");
printWriter.println("managedLedgerDefaultMarkDeleteRateLimit=5.0");
printWriter.println("managedLedgerDigestType=CRC32C");
printWriter.println("managedLedgerCacheSizeMB=");
Expand All @@ -116,6 +121,9 @@ public void testPulsarConfiguraitonLoadingStream() throws Exception {
assertEquals(serviceConfig.getClusterName(), "usc");
assertEquals(serviceConfig.getBrokerClientAuthenticationParameters(), "role:my-role");
assertEquals(serviceConfig.getBrokerServicePort().get(), new Integer(7777));
assertEquals(serviceConfig.getBrokerServicePortTls().get(), new Integer(8777));
assertFalse(serviceConfig.getWebServicePort().isPresent());
assertFalse(serviceConfig.getWebServicePortTls().isPresent());
assertEquals(serviceConfig.getManagedLedgerDigestType(), DigestType.CRC32C);
assertTrue(serviceConfig.getManagedLedgerCacheSizeMB() > 0);
}
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -81,12 +82,12 @@ void setup() throws Exception {
brokerNativeBrokerPorts[i] = PortManager.nextFreePort();

ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerServicePort(brokerNativeBrokerPorts[i]);
config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i]));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
config.setWebServicePort(brokerWebServicePorts[i]);
config.setWebServicePort(Optional.ofNullable(brokerWebServicePorts[i]));
config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config.setBrokerServicePort(brokerNativeBrokerPorts[i]);
config.setBrokerServicePort(Optional.ofNullable(brokerNativeBrokerPorts[i]));
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
configurations[i] = config;
Expand Down
Expand Up @@ -140,8 +140,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

Expand Down
Expand Up @@ -19,6 +19,18 @@
package org.apache.pulsar.broker.admin;

import com.google.common.collect.ImmutableSet;

import java.util.List;
import java.util.Optional;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;

import lombok.extern.slf4j.Slf4j;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -41,16 +53,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import javax.net.ssl.SSLContext;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import java.security.cert.X509Certificate;
import java.util.List;

@Slf4j
public class AdminApiTlsAuthTest extends MockedPulsarServiceBaseTest {

Expand All @@ -62,8 +64,8 @@ private static String getTLSFile(String name) {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
conf.setTlsCertificateFilePath(getTLSFile("broker.cert"));
conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8"));
conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert"));
Expand Down
Expand Up @@ -19,6 +19,12 @@
package org.apache.pulsar.broker.admin;

import com.google.common.collect.ImmutableSet;

import java.util.Optional;

import static org.testng.Assert.fail;

import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -35,10 +41,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Method;

import static org.testng.Assert.fail;

@Slf4j
public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest {
protected String methodName;
Expand All @@ -55,8 +57,8 @@ private static String getTLSFile(String name) {
@BeforeMethod
@Override
public void setup() throws Exception {
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
buildConf(conf);
super.internalSetup();
}
Expand Down Expand Up @@ -109,10 +111,10 @@ public void testPersistentList() throws Exception {

/***** Start Broker 2 ******/
ServiceConfiguration conf = new ServiceConfiguration();
conf.setBrokerServicePort(PortManager.nextFreePort());
conf.setBrokerServicePortTls(PortManager.nextFreePort());
conf.setWebServicePort(PortManager.nextFreePort());
conf.setWebServicePortTls(PortManager.nextFreePort());
conf.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
conf.setBrokerServicePortTls(Optional.ofNullable(PortManager.nextFreePort()));
conf.setWebServicePort(Optional.ofNullable(PortManager.nextFreePort()));
conf.setWebServicePortTls(Optional.ofNullable(PortManager.nextFreePort()));
conf.setAdvertisedAddress("localhost");
conf.setClusterName(this.conf.getClusterName());
conf.setZookeeperServers("localhost:2181");
Expand Down
Expand Up @@ -139,8 +139,8 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(BROKER_PORT_TLS);
conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
conf.setBrokerServicePortTls(Optional.of(BROKER_PORT_TLS));
conf.setWebServicePortTls(Optional.of(BROKER_WEBSERVICE_PORT_TLS));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -91,8 +92,10 @@ public MockedPulsarServiceBaseTest() {

protected void resetConfig() {
this.conf = new ServiceConfiguration();
this.conf.setBrokerServicePort(BROKER_PORT);
this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT);
this.conf.setAdvertisedAddress("localhost");
this.conf.setBrokerServicePort(Optional.ofNullable(BROKER_PORT));
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(Optional.ofNullable(BROKER_WEBSERVICE_PORT));
this.conf.setClusterName(configClusterName);
this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate
this.conf.setManagedLedgerCacheSizeMB(8);
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -119,9 +120,9 @@ void setup() throws Exception {
ServiceConfiguration config1 = new ServiceConfiguration();
config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config1.setClusterName("use");
config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT);
config1.setWebServicePort(Optional.ofNullable(PRIMARY_BROKER_WEBSERVICE_PORT));
config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config1.setBrokerServicePort(PRIMARY_BROKER_PORT);
config1.setBrokerServicePort(Optional.ofNullable(PRIMARY_BROKER_PORT));
config1.setFailureDomainsEnabled(true);
config1.setLoadBalancerEnabled(true);
config1.setAdvertisedAddress("localhost");
Expand All @@ -138,9 +139,9 @@ void setup() throws Exception {
ServiceConfiguration config2 = new ServiceConfiguration();
config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config2.setClusterName("use");
config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT);
config2.setWebServicePort(Optional.ofNullable(SECONDARY_BROKER_WEBSERVICE_PORT));
config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config2.setBrokerServicePort(SECONDARY_BROKER_PORT);
config2.setBrokerServicePort(Optional.ofNullable(SECONDARY_BROKER_PORT));
config2.setFailureDomainsEnabled(true);
pulsar2 = new PulsarService(config2);
secondaryHost = String.format("%s:%d", "localhost",
Expand Down

0 comments on commit 93d3c5e

Please sign in to comment.