-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async ZooKeeper cache implementation #76
Conversation
CLA is valid! |
2 similar comments
CLA is valid! |
CLA is valid! |
d9fda91
to
2d88225
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs few small fixes but the change seems reasonable.
@@ -202,6 +204,8 @@ public void testUnloadIfBrokerCrashes() { | |||
|
|||
log.info("Lookup for namespace {}", destination); | |||
|
|||
System.err.println("CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should remove or replace with a meaningful message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ups, I forgot to remove these debugging lines
policies = this.pulsar.getLocalZkCacheService().policiesCache().get(path); | ||
} | ||
policies.bundles = getBundlesData(nsBundles); | ||
|
||
policies.get().bundles = getBundlesData(nsBundles); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure but policies.get()
can be null
here if creating new policies failed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If creating fails, it should throw an exception and thus skip that line.
// TODO: make this admin call asynchronous | ||
pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundle(nsName.toString(), | ||
nsBundle.getBundleRange(), subscription); | ||
} | ||
} catch (Exception e) { | ||
e.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this intended or just left behind?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, I forgot it here
@@ -212,34 +222,60 @@ public NamespaceEphemeralData getOwner(ServiceUnitId suname) throws Exception { | |||
* @return The ephemeral node data showing the current ownership info in <code>ZooKeeper</code> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: should we replace ServiceUnit
with NamespaceBundle
at method-docs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
if (rc == Code.OK.intValue()) { | ||
try { | ||
T obj = deserializer.deserialize(path, content); | ||
zkFuture.complete(new AbstractMap.SimpleEntry<Object, Stat>(obj, stat)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we create AbstractMap.SimpleImmutableEntry
here.?
return zkFuture; | ||
}).thenAccept(result -> { | ||
if (result != null) { | ||
future.complete(Optional |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead creating a new object can we just cast the result:
future.complete(Optional.of(((Entry<T, Stat>) result)));
@@ -119,4 +120,17 @@ public void replaceAll(UnaryOperator operator) { | |||
return EMPTY_LIST; | |||
} | |||
|
|||
public static void main(String[] args) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we move it to test class.
2d88225
to
126e600
Compare
88154aa
to
238faf8
Compare
} catch (Exception e) { | ||
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); | ||
throw new RestException(e); | ||
if (e instanceof RestException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any usecase which can throw exception other than RestException
? If not then we may not need this try-catch block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we could get ZK error when trying to read
.get(path("clusters", cluster, "namespaceIsolationPolicies")); | ||
.get(path("clusters", cluster, "namespaceIsolationPolicies")) | ||
.orElseThrow(() -> new RestException(Status.NOT_FOUND, | ||
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we log warn-message where-ever we are throwing RestException
in ``orElseThrow`, which may help at broker-debugging incase if requires.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the error is being logged at line 227
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), | ||
@ApiResponse(code = 404, message = "Cluster doesn't exist") }) | ||
public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(@PathParam("cluster") String cluster) | ||
throws Exception { | ||
validateSuperUserAccess(); | ||
validateClusterExists(cluster); | ||
if (!clustersCache().get(path("clusters", cluster)).isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't we use validateClusterExists()
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validateClusterExists()
throws 412 HTTP error if the cluster doesn't exist. But in this case we to return 404.
public void createPolicies(String path, boolean readFromGlobal) throws Exception { | ||
@SuppressWarnings("deprecation") | ||
public CompletableFuture<Optional<LocalPolicies>> createPolicies(String path, boolean readFromGlobal) { | ||
checkNotNull(path, "path can't be null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead throwing runtime-exception, should we just complete future with failure. As caller of getAsync()
might not be handling the exception ie: OwnershipCache.getOwnerAsync()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case is more of a sanity check.. we could even remove the check alltogether
if (LOG.isDebugEnabled()) { | ||
LOG.debug("Global policies not found at {}", globalPath); | ||
} | ||
future.complete(Optional.empty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should it readFromGlobalFuture.complete(Optional.empty());
or we can return future
as it's already complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! I guess both options would work here.
} | ||
checkNotNull(serviceUnit); | ||
|
||
public boolean isServiceUnitDisabled(NamespaceBundle bundle) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: as this method is under NamespaceService
and we have changed argument as NamespaceBundle
then should we rename this method as : isNamespaceBundleDisabled()
@@ -151,6 +153,14 @@ public BrokerService(PulsarService pulsar) throws Exception { | |||
this.pulsarStats = new PulsarStats(pulsar); | |||
this.offlineTopicStatCache = new ConcurrentOpenHashMap<>(); | |||
|
|||
ServiceConfiguration serviceConfig = pulsar.getConfiguration(); | |||
this.defaultPolicies = new Policies(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we use defaultPolicies
anywhere.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. That initialization was moved to when the topic is loaded.
public CompletableFuture<Optional<T>> getAsync(String path) { | ||
CompletableFuture<Optional<T>> future = new CompletableFuture<>(); | ||
cache.getDataAsync(path, this, this).thenAccept(entry -> { | ||
future.complete(entry.map(Entry::getKey)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a change of NPE:
ZookeeperCache.getDataAsync returns null
as future-result in case of NoNodeException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this should be OK, the zkFuture.complete(null)
is to make sure the AsyncLoadingCache
will not retain the entry in the map. That null
value is then converted into an Optional.empty()
:
}).thenAccept(result -> {
if (result != null) {
future.complete(Optional.of((Entry<T, Stat>) result));
} else {
future.complete(Optional.empty());
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.. I missed the last part of ZookeeperCache.getDataAsync. We should be good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. 👍
* Name our threads * We already set the name of runner thread
Motivation
Current implementation of Pulsar ZooKeeper cache is synchronous and based on Guava
cache class. Whenever a
get()
request doesn't find an object in the cache, it willbe fetched from ZooKeeper, deserialized and inserted into cache.
The synchronous nature of Guava cache, forces us to use sync ZK API, since the
cache loading has to be performed atomically.
To enable ZK cache lookups from IO threads, we need to be able to change
cache interface to expose asynchronous methods, that returns a
CompletableFuture
rather than the real object.
This change is needed by #5
Modifications
Optional<>
to treat ZKNoNodeError
instead of propagating the exception to the cacheResult