diff --git a/conf/broker.conf b/conf/broker.conf index 688e23905caeaa..c9f474d0b3c215 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1039,7 +1039,7 @@ loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManage # Supported algorithms name for namespace bundle split. # "range_equally_divide" divides the bundle into two parts with the same hash range size. # "topic_count_equally_divide" divides the bundle into two parts with the same topics count. -supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide +supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide # Default algorithm name for namespace bundle split defaultNamespaceBundleSplitAlgorithm=range_equally_divide diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index 4ddc90b3e26b98..186251ae075a8e 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -928,7 +928,7 @@ loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManage # Supported algorithms name for namespace bundle split. # "range_equally_divide" divides the bundle into two parts with the same hash range size. # "topic_count_equally_divide" divides the bundle into two parts with the same topics count. -supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide +supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide # Default algorithm name for namespace bundle split defaultNamespaceBundleSplitAlgorithm=range_equally_divide diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 5a7951be700bbf..331e1d61b08737 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1828,7 +1828,8 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_LOAD_BALANCER, doc = "Supported algorithms name for namespace bundle split" ) - private List supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", "topic_count_equally_divide"); + private List supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", + "topic_count_equally_divide", "specified_positions_divide"); @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 1a08e6ced9ca87..fe0eafc0d488ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -32,6 +32,7 @@ import java.net.URI; import java.net.URL; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -99,8 +100,10 @@ import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.pulsar.common.policies.data.TenantOperation; +import org.apache.pulsar.common.policies.data.TopicHashPositions; import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; +import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; @@ -1064,8 +1067,8 @@ public void internalUnloadNamespaceBundle(AsyncResponse asyncResponse, String bu } @SuppressWarnings("deprecation") - protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleRange, - boolean authoritative, boolean unload, String splitAlgorithmName) { + protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleRange, boolean authoritative, + boolean unload, String splitAlgorithmName, List splitBoundaries) { validateSuperUserAccess(); checkNotNull(bundleRange, "BundleRange should not be null"); log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange); @@ -1084,11 +1087,17 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String List supportedNamespaceBundleSplitAlgorithms = pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms(); - if (StringUtils.isNotBlank(splitAlgorithmName) - && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, - "Unsupported namespace bundle split algorithm, supported algorithms are " - + supportedNamespaceBundleSplitAlgorithms)); + if (StringUtils.isNotBlank(splitAlgorithmName)) { + if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "Unsupported namespace bundle split algorithm, supported algorithms are " + + supportedNamespaceBundleSplitAlgorithms)); + } + if (splitAlgorithmName.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE) + && (splitBoundaries == null || splitBoundaries.size() == 0)) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + "With specified_positions_divide split algorithm, splitBoundaries must not be emtpy")); + } } NamespaceBundle nsBundle; @@ -1102,7 +1111,7 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String } pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, - getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)) + getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries) .thenRun(() -> { log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString()); asyncResponse.resume(Response.noContent().build()); @@ -1132,6 +1141,56 @@ private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(Str return algorithm; } + protected void internalGetTopicHashPositions(AsyncResponse asyncResponse, String bundleRange, List topics) { + if (log.isDebugEnabled()) { + log.debug("[{}] Getting hash position for topic list {}, bundle {}", clientAppId(), topics, bundleRange); + } + validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ); + Policies policies = getNamespacePolicies(namespaceName); + NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, + false, true); + pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete( + (allTopicsInThisBundle, throwable) -> { + if (throwable != null) { + log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(), + namespaceName, bundle); + asyncResponse.resume(new RestException(throwable)); + } + // if topics is empty, return all topics' hash position in this bundle + Map topicHashPositions = new HashMap<>(); + if (topics == null || topics.size() == 0) { + allTopicsInThisBundle.forEach(t -> { + topicHashPositions.put(t, + pulsar().getNamespaceService().getNamespaceBundleFactory() + .getLongHashCode(t)); + }); + } else { + for (String topic : topics.stream().map(Codec::decode).collect(Collectors.toList())) { + TopicName topicName = TopicName.get(topic); + // partitioned topic + if (topicName.getPartitionIndex() == -1) { + allTopicsInThisBundle.stream() + .filter(t -> TopicName.get(t).getPartitionedTopicName() + .equals(TopicName.get(topic).getPartitionedTopicName())) + .forEach(partition -> { + topicHashPositions.put(partition, + pulsar().getNamespaceService().getNamespaceBundleFactory() + .getLongHashCode(partition)); + }); + } else { // topic partition + if (allTopicsInThisBundle.contains(topicName.toString())) { + topicHashPositions.put(topic, + pulsar().getNamespaceService().getNamespaceBundleFactory() + .getLongHashCode(topic)); + } + } + } + } + asyncResponse.resume( + new TopicHashPositions(namespaceName.toString(), bundleRange, topicHashPositions)); + }); + } + protected void internalSetPublishRate(PublishRate maxPublishMessageRate) { validateSuperUserAccess(); log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 529b8ce0cbe62a..a66ae86635d4e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -599,17 +599,33 @@ public void splitNamespaceBundle( @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @QueryParam("unload") @DefaultValue("false") boolean unload) { + @QueryParam("unload") @DefaultValue("false") boolean unload, + @QueryParam("splitBoundaries") @DefaultValue("") List splitBoundaries) { try { validateNamespaceName(property, cluster, namespace); internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload, - NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME); + NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { asyncResponse.resume(new RestException(e)); } } + @GET + @Path("/{property}/{cluster}/{namespace}/{bundle}/topicHashPositions") + @ApiOperation(value = "Get hash positions for topics") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist")}) + public void getTopicHashPositions( + @PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + @PathParam("bundle") String bundle, + @QueryParam("topics") List topics, + @Suspended AsyncResponse asyncResponse) { + validateNamespaceName(property, cluster, namespace); + internalGetTopicHashPositions(asyncResponse, bundle, topics); + } @POST @Path("/{property}/{cluster}/{namespace}/publishRate") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 9a1f684c425147..88b566f8ac8705 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -531,11 +531,13 @@ public void splitNamespaceBundle( @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("unload") @DefaultValue("false") boolean unload, - @QueryParam("splitAlgorithmName") String splitAlgorithmName) { + @QueryParam("splitAlgorithmName") String splitAlgorithmName, + @ApiParam("splitBoundaries") List splitBoundaries) { try { validateNamespaceName(tenant, namespace); - internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload, splitAlgorithmName); + internalSplitNamespaceBundle(asyncResponse, + bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { @@ -543,6 +545,22 @@ public void splitNamespaceBundle( } } + @GET + @Path("/{tenant}/{namespace}/{bundle}/topicHashPositions") + @ApiOperation(value = "Get hash positions for topics") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist")}) + public void getTopicHashPositions( + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("bundle") String bundleRange, + @QueryParam("topics") List topics, + @Suspended AsyncResponse asyncResponse) { + validateNamespaceName(tenant, namespace); + internalGetTopicHashPositions(asyncResponse, bundleRange, topics); + } + @POST @Path("/{property}/{namespace}/publishRate") @ApiOperation(hidden = true, value = "Set publish-rate throttling for all topics of the namespace") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index ae8d03014d0b9e..f680f5f6fb69ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -75,6 +75,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.lookup.data.LookupData; +import org.apache.pulsar.common.naming.BundleSplitOption; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; @@ -795,11 +796,12 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio * @throws Exception */ public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean unload, - NamespaceBundleSplitAlgorithm splitAlgorithm) { + NamespaceBundleSplitAlgorithm splitAlgorithm, + List boundaries) { final CompletableFuture unloadFuture = new CompletableFuture<>(); final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT); - splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm); + splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, boundaries); return unloadFuture; } @@ -808,13 +810,20 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, boolean unload, AtomicInteger counter, CompletableFuture completionFuture, - NamespaceBundleSplitAlgorithm splitAlgorithm) { - splitAlgorithm.getSplitBoundary(this, bundle).whenComplete((splitBoundary, ex) -> { + NamespaceBundleSplitAlgorithm splitAlgorithm, + List boundaries) { + BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries); + splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> { CompletableFuture> updateFuture = new CompletableFuture<>(); if (ex == null) { + if (splitBoundaries == null || splitBoundaries.size() == 0) { + LOG.info("[{}] No valid boundary found in {} to split bundle {}", + bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange()); + completionFuture.complete(null); + return; + } try { - bundleFactory.splitBundles(bundle, - 2 /* by default split into 2 */, splitBoundary) + bundleFactory.splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries) .thenAccept(splittedBundles -> { // Split and updateNamespaceBundles. Update may fail because of concurrent write to // Zookeeper. @@ -827,14 +836,13 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, checkNotNull(splittedBundles.getLeft()); checkNotNull(splittedBundles.getRight()); - checkArgument(splittedBundles.getRight().size() == 2, - "bundle has to be split in two bundles"); + checkArgument(splittedBundles.getRight().size() == splitBoundaries.size() + 1, + "bundle has to be split in " + (splitBoundaries.size() + 1) + " bundles"); NamespaceName nsname = bundle.getNamespaceObject(); if (LOG.isDebugEnabled()) { - LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}", + LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, bundles: {}", nsname.toString(), bundle.getBundleRange(), counter.get(), - splittedBundles.getRight().get(0).getBundleRange(), - splittedBundles.getRight().get(1).getBundleRange()); + splittedBundles.getRight()); } try { // take ownership of newly split bundles @@ -878,7 +886,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, && (counter.decrementAndGet() >= 0)) { pulsar.getOrderedExecutor() .execute(() -> splitAndOwnBundleOnceAndRetry( - bundle, unload, counter, completionFuture, splitAlgorithm)); + bundle, unload, counter, completionFuture, splitAlgorithm, boundaries)); } else if (t instanceof IllegalArgumentException) { completionFuture.completeExceptionally(t); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/BundleSplitOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/BundleSplitOption.java new file mode 100644 index 00000000000000..716f86a405ab19 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/BundleSplitOption.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.apache.pulsar.broker.namespace.NamespaceService; + +@Getter +@NoArgsConstructor +@AllArgsConstructor +public class BundleSplitOption { + private NamespaceService service; + private NamespaceBundle bundle; + private List positions; +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index dc1c72d3030f81..2accdcd105d63d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -34,6 +34,7 @@ import com.google.common.hash.HashFunction; import java.io.IOException; import java.time.Duration; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -268,20 +269,23 @@ private NamespaceBundles getBundles(NamespaceName nsname, Optional>> splitBundles( - NamespaceBundle targetBundle, int argNumBundles, Long splitBoundary) { - checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle); - if (splitBoundary != null) { - checkArgument(splitBoundary > targetBundle.getLowerEndpoint() - && splitBoundary < targetBundle.getUpperEndpoint(), - "The given fixed key must between the key range of the %s bundle", targetBundle); - argNumBundles = 2; + NamespaceBundle targetBundle, int argNumBundles, List splitBoundaries) { + + checkArgument(canSplitBundle(targetBundle), + "%s bundle can't be split further since range not larger than 1", targetBundle); + if (splitBoundaries != null && splitBoundaries.size() > 0) { + Collections.sort(splitBoundaries); + checkArgument(splitBoundaries.get(0) > targetBundle.getLowerEndpoint() + && splitBoundaries.get(splitBoundaries.size() - 1) < targetBundle.getUpperEndpoint(), + "The given fixed keys must between the key range of the %s bundle", targetBundle); + argNumBundles = splitBoundaries.size() + 1; } checkNotNull(targetBundle, "can't split null bundle"); checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present"); @@ -300,14 +304,20 @@ public CompletableFuture>> splitBun if (sourceBundle.partitions[i] == range.lowerEndpoint() && (range.upperEndpoint() == sourceBundle.partitions[i + 1])) { splitPartition = i; - Long maxVal = sourceBundle.partitions[i + 1]; Long minVal = sourceBundle.partitions[i]; - Long segSize = splitBoundary == null ? (maxVal - minVal) / numBundles : splitBoundary - minVal; partitions[pos++] = minVal; - Long curPartition = minVal + segSize; - for (int j = 0; j < numBundles - 1; j++) { - partitions[pos++] = curPartition; - curPartition += segSize; + if (splitBoundaries == null || splitBoundaries.size() == 0) { + long maxVal = sourceBundle.partitions[i + 1]; + long segSize = (maxVal - minVal) / numBundles; + long curPartition = minVal + segSize; + for (int j = 0; j < numBundles - 1; j++) { + partitions[pos++] = curPartition; + curPartition += segSize; + } + } else { + for (long splitBoundary : splitBoundaries) { + partitions[pos++] = splitBoundary; + } } } else { partitions[pos++] = sourceBundle.partitions[i]; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java index 1dc19f30ab2dc8..1b72f81eb3e8c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java @@ -21,7 +21,6 @@ import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.broker.namespace.NamespaceService; /** * Algorithm interface for namespace bundle split. @@ -30,11 +29,14 @@ public interface NamespaceBundleSplitAlgorithm { String RANGE_EQUALLY_DIVIDE_NAME = "range_equally_divide"; String TOPIC_COUNT_EQUALLY_DIVIDE = "topic_count_equally_divide"; + String SPECIFIED_POSITIONS_DIVIDE = "specified_positions_divide"; - List AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME, TOPIC_COUNT_EQUALLY_DIVIDE); - + List AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME, + TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_POSITIONS_DIVIDE); NamespaceBundleSplitAlgorithm RANGE_EQUALLY_DIVIDE_ALGO = new RangeEquallyDivideBundleSplitAlgorithm(); NamespaceBundleSplitAlgorithm TOPIC_COUNT_EQUALLY_DIVIDE_ALGO = new TopicCountEquallyDivideBundleSplitAlgorithm(); + NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_ALGO = + new SpecifiedPositionsBundleSplitAlgorithm(); static NamespaceBundleSplitAlgorithm of(String algorithmName) { if (algorithmName == null) { @@ -45,10 +47,12 @@ static NamespaceBundleSplitAlgorithm of(String algorithmName) { return RANGE_EQUALLY_DIVIDE_ALGO; case TOPIC_COUNT_EQUALLY_DIVIDE: return TOPIC_COUNT_EQUALLY_DIVIDE_ALGO; + case SPECIFIED_POSITIONS_DIVIDE: + return SPECIFIED_POSITIONS_DIVIDE_ALGO; default: return null; } } - CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle); + CompletableFuture> getSplitBoundary(BundleSplitOption bundleSplitOption); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java index e0b234796efb56..e4b3bcf142c18d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java @@ -18,8 +18,9 @@ */ package org.apache.pulsar.common.naming; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.broker.namespace.NamespaceService; /** * This algorithm divides the bundle into two parts with the same hash range size. @@ -27,8 +28,9 @@ public class RangeEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm { @Override - public CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle) { - return CompletableFuture.completedFuture(bundle.getLowerEndpoint() - + (bundle.getUpperEndpoint() - bundle.getLowerEndpoint()) / 2); + public CompletableFuture> getSplitBoundary(BundleSplitOption bundleSplitOption) { + NamespaceBundle bundle = bundleSplitOption.getBundle(); + return CompletableFuture.completedFuture(Collections.singletonList(bundle.getLowerEndpoint() + + (bundle.getUpperEndpoint() - bundle.getLowerEndpoint()) / 2)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java new file mode 100644 index 00000000000000..7f75ef0df2da32 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.pulsar.broker.namespace.NamespaceService; + +/** + * This algorithm divides the bundle into several parts by the specified positions. + */ +public class SpecifiedPositionsBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm{ + @Override + public CompletableFuture> getSplitBoundary(BundleSplitOption bundleSplitOption) { + NamespaceService service = bundleSplitOption.getService(); + NamespaceBundle bundle = bundleSplitOption.getBundle(); + List positions = bundleSplitOption.getPositions(); + if (positions == null || positions.size() == 0) { + throw new IllegalArgumentException("SplitBoundaries can't be empty"); + } + // sort all positions + Collections.sort(positions); + return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> { + if (topics == null || topics.size() <= 1) { + return CompletableFuture.completedFuture(null); + } + List splitBoundaries = positions + .stream() + .filter(position -> position > bundle.getLowerEndpoint() && position < bundle.getUpperEndpoint()) + .collect(Collectors.toList()); + + if (splitBoundaries.size() == 0) { + return CompletableFuture.completedFuture(null); + } + return CompletableFuture.completedFuture(splitBoundaries); + }); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java index 64c10bcabaf13e..2be8392a0afa37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java @@ -30,7 +30,9 @@ public class TopicCountEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm { @Override - public CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle) { + public CompletableFuture> getSplitBoundary(BundleSplitOption bundleSplitOption) { + NamespaceService service = bundleSplitOption.getService(); + NamespaceBundle bundle = bundleSplitOption.getBundle(); return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> { if (topics == null || topics.size() <= 1) { return CompletableFuture.completedFuture(null); @@ -43,7 +45,7 @@ public CompletableFuture getSplitBoundary(NamespaceService service, Namesp long splitStart = topicNameHashList.get(Math.max((topicNameHashList.size() / 2) - 1, 0)); long splitEnd = topicNameHashList.get(topicNameHashList.size() / 2); long splitMiddle = splitStart + (splitEnd - splitStart) / 2; - return CompletableFuture.completedFuture(splitMiddle); + return CompletableFuture.completedFuture(Collections.singletonList(splitMiddle)); }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 5fa6b46a9c59fd..d4dbf1e11884f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -30,11 +30,13 @@ import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.base.Charsets; import com.google.common.collect.BoundType; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.Sets; +import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import java.lang.reflect.Field; import java.net.URL; @@ -43,6 +45,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -113,6 +116,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BrokerInfo; +import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; @@ -128,6 +132,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicHashPositions; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -1339,6 +1344,184 @@ public void testNamespaceSplitBundleWithTopicCountEquallyDivideAlgorithm() throw producers.forEach(Producer::closeAsync); } + @Test + public void testNamespacesGetTopicHashPositions() throws Exception { + // Force to create a namespace with only one bundle and create a topic + final String namespace = "prop-xyz/ns-one-bundle"; + final String topic = "persistent://"+ namespace + "/topic"; + final int topicPartitionNumber = 4; + + Policies policies = new Policies(); + policies.bundles = PoliciesUtil.getBundles(1); + admin.namespaces().createNamespace(namespace, policies); + admin.topics().createPartitionedTopic(topic, topicPartitionNumber); + admin.lookups().lookupPartitionedTopic(topic); + + // check bundles and bundle boundaries + BundlesData bundleData = admin.namespaces().getBundles(namespace); + assertEquals(bundleData.getNumBundles(), 1); + assertEquals(bundleData.getBoundaries().size(), 2); + assertEquals(bundleData.getBoundaries().get(0), "0x00000000"); + assertEquals(bundleData.getBoundaries().get(1), "0xffffffff"); + + // test get topic position for partitioned-topic name + String bundleRange = "0x00000000_0xffffffff"; + TopicHashPositions topicHashPositions = + admin.namespaces().getTopicHashPositions(namespace, bundleRange, Collections.singletonList(topic)); + assertEquals(topicHashPositions.getNamespace(), "prop-xyz/ns-one-bundle"); + assertEquals(topicHashPositions.getBundle(), "0x00000000_0xffffffff"); + assertEquals(topicHashPositions.getTopicHashPositions().size(), topicPartitionNumber); + + final HashFunction hashFunction = Hashing.crc32(); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-0"), + hashFunction.hashString(topic + "-partition-0", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-1"), + hashFunction.hashString(topic + "-partition-1", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-2"), + hashFunction.hashString(topic + "-partition-2", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-3"), + hashFunction.hashString(topic + "-partition-3", Charsets.UTF_8).padToLong()); + + // test get hash position for topic partition + List partitions = new ArrayList<>(); + partitions.add(topic + "-partition-0"); + partitions.add(topic + "-partition-1"); + partitions.add(topic + "-partition-2"); + partitions.add(topic + "-partition-3"); + topicHashPositions = admin.namespaces().getTopicHashPositions(namespace, bundleRange, partitions); + + assertEquals(topicHashPositions.getNamespace(), "prop-xyz/ns-one-bundle"); + assertEquals(topicHashPositions.getBundle(), "0x00000000_0xffffffff"); + assertEquals(topicHashPositions.getTopicHashPositions().size(), topicPartitionNumber); + + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-0"), + hashFunction.hashString(topic + "-partition-0", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-1"), + hashFunction.hashString(topic + "-partition-1", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-2"), + hashFunction.hashString(topic + "-partition-2", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-3"), + hashFunction.hashString(topic + "-partition-3", Charsets.UTF_8).padToLong()); + + // test non-exist topic + topicHashPositions = admin.namespaces().getTopicHashPositions(namespace, + bundleRange, Collections.singletonList(topic + "no-exist")); + assertEquals(topicHashPositions.getTopicHashPositions().size(), 0); + + // test topics is null + topicHashPositions = admin.namespaces().getTopicHashPositions(namespace, + bundleRange, null); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-0"), + hashFunction.hashString(topic + "-partition-0", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-1"), + hashFunction.hashString(topic + "-partition-1", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-2"), + hashFunction.hashString(topic + "-partition-2", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-3"), + hashFunction.hashString(topic + "-partition-3", Charsets.UTF_8).padToLong()); + + // test topics is empty + topicHashPositions = admin.namespaces().getTopicHashPositions(namespace, + bundleRange, new ArrayList<>()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-0"), + hashFunction.hashString(topic + "-partition-0", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-1"), + hashFunction.hashString(topic + "-partition-1", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-2"), + hashFunction.hashString(topic + "-partition-2", Charsets.UTF_8).padToLong()); + assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-3"), + hashFunction.hashString(topic + "-partition-3", Charsets.UTF_8).padToLong()); + } + + + @Test + public void testNamespaceSplitBundleWithSpecifiedPositionsDivideAlgorithm() throws Exception { + // 1. Force to create a topic + final String namespace = "prop-xyz/ns-one-bundle"; + final String topic = "persistent://"+ namespace + "/topic"; + final int topicPartitionNumber = 4; + + Policies policies = new Policies(); + policies.bundles = PoliciesUtil.getBundles(1); + admin.namespaces().createNamespace(namespace, policies); + admin.topics().createPartitionedTopic(topic, topicPartitionNumber); + // 2. trigger bundle loading + admin.lookups().lookupPartitionedTopic(topic); + + // 3. check namespace bundle and topics exist + List topics = admin.topics().getList(namespace); + assertTrue(topics.contains(topic + "-partition-0")); + assertTrue(topics.contains(topic + "-partition-1")); + assertTrue(topics.contains(topic + "-partition-2")); + assertTrue(topics.contains(topic + "-partition-3")); + + // 4. check bundles and bundle boundaries + BundlesData bundleData = admin.namespaces().getBundles(namespace); + assertEquals(bundleData.getNumBundles(), 1); + assertEquals(bundleData.getBoundaries().size(), 2); + assertEquals(bundleData.getBoundaries().get(0), "0x00000000"); + assertEquals(bundleData.getBoundaries().get(1), "0xffffffff"); + + // 5. calculate positions for split + final HashFunction hashFunction = Hashing.crc32(); + List hashPositions = new ArrayList<>(); + hashPositions.add(hashFunction.hashString(topic + "-partition-0", Charsets.UTF_8).padToLong()); + hashPositions.add(hashFunction.hashString(topic + "-partition-1", Charsets.UTF_8).padToLong()); + hashPositions.add(hashFunction.hashString(topic + "-partition-2", Charsets.UTF_8).padToLong()); + hashPositions.add(hashFunction.hashString(topic + "-partition-3", Charsets.UTF_8).padToLong()); + + // 6. do split by SPECIFIED_POSITIONS_DIVIDE + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, + NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE, hashPositions); + + // 7. check split result + NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace)); + assertEquals(bundles.getBundles().size(), 5); + + Collections.sort(hashPositions); + String[] splitRange = { + "0x00000000_" + String.format("0x%08x",hashPositions.get(0)), + String.format("0x%08x", hashPositions.get(0)) + "_" + String.format("0x%08x", hashPositions.get(1)), + String.format("0x%08x", hashPositions.get(1)) + "_" + String.format("0x%08x", hashPositions.get(2)), + String.format("0x%08x", hashPositions.get(2)) + "_" + String.format("0x%08x", hashPositions.get(3)), + String.format("0x%08x", hashPositions.get(3)) + "_0xffffffff" + }; + + Set bundleRanges = new HashSet<>(); + bundles.getBundles().forEach(bundle -> bundleRanges.add(bundle.getBundleRange())); + Lists.newArrayList(splitRange).forEach(bundleRanges::remove); + assertEquals(bundleRanges.size(), 0); + + // 8. check split result from admin cli tool + BundlesData adminBundleData = admin.namespaces().getBundles(namespace); + assertEquals(adminBundleData.getNumBundles(), 5); + String[] boundaries = { + "0x00000000", + String.format("0x%08x", hashPositions.get(0)), + String.format("0x%08x", hashPositions.get(1)), + String.format("0x%08x", hashPositions.get(2)), + String.format("0x%08x", hashPositions.get(3)), + "0xffffffff" + }; + Lists.newArrayList(boundaries).forEach(adminBundleData.getBoundaries()::remove); + assertEquals(adminBundleData.getBoundaries().size(), 0); + + // 9. test split at full upper and lower boundaries + List fullBoundaries = + Lists.newArrayList(NamespaceBundles.FULL_UPPER_BOUND, NamespaceBundles.FULL_UPPER_BOUND); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_" + String.format("0x%08x",hashPositions.get(0)), + false, NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE, fullBoundaries); + admin.namespaces().splitNamespaceBundle(namespace, String.format("0x%08x", hashPositions.get(3)) + "_0xffffffff", + false, NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE, fullBoundaries); + + // 10. full upper or lower boundaries does not split bundle + NamespaceBundles nbs = bundleFactory.getBundles(NamespaceName.get(namespace)); + assertEquals(nbs.getBundles().size(), 5); + BundlesData adminBD = admin.namespaces().getBundles(namespace); + assertEquals(adminBD.getNumBundles(), 5); + assertEquals(adminBD.getBoundaries().size(), 6); + } + @Test public void testNamespaceSplitBundleWithInvalidAlgorithm() { // Force to create a topic diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 1f840b1177e1ea..b9b698da1dcc9b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -908,7 +908,7 @@ public void testSplitBundles() throws Exception { try { AsyncResponse response = mock(AsyncResponse.class); namespaces.splitNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0xffffffff", - false, true); + false, true, null); ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(captor.capture()); // verify split bundles @@ -945,7 +945,7 @@ public void testSplitBundleWithUnDividedRange() throws Exception { // split bundles AsyncResponse response = mock(AsyncResponse.class); namespaces.splitNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, - "0x08375b1a_0x08375b1b", false, false); + "0x08375b1a_0x08375b1b", false, false, null); ArgumentCaptor captor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(any(RestException.class)); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index eeedef570eb3f6..dde2c6fd1a7a7d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -118,7 +118,7 @@ public void testSplitAndOwnBundles() throws Exception { CompletableFuture result = namespaceService.splitAndOwnBundle( originalBundle, false, - NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO); + NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO, null); try { result.get(); @@ -201,7 +201,7 @@ public void testSplitMapWithRefreshedStatMap() throws Exception { CompletableFuture result = namespaceService.splitAndOwnBundle( originalBundle, false, - NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO); + NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO, null); try { result.get(); } catch (Exception e) { @@ -416,7 +416,7 @@ public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception { NamespaceBundle originalBundle = bundles.findBundle(topicName); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO, null); try { result.get(); @@ -481,7 +481,7 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); NamespaceBundle originalBundle = bundles.findBundle(topicName); - CompletableFuture result1 = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO); + CompletableFuture result1 = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO, null); try { result1.get(); } catch (Exception e) { @@ -500,7 +500,7 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { } }); - CompletableFuture result2 = namespaceService.splitAndOwnBundle(splittedBundle, true, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO); + CompletableFuture result2 = namespaceService.splitAndOwnBundle(splittedBundle, true, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO, null); try { result2.get(); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java new file mode 100644 index 00000000000000..2f4d3a39eee109 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + + +import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME; +import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE; +import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class NamespaceBundleSplitAlgorithmTest { + @Test + public void testOfMethodReturnCorrectValue() { + NamespaceBundleSplitAlgorithm nullValue = NamespaceBundleSplitAlgorithm.of(null); + Assert.assertNull(nullValue); + NamespaceBundleSplitAlgorithm whatever = NamespaceBundleSplitAlgorithm.of("whatever"); + Assert.assertNull(whatever); + NamespaceBundleSplitAlgorithm rangeEquallyDivideName = NamespaceBundleSplitAlgorithm.of(RANGE_EQUALLY_DIVIDE_NAME); + Assert.assertTrue(rangeEquallyDivideName instanceof RangeEquallyDivideBundleSplitAlgorithm); + NamespaceBundleSplitAlgorithm topicCountEquallyDivide = NamespaceBundleSplitAlgorithm.of(TOPIC_COUNT_EQUALLY_DIVIDE); + Assert.assertTrue(topicCountEquallyDivide instanceof TopicCountEquallyDivideBundleSplitAlgorithm); + NamespaceBundleSplitAlgorithm specifiedTopicCountEquallyDivide = NamespaceBundleSplitAlgorithm.of(SPECIFIED_POSITIONS_DIVIDE); + Assert.assertTrue(specifiedTopicCountEquallyDivide instanceof SpecifiedPositionsBundleSplitAlgorithm); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 059873be301c81..54bb63afc60587 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -29,6 +29,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -238,19 +239,19 @@ public void testSplitBundleByFixBoundary() throws Exception { NamespaceBundle bundleToSplit = bundles.getBundles().get(0); try { - factory.splitBundles(bundleToSplit, 0, bundleToSplit.getLowerEndpoint()); + factory.splitBundles(bundleToSplit, 0, Collections.singletonList(bundleToSplit.getLowerEndpoint())); } catch (IllegalArgumentException e) { //No-op } try { - factory.splitBundles(bundleToSplit, 0, bundleToSplit.getUpperEndpoint()); + factory.splitBundles(bundleToSplit, 0, Collections.singletonList(bundleToSplit.getUpperEndpoint())); } catch (IllegalArgumentException e) { //No-op } Long fixBoundary = bundleToSplit.getLowerEndpoint() + 10; Pair> splitBundles = factory.splitBundles(bundleToSplit, - 0, fixBoundary).join(); + 0, Collections.singletonList(fixBoundary)).join(); assertEquals(splitBundles.getRight().get(0).getLowerEndpoint(), bundleToSplit.getLowerEndpoint()); assertEquals(splitBundles.getRight().get(1).getLowerEndpoint().longValue(), bundleToSplit.getLowerEndpoint() + fixBoundary); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithmTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithmTest.java new file mode 100644 index 00000000000000..15edb349ee1502 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithmTest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import com.google.common.collect.BoundType; +import com.google.common.collect.Range; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.mockito.Mockito; +import org.testng.Assert; + + +import org.testng.annotations.Test; +public class RangeEquallyDivideBundleSplitAlgorithmTest { + @Test + public void testGetSplitBoundaryMethodReturnCorrectResult() { + RangeEquallyDivideBundleSplitAlgorithm rangeEquallyDivideBundleSplitAlgorithm = new RangeEquallyDivideBundleSplitAlgorithm(); + Assert.assertThrows(NullPointerException.class, () -> rangeEquallyDivideBundleSplitAlgorithm.getSplitBoundary(new BundleSplitOption())); + long lowerRange = 10L; + long upperRange = 0xffffffffL; + long correctResult = lowerRange + (upperRange - lowerRange) / 2; + NamespaceBundle namespaceBundle = new NamespaceBundle(NamespaceName.SYSTEM_NAMESPACE, Range.range(lowerRange, BoundType.CLOSED, upperRange, BoundType.CLOSED), + Mockito.mock(NamespaceBundleFactory.class)); + CompletableFuture> splitBoundary = rangeEquallyDivideBundleSplitAlgorithm.getSplitBoundary(new BundleSplitOption(null, namespaceBundle, null)); + List value = splitBoundary.join(); + Assert.assertEquals((long)value.get(0), correctResult); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java new file mode 100644 index 00000000000000..cafa9465f270c3 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithmTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import com.google.common.collect.Lists; +import java.util.Arrays; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class SpecifiedPositionsBundleSplitAlgorithmTest { + + @Test + public void testTotalTopicsSizeLessThan1() { + SpecifiedPositionsBundleSplitAlgorithm algorithm = new SpecifiedPositionsBundleSplitAlgorithm(); + NamespaceService mockNamespaceService = mock(NamespaceService.class); + NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class); + doReturn(CompletableFuture.completedFuture(Lists.newArrayList("a"))) + .when(mockNamespaceService).getOwnedTopicListForNamespaceBundle(mockNamespaceBundle); + assertNull(algorithm.getSplitBoundary(new BundleSplitOption(mockNamespaceService, mockNamespaceBundle, + Arrays.asList(1L, 2L))).join()); + } + + @Test + public void testSpecifiedPositionsLessThan1() { + SpecifiedPositionsBundleSplitAlgorithm algorithm = new SpecifiedPositionsBundleSplitAlgorithm(); + NamespaceService mockNamespaceService = mock(NamespaceService.class); + NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class); + try { + assertNull(algorithm.getSplitBoundary( + new BundleSplitOption(mockNamespaceService, mockNamespaceBundle, null)).join()); + fail("Should fail since split boundaries is null"); + } catch (IllegalArgumentException e) { + // ignore + } + + try { + assertNull(algorithm.getSplitBoundary( + new BundleSplitOption(mockNamespaceService, mockNamespaceBundle, new ArrayList<>())).join()); + fail("Should fail since split boundaries is empty"); + } catch (IllegalArgumentException e) { + // ignore + } + } + + @SuppressWarnings("UnstableApiUsage") + @Test + public void testAlgorithmReturnCorrectResult() { + // -- algorithm + SpecifiedPositionsBundleSplitAlgorithm algorithm = new SpecifiedPositionsBundleSplitAlgorithm(); + // -- calculate the mock result + NamespaceService mockNamespaceService = mock(NamespaceService.class); + NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class); + doReturn(1L).when(mockNamespaceBundle).getLowerEndpoint(); + doReturn(1000L).when(mockNamespaceBundle).getUpperEndpoint(); + doReturn(CompletableFuture.completedFuture(Lists.newArrayList("topic", "topic2"))) + .when(mockNamespaceService) + .getOwnedTopicListForNamespaceBundle(mockNamespaceBundle); + + List positions = Arrays.asList(-1L, 0L, 1L, 100L, 200L, 500L, 800L, 1000L, 1100L); + List splitPositions = algorithm.getSplitBoundary( + new BundleSplitOption(mockNamespaceService, mockNamespaceBundle, positions)).join(); + + assertEquals(splitPositions.size(), 4); + assertTrue(splitPositions.contains(100L)); + assertTrue(splitPositions.contains(200L)); + assertTrue(splitPositions.contains(500L)); + assertTrue(splitPositions.contains(800L)); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithmTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithmTest.java new file mode 100644 index 00000000000000..74577f39312f10 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithmTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; +import org.apache.pulsar.broker.namespace.NamespaceService; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.testng.annotations.Test; +public class TopicCountEquallyDivideBundleSplitAlgorithmTest { + @Test + public void testWrongArg() { + TopicCountEquallyDivideBundleSplitAlgorithm algorithm = new TopicCountEquallyDivideBundleSplitAlgorithm(); + assertThrows(NullPointerException.class, () -> algorithm.getSplitBoundary(new BundleSplitOption())); + } + + @Test + public void testTopicsSizeLessThan1() { + TopicCountEquallyDivideBundleSplitAlgorithm algorithm = new TopicCountEquallyDivideBundleSplitAlgorithm(); + NamespaceService mockNamespaceService = mock(NamespaceService.class); + NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class); + doReturn(CompletableFuture.completedFuture(Lists.newArrayList("a"))) + .when(mockNamespaceService).getOwnedTopicListForNamespaceBundle(mockNamespaceBundle); + assertNull(algorithm.getSplitBoundary(new BundleSplitOption(mockNamespaceService, mockNamespaceBundle, null)).join()); + } + + @SuppressWarnings("UnstableApiUsage") + @Test + public void testAlgorithmReturnCorrectResult() { + // -- algorithm + TopicCountEquallyDivideBundleSplitAlgorithm algorithm = new TopicCountEquallyDivideBundleSplitAlgorithm(); + List mockTopics = Lists.newArrayList("a", "b", "c"); + // -- calculate the mock result + NamespaceService namespaceServiceForMockResult = mock(NamespaceService.class); + NamespaceBundle namespaceBundleForMockResult = mock(NamespaceBundle.class); + doReturn(CompletableFuture.completedFuture(mockTopics)) + .when(namespaceServiceForMockResult).getOwnedTopicListForNamespaceBundle(namespaceBundleForMockResult); + List hashList = new ArrayList<>(); + NamespaceBundleFactory namespaceBundleFactoryForMockResult = mock(NamespaceBundleFactory.class); + mockTopics.forEach((topic) -> { + long hashValue = Hashing.crc32().hashString(topic, Charsets.UTF_8).padToLong(); + doReturn(namespaceBundleFactoryForMockResult) + .when(namespaceBundleForMockResult).getNamespaceBundleFactory(); + doReturn(hashValue) + .when(namespaceBundleFactoryForMockResult).getLongHashCode(topic); + hashList.add(hashValue); + }); + Collections.sort(hashList); + long splitStart = hashList.get(Math.max((hashList.size() / 2) - 1, 0)); + long splitEnd = hashList.get(hashList.size() / 2); + long splitMiddleForMockResult = splitStart + (splitEnd - splitStart) / 2; + // -- do test + NamespaceService mockNamespaceService = mock(NamespaceService.class); + NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class); + doReturn(CompletableFuture.completedFuture(mockTopics)) + .when(mockNamespaceService).getOwnedTopicListForNamespaceBundle(mockNamespaceBundle); + NamespaceBundleFactory mockNamespaceBundleFactory = mock(NamespaceBundleFactory.class); + mockTopics.forEach((topic) -> { + doReturn(mockNamespaceBundleFactory) + .when(mockNamespaceBundle).getNamespaceBundleFactory(); + long hashValue = Hashing.crc32().hashString(topic, Charsets.UTF_8).padToLong(); + doReturn(hashValue) + .when(mockNamespaceBundleFactory).getLongHashCode(topic); + }); + assertEquals((long) algorithm.getSplitBoundary(new BundleSplitOption(mockNamespaceService, mockNamespaceBundle, null)).join().get(0), + splitMiddleForMockResult); + } +} \ No newline at end of file diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index f6f8654f2d4a7f..af3022ac545de9 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -46,6 +46,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; +import org.apache.pulsar.common.policies.data.TopicHashPositions; /** * Admin interface for namespaces management. @@ -2005,6 +2006,54 @@ void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBu CompletableFuture splitNamespaceBundleAsync( String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName); + /** + * Split namespace bundle. + * + * @param namespace + * @param bundle range of bundle to split + * @param unloadSplitBundles + * @param splitAlgorithmName + * @param splitBoundaries + * @throws PulsarAdminException + */ + void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName, + List splitBoundaries) throws PulsarAdminException; + + /** + * Split namespace bundle asynchronously. + * + * @param namespace + * @param bundle range of bundle to split + * @param unloadSplitBundles + * @param splitAlgorithmName + * @param splitBoundaries + */ + CompletableFuture splitNamespaceBundleAsync(String namespace, String bundle, boolean unloadSplitBundles, + String splitAlgorithmName, List splitBoundaries); + + /** + * Get positions for topic list in a bundle. + * + * @param namespace + * @param bundle range of bundle + * @param topics + * @return hash positions for all topics in topicList + * @throws PulsarAdminException + */ + TopicHashPositions getTopicHashPositions(String namespace, + String bundle, List topics) throws PulsarAdminException; + + /** + * Get positions for topic list in a bundle. + * + * @param namespace + * @param bundle range of bundle + * @param topics + * @return hash positions for all topics in topicList + * @throws PulsarAdminException + */ + CompletableFuture getTopicHashPositionsAsync(String namespace, + String bundle, List topics); /** * Set message-publish-rate (topics under this namespace can publish this many messages per second). * diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicHashPositions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicHashPositions.java new file mode 100644 index 00000000000000..f5a73fdd72bc39 --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicHashPositions.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Getter +@NoArgsConstructor +@AllArgsConstructor +public class TopicHashPositions { + private String namespace; + private String bundle; + private Map topicHashPositions; +} \ No newline at end of file diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index af30e936c88da7..3a08cc80690a07 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -57,6 +57,8 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; +import org.apache.pulsar.common.policies.data.TopicHashPositions; +import org.apache.pulsar.common.util.Codec; public class NamespacesImpl extends BaseResource implements Namespaces { @@ -1534,12 +1536,64 @@ public CompletableFuture unloadNamespaceBundleAsync(String namespace, Stri return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); } + @Override + public TopicHashPositions getTopicHashPositions(String namespace, String bundle, List topics) + throws PulsarAdminException { + try { + return getTopicHashPositionsAsync(namespace, bundle, topics).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getTopicHashPositionsAsync(String namespace, String bundle, + List topics) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, bundle, "topicHashPositions"); + if (topics != null && topics.size() > 0) { + path = path.queryParam("topics", topics.stream().map(Codec::encode).toArray()); + } + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(TopicHashPositions topicHashPositions) { + future.complete(topicHashPositions); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public void splitNamespaceBundle( String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException { + splitNamespaceBundle(namespace, bundle, unloadSplitBundles, splitAlgorithmName, null); + } + + @Override + public CompletableFuture splitNamespaceBundleAsync( + String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) { + return splitNamespaceBundleAsync(namespace, bundle, unloadSplitBundles, splitAlgorithmName, null); + } + + @Override + public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, + String splitAlgorithmName, List splitBoundaries) + throws PulsarAdminException { try { - splitNamespaceBundleAsync(namespace, bundle, unloadSplitBundles, splitAlgorithmName) + splitNamespaceBundleAsync(namespace, bundle, unloadSplitBundles, splitAlgorithmName, splitBoundaries) .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); @@ -1552,13 +1606,17 @@ public void splitNamespaceBundle( } @Override - public CompletableFuture splitNamespaceBundleAsync( - String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) { + public CompletableFuture splitNamespaceBundleAsync(String namespace, String bundle, + boolean unloadSplitBundles, String splitAlgorithmName, + List splitBoundaries) { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, bundle, "split") .queryParam("unload", Boolean.toString(unloadSplitBundles)) .queryParam("splitAlgorithmName", splitAlgorithmName); - return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + + return (splitBoundaries == null || splitBoundaries.size() == 0) + ? asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)) + : asyncPutRequest(path, Entity.entity(splitBoundaries, MediaType.APPLICATION_JSON)); } @Override diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index bac6a39d3802ad..5d79ee9af149d1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -794,14 +794,53 @@ private class SplitBundle extends CliCommand { private boolean unload; @Parameter(names = { "--split-algorithm-name", "-san" }, description = "Algorithm name for split namespace bundle." + - " Valid options are: [range_equally_divide, topic_count_equally_divide]." + + " Valid options are: [range_equally_divide, topic_count_equally_divide, specified_positions_divide]." + " Use broker side config if absent", required = false) private String splitAlgorithmName; + @Parameter(names = { "--split-boundaries", + "-sb" }, description = "Specified split boundary for bundle split, will split one bundle " + + "to multi bundles only works with specified_positions_divide algorithm", required = false) + private List splitBoundaries; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + if (splitBoundaries == null || splitBoundaries.size() == 0) { + getAdmin().namespaces().splitNamespaceBundle( + namespace, bundle, unload, splitAlgorithmName); + } else { + getAdmin().namespaces().splitNamespaceBundle( + namespace, bundle, unload, splitAlgorithmName, splitBoundaries); + } + } + } + + @Parameters(commandDescription = "Get the positions for one or more topic(s) in a namespace bundle") + private class GetTopicHashPositions extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Parameter( + names = { "--bundle", "-b" }, + description = "{start-boundary}_{end-boundary} format namespace bundle", + required = false) + private String bundle; + + @Parameter( + names = { "--topic-list", "-tl" }, + description = "The list of topics(both non-partitioned topic and partitioned topic) to get positions " + + "in this bundle, if none topic provided, will get the positions of all topics in this bundle", + required = false) + private List topics; + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - getAdmin().namespaces().splitNamespaceBundle(namespace, bundle, unload, splitAlgorithmName); + if (StringUtils.isBlank(bundle)) { + throw new ParameterException("Must pass one of the params: --bundle "); + } + print(getAdmin().namespaces().getTopicHashPositions(namespace, bundle, topics)); } } @@ -2377,6 +2416,7 @@ public CmdNamespaces(Supplier admin) { jcommander.addCommand("unload", new Unload()); jcommander.addCommand("split-bundle", new SplitBundle()); + jcommander.addCommand("get-topic-positions", new GetTopicHashPositions()); jcommander.addCommand("set-dispatch-rate", new SetDispatchRate()); jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());