Skip to content

Commit

Permalink
[fix] [broker] fix deadlock when disable topic level Geo-Replication (a…
Browse files Browse the repository at this point in the history
…pache#22738)

(cherry picked from commit 6372b9c)
(cherry picked from commit 1a8ba13)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Jun 7, 2024
1 parent 6b7e727 commit 323013a
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3416,11 +3416,12 @@ protected CompletableFuture<Void> internalSetReplicationClusters(List<String> cl
Set<String> replicationClusters = Sets.newHashSet(clusterIds);
return validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
.thenAccept(__ -> {
if (CollectionUtils.isEmpty(clusterIds)) {
throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty");
}
Set<String> clusters = clusters();
}).thenCompose(__ -> clustersAsync())
.thenCompose(clusters -> {
List<CompletableFuture<Void>> futures = new ArrayList<>(replicationClusters.size());
for (String clusterId : replicationClusters) {
if (!clusters.contains(clusterId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,4 +658,63 @@ public void testUnFenceTopicToReuse() throws Exception {
admin2.topics().delete(topicName);
});
}

/**
 * Checks that a non-partitioned topic can be deleted on both clusters after its topic-level
 * replication clusters have been narrowed to the local cluster on each side.
 */
@Test
public void testDeleteNonPartitionedTopic() throws Exception {
    final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
    admin1.topics().createNonPartitionedTopic(topicName);

    // A message produced on cluster 1 must reach a consumer on cluster 2 before we change anything.
    verifyReplicationWorks(topicName);

    // Turn replication off: each side keeps only its own cluster in the topic-level policy.
    setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
    setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);

    // Remove the topic on both sides.
    admin1.topics().delete(topicName);
    admin2.topics().delete(topicName);

    // The topic resources of neither cluster may still report the topic.
    final boolean existsOnCluster1 = pulsar1.getPulsarResources().getTopicResources()
            .persistentTopicExists(TopicName.get(topicName)).join();
    assertFalse(existsOnCluster1);
    final boolean existsOnCluster2 = pulsar2.getPulsarResources().getTopicResources()
            .persistentTopicExists(TopicName.get(topicName)).join();
    assertFalse(existsOnCluster2);
}

/**
 * Checks that a two-partition topic can be deleted after its topic-level replication clusters
 * have been narrowed to the local cluster on each side.
 */
@Test
public void testDeletePartitionedTopic() throws Exception {
    final int partitionCount = 2;
    final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
    admin1.topics().createPartitionedTopic(topicName, partitionCount);

    // A message produced on cluster 1 must reach a consumer on cluster 2 before we change anything.
    verifyReplicationWorks(topicName);

    // Turn replication off: each side keeps only its own cluster in the topic-level policy.
    setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
    setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);

    // Remove the partitioned topic; the second delete is only issued when the clusters do not share a ZK.
    admin1.topics().deletePartitionedTopic(topicName);
    if (!usingGlobalZK) {
        admin2.topics().deletePartitionedTopic(topicName);
    }

    // The partitioned-topic metadata must be gone on both sides.
    assertFalse(pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
            .partitionedTopicExists(TopicName.get(topicName)));
    assertFalse(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
            .partitionedTopicExists(TopicName.get(topicName)));

    if (usingGlobalZK) {
        // With a global ZK the partitions on the remote cluster currently still have to be deleted
        // manually, so the per-partition checks below are skipped in that mode.
        return;
    }
    for (int partition = 0; partition < partitionCount; partition++) {
        assertFalse(pulsar1.getPulsarResources().getTopicResources()
                .persistentTopicExists(TopicName.get(topicName).getPartition(partition)).join());
        assertFalse(pulsar2.getPulsarResources().getTopicResources()
                .persistentTopicExists(TopicName.get(topicName).getPartition(partition)).join());
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,33 @@
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
import java.net.URL;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.tests.TestRetrySupport;
Expand All @@ -52,6 +64,9 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
protected final String nonReplicatedNamespace = defaultTenant + "/ns1";

protected final String cluster1 = "r1";

protected boolean usingGlobalZK = false;

protected URL url1;
protected URL urlTls1;
protected ServiceConfiguration config1 = new ServiceConfiguration();
Expand All @@ -77,8 +92,12 @@ protected void startZKAndBK() throws Exception {
// Start ZK.
brokerConfigZk1 = new ZookeeperServerTest(0);
brokerConfigZk1.start();
brokerConfigZk2 = new ZookeeperServerTest(0);
brokerConfigZk2.start();
if (usingGlobalZK) {
brokerConfigZk2 = brokerConfigZk1;
} else {
brokerConfigZk2 = new ZookeeperServerTest(0);
brokerConfigZk2.start();
}

// Start BK.
bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
Expand Down Expand Up @@ -132,37 +151,42 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(false)
.build());
admin2.clusters().createCluster(cluster1, ClusterData.builder()
.serviceUrl(url1.toString())
.serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(false)
.build());
admin2.clusters().createCluster(cluster2, ClusterData.builder()
.serviceUrl(url2.toString())
.serviceUrlTls(urlTls2.toString())
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(false)
.build());

admin1.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
Sets.newHashSet(cluster1, cluster2)));
admin2.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
Sets.newHashSet(cluster1, cluster2)));

admin1.namespaces().createNamespace(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
admin2.namespaces().createNamespace(replicatedNamespace);
admin1.namespaces().createNamespace(nonReplicatedNamespace);
admin2.namespaces().createNamespace(nonReplicatedNamespace);

if (!usingGlobalZK) {
admin2.clusters().createCluster(cluster1, ClusterData.builder()
.serviceUrl(url1.toString())
.serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(false)
.build());
admin2.clusters().createCluster(cluster2, ClusterData.builder()
.serviceUrl(url2.toString())
.serviceUrlTls(urlTls2.toString())
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(false)
.build());
admin2.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
Sets.newHashSet(cluster1, cluster2)));
admin2.namespaces().createNamespace(replicatedNamespace);
admin2.namespaces().createNamespace(nonReplicatedNamespace);
}

}

/**
 * Convenience overload that runs {@link #cleanupTopics(String, CleanupTopicAction)} against
 * {@code replicatedNamespace}.
 *
 * @param cleanupTopicAction the action handed through unchanged to the two-argument overload
 * @throws Exception whatever the two-argument overload throws
 */
protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
    final String namespace = replicatedNamespace;
    cleanupTopics(namespace, cleanupTopicAction);
}

protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAction) throws Exception {
if (usingGlobalZK) {
throw new IllegalArgumentException("The method cleanupTopics does not support for global ZK");
}
waitChangeEventsInit(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1));
admin1.namespaces().unload(namespace);
Expand Down Expand Up @@ -242,11 +266,15 @@ protected void cleanup() throws Exception {
// delete namespaces.
waitChangeEventsInit(replicatedNamespace);
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1));
if (!usingGlobalZK) {
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster2));
}
admin1.namespaces().deleteNamespace(replicatedNamespace);
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster2));
admin2.namespaces().deleteNamespace(replicatedNamespace);
admin1.namespaces().deleteNamespace(nonReplicatedNamespace);
admin2.namespaces().deleteNamespace(nonReplicatedNamespace);
if (!usingGlobalZK) {
admin2.namespaces().deleteNamespace(replicatedNamespace);
admin2.namespaces().deleteNamespace(nonReplicatedNamespace);
}

// shutdown.
markCurrentSetupNumberCleaned();
Expand Down Expand Up @@ -291,7 +319,7 @@ protected void cleanup() throws Exception {
brokerConfigZk1.stop();
brokerConfigZk1 = null;
}
if (brokerConfigZk2 != null) {
if (!usingGlobalZK && brokerConfigZk2 != null) {
brokerConfigZk2.stop();
brokerConfigZk2 = null;
}
Expand All @@ -313,4 +341,96 @@ protected void waitReplicatorStarted(String topicName) {
/**
 * Creates the client from the supplied builder.
 *
 * @param clientBuilder the builder to build the client from
 * @return the result of {@code clientBuilder.build()}
 * @throws Exception if building the client fails
 */
protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception {
    final PulsarClient client = clientBuilder.build();
    return client;
}

/**
 * Publishes one message through {@code client1} and asserts that a consumer created through
 * {@code client2} receives the same value within 10 seconds.
 *
 * <p>The producer and the consumer are always closed, also when sending or the assertion fails.
 * On success the subscription is removed again via {@code unsubscribe()}.
 *
 * @param topic the topic to produce to and consume from
 * @throws Exception if a client operation fails
 */
protected void verifyReplicationWorks(String topic) throws Exception {
    final String subscription = "__subscribe_1";
    final String msgValue = "__msg1";
    // try-with-resources: neither client object is leaked when something below throws.
    try (Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topic).create();
            Consumer<String> consumer2 = client2.newConsumer(Schema.STRING).topic(topic)
                    .isAckReceiptEnabled(true).subscriptionName(subscription).subscribe()) {
        producer1.newMessage().value(msgValue).send();
        pulsar1.getBrokerService().checkReplicationPolicies();
        // receive(timeout, unit) yields null when nothing arrives in time; map that to a null value so
        // the timeout shows up as a failed assertion rather than a NullPointerException.
        final String received = Optional.ofNullable(consumer2.receive(10, TimeUnit.SECONDS))
                .map(message -> message.getValue())
                .orElse(null);
        assertEquals(received, msgValue);
        consumer2.unsubscribe();
    }
}

/**
 * Sets the topic-level replication clusters through {@code admin} and waits until the topic
 * policies service of {@code pulsar}, and every topic/partition checked by
 * {@link #checkNonPartitionedTopicLevelClusters}, reports exactly that set.
 *
 * @param topic    the topic (or one of its partitions) whose policy is changed
 * @param clusters the replication clusters to set
 * @param admin    the admin client used to apply the change
 * @param pulsar   the broker whose view of the policy is polled
 * @throws Exception if the partition layout differs between the two clusters or an admin call fails
 */
protected void setTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin,
                                     PulsarService pulsar) throws Exception {
    Set<String> expected = new HashSet<>(clusters);
    TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
    int partitions = ensurePartitionsAreSame(topic);
    admin.topics().setReplicationClusters(topic, clusters);
    Awaitility.await().untilAsserted(() -> {
        TopicPolicies policies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
        // Guard with an assertion: an AssertionError keeps Awaitility polling, whereas a
        // NullPointerException would abort the wait immediately. Presumably the policies can be
        // null until the broker has picked up the update - TODO confirm.
        assertTrue(policies != null && policies.getReplicationClusters() != null,
                "Topic-level replication clusters are not available yet for " + topicName);
        assertEquals(new HashSet<>(policies.getReplicationClusters()), expected);
        if (partitions == 0) {
            checkNonPartitionedTopicLevelClusters(topicName.toString(), clusters, admin, pulsar.getBrokerService());
        } else {
            for (int i = 0; i < partitions; i++) {
                checkNonPartitionedTopicLevelClusters(topicName.getPartition(i).toString(), clusters, admin,
                        pulsar.getBrokerService());
            }
        }
    });
}

/**
 * Asserts that the topic-level replication clusters of an already known topic equal
 * {@code clusters}. Nothing is checked when {@code broker.getTopic(topic, false)} yields no
 * future or no topic.
 *
 * @param topic    the full name of the topic (or single partition) to inspect
 * @param clusters the replication clusters the topic is expected to carry
 * @param admin    not used by this method
 * @param broker   the broker service the topic is looked up on
 * @throws Exception if the lookup fails
 */
protected void checkNonPartitionedTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin,
                                                     BrokerService broker) throws Exception {
    final CompletableFuture<Optional<Topic>> lookup = broker.getTopic(topic, false);
    if (lookup != null) {
        final Optional<Topic> loaded = lookup.join();
        if (loaded != null && loaded.isPresent()) {
            final PersistentTopic persistentTopic = (PersistentTopic) loaded.get();
            final Set<String> wanted = new HashSet<>(clusters);
            // NOTE(review): get() is called without a presence check; confirm the policies are
            // always present once the topic is available.
            final Set<String> actual =
                    new HashSet<>(persistentTopic.getTopicPolicies().get().getReplicationClusters());
            assertEquals(actual, wanted);
        }
    }
}

/**
 * Verifies that both clusters agree on whether the topic is partitioned and, if so, on the
 * number of partitions.
 *
 * @param topic the topic, or one of its partitions; the partitioned topic name is derived from it
 * @return {@code 0} when the topic is not partitioned on either cluster, otherwise the partition count
 * @throws IllegalArgumentException when the two clusters disagree
 * @throws Exception if reading the metadata fails
 */
protected int ensurePartitionsAreSame(String topic) throws Exception {
    final TopicName baseName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
    final boolean partitionedOn1 = pulsar1.getPulsarResources().getNamespaceResources()
            .getPartitionedTopicResources().partitionedTopicExists(baseName);
    final boolean partitionedOn2 = pulsar2.getPulsarResources().getNamespaceResources()
            .getPartitionedTopicResources().partitionedTopicExists(baseName);
    // NOTE(review): the messages below mention deletion although this check is also used when
    // only the replication clusters are changed; wording kept as is.
    if (partitionedOn1 ^ partitionedOn2) {
        throw new IllegalArgumentException(String.format("Can not delete topic."
                        + " isPartitionedTopic1: %s, isPartitionedTopic2: %s",
                partitionedOn1, partitionedOn2));
    }
    if (partitionedOn1) {
        final int count1 = pulsar1.getPulsarResources().getNamespaceResources()
                .getPartitionedTopicResources().getPartitionedTopicMetadataAsync(baseName).join().get().partitions;
        final int count2 = pulsar2.getPulsarResources().getNamespaceResources()
                .getPartitionedTopicResources().getPartitionedTopicMetadataAsync(baseName).join().get().partitions;
        if (count1 == count2) {
            return count1;
        }
        throw new IllegalArgumentException(String.format("Can not delete topic."
                        + " partitions1: %s, partitions2: %s",
                count1, count2));
    }
    return 0;
}

/**
 * Narrows the topic-level replication clusters on both sides and then deletes the topic on
 * both clusters, using the partitioned or the plain delete call depending on the topic type.
 *
 * @param topic the topic (or one of its partitions) to delete
 * @throws Exception if the partition layout differs between the clusters or an admin call fails
 */
protected void deleteTopicAfterDisableTopicLevelReplication(String topic) throws Exception {
    setTopicLevelClusters(topic, Arrays.asList(cluster1), admin1, pulsar1);
    // NOTE(review): the second cluster is first set to cluster1 (and waited for) and then switched
    // to cluster2 without waiting; the sequence is kept exactly as found - confirm it is intended.
    setTopicLevelClusters(topic, Arrays.asList(cluster1), admin2, pulsar2);
    admin2.topics().setReplicationClusters(topic, Arrays.asList(cluster2));

    final int partitions = ensurePartitionsAreSame(topic);
    final String baseTopic = TopicName.get(TopicName.get(topic).getPartitionedTopicName()).toString();

    if (partitions == 0) {
        admin1.topics().delete(baseTopic);
        admin2.topics().delete(baseTopic);
        return;
    }
    admin1.topics().deletePartitionedTopic(baseTopic);
    admin2.topics().deletePartitionedTopic(baseTopic);
}
}
Loading

0 comments on commit 323013a

Please sign in to comment.