From 8a6020e17a4a62e2b5e6c829304250ea036d60d8 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Wed, 7 Jun 2023 17:27:49 +0800 Subject: [PATCH 01/25] Make topic-bundle assignment strategy pluggable Make topic-bundle assignment strategy pluggable --- conf/broker.conf | 3 + .../ConsistentHashingTopicBundleAssigner.java | 48 ++++++++++++++ .../common/naming/NamespaceBundleFactory.java | 4 ++ .../common/naming/NamespaceBundles.java | 12 +--- .../naming/TopicBundleAssignmentFactory.java | 41 ++++++++++++ .../naming/TopicBundleAssignmentStrategy.java | 29 +++++++++ .../TopicBundleAssignmentStrategyTest.java | 65 +++++++++++++++++++ 7 files changed, 193 insertions(+), 9 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java create mode 100644 pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java diff --git a/conf/broker.conf b/conf/broker.conf index 22ca71864e9cd..920c00c19dde0 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1308,6 +1308,9 @@ loadBalancerOverrideBrokerNicSpeedGbps= # Name of load manager to use loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl +#Name of topic bundle assignment strategy to use +topicBundleAssignmentStrategy=org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner + # Supported algorithms name for namespace bundle split. # "range_equally_divide" divides the bundle into two parts with the same hash range size. # "topic_count_equally_divide" divides the bundle into two parts with the same topics count. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java new file mode 100644 index 0000000000000..324be8a5ec89e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.common.naming; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Charsets; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.client.admin.PulsarAdmin; + +public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy { + private NamespaceService namespaceService; + @Override + public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { + NamespaceName namespaceName = topicName.getNamespaceObject(); + checkArgument(namespaceName.equals(topicName.getNamespaceObject())); + long hashCode = namespaceService.getNamespaceBundleFactory().getLongHashCode(topicName.toString()); + NamespaceBundle bundle = namespaceBundles.getBundle(hashCode); + if (topicName.getDomain().equals(TopicDomain.non_persistent)) { + bundle.setHasNonPersistentTopic(true); + } + return bundle; + } + + @Override + public void init(NamespaceService namespaceService, PulsarAdmin pulsarAdmin, ServiceConfiguration configuration) { + this.namespaceService = namespaceService; + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 937d2763767b6..5608f0b6cfed4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -377,6 +377,10 @@ public boolean canSplitBundle(NamespaceBundle bundle) { return range.upperEndpoint() - range.lowerEndpoint() > 1; } + public PulsarService getPulsar() { + return pulsar; + } + public static void validateFullRange(SortedSet partitions) { checkArgument(partitions.first().equals(FIRST_BOUNDARY) && partitions.last().equals(LAST_BOUNDARY)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index cb7e135662c01..f08aa0da5c65a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -46,7 +46,7 @@ public class NamespaceBundles { public static final Long FULL_LOWER_BOUND = 0x00000000L; public static final Long FULL_UPPER_BOUND = 0xffffffffL; private final NamespaceBundle fullBundle; - + private final TopicBundleAssignmentStrategy topicBundleAssignmentStrategy; private final Optional> localPolicies; public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, @@ -66,7 +66,7 @@ public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, this.factory = Objects.requireNonNull(factory); this.localPolicies = localPolicies; checkArgument(partitions.length > 0, "Can't create bundles w/o partition boundaries"); - + this.topicBundleAssignmentStrategy = TopicBundleAssignmentFactory.create(factory.getPulsar()); // calculate bundles based on partition boundaries this.bundles = new ArrayList<>(); fullBundle = new NamespaceBundle(nsname, @@ -94,13 +94,7 @@ public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, } public NamespaceBundle findBundle(TopicName topicName) { - checkArgument(this.nsname.equals(topicName.getNamespaceObject())); - long hashCode = factory.getLongHashCode(topicName.toString()); - NamespaceBundle bundle = getBundle(hashCode); - if (topicName.getDomain().equals(TopicDomain.non_persistent)) { - bundle.setHasNonPersistentTopic(true); - } - return bundle; + return topicBundleAssignmentStrategy.findBundle(topicName, this); } public List getBundles() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java new file mode 100644 index 0000000000000..9334f7d787138 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.common.util.Reflections; + +public class TopicBundleAssignmentFactory { + + public static TopicBundleAssignmentStrategy create(PulsarService pulsar) { + ServiceConfiguration conf = pulsar.getConfiguration(); + NamespaceService namespaceService = pulsar.getNamespaceService(); + try { + TopicBundleAssignmentStrategy strategy = Reflections.createInstance(conf.getTopicBundleAssignmentStrategy(), + TopicBundleAssignmentStrategy.class, Thread.currentThread().getContextClassLoader()); + strategy.init(namespaceService, pulsar.getAdminClient(), pulsar.getConfiguration()); + return strategy; + } catch (Exception e) { + throw new RuntimeException( + "Could not load TopicBundleAssignmentStrategy:" + conf.getTopicBundleAssignmentStrategy(), e); + } + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java new file mode 100644 index 0000000000000..098ed4789b05d --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.client.admin.PulsarAdmin; + +public interface TopicBundleAssignmentStrategy { + NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles); + + void init(NamespaceService namespaceService, PulsarAdmin pulsarAdmin, ServiceConfiguration configuration); +} \ No newline at end of file diff --git a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java new file mode 100644 index 0000000000000..931d0fd6e1e9c --- /dev/null +++ b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import com.google.common.collect.BoundType; +import com.google.common.collect.Range; + +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test(groups = "broker-naming") +public class TopicBundleAssignmentStrategyTest { + @Test + public void testStrategyFactory() { + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setTopicBundleAssignmentStrategy( + "org.apache.pulsar.common.naming.TopicBundleAssignmentStrategyTest$TestStrategy"); + PulsarService pulsarService = mock(PulsarService.class); + doReturn(conf).when(pulsarService).getConfiguration(); + TopicBundleAssignmentStrategy strategy = TopicBundleAssignmentFactory.create(pulsarService); + NamespaceBundle bundle = strategy.findBundle(null, null); + Range keyRange = Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED); + String range = String.format("0x%08x_0x%08x", keyRange.lowerEndpoint(), keyRange.upperEndpoint()); + Assert.assertEquals(bundle.getBundleRange(), range); + Assert.assertEquals(bundle.getNamespaceObject(), NamespaceName.get("my/test")); + } + + public static class TestStrategy implements TopicBundleAssignmentStrategy { + @Override + public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { + Range range = Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED); + return new NamespaceBundle(NamespaceName.get("my/test"), range, + mock(NamespaceBundleFactory.class)); + } + + @Override + public void init(NamespaceService namespaceService, PulsarAdmin pulsarAdmin, + ServiceConfiguration configuration) { + } + } + +} \ No newline at end of file From ff61cc70f323796ffea5653f24e411539eb71620 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Thu, 8 Jun 2023 09:07:14 +0800 Subject: [PATCH 02/25] Make topic-bundle assignment strategy pluggable --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a709e49e3a980..7077c69a31e28 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2477,6 +2477,10 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, doc = "Name of load manager to use" ) private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"; + + + @FieldContext(category = CATEGORY_LOAD_BALANCER, doc = "Name of topic bundle assignment strategy to use") + private String topicBundleAssignmentStrategy = "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, From e821dd8a27d07271478ba70be46691303474f258 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Thu, 8 Jun 2023 19:39:03 +0800 Subject: [PATCH 03/25] licence --- .../common/naming/ConsistentHashingTopicBundleAssigner.java | 3 +-- .../pulsar/common/naming/TopicBundleAssignmentFactory.java | 2 +- .../pulsar/common/naming/TopicBundleAssignmentStrategy.java | 2 +- .../common/naming/TopicBundleAssignmentStrategyTest.java | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index 324be8a5ec89e..c0e92263445ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.pulsar.common.naming; import static com.google.common.base.Preconditions.checkArgument; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java index 9334f7d787138..cbe6e04996e86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java index 098ed4789b05d..4ed6bc5ece6a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java index 931d0fd6e1e9c..004eaad5495f1 100644 --- a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java +++ b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information From ab98e6f7ae959c347833a80871f7baf21f0f3091 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Thu, 8 Jun 2023 19:40:49 +0800 Subject: [PATCH 04/25] m --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 7077c69a31e28..b6034b07e12ec 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2480,7 +2480,9 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, @FieldContext(category = CATEGORY_LOAD_BALANCER, doc = "Name of topic bundle assignment strategy to use") - private String topicBundleAssignmentStrategy = "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; + private String topicBundleAssignmentStrategy = + "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; + @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, From 2f0255e36fc5d8fd0bda94b8e0bb3da1bd7791f9 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Thu, 8 Jun 2023 19:55:49 +0800 Subject: [PATCH 05/25] checkstyle --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index b6034b07e12ec..faea6e29e2f63 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2480,8 +2480,8 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, @FieldContext(category = CATEGORY_LOAD_BALANCER, doc = "Name of topic bundle assignment strategy to use") - private String topicBundleAssignmentStrategy = - "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; + private String topicBundleAssignmentStrategy + = "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; @FieldContext( dynamic = true, From 328d39a01857d21b60b5d99b8d89455b52a85b10 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Thu, 8 Jun 2023 20:05:37 +0800 Subject: [PATCH 06/25] checkstyle --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index faea6e29e2f63..007386ca353f4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2480,9 +2480,8 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, @FieldContext(category = CATEGORY_LOAD_BALANCER, doc = "Name of topic bundle assignment strategy to use") - private String topicBundleAssignmentStrategy - = "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; - + private String topicBundleAssignmentStrategy = + "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, From a3169c50e7ee266662a407b2a15f46f5e77b992a Mon Sep 17 00:00:00 2001 From: w00578186 Date: Thu, 8 Jun 2023 20:16:32 +0800 Subject: [PATCH 07/25] checkstyle --- .../common/naming/ConsistentHashingTopicBundleAssigner.java | 3 --- .../apache/pulsar/common/naming/NamespaceBundleFactory.java | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index c0e92263445ec..486eda26fc6ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -19,9 +19,6 @@ package org.apache.pulsar.common.naming; import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.base.Charsets; - import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.admin.PulsarAdmin; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 5608f0b6cfed4..07bb3c3991b66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -380,7 +380,7 @@ public boolean canSplitBundle(NamespaceBundle bundle) { public PulsarService getPulsar() { return pulsar; } - + public static void validateFullRange(SortedSet partitions) { checkArgument(partitions.first().equals(FIRST_BOUNDARY) && partitions.last().equals(LAST_BOUNDARY)); } From 6aa3c76559d0f0b97397f551794200a0df20f3d2 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Fri, 9 Jun 2023 09:16:33 +0800 Subject: [PATCH 08/25] checkArgument fix --- .../common/naming/ConsistentHashingTopicBundleAssigner.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index 486eda26fc6ff..8c04a6425a3fe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -27,10 +27,9 @@ public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignme private NamespaceService namespaceService; @Override public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { - NamespaceName namespaceName = topicName.getNamespaceObject(); - checkArgument(namespaceName.equals(topicName.getNamespaceObject())); long hashCode = namespaceService.getNamespaceBundleFactory().getLongHashCode(topicName.toString()); NamespaceBundle bundle = namespaceBundles.getBundle(hashCode); + checkArgument(bundle.getNamespaceObject().equals(topicName.getNamespaceObject())); if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); } From e3681b16f4f4812081887ca83136f15580ed88a3 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Fri, 9 Jun 2023 09:27:30 +0800 Subject: [PATCH 09/25] checkArgument fix --- .../common/naming/ConsistentHashingTopicBundleAssigner.java | 2 +- .../org/apache/pulsar/common/naming/NamespaceBundles.java | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index 8c04a6425a3fe..908a8f3c82fbf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -27,9 +27,9 @@ public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignme private NamespaceService namespaceService; @Override public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { + checkArgument(namespaceBundles.getNsname().equals(topicName.getNamespaceObject())); long hashCode = namespaceService.getNamespaceBundleFactory().getLongHashCode(topicName.toString()); NamespaceBundle bundle = namespaceBundles.getBundle(hashCode); - checkArgument(bundle.getNamespaceObject().equals(topicName.getNamespaceObject())); if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index f08aa0da5c65a..ed37df3e4ef9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -198,4 +198,8 @@ public LocalPolicies toLocalPolicies() { localPolicies.map(lp -> lp.getLeft().bookieAffinityGroup).orElse(null), localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null)); } + + public NamespaceName getNsname() { + return this.nsname; + } } From 2eac104d0a5cc0d00b49b5d690b6a316c337f60c Mon Sep 17 00:00:00 2001 From: w00578186 Date: Mon, 12 Jun 2023 14:51:03 +0800 Subject: [PATCH 10/25] init method change --- .../common/naming/ConsistentHashingTopicBundleAssigner.java | 6 ++++-- .../pulsar/common/naming/TopicBundleAssignmentFactory.java | 3 +-- .../pulsar/common/naming/TopicBundleAssignmentStrategy.java | 3 ++- .../apache/pulsar/common/naming/NamespaceBundlesTest.java | 2 ++ .../common/naming/TopicBundleAssignmentStrategyTest.java | 6 ++++-- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index 908a8f3c82fbf..45b78a0054b0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -19,6 +19,7 @@ package org.apache.pulsar.common.naming; import static com.google.common.base.Preconditions.checkArgument; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -37,7 +38,8 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac } @Override - public void init(NamespaceService namespaceService, PulsarAdmin pulsarAdmin, ServiceConfiguration configuration) { - this.namespaceService = namespaceService; + public void init(PulsarService pulsarService) { + namespaceService = pulsarService.getNamespaceService(); } + } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java index cbe6e04996e86..eec329ff56bbd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java @@ -27,11 +27,10 @@ public class TopicBundleAssignmentFactory { public static TopicBundleAssignmentStrategy create(PulsarService pulsar) { ServiceConfiguration conf = pulsar.getConfiguration(); - NamespaceService namespaceService = pulsar.getNamespaceService(); try { TopicBundleAssignmentStrategy strategy = Reflections.createInstance(conf.getTopicBundleAssignmentStrategy(), TopicBundleAssignmentStrategy.class, Thread.currentThread().getContextClassLoader()); - strategy.init(namespaceService, pulsar.getAdminClient(), pulsar.getConfiguration()); + strategy.init(pulsar); return strategy; } catch (Exception e) { throw new RuntimeException( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java index 4ed6bc5ece6a9..3b39d22250a0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.naming; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -25,5 +26,5 @@ public interface TopicBundleAssignmentStrategy { NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles); - void init(NamespaceService namespaceService, PulsarAdmin pulsarAdmin, ServiceConfiguration configuration); + void init(PulsarService pulsarService); } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index a8a4610f7f3fe..4226ff65ba93a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resources.LocalPoliciesResources; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; @@ -91,6 +92,7 @@ public void testConstructor() throws Exception { private NamespaceBundleFactory getNamespaceBundleFactory() { PulsarService pulsar = mock(PulsarService.class); MetadataStoreExtended store = mock(MetadataStoreExtended.class); + when(pulsar.getConfiguration()).thenReturn(new ServiceConfiguration()); when(pulsar.getLocalMetadataStore()).thenReturn(store); when(pulsar.getConfigurationMetadataStore()).thenReturn(store); diff --git a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java index 004eaad5495f1..e4aff5aac6536 100644 --- a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java +++ b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java @@ -57,9 +57,11 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac } @Override - public void init(NamespaceService namespaceService, PulsarAdmin pulsarAdmin, - ServiceConfiguration configuration) { + public void init(PulsarService pulsarService) { + } + + } } \ No newline at end of file From d855334af1a1b21513f6d68ed1a7c77020f9de4b Mon Sep 17 00:00:00 2001 From: w00578186 Date: Tue, 13 Jun 2023 09:38:24 +0800 Subject: [PATCH 11/25] checkstyle --- .../common/naming/ConsistentHashingTopicBundleAssigner.java | 2 -- .../pulsar/common/naming/TopicBundleAssignmentFactory.java | 1 - .../pulsar/common/naming/TopicBundleAssignmentStrategy.java | 3 --- .../common/naming/TopicBundleAssignmentStrategyTest.java | 2 -- 4 files changed, 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index 45b78a0054b0f..15f422a9214b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -20,9 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.client.admin.PulsarAdmin; public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy { private NamespaceService namespaceService; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java index eec329ff56bbd..ac084e9998b9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java @@ -20,7 +20,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.common.util.Reflections; public class TopicBundleAssignmentFactory { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java index 3b39d22250a0d..27d335e921101 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java @@ -19,9 +19,6 @@ package org.apache.pulsar.common.naming; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.client.admin.PulsarAdmin; public interface TopicBundleAssignmentStrategy { NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles); diff --git a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java index e4aff5aac6536..7346d319effc5 100644 --- a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java +++ b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java @@ -26,8 +26,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.testng.Assert; import org.testng.annotations.Test; From e1e279da47833acbe002c660dd0efef35f8bc671 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Tue, 13 Jun 2023 11:25:10 +0800 Subject: [PATCH 12/25] test fix --- .../org/apache/pulsar/common/naming/NamespaceBundlesTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 4226ff65ba93a..10d08fdb71be9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -41,6 +41,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.LocalPoliciesResources; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; @@ -95,6 +96,7 @@ private NamespaceBundleFactory getNamespaceBundleFactory() { when(pulsar.getConfiguration()).thenReturn(new ServiceConfiguration()); when(pulsar.getLocalMetadataStore()).thenReturn(store); when(pulsar.getConfigurationMetadataStore()).thenReturn(store); + when(pulsar.getNamespaceService()).thenReturn(new NamespaceService(pulsar)); PulsarResources resources = mock(PulsarResources.class); when(pulsar.getPulsarResources()).thenReturn(resources); From 7d949f52cb9c7345a5ff8903b74a890eda8c2f33 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Tue, 13 Jun 2023 19:22:25 +0800 Subject: [PATCH 13/25] test fix --- .../apache/pulsar/common/naming/NamespaceBundlesTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 10d08fdb71be9..9c00cb226f3ae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -92,11 +92,11 @@ public void testConstructor() throws Exception { @SuppressWarnings("unchecked") private NamespaceBundleFactory getNamespaceBundleFactory() { PulsarService pulsar = mock(PulsarService.class); + NamespaceService namespaceService = mock(NamespaceService.class); MetadataStoreExtended store = mock(MetadataStoreExtended.class); when(pulsar.getConfiguration()).thenReturn(new ServiceConfiguration()); when(pulsar.getLocalMetadataStore()).thenReturn(store); when(pulsar.getConfigurationMetadataStore()).thenReturn(store); - when(pulsar.getNamespaceService()).thenReturn(new NamespaceService(pulsar)); PulsarResources resources = mock(PulsarResources.class); when(pulsar.getPulsarResources()).thenReturn(resources); @@ -107,7 +107,10 @@ private NamespaceBundleFactory getNamespaceBundleFactory() { when(resources.getNamespaceResources()).thenReturn(mock(NamespaceResources.class)); when(resources.getNamespaceResources().getPoliciesAsync(any())).thenReturn( CompletableFuture.completedFuture(Optional.empty())); - return NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); + NamespaceBundleFactory factory1 = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); + when(namespaceService.getNamespaceBundleFactory()).thenReturn(factory1); + when(pulsar.getNamespaceService()).thenReturn(namespaceService); + return factory1; } @Test From bbd916dd95f9d3f0de9a278dc7a49dd584762bc4 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Wed, 14 Jun 2023 14:35:52 +0800 Subject: [PATCH 14/25] review fix --- .../naming/ConsistentHashingTopicBundleAssigner.java | 6 +++--- .../pulsar/common/naming/NamespaceBundleFactory.java | 7 +++++++ .../org/apache/pulsar/common/naming/NamespaceBundles.java | 5 ++--- .../apache/pulsar/common/naming/NamespaceBundlesTest.java | 4 ++-- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index 15f422a9214b8..fd30c6af31d1f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -23,11 +23,11 @@ import org.apache.pulsar.broker.namespace.NamespaceService; public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy { - private NamespaceService namespaceService; + private PulsarService pulsarService; @Override public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { checkArgument(namespaceBundles.getNsname().equals(topicName.getNamespaceObject())); - long hashCode = namespaceService.getNamespaceBundleFactory().getLongHashCode(topicName.toString()); + long hashCode = pulsarService.getNamespaceService().getNamespaceBundleFactory().getLongHashCode(topicName.toString()); NamespaceBundle bundle = namespaceBundles.getBundle(hashCode); if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); @@ -37,7 +37,7 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac @Override public void init(PulsarService pulsarService) { - namespaceService = pulsarService.getNamespaceService(); + this.pulsarService = pulsarService; } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 07bb3c3991b66..0ac68c6e6257a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import lombok.Getter; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; @@ -68,6 +69,10 @@ public class NamespaceBundleFactory { private final AsyncLoadingCache bundlesCache; private final PulsarService pulsar; + + @Getter + private final TopicBundleAssignmentStrategy topicBundleAssignmentStrategy; + private final Duration maxRetryDuration = Duration.ofSeconds(10); public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { @@ -82,6 +87,8 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); this.pulsar = pulsar; + + this.topicBundleAssignmentStrategy = TopicBundleAssignmentFactory.create(pulsar); } private CompletableFuture loadBundles(NamespaceName namespace, Executor executor) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index ed37df3e4ef9b..27a500d8c0c01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -46,7 +46,6 @@ public class NamespaceBundles { public static final Long FULL_LOWER_BOUND = 0x00000000L; public static final Long FULL_UPPER_BOUND = 0xffffffffL; private final NamespaceBundle fullBundle; - private final TopicBundleAssignmentStrategy topicBundleAssignmentStrategy; private final Optional> localPolicies; public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, @@ -66,7 +65,7 @@ public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, this.factory = Objects.requireNonNull(factory); this.localPolicies = localPolicies; checkArgument(partitions.length > 0, "Can't create bundles w/o partition boundaries"); - this.topicBundleAssignmentStrategy = TopicBundleAssignmentFactory.create(factory.getPulsar()); + // calculate bundles based on partition boundaries this.bundles = new ArrayList<>(); fullBundle = new NamespaceBundle(nsname, @@ -94,7 +93,7 @@ public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, } public NamespaceBundle findBundle(TopicName topicName) { - return topicBundleAssignmentStrategy.findBundle(topicName, this); + return factory.getTopicBundleAssignmentStrategy().findBundle(topicName, this); } public List getBundles() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 9c00cb226f3ae..1bf8b4c9eadf9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -91,8 +91,7 @@ public void testConstructor() throws Exception { @SuppressWarnings("unchecked") private NamespaceBundleFactory getNamespaceBundleFactory() { - PulsarService pulsar = mock(PulsarService.class); - NamespaceService namespaceService = mock(NamespaceService.class); + PulsarService pulsar = mock(PulsarService.class); MetadataStoreExtended store = mock(MetadataStoreExtended.class); when(pulsar.getConfiguration()).thenReturn(new ServiceConfiguration()); when(pulsar.getLocalMetadataStore()).thenReturn(store); @@ -108,6 +107,7 @@ private NamespaceBundleFactory getNamespaceBundleFactory() { when(resources.getNamespaceResources().getPoliciesAsync(any())).thenReturn( CompletableFuture.completedFuture(Optional.empty())); NamespaceBundleFactory factory1 = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); + NamespaceService namespaceService = mock(NamespaceService.class); when(namespaceService.getNamespaceBundleFactory()).thenReturn(factory1); when(pulsar.getNamespaceService()).thenReturn(namespaceService); return factory1; From e7d493450717e97b84aabc959018d991df5b66dc Mon Sep 17 00:00:00 2001 From: w00578186 Date: Thu, 15 Jun 2023 10:26:23 +0800 Subject: [PATCH 15/25] make TopicBundleAssignmentStrategy init once --- .../ConsistentHashingTopicBundleAssigner.java | 6 ++-- .../common/naming/NamespaceBundleFactory.java | 4 --- .../common/naming/NamespaceBundles.java | 7 +++-- .../naming/TopicBundleAssignmentFactory.java | 29 ++++++++++++++----- .../common/naming/NamespaceBundlesTest.java | 4 +-- 5 files changed, 30 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index fd30c6af31d1f..2bf17358c0d23 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -23,11 +23,11 @@ import org.apache.pulsar.broker.namespace.NamespaceService; public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy { - private PulsarService pulsarService; + private NamespaceService namespaceService; @Override public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { checkArgument(namespaceBundles.getNsname().equals(topicName.getNamespaceObject())); - long hashCode = pulsarService.getNamespaceService().getNamespaceBundleFactory().getLongHashCode(topicName.toString()); + long hashCode = namespaceService.getNamespaceBundleFactory().getLongHashCode(topicName.toString()); NamespaceBundle bundle = namespaceBundles.getBundle(hashCode); if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); @@ -37,7 +37,7 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac @Override public void init(PulsarService pulsarService) { - this.pulsarService = pulsarService; + this.namespaceService = pulsarService.getNamespaceService(); } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 0ac68c6e6257a..b8fdc102fd023 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -70,8 +70,6 @@ public class NamespaceBundleFactory { private final PulsarService pulsar; - @Getter - private final TopicBundleAssignmentStrategy topicBundleAssignmentStrategy; private final Duration maxRetryDuration = Duration.ofSeconds(10); @@ -87,8 +85,6 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); this.pulsar = pulsar; - - this.topicBundleAssignmentStrategy = TopicBundleAssignmentFactory.create(pulsar); } private CompletableFuture loadBundles(NamespaceName namespace, Executor executor) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index 27a500d8c0c01..d7697c62af3de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -45,6 +45,7 @@ public class NamespaceBundles { public static final Long FULL_LOWER_BOUND = 0x00000000L; public static final Long FULL_UPPER_BOUND = 0xffffffffL; + private final TopicBundleAssignmentStrategy topicBundleAssignmentStrategy; private final NamespaceBundle fullBundle; private final Optional> localPolicies; @@ -65,7 +66,9 @@ public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, this.factory = Objects.requireNonNull(factory); this.localPolicies = localPolicies; checkArgument(partitions.length > 0, "Can't create bundles w/o partition boundaries"); - + + this.topicBundleAssignmentStrategy = TopicBundleAssignmentFactory.create(factory.getPulsar()); + // calculate bundles based on partition boundaries this.bundles = new ArrayList<>(); fullBundle = new NamespaceBundle(nsname, @@ -93,7 +96,7 @@ public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, } public NamespaceBundle findBundle(TopicName topicName) { - return factory.getTopicBundleAssignmentStrategy().findBundle(topicName, this); + return topicBundleAssignmentStrategy.findBundle(topicName, this); } public List getBundles() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java index ac084e9998b9a..71db40b10bfc0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java @@ -18,22 +18,35 @@ */ package org.apache.pulsar.common.naming; +import lombok.Synchronized; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.util.Reflections; public class TopicBundleAssignmentFactory { + static TopicBundleAssignmentStrategy strategy; + public static TopicBundleAssignmentStrategy create(PulsarService pulsar) { - ServiceConfiguration conf = pulsar.getConfiguration(); - try { - TopicBundleAssignmentStrategy strategy = Reflections.createInstance(conf.getTopicBundleAssignmentStrategy(), - TopicBundleAssignmentStrategy.class, Thread.currentThread().getContextClassLoader()); - strategy.init(pulsar); + if (strategy != null) { return strategy; - } catch (Exception e) { - throw new RuntimeException( - "Could not load TopicBundleAssignmentStrategy:" + conf.getTopicBundleAssignmentStrategy(), e); + } + synchronized (TopicBundleAssignmentFactory.class) { + if (strategy != null) { + return strategy; + } + ServiceConfiguration conf = pulsar.getConfiguration(); + try { + TopicBundleAssignmentStrategy tempStategy = + Reflections.createInstance(conf.getTopicBundleAssignmentStrategy(), + TopicBundleAssignmentStrategy.class, Thread.currentThread().getContextClassLoader()); + tempStategy.init(pulsar); + strategy = tempStategy; + return strategy; + } catch (Exception e) { + throw new RuntimeException( + "Could not load TopicBundleAssignmentStrategy:" + conf.getTopicBundleAssignmentStrategy(), e); + } } } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 1bf8b4c9eadf9..4bf9637ff2a36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -107,9 +107,7 @@ private NamespaceBundleFactory getNamespaceBundleFactory() { when(resources.getNamespaceResources().getPoliciesAsync(any())).thenReturn( CompletableFuture.completedFuture(Optional.empty())); NamespaceBundleFactory factory1 = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); - NamespaceService namespaceService = mock(NamespaceService.class); - when(namespaceService.getNamespaceBundleFactory()).thenReturn(factory1); - when(pulsar.getNamespaceService()).thenReturn(namespaceService); + when(pulsar.getNamespaceService().getNamespaceBundleFactory()).thenReturn(factory1); return factory1; } From f89d58bf4ec1c0ccfe91813d378a360bb5d3d8b7 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Thu, 15 Jun 2023 14:32:38 +0800 Subject: [PATCH 16/25] Update TopicBundleAssignmentFactory.java --- .../pulsar/common/naming/TopicBundleAssignmentFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java index 71db40b10bfc0..0561cc61fec3d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java @@ -25,7 +25,7 @@ public class TopicBundleAssignmentFactory { - static TopicBundleAssignmentStrategy strategy; + private static volatile TopicBundleAssignmentStrategy strategy; public static TopicBundleAssignmentStrategy create(PulsarService pulsar) { if (strategy != null) { From 998c344ceece8d612be7053069fdcd185257f8b2 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Thu, 15 Jun 2023 14:35:20 +0800 Subject: [PATCH 17/25] Update TopicBundleAssignmentFactory.java --- .../pulsar/common/naming/TopicBundleAssignmentFactory.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java index 0561cc61fec3d..2ec3f4205f958 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.common.naming; -import lombok.Synchronized; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.util.Reflections; From 4f841e85ddacb832f8c8c31575bbf982c949b596 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Thu, 15 Jun 2023 16:15:05 +0800 Subject: [PATCH 18/25] check fix --- .../org/apache/pulsar/common/naming/NamespaceBundleFactory.java | 2 -- .../java/org/apache/pulsar/common/naming/NamespaceBundles.java | 2 +- .../org/apache/pulsar/common/naming/NamespaceBundlesTest.java | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index b8fdc102fd023..ea0d93a147c86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -43,7 +43,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import lombok.Getter; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; @@ -70,7 +69,6 @@ public class NamespaceBundleFactory { private final PulsarService pulsar; - private final Duration maxRetryDuration = Duration.ofSeconds(10); public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index d7697c62af3de..5724a577de590 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -68,7 +68,7 @@ public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, checkArgument(partitions.length > 0, "Can't create bundles w/o partition boundaries"); this.topicBundleAssignmentStrategy = TopicBundleAssignmentFactory.create(factory.getPulsar()); - + // calculate bundles based on partition boundaries this.bundles = new ArrayList<>(); fullBundle = new NamespaceBundle(nsname, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 4bf9637ff2a36..61cae9452c05e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -41,7 +41,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.LocalPoliciesResources; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; From 56dbd8e5b8c2fe15ee3aa808a89a02f00861b439 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Sat, 17 Jun 2023 10:25:55 +0800 Subject: [PATCH 19/25] Update NamespaceBundlesTest.java --- .../apache/pulsar/common/naming/NamespaceBundlesTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 61cae9452c05e..1a2f856ed5966 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -41,6 +41,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.LocalPoliciesResources; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; @@ -106,8 +107,11 @@ private NamespaceBundleFactory getNamespaceBundleFactory() { when(resources.getNamespaceResources().getPoliciesAsync(any())).thenReturn( CompletableFuture.completedFuture(Optional.empty())); NamespaceBundleFactory factory1 = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); - when(pulsar.getNamespaceService().getNamespaceBundleFactory()).thenReturn(factory1); + NamespaceService namespaceService = mock(NamespaceService.class); + when(namespaceService.getNamespaceBundleFactory()).thenReturn(factory1); + when(pulsar.getNamespaceService()).thenReturn(namespaceService); return factory1; + } @Test From 4d2105d8ff9a302944ed6e088a02cc4d0e2f9a50 Mon Sep 17 00:00:00 2001 From: w00578186 Date: Mon, 26 Jun 2023 21:31:23 +0800 Subject: [PATCH 20/25] review issue fix --- conf/broker.conf | 2 +- .../ConsistentHashingTopicBundleAssigner.java | 13 ++++++------- .../common/naming/NamespaceBundleFactory.java | 12 +++++++----- .../pulsar/common/naming/NamespaceBundles.java | 11 +++-------- .../common/naming/TopicBundleAssignmentFactory.java | 8 ++++---- 5 files changed, 21 insertions(+), 25 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 86b50a2bf65aa..2a0df8d3ba581 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1308,7 +1308,7 @@ loadBalancerOverrideBrokerNicSpeedGbps= # Name of load manager to use loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl -#Name of topic bundle assignment strategy to use +# Name of topic bundle assignment strategy to use topicBundleAssignmentStrategy=org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner # Supported algorithms name for namespace bundle split. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index 2bf17358c0d23..b5ebd5a804f9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -18,16 +18,15 @@ */ package org.apache.pulsar.common.naming; -import static com.google.common.base.Preconditions.checkArgument; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.namespace.NamespaceService; public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy { - private NamespaceService namespaceService; + private PulsarService pulsarService; + @Override public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { - checkArgument(namespaceBundles.getNsname().equals(topicName.getNamespaceObject())); - long hashCode = namespaceService.getNamespaceBundleFactory().getLongHashCode(topicName.toString()); + long hashCode = + pulsarService.getNamespaceService().getNamespaceBundleFactory().getLongHashCode(topicName.toString()); NamespaceBundle bundle = namespaceBundles.getBundle(hashCode); if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); @@ -37,7 +36,7 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac @Override public void init(PulsarService pulsarService) { - this.namespaceService = pulsarService.getNamespaceService(); + this.pulsarService = pulsarService; } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index ea0d93a147c86..c136ed42f8119 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import lombok.Getter; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; @@ -69,6 +70,9 @@ public class NamespaceBundleFactory { private final PulsarService pulsar; + @Getter + private final TopicBundleAssignmentStrategy topicBundleAssignmentStrategy; + private final Duration maxRetryDuration = Duration.ofSeconds(10); public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { @@ -83,6 +87,8 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); this.pulsar = pulsar; + + topicBundleAssignmentStrategy = TopicBundleAssignmentFactory.create(pulsar); } private CompletableFuture loadBundles(NamespaceName namespace, Executor executor) { @@ -378,10 +384,6 @@ public boolean canSplitBundle(NamespaceBundle bundle) { return range.upperEndpoint() - range.lowerEndpoint() > 1; } - public PulsarService getPulsar() { - return pulsar; - } - public static void validateFullRange(SortedSet partitions) { checkArgument(partitions.first().equals(FIRST_BOUNDARY) && partitions.last().equals(LAST_BOUNDARY)); } @@ -423,4 +425,4 @@ public static Range getRange(Long lowerEndpoint, Long upperEndpoint) { (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index 5724a577de590..fa7baeaa6067b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -45,7 +45,7 @@ public class NamespaceBundles { public static final Long FULL_LOWER_BOUND = 0x00000000L; public static final Long FULL_UPPER_BOUND = 0xffffffffL; - private final TopicBundleAssignmentStrategy topicBundleAssignmentStrategy; + private final NamespaceBundle fullBundle; private final Optional> localPolicies; @@ -67,8 +67,6 @@ public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, this.localPolicies = localPolicies; checkArgument(partitions.length > 0, "Can't create bundles w/o partition boundaries"); - this.topicBundleAssignmentStrategy = TopicBundleAssignmentFactory.create(factory.getPulsar()); - // calculate bundles based on partition boundaries this.bundles = new ArrayList<>(); fullBundle = new NamespaceBundle(nsname, @@ -96,7 +94,8 @@ public NamespaceBundles(NamespaceName nsname, NamespaceBundleFactory factory, } public NamespaceBundle findBundle(TopicName topicName) { - return topicBundleAssignmentStrategy.findBundle(topicName, this); + checkArgument(nsname.equals(topicName.getNamespaceObject())); + return factory.getTopicBundleAssignmentStrategy().findBundle(topicName, this); } public List getBundles() { @@ -200,8 +199,4 @@ public LocalPolicies toLocalPolicies() { localPolicies.map(lp -> lp.getLeft().bookieAffinityGroup).orElse(null), localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null)); } - - public NamespaceName getNsname() { - return this.nsname; - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java index 2ec3f4205f958..d39ca2966bddf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java @@ -36,11 +36,11 @@ public static TopicBundleAssignmentStrategy create(PulsarService pulsar) { } ServiceConfiguration conf = pulsar.getConfiguration(); try { - TopicBundleAssignmentStrategy tempStategy = + TopicBundleAssignmentStrategy tempStrategy = Reflections.createInstance(conf.getTopicBundleAssignmentStrategy(), TopicBundleAssignmentStrategy.class, Thread.currentThread().getContextClassLoader()); - tempStategy.init(pulsar); - strategy = tempStategy; + tempStrategy.init(pulsar); + strategy = tempStrategy; return strategy; } catch (Exception e) { throw new RuntimeException( @@ -48,4 +48,4 @@ public static TopicBundleAssignmentStrategy create(PulsarService pulsar) { } } } -} \ No newline at end of file +} From 572b665e5cc0cb625ba0c17452bf2a2b5e5ae62a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 27 Jun 2023 11:44:30 +0800 Subject: [PATCH 21/25] Remove the blank in the tail --- .../org/apache/pulsar/common/naming/NamespaceBundlesTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 1a2f856ed5966..809ce579ce94b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -91,7 +91,7 @@ public void testConstructor() throws Exception { @SuppressWarnings("unchecked") private NamespaceBundleFactory getNamespaceBundleFactory() { - PulsarService pulsar = mock(PulsarService.class); + PulsarService pulsar = mock(PulsarService.class); MetadataStoreExtended store = mock(MetadataStoreExtended.class); when(pulsar.getConfiguration()).thenReturn(new ServiceConfiguration()); when(pulsar.getLocalMetadataStore()).thenReturn(store); From 18b2e4df429112fd9b0a4f085a6bd3be4a020664 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 27 Jun 2023 11:48:00 +0800 Subject: [PATCH 22/25] Add line break at the end of file --- .../pulsar/common/naming/TopicBundleAssignmentStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java index 27d335e921101..b43ca4afa440e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategy.java @@ -24,4 +24,4 @@ public interface TopicBundleAssignmentStrategy { NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles); void init(PulsarService pulsarService); -} \ No newline at end of file +} From afcaa1f2207e1fd1e176b4193b1d1a682e9bbe94 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 27 Jun 2023 11:48:09 +0800 Subject: [PATCH 23/25] Add line break at the end of file --- .../pulsar/common/naming/TopicBundleAssignmentStrategyTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java index 7346d319effc5..592a4e4d407db 100644 --- a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java +++ b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java @@ -62,4 +62,4 @@ public void init(PulsarService pulsarService) { } -} \ No newline at end of file +} From ad74f90a69009ed95753b0ae62ec64a6b01b154d Mon Sep 17 00:00:00 2001 From: w00578186 Date: Tue, 27 Jun 2023 19:38:56 +0800 Subject: [PATCH 24/25] avoid some UT NullPointException --- .../ConsistentHashingTopicBundleAssigner.java | 8 +++----- .../naming/TopicBundleAssignmentFactory.java | 17 +++++++++++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index b5ebd5a804f9a..1e8b0d03392cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -18,15 +18,14 @@ */ package org.apache.pulsar.common.naming; +import com.google.common.hash.Hashing; +import java.nio.charset.StandardCharsets; import org.apache.pulsar.broker.PulsarService; public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy { - private PulsarService pulsarService; - @Override public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { - long hashCode = - pulsarService.getNamespaceService().getNamespaceBundleFactory().getLongHashCode(topicName.toString()); + long hashCode = Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong(); NamespaceBundle bundle = namespaceBundles.getBundle(hashCode); if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); @@ -36,7 +35,6 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac @Override public void init(PulsarService pulsarService) { - this.pulsarService = pulsarService; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java index d39ca2966bddf..164d9a3f1deef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicBundleAssignmentFactory.java @@ -19,11 +19,13 @@ package org.apache.pulsar.common.naming; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.util.Reflections; public class TopicBundleAssignmentFactory { + public static final String DEFAULT_TOPIC_BUNDLE_ASSIGNMENT_STRATEGY = + "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; + private static volatile TopicBundleAssignmentStrategy strategy; public static TopicBundleAssignmentStrategy create(PulsarService pulsar) { @@ -34,18 +36,25 @@ public static TopicBundleAssignmentStrategy create(PulsarService pulsar) { if (strategy != null) { return strategy; } - ServiceConfiguration conf = pulsar.getConfiguration(); + String topicBundleAssignmentStrategy = getTopicBundleAssignmentStrategy(pulsar); try { TopicBundleAssignmentStrategy tempStrategy = - Reflections.createInstance(conf.getTopicBundleAssignmentStrategy(), + Reflections.createInstance(topicBundleAssignmentStrategy, TopicBundleAssignmentStrategy.class, Thread.currentThread().getContextClassLoader()); tempStrategy.init(pulsar); strategy = tempStrategy; return strategy; } catch (Exception e) { throw new RuntimeException( - "Could not load TopicBundleAssignmentStrategy:" + conf.getTopicBundleAssignmentStrategy(), e); + "Could not load TopicBundleAssignmentStrategy:" + topicBundleAssignmentStrategy, e); } } } + + private static String getTopicBundleAssignmentStrategy(PulsarService pulsar) { + if (pulsar == null || pulsar.getConfiguration() == null) { + return DEFAULT_TOPIC_BUNDLE_ASSIGNMENT_STRATEGY; + } + return pulsar.getConfiguration().getTopicBundleAssignmentStrategy(); + } } From b6638ca066fd45fb270e8e29a330fe6c2b61b91e Mon Sep 17 00:00:00 2001 From: w00578186 Date: Tue, 27 Jun 2023 20:24:21 +0800 Subject: [PATCH 25/25] remove blank lines --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 1 - .../common/naming/TopicBundleAssignmentStrategyTest.java | 3 --- 2 files changed, 4 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 95ff52f08c6df..9398d349be7d9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2487,7 +2487,6 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, ) private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"; - @FieldContext(category = CATEGORY_LOAD_BALANCER, doc = "Name of topic bundle assignment strategy to use") private String topicBundleAssignmentStrategy = "org.apache.pulsar.common.naming.ConsistentHashingTopicBundleAssigner"; diff --git a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java index 592a4e4d407db..b371106cad85a 100644 --- a/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java +++ b/pulsar-broker/src/test/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java @@ -58,8 +58,5 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac public void init(PulsarService pulsarService) { } - - } - }