From 7a2f0a4cc2a5c4ad7e58497f43d2d4e31ea68fd4 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 28 Nov 2022 23:51:16 -0600 Subject: [PATCH] Revert "[Discovery Service] Remove module and all its references (#12119)" This reverts commit 8c4c63082dd6ff9434cd7d3e62cd52d3e7b498dc. --- bin/pulsar | 10 + bin/pulsar-daemon | 4 + conf/discovery.conf | 90 +++++ distribution/server/pom.xml | 13 +- pom.xml | 2 + pulsar-broker/pom.xml | 7 + .../client/api/BrokerServiceLookupTest.java | 339 ++++++++++++++++++ .../service/web/DiscoveryServiceWebTest.java | 147 ++++++++ pulsar-discovery-service/pom.xml | 165 +++++++++ pulsar-discovery-service/readme.md | 60 ++++ .../service/BrokerDiscoveryProvider.java | 173 +++++++++ .../discovery/service/DiscoveryService.java | 225 ++++++++++++ .../discovery/service/ServerConnection.java | 175 +++++++++ .../service/ServiceChannelInitializer.java | 96 +++++ .../server/DiscoveryServiceStarter.java | 145 ++++++++ .../service/server/ServerManager.java | 172 +++++++++ .../service/server/ServiceConfig.java | 249 +++++++++++++ .../util/CmdGenerateDocumentation.java | 45 +++ .../service/web/DiscoveryServiceServlet.java | 188 ++++++++++ .../service/BaseDiscoveryTestSetup.java | 90 +++++ .../pulsar/discovery/service/CmdTest.java | 59 +++ .../service/DiscoveryServiceTest.java | 259 +++++++++++++ .../server/DiscoveryServiceStarterTest.java | 62 ++++ .../server/DiscoveryServiceWebTest.java | 119 ++++++ .../service/web/BaseZKStarterTest.java | 58 +++ .../service/web/DiscoveryServiceWebTest.java | 304 ++++++++++++++++ .../web/MetadataStoreCacheLoaderTest.java | 123 +++++++ .../src/test/resources/certificate/client.crt | 20 ++ .../src/test/resources/certificate/client.key | 28 ++ .../src/test/resources/certificate/server.crt | 20 ++ .../src/test/resources/certificate/server.key | 28 ++ site2/docs/deploy-bare-metal-multi-cluster.md | 22 ++ 32 files changed, 3490 insertions(+), 7 deletions(-) create mode 100644 conf/discovery.conf create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java create mode 100644 pulsar-discovery-service/pom.xml create mode 100644 pulsar-discovery-service/readme.md create mode 100644 pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java create mode 100644 pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java create mode 100644 pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java create mode 100644 pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java create mode 100644 pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java create mode 100644 pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java create mode 100644 pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java create mode 100644 pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/util/CmdGenerateDocumentation.java create mode 100644 pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceServlet.java create mode 100644 pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java create mode 100644 pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/CmdTest.java create mode 100644 pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java create mode 100644 pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarterTest.java create mode 100644 pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java create mode 100644 pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/BaseZKStarterTest.java create mode 100644 pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java create mode 100644 pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/MetadataStoreCacheLoaderTest.java create mode 100644 pulsar-discovery-service/src/test/resources/certificate/client.crt create mode 100644 pulsar-discovery-service/src/test/resources/certificate/client.key create mode 100644 pulsar-discovery-service/src/test/resources/certificate/server.crt create mode 100644 pulsar-discovery-service/src/test/resources/certificate/server.key diff --git a/bin/pulsar b/bin/pulsar index 28738d84d5d86..a2255024b4818 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -29,6 +29,7 @@ DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf +DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf DEFAULT_PROXY_CONF=$PULSAR_HOME/conf/proxy.conf DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf @@ -57,6 +58,7 @@ where command is one of: bookie Run a bookie server zookeeper Run a zookeeper server configuration-store Run a configuration-store server + discovery Run a discovery server proxy Run a pulsar proxy websocket Run a web socket proxy server functions-worker Run a functions worker server @@ -86,6 +88,7 @@ Environment variables: PULSAR_BOOKKEEPER_CONF Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF) PULSAR_ZK_CONF Configuration file for zookeeper (default: $DEFAULT_ZK_CONF) PULSAR_CONFIGURATION_STORE_CONF Configuration file for global configuration store (default: $DEFAULT_CONFIGURATION_STORE_CONF) + PULSAR_DISCOVERY_CONF Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF) PULSAR_WEBSOCKET_CONF Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF) PULSAR_PROXY_CONF Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF) PULSAR_WORKER_CONF Configuration file for functions worker (default: $DEFAULT_WORKER_CONF) @@ -241,6 +244,10 @@ if [ -z "$PULSAR_CONFIGURATION_STORE_CONF" ]; then PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_CONFIGURATION_STORE_CONF fi +if [ -z "$PULSAR_DISCOVERY_CONF" ]; then + PULSAR_DISCOVERY_CONF=$DEFAULT_DISCOVERY_CONF +fi + if [ -z "$PULSAR_PROXY_CONF" ]; then PULSAR_PROXY_CONF=$DEFAULT_PROXY_CONF fi @@ -377,6 +384,9 @@ elif [ $COMMAND == "configuration-store" ]; then # Allow global ZK to turn into read-only mode when it cannot reach the quorum OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true" exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.zookeeper.server.quorum.QuorumPeerMain $PULSAR_CONFIGURATION_STORE_CONF $@ +elif [ $COMMAND == "discovery" ]; then + PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"} + exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@ elif [ $COMMAND == "proxy" ]; then PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-proxy.log"} exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.proxy.server.ProxyServiceStarter --config $PULSAR_PROXY_CONF $@ diff --git a/bin/pulsar-daemon b/bin/pulsar-daemon index 210162b6a2190..75570fcfd31af 100755 --- a/bin/pulsar-daemon +++ b/bin/pulsar-daemon @@ -26,6 +26,7 @@ where command is one of: bookie Run a bookie server zookeeper Run a zookeeper server configuration-store Run a configuration-store server + discovery Run a discovery server websocket Run a websocket proxy server functions-worker Run a functions worker server standalone Run a standalone Pulsar service @@ -99,6 +100,9 @@ case $command in (configuration-store) echo "doing $startStop $command ..." ;; + (discovery) + echo "doing $startStop $command ..." + ;; (websocket) echo "doing $startStop $command ..." ;; diff --git a/conf/discovery.conf b/conf/discovery.conf new file mode 100644 index 0000000000000..76eb093c5c9c0 --- /dev/null +++ b/conf/discovery.conf @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Zookeeper quorum connection string (comma-separated) +zookeeperServers= + +# Configuration Store connection string (comma-separated) +configurationStoreServers= + +# ZooKeeper session timeout +zookeeperSessionTimeoutMs=30000 + +# ZooKeeper cache expiry time in seconds +zooKeeperCacheExpirySeconds=300 + +# Port to use to server binary-proto request +servicePort=6650 + +# Port to use to server binary-proto-tls request +servicePortTls= + +# Port that discovery service listen on +webServicePort=8080 + +# Port to use to server HTTPS request +webServicePortTls= + +# Control whether to bind directly on localhost rather than on normal hostname +bindOnLocalhost=false + +### --- Authentication --- ### + +# Enable authentication +authenticationEnabled=false + +# Authentication provider name list, which is comma separated list of class names (comma-separated) +authenticationProviders= + +# Enforce authorization +authorizationEnabled=false + +# Authorization provider name list, which is comma separated list of class names +authorizationProviders=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider + +# Role names that are treated as "super-user", meaning they will be able to do all admin +# operations and publish/consume from all topics (comma-separated) +superUserRoles= + +# Allow wildcard matching in authorization +# (wildcard matching only applicable if wildcard-char: +# * presents at first or last position eg: *.pulsar.service, pulsar.service.*) +authorizationAllowWildcardsMatching=false + +##### --- TLS --- ##### +# Deprecated - Use servicePortTls and webServicePortTls instead +tlsEnabled=false + +# Path for the TLS certificate file +tlsCertificateFilePath= + +# Path for the TLS private key file +tlsKeyFilePath= + +# Specify whether Client certificates are required for TLS +# Reject the Connection if the Client Certificate is not trusted. +tlsRequireTrustedClientCertOnConnect=false + +# Tls cert refresh duration in seconds (set 0 to check on every new connection) +tlsCertRefreshCheckDurationSec=300 + +### --- Deprecated config variables --- ### + +# Deprecated. Use configurationStoreServers +globalZookeeperServers= diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml index c147c532143e4..f670d989ad99b 100644 --- a/distribution/server/pom.xml +++ b/distribution/server/pom.xml @@ -40,6 +40,12 @@ ${project.version} + + ${project.groupId} + pulsar-discovery-service + ${project.version} + + ${project.groupId} pulsar-proxy @@ -175,13 +181,6 @@ simpleclient_log4j2 - - ${project.groupId} - bouncy-castle-bc - ${project.version} - pkg - - ${project.groupId} diff --git a/pom.xml b/pom.xml index 626c0b7ce46cf..abad9289f8040 100644 --- a/pom.xml +++ b/pom.xml @@ -2105,6 +2105,7 @@ flexible messaging model and an intuitive client API. pulsar-client-all pulsar-websocket pulsar-proxy + pulsar-discovery-service pulsar-testclient pulsar-broker-auth-athenz pulsar-client-auth-athenz @@ -2169,6 +2170,7 @@ flexible messaging model and an intuitive client API. pulsar-client-tools-test pulsar-websocket pulsar-proxy + pulsar-discovery-service pulsar-testclient pulsar-broker-auth-sasl pulsar-client-auth-sasl diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 8d0f0e7547fc3..e6b7561c906b9 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -70,6 +70,13 @@ ${project.version} + + ${project.groupId} + pulsar-discovery-service + ${project.version} + test + + ${project.groupId} pulsar-websocket diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 53da440736208..5c2e29da17c3e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -47,6 +47,7 @@ import java.security.cert.Certificate; import java.util.ArrayList; import java.util.HashSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -78,14 +79,20 @@ import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; +import org.apache.pulsar.discovery.service.DiscoveryService; +import org.apache.pulsar.discovery.service.server.ServiceConfig; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.asynchttpclient.AsyncCompletionHandler; @@ -518,6 +525,326 @@ public void testWebserviceServiceTls() throws Exception { loadManager2 = null; } + /** + * Discovery-Service lookup over binary-protocol 1. Start discovery service 2. start broker 3. Create + * Producer/Consumer: by calling Discovery service for partitionedMetadata and topic lookup + * + * @throws Exception + */ + @Test + public void testDiscoveryLookup() throws Exception { + + // (1) start discovery service + ServiceConfig config = new ServiceConfig(); + config.setServicePort(Optional.of(0)); + config.setBindOnLocalhost(true); + + @Cleanup + DiscoveryService discoveryService = createAndStartDiscoveryService(config); + + // (2) lookup using discovery service + final String discoverySvcUrl = discoveryService.getServiceUrl(); + + @Cleanup + PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(discoverySvcUrl).build(); + Consumer consumer = pulsarClient2.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1") + .subscriptionName("my-subscriber-name").subscribe(); + Producer producer = pulsarClient2.newProducer(Schema.BYTES) + .topic("persistent://my-property2/use2/my-ns/my-topic1") + .create(); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + producer.close(); + + } + + /** + * Verify discovery-service binary-proto lookup using tls + * + * @throws Exception + */ + @SuppressWarnings("deprecation") + @Test + public void testDiscoveryLookupTls() throws Exception { + + final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt"; + final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key"; + final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt"; + final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key"; + + // (1) restart broker1 with tls enabled + conf.setBrokerServicePortTls(Optional.ofNullable(0)); + conf.setWebServicePortTls(Optional.ofNullable(0)); + conf.setTlsAllowInsecureConnection(true); + conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + conf.setNumExecutorThreadPoolSize(5); + stopBroker(); + startBroker(); + + // (2) start discovery service + ServiceConfig config = new ServiceConfig(); + config.setServicePort(Optional.of(0)); + config.setServicePortTls(Optional.of(0)); + config.setBindOnLocalhost(true); + config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + + @Cleanup + DiscoveryService discoveryService = createAndStartDiscoveryService(config); + + // (3) lookup using discovery service + final String discoverySvcUrl = discoveryService.getServiceUrlTls(); + + Map authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + Authentication auth = new AuthenticationTls(); + auth.configure(authParams); + + @Cleanup + PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(discoverySvcUrl).authentication(auth) + .enableTls(true).allowTlsInsecureConnection(true).build(); + Consumer consumer = pulsarClient2.newConsumer().topic("persistent://my-property2/use2/my-ns/my-topic1") + .subscriptionName("my-subscriber-name").subscribe(); + Producer producer = pulsarClient2.newProducer(Schema.BYTES) + .topic("persistent://my-property2/use2/my-ns/my-topic1") + .create(); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + producer.close(); + + } + + @Test + public void testDiscoveryLookupAuthAndAuthSuccess() throws Exception { + + // (1) start discovery service + ServiceConfig config = new ServiceConfig(); + config.setServicePort(Optional.of(0)); + config.setBindOnLocalhost(true); + // add Authentication Provider + Set providersClassNames = Sets.newHashSet(MockAuthenticationProvider.class.getName()); + config.setAuthenticationProviders(providersClassNames); + // enable authentication and authorization + config.setAuthenticationEnabled(true); + config.setAuthorizationEnabled(true); + config.setZookeeperServers("localhost:2181"); + config.setConfigurationStoreServers("localhost:3181"); + + @Cleanup + DiscoveryService discoveryService = createAndStartDiscoveryService(config); + + // (2) lookup using discovery service + final String discoverySvcUrl = discoveryService.getServiceUrl(); + // set authentication data + Authentication auth = new Authentication() { + private static final long serialVersionUID = 1L; + + @Override + public void close() throws IOException { + } + + @Override + public String getAuthMethodName() { + return "auth"; + } + + @Override + public AuthenticationDataProvider getAuthData() throws PulsarClientException { + return new AuthenticationDataProvider() { + private static final long serialVersionUID = 1L; + }; + } + + @Override + public void configure(Map authParams) { + } + + @Override + public void start() throws PulsarClientException { + } + }; + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(discoverySvcUrl).authentication(auth).build(); + Consumer consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name").subscribe(); + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + producer.close(); + } + + @Test + public void testDiscoveryLookupAuthenticationFailure() throws Exception { + + // (1) start discovery service + ServiceConfig config = new ServiceConfig(); + config.setServicePort(Optional.of(0)); + config.setBindOnLocalhost(true); + // set Authentication provider which fails authentication + Set providersClassNames = Sets.newHashSet(MockAuthenticationProviderFail.class.getName()); + config.setAuthenticationProviders(providersClassNames); + // enable authentication + config.setAuthenticationEnabled(true); + config.setAuthorizationEnabled(true); + + @Cleanup + DiscoveryService discoveryService = createAndStartDiscoveryService(config); + // (2) lookup using discovery service + final String discoverySvcUrl = discoveryService.getServiceUrl(); + + // set authentication data + Authentication auth = new Authentication() { + private static final long serialVersionUID = 1L; + + @Override + public void close() throws IOException { + } + + @Override + public String getAuthMethodName() { + return "auth"; + } + + @Override + public AuthenticationDataProvider getAuthData() throws PulsarClientException { + return new AuthenticationDataProvider() { + private static final long serialVersionUID = 1L; + }; + } + + @Override + public void configure(Map authParams) { + } + + @Override + public void start() throws PulsarClientException { + } + }; + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(discoverySvcUrl).authentication(auth) + .operationTimeout(1000, TimeUnit.MILLISECONDS).build(); + try { + pulsarClient.newConsumer().topic("persistent://my-property/use2/my-ns/my-topic1") + .subscriptionName("my-subscriber-name").subscribe(); + fail("should have failed due to authentication"); + } catch (PulsarClientException e) { + // Ok: expected + } + } + + @Test + public void testDiscoveryLookupAuthorizationFailure() throws Exception { + + // (1) start discovery service + ServiceConfig config = new ServiceConfig(); + config.setServicePort(Optional.of(0)); + config.setBindOnLocalhost(true); + // set Authentication provider which returns "invalid" appid so, authorization fails + Set providersClassNames = Sets.newHashSet(MockAuthorizationProviderFail.class.getName()); + config.setAuthenticationProviders(providersClassNames); + // enable authentication + config.setAuthenticationEnabled(true); + config.setAuthorizationEnabled(true); + + @Cleanup + DiscoveryService discoveryService = createAndStartDiscoveryService(config); + // (2) lookup using discovery service + final String discoverySvcUrl = discoveryService.getServiceUrl(); + + // set authentication data + Authentication auth = new Authentication() { + private static final long serialVersionUID = 1L; + + @Override + public void close() throws IOException { + } + + @Override + public String getAuthMethodName() { + return "auth"; + } + + @Override + public AuthenticationDataProvider getAuthData() throws PulsarClientException { + return new AuthenticationDataProvider() { + private static final long serialVersionUID = 1L; + }; + } + + @Override + public void configure(Map authParams) { + } + + @Override + public void start() throws PulsarClientException { + } + }; + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(discoverySvcUrl).authentication(auth) + .operationTimeout(1000, TimeUnit.MILLISECONDS).build(); + try { + pulsarClient.newConsumer().topic("persistent://my-property/use2/my-ns/my-topic1") + .subscriptionName("my-subscriber-name").subscribe(); + fail("should have failed due to authentication"); + } catch (PulsarClientException e) { + // Ok: expected + assertTrue(e instanceof PulsarClientException.LookupException); + } + } + /** * *
@@ -943,4 +1270,16 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
             return "invalid";
         }
     }
+
+    private DiscoveryService createAndStartDiscoveryService(ServiceConfig config) throws Exception {
+        MetadataStoreExtended localMetadatastore = new ZKMetadataStore(mockZooKeeper);
+        MetadataStoreExtended configMetadatastore = new ZKMetadataStore(mockZooKeeperGlobal);
+        DiscoveryService discoveryService = spy(new DiscoveryService(config));
+        doReturn(localMetadatastore).when(discoveryService).createLocalMetadataStore();
+        doReturn(configMetadatastore).when(discoveryService).createConfigurationMetadataStore();
+        doReturn(localMetadatastore).when(discoveryService).createLocalMetadataStore();
+        doReturn(configMetadatastore).when(discoveryService).createConfigurationMetadataStore();
+        discoveryService.start();
+        return discoveryService;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
new file mode 100644
index 0000000000000..217dc63e6380c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.discovery.service.web;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.policies.data.BundlesData;
+import org.apache.pulsar.discovery.service.server.ServerManager;
+import org.apache.pulsar.discovery.service.server.ServiceConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-discovery")
+public class DiscoveryServiceWebTest extends ProducerConsumerBase {
+
+    private final Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class));
+    // DiscoveryServiceServlet gets initialized by a server and this map will help to retrieve ZK while mocking
+    // DiscoveryServiceServlet
+    private static final Map metadataStoreInstanceCache = Maps.newConcurrentMap();
+    private ServerManager server;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        super.conf.setAuthorizationEnabled(true);
+        super.conf.setAuthenticationEnabled(true);
+
+        // start server
+        ServiceConfig config = new ServiceConfig();
+        config.setWebServicePort(Optional.of(0));
+        server = new ServerManager(config);
+        Map params = new TreeMap<>();
+        String zkServerUrl = "mockZkServerUrl";
+        metadataStoreInstanceCache.put(zkServerUrl, pulsar.createLocalMetadataStore());
+        params.put("zookeeperServers", zkServerUrl);
+        server.addServlet("/", DiscoveryServiceServletTest.class, params);
+        server.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        server.stop();
+        metadataStoreInstanceCache.clear();
+    }
+
+    /**
+     * 1. Start : Broker and Discovery service. 2. Provide started broker server as active broker to Discovery service
+     * 3. Call GET, PUT, POST request to discovery service that redirects to Broker service and receives response
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testRedirectUrlWithServerStarted() throws Exception {
+        String serviceUrl = server.getServiceUri().toString();
+        String putRequestUrl = serviceUrl + "admin/v2/namespaces/p1/n1";
+        String postRequestUrl = serviceUrl + "admin/v2/namespaces/p1/n1/replication";
+        String getRequestUrl = serviceUrl + "admin/v2/namespaces/p1";
+
+        /**
+         * verify : every time when vip receives a request: it redirects to above brokers sequentially and broker
+         * returns appropriate response which must not be null.
+         **/
+
+        assertEquals(hitBrokerService(HttpMethod.POST, postRequestUrl, Lists.newArrayList("use")),
+                "Need to authenticate to perform the request");
+        assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl,
+                BundlesData.builder().numBundles(1).build()), "Need to authenticate to perform the request");
+        assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Need to authenticate to perform the request");
+    }
+
+    public String hitBrokerService(String method, String url, Object data) throws JsonParseException {
+
+        Response response = null;
+        try {
+            WebTarget webTarget = client.target(url);
+            Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
+            if (HttpMethod.PUT.equals(method)) {
+                response = invocationBuilder.put(Entity.entity(data, MediaType.APPLICATION_JSON));
+            } else if (HttpMethod.GET.equals(method)) {
+                response = invocationBuilder.get();
+            } else if (HttpMethod.POST.equals(method)) {
+                response = invocationBuilder.post(Entity.entity(data, MediaType.APPLICATION_JSON));
+            } else {
+                fail("Unsupported http method");
+            }
+        } catch (Exception e) {
+            // fail
+            fail();
+        }
+
+        JsonObject jsonObject = new Gson().fromJson(response.readEntity(String.class), JsonObject.class);
+        return jsonObject.get("reason").getAsString();
+    }
+
+    public static class DiscoveryServiceServletTest extends DiscoveryServiceServlet {
+        @Override
+        public MetadataStoreExtended createLocalMetadataStore(String zookeeperServers, int operationimeoutMs) throws MetadataStoreException {
+            return metadataStoreInstanceCache.get(zookeeperServers);
+        }
+    }
+}
diff --git a/pulsar-discovery-service/pom.xml b/pulsar-discovery-service/pom.xml
new file mode 100644
index 0000000000000..d6972a022b70d
--- /dev/null
+++ b/pulsar-discovery-service/pom.xml
@@ -0,0 +1,165 @@
+
+
+
+  4.0.0
+  
+    org.apache.pulsar
+    pulsar
+    2.11.0-SNAPSHOT
+    ..
+  
+
+  pulsar-discovery-service
+  jar
+  Pulsar Discovery Service WAR
+
+  
+    
+      org.slf4j
+      slf4j-api
+    
+
+    
+      org.apache.commons
+      commons-lang3
+    
+
+    
+      ${project.groupId}
+      pulsar-zookeeper-utils
+      ${project.version}
+    
+
+    
+      ${project.groupId}
+      pulsar-common
+      ${project.version}
+    
+
+    
+      ${project.groupId}
+      pulsar-broker-common
+      ${project.version}
+    
+
+    
+      ${project.groupId}
+      bouncy-castle-bc
+      ${project.version}
+      pkg
+    
+
+    
+      org.eclipse.jetty
+      jetty-server
+    
+
+    
+      org.eclipse.jetty
+      jetty-alpn-conscrypt-server
+    
+
+    
+      org.eclipse.jetty
+      jetty-servlet
+    
+
+    
+      org.eclipse.jetty
+      jetty-servlets
+    
+
+    
+      org.glassfish.jersey.core
+      jersey-server
+    
+
+    
+      org.glassfish.jersey.containers
+      jersey-container-servlet-core
+    
+
+    
+      org.glassfish.jersey.media
+      jersey-media-json-jackson
+    
+
+    
+      jakarta.activation
+      jakarta.activation-api
+    
+
+    
+      com.fasterxml.jackson.jaxrs
+      jackson-jaxrs-json-provider
+    
+
+    
+      org.slf4j
+      jul-to-slf4j
+    
+
+    
+      com.google.guava
+      guava
+    
+
+    
+      io.swagger
+      swagger-annotations
+    
+
+    
+      org.glassfish.jersey.inject
+      jersey-hk2
+    
+
+    
+      javax.xml.bind
+      jaxb-api
+    
+
+    
+      com.sun.activation
+      javax.activation
+    
+
+    
+      ${project.groupId}
+      testmocks
+      ${project.version}
+      test
+    
+
+    
+      org.awaitility
+      awaitility
+      test
+    
+    
+      org.apache.logging.log4j
+      log4j-core
+    
+
+  
+
diff --git a/pulsar-discovery-service/readme.md b/pulsar-discovery-service/readme.md
new file mode 100644
index 0000000000000..67297a2bdb301
--- /dev/null
+++ b/pulsar-discovery-service/readme.md
@@ -0,0 +1,60 @@
+
+
+# Discovery service
+
+It keeps list of active available brokers and redirects all incoming requests to one of the broker in round-robin manner.
+
+## Deployment
+
+Discovery service module contains embedded jetty server and service can be started on it using following script:
+
+```
+mvn exec:java -Dexec.mainClass=org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter -Dexec.args=$CONF_FILE
+```
+
+CONF_FILE should have following property
+```
+zookeeperServers=
+webServicePort=
+```
+
+
+
+## TEST
+
+After starting server: 
+
+Hit any broker service url by providing discovery service domain.
+
+Instead of calling individual broker url: 
+```
+http://broker-1:8080/admin/namespaces/{property}
+```
+ 
+call discovery service which redirects to one of the broker: 
+```
+http://pulsar-discovery-service:8080/admin/namespaces/{property}
+```
+Curl Example
+```
+curl -i -H "Accept: application/json" -H "Content-Type: application/json" -X GET http://pulsar-discovery:8080/admin/namespaces/{property} -L
+```
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
new file mode 100644
index 0000000000000..48752f3ae695e
--- /dev/null
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.discovery.service;
+
+import static org.apache.bookkeeper.util.MathUtils.signSafeMod;
+import com.google.common.base.Joiner;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.discovery.service.server.ServiceConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains available active broker list and returns next active broker in round-robin for discovery service.
+ *
+ */
+public class BrokerDiscoveryProvider implements Closeable {
+
+    final MetadataStoreCacheLoader metadataStoreCacheLoader;
+    private final AtomicInteger counter = new AtomicInteger();
+    private PulsarResources pulsarResources;
+
+    private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(4)
+            .name("pulsar-discovery-ordered").build();
+    private final ScheduledExecutorService scheduledExecutorScheduler = Executors.newScheduledThreadPool(4,
+            new DefaultThreadFactory("pulsar-discovery"));
+
+    private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
+
+    public BrokerDiscoveryProvider(ServiceConfig config, PulsarResources pulsarResources) throws PulsarServerException {
+        try {
+            this.pulsarResources = pulsarResources;
+            this.metadataStoreCacheLoader = new MetadataStoreCacheLoader(pulsarResources,
+                    config.getZookeeperSessionTimeoutMs());
+        } catch (Exception e) {
+            LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e);
+            throw new PulsarServerException("Failed to start zookeeper :" + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Find next broker {@link LoadManagerReport} in round-robin fashion.
+     *
+     * @return
+     * @throws PulsarServerException
+     */
+    LoadManagerReport nextBroker() throws PulsarServerException {
+        List availableBrokers = getAvailableBrokers();
+
+        if (availableBrokers.isEmpty()) {
+            throw new PulsarServerException("No active broker is available");
+        } else {
+            int brokersCount = availableBrokers.size();
+            int nextIdx = signSafeMod(counter.getAndIncrement(), brokersCount);
+            return availableBrokers.get(nextIdx);
+        }
+    }
+
+    List getAvailableBrokers() {
+        List availableBrokers = metadataStoreCacheLoader.getAvailableBrokers();
+        return availableBrokers;
+    }
+
+    CompletableFuture getPartitionedTopicMetadata(DiscoveryService service,
+            TopicName topicName, String role, AuthenticationDataSource authenticationData) {
+
+        CompletableFuture metadataFuture = new CompletableFuture<>();
+        try {
+            checkAuthorization(service, topicName, role, authenticationData);
+            // gets the number of partitions from the zk cache
+            pulsarResources.getNamespaceResources().getPartitionedTopicResources()
+                    .getPartitionedTopicMetadataAsync(topicName)
+                    .thenAccept(metadata -> {
+                // if the partitioned topic is not found in zk, then the topic
+                // is not partitioned
+                if (metadata.isPresent()) {
+                    metadataFuture.complete(metadata.get());
+                } else {
+                    metadataFuture.complete(new PartitionedTopicMetadata());
+                }
+            }).exceptionally(ex -> {
+                metadataFuture.completeExceptionally(ex);
+                return null;
+            });
+        } catch (Exception e) {
+            metadataFuture.completeExceptionally(e);
+        }
+        return metadataFuture;
+    }
+
+    protected static void checkAuthorization(DiscoveryService service, TopicName topicName, String role,
+            AuthenticationDataSource authenticationData)
+            throws Exception {
+        if (!service.getConfiguration().isAuthorizationEnabled()
+                || service.getConfiguration().getSuperUserRoles().contains(role)) {
+            // No enforcing of authorization policies
+            return;
+        }
+        // get zk policy manager
+        if (!service.getAuthorizationService().canLookup(topicName, role, authenticationData)) {
+            LOG.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
+            // check namespace authorization
+            TenantInfo tenantInfo;
+            try {
+                tenantInfo = service.getPulsarResources().getTenantResources()
+                        .getTenant(topicName.getTenant())
+                        .orElseThrow(() -> new IllegalAccessException("Property does not exist"));
+            } catch (NotFoundException e) {
+                LOG.warn("Failed to get property admin data for non existing property {}", topicName.getTenant());
+                throw new IllegalAccessException("Property does not exist");
+            } catch (Exception e) {
+                LOG.error("Failed to get property admin data for property");
+                throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s",
+                        topicName.getTenant(), e.getMessage()));
+            }
+            if (!service.getAuthorizationService()
+                    .isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
+                throw new IllegalAccessException("Don't have permission to administrate resources on this property");
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Successfully authorized {} on property {}", role, topicName.getTenant());
+        }
+    }
+
+    public static String path(String... parts) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("/admin/");
+        Joiner.on('/').appendTo(sb, parts);
+        return sb.toString();
+    }
+
+    @Override
+    public void close() throws IOException {
+        metadataStoreCacheLoader.close();
+        orderedExecutor.shutdown();
+        scheduledExecutorScheduler.shutdownNow();
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(BrokerDiscoveryProvider.class);
+}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
new file mode 100644
index 0000000000000..864bcf6b84b4d
--- /dev/null
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.discovery.service;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import lombok.Getter;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.discovery.service.server.ServiceConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main discovery-service which starts component to serve incoming discovery-request over binary-proto channel and
+ * redirects to one of the active broker
+ *
+ */
+public class DiscoveryService implements Closeable {
+
+    private final ServiceConfig config;
+    private String serviceUrl;
+    private String serviceUrlTls;
+    private AuthenticationService authenticationService;
+    private AuthorizationService authorizationService;
+    private BrokerDiscoveryProvider discoveryProvider;
+    private final EventLoopGroup acceptorGroup;
+    private MetadataStoreExtended localMetadataStore;
+    private MetadataStoreExtended configMetadataStore;
+    @Getter
+    private PulsarResources pulsarResources;
+    @Getter
+    private final EventLoopGroup workerGroup;
+    private final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-discovery-acceptor");
+    private final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-discovery-io");
+    private final int numThreads = Runtime.getRuntime().availableProcessors();
+
+    private Channel channelListen;
+    private Channel channelListenTls;
+
+    public DiscoveryService(ServiceConfig serviceConfig) {
+        checkNotNull(serviceConfig);
+        this.config = serviceConfig;
+        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, false, acceptorThreadFactory);
+        this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, workersThreadFactory);
+    }
+
+    /**
+     * Starts discovery service by initializing ZooKeeper and server
+     *
+     * @throws Exception
+     */
+    public void start() throws Exception {
+        localMetadataStore = createLocalMetadataStore();
+        configMetadataStore = createConfigurationMetadataStore();
+        pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore);
+        discoveryProvider = new BrokerDiscoveryProvider(this.config, pulsarResources);
+        ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config);
+        authenticationService = new AuthenticationService(serviceConfiguration);
+        authorizationService = new AuthorizationService(serviceConfiguration, pulsarResources);
+        startServer();
+    }
+
+    /**
+     * starts server to handle discovery-request from client-channel
+     *
+     * @throws Exception
+     */
+    public void startServer() throws Exception {
+
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
+        bootstrap.group(acceptorGroup, workerGroup);
+        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+        bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
+                new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
+        bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
+        EventLoopUtil.enableTriggeredMode(bootstrap);
+
+        bootstrap.childHandler(new ServiceChannelInitializer(this, config, false));
+        // Bind and start to accept incoming connections.
+
+        Preconditions.checkArgument(config.getServicePort().isPresent() || config.getServicePortTls().isPresent(),
+                "Either ServicePort or ServicePortTls should be configured.");
+
+        if (config.getServicePort().isPresent()) {
+            // Bind and start to accept incoming connections.
+            channelListen = bootstrap.bind(config.getServicePort().get()).sync().channel();
+            LOG.info("Started Pulsar Discovery service on {}", channelListen.localAddress());
+        }
+
+        if (config.getServicePortTls().isPresent()) {
+            ServerBootstrap tlsBootstrap = bootstrap.clone();
+            tlsBootstrap.childHandler(new ServiceChannelInitializer(this, config, true));
+            channelListenTls = tlsBootstrap.bind(config.getServicePortTls().get()).sync().channel();
+            LOG.info("Started Pulsar Discovery TLS service on port {}", channelListenTls.localAddress());
+        }
+
+        this.serviceUrl = serviceUrl();
+        this.serviceUrlTls = serviceUrlTls();
+    }
+
+    public BrokerDiscoveryProvider getDiscoveryProvider() {
+        return discoveryProvider;
+    }
+
+    public void close() throws IOException {
+        discoveryProvider.close();
+        acceptorGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+        try {
+            localMetadataStore.close();
+            configMetadataStore.close();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Derive the host
+     *
+     * @return String containing the hostname
+     */
+    public String host() {
+        try {
+            if (!config.isBindOnLocalhost()) {
+                return InetAddress.getLocalHost().getHostName();
+            } else {
+                return "localhost";
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            throw new IllegalStateException("failed to find host", e);
+        }
+    }
+
+    public String serviceUrl() {
+        if (config.getServicePort().isPresent()) {
+            return new StringBuilder("pulsar://").append(host()).append(":")
+                    .append(((InetSocketAddress) channelListen.localAddress()).getPort())
+                    .toString();
+        } else {
+            return null;
+        }
+    }
+
+    public String serviceUrlTls() {
+        if (config.getServicePortTls().isPresent()) {
+            return new StringBuilder("pulsar+ssl://").append(host()).append(":")
+                    .append(((InetSocketAddress) channelListenTls.localAddress()).getPort())
+                    .toString();
+        } else {
+            return null;
+        }
+    }
+
+    public String getServiceUrl() {
+        return serviceUrl;
+    }
+
+    public String getServiceUrlTls() {
+        return serviceUrlTls;
+    }
+
+    public ServiceConfig getConfiguration() {
+        return config;
+    }
+
+    public AuthenticationService getAuthenticationService() {
+        return authenticationService;
+    }
+
+    public AuthorizationService getAuthorizationService() {
+        return authorizationService;
+    }
+
+    public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+        return PulsarResources.createMetadataStore(config.getZookeeperServers(), config.getZookeeperSessionTimeoutMs());
+    }
+
+    public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+        return PulsarResources.createMetadataStore(config.getConfigurationStoreServers(),
+                config.getZookeeperSessionTimeoutMs());
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryService.class);
+}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
new file mode 100644
index 0000000000000..ebe2639b2c29c
--- /dev/null
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServerConnection.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.discovery.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType.Redirect;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
+
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.PulsarHandler;
+import org.apache.pulsar.common.api.proto.CommandConnect;
+import org.apache.pulsar.common.api.proto.CommandLookupTopic;
+import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.handler.ssl.SslHandler;
+
+/**
+ * Handles incoming discovery request from client and sends appropriate response back to client
+ *
+ */
+public class ServerConnection extends PulsarHandler {
+
+    private DiscoveryService service;
+    private String authRole = null;
+    private AuthenticationDataSource authenticationData = null;
+    private State state;
+    public static final String TLS_HANDLER = "tls";
+
+    enum State {
+        Start, Connected
+    }
+
+    public ServerConnection(DiscoveryService discoveryService) {
+        super(0, TimeUnit.SECONDS); // discovery-service doesn't need to run keepAlive task
+        this.service = discoveryService;
+        this.state = State.Start;
+    }
+
+    /**
+     * handles connect request and sends {@code State.Connected} ack to client
+     */
+    @Override
+    protected void handleConnect(CommandConnect connect) {
+        checkArgument(state == State.Start);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received CONNECT from {}", remoteAddress);
+        }
+        if(service.getConfiguration().isAuthenticationEnabled()) {
+            try {
+                String authMethod = "none";
+                if (connect.hasAuthMethodName()) {
+                    authMethod = connect.getAuthMethodName();
+                } else if (connect.hasAuthMethod()) {
+                    // Legacy client is passing enum
+                    authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
+                }
+                String authData = new String(connect.getAuthData(), StandardCharsets.UTF_8);
+                ChannelHandler sslHandler = ctx.channel().pipeline().get(TLS_HANDLER);
+                SSLSession sslSession = null;
+                if (sslHandler != null) {
+                    sslSession = ((SslHandler) sslHandler).engine().getSession();
+                }
+                this.authenticationData = new AuthenticationDataCommand(authData, remoteAddress, sslSession);
+                authRole = service.getAuthenticationService()
+                        .authenticate(this.authenticationData, authMethod);
+                LOG.info("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, authRole);
+            } catch (AuthenticationException e) {
+                String msg = "Unable to authenticate";
+                LOG.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
+                ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
+                close();
+                return;
+            }
+        }
+        ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
+        state = State.Connected;
+        setRemoteEndpointProtocolVersion(connect.getProtocolVersion());
+    }
+
+    @Override
+    protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
+        checkArgument(state == State.Connected);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received PartitionMetadataLookup from {}", remoteAddress);
+        }
+        sendPartitionMetadataResponse(partitionMetadata);
+    }
+
+    /**
+     * handles discovery request from client ands sends next active broker address
+     */
+    @Override
+    protected void handleLookup(CommandLookupTopic lookup) {
+        checkArgument(state == State.Connected);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received Lookup from {}", remoteAddress);
+        }
+        sendLookupResponse(lookup.getRequestId());
+    }
+
+    private void close() {
+        ctx.close();
+    }
+
+    private void sendLookupResponse(long requestId) {
+        try {
+            LoadManagerReport availableBroker = service.getDiscoveryProvider().nextBroker();
+            ctx.writeAndFlush(Commands.newLookupResponse(availableBroker.getPulsarServiceUrl(),
+                    availableBroker.getPulsarServiceUrlTls(), false, Redirect, requestId, false));
+        } catch (PulsarServerException e) {
+            LOG.warn("[{}] Failed to get next active broker {}", remoteAddress, e.getMessage(), e);
+            ctx.writeAndFlush(
+                    Commands.newLookupErrorResponse(ServerError.ServiceNotReady, e.getMessage(), requestId));
+        }
+    }
+
+    private void sendPartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
+        final long requestId = partitionMetadata.getRequestId();
+        TopicName topicName = TopicName.get(partitionMetadata.getTopic());
+
+        service.getDiscoveryProvider()
+                .getPartitionedTopicMetadata(service, topicName, authRole, authenticationData)
+                .thenAccept(metadata -> {
+            if (LOG.isDebugEnabled()) {
+                        LOG.debug("[{}] Total number of partitions for topic {} is {}", authRole, topicName,
+                                metadata.partitions);
+            }
+            ctx.writeAndFlush(Commands.newPartitionMetadataResponse(metadata.partitions, requestId));
+        }).exceptionally(ex -> {
+                    LOG.warn("[{}] Failed to get partitioned metadata for topic {} {}", remoteAddress, topicName,
+                            ex.getMessage(), ex);
+            ctx.writeAndFlush(
+                    Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
+            return null;
+        });
+    }
+
+    @Override
+    protected boolean isHandshakeCompleted() {
+        return state == State.Connected;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class);
+
+}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
new file mode 100644
index 0000000000000..250259b611238
--- /dev/null
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/ServiceChannelInitializer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.discovery.service;
+
+import io.netty.handler.ssl.SslHandler;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
+import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
+import org.apache.pulsar.discovery.service.server.ServiceConfig;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.ssl.SslContext;
+
+/**
+ * Initialize service channel handlers.
+ *
+ */
+public class ServiceChannelInitializer extends ChannelInitializer {
+
+    public static final String TLS_HANDLER = "tls";
+    private final DiscoveryService discoveryService;
+    private final boolean enableTls;
+    private final boolean tlsEnabledWithKeyStore;
+    private SslContextAutoRefreshBuilder sslCtxRefresher;
+    private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
+
+    public ServiceChannelInitializer(DiscoveryService discoveryService, ServiceConfig serviceConfig, boolean e)
+            throws Exception {
+        super();
+        this.discoveryService = discoveryService;
+        this.enableTls = e;
+        this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
+        if (this.enableTls) {
+            if (tlsEnabledWithKeyStore) {
+                nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
+                        serviceConfig.getTlsProvider(),
+                        serviceConfig.getTlsKeyStoreType(),
+                        serviceConfig.getTlsKeyStore(),
+                        serviceConfig.getTlsKeyStorePassword(),
+                        serviceConfig.isTlsAllowInsecureConnection(),
+                        serviceConfig.getTlsTrustStoreType(),
+                        serviceConfig.getTlsTrustStore(),
+                        serviceConfig.getTlsTrustStorePassword(),
+                        serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+                        serviceConfig.getTlsCiphers(),
+                        serviceConfig.getTlsProtocols(),
+                        serviceConfig.getTlsCertRefreshCheckDurationSec());
+            } else {
+                sslCtxRefresher = new NettyServerSslContextBuilder(serviceConfig.isTlsAllowInsecureConnection(),
+                        serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
+                        serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
+                        serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
+                        serviceConfig.getTlsCertRefreshCheckDurationSec());
+            }
+        } else {
+            this.sslCtxRefresher = null;
+        }
+    }
+
+    @Override
+    protected void initChannel(SocketChannel ch) throws Exception {
+        if (sslCtxRefresher != null && this.enableTls) {
+            if (this.tlsEnabledWithKeyStore) {
+                ch.pipeline().addLast(TLS_HANDLER,
+                        new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
+            } else{
+                SslContext sslContext = sslCtxRefresher.get();
+                if (sslContext != null) {
+                    ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
+                }
+            }
+        }
+        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+            Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
+        ch.pipeline().addLast("handler", new ServerConnection(discoveryService));
+    }
+}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java
new file mode 100644
index 0000000000000..32851c07c6d66
--- /dev/null
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.discovery.service.server;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.slf4j.bridge.SLF4JBridgeHandler.install;
+import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.CmdGenerateDocs;
+import org.apache.pulsar.discovery.service.DiscoveryService;
+import org.apache.pulsar.discovery.service.web.DiscoveryServiceServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * Starts jetty server and initialize {@link DiscoveryServiceServlet} web-service
+ *
+ */
+public class DiscoveryServiceStarter {
+    private static class Arguments {
+        @Parameter(description = "config file")
+        private String configFile = "";
+
+        @Parameter(names = {"-h", "--help"}, description = "Show this help message")
+        private boolean help = false;
+
+        @Parameter(names = {"-g", "--generate-docs"}, description = "Generate docs")
+        private boolean generateDocs = false;
+    }
+
+    public static void checkConfig(ServiceConfig config) {
+        checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
+        checkArgument(!isEmpty(config.getConfigurationStoreServers()),  "configuration-store Servers must be provided");
+    }
+
+    public static void init(String configFile) throws Exception {
+        // setup handlers
+        removeHandlersForRootLogger();
+        install();
+
+        DateFormat dateFormat = new SimpleDateFormat(
+            FixedDateFormat.FixedFormat.ISO8601_OFFSET_DATE_TIME_HHMM.getPattern());
+        Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
+            System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
+        });
+
+        // load config file
+        final ServiceConfig config = PulsarConfigurationLoader.create(configFile, ServiceConfig.class);
+        checkConfig(config);
+
+        // create Discovery service
+        DiscoveryService discoveryService = new DiscoveryService(config);
+        // create a web-service
+        final ServerManager server = new ServerManager(config);
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                try {
+                    discoveryService.close();
+                    server.stop();
+                } catch (Exception e) {
+                    log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
+                }
+            }
+        });
+
+        discoveryService.start();
+        startWebService(server, config);
+    }
+
+    protected static void startWebService(ServerManager server, ServiceConfig config) throws Exception {
+        // add servlet
+        Map initParameters = new TreeMap<>();
+        initParameters.put("zookeeperServers", config.getZookeeperServers());
+        server.addServlet("/*", DiscoveryServiceServlet.class, initParameters);
+
+        // start web-service
+        server.start();
+        log.info("Discovery service is started at {}", server.getServiceUri().toString());
+    }
+
+    public static void main(String[] args) throws Exception {
+        Arguments arguments = new Arguments();
+        JCommander jcommander = new JCommander();
+        try {
+            jcommander.addObject(arguments);
+            jcommander.parse(args);
+            if (arguments.help) {
+                jcommander.usage();
+                return;
+            }
+            if (arguments.generateDocs) {
+                CmdGenerateDocs cmd = new CmdGenerateDocs("pulsar");
+                cmd.addCommand("discovery", arguments);
+                cmd.run(null);
+                return;
+            }
+        } catch (Exception e) {
+            jcommander.usage();
+            return;
+        }
+
+        checkArgument(args.length == 1, "Need to specify a configuration file");
+
+        try {
+            // load config file and start server
+            init(args[0]);
+        } catch (Exception e) {
+            log.error("Failed to start discovery service.", e);
+            Runtime.getRuntime().halt(1);
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceStarter.class);
+
+}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
new file mode 100644
index 0000000000000..7360aed4d7f58
--- /dev/null
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServerManager.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.discovery.service.server;
+
+import com.google.common.collect.Lists;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.servlet.Servlet;
+import org.apache.pulsar.broker.web.JettyRequestLogFactory;
+import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.server.handler.DefaultHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.server.handler.RequestLogHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages web-service startup/stop on jetty server.
+ *
+ */
+public class ServerManager {
+    private final Server server;
+    private final ExecutorThreadPool webServiceExecutor;
+    private final List handlers = Lists.newArrayList();
+
+    private final ServerConnector connector;
+    private final ServerConnector connectorTls;
+
+    public ServerManager(ServiceConfig config) {
+        this.webServiceExecutor = new ExecutorThreadPool();
+        this.webServiceExecutor.setName("pulsar-external-web");
+        this.server = new Server(webServiceExecutor);
+
+        List connectors = Lists.newArrayList();
+
+        if (config.getWebServicePort().isPresent()) {
+            connector = new ServerConnector(server, 1, 1);
+            connector.setPort(config.getWebServicePort().get());
+            connectors.add(connector);
+        } else {
+            connector = null;
+        }
+
+        if (config.getWebServicePortTls().isPresent()) {
+            try {
+                SslContextFactory sslCtxFactory;
+                if (config.isTlsEnabledWithKeyStore()) {
+                    sslCtxFactory = KeyStoreSSLContext.createSslContextFactory(
+                            config.getTlsProvider(),
+                            config.getTlsKeyStoreType(),
+                            config.getTlsKeyStore(),
+                            config.getTlsKeyStorePassword(),
+                            config.isTlsAllowInsecureConnection(),
+                            config.getTlsTrustStoreType(),
+                            config.getTlsTrustStore(),
+                            config.getTlsTrustStorePassword(),
+                            config.isTlsRequireTrustedClientCertOnConnect(),
+                            config.getTlsCertRefreshCheckDurationSec()
+                    );
+                } else {
+                    sslCtxFactory = SecurityUtility.createSslContextFactory(
+                            config.isTlsAllowInsecureConnection(),
+                            config.getTlsTrustCertsFilePath(),
+                            config.getTlsCertificateFilePath(),
+                            config.getTlsKeyFilePath(),
+                            config.isTlsRequireTrustedClientCertOnConnect(),
+                            true,
+                            config.getTlsCertRefreshCheckDurationSec());
+                }
+                connectorTls = new ServerConnector(server, 1, 1, sslCtxFactory);
+                connectorTls.setPort(config.getWebServicePortTls().get());
+                connectors.add(connectorTls);
+            } catch (Exception e) {
+                throw new RestException(e);
+            }
+        } else {
+            connectorTls = null;
+        }
+
+        // Limit number of concurrent HTTP connections to avoid getting out of file descriptors
+        connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 / connectors.size()));
+        server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
+    }
+
+    public URI getServiceUri() {
+        return this.server.getURI();
+    }
+
+    public void addServlet(String path, Class servlet, Map initParameters) {
+        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        context.setContextPath(path);
+
+        ServletHolder holder = new ServletHolder(servlet);
+        holder.setInitParameters(initParameters);
+        context.addServlet(holder, path);
+        handlers.add(context);
+    }
+
+    public void start() throws Exception {
+        RequestLogHandler requestLogHandler = new RequestLogHandler();
+        requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
+        handlers.add(0, new ContextHandlerCollection());
+        handlers.add(requestLogHandler);
+
+        ContextHandlerCollection contexts = new ContextHandlerCollection();
+        contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
+
+        HandlerCollection handlerCollection = new HandlerCollection();
+        handlerCollection.setHandlers(new Handler[] { contexts, new DefaultHandler(), requestLogHandler });
+        server.setHandler(handlerCollection);
+
+        server.start();
+
+        log.info("Server started at end point {}", getServiceUri());
+    }
+
+    public void stop() throws Exception {
+        server.stop();
+        webServiceExecutor.stop();
+        log.info("Server stopped successfully");
+    }
+
+	public boolean isStarted() {
+		return server.isStarted();
+	}
+
+	public Optional getListenPortHTTP() {
+	    if (connector != null) {
+	        return Optional.of(connector.getLocalPort());
+	    } else {
+	        return Optional.empty();
+	    }
+	}
+
+	public Optional getListenPortHTTPS() {
+        if (connectorTls != null) {
+            return Optional.of(connectorTls.getLocalPort());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ServerManager.class);
+}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
new file mode 100644
index 0000000000000..06e55b62e6c5b
--- /dev/null
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.discovery.service.server;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import lombok.Data;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.apache.pulsar.discovery.service.web.DiscoveryServiceServlet;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Service Configuration to start :{@link DiscoveryServiceServlet}
+ *
+ */
+@Data
+public class ServiceConfig implements PulsarConfiguration {
+
+    @ApiModelProperty(
+            name = "zookeeperServers",
+            value = "Local ZooKeeper quorum connection string"
+    )
+    private String zookeeperServers;
+    // Global-Zookeeper quorum connection string
+    @Deprecated
+    private String globalZookeeperServers;
+
+    @ApiModelProperty(
+            name = "configurationStoreServers",
+            value = "Configuration store connection string"
+    )
+    private String configurationStoreServers;
+
+    @ApiModelProperty(
+            name = "zookeeperSessionTimeoutMs",
+            value = "ZooKeeper session timeout (in million seconds)"
+    )
+    private int zookeeperSessionTimeoutMs = 30_000;
+
+    @ApiModelProperty(
+            name = "zooKeeperCacheExpirySeconds",
+            value = "ZooKeeper cache expiry time (in seconds)"
+    )
+    private int zooKeeperCacheExpirySeconds=300;
+
+    @ApiModelProperty(
+            name = "servicePort",
+            value = "Port used to server binary-proto request"
+    )
+    private Optional servicePort = Optional.ofNullable(5000);
+
+    @ApiModelProperty(
+            name = "servicePortTls",
+            value = "Port used to server binary-proto-tls request"
+    )
+    private Optional servicePortTls = Optional.empty();
+
+    @ApiModelProperty(
+            name = "webServicePort",
+            value = "Port used to server HTTP request"
+    )
+    private Optional webServicePort = Optional.ofNullable(8080);
+
+    @ApiModelProperty(
+            name = "webServicePortTls",
+            value = "Port used to server HTTPS request"
+    )
+    private Optional webServicePortTls = Optional.empty();
+
+    @ApiModelProperty(
+            name = "bindOnLocalhost",
+            value = "Control whether to bind directly on localhost rather than on normal hostname"
+    )
+    private boolean bindOnLocalhost = false;
+
+    @ApiModelProperty(
+            name = "superUserRoles",
+            value = "Role names that are treated as \"super-user\", meaning they are able to "
+                    + "do all admin operations and publish to or consume from all topics"
+    )
+    private Set superUserRoles = Sets.newTreeSet();
+
+    @ApiModelProperty(
+            name = "authorizationAllowWildcardsMatching",
+            value = "Allow wildcard matching in authorization (wildcard matching only applicable "
+                    + "if wildcard char * presents at first or last position. "
+                    + "For example, *.pulsar.service, pulsar.service.*"
+    )
+    private boolean authorizationAllowWildcardsMatching = false;
+
+    @ApiModelProperty(
+            name = "authenticationEnabled",
+            value = "Whether enable authentication"
+    )
+    private boolean authenticationEnabled = false;
+
+    @ApiModelProperty(
+            name = "authenticationProviders",
+            value = "Authentication provider name list, which is a list of class names"
+    )
+    private Set authenticationProviders = Sets.newTreeSet();
+
+    @ApiModelProperty(
+            name = "authorizationEnabled",
+            value = "Whether enforce authorization"
+    )
+    private boolean authorizationEnabled = false;
+
+    @ApiModelProperty(
+            name = "authorizationProvider",
+            value = "Authorization provider fully qualified class-name"
+    )
+    private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
+
+    /***** --- TLS --- ****/
+    @Deprecated
+    private boolean tlsEnabled = false;
+
+    @ApiModelProperty(
+            name = "tlsCertRefreshCheckDurationSec",
+            value = "TLS cert refresh duration (in seconds). 0 means checking every new connection."
+    )
+    private long tlsCertRefreshCheckDurationSec = 300;
+
+    @ApiModelProperty(
+            name = "tlsCertificateFilePath",
+            value = "Path for the TLS certificate file"
+    )
+    private String tlsCertificateFilePath;
+
+    @ApiModelProperty(
+            name = "tlsKeyFilePath",
+            value = "Path for the TLS private key file"
+    )
+    private String tlsKeyFilePath;
+
+    @ApiModelProperty(
+            name = "tlsTrustCertsFilePath",
+            value = "Path for the trusted TLS certificate file"
+    )
+    private String tlsTrustCertsFilePath = "";
+
+    @ApiModelProperty(
+            name = "tlsAllowInsecureConnection",
+            value = "Accept untrusted TLS certificate from client"
+    )
+    private boolean tlsAllowInsecureConnection = false;
+
+    @ApiModelProperty(
+            name = "tlsProtocols",
+            value = "Specify the TLS protocols the broker uses to negotiate during TLS Handshake. "
+                    + "Example: [TLSv1.3, TLSv1.2]"
+    )
+    private Set tlsProtocols = Sets.newTreeSet();
+
+    @ApiModelProperty(
+            name = "tlsCiphers",
+            value = "Specify the tls cipher the broker will use to negotiate during TLS Handshake. "
+                    + "Example: [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
+    )
+    private Set tlsCiphers = Sets.newTreeSet();
+
+    @ApiModelProperty(
+            name = "tlsRequireTrustedClientCertOnConnect",
+            value = "Specify whether client certificates are required for TLS. "
+                    + "Reject the connection if the client certificate is not trusted."
+    )
+    private boolean tlsRequireTrustedClientCertOnConnect = false;
+
+    /***** --- TLS with KeyStore--- ****/
+    @ApiModelProperty(
+            name = "tlsEnabledWithKeyStore",
+            value = "Enable TLS with KeyStore type configuration in broker"
+    )
+    private boolean tlsEnabledWithKeyStore = false;
+
+    @ApiModelProperty(
+            name = "tlsProvider",
+            value = "Full class name of TLS Provider"
+    )
+    private String tlsProvider = null;
+
+    @ApiModelProperty(
+            name = "tlsKeyStoreType",
+            value = "TLS KeyStore type configurations in broker are JKS or PKCS12"
+    )
+    private String tlsKeyStoreType = "JKS";
+
+    @ApiModelProperty(
+            name = "tlsKeyStore",
+            value = "TLS KeyStore path in broker"
+    )
+    private String tlsKeyStore = null;
+
+    @ApiModelProperty(
+            name = "tlsKeyStorePassword",
+            value = "TLS KeyStore password in broker"
+    )
+    private String tlsKeyStorePassword = null;
+
+    @ApiModelProperty(
+            name = "tlsTrustStoreType",
+            value = "TLS TrustStore type configuration in broker are JKS or PKCS12"
+    )
+    private String tlsTrustStoreType = "JKS";
+
+    @ApiModelProperty(
+            name = "tlsTrustStore",
+            value = "TLS TrustStore path in broker"
+    )
+    private String tlsTrustStore = null;
+
+    @ApiModelProperty(
+            name = "tlsTrustStorePassword",
+            value = "TLS TrustStore password in broker"
+    )
+    private String tlsTrustStorePassword = null;
+
+    @ApiModelProperty(
+            name = "properties",
+            value = "You can store string in key-value format"
+    )
+    private Properties properties = new Properties();
+
+    public String getConfigurationStoreServers() {
+        return null == configurationStoreServers ? getGlobalZookeeperServers() : configurationStoreServers;
+    }
+}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/util/CmdGenerateDocumentation.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/util/CmdGenerateDocumentation.java
new file mode 100644
index 0000000000000..484bdfcd6ac5f
--- /dev/null
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/util/CmdGenerateDocumentation.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.discovery.service.util;
+
+import com.beust.jcommander.Parameters;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BaseGenerateDocumentation;
+import org.apache.pulsar.discovery.service.server.ServiceConfig;
+
+@Data
+@Parameters(commandDescription = "Generate documentation automatically.")
+@Slf4j
+public class CmdGenerateDocumentation extends BaseGenerateDocumentation {
+
+    @Override
+    public String generateDocumentByClassName(String className) throws Exception {
+        StringBuilder sb = new StringBuilder();
+        if (ServiceConfig.class.getName().equals(className)) {
+            return generateDocByApiModelProperty(className, "Service discovery", sb);
+        }
+        return "Class [" + className + "] not found";
+    }
+
+    public static void main(String[] args) throws Exception {
+        CmdGenerateDocumentation generateDocumentation = new CmdGenerateDocumentation();
+        generateDocumentation.run(args);
+    }
+}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceServlet.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceServlet.java
new file mode 100644
index 0000000000000..3c75d95dfa3f5
--- /dev/null
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceServlet.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.discovery.service.web;
+
+import static org.apache.bookkeeper.util.MathUtils.signSafeMod;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Acts a load-balancer that receives any incoming request and discover active-available broker in round-robin manner
+ * and redirect request to that broker.
+ * 

+ * Accepts any {@value GET, PUT, POST} request and redirects to available broker-server to serve the request + *

+ * + */ +public class DiscoveryServiceServlet extends HttpServlet { + + private static final long serialVersionUID = 1L; + + private final AtomicInteger counter = new AtomicInteger(); + + private MetadataStoreExtended localMetadataStore; + private MetadataStoreExtended configMetadataStore; + private PulsarResources pulsarResources; + private MetadataStoreCacheLoader metadataStoreCacheLoader; + + @Override + public void init(ServletConfig config) throws ServletException { + log.info("Initializing DiscoveryServiceServlet resource"); + + String zookeeperServers = config.getInitParameter("zookeeperServers"); + String zookeeperSessionTimeoutMsStr = config.getInitParameter("zookeeperSessionTimeoutMs"); + int zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMsStr != null + ? Integer.valueOf(zookeeperSessionTimeoutMsStr) + : 30_000; + + log.info("zookeeperServers={}", zookeeperServers); + + try { + localMetadataStore = createLocalMetadataStore(zookeeperServers, zookeeperSessionTimeoutMs); + pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore); + metadataStoreCacheLoader = new MetadataStoreCacheLoader(pulsarResources, zookeeperSessionTimeoutMs); + } catch (Throwable t) { + throw new ServletException(t); + } + } + + @Override + public void destroy() { + try { + localMetadataStore.close(); + } catch (Exception e) { + log.warn("Failed to close the metadata-store {}", e.getMessage()); + } + try { + metadataStoreCacheLoader.close(); + } catch (IOException e) { + log.warn("Failed to close the metadataStore-cache {}", e.getMessage()); + } + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + redirect(req, resp); + } + + @Override + protected void doHead(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + redirect(req, resp); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + redirect(req, resp); + } + + @Override + protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + redirect(req, resp); + } + + @Override + protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + redirect(req, resp); + } + + @Override + protected void doOptions(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + redirect(req, resp); + } + + @Override + protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + redirect(req, resp); + } + + private void redirect(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + + try { + LoadManagerReport broker = nextBroker(); + + URI brokerURI; + if (request.getScheme().equals("http")) { + // Use normal HTTP url + brokerURI = new URI(broker.getWebServiceUrl()); + } else { + brokerURI = new URI(broker.getWebServiceUrlTls()); + } + + StringBuilder location = new StringBuilder(); + location.append(brokerURI.getScheme()).append("://").append(brokerURI.getHost()).append(':') + .append(brokerURI.getPort()).append(request.getRequestURI()); + if (request.getQueryString() != null) { + location.append('?').append(request.getQueryString()); + } + if (log.isDebugEnabled()) { + log.debug("Redirecting to {}", location); + } + + response.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT); + response.setHeader("Location", location.toString()); + } catch (URISyntaxException e) { + log.warn("No broker found in zookeeper {}", e.getMessage(), e); + throw new RestException(Status.SERVICE_UNAVAILABLE, "Broker is not available"); + } + } + + /** + * Find next broker url in round-robin + * + * @return + */ + LoadManagerReport nextBroker() { + List availableBrokers = metadataStoreCacheLoader.getAvailableBrokers(); + + if (availableBrokers.isEmpty()) { + throw new RestException(Status.SERVICE_UNAVAILABLE, "No active broker is available"); + } else { + int brokersCount = availableBrokers.size(); + int nextIdx = signSafeMod(counter.getAndIncrement(), brokersCount); + return availableBrokers.get(nextIdx); + } + } + + public MetadataStoreExtended createLocalMetadataStore(String zookeeperServers, int operationimeoutMs) throws MetadataStoreException { + return PulsarResources.createMetadataStore(zookeeperServers, operationimeoutMs); + } + + private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceServlet.class); +} diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java new file mode 100644 index 0000000000000..70fc26fa94fad --- /dev/null +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.discovery.service; + +import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import com.google.common.util.concurrent.MoreExecutors; + +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import org.apache.pulsar.discovery.service.server.ServiceConfig; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.zookeeper.MockZooKeeper; +import org.apache.zookeeper.KeeperException.Code; + + +public class BaseDiscoveryTestSetup { + + protected ServiceConfig config; + protected DiscoveryService service; + private MockZooKeeper mockZooKeeper; + protected MetadataStoreExtended zkStore; + private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt"; + private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key"; + + protected void setup() throws Exception { + config = new ServiceConfig(); + config.setServicePort(Optional.of(0)); + config.setServicePortTls(Optional.of(0)); + config.setBindOnLocalhost(true); + + config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + + mockZooKeeper = createMockZooKeeper(); + zkStore = new ZKMetadataStore(mockZooKeeper); + zkStore.put(LOADBALANCE_BROKERS_ROOT, "".getBytes(StandardCharsets.UTF_8), + Optional.of(-1L)); + service = spy(new DiscoveryService(config)); + doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createLocalMetadataStore(); + doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createConfigurationMetadataStore(); + service.start(); + + } + + protected void cleanup() throws Exception { + mockZooKeeper.shutdown(); + zkStore.close(); + service.close(); + } + + protected MockZooKeeper createMockZooKeeper() throws Exception { + MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService()); + return zk; + } + + protected void simulateStoreError(String string, Code sessionexpired) { + mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> { + return op == MockZooKeeper.Op.GET + && path.equals("/admin/partitioned-topics/test/local/ns/persistent/my-topic-2"); + }); + } + + protected void simulateStoreErrorForNonPersistentTopic(String string, Code sessionexpired) { + mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> { + return op == MockZooKeeper.Op.GET + && path.equals("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2"); + }); + } +} diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/CmdTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/CmdTest.java new file mode 100644 index 0000000000000..5c81289f0ea5a --- /dev/null +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/CmdTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.discovery.service; + +import static org.testng.Assert.assertTrue; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.lang.reflect.Field; +import org.apache.pulsar.common.configuration.FieldContext; +import org.apache.pulsar.discovery.service.util.CmdGenerateDocumentation; +import org.testng.annotations.Test; + +public class CmdTest { + + @Test + public void cmdParserServiceConfigTest() throws Exception { + String value = generateDoc("org.apache.pulsar.discovery.service.server.ServiceConfig"); + assertTrue(value.contains("Service discovery")); + } + + private String generateDoc(String clazz) throws Exception { + PrintStream oldStream = System.out; + try (ByteArrayOutputStream baoStream = new ByteArrayOutputStream(2048); + PrintStream cacheStream = new PrintStream(baoStream);) { + System.setOut(cacheStream); + CmdGenerateDocumentation.main(("-c " + clazz).split(" ")); + String message = baoStream.toString(); + Class cls = Class.forName(clazz); + Field[] fields = cls.getDeclaredFields(); + for (Field field : fields) { + field.setAccessible(true); + FieldContext fieldContext = field.getAnnotation(FieldContext.class); + if (fieldContext == null) { + continue; + } + assertTrue(message.indexOf(field.getName()) > 0); + } + return message; + } finally { + System.setOut(oldStream); + } + } +} diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java new file mode 100644 index 0000000000000..e7cd2fb3f6239 --- /dev/null +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.discovery.service; + +import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.fail; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.SecurityUtility; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.policies.data.loadbalancer.LoadReport; +import org.apache.zookeeper.KeeperException.Code; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.awaitility.Awaitility; + +public class DiscoveryServiceTest extends BaseDiscoveryTestSetup { + + private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt"; + private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key"; + + @BeforeMethod + private void init() throws Exception { + super.setup(); + } + + @AfterMethod(alwaysRun = true) + private void clean() throws Exception { + super.cleanup(); + } + + /** + * Verifies: Discovery-service returns broker is round-robin manner + * + * @throws Exception + */ + @Test + public void testBrokerDiscoveryRoundRobin() throws Exception { + addBrokerToZk(5); + String prevUrl = null; + BrokerDiscoveryProvider discoveryProvider = service.getDiscoveryProvider(); + for (int i = 0; i < 10; i++) { + String current = discoveryProvider.nextBroker().getPulsarServiceUrl(); + assertNotEquals(prevUrl, current, "unexpected " + current + " vs " + prevUrl + ", available " + discoveryProvider.getAvailableBrokers()); + prevUrl = current; + } + } + + @Test + public void testGetPartitionsMetadata() throws Exception { + TopicName topic1 = TopicName.get("persistent://test/local/ns/my-topic-1"); + + PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role", null) + .get(); + assertEquals(m.partitions, 0); + + // Simulate ZK error + simulateStoreError("/admin/partitioned-topics/test/local/ns/persistent/my-topic-2", Code.SESSIONEXPIRED); + TopicName topic2 = TopicName.get("persistent://test/local/ns/my-topic-2"); + CompletableFuture future = service.getDiscoveryProvider() + .getPartitionedTopicMetadata(service, topic2, "role", null); + try { + future.get(); + fail("Partition metadata lookup should have failed"); + } catch (ExecutionException e) { + assertEquals(e.getCause().getClass(), MetadataStoreException.class); + } + } + + @Test + public void testGetPartitionsMetadataForNonPersistentTopic() throws Exception { + TopicName topic1 = TopicName.get("non-persistent://test/local/ns/my-topic-1"); + + PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role", null) + .get(); + assertEquals(m.partitions, 0); + + // Simulate ZK error + simulateStoreErrorForNonPersistentTopic("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2", Code.SESSIONEXPIRED); + TopicName topic2 = TopicName.get("non-persistent://test/local/ns/my-topic-2"); + CompletableFuture future = service.getDiscoveryProvider() + .getPartitionedTopicMetadata(service, topic2, "role", null); + try { + future.get(); + fail("Partition metadata lookup should have failed"); + } catch (ExecutionException e) { + assertEquals(e.getCause().getClass(), MetadataStoreException.class); + } + } + + /** + * It verifies: client connects to Discovery-service and receives discovery response successfully. + * + * @throws Exception + */ + @Test + public void testClientServerConnection() throws Exception { + addBrokerToZk(2); + + final CompletableFuture promise = new CompletableFuture<>(); + NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), promise, false); + assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED); + workerGroup.shutdownGracefully(); + } + + @Test + public void testClientServerConnectionTls() throws Exception { + addBrokerToZk(2); + + final CompletableFuture promise = new CompletableFuture<>(); + NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), promise, true); + assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED); + workerGroup.shutdownGracefully(); + } + + /** + * creates ClientHandler channel to connect and communicate with server + * + * @param serviceUrl + * @param promise + * @param tls + * @return + * @throws URISyntaxException + */ + public static NioEventLoopGroup connectToService(String serviceUrl, + CompletableFuture promise, + boolean tls) + throws URISyntaxException { + NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + Bootstrap b = new Bootstrap(); + b.group(workerGroup); + b.channel(NioSocketChannel.class); + + b.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + if (tls) { + SslContextBuilder builder = SslContextBuilder.forClient(); + builder.trustManager(InsecureTrustManagerFactory.INSTANCE); + X509Certificate[] certificates = SecurityUtility + .loadCertificatesFromPemFile(TLS_CLIENT_CERT_FILE_PATH); + PrivateKey privateKey = SecurityUtility.loadPrivateKeyFromPemFile(TLS_CLIENT_KEY_FILE_PATH); + builder.keyManager(privateKey, (X509Certificate[]) certificates); + SslContext sslCtx = builder.build(); + ch.pipeline().addLast("tls", sslCtx.newHandler(ch.alloc())); + } + ch.pipeline().addLast(new ClientHandler(promise)); + } + }); + URI uri = new URI(serviceUrl); + InetSocketAddress serviceAddress = new InetSocketAddress(uri.getHost(), uri.getPort()); + b.connect(serviceAddress).addListener((ChannelFuture future) -> { + if (!future.isSuccess()) { + promise.completeExceptionally(future.cause()); + } + }); + return workerGroup; + } + + static class ClientHandler extends ChannelInboundHandlerAdapter { + + final CompletableFuture promise; + + public ClientHandler(CompletableFuture promise) { + this.promise = promise; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { + ByteBuf buffer = (ByteBuf) msg; + buffer.readUnsignedInt(); // discard frame length + int cmdSize = (int) buffer.readUnsignedInt(); + BaseCommand cmd = new BaseCommand(); + cmd.parseFrom(buffer, cmdSize); + buffer.release(); + + promise.complete(cmd); + } catch (Exception e) { + promise.completeExceptionally(e); + } + ctx.close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + promise.completeExceptionally(cause); + ctx.close(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + ctx.writeAndFlush(Commands.newConnect("", "", null)); + } + + } + + private void addBrokerToZk(int number) throws Exception { + + for (int i = 0; i < number; i++) { + LoadReport report = new LoadReport(null, null, "pulsar://broker-:15000" + i, null); + String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report); + zkStore.put(LOADBALANCE_BROKERS_ROOT + "/" + "broker-" + i, + reportData.getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).get(); + } + + Awaitility.await().until(() + -> service.getDiscoveryProvider().getAvailableBrokers().size() == number); + } + +} diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarterTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarterTest.java new file mode 100644 index 0000000000000..95d40776a0712 --- /dev/null +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarterTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.discovery.service.server; + +import static org.testng.Assert.assertTrue; +import com.beust.jcommander.Parameter; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.util.Arrays; +import org.testng.annotations.Test; + +public class DiscoveryServiceStarterTest { + @Test + public void testMainGenerateDocs() throws Exception { + PrintStream oldStream = System.out; + try { + ByteArrayOutputStream baoStream = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baoStream)); + + Class argumentsClass = + Class.forName("org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter$Arguments"); + + DiscoveryServiceStarter.main(new String[]{"-g"}); + + String message = baoStream.toString(); + + Field[] fields = argumentsClass.getDeclaredFields(); + for (Field field : fields) { + boolean fieldHasAnno = field.isAnnotationPresent(Parameter.class); + if (fieldHasAnno) { + Parameter fieldAnno = field.getAnnotation(Parameter.class); + String[] names = fieldAnno.names(); + if (names.length == 0) { + continue; + } + String nameStr = Arrays.asList(names).toString(); + nameStr = nameStr.substring(1, nameStr.length() - 1); + assertTrue(message.indexOf(nameStr) > 0); + } + } + } finally { + System.setOut(oldStream); + } + } +} diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java new file mode 100644 index 0000000000000..979679d18c8d1 --- /dev/null +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.discovery.service.server; + +import static org.testng.Assert.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; + +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.discovery.service.DiscoveryService; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +/** + * 1. starts discovery service a. loads broker list from zk 2. http-client calls multiple http request: GET, PUT and + * POST. 3. discovery service redirects to appropriate brokers in round-robin 4. client receives unknown host exception + * with redirected broker + * + */ +public class DiscoveryServiceWebTest { + + + @Test + public void testWebDiscoveryServiceStarter() throws Exception { + File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties"); + if (testConfigFile.exists()) { + testConfigFile.delete(); + } + PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); + printWriter.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter.println("configurationStoreServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter.println("webServicePort=8080"); + printWriter.close(); + testConfigFile.deleteOnExit(); + final ServiceConfig config = PulsarConfigurationLoader.create(testConfigFile.getAbsolutePath(), ServiceConfig.class); + final ServerManager server = new ServerManager(config); + DiscoveryServiceStarter.startWebService(server, config); + assertTrue(server.isStarted()); + server.stop(); + testConfigFile.delete(); + } + + /** + * Test Configuration BackwardCompat for the change from globalzookeeper to configurationStore + */ + @Test + public void testConfigurationBackwardCompat() throws Exception { + DiscoveryService service = Mockito.mock(DiscoveryService.class); + + File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties"); + if (testConfigFile.exists()) { + testConfigFile.delete(); + } + PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); + printWriter.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter.println("globalZookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter.println("webServicePort=8080"); + printWriter.close(); + testConfigFile.deleteOnExit(); + + ServiceConfig config = PulsarConfigurationLoader.create(testConfigFile.getAbsolutePath(), ServiceConfig.class); + // have zookeeperServers and globalZookeeperServers, config is valid + // should not throw IllegalArgumentException. + DiscoveryServiceStarter.checkConfig(config); + + + if (testConfigFile.exists()) { + testConfigFile.delete(); + } + PrintWriter printWriter2 = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); + printWriter2.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter2.println("configurationStoreServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter2.println("webServicePort=8080"); + printWriter2.close(); + config = PulsarConfigurationLoader.create(testConfigFile.getAbsolutePath(), ServiceConfig.class); + // have zookeeperServers and configurationStoreServers, config is valid + // should not throw IllegalArgumentException. + DiscoveryServiceStarter.checkConfig(config); + + + if (testConfigFile.exists()) { + testConfigFile.delete(); + } + PrintWriter printWriter3 = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); + printWriter3.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter3.println("webServicePort=8080"); + printWriter3.close(); + config = PulsarConfigurationLoader.create(testConfigFile.getAbsolutePath(), ServiceConfig.class); + // only have zookeeperServers + // should throw IllegalArgumentException. + try { + DiscoveryServiceStarter.checkConfig(config); + } catch (IllegalArgumentException e) { + // expected: configure error + } + + testConfigFile.delete(); + } + +} diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/BaseZKStarterTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/BaseZKStarterTest.java new file mode 100644 index 0000000000000..0d2a0a81caf0f --- /dev/null +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/BaseZKStarterTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.discovery.service.web; + +import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT; + +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.zookeeper.MockZooKeeper; + +import com.google.common.util.concurrent.MoreExecutors; + +public class BaseZKStarterTest { + + private MockZooKeeper mockZooKeeper; + protected MetadataStoreExtended zkStore; + + protected void start() throws Exception { + mockZooKeeper = createMockZooKeeper(); + zkStore = new ZKMetadataStore(mockZooKeeper); + zkStore.put(LOADBALANCE_BROKERS_ROOT, "".getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).get(); + } + + protected void close() throws Exception { + mockZooKeeper.shutdown(); + zkStore.close(); + } + + /** + * Create MockZookeeper instance + * @return + * @throws Exception + */ + protected MockZooKeeper createMockZooKeeper() throws Exception { + MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService()); + return zk; + } + +} diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java new file mode 100644 index 0000000000000..dc9975f7cc9de --- /dev/null +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.discovery.service.web; + +import static javax.ws.rs.core.Response.Status.BAD_GATEWAY; +import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import java.io.InputStream; +import java.lang.reflect.Field; +import java.net.URL; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; + +import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader; +import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.RestException; +import org.apache.pulsar.discovery.service.server.ServerManager; +import org.apache.pulsar.discovery.service.server.ServiceConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.policies.data.loadbalancer.LoadReport; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.logging.LoggingFeature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * 1. starts discovery service a. loads broker list from zk 2. http-client calls multiple http request: GET, PUT and + * POST. 3. discovery service redirects to appropriate brokers in round-robin 4. client receives unknown host exception + * with redirected broker + * + */ +public class DiscoveryServiceWebTest extends BaseZKStarterTest{ + + private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceWebTest.class); + + private Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); + private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt"; + private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key"; + // DiscoveryServiceServlet gets initialized by a server and this map will help to retrieve ZK while mocking + // DiscoveryServiceServlet + private static final Map metadataStoreInstanceCache = Maps.newConcurrentMap(); + + @BeforeMethod + private void init() throws Exception { + start(); + } + + @AfterMethod(alwaysRun = true) + private void cleanup() throws Exception { + close(); + metadataStoreInstanceCache.clear(); + } + + @Test + public void testNextBroker() throws Exception { + + PulsarResources resources = new PulsarResources(zkStore, null); + + // 1. create znode for each broker + List brokers = Lists.newArrayList("broker-1", "broker-2", "broker-3"); + brokers.stream().forEach(broker -> { + String path = LOADBALANCE_BROKERS_ROOT + "/" + broker; + try { + LoadReport report = new LoadReport(broker, null, null, null); + String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report); + zkStore.put(path, reportData.getBytes(StandardCharsets.UTF_8), Optional.of(-1L)) + .get(); + } catch (ExecutionException ne) { + // Ok + } catch (Exception e) { + if (e instanceof ExecutionException && (e.getCause()) instanceof AlreadyExistsException) { + // Ok + } else { + log.warn("Failed to write to metadata-store {}", path, e); + fail("failed while creating broker znodes"); + } + } + }); + + // 2. Setup discovery-zkcache + DiscoveryServiceServlet discovery = new DiscoveryServiceServlet(); + Field zkCacheField = DiscoveryServiceServlet.class.getDeclaredField("metadataStoreCacheLoader"); + zkCacheField.setAccessible(true); + MetadataStoreCacheLoader metadataCacheLoader = new MetadataStoreCacheLoader(resources, 30_000); + zkCacheField.set(discovery, metadataCacheLoader); + + // 3. verify nextBroker functionality : round-robin in broker list + for (String broker : brokers) { + assertEquals(broker, discovery.nextBroker().getWebServiceUrl()); + } + } + + @Test + public void testRiderectUrlWithServerStarted() throws Exception { + + // 1. start server + ServiceConfig config = new ServiceConfig(); + config.setWebServicePort(Optional.of(0)); + ServerManager server = new ServerManager(config); + Map params = new TreeMap<>(); + String zkServerUrl = "mockZkServer"; + metadataStoreInstanceCache.put(zkServerUrl, zkStore); + params.put("zookeeperServers", zkServerUrl); + server.addServlet("/", DiscoveryServiceServletTest.class, params); + server.start(); + + // 2. create znode for each broker + List brokers = Lists.newArrayList("broker-1", "broker-2", "broker-3"); + brokers.stream().forEach(b -> { + try { + final String broker = b + ":15000"; + LoadReport report = new LoadReport("http://" + broker, null, null, null); + String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report); + zkStore.put(LOADBALANCE_BROKERS_ROOT + "/" + broker, + reportData.getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).get(); + } catch (Exception e) { + if (e instanceof ExecutionException && (e.getCause()) instanceof AlreadyExistsException) { + // Ok + } else { + log.warn("Failed to write to metadata-store", e); + fail("failed while creating broker znodes"); + } + } + }); + + String serviceUrl = server.getServiceUri().toString(); + String requestUrl = serviceUrl + "admin/namespaces/p1/c1/n1"; + + /** + * 3. verify : every time when vip receives a request: it redirects to above brokers sequentially and client + * must get unknown host exception with above brokers in a sequential manner. + **/ + + assertEquals(brokers, validateRequest(brokers, HttpMethod.PUT, requestUrl, BundlesData.builder().numBundles(1).build()), + "redirection failed"); + assertEquals(brokers, validateRequest(brokers, HttpMethod.GET, requestUrl, null), "redirection failed"); + assertEquals(brokers, validateRequest(brokers, HttpMethod.POST, requestUrl, BundlesData.builder().numBundles(1).build()), + "redirection failed"); + + server.stop(); + + } + + + @Test + public void testTlsEnable() throws Exception { + + // 1. start server with tls enable + ServiceConfig config = new ServiceConfig(); + config.setWebServicePort(Optional.of(0)); + config.setWebServicePortTls(Optional.of(0)); + config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + ServerManager server = new ServerManager(config); + Map params = new TreeMap<>(); + String zkServerUrl = "mockZkServer"; + metadataStoreInstanceCache.put(zkServerUrl, zkStore); + params.put("zookeeperServers", zkServerUrl); + server.addServlet("/", DiscoveryServiceServletTest.class, params); + + // 2. get ZookeeperCacheLoader to add more brokers + final String redirect_broker_host = "broker-1"; + List brokers = Lists.newArrayList(redirect_broker_host); + brokers.stream().forEach(b -> { + try { + final String brokerUrl = b + ":" + server.getListenPortHTTP(); + final String brokerUrlTls = b + ":" + server.getListenPortHTTPS(); + + LoadReport report = new LoadReport("http://" + brokerUrl, "https://" + brokerUrlTls, null, null); + String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report); + zkStore.put(LOADBALANCE_BROKERS_ROOT + "/" + brokerUrl, + reportData.getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).get(); + } catch (Exception e) { + if (e instanceof ExecutionException && (e.getCause()) instanceof AlreadyExistsException) { + // Ok + } else { + log.warn("Failed to write to metadata-store", e); + fail("failed while creating broker znodes"); + } + } + }); + + // 3. https request with tls enable at server side + String serviceUrl = String.format("https://localhost:%s/", server.getListenPortHTTPS()); + String requestUrl = serviceUrl + "admin/namespaces/p1/c1/n1"; + + KeyManager[] keyManagers = null; + TrustManager[] trustManagers = InsecureTrustManagerFactory.INSTANCE.getTrustManagers(); + SSLContext sslCtx = SSLContext.getInstance("TLS"); + sslCtx.init(keyManagers, trustManagers, new SecureRandom()); + HttpsURLConnection.setDefaultSSLSocketFactory(sslCtx.getSocketFactory()); + try { + InputStream response = new URL(requestUrl).openStream(); + fail("it should give unknown host exception as: discovery service redirects request to: " + + redirect_broker_host); + } catch (Exception e) { + } + + server.stop(); + } + + @Test + public void testException() { + RestException exception1 = new RestException(BAD_GATEWAY, "test-msg"); + assertTrue(exception1.getMessage().contains("test-msg")); + RestException exception2 = new RestException(BAD_GATEWAY.getStatusCode(), "test-msg"); + assertTrue(exception2.getMessage().contains("test-msg")); + RestException exception3 = new RestException(exception2); + assertTrue(exception3.getMessage().contains(BAD_GATEWAY.toString())); + } + + public List validateRequest(List brokers, String method, String url, BundlesData bundle) { + + List redirectBrokers = brokers.stream().map(broker -> { + + String redirectedBroker = null; + try { + WebTarget webTarget = client.target(url); + Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON); + if (HttpMethod.PUT.equals(method)) { + invocationBuilder.put(Entity.entity(bundle, MediaType.APPLICATION_JSON)); + fail(); + } else if (HttpMethod.GET.equals(method)) { + invocationBuilder.get(); + fail(); + } else if (HttpMethod.POST.equals(method)) { + invocationBuilder.post(Entity.entity(bundle, MediaType.APPLICATION_JSON)); + fail(); + } else { + fail("Unsupported http method"); + } + } catch (Exception e) { + + if (e.getCause() instanceof UnknownHostException) { + redirectedBroker = e.getCause().getMessage().split(":")[0]; + } else { + // fail + fail("Expected to receive UnknownHostException, but received : " + e); + } + } + return redirectedBroker; + }).collect(Collectors.toList()); + + return redirectBrokers; + } + + + public static class DiscoveryServiceServletTest extends DiscoveryServiceServlet { + @Override + public MetadataStoreExtended createLocalMetadataStore(String zookeeperServers, int operationimeoutMs) throws MetadataStoreException { + return metadataStoreInstanceCache.get(zookeeperServers); + } + } +} diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/MetadataStoreCacheLoaderTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/MetadataStoreCacheLoaderTest.java new file mode 100644 index 0000000000000..9a87bfaf188b6 --- /dev/null +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/MetadataStoreCacheLoaderTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.discovery.service.web; + +import static org.apache.pulsar.broker.resources.MetadataStoreCacheLoader.LOADBALANCE_BROKERS_ROOT; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader; +import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; +import org.apache.pulsar.policies.data.loadbalancer.LoadReport; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.zookeeper.KeeperException; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; + +public class MetadataStoreCacheLoaderTest extends BaseZKStarterTest { + + @BeforeMethod + private void init() throws Exception { + start(); + } + + @AfterMethod(alwaysRun = true) + private void cleanup() throws Exception { + close(); + } + + /** + * Create znode for available broker in ZooKeeper and updates it again to verify ZooKeeper cache update + * + * @throws InterruptedException + * @throws KeeperException + * @throws IOException + */ + @Test + public void testZookeeperCacheLoader() throws InterruptedException, KeeperException, Exception { + + PulsarResources resources = new PulsarResources(zkStore, null); + @SuppressWarnings("resource") + MetadataStoreCacheLoader zkLoader = new MetadataStoreCacheLoader(resources, 30_000); + + List brokers = Lists.newArrayList("broker-1:15000", "broker-2:15000", "broker-3:15000"); + for (int i = 0; i < brokers.size(); i++) { + try { + LoadManagerReport report = i % 2 == 0 ? getSimpleLoadManagerLoadReport(brokers.get(i)) + : getModularLoadManagerLoadReport(brokers.get(i)); + zkStore.put(LOADBALANCE_BROKERS_ROOT + "/" + brokers.get(i), + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(report), Optional.of(-1L)) + .join(); + } catch (Exception e) { + fail("failed while creating broker znodes"); + } + } + + Awaitility.await().untilAsserted(() -> assertEquals(zkLoader.getAvailableBrokers().size(), 3)); + + // 2. get available brokers from ZookeeperCacheLoader + List list = zkLoader.getAvailableBrokers(); + + // 3. verify retrieved broker list + Set cachedBrokers = list.stream().map(loadReport -> loadReport.getWebServiceUrl()) + .collect(Collectors.toSet()); + Assert.assertEquals(list.size(), brokers.size()); + Assert.assertTrue(brokers.containsAll(cachedBrokers)); + + // 4.a add new broker + final String newBroker = "broker-4:15000"; + LoadManagerReport report = getSimpleLoadManagerLoadReport(newBroker); + zkStore.put(LOADBALANCE_BROKERS_ROOT + "/" + newBroker, + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(report), Optional.of(-1L)) + .join(); + brokers.add(newBroker); + + Thread.sleep(100); // wait for 100 msec: to get cache updated + + // 4.b. get available brokers from ZookeeperCacheLoader + list = zkLoader.getAvailableBrokers(); + + // 4.c. verify retrieved broker list + cachedBrokers = list.stream().map(loadReport -> loadReport.getWebServiceUrl()).collect(Collectors.toSet()); + Assert.assertEquals(list.size(), brokers.size()); + Assert.assertTrue(brokers.containsAll(cachedBrokers)); + + } + + private LoadReport getSimpleLoadManagerLoadReport(String brokerUrl) { + return new LoadReport(brokerUrl, null, null, null); + } + + private LocalBrokerData getModularLoadManagerLoadReport(String brokerUrl) { + return new LocalBrokerData(brokerUrl, null, null, null); + } +} diff --git a/pulsar-discovery-service/src/test/resources/certificate/client.crt b/pulsar-discovery-service/src/test/resources/certificate/client.crt new file mode 100644 index 0000000000000..2d7d156866a86 --- /dev/null +++ b/pulsar-discovery-service/src/test/resources/certificate/client.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDVjCCAj4CCQCtw/UnTFDT7DANBgkqhkiG9w0BAQUFADBtMQswCQYDVQQGEwJB +VTETMBEGA1UECAwKU29tZS1TdGF0ZTEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBmNsaWVu +dDAeFw0xNjA2MjAwMTQ1NDZaFw0yNjA2MTgwMTQ1NDZaMG0xCzAJBgNVBAYTAkFV +MRMwEQYDVQQIDApTb21lLVN0YXRlMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxITAf +BgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGY2xpZW50 +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqQV5F3Au9FWXIYPdWqiX +Rk5gdVmVkDuuFK4ZoOd8inoJpB3PPkpmpgoVkKQHDFhgx3ODGWIUgo+n6QDsJxY4 +ygHfVeggQgek8iUfteYVsIcHS0bjkhIij/3ihC301FkiqbrV069oLvUXLKcv3zxG +mdBAiz0k4xGZhFieVRvQCLY9syUUxmQ/3Cv42lDY8a1gTw4CRRx/hCfDvXCKhOT4 +bMwUIDZfHB3JoDh3Thp8FLz0nTrRF75mSQJ/OdcafIm0Xoz2Otp/CSxLS+U1lLvG +05crWTDe0om7NW4mK4CqGCFq5gUw7eIzaeO7Q5Qez9XGTMzkgIDTMvNYGGEeJhhm +NQIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQAKXy4g6hljY5MpO8mbZh+uJHq6NEUs +4dr7OKDDWc39AROZsGf2eFUmHOjmRSw7VHpguGKI+rFRELVffpg/VvMh5apu+DBf +jhxtDNceAyh5uugPNUJHXyeikBDYW8bAzUU3DmMldPkTZWcGjurmyhDQ1TtK2YJe +RMFBXw5aAzdJMNi6OfXDH/ZX32hrb482yghDZj+ndnm0FefmLbFTQRMF8/fIHb1W +kqNHwIaapZwH6j/MJy/TRFYcJunrBUYT9zVjY46k3GU0ex/Bn7T4pg9gzgFGZJhn +jQQFKliIC84thCzdlPkrLduLY8tmlDKpLXatbEQ+s1MmNOURm6irPp6g +-----END CERTIFICATE----- diff --git a/pulsar-discovery-service/src/test/resources/certificate/client.key b/pulsar-discovery-service/src/test/resources/certificate/client.key new file mode 100644 index 0000000000000..34fc701c5257d --- /dev/null +++ b/pulsar-discovery-service/src/test/resources/certificate/client.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCpBXkXcC70VZch +g91aqJdGTmB1WZWQO64Urhmg53yKegmkHc8+SmamChWQpAcMWGDHc4MZYhSCj6fp +AOwnFjjKAd9V6CBCB6TyJR+15hWwhwdLRuOSEiKP/eKELfTUWSKputXTr2gu9Rcs +py/fPEaZ0ECLPSTjEZmEWJ5VG9AItj2zJRTGZD/cK/jaUNjxrWBPDgJFHH+EJ8O9 +cIqE5PhszBQgNl8cHcmgOHdOGnwUvPSdOtEXvmZJAn851xp8ibRejPY62n8JLEtL +5TWUu8bTlytZMN7Sibs1biYrgKoYIWrmBTDt4jNp47tDlB7P1cZMzOSAgNMy81gY +YR4mGGY1AgMBAAECggEAcJj3yVhvv0/BhY8+CCYl2K1f7u1GCLbpSleNNTbhLbMM +9yrwo/OWnGg9Y4USOPQrTNOz81X2id+/oSZ/K67PGCvVJ3qi+rny9WkrzdbAfkAF +6O0Jr4arRbeBjkK7Rjc3M1EHH6VLx3R5AsNBzfpuogss5FVQXICd/5+1oscLeLEx +/Fn+51IEn9FUg5vr7ElG51f+zPxexcWHLNoqGjTEIGGtI8/CfTzD9tBV4sIjf/Nc +Zzfs9XYrChfcrS0U1zDa+L7c5gYfoN6M08sBiuZlhyyO9wgzPlp+XnsrSFv6hUta +0scjAbN4bh+orQn6zgFN/sjkQnraWXW7pKFLyTR/IQKBgQDVju4IbhE9XRweNgXi +s3BuGV+HsuFffEf0904/zCuCUcScGb5WCz5+KtlFJ//YxfocHVZajH+4GdCGbWim +m+H3XvRpWgfK/aBNOXu5ueLbnPYyPjTrcpKRsomeoiV+Jz1tv5PQElwzCiCzVvQf +fMyhQT16YIsFQAGJzQMBEHWODQKBgQDKnKps3sKSR3ycUtIxCVXUir7p52qst0Pm +bPO8JrcRKZP2z8MJB96+DcQFzrxj7t5DDktkYEsFOPPuIeUsYXsY+MKHs4hEQVCz +hpDJJNQ8s+SV8TLzKpinZEmLIjslLbn2rQrpqybPg84VxqX3qqM8IrXhMf77aGj6 +QHqvQwHWyQKBgQDF1RVO+9++j82ncvY6z22coKath5leIjxqgtqbISFBJUxUK0j2 +Xo4yxLDnbqmE/8m1V7wSP8tlGYzhquLiTM+kn/Mc0Ukc0503TMQABmJQfXRYkOXn +IwkCLXltWdoPpnwyeeGNRCTjJ0OpvyiBLtRFobE498xxPZzvMdrRlpS/1QKBgQCo +wmMleUnBQ2/kWQugMnFeLg6kjs+IesFAnYFKN0kGL4aB7j06OWbrEFY0rCS4bA6O +9coQGjCCchSjRXI4TB2XCCQnmX8nsuuADNZt45Iv2XrM9XEFn3Y0/tBO5j0zU2nw +r+NGC/uwns050BMPPf7mqNarctQ6HZZK0wgdEQfoGQKBgC+pbkQv9cn68TsiaJ3w +tvNRTXCIAAH4Vtn9Cp+63ao+kXn94BJqQF99i58kJpG4ol6wbCHUoC6fHgxUh5HB +JB0HjC2eCMgn4acAQg0sPW6l35KX36yYxtrL7eosB/yBYum0XAwmboNjEhlCZkOs +YOpSsn61g7xqqrt40Spb5vUn +-----END PRIVATE KEY----- diff --git a/pulsar-discovery-service/src/test/resources/certificate/server.crt b/pulsar-discovery-service/src/test/resources/certificate/server.crt new file mode 100644 index 0000000000000..59b651be2a406 --- /dev/null +++ b/pulsar-discovery-service/src/test/resources/certificate/server.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDLjCCAhYCCQDn/Yvym+FMsDANBgkqhkiG9w0BAQUFADBZMQswCQYDVQQGEwJB +VTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0 +cyBQdHkgTHRkMRIwEAYDVQQDEwlsb2NhbGhvc3QwHhcNMTYwNjEzMjIyMTQ2WhcN +MjYwNjExMjIyMTQ2WjBZMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0 +ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMRIwEAYDVQQDEwls +b2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCs29IuzZvk +OGUkS/wqKzd/h2esqjCSjw4SLLbeh1GA3UEvh1k9+eRiYwJG1yCOHmcsp4A8Du99 +8xbgeihpWWw7pjL5VVky3ciuvHyz1Cc6bKRps/GzVJBwFP0gzHnK8bUM86U52yGT +1DepD/Y2lURy0igdVcAMjGweMwoTmiaVcwZexfYuEef+jz3fmpmOwP9rboIA9rQr +mTbLzzkbAwZXdl+bRvIefIjIazIzTOs8tJWrhFaTJUgBhhLjFIwTdpS+n+FqOu8J +92K+PvKjIeJ3kmnZyRHK7uidlAn0g/DK+co1sX3zORPCWeg21K+/vVHTj91zARNb +O9hVS4bqqsw9AgMBAAEwDQYJKoZIhvcNAQEFBQADggEBACE0WBuTbHcPtYKv2ZMS +mYk9jvtAhmWHQ6tNqV8CmS2AsrzZdWglGaqIRsm5slkD2BGeQS+BesTArUuENTmP +r9kJSecdiiB8aWtLbhoCSH3QR6IW/b5UVl6sR5OIh7SkNTjMSUSDnMEVLNGyKZGS +gCGVbDf3n5KhOTnwqguELRykynKFt2LVksBia9+88lUtiRHpbyClo/KVWltJlaww +PT0WEpwqVUcHmwrR3MTzJDEPvIplSgxdaDmFGYS1YKm9T/wQd+t/0DbXMmfJXBbd +FVUnB6o7qJVU9N2Tbaj9NbCtwz5nTZG4A5kRXWHVjZsn5WzLuS/me3rDXjwlfB2p +ipY= +-----END CERTIFICATE----- diff --git a/pulsar-discovery-service/src/test/resources/certificate/server.key b/pulsar-discovery-service/src/test/resources/certificate/server.key new file mode 100644 index 0000000000000..6da70f5aec3b5 --- /dev/null +++ b/pulsar-discovery-service/src/test/resources/certificate/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCs29IuzZvkOGUk +S/wqKzd/h2esqjCSjw4SLLbeh1GA3UEvh1k9+eRiYwJG1yCOHmcsp4A8Du998xbg +eihpWWw7pjL5VVky3ciuvHyz1Cc6bKRps/GzVJBwFP0gzHnK8bUM86U52yGT1Dep +D/Y2lURy0igdVcAMjGweMwoTmiaVcwZexfYuEef+jz3fmpmOwP9rboIA9rQrmTbL +zzkbAwZXdl+bRvIefIjIazIzTOs8tJWrhFaTJUgBhhLjFIwTdpS+n+FqOu8J92K+ +PvKjIeJ3kmnZyRHK7uidlAn0g/DK+co1sX3zORPCWeg21K+/vVHTj91zARNbO9hV +S4bqqsw9AgMBAAECggEAd/LuDeZFZ/+uR5qmuAhXMZqfWZSbsges5vW6S/6wkvB1 +vGp6heQzFAbKXKgJgjUcuULeXE6s58RYuppqEnin/1hcBOKxy/dUu9Q14H+2XPdo +u6TPcvaaZ/xYjnr1hNtnHD6yB8zEpxVbLmjSHJxF7Dti9MA9TTfgCrC2LFYKsicD +/5AQyHuwpHyTL3Iiwv4Qtks/SD2a3fu8lD0yTQwA/hY6/0ieXxXd9tZV5a6GSA0P +nieol1byfuX7Q5fb8ggPd9u9K1mVZTBRKiE5w+uU4Ic2IkBmZX5ZuRS+vFplpLsY +YpFPvzFmpNkpK2SdYjJ+V4tkJsFHmOaFRgW/0QB2DQKBgQDeQMSZBQlPUrgRdWHN +OyvTcrSvXzg5DbaIj39tgdNZ6PYns/thD0n707KGRJOChIyYiiKxLxzLWdPUxqQO +rNLUV9IkMVc/QZR8RUqGc2BxmPOxAprhzeOhLsyqP/sgtxRHAnLqmkXuHYoxvTZ6 +LFCRCZBpEJrutGxl3s/x+sfkuwKBgQDHGwnSmvArpL8ZY1dV4xKNkxifCBnNmqAl +TKHPW3odN9nkMECEt1XUIioUUKXUsiAZNp5xa/v1DEyJ4f2T20QKcAGbS18b1M5W +axIoH3IhyLo74tuo0fthgq5bzypfFOlIjo7F9mpEky/461RWmoNAAlp9+FkDi48C +KwjAk39/ZwKBgQDXFJqs8sDFsOlMi+nvsHmDERhmNqG0JN8mXKgWk3KzKc09MuHs +Vd1lBMNZSHfv8NIWtGdKTKty5yUmXm1ZfkoxECPevpkOMCq/8FZksrb8d+YswLae +Gp9U1nNdtrkSOdo3tdj7y/wsqQ2ZgOB9bvEwyq6j3lvw8U2NcAiQxf44DQKBgBHb +lPf0uZHQhutKA61KXoGgLdclrNrKAY8W3nRwqfUw6zQSN9cvcl1Cay/DQ/xdtY9N +XMyjeMezwLGlOU8nnWSqQxqgmfkvDwqlM82xdFUfYcS5RiZQHxHR3L2TSSOaBoph +buDGhyV7ZhQXV0slNJxrGZ6uxZ0RyVPSdEiBcjAFAoGBAJqZ6uCVHpv/FwZVggu7 +Xb9EIxZnLSmXwaXFpJoMZpRpKb8cSTTJbgSMv3Dq2LcNKYXdNBhgKgPSc/XipXt9 +ZdT36KWipV+PzW691kUiWHtA8/+E0LCi4Y7rlcBMz9PgDNXK4XMMZOVKxDqPcHSJ +P6y01ku7T2X+abUiJ334Hg6G +-----END PRIVATE KEY----- diff --git a/site2/docs/deploy-bare-metal-multi-cluster.md b/site2/docs/deploy-bare-metal-multi-cluster.md index ac4add4a0ab6d..292aab8a6c713 100644 --- a/site2/docs/deploy-bare-metal-multi-cluster.md +++ b/site2/docs/deploy-bare-metal-multi-cluster.md @@ -336,6 +336,28 @@ You can use your own service discovery system, and you only need to satisfy just > **Service discovery already provided by many scheduling systems** > Many large-scale deployment systems, such as [Kubernetes](deploy-kubernetes.md), have service discovery systems built in. If you run Pulsar on such a system, you may not need to provide your own service discovery mechanism. + +### Service discovery setup + +The service discovery mechanism included with Pulsar maintains a list of active brokers, which is stored in ZooKeeper, and supports lookup using HTTP and also the [binary protocol](developing-binary-protocol.md) of Pulsar. + +To get started setting up the built-in service of discovery of Pulsar, you need to change a few parameters in the [`conf/discovery.conf`](reference-configuration.md#service-discovery) configuration file. Set the [`zookeeperServers`](reference-configuration.md#service-discovery-zookeeperServers) parameter to the ZooKeeper quorum connection string of the cluster and the [`configurationStoreServers`](reference-configuration.md#service-discovery-configurationStoreServers) setting to the [configuration +store](reference-terminology.md#configuration-store) quorum connection string. + +```properties +# Zookeeper quorum connection string +zookeeperServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181 + +# Global configuration store connection string +configurationStoreServers=zk1.us-west.example.com:2184,zk2.us-west.example.com:2184,zk3.us-west.example.com:2184 +``` + +To start the discovery service: + +```shell +$ bin/pulsar-daemon start discovery +``` + ## Admin client and verification At this point, your Pulsar instance should be ready to use. You can now configure client machines that can serve as [administrative clients](admin-api-overview.md) for each cluster. You can use the [`conf/client.conf`](reference-configuration.md#client) configuration file to configure admin clients.