Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-45: Migrate NamespaceService to use MetadataStore #10532

Merged
merged 31 commits into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3f82f3f
PIP-45: Migrate NamespaceService to use MetadataStore
merlimat May 6, 2021
cd8b7bb
Fixed copying policies to local store
merlimat May 11, 2021
b7d4dc3
Fixed checkstyle
merlimat May 11, 2021
af21315
Fixed tests
merlimat May 12, 2021
e783e26
Fixed test
merlimat May 12, 2021
fc4d2ee
Fixed handling of BadVersionException
merlimat May 17, 2021
f2b69b2
Converted bundle split into an HTTP async operation
merlimat May 17, 2021
155d23b
Merge remote-tracking branch 'apache/master' into namespace-service
merlimat May 17, 2021
d7b9dd2
Merge remote-tracking branch 'apache/master' into namespace-service
merlimat May 18, 2021
373c094
Fixed issues in concurrent bundle splits
merlimat May 18, 2021
e0df1d7
Fixed checkstyle
merlimat May 18, 2021
f1c72e6
Make sure the ownership is triggered for the first attempt
merlimat May 18, 2021
89a8682
validateTopicOwnershipAsync
merlimat May 20, 2021
1f52f28
Fixed deadlock when removeTopicFromCache calls NamespaceService.getBu…
merlimat May 21, 2021
14a47c9
Fixed checkstyle
merlimat May 21, 2021
818bfee
Fixed handling of future composition in validateTopicOwnershipAsync
merlimat May 21, 2021
7b40b70
Merge remote-tracking branch 'apache/master' into namespace-service
merlimat May 22, 2021
cd27654
Merge remote-tracking branch 'apache/master' into namespace-service
merlimat May 22, 2021
03530ec
In internalCreateSubscriptionForNonPartitionedTopic() we shouldn't ha…
merlimat May 22, 2021
a7d7f3a
Fixed checkstyle
merlimat May 22, 2021
dcce868
Fixed internalUnloadNonPartitionedTopic
merlimat May 22, 2021
bb57243
Fixed mixed sync-async usage of NamespaceService.getBundle
merlimat May 23, 2021
f5970c4
Use awaitility in AdminTest
merlimat May 23, 2021
c1a5114
validateGlobalNamespaceOwnership should also be done asynchronously
merlimat May 24, 2021
b5e7b75
Fixed checkstyle
merlimat May 24, 2021
9820962
Fixed missing ,
merlimat May 24, 2021
f98ff8c
Try to get stack traces for timedout test
merlimat May 24, 2021
f9b3314
Added async version of getTopicReference
merlimat May 24, 2021
012f12c
Removed import
merlimat May 24, 2021
fb17e10
Fixed test mocks
merlimat May 24, 2021
d813e56
Reflect that removeTopicFromCache is an async operation
merlimat May 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ public Boolean get() {

state = State.Started;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
throw new PulsarServerException(e);
} finally {
mutex.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -128,13 +126,6 @@ protected void validateAdminAccessForTenant(String property) {
super.validateAdminAccessForTenant(property);
}

// This is a stub method for Mockito
@Override
protected void validateNamespaceOwnershipWithBundles(String property, String cluster, String namespace,
boolean authoritative, boolean readOnly, BundlesData bundleData) {
super.validateNamespaceOwnershipWithBundles(property, cluster, namespace, authoritative, readOnly, bundleData);
}

// This is a stub method for Mockito
@Override
protected void validateBundleOwnership(String property, String cluster, String namespace, boolean authoritative,
Expand Down Expand Up @@ -336,9 +327,8 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
Policies policies = namespaceResources().get(policyPath)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName).getBundlesData();
policies.bundles = bundleData != null ? bundleData : policies.bundles;

return policies;
Expand All @@ -364,7 +354,7 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
.thenCompose(bundles -> {
BundlesData bundleData = null;
try {
bundleData = NamespaceBundleFactory.getBundlesData(bundles);
bundleData = bundles.getBundlesData();
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e);
return FutureUtil.failedFuture(new RestException(e));
Expand Down Expand Up @@ -582,9 +572,8 @@ protected Policies getNamespacePolicies(String property, String cluster, String
Policies policies = namespaceResources().get(AdminResource.path(POLICIES, property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(property, cluster, namespace));
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(property, cluster, namespace)).getBundlesData();
policies.bundles = bundleData != null ? bundleData : policies.bundles;
return policies;
} catch (RestException re) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Exe
}
ownershipReadOnlyCache.invalidate(namespaceBundleZNode);
future.complete(new OwnedBundle(
ServiceUnitZkUtils.suBundleFromPath(namespaceBundleZNode, bundleFactory)));
ServiceUnitUtils.suBundleFromPath(namespaceBundleZNode, bundleFactory)));
} else {
// Failed to acquire lock
future.completeExceptionally(KeeperException.create(rc));
Expand Down Expand Up @@ -184,7 +184,7 @@ private CompletableFuture<Optional<Map.Entry<NamespaceEphemeralData, Stat>>> res
Stat stat = ownerDataWithStat.getValue();
if (stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId()) {
LOG.info("Successfully reestablish ownership of {}", path);
OwnedBundle ownedBundle = new OwnedBundle(ServiceUnitZkUtils.suBundleFromPath(path, bundleFactory));
OwnedBundle ownedBundle = new OwnedBundle(ServiceUnitUtils.suBundleFromPath(path, bundleFactory));
if (selfOwnerInfo.getNativeUrl().equals(ownerDataWithStat.getKey().getNativeUrl())) {
ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
}
Expand All @@ -206,14 +206,8 @@ public CompletableFuture<Boolean> checkOwnership(NamespaceBundle bundle) {
if (ownedBundle != null) {
return CompletableFuture.completedFuture(true);
}
String bundlePath = ServiceUnitZkUtils.path(bundle);
return resolveOwnership(bundlePath).thenApply(optionalOwnedDataWithStat -> {
if (!optionalOwnedDataWithStat.isPresent()) {
return false;
}
Stat stat = optionalOwnedDataWithStat.get().getValue();
return stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId();
});
String bundlePath = ServiceUnitUtils.path(bundle);
return resolveOwnership(bundlePath).thenApply(Optional::isPresent);
}

/**
Expand All @@ -226,7 +220,7 @@ public CompletableFuture<Boolean> checkOwnership(NamespaceBundle bundle) {
* throws exception if no ownership info is found
*/
public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle suName) {
String path = ServiceUnitZkUtils.path(suName);
String path = ServiceUnitUtils.path(suName);

CompletableFuture<OwnedBundle> ownedBundleFuture = ownedBundlesCache.getIfPresent(path);
if (ownedBundleFuture != null) {
Expand All @@ -250,7 +244,7 @@ public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(Namespa
* @throws Exception
*/
public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle bundle) throws Exception {
String path = ServiceUnitZkUtils.path(bundle);
String path = ServiceUnitUtils.path(bundle);

CompletableFuture<NamespaceEphemeralData> future = new CompletableFuture<>();

Expand Down Expand Up @@ -311,7 +305,7 @@ public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(Namespace
*/
public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
CompletableFuture<Void> result = new CompletableFuture<>();
String key = ServiceUnitZkUtils.path(bundle);
String key = ServiceUnitUtils.path(bundle);
localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> {
// Invalidate cache even in error since this operation may succeed in server side.
ownedBundlesCache.synchronous().invalidate(key);
Expand Down Expand Up @@ -375,7 +369,7 @@ public boolean isNamespaceBundleOwned(NamespaceBundle bundle) {
* @return
*/
public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
CompletableFuture<OwnedBundle> future = ownedBundlesCache.getIfPresent(ServiceUnitZkUtils.path(bundle));
CompletableFuture<OwnedBundle> future = ownedBundlesCache.getIfPresent(ServiceUnitUtils.path(bundle));
if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
return future.join();
} else {
Expand All @@ -390,7 +384,7 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
* @throws Exception
*/
public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
String path = ServiceUnitZkUtils.path(bundle);
String path = ServiceUnitUtils.path(bundle);
CompletableFuture<Void> future = new CompletableFuture<>();

updateBundleState(bundle, false)
Expand Down Expand Up @@ -428,7 +422,7 @@ public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
* @throws Exception
*/
public CompletableFuture<Void> updateBundleState(NamespaceBundle bundle, boolean isActive) {
String path = ServiceUnitZkUtils.path(bundle);
String path = ServiceUnitUtils.path(bundle);
// Disable owned instance in local cache
CompletableFuture<OwnedBundle> f = ownedBundlesCache.getIfPresent(path);
if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
Expand All @@ -442,10 +436,6 @@ public void invalidateLocalOwnerCache() {
this.ownedBundlesCache.synchronous().invalidateAll();
}

public NamespaceEphemeralData getSelfOwnerInfo() {
return selfOwnerInfo;
}

public synchronized boolean refreshSelfOwnerInfo() {
if (selfOwnerInfo.getNativeUrl() == null) {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getSafeBrokerServiceUrl(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.namespace;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.policies.data.Policies.LAST_BOUNDARY;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;

/**
* This class encapsulate some utility functions for
* <code>ServiceUnit</code> related metadata operations.
*/
public final class ServiceUnitUtils {
/**
* <code>ZooKeeper</code> root path for namespace ownership info.
*/
public static final String OWNER_INFO_ROOT = LocalZooKeeperCacheService.OWNER_INFO_ROOT;

public static String path(NamespaceBundle suname) {
// The ephemeral node path for new namespaces should always have bundle name appended
return OWNER_INFO_ROOT + "/" + suname.toString();
}

public static NamespaceBundle suBundleFromPath(String path, NamespaceBundleFactory factory) {
String[] parts = path.split("/");
checkArgument(parts.length > 2);
checkArgument(parts[1].equals("namespace"));
checkArgument(parts.length > 4);

if (parts.length > 5) {
// this is a V1 path prop/cluster/namespace/hash
Range<Long> range = getHashRange(parts[5]);
return factory.getBundle(NamespaceName.get(parts[2], parts[3], parts[4]), range);
} else {
// this is a V2 path prop/namespace/hash
Range<Long> range = getHashRange(parts[4]);
return factory.getBundle(NamespaceName.get(parts[2], parts[3]), range);
}
}

private static Range<Long> getHashRange(String rangePathPart) {
String[] endPoints = rangePathPart.split("_");
checkArgument(endPoints.length == 2, "Malformed bundle hash range path part:" + rangePathPart);
Long startLong = Long.decode(endPoints[0]);
Long endLong = Long.decode(endPoints[1]);
BoundType endType = (endPoints[1].equals(LAST_BOUNDARY)) ? BoundType.CLOSED : BoundType.OPEN;
return Range.range(startLong, BoundType.CLOSED, endLong, endType);
}
}
Loading