Fix Bugs Introduced by New Load Manager #332
Conversation
```java
@@ -310,6 +310,8 @@ public void start() throws PulsarServerException {

    @Override
    public void disableBroker() throws Exception {
        loadReportCacheZk.unregisterListener(this);
```
I think we should add this logic to the `stop()` method, which is currently empty for both load managers. Also, I think we should not delete the znode when we dynamically change the load manager; the new load manager should just publish a new load report on this path. Can we also add a unit test for these changes?
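For illustration, a minimal sketch of what a `stop()` along these lines might look like, reusing the field names from the snippets in this thread (an illustration, not the actual patch):

```java
@Override
public void stop() throws PulsarServerException {
    // Stop receiving watch notifications for load reports.
    loadReportCacheZk.unregisterListener(this);

    // Shut down this load manager's caches so stale watches cannot
    // repopulate them after the load manager is replaced.
    loadReportCacheZk.shutdown();
    availableActiveBrokers.shutdown();

    // Stop any periodic load-report update tasks.
    scheduler.shutdown();
}
```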
```java
@@ -310,6 +310,7 @@ public void start() throws PulsarServerException {

    @Override
    public void disableBroker() throws Exception {
        stop();
```
I think `disableBroker()` should just disable the broker by deleting the znode; it should not stop the load manager.
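In other words, something along these lines (the `brokerZnodePath` field and the use of `pulsar.getZkClient()` are assumptions for illustration, not the actual code):

```java
@Override
public void disableBroker() throws Exception {
    // Delete only this broker's znode so it stops being discoverable,
    // without tearing down the load manager itself.
    // brokerZnodePath is assumed to hold the path registered in start().
    pulsar.getZkClient().delete(brokerZnodePath, -1);
}
```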
```java
        // do nothing
        loadReportCacheZk.shutdown();
        availableActiveBrokers.shutdown();
        scheduler.shutdown();
```
I think we should handle the case where a future scheduling task gets triggered by a zk-watch after we have already shut down the scheduler.
Do you mean to say that it is already handled? There is actually an issue in that both the `LocalBrokerData` and `LoadReport` caches use the same underlying cache, namely `pulsar.getLocalZkCache()`. What this means is that, if we were not to shut down the caches, the integrity of the object in the cache would be determined by which watch fires last. This is the cause of the `ClassCastException`: even though the scheduler is shut down, the watch fires for all previously created load managers, deserializing the JSON to either a `LocalBrokerData` or a `LoadReport`. The last one to do so wins, which could cause us to read a `LoadReport` when we want a `LocalBrokerData`, or vice versa.
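A minimal, self-contained simulation of that race (the class names mirror this discussion, but this is an illustration, not the actual Pulsar code; the real caches are `ZooKeeperDataCache` instances over `pulsar.getLocalZkCache()`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SharedCacheRace {
    static class LoadReport {}
    static class LocalBrokerData {}

    // Stand-in for the single cache behind pulsar.getLocalZkCache():
    // both load managers' "data caches" share this map.
    static final Map<String, Object> sharedCache = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        final String path = "/loadbalance/brokers/broker-1";

        // The old load manager's watch fires: deserializes to LoadReport.
        sharedCache.put(path, new LoadReport());

        // The new load manager's watch fires last and overwrites the
        // entry with a LocalBrokerData. Whichever watch fires last wins.
        sharedCache.put(path, new LocalBrokerData());

        // The old load manager still reads through the shared cache and
        // expects a LoadReport -> ClassCastException at runtime.
        LoadReport report = (LoadReport) sharedCache.get(path);
        System.out.println(report);
    }
}
```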
Actually, I mean we should check `scheduler.isShutdown()` before scheduling a task, in case the scheduler is already shut down.
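A sketch of the guard being suggested (`onZkWatchTriggered` and `updateLoadReport` are hypothetical names for illustration):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class GuardedScheduling {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void onZkWatchTriggered() {
        // A watch can still fire after stop(); skip scheduling in that
        // case instead of hitting a RejectedExecutionException.
        if (scheduler.isShutdown()) {
            return;
        }
        scheduler.submit(this::updateLoadReport);
    }

    private void updateLoadReport() { /* refresh and publish load data */ }
}
```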
Also, one more concern:

- Now we have two `ZooKeeperChildrenCache` objects (one in each load manager), and both use the same zk-session to get data from zk.
- While getting data from zk, we pass these objects as watches. So, we will set multiple watches for the same znode.
- I think per zk-session we may get only 1 watch, so there could be a possibility that the new load manager may not get the watch??

@merlimat
There is no particular problem in having multiple watches on the same znode within the same session.
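A sketch of those semantics: ZooKeeper deduplicates watches per (path, watcher object) within a session, so two *distinct* `Watcher` instances on the same znode each receive the notification. (The connect string and znode path below are assumptions for illustration.)

```java
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class TwoWatchesOneZnode {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> {});

        Watcher oldLoadManagerWatch =
                event -> System.out.println("old LM notified: " + event.getPath());
        Watcher newLoadManagerWatch =
                event -> System.out.println("new LM notified: " + event.getPath());

        // Both watches are registered on the same znode in the same session.
        zk.getData("/loadbalance/brokers/broker-1", oldLoadManagerWatch, null);
        zk.getData("/loadbalance/brokers/broker-1", newLoadManagerWatch, null);

        // When the znode changes, BOTH lambdas above are invoked.
    }
}
```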
@merlimat @saandrews I think we can include #336 and this #332 PR for the patch, so can you please review this one as well?
```java
private static final Deserializer<ServiceLookupData> serviceLookupDataDeserializer = (key, content) -> {
    final String jsonString = new String(content);
    if (jsonString.contains("\"allocatedCPU\":")) {
```
Should we select the deserialization class based on the active load-balancer?
Yes, I would prefer that. Check which implementation is active and then try both formats, starting from the active one and falling back to the inactive one.
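A minimal sketch of that "active format first, fall back to the other" idea (stub classes stand in for the real `ServiceLookupData` hierarchy, and the `newLoadManagerActive` flag stands in for checking which load manager is configured):

```java
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;

class FallbackDeserializer {
    interface ServiceLookupData {}
    static class LoadReport implements ServiceLookupData {}
    static class LocalBrokerData implements ServiceLookupData {}

    private static final ObjectMapper mapper = new ObjectMapper();

    static ServiceLookupData deserialize(byte[] content, boolean newLoadManagerActive)
            throws IOException {
        Class<? extends ServiceLookupData> primary =
                newLoadManagerActive ? LocalBrokerData.class : LoadReport.class;
        Class<? extends ServiceLookupData> fallback =
                newLoadManagerActive ? LoadReport.class : LocalBrokerData.class;
        try {
            return mapper.readValue(content, primary);
        } catch (IOException e) {
            // The znode may still hold the other load manager's format.
            return mapper.readValue(content, fallback);
        }
    }
}
```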
Actually, as we have addressed it in #338, and:

- Both `LoadReport` and `LocalBrokerData` extend `ServiceLookupData`, which has the same getter names for the broker URLs
- `ObjectMapperFactory` doesn't fail on unknown fields

any deserializer will always be able to parse both load-report JSON formats. So, I think "deserialize load report based on active load-manager" (#338) will be enough. Do you think we should have fallback logic on top of #338?

@bobbeyreese can you rebase with master to get the change, and if we decide we need the fallback then we can add the logic on top of it.
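A small, self-contained illustration of that second point (the field names and JSON here are made up; Pulsar's `ObjectMapperFactory` is only described, not used): with `FAIL_ON_UNKNOWN_PROPERTIES` disabled, JSON written by either load manager deserializes into either class.

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LenientParsing {
    public static class LocalBrokerData {
        public String webServiceUrl;   // shared field name
        public double allocatedCPU;    // only in the new format
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        // JSON in the *old* LoadReport shape, with fields LocalBrokerData
        // does not declare, still parses: unknown fields are skipped.
        String oldFormat = "{\"webServiceUrl\":\"http://broker-1:8080\","
                + "\"systemResourceUsage\":{\"cpuUsage\":0.5}}";
        LocalBrokerData data = mapper.readValue(oldFormat, LocalBrokerData.class);
        System.out.println(data.webServiceUrl); // http://broker-1:8080
    }
}
```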
```java
public ZooKeeperDataCache(final ZooKeeperCache cache) {
    this.cache = cache;
    isShutdown = new AtomicBoolean(false);
```
We can use `AtomicFieldUpdater` here.
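A sketch of the field-updater pattern being suggested: a `volatile int` plus one shared static updater replaces a per-instance `AtomicBoolean` allocation. (`AtomicIntegerFieldUpdater` is used since the JDK has no boolean variant; the class and field names are illustrative.)

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class Cache {
    private static final AtomicIntegerFieldUpdater<Cache> IS_SHUTDOWN_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(Cache.class, "isShutdown");

    private volatile int isShutdown = 0; // 0 = running, 1 = shut down

    public void shutdown() {
        // compareAndSet ensures the shutdown work runs at most once.
        if (IS_SHUTDOWN_UPDATER.compareAndSet(this, 0, 1)) {
            // release watches, clear entries, etc.
        }
    }

    public boolean isShutdown() {
        return isShutdown == 1;
    }
}
```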
```java
    }
}

public void shutdown() {
```
Can we rename it to `close()` or something else? The name could confuse us, since we call `shutdown()` while stopping the service.
```java
    }
}

public void shutdown() {
```
Same comment as before.
LGTM.
👍
Motivation

The new load management API introduced a few bugs that have been observed, namely:

- A `ClassCastException` can occur when dynamically switching load managers, since the shared caches may deserialize the znode data to the wrong type (`LoadReport` -> `LocalBrokerData` and vice-versa).
- `NamespaceService` will fail to construct `LookupResult`, since JSON cannot instantiate the abstract class `ServiceLookupData`.
- `LeastLongTermMessageRate` always reported that brokers were overloaded due to an integer division bug (see the snippet below).
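A generic illustration of the integer-division pitfall mentioned in the last point (hypothetical numbers; not the actual `LeastLongTermMessageRate` code):

```java
public class IntegerDivisionPitfall {
    public static void main(String[] args) {
        long usedBandwidth = 75;
        long bandwidthLimit = 100;

        double wrong = usedBandwidth / bandwidthLimit;          // truncates to 0.0
        double right = (double) usedBandwidth / bandwidthLimit; // 0.75

        // A usage ratio computed with integer division silently becomes
        // 0 or 1, so an "is overloaded" threshold check can misfire
        // (always or never triggering).
        System.out.println(wrong + " vs " + right); // 0.0 vs 0.75
    }
}
```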
Modifications

Both load managers will now shut down their schedulers and unregister themselves as watchers. Some log statements were added for the new load manager.
Result
Some bugs introduced by the new load manager will be fixed.