Issue #1117: handle race in concurrent bundle split #1428
Change looks good, just minor comments
    });

    return future;
}
Could this be simplified into:
return cache.getDataAsync(path, this, this).whenComplete((entry, ex) -> {
    if (ex != null) {
        cache.asyncInvalidate(path);
    }
});
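A small sketch of why the shorter form should behave the same: `CompletableFuture.whenComplete` returns a new stage that completes with the original result or exception, so no separate future has to be created and completed by hand. `loadAsync` and the `invalidated` list are hypothetical stand-ins for `cache.getDataAsync` and `cache.asyncInvalidate`, not the real cache API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class WhenCompleteSketch {
    // Hypothetical stand-in that records which paths were invalidated.
    static final List<String> invalidated = new ArrayList<>();

    // Hypothetical stand-in for cache.getDataAsync(path, ...).
    static CompletableFuture<String> loadAsync(String path, boolean fail) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (fail) {
            f.completeExceptionally(new IllegalStateException("load failed: " + path));
        } else {
            f.complete("data@" + path);
        }
        return f;
    }

    // The returned stage carries the original result or exception;
    // the callback only adds the invalidation side effect on failure.
    static CompletableFuture<String> getAsync(String path, boolean fail) {
        return loadAsync(path, fail).whenComplete((entry, ex) -> {
            if (ex != null) {
                invalidated.add(path);
            }
        });
    }
}
```

Callers still see the failure on the returned future, so error handling further up the chain is unaffected.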

final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
final int retryTimes = 7;
Move this constant into the class as private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7.
@@ -1022,7 +1022,18 @@ public AuthenticationService getAuthenticationService() {
}

public List<Topic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
    return multiLayerTopicsMap.get(namespace).get(bundle).values();
    if (multiLayerTopicsMap.get(namespace) == null) {
        return Lists.newArrayList();
Would Collections.emptyList() work?
        return Lists.newArrayList();
    }

    if (multiLayerTopicsMap.get(namespace).get(bundle) == null) {
The multiLayerTopicsMap might already have changed at this point, between the null check and the second get().
The safest approach here would be to do something like:
Map<X,Y> map1 = multiLayerTopicsMap.get(namespace);
if (map1 == null) {
    return Collections.emptyList();
}
Map<Z,W> map2 = map1.get(bundle);
if (map2 == null) {
    return Collections.emptyList();
}
return map2.values();
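A self-contained sketch of the same pattern, in case it helps to see it with concrete types. The map layout (namespace -> bundle -> topic name -> topic), the use of `ConcurrentHashMap`, and `String` standing in for the `Topic` type are assumptions for illustration only; the real field may use different types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TopicLookupSketch {
    // Assumed layout: namespace -> bundle -> topic name -> topic.
    // String stands in for the Topic type.
    static final Map<String, Map<String, Map<String, String>>> multiLayerTopicsMap =
            new ConcurrentHashMap<>();

    static List<String> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
        // Read each level once into a local, so a concurrent removal between
        // the null check and the next get() cannot cause a NullPointerException.
        Map<String, Map<String, String>> bundles = multiLayerTopicsMap.get(namespace);
        if (bundles == null) {
            return Collections.emptyList();
        }
        Map<String, String> topics = bundles.get(bundle);
        if (topics == null) {
            return Collections.emptyList();
        }
        // Copy the values so the caller gets a stable snapshot.
        return new ArrayList<>(topics.values());
    }
}
```

Note that `Collections.emptyList()` is immutable, so this only works if no caller tries to add to the returned list.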
@rdhabalia Please take a look as well
@@ -105,6 +108,38 @@ public LocalPolicies deserialize(String path, byte[] content) throws Exception {

    return future;
}

@Override
public CompletableFuture<Optional<Entry<LocalPolicies, Stat>>> getWithStatAsync(String path) {
Should we remove getAsync(String path), since it duplicates this logic and is used only by bundle split?
Thanks. It seems a lot of other places call it indirectly; removing it causes errors in at least BrokerServiceTest/testCreateNamespacePolicy.
Added it back in the latest commit, with the duplicated logic removed.
// If success updateNamespaceBundles, then do invalidateBundleCache and unload.
// Else retry splitAndOwnBundleOnceAndRetry.
updateFuture.whenCompleteAsync((r, t)-> {
    if (t != null) {
Shouldn't we retry only when we get a BadVersion error?
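A rough sketch of what retrying only on a version conflict could look like. Everything here is hypothetical: `BadVersionException` is a local stand-in for the ZooKeeper error, and `updateWithRetry` is not the PR's `splitAndOwnBundleOnceAndRetry`; it only shows the classification of failures inside the completion callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetryOnBadVersionSketch {
    // Hypothetical stand-in for ZooKeeper's BadVersion error.
    static class BadVersionException extends RuntimeException {
        BadVersionException(String msg) {
            super(msg);
        }
    }

    static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;

    // Runs the update. Retries only when it failed with BadVersionException
    // and the remaining-retries counter is not exhausted; any other failure
    // completes the result exceptionally right away.
    static void updateWithRetry(Supplier<CompletableFuture<Void>> update,
                                AtomicInteger retriesLeft,
                                CompletableFuture<Void> result) {
        update.get().whenComplete((r, t) -> {
            if (t == null) {
                result.complete(null);
                return;
            }
            // Dependent stages may wrap the failure in CompletionException.
            Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                    ? t.getCause() : t;
            if (cause instanceof BadVersionException && retriesLeft.decrementAndGet() >= 0) {
                updateWithRetry(update, retriesLeft, result);
            } else {
                result.completeExceptionally(cause);
            }
        });
    }
}
```

With this shape, a failure such as a lost connection surfaces immediately instead of consuming the retry budget.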
            () -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture));
    } else {
        // Retry enough, fail this.
        String msg2 = format("Finish retry, %s not success update nsBundles, reason %S",
reason %s?
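For context on why the case of the conversion matters: in `java.util.Formatter`, the uppercase `%S` conversion upper-cases the formatted argument, while `%s` leaves it as is. A minimal illustration (the class and method names are made up for the example, and `Locale.ROOT` is used only to keep the output locale-independent):

```java
import java.util.Locale;

class FormatCaseSketch {
    // %S upper-cases the argument before inserting it.
    static String upper(String reason) {
        return String.format(Locale.ROOT, "reason %S", reason);
    }

    // %s inserts the argument unchanged.
    static String asIs(String reason) {
        return String.format(Locale.ROOT, "reason %s", reason);
    }
}
```

So with `%S`, an exception message in the log would be shouted in upper case, which is probably not intended.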
👍
Thanks @merlimat @rdhabalia for your comments, changed it and rebased to catch up with master.
retest this please
@zhaijack Please merge with master, which has a fix for a very flaky test. Also, it appears there's a test that might be genuinely failing: I don't remember having seen that test fail before.
@merlimat thanks, will handle it now
Motivation
The race is in NamespaceService.splitAndOwnBundle(). Concurrent updates to the Policies of the same namespace can overwrite the bundles value kept in ZooKeeper. This involves at least two concurrent rounds of split.
For example, the NamespaceBundles value returned by NamespaceBundleFactory.splitBundles() for the first split would be [0x00000000, 0x3fffffff, 0x7fffffff, 0xffffffff], while at the same time the NamespaceBundles value for the second bundle split would be [0x00000000, 0x7fffffff, 0xbfffffff, 0xffffffff].
Whichever split calls updateNamespaceBundles last overrides the former one. Here, if the second split finally succeeds, it overrides the earlier successful value [0x00000000, 0x3fffffff, 0x7fffffff, 0xffffffff], and the final bundle partitions are [0x00000000, 0x7fffffff, 0xbfffffff, 0xffffffff]. The expected result is [0x00000000, 0x3fffffff, 0x7fffffff, 0xbfffffff, 0xffffffff].
This causes the error:
java.lang.IllegalArgumentException: Invalid upper boundary for bundle at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
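The lost update described above can be reproduced with a small in-memory model. This is only an illustration under stated assumptions: `VersionedStore` is a hypothetical stand-in for the versioned Policies node in ZooKeeper, and `split` simply inserts a boundary in sorted order; neither is the broker's actual code. The second method shows how a version-checked write plus a re-read on conflict would yield the expected five boundaries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class BundleSplitRaceSketch {
    // Hypothetical in-memory stand-in for the versioned Policies node.
    static class VersionedStore {
        private List<Long> boundaries;
        private int version = 0;

        VersionedStore(List<Long> initial) {
            boundaries = new ArrayList<>(initial);
        }

        synchronized List<Long> read() {
            return new ArrayList<>(boundaries);
        }

        synchronized int version() {
            return version;
        }

        // Unconditional write: the last writer wins.
        synchronized void write(List<Long> value) {
            boundaries = new ArrayList<>(value);
            version++;
        }

        // Conditional write: rejected when the caller's version is stale.
        synchronized boolean writeIfVersion(List<Long> value, int expected) {
            if (expected != version) {
                return false;
            }
            write(value);
            return true;
        }
    }

    // Returns a copy of the boundaries with splitPoint inserted in sorted order.
    static List<Long> split(List<Long> boundaries, long splitPoint) {
        List<Long> out = new ArrayList<>(boundaries);
        out.add(splitPoint);
        Collections.sort(out);
        return out;
    }

    static List<Long> base() {
        return Arrays.asList(0x00000000L, 0x7fffffffL, 0xffffffffL);
    }

    // Both splits are computed from the same snapshot; the second write
    // silently discards the boundary added by the first.
    static List<Long> lastWriterWins() {
        VersionedStore store = new VersionedStore(base());
        List<Long> snapshot = store.read();
        store.write(split(snapshot, 0x3fffffffL));
        store.write(split(snapshot, 0xbfffffffL));
        return store.read();
    }

    // The second split's stale write is rejected, so it re-reads the
    // current value and applies its split on top of the first one.
    static List<Long> versionCheckedWithRetry() {
        VersionedStore store = new VersionedStore(base());
        List<Long> snapshot = store.read();
        int seen = store.version();
        store.writeIfVersion(split(snapshot, 0x3fffffffL), seen);
        if (!store.writeIfVersion(split(snapshot, 0xbfffffffL), seen)) {
            store.writeIfVersion(split(store.read(), 0xbfffffffL), store.version());
        }
        return store.read();
    }
}
```

`lastWriterWins()` ends with the four boundaries from the second split only, matching the overwritten state described in the motivation, while `versionCheckedWithRetry()` keeps both split points.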
Modifications
Result
No user-facing behavior changes. This concurrency bug is fixed.
Fixes #1117