[improve] [broker] Create partitioned topics automatically when enable topic level replication (#22537)
poorbarcode committed Apr 24, 2024
1 parent a3cd1f8 commit d475655
Showing 4 changed files with 196 additions and 49 deletions.
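
With this change, enabling topic-level replication on a partitioned topic also creates that topic, with the same partition count, on every remote cluster in the replication list; if a remote topic already exists with a different partition count, the call fails with 412 Precondition Failed. A minimal sketch of the resulting admin-side flow, mirroring the new tests below (the service URLs and cluster names are assumptions for illustration, not part of the change):

import java.util.List;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

public class TopicLevelReplicationExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical admin endpoints for two clusters, "cluster1" and "cluster2".
        PulsarAdmin admin1 = PulsarAdmin.builder().serviceHttpUrl("http://broker1:8080").build();
        PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl("http://broker2:8080").build();

        String topic = "persistent://my-tenant/my-ns/tp";
        admin1.topics().createPartitionedTopic(topic, 2);

        // Enabling topic-level replication now also creates the partitioned
        // topic on the remote cluster before the policy is stored.
        admin1.topics().setReplicationClusters(topic, List.of("cluster1", "cluster2"));

        // The remote cluster ends up with the same partition count.
        PartitionedTopicMetadata meta = admin2.topics().getPartitionedTopicMetadata(topic);
        assert meta.partitions == 2;

        admin1.close();
        admin2.close();
    }
}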
@@ -23,6 +23,7 @@
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -43,9 +44,11 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
@@ -621,35 +624,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n

     private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
         getNamespaceReplicatedClustersAsync(namespaceName)
-                .thenAccept(clusters -> {
-                    for (String cluster : clusters) {
-                        if (!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-                            // this call happens in the background without async composition. completion is logged.
-                            pulsar().getPulsarResources().getClusterResources()
-                                    .getClusterAsync(cluster)
-                                    .thenCompose(clusterDataOp ->
-                                            ((TopicsImpl) pulsar().getBrokerService()
-                                                    .getClusterPulsarAdmin(cluster,
-                                                            clusterDataOp).topics())
-                                                    .createPartitionedTopicAsync(
-                                                            topicName.getPartitionedTopicName(),
-                                                            numPartitions,
-                                                            true, null))
-                                    .whenComplete((__, ex) -> {
-                                        if (ex != null) {
-                                            log.error(
-                                                    "[{}] Failed to create partitioned topic {} in cluster {}.",
-                                                    clientAppId(), topicName, cluster, ex);
-                                        } else {
-                                            log.info(
-                                                    "[{}] Successfully created partitioned topic {} in "
-                                                            + "cluster {}",
-                                                    clientAppId(), topicName, cluster);
-                                        }
-                                    });
-                        }
-                    }
-                });
+                .thenAccept(clusters -> {
+                    // This call happens in the background without async composition; completion is logged.
+                    internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
+                });
     }

+    protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground(
+            Set<String> clusters, int numPartitions) {
+        final String shortTopicName = topicName.getPartitionedTopicName();
+        Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>();
+        for (String cluster : clusters) {
+            if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+                continue;
+            }
+            ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources();
+            CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>();
+            tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+            clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+                if (ex1 != null) {
+                    // Unexpected error, such as an NPE. Catch all errors to avoid leaving
+                    // "createRemoteTopicFuture" stuck.
+                    log.error("[{}] An unexpected error occurred while trying to create partitioned topic {}"
+                            + " in cluster {}.", clientAppId(), topicName, cluster, ex1);
+                    createRemoteTopicFuture.completeExceptionally(new RestException(ex1));
+                    return;
+                }
+                // Getting the cluster data succeeded.
+                TopicsImpl topics =
+                        (TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics();
+                topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null)
+                        .whenComplete((ignore, ex2) -> {
+                            if (ex2 == null) {
+                                // Created successfully.
+                                log.info("[{}] Successfully created partitioned topic {} in cluster {}",
+                                        clientAppId(), topicName, cluster);
+                                createRemoteTopicFuture.complete(null);
+                                return;
+                            }
+                            // Creating the topic on the remote cluster failed.
+                            Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2);
+                            // The topic already exists; check whether its partition count is the expected one.
+                            if (unwrapEx2 instanceof PulsarAdminException.ConflictException) {
+                                topics.getPartitionedTopicMetadataAsync(shortTopicName)
+                                        .whenComplete((topicMeta, ex3) -> {
+                                    if (ex3 != null) {
+                                        // Unexpected error, such as an NPE. Catch all errors to avoid
+                                        // leaving "createRemoteTopicFuture" stuck.
+                                        log.error("[{}] Failed to check the remote cluster's topic metadata when"
+                                                + " creating partitioned topic {} in cluster {}.",
+                                                clientAppId(), topicName, cluster, ex3);
+                                        createRemoteTopicFuture.completeExceptionally(new RestException(ex3));
+                                        return;
+                                    }
+                                    // Getting the partitioned metadata from the remote cluster succeeded.
+                                    if (topicMeta.partitions == numPartitions) {
+                                        log.info("[{}] Skip creating partitioned topic {} in cluster {},"
+                                                + " because {}",
+                                                clientAppId(), topicName, cluster, unwrapEx2.getMessage());
+                                        createRemoteTopicFuture.complete(null);
+                                    } else {
+                                        String errorMsg = String.format("[%s] There is an existing topic %s"
+                                                + " with different partitions %s on the remote cluster %s;"
+                                                + " you are trying to create it with %s partitions",
+                                                clientAppId(), shortTopicName, topicMeta.partitions, cluster,
+                                                numPartitions);
+                                        log.error(errorMsg);
+                                        createRemoteTopicFuture.completeExceptionally(
+                                                new RestException(Status.PRECONDITION_FAILED, errorMsg));
+                                    }
+                                });
+                            } else {
+                                // The remote cluster responded with an HTTP error.
+                                log.error("[{}] Failed to create partitioned topic {} in cluster {}.",
+                                        clientAppId(), topicName, cluster, ex2);
+                                createRemoteTopicFuture.completeExceptionally(new RestException(unwrapEx2));
+                            }
+                        });
+            });
+        }
+        return tasksForAllClusters;
+    }
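
The helper above tests the unwrapped cause of the failed admin call rather than the raw future error, because failures from CompletableFuture surface wrapped in CompletionException. A minimal standalone sketch of that unwrapping step, using FutureUtil from pulsar-common as the diff does (the IllegalStateException is a stand-in for a real PulsarAdminException):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.pulsar.common.util.FutureUtil;

public class UnwrapExample {
    public static void main(String[] args) {
        CompletableFuture<Void> failed =
                CompletableFuture.failedFuture(new IllegalStateException("conflict"));
        try {
            failed.join();
        } catch (CompletionException wrapped) {
            // join() wraps the original cause; unwrap it before any instanceof check,
            // as the broker code does before testing for ConflictException.
            Throwable cause = FutureUtil.unwrapCompletionException(wrapped);
            System.out.println(cause instanceof IllegalStateException); // true
        }
    }
}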
@@ -3253,12 +3253,13 @@ protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQu
     }

     protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
+        if (CollectionUtils.isEmpty(clusterIds)) {
+            return CompletableFuture.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "ClusterIds should not be null or empty"));
+        }
+        Set<String> replicationClusters = Sets.newHashSet(clusterIds);
         return validatePoliciesReadOnlyAccessAsync()
                 .thenCompose(__ -> {
-                    if (CollectionUtils.isEmpty(clusterIds)) {
-                        throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty");
-                    }
-                    Set<String> replicationClusters = Sets.newHashSet(clusterIds);
                     if (replicationClusters.contains("global")) {
                         throw new RestException(Status.PRECONDITION_FAILED,
                                 "Cannot specify global in the list of replication clusters");
@@ -3273,6 +3274,20 @@ protected CompletableFuture<Void> internalSetReplicationClusters(List<String> cl
                     futures.add(validateClusterForTenantAsync(namespaceName.getTenant(), clusterId));
                 }
                 return FutureUtil.waitForAll(futures);
+            }).thenCompose(__ -> {
+                // Synchronously create the partitioned topic on the remote clusters if needed.
+                TopicName topicNameWithoutPartition = TopicName.get(topicName.getPartitionedTopicName());
+                return pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                        .getPartitionedTopicMetadataAsync(topicNameWithoutPartition).thenCompose(topicMetaOp -> {
+                            // Skip creating the topic if it is non-partitioned, because the replicator will
+                            // create it automatically.
+                            if (topicMetaOp.isEmpty()) {
+                                return CompletableFuture.completedFuture(null);
+                            }
+                            return FutureUtil.waitForAll(
+                                    internalCreatePartitionedTopicToReplicatedClustersInBackground(replicationClusters,
+                                            topicMetaOp.get().partitions).values());
+                        });
             }).thenCompose(__ ->
                     getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
                         TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
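
A side refactor in this hunk moves the empty-clusterIds validation ahead of the async chain, so callers receive an immediately failed future instead of an exception thrown inside thenCompose. A minimal, self-contained sketch of that fail-fast pattern (all names here are hypothetical, not the broker's API):

import java.util.List;
import java.util.concurrent.CompletableFuture;

class FailFastSketch {
    // Hypothetical stand-in for the rest of the async pipeline.
    static CompletableFuture<Void> applyReplicationClusters(List<String> clusterIds) {
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Void> setReplicationClusters(List<String> clusterIds) {
        // Validate inputs before entering the async chain, so the failure is
        // reported uniformly as a failed future rather than a thrown exception.
        if (clusterIds == null || clusterIds.isEmpty()) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("ClusterIds should not be null or empty"));
        }
        return applyReplicationClusters(clusterIds);
    }
}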
@@ -25,10 +25,12 @@
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -59,6 +61,9 @@
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
@@ -92,6 +97,20 @@ private void waitReplicatorStarted(String topicName) {
         });
     }

+    private void waitReplicatorStopped(String topicName) {
+        Awaitility.await().untilAsserted(() -> {
+            Optional<Topic> topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(topicOptional2.isPresent());
+            PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get();
+            assertTrue(persistentTopic2.getProducers().isEmpty());
+            Optional<Topic> topicOptional1 = pulsar1.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(topicOptional1.isPresent());
+            PersistentTopic persistentTopic1 = (PersistentTopic) topicOptional1.get();
+            assertTrue(persistentTopic1.getReplicators().isEmpty()
+                    || !persistentTopic1.getReplicators().get(cluster2).isConnected());
+        });
+    }
+
     /**
      * Override "AbstractReplicator.producer" by {@param producer} and return the original value.
      */
@@ -108,7 +127,7 @@ private ProducerImpl overrideProducerForReplicator(AbstractReplicator replicator

     @Test(timeOut = 45 * 1000)
     public void testReplicatorProducerStatInTopic() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
         final String subscribeName = "subscribe_1";
         final byte[] msgValue = "test".getBytes();

@@ -134,7 +153,7 @@ public void testCreateRemoteConsumerFirst() throws Exception {

     @Test(timeOut = 45 * 1000)
     public void testCreateRemoteConsumerFirst() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
         Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();

         // The topic in cluster2 has a replicator-created producer (schema Auto_Produce), but does not have any schema.
@@ -154,7 +173,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {

     @Test(timeOut = 45 * 1000)
     public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
         admin1.topics().createNonPartitionedTopic(topicName);
         // Wait for the replicator to start.
         waitReplicatorStarted(topicName);
@@ -210,7 +229,7 @@ private void injectMockReplicatorProducerBuilder(
         BrokerService brokerService = pulsar1.getBrokerService();
         // Wait for the internal client to be created.
         final String topicNameTriggerInternalClientCreate =
-                BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+                BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
         admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate);
         waitReplicatorStarted(topicNameTriggerInternalClientCreate);
         cleanupTopics(() -> {
@@ -338,7 +357,7 @@ void startCallback() {
      */
     @Test(timeOut = 120 * 1000)
     public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
         // Inject an error for "replicator.producer" creation.
         // The delay before each retry to create the producer is:
         // 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s...
@@ -409,4 +428,62 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
             admin2.topics().delete(topicName);
         });
     }
+
+    @Test
+    public void testPartitionedTopicLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        final String partition0 = TopicName.get(topicName).getPartition(0).toString();
+        final String partition1 = TopicName.get(topicName).getPartition(1).toString();
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+        // Verify that the partitioned topic has been created on the remote cluster.
+        PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(topicMetadata2.partitions, 2);
+        // Cleanup.
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        waitReplicatorStopped(partition0);
+        waitReplicatorStopped(partition1);
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        final String partition0 = TopicName.get(topicName).getPartition(0).toString();
+        final String partition1 = TopicName.get(topicName).getPartition(1).toString();
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        admin2.topics().createPartitionedTopic(topicName, 2);
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+        // Verify that the partitioned topic exists on the remote cluster with the expected partitions.
+        PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(topicMetadata2.partitions, 2);
+        // Cleanup.
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        waitReplicatorStopped(partition0);
+        waitReplicatorStopped(partition1);
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        admin2.topics().createPartitionedTopic(topicName, 3);
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        try {
+            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+            fail("Expected an error because a conflicting partitioned topic already exists.");
+        } catch (Exception ex) {
+            Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
+            assertTrue(unWrapEx.getMessage().contains("with different partitions"));
+        }
+        // Verify that nothing changed.
+        PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(topicMetadata2.partitions, 3);
+        assertEquals(admin1.topics().getReplicationClusters(topicName, true).size(), 1);
+        // Cleanup.
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
 }