Skip to content

Commit

Permalink
The loadbalancer should avoid offload the heartbeat namespace (#12252)
Browse files Browse the repository at this point in the history
* The loadbalancer should avoid offload the heartbeat namespace

The heartbeat namespace is sticky to a broker, so it will not owned by other brokers, It will not make any sense for rebalancing the heartbeat namespace.

```
09:49:55.946 [pulsar-load-manager-1-1] WARN  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Error when trying to perform load shedding on pulsar/pluster-2/10.1.131.232:8080/0x00000000_0xffffffff for broker 10.1.131.232:8080 org.apache.pulsar.client.admin.PulsarAdminException$NotFoundException: Namespace does not exist
        at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:220)
        at org.apache.pulsar.client.admin.internal.BaseResource$1.failed(BaseResource.java:130)
        at org.glassfish.jersey.client.JerseyInvocation$1.failed(JerseyInvocation.java:839)
        at org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:820)
        at org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:229)
        at org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62)
        at org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:173)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288)
        at org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:173)
        at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:212)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$3(AsyncHttpConnector.java:253)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222)
        at org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257)
        at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241)
        at org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114)
        at org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143)
        at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 ```

* Fix checkstyle

(cherry picked from commit 14fc0d3)
  • Loading branch information
codelipenghui committed Oct 6, 2021
1 parent b9616e0 commit c947ed4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
Expand Down Expand Up @@ -98,15 +100,17 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
// make up for at least the minimum throughput to offload

loadData.getBundleData().entrySet().stream()
.filter(e -> localData.getBundles().contains(e.getKey()))
.map((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system resource burden
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData
.getMsgThroughputOut();
.filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
&& !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()
&& localData.getBundles().contains(e.getKey()))
.map((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system resource burden
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData
.getMsgThroughputOut();
return Pair.of(bundle, throughput);
}).filter(e -> {
// Only consider bundles that were not already unloaded recently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.HashMap;
Expand Down Expand Up @@ -95,12 +97,15 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

if (localData.getBundles().size() > 1) {
loadData.getBundleData().entrySet().stream().map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(bundle, throughput);
loadData.getBundleData().entrySet().stream()
.filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
&& !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches())
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(bundle, throughput);
}).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft())
).filter(e ->
Expand Down

0 comments on commit c947ed4

Please sign in to comment.