diff --git a/conf/broker.conf b/conf/broker.conf index 5f88f55fa22aa..2a0df8d3ba581 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1308,6 +1308,9 @@ loadBalancerOverrideBrokerNicSpeedGbps= # Name of load manager to use loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl +# Name of topic bundle assignment strategy to use +topicBundleAssignmentStrategy=org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner + # Supported algorithms name for namespace bundle split. # "range_equally_divide" divides the bundle into two parts with the same hash range size. # "topic_count_equally_divide" divides the bundle into two parts with the same topics count. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 10b320c9f6e66..9398d349be7d9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2486,6 +2486,10 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, doc = "Name of load manager to use" ) private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"; + + @FieldContext(category = CATEGORY_LOAD_BALANCER, doc = "Name of topic bundle assignment strategy to use") + private String topicBundleAssignmentStrategy = + "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java new file mode 100644 index 0000000000000..1e8b0d03392cc --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import com.google.common.hash.Hashing; +import java.nio.charset.StandardCharsets; +import org.apache.pulsar.broker.PulsarService; + +public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy { + @Override + public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { + long hashCode = Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong(); + NamespaceBundle bundle = namespaceBundles.getBundle(hashCode); + if (topicName.getDomain().equals(TopicDomain.non_persistent)) { + bundle.setHasNonPersistentTopic(true); + } + return bundle; + } + + @Override + public void init(PulsarService pulsarService) { + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 937d2763767b6..c136ed42f8119 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import lombok.Getter; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; @@ -68,6 +69,10 @@ public class NamespaceBundleFactory { private final AsyncLoadingCache bundlesCache; private final PulsarService pulsar; + + @Getter + private final TopicBundleAssignmentStrategy topicBundleAssignmentStrategy; + private final Duration maxRetryDuration = Duration.ofSeconds(10); public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { @@ -82,6 +87,8 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); this.pulsar = pulsar; + + topicBundleAssignmentStrategy = TopicBundleAssignmentFactory.create(pulsar); } private CompletableFuture loadBundles(NamespaceName namespace, Executor executor) { @@ -418,4 +425,4 @@ public static Range getRange(Long lowerEndpoint, Long upperEndpoint) { (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index cb7e135662c01..fa7baeaa6067b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -45,8 +45,8 @@ public class NamespaceBundles { public static final Long FULL_LOWER_BOUND = 0x00000000L; public static final Long FULL_UPPER_BOUND = 0xffffffffL; - private final NamespaceBundle fullBundle; + private final NamespaceBundle fullBundle; private final Optional> localPolicies; public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, @@ -94,13 +94,8 @@ public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, } public NamespaceBundle findBundle(TopicName topicName) { - checkArgument(this.nsname.equals(topicName.getNamespaceObject())); - long hashCode = factory.getLongHashCode(topicName.toString()); - NamespaceBundle bundle = getBundle(hashCode); - if (topicName.getDomain().equals(TopicDomain.non_persistent)) { - bundle.setHasNonPersistentTopic(true); - } - return bundle; + checkArgument(nsname.equals(topicName.getNamespaceObject())); + return factory.getTopicBundleAssignmentStrategy().findBundle(topicName, this); } public List getBundles() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java new file mode 100644 index 0000000000000..164d9a3f1deef --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.common.util.Reflections; + +public class TopicBundleAssignmentFactory { + + public static final String DEFAULT_TOPIC_BUNDLE_ASSIGNMENT_STRATEGY = + "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; + + private static volatile TopicBundleAssignmentStrategy strategy; + + public static TopicBundleAssignmentStrategy create(PulsarService pulsar) { + if (strategy != null) { + return strategy; + } + synchronized (TopicBundleAssignmentFactory.class) { + if (strategy != null) { + return strategy; + } + String topicBundleAssignmentStrategy = getTopicBundleAssignmentStrategy(pulsar); + try { + TopicBundleAssignmentStrategy tempStrategy = + Reflections.createInstance(topicBundleAssignmentStrategy, + TopicBundleAssignmentStrategy.class, Thread.currentThread().getContextClassLoader()); + tempStrategy.init(pulsar); + strategy = tempStrategy; + return strategy; + } catch (Exception e) { + throw new RuntimeException( + "Could not load TopicBundleAssignmentStrategy:" + topicBundleAssignmentStrategy, e); + } + } + } + + private static String getTopicBundleAssignmentStrategy(PulsarService pulsar) { + if (pulsar == null || pulsar.getConfiguration() == null) { + return DEFAULT_TOPIC_BUNDLE_ASSIGNMENT_STRATEGY; + } + return pulsar.getConfiguration().getTopicBundleAssignmentStrategy(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java new file mode 100644 index 0000000000000..b43ca4afa440e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import org.apache.pulsar.broker.PulsarService; + +public interface TopicBundleAssignmentStrategy { + NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles); + + void init(PulsarService pulsarService); +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index a8a4610f7f3fe..809ce579ce94b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -40,6 +40,8 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.LocalPoliciesResources; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; @@ -91,6 +93,7 @@ public void testConstructor() throws Exception { private NamespaceBundleFactory getNamespaceBundleFactory() { PulsarService pulsar = mock(PulsarService.class); MetadataStoreExtended store = mock(MetadataStoreExtended.class); + when(pulsar.getConfiguration()).thenReturn(new ServiceConfiguration()); when(pulsar.getLocalMetadataStore()).thenReturn(store); when(pulsar.getConfigurationMetadataStore()).thenReturn(store); @@ -103,7 +106,12 @@ private NamespaceBundleFactory getNamespaceBundleFactory() { when(resources.getNamespaceResources()).thenReturn(mock(NamespaceResources.class)); when(resources.getNamespaceResources().getPoliciesAsync(any())).thenReturn( CompletableFuture.completedFuture(Optional.empty())); - return NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); + NamespaceBundleFactory factory1 = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); + NamespaceService namespaceService = mock(NamespaceService.class); + when(namespaceService.getNamespaceBundleFactory()).thenReturn(factory1); + when(pulsar.getNamespaceService()).thenReturn(namespaceService); + return factory1; + } @Test diff --git a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java new file mode 100644 index 0000000000000..b371106cad85a --- /dev/null +++ b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import com.google.common.collect.BoundType; +import com.google.common.collect.Range; + +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test(groups = "broker-naming") +public class TopicBundleAssignmentStrategyTest { + @Test + public void testStrategyFactory() { + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setTopicBundleAssignmentStrategy( + "org.apache.pulsar.common.naming.TopicBundleAssignmentStrategyTest$TestStrategy"); + PulsarService pulsarService = mock(PulsarService.class); + doReturn(conf).when(pulsarService).getConfiguration(); + TopicBundleAssignmentStrategy strategy = TopicBundleAssignmentFactory.create(pulsarService); + NamespaceBundle bundle = strategy.findBundle(null, null); + Range keyRange = Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED); + String range = String.format("0x%08x_0x%08x", keyRange.lowerEndpoint(), keyRange.upperEndpoint()); + Assert.assertEquals(bundle.getBundleRange(), range); + Assert.assertEquals(bundle.getNamespaceObject(), NamespaceName.get("my/test")); + } + + public static class TestStrategy implements TopicBundleAssignmentStrategy { + @Override + public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { + Range range = Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED); + return new NamespaceBundle(NamespaceName.get("my/test"), range, + mock(NamespaceBundleFactory.class)); + } + + @Override + public void init(PulsarService pulsarService) { + + } + } +}