Skip to content

Commit

Permalink
--story=871860657 namespace bundle split 支持按照特定topic切分(PIP-143#13796)…
Browse files Browse the repository at this point in the history
… (merge request !67)


Squash merge branch 'split' into '2.8.1'
--story=871860657 namespace bundle split 支持按照特定topic切分(PIP-143#13796)

TAPD: --story=871860657
  • Loading branch information
mayozhang committed Mar 17, 2022
1 parent 4cb00a8 commit b56bf2e
Show file tree
Hide file tree
Showing 25 changed files with 916 additions and 69 deletions.
2 changes: 1 addition & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManage
# Supported algorithms name for namespace bundle split.
# "range_equally_divide" divides the bundle into two parts with the same hash range size.
# "topic_count_equally_divide" divides the bundle into two parts with the same topics count.
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide

# Default algorithm name for namespace bundle split
defaultNamespaceBundleSplitAlgorithm=range_equally_divide
Expand Down
2 changes: 1 addition & 1 deletion deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManage
# Supported algorithms name for namespace bundle split.
# "range_equally_divide" divides the bundle into two parts with the same hash range size.
# "topic_count_equally_divide" divides the bundle into two parts with the same topics count.
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide

# Default algorithm name for namespace bundle split
defaultNamespaceBundleSplitAlgorithm=range_equally_divide
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_LOAD_BALANCER,
doc = "Supported algorithms name for namespace bundle split"
)
private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", "topic_count_equally_divide");
private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide",
"topic_count_equally_divide", "specified_positions_divide");
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -99,8 +100,10 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicHashPositions;
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
Expand Down Expand Up @@ -1064,8 +1067,8 @@ public void internalUnloadNamespaceBundle(AsyncResponse asyncResponse, String bu
}

@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleRange,
boolean authoritative, boolean unload, String splitAlgorithmName) {
protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleRange, boolean authoritative,
boolean unload, String splitAlgorithmName, List<Long> splitBoundaries) {
validateSuperUserAccess();
checkNotNull(bundleRange, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
Expand All @@ -1084,11 +1087,17 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String

List<String> supportedNamespaceBundleSplitAlgorithms =
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName)
&& !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms));
if (StringUtils.isNotBlank(splitAlgorithmName)) {
if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms));
}
if (splitAlgorithmName.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
&& (splitBoundaries == null || splitBoundaries.size() == 0)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"With specified_positions_divide split algorithm, splitBoundaries must not be emtpy"));
}
}

NamespaceBundle nsBundle;
Expand All @@ -1102,7 +1111,7 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String
}

pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName))
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)
.thenRun(() -> {
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
asyncResponse.resume(Response.noContent().build());
Expand Down Expand Up @@ -1132,6 +1141,56 @@ private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(Str
return algorithm;
}

protected void internalGetTopicHashPositions(AsyncResponse asyncResponse, String bundleRange, List<String> topics) {
if (log.isDebugEnabled()) {
log.debug("[{}] Getting hash position for topic list {}, bundle {}", clientAppId(), topics, bundleRange);
}
validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
false, true);
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete(
(allTopicsInThisBundle, throwable) -> {
if (throwable != null) {
log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(),
namespaceName, bundle);
asyncResponse.resume(new RestException(throwable));
}
// if topics is empty, return all topics' hash position in this bundle
Map<String, Long> topicHashPositions = new HashMap<>();
if (topics == null || topics.size() == 0) {
allTopicsInThisBundle.forEach(t -> {
topicHashPositions.put(t,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(t));
});
} else {
for (String topic : topics.stream().map(Codec::decode).collect(Collectors.toList())) {
TopicName topicName = TopicName.get(topic);
// partitioned topic
if (topicName.getPartitionIndex() == -1) {
allTopicsInThisBundle.stream()
.filter(t -> TopicName.get(t).getPartitionedTopicName()
.equals(TopicName.get(topic).getPartitionedTopicName()))
.forEach(partition -> {
topicHashPositions.put(partition,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(partition));
});
} else { // topic partition
if (allTopicsInThisBundle.contains(topicName.toString())) {
topicHashPositions.put(topic,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(topic));
}
}
}
}
asyncResponse.resume(
new TopicHashPositions(namespaceName.toString(), bundleRange, topicHashPositions));
});
}

protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,17 +599,33 @@ public void splitNamespaceBundle(
@PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitBoundaries") @DefaultValue("") List<Long> splitBoundaries) {
try {
validateNamespaceName(property, cluster, namespace);
internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@GET
@Path("/{property}/{cluster}/{namespace}/{bundle}/topicHashPositions")
@ApiOperation(value = "Get hash positions for topics")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
public void getTopicHashPositions(
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("bundle") String bundle,
@QueryParam("topics") List<String> topics,
@Suspended AsyncResponse asyncResponse) {
validateNamespaceName(property, cluster, namespace);
internalGetTopicHashPositions(asyncResponse, bundle, topics);
}

@POST
@Path("/{property}/{cluster}/{namespace}/publishRate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,18 +531,36 @@ public void splitNamespaceBundle(
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitAlgorithmName") String splitAlgorithmName) {
@QueryParam("splitAlgorithmName") String splitAlgorithmName,
@ApiParam("splitBoundaries") List<Long> splitBoundaries) {

try {
validateNamespaceName(tenant, namespace);
internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload, splitAlgorithmName);
internalSplitNamespaceBundle(asyncResponse,
bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
@Path("/{tenant}/{namespace}/{bundle}/topicHashPositions")
@ApiOperation(value = "Get hash positions for topics")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
public void getTopicHashPositions(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("topics") List<String> topics,
@Suspended AsyncResponse asyncResponse) {
validateNamespaceName(tenant, namespace);
internalGetTopicHashPositions(asyncResponse, bundleRange, topics);
}

@POST
@Path("/{property}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.BundleSplitOption;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
Expand Down Expand Up @@ -795,11 +796,12 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio
* @throws Exception
*/
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
NamespaceBundleSplitAlgorithm splitAlgorithm) {
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {

final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm);
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, boundaries);

return unloadFuture;
}
Expand All @@ -808,13 +810,20 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
boolean unload,
AtomicInteger counter,
CompletableFuture<Void> completionFuture,
NamespaceBundleSplitAlgorithm splitAlgorithm) {
splitAlgorithm.getSplitBoundary(this, bundle).whenComplete((splitBoundary, ex) -> {
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {
BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
if (ex == null) {
if (splitBoundaries == null || splitBoundaries.size() == 0) {
LOG.info("[{}] No valid boundary found in {} to split bundle {}",
bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange());
completionFuture.complete(null);
return;
}
try {
bundleFactory.splitBundles(bundle,
2 /* by default split into 2 */, splitBoundary)
bundleFactory.splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries)
.thenAccept(splittedBundles -> {
// Split and updateNamespaceBundles. Update may fail because of concurrent write to
// Zookeeper.
Expand All @@ -827,14 +836,13 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,

checkNotNull(splittedBundles.getLeft());
checkNotNull(splittedBundles.getRight());
checkArgument(splittedBundles.getRight().size() == 2,
"bundle has to be split in two bundles");
checkArgument(splittedBundles.getRight().size() == splitBoundaries.size() + 1,
"bundle has to be split in " + (splitBoundaries.size() + 1) + " bundles");
NamespaceName nsname = bundle.getNamespaceObject();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}",
LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, bundles: {}",
nsname.toString(), bundle.getBundleRange(), counter.get(),
splittedBundles.getRight().get(0).getBundleRange(),
splittedBundles.getRight().get(1).getBundleRange());
splittedBundles.getRight());
}
try {
// take ownership of newly split bundles
Expand Down Expand Up @@ -878,7 +886,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
&& (counter.decrementAndGet() >= 0)) {
pulsar.getOrderedExecutor()
.execute(() -> splitAndOwnBundleOnceAndRetry(
bundle, unload, counter, completionFuture, splitAlgorithm));
bundle, unload, counter, completionFuture, splitAlgorithm, boundaries));
} else if (t instanceof IllegalArgumentException) {
completionFuture.completeExceptionally(t);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.naming;

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.pulsar.broker.namespace.NamespaceService;

@Getter
@NoArgsConstructor
@AllArgsConstructor
public class BundleSplitOption {
private NamespaceService service;
private NamespaceBundle bundle;
private List<Long> positions;
}

0 comments on commit b56bf2e

Please sign in to comment.