Skip to content

Commit

Permalink
--story=872733891 负载均衡优化(apache#13069)(apache#13117 ) (merge request …
Browse files Browse the repository at this point in the history
…!64)


Squash merge branch 'optimize-ownership-assign' into '2.8.1'
--story=872733891 负载均衡优化(apache#13069)(apache#13117 )

TAPD: --story=872733891
  • Loading branch information
mayozhang committed Mar 14, 2022
1 parent 1dd77bf commit 4cb00a8
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 54 deletions.
8 changes: 8 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,14 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-metadata</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
// The leader election service was not initialized yet. This can happen because the broker service is
// initialized first and it might start receiving lookup requests before the leader election service is
// fully initialized.
LOG.warn("Leader election service isn't initialized yet. "
+ "Returning empty result to lookup. NamespaceBundle[{}]",
bundle);
lookupFuture.complete(Optional.empty());
return;
}
Expand All @@ -484,23 +487,45 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
if (options.isAuthoritative()) {
// leader broker already assigned the current broker as owner
candidateBroker = pulsar.getSafeWebServiceAddress();
} else if (!this.loadManager.get().isCentralized()
|| pulsar.getLeaderElectionService().isLeader()
|| !currentLeader.isPresent()

} else {
LoadManager loadManager = this.loadManager.get();
boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
if (!makeLoadManagerDecisionOnThisBroker) {
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
|| !isBrokerActive(currentLeader.get().getServiceUrl())
) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
lookupFuture.complete(Optional.empty());
return;
boolean leaderBrokerActive = currentLeader.isPresent()
&& isBrokerActive(currentLeader.get().getServiceUrl());
if (!leaderBrokerActive) {
makeLoadManagerDecisionOnThisBroker = true;
if (!currentLeader.isPresent()) {
LOG.warn(
"The information about the current leader broker wasn't available. "
+ "Handling load manager decisions in a decentralized way. "
+ "NamespaceBundle[{}]",
bundle);
} else {
LOG.warn(
"The current leader broker {} isn't active. "
+ "Handling load manager decisions in a decentralized way. "
+ "NamespaceBundle[{}]",
currentLeader.get(), bundle);
}
}
}
if (makeLoadManagerDecisionOnThisBroker) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
LOG.warn("Load manager didn't return any available broker. "
+ "Returning empty result to lookup. NamespaceBundle[{}]",
bundle);
lookupFuture.complete(Optional.empty());
return;
}
candidateBroker = availableBroker.get();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
candidateBroker = currentLeader.get().getServiceUrl();
}
candidateBroker = availableBroker.get();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
candidateBroker = currentLeader.get().getServiceUrl();
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -583,19 +608,16 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}

protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
final String advertisedListenerName)
throws Exception {
final String advertisedListenerName) {

CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
try {
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
URI uri = new URI(candidateBroker);
String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
uri.getPort());
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + parseHostAndPort(candidateBroker);

localBrokerDataCache.get(path).thenAccept(reportData -> {
if (reportData.isPresent()) {
LocalBrokerData lookupData = (LocalBrokerData) reportData.get();
LocalBrokerData lookupData = reportData.get();
if (StringUtils.isNotBlank(advertisedListenerName)) {
AdvertisedListener listener = lookupData.getAdvertisedListeners().get(advertisedListenerName);
if (listener == null) {
Expand Down Expand Up @@ -627,22 +649,36 @@ protected CompletableFuture<LookupResult> createLookupResult(String candidateBro
}

private boolean isBrokerActive(String candidateBroker) {
List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();

for (String brokerHostPort : brokers) {
if (candidateBroker.equals("http://" + brokerHostPort)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
}
return true;
String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
Set<String> availableBrokers = getAvailableBrokers();
if (availableBrokers.contains(candidateBrokerHostAndPort)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
}
return true;
} else {
LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
candidateBroker, candidateBrokerHostAndPort,
availableBrokers.stream().collect(Collectors.joining(",")));
return false;
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Broker not found for SLA Monitoring Namespace {}",
candidateBroker + ":" + config.getWebServicePort());
private static String parseHostAndPort(String candidateBroker) {
int uriSeparatorPos = candidateBroker.indexOf("://");
if (uriSeparatorPos == -1) {
throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI.");
}
String candidateBrokerHostAndPort = candidateBroker.substring(uriSeparatorPos + 3);
return candidateBrokerHostAndPort;
}

private Set<String> getAvailableBrokers() {
try {
return loadManager.get().getAvailableBrokers();
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand All @@ -63,6 +66,7 @@ public class NamespaceBundleFactory {

private final PulsarService pulsar;
private final MetadataCache<Policies> policiesCache;
private final Duration maxRetryDuration = Duration.ofSeconds(10);

public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
this.hashFunc = hashFunc;
Expand Down Expand Up @@ -90,6 +94,12 @@ private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespace,
}

CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
doLoadBundles(namespace, path, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
return future;
}

private void doLoadBundles(NamespaceName namespace, String path, CompletableFuture<NamespaceBundles> future,
Backoff backoff, long retryDeadline) {
// Read the static bundle data from the policies
pulsar.getLocalMetadataStore().get(path).thenAccept(result -> {
NamespaceBundles namespaceBundles;
Expand All @@ -99,22 +109,38 @@ private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespace,
future.complete(readBundles(namespace,
result.get().getValue(), result.get().getStat().getVersion()));
} catch (IOException e) {
future.completeExceptionally(e);
handleLoadBundlesRetry(namespace, path, future, backoff, retryDeadline, e);
}
} else {
// If no local policies defined for namespace, copy from global config
copyToLocalPolicies(namespace)
.thenAccept(b -> future.complete(b))
.exceptionally(ex -> {
future.completeExceptionally(ex);
handleLoadBundlesRetry(namespace, path, future, backoff, retryDeadline, ex);
return null;
});
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
return future;
}

private void handleLoadBundlesRetry(NamespaceName namespace, String path,
CompletableFuture<NamespaceBundles> future,
Backoff backoff, long retryDeadline, Throwable e) {
if (e instanceof Error || System.nanoTime() > retryDeadline) {
future.completeExceptionally(e);
} else {
LOG.warn("Error loading bundle for {} from path {}. Retrying exception", namespace, path, e);
long retryDelay = backoff.next();
pulsar.getExecutor().schedule(() ->
doLoadBundles(namespace, path, future, backoff, retryDeadline), retryDelay, TimeUnit.MILLISECONDS);
}
}

private static Backoff createBackoff() {
return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
}

private NamespaceBundles readBundles(NamespaceName namespace, byte[] value, long version) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.MockZooKeeperSession;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -82,13 +84,13 @@ protected PulsarService createAdditionalBroker(int additionalBrokerIndex) throws
}

@Override
protected ZKMetadataStore createLocalMetadataStore() {
protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeper));
}

@Override
protected ZKMetadataStore createConfigurationMetadataStore() {
protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeperGlobal));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
Expand Down Expand Up @@ -232,6 +234,11 @@ protected final void internalCleanup() throws Exception {
}
bkExecutor = null;
}
onCleanup();
}

protected void onCleanup() {

}

protected abstract void setup() throws Exception;
Expand Down Expand Up @@ -297,7 +304,7 @@ protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration con

protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
// Override default providers with mocked ones
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
doReturn(createZooKeeperClientFactory()).when(pulsar).getZooKeeperClientFactory();
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
doReturn(createLocalMetadataStore()).when(pulsar).createLocalMetadataStore();
doReturn(createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore();
Expand All @@ -314,11 +321,11 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
}
}

protected ZKMetadataStore createLocalMetadataStore() {
protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeper);
}

protected ZKMetadataStore createConfigurationMetadataStore() {
protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeperGlobal);
}

Expand Down Expand Up @@ -391,21 +398,23 @@ public void reallyShutdown() {
}
}

protected ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory() {
protected ZooKeeperClientFactory createZooKeeperClientFactory() {
return new ZooKeeperClientFactory() {

@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
int zkSessionTimeoutMillis) {
@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
int zkSessionTimeoutMillis) {

if (serverList != null
&& (serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
|| serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
return CompletableFuture.completedFuture(mockZooKeeperGlobal);
}
if (serverList != null
&& (serverList.equalsIgnoreCase(conf.getConfigurationStoreServers())
|| serverList.equalsIgnoreCase(GLOBAL_DUMMY_VALUE))) {
return CompletableFuture.completedFuture(mockZooKeeperGlobal);
}

return CompletableFuture.completedFuture(mockZooKeeper);
}
};
return CompletableFuture.completedFuture(mockZooKeeper);
}
};
}

private final BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() {

Expand Down
Loading

0 comments on commit 4cb00a8

Please sign in to comment.