/
LoadManagerShared.java
549 lines (505 loc) · 25.9 KB
/
LoadManagerShared.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.internal.Lists;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
/**
* This class contains code which is shared between the two load manager implementations.
*/
public class LoadManagerShared {
// Logger shared by the static helpers in this class.
public static final Logger log = LoggerFactory.getLogger(LoadManagerShared.class);
// Number of bytes in one mebibyte (MiB); used to convert byte counts into MiB.
public static final int MIBI = 1024 * 1024;
// Per-thread scratch set for primary brokers according to policies.
// applyNamespacePolicies clears and refills it on every call.
private static final FastThreadLocal<Set<String>> localPrimariesCache = new FastThreadLocal<Set<String>>() {
@Override
protected Set<String> initialValue() throws Exception {
return new HashSet<>();
}
};
// Per-thread scratch set for secondary/shared brokers according to policies.
// applyNamespacePolicies clears and refills it on every call.
private static final FastThreadLocal<Set<String>> localSecondaryCache = new FastThreadLocal<Set<String>>() {
@Override
protected Set<String> initialValue() throws Exception {
return new HashSet<>();
}
};
// Update LoadReport at most every 5 seconds (value is in milliseconds).
// The "MIMIMUM" spelling is part of the public constant name and is kept for compatibility.
public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
// Domain name used for brokers that have no entry in the broker-to-domain map.
private static final String DEFAULT_DOMAIN = "default";
/**
* Don't allow construction: this class is a namespace for static methods only.
*/
private LoadManagerShared() {
}
/**
 * Determines the brokers available for the given service unit according to the given policies and adds
 * them to {@code brokerCandidateCache}. The candidate set is only added to; it is not cleared here.
 *
 * <p>When isolation policies are present for the namespace, primary brokers are always added and
 * secondary brokers are added only when {@code policies.shouldFailoverToSecondaries(...)} says so.
 * When no isolation policies are present, shared brokers are added, provided they accept the kind of
 * topic (persistent or non-persistent) hosted by the service unit.
 *
 * @param serviceUnit
 *            Service unit to place; non-persistent handling applies only to {@link NamespaceBundle}s.
 * @param policies
 *            Resource allocation policies used to classify each broker.
 * @param brokerCandidateCache
 *            Output set receiving the candidate brokers.
 * @param availableBrokers
 *            Brokers to consider; each entry is turned into a URL as {@code http://<broker>}.
 * @param brokerTopicLoadingPredicate
 *            Tells whether a broker accepts persistent / non-persistent topics.
 */
public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
        final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache,
        final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
    // The thread-local scratch sets are reused between calls, so empty them first.
    Set<String> primariesCache = localPrimariesCache.get();
    primariesCache.clear();

    Set<String> secondaryCache = localSecondaryCache.get();
    secondaryCache.clear();

    NamespaceName namespace = serviceUnit.getNamespaceObject();
    boolean isIsolationPoliciesPresent = policies.areIsolationPoliciesPresent(namespace);
    boolean isNonPersistentTopic = (serviceUnit instanceof NamespaceBundle)
            && ((NamespaceBundle) serviceUnit).hasNonPersistentTopic();
    if (isIsolationPoliciesPresent) {
        log.debug("Isolation Policies Present for namespace - [{}]", namespace.toString());
    }
    for (final String broker : availableBrokers) {
        final String brokerUrlString = String.format("http://%s", broker);
        String brokerHost;
        try {
            brokerHost = new URL(brokerUrlString).getHost();
        } catch (MalformedURLException e) {
            // The broker fills the placeholder; the trailing exception is logged with its stack trace.
            log.error("Unable to parse brokerUrl from ResourceUnitId - [{}]", broker, e);
            continue;
        }
        // todo: in future check if the resource unit has resources to take the namespace
        if (isIsolationPoliciesPresent) {
            // note: serviceUnitID is namespace name and ResourceID is brokerName
            if (policies.isPrimaryBroker(namespace, brokerHost)) {
                primariesCache.add(broker);
                if (log.isDebugEnabled()) {
                    log.debug("Added Primary Broker - [{}] as possible Candidates for"
                            + " namespace - [{}] with policies", brokerHost, namespace.toString());
                }
            } else if (policies.isSecondaryBroker(namespace, brokerHost)) {
                secondaryCache.add(broker);
                if (log.isDebugEnabled()) {
                    log.debug(
                            "Added Shared Broker - [{}] as possible "
                                    + "Candidates for namespace - [{}] with policies",
                            brokerHost, namespace.toString());
                }
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Skipping Broker - [{}] not primary broker and not shared" + " for namespace - [{}] ",
                            brokerHost, namespace.toString());
                }
            }
        } else {
            // non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
            if (isNonPersistentTopic
                    && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
                if (log.isDebugEnabled()) {
                    log.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
                            brokerHost, namespace.toString());
                }
            } else if (!isNonPersistentTopic
                    && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
                // persistent topic can be assigned to only brokers that enabled for persistent-topic
                if (log.isDebugEnabled()) {
                    log.debug("Filter broker- [{}] because broker only supports non-persistent namespace - [{}]",
                            brokerHost, namespace.toString());
                }
            } else if (policies.isSharedBroker(brokerHost)) {
                secondaryCache.add(broker);
                if (log.isDebugEnabled()) {
                    log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
                            brokerHost, namespace.toString());
                }
            }
        }
    }
    if (isIsolationPoliciesPresent) {
        brokerCandidateCache.addAll(primariesCache);
        if (policies.shouldFailoverToSecondaries(namespace, primariesCache.size())) {
            log.debug(
                    "Not enough of primaries [{}] available for namespace - [{}], "
                            + "adding shared [{}] as possible candidate owners",
                    primariesCache.size(), namespace.toString(), secondaryCache.size());
            brokerCandidateCache.addAll(secondaryCache);
        }
    } else {
        log.debug(
                "Policies not present for namespace - [{}] so only "
                        + "considering shared [{}] brokers for possible owner",
                namespace.toString(), secondaryCache.size());
        brokerCandidateCache.addAll(secondaryCache);
    }
}
/**
 * Populates the namespace to bundle-range map from the given full bundle names.
 *
 * <p>Each bundle name is split at its last {@code '/'}: the part before it is the namespace key and the
 * part after it is the bundle range added to that namespace's set (created on demand).
 *
 * @param bundles
 *            Full bundle names with which to populate.
 * @param target
 *            Map to fill.
 */
public static void fillNamespaceToBundlesMap(final Set<String> bundles,
        final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> target) {
    for (final String fullBundleName : bundles) {
        final String namespace = getNamespaceNameFromBundleName(fullBundleName);
        final String range = getBundleRangeFromBundleName(fullBundleName);
        final ConcurrentOpenHashSet<String> rangesOfNamespace =
                target.computeIfAbsent(namespace, ignored -> new ConcurrentOpenHashSet<>());
        rangesOfNamespace.add(range);
    }
}
/**
 * From a full bundle name, extract the bundle range (the text after the last {@code '/'}).
 *
 * @param bundleName
 *            Full bundle name; the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF.
 * @return the bundle range, e.g. {@code 0x00000000_0xFFFFFFFF}.
 * @throws IllegalArgumentException
 *             if the bundle name contains no {@code '/'}.
 */
public static String getBundleRangeFromBundleName(String bundleName) {
    int pos = bundleName.lastIndexOf('/');
    // Name the offending input so a failed precondition can be diagnosed from the message alone.
    checkArgument(pos != -1, "Invalid bundle name [%s]: missing '/' separator", bundleName);
    return bundleName.substring(pos + 1);
}
/**
 * From a full bundle name, extract the namespace name (the text before the last {@code '/'}).
 *
 * @param bundleName
 *            Full bundle name; the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF.
 * @return the namespace name, e.g. {@code property/cluster/namespace}.
 * @throws IllegalArgumentException
 *             if the bundle name contains no {@code '/'}.
 */
public static String getNamespaceNameFromBundleName(String bundleName) {
    int pos = bundleName.lastIndexOf('/');
    // Name the offending input so a failed precondition can be diagnosed from the message alone.
    checkArgument(pos != -1, "Invalid bundle name [%s]: missing '/' separator", bundleName);
    return bundleName.substring(0, pos);
}
/**
 * Get the system resource usage for this broker.
 *
 * <p>The memory usage and limit reported by the host are overridden with the JVM heap usage and limit,
 * and the direct memory usage and limit are filled in. All four values are expressed in MiB.
 *
 * @param brokerHostUsage
 *            Source of the host-level resource usage.
 * @return the usage object obtained from {@code brokerHostUsage}, with its memory and direct memory
 *         fields overwritten.
 * @throws IOException
 *             declared for callers; this block itself only calls {@code getBrokerHostUsage()}.
 */
public static SystemResourceUsage getSystemResourceUsage(final BrokerHostUsage brokerHostUsage) throws IOException {
    SystemResourceUsage systemResourceUsage = brokerHostUsage.getBrokerHostUsage();

    // Override System memory usage and limit with JVM heap usage and limit
    final Runtime runtime = Runtime.getRuntime();
    long maxHeapMemoryInBytes = runtime.maxMemory();
    long memoryUsageInBytes = runtime.totalMemory() - runtime.freeMemory();
    systemResourceUsage.memory.usage = (double) memoryUsageInBytes / MIBI;
    systemResourceUsage.memory.limit = (double) maxHeapMemoryInBytes / MIBI;

    // Collect JVM direct memory. Cast before dividing so fractional MiB are not truncated by
    // integer division, consistent with the heap values above.
    systemResourceUsage.directMemory.usage = (double) getJvmDirectMemoryUsed() / MIBI;
    systemResourceUsage.directMemory.limit = (double) PlatformDependent.maxDirectMemory() / MIBI;

    return systemResourceUsage;
}
/**
 * Tells whether load shedding is enabled: it requires load balancing to be enabled and the
 * (dynamically configurable) shedding flag to be on.
 *
 * @param pulsar
 *            Service whose configuration is consulted.
 * @return {@code true} only when both the load balancer and load shedding are enabled.
 */
public static boolean isLoadSheddingEnabled(final PulsarService pulsar) {
    if (!pulsar.getConfiguration().isLoadBalancerEnabled()) {
        return false;
    }
    return pulsar.getConfiguration().isLoadBalancerSheddingEnabled();
}
/**
 * Drops from {@code candidates} every broker that already serves more bundles of the incoming bundle's
 * namespace than the least-loaded candidate does.
 *
 * <p>Missing broker or namespace entries are created (empty) in the map as a side effect of the lookup.
 *
 * @param assignedBundleName
 *            Name of bundle to be assigned.
 * @param candidates
 *            Brokers available for placement; modified in place.
 * @param brokerToNamespaceToBundleRange
 *            Map from brokers to namespaces to bundle ranges.
 */
public static void removeMostServicingBrokersForNamespace(final String assignedBundleName,
        final Set<String> candidates,
        final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange) {
    if (candidates.isEmpty()) {
        return;
    }

    final String namespace = getNamespaceNameFromBundleName(assignedBundleName);

    // First pass: find the smallest number of bundles any candidate owns in this namespace.
    int minOwned = Integer.MAX_VALUE;
    for (final String candidate : candidates) {
        final ConcurrentOpenHashSet<String> ownedRanges = brokerToNamespaceToBundleRange
                .computeIfAbsent(candidate, k -> new ConcurrentOpenHashMap<>())
                .computeIfAbsent(namespace, k -> new ConcurrentOpenHashSet<>());
        final int owned = (int) ownedRanges.size();
        if (owned < minOwned) {
            minOwned = owned;
        }
        if (minOwned == 0) {
            // Cannot get any lower; no need to look at the remaining candidates.
            break;
        }
    }

    // Since `brokerToNamespaceToBundleRange` can be updated by other threads,
    // the minimum found above may differ from the actual value by the time it is applied.
    final int threshold = minOwned;
    candidates.removeIf(candidate -> {
        final ConcurrentOpenHashSet<String> ownedRanges = brokerToNamespaceToBundleRange
                .computeIfAbsent(candidate, k -> new ConcurrentOpenHashMap<>())
                .computeIfAbsent(namespace, k -> new ConcurrentOpenHashSet<>());
        return ownedRanges.size() > threshold;
    });
}
/**
 * It tries to filter out brokers which own namespace with same anti-affinity-group as given namespace. If all the
 * domains own namespace with same anti-affinity group then it will try to keep brokers with domain that has least
 * number of namespaces. It also tries to keep brokers which has least number of namespace with in domain.
 * eg.
 * <pre>
 * Before:
 * Domain-count  BrokersBase-count
 * ____________  ____________
 * d1-3          b1-2,b2-1
 * d2-3          b3-2,b4-1
 * d3-4          b5-2,b6-2
 *
 * After filtering: "candidates" brokers
 * Domain-count  BrokersBase-count
 * ____________  ____________
 * d1-3          b2-1
 * d2-3          b4-1
 *
 * "candidate" broker to own anti-affinity-namespace = b2 or b4
 *
 * </pre>
 *
 * <p>This is best-effort: the lookup of anti-affinity counts is awaited for up to 30 seconds, and on any
 * failure the error is logged and {@code candidates} is left as it is at that point.
 *
 * @param pulsar
 *            Service used to look up namespace policies and the failure-domain setting.
 * @param assignedBundleName
 *            Full name of the bundle being assigned.
 * @param candidates
 *            Candidate brokers; modified in place.
 * @param brokerToNamespaceToBundleRange
 *            Map from brokers to namespaces to bundle ranges.
 * @param brokerToDomainMap
 *            Map from broker to failure domain; only used when failure domains are enabled.
 */
public static void filterAntiAffinityGroupOwnedBrokers(final PulsarService pulsar, final String assignedBundleName,
        final Set<String> candidates,
        final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
        Map<String, String> brokerToDomainMap) {
    if (candidates.isEmpty()) {
        return;
    }
    final String namespaceName = getNamespaceNameFromBundleName(assignedBundleName);
    try {
        final Map<String, Integer> brokerToAntiAffinityNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar,
                namespaceName, brokerToNamespaceToBundleRange).get(30, TimeUnit.SECONDS);
        if (brokerToAntiAffinityNamespaceCount == null) {
            // none of the broker owns anti-affinity-namespace so, none of the broker will be filtered
            return;
        }
        if (pulsar.getConfiguration().isFailureDomainsEnabled()) {
            // this will remove all the brokers which are part of domains that don't have least number of
            // anti-affinity-namespaces
            filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(brokerToAntiAffinityNamespaceCount, candidates,
                    brokerToDomainMap);
        }
        // now, "candidates" has list of brokers which are part of domain that can accept this namespace. now,
        // with in these domains, remove brokers which don't have least number of namespaces. so, brokers with least
        // number of namespace can be selected
        int leastNamespaceCount = Integer.MAX_VALUE;
        for (final String broker : candidates) {
            // Brokers absent from the map are assumed to own no namespace of this group.
            leastNamespaceCount = Math.min(leastNamespaceCount,
                    brokerToAntiAffinityNamespaceCount.getOrDefault(broker, 0));
            if (leastNamespaceCount == 0) {
                break;
            }
        }
        // filter out broker based on namespace distribution: keep only the brokers owning the least
        // number of namespaces (absent brokers count as 0).
        final int finalLeastNamespaceCount = leastNamespaceCount;
        candidates.removeIf(
                broker -> brokerToAntiAffinityNamespaceCount.getOrDefault(broker, 0) > finalLeastNamespaceCount);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can observe the interruption.
        Thread.currentThread().interrupt();
        log.error("Interrupted while filtering anti-affinity group namespace {}", namespaceName, e);
    } catch (Exception e) {
        // Best-effort filter: log the namespace and the full cause instead of only the message.
        log.error("Failed to filter anti-affinity group namespace {}", namespaceName, e);
    }
}
/**
 * Sums the anti-affinity namespace counts of the candidate brokers per domain, finds the smallest
 * per-domain total, and removes every candidate whose domain total differs from that minimum.
 *
 * <p>Brokers without a domain entry are grouped under the default domain; brokers without a count are
 * treated as owning zero namespaces. Nothing is filtered when no domain mapping is supplied.
 *
 * @param brokerToAntiAffinityNamespaceCount
 *            Number of anti-affinity namespaces owned per broker.
 * @param candidates
 *            Candidate brokers; modified in place.
 * @param brokerToDomainMap
 *            Map from broker to domain; may be null or empty.
 */
private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(
        Map<String, Integer> brokerToAntiAffinityNamespaceCount, Set<String> candidates,
        Map<String, String> brokerToDomainMap) {
    if (brokerToDomainMap == null || brokerToDomainMap.isEmpty()) {
        return;
    }

    // Accumulate the per-domain totals over the candidate brokers.
    final Map<String, Integer> totalsByDomain = Maps.newHashMap();
    for (final String candidate : candidates) {
        final String domainOfCandidate = brokerToDomainMap.getOrDefault(candidate, DEFAULT_DOMAIN);
        final int ownedByCandidate = brokerToAntiAffinityNamespaceCount.getOrDefault(candidate, 0);
        totalsByDomain.merge(domainOfCandidate, ownedByCandidate, Integer::sum);
    }

    // Smallest total among all domains seen above.
    int smallestTotal = Integer.MAX_VALUE;
    for (Entry<String, Integer> domainTotal : totalsByDomain.entrySet()) {
        smallestTotal = Math.min(smallestTotal, domainTotal.getValue());
    }

    // Only keep brokers of the domains that have the smallest total.
    final int keepTotal = smallestTotal;
    candidates.removeIf(candidate -> {
        final String domainOfCandidate = brokerToDomainMap.getOrDefault(candidate, DEFAULT_DOMAIN);
        final Integer totalOfDomain = totalsByDomain.get(domainOfCandidate);
        return totalOfDomain != null && totalOfDomain.intValue() != keepTotal;
    });
}
/**
 * Asynchronously builds a map from broker to the number of namespaces it owns that belong to the same
 * anti-affinity group as {@code namespaceName}.
 *
 * <p>The returned future completes with {@code null} when the namespace has no policies, has a blank
 * anti-affinity group, or its policies cannot be read. Namespaces with an empty bundle-range set are
 * skipped, and a failed policy lookup for an individual namespace is ignored.
 *
 * @param pulsar
 *            Service providing the policies cache.
 * @param namespaceName
 *            Namespace whose anti-affinity group is looked up.
 * @param brokerToNamespaceToBundleRange
 *            Map from brokers to namespaces to bundle ranges.
 * @return future of the broker to namespace-count map, or of {@code null} as described above.
 */
public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
        final PulsarService pulsar, final String namespaceName,
        final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange) {
    final CompletableFuture<Map<String, Integer>> result = new CompletableFuture<>();
    final ZooKeeperDataCache<Policies> policiesCache = pulsar.getConfigurationCache().policiesCache();

    policiesCache.getAsync(path(POLICIES, namespaceName)).thenAccept(policies -> {
        final String antiAffinityGroup = policies.isPresent() ? policies.get().antiAffinityGroup : null;
        if (StringUtils.isBlank(antiAffinityGroup)) {
            result.complete(null);
            return;
        }

        final Map<String, Integer> countsByBroker = new ConcurrentHashMap<>();
        final List<CompletableFuture<Void>> pendingLookups = Lists.newArrayList();

        brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) -> {
            nsToBundleRange.forEach((ns, bundleRange) -> {
                if (!bundleRange.isEmpty()) {
                    // One marker future per namespace lookup; it always completes normally.
                    final CompletableFuture<Void> lookupDone = new CompletableFuture<>();
                    pendingLookups.add(lookupDone);
                    policiesCache.getAsync(path(POLICIES, ns)).thenAccept(nsPolicies -> {
                        final boolean sameGroup = nsPolicies.isPresent()
                                && antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().antiAffinityGroup);
                        if (sameGroup) {
                            countsByBroker.merge(broker, 1, Integer::sum);
                        }
                        lookupDone.complete(null);
                    }).exceptionally(ex -> {
                        lookupDone.complete(null);
                        return null;
                    });
                }
            });
        });

        FutureUtil.waitForAll(pendingLookups).thenAccept(ignored -> result.complete(countsByBroker));
    }).exceptionally(ex -> {
        // namespace-policies has not been created yet
        result.complete(null);
        return null;
    });

    return result;
}
/**
 * Decides whether an anti-affinity namespace should be unloaded by its broker due to load-shedding.
 *
 * <p>If all the candidate brokers own the same number of anti-affinity namespaces, unloading would land
 * the namespace on the broker it was unloaded from, so unloading is allowed only when another broker can
 * take it. The per-broker counts are awaited for up to 10 seconds; when no counts are available the
 * method answers {@code true}.
 *
 * @param namespace
 *            Namespace being considered for unloading.
 * @param bundle
 *            Bundle being considered (not used by the decision).
 * @param currentBroker
 *            Broker currently owning the namespace.
 * @param pulsar
 *            Service used to look up namespace policies.
 * @param brokerToNamespaceToBundleRange
 *            Map from brokers to namespaces to bundle ranges.
 * @param candidateBrokers
 *            Brokers that could own the namespace.
 * @return {@code true} if the namespace may be unloaded.
 * @throws Exception
 *             if waiting for the per-broker counts fails or times out.
 */
public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker,
        final PulsarService pulsar,
        final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
        Set<String> candidateBrokers) throws Exception {

    final Map<String, Integer> countsByBroker = getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
            brokerToNamespaceToBundleRange).get(10, TimeUnit.SECONDS);
    if (countsByBroker == null || countsByBroker.isEmpty()) {
        return true;
    }

    int minCount = Integer.MAX_VALUE;
    int countOnCurrentBroker = 0;
    for (String candidate : candidateBrokers) {
        final int owned = countsByBroker.getOrDefault(candidate, 0);
        if (currentBroker.equals(candidate)) {
            countOnCurrentBroker = owned;
        }
        minCount = Math.min(minCount, owned);
    }

    // some other broker owns fewer namespaces, so the namespace can move there
    if (minCount == 0 || countOnCurrentBroker > minCount) {
        return true;
    }

    // if every candidate broker sits at the minimum, unloading cannot change the owner
    final int least = minCount;
    final long brokersAtMinimum = candidateBrokers.stream()
            .filter(candidate -> countsByBroker.getOrDefault(candidate, 0) == least)
            .count();
    return candidateBrokers.size() != brokersAtMinimum;
}
/**
* Tells whether a broker accepts persistent and/or non-persistent topics. applyNamespacePolicies consults
* it, when no isolation policies are present, to skip brokers that cannot host the service unit's topics.
*/
public interface BrokerTopicLoadingPredicate {
// brokerUrl: applyNamespacePolicies passes a string of the form "http://<broker>".
// Returns true if the broker can be assigned persistent topics.
boolean isEnablePersistentTopics(String brokerUrl);
// brokerUrl: applyNamespacePolicies passes a string of the form "http://<broker>".
// Returns true if the broker can be assigned non-persistent topics.
boolean isEnableNonPersistentTopics(String brokerUrl);
}
/**
 * Filters out the brokers owning more topics than the threshold configured at
 * {@link ServiceConfiguration#getLoadBalancerBrokerMaxTopics()}.
 *
 * <p>A broker's topic count is the sum of the topics of its preallocated bundles plus the topics reported
 * in its local data; a broker with no broker data or no preallocated bundle data counts as zero. If every
 * broker exceeds the threshold, the candidate set is left untouched.
 *
 * @param brokerCandidateCache
 *            Candidate brokers; replaced in place by the filtered set when that set is non-empty.
 * @param loadData
 *            Load data providing the per-broker data.
 * @param loadBalancerBrokerMaxTopics
 *            Maximum number of topics (inclusive) a broker may own to stay a candidate.
 */
public static void filterBrokersWithLargeTopicCount(Set<String> brokerCandidateCache, LoadData loadData,
        int loadBalancerBrokerMaxTopics) {
    final Set<String> brokersWithinLimit = new HashSet<>();
    for (final String broker : brokerCandidateCache) {
        final BrokerData brokerData = loadData.getBrokerData().get(broker);
        long totalTopics = 0;
        if (brokerData != null && brokerData.getPreallocatedBundleData() != null) {
            final long preallocatedTopics = brokerData.getPreallocatedBundleData().values().stream()
                    .mapToLong(preAllocatedBundle -> preAllocatedBundle.getTopics()).sum();
            totalTopics = preallocatedTopics + brokerData.getLocalData().getNumTopics();
        }
        if (totalTopics <= loadBalancerBrokerMaxTopics) {
            brokersWithinLimit.add(broker);
        }
    }

    // Keep the original candidates when filtering would leave nothing to choose from.
    if (brokersWithinLimit.isEmpty()) {
        return;
    }
    brokerCandidateCache.clear();
    brokerCandidateCache.addAll(brokersWithinLimit);
}
}