-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add load shedding strategy #345
Conversation
this.brokerData = new ConcurrentHashMap<>(); | ||
this.bundleData = new ConcurrentHashMap<>(); | ||
} | ||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
convert tabs --> spaces
LoadManagerShared.getNamespaceNameFromBundleName(bundle), | ||
LoadManagerShared.getBundleRangeFromBundleName(bundle)); | ||
if (!LoadManagerShared.isUnloadDisabledInLoadShedding(pulsar)) { | ||
if (getAvailableBrokers().size() > 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of indenting everything, consider doing :
if (LoadManagerShared.isUnloadDisabledInLoadShedding(pulsar)) {
return;
}
if (getAvailableBrokers().size() <= 1) {
return;
}
// continue
// map. | ||
final Iterator<Map.Entry<String, Long>> recentlyUnloadedIterator = loadData.getRecentlyUnloadedBundles() | ||
.entrySet().iterator(); | ||
while (recentlyUnloadedIterator.hasNext()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part could be simplified by using a filter like:
loadData.getRecentlyUnloadedBundles().removeIf( e -> e.getValue() < TIMEOUT );
( don't look at the actual check above..)
@@ -0,0 +1,91 @@ | |||
package com.yahoo.pulsar.broker.loadbalance.impl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copyright header missing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.. just few minor comments
return; | ||
} | ||
if (getAvailableBrokers().size() <= 1) { | ||
log.warn("Only 1 broker available: no load shedding will be performed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may be log.info
?
final LocalBrokerData localData = brokerData.getLocalData(); | ||
final double maxUsage = localData.getMaxResourceUsage(); | ||
if (maxUsage >= overloadThreshold) { | ||
log.info("Attempting to shed load on {}, which has max resource usage {}%", broker, maxUsage); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, but should we check if atleast one broker is available (usage is < maxUsage
) which can load the bundle. Else, in case if all the brokers reached to maxUsage
, they will keep juggling bundles rather they should not unload until at least one broker is available.??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, ideally if they were all overloaded, the bundle would have an opportunity to be assigned to a less loaded broker by being unloaded. Then, the grace period would be set highly enough so that juggling would not be a major issue. What are your thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then, the grace period would be set highly enough so that juggling would not be a major issue.
but don't we set grace-period at server-startup? will broker change this grace-period dynamically?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean that it should be set highly enough on startup -- i.e., maybe something like 5x the load-shedding interval? Of course we could make it dynamic, but I feel like just always having it high makes sense since we would likely very infrequently want to unload the same bundle twice except over a long period of time
final String broker = entry.getKey(); | ||
final String bundle = entry.getValue(); | ||
log.info("Unloading bundle: {}", bundle); | ||
adminCache.get("http://" + broker).namespaces().unloadNamespaceBundle( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any specific reason of having LoadingCache<String, PulsarAdmin> adminCache
and not having only one PulsarAdmin
? Because, adminClient uses vip and request can go to any broker, but if it is not right broker then it just redirects to correct one. So, I think redirection is less expensive than keeping AdminClient
for each broker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That reasoning seems sound, I kept the admin cache because that's how it was done on SimpleLoadManagerImpl
, but since they would not seem to be repercussions for just using one, I will take it out.
@@ -448,19 +445,34 @@ public void disableBroker() throws PulsarServerException { | |||
*/ | |||
@Override | |||
public synchronized void doLoadShedding() { | |||
if (LoadManagerShared.isUnloadDisabledInLoadShedding(pulsar)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to add unit-test case for loadShedding
of ModularLoadManagerImpl
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I will add one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@bobbeyreese Can you resolve the minor conflict with master? |
Motivation
ModularLoadManagerImpl
was added without load shedding supported. This PR adds a load shedding strategy that will attempt to unload the most expensive bundle from overloaded brokers.Modifications
OverloadShedder
has been added.Result
Load shedding will now be performed if the conditions checked in
LoadManagerShared.isUnloadDisabledInLoadShedding
, which was moved fromSimpleLoadManagerImpl
.