diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index e422f3d8c6fda..30c7a41e28d3e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -94,15 +94,6 @@ public void cleanup() throws Exception { super.cleanup(); } - private void waitReplicatorStarted(String topicName) { - Awaitility.await().untilAsserted(() -> { - Optional topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get(); - assertTrue(topicOptional2.isPresent()); - PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); - assertFalse(persistentTopic2.getProducers().isEmpty()); - }); - } - private void waitReplicatorStopped(String topicName) { Awaitility.await().untilAsserted(() -> { Optional topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index b4eed00c4470f..317e43306e356 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import java.net.URL; import java.time.Duration; @@ -29,6 +31,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.policies.data.ClusterData; @@ -55,7 +58,7 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { protected ZookeeperServerTest brokerConfigZk1; protected LocalBookkeeperEnsemble bkEnsemble1; protected PulsarService pulsar1; - protected BrokerService ns1; + protected BrokerService broker1; protected PulsarAdmin admin1; protected PulsarClient client1; @@ -66,7 +69,7 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { protected ZookeeperServerTest brokerConfigZk2; protected LocalBookkeeperEnsemble bkEnsemble2; protected PulsarService pulsar2; - protected BrokerService ns2; + protected BrokerService broker2; protected PulsarAdmin admin2; protected PulsarClient client2; @@ -89,23 +92,29 @@ protected void startBrokers() throws Exception { setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1); pulsar1 = new PulsarService(config1); pulsar1.start(); - ns1 = pulsar1.getBrokerService(); - + broker1 = pulsar1.getBrokerService(); url1 = new URL(pulsar1.getWebServiceAddress()); urlTls1 = new URL(pulsar1.getWebServiceAddressTls()); - admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); - client1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); // Start region 2 setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2); pulsar2 = new PulsarService(config2); pulsar2.start(); - ns2 = pulsar2.getBrokerService(); - + broker2 = pulsar2.getBrokerService(); url2 = new URL(pulsar2.getWebServiceAddress()); urlTls2 = new URL(pulsar2.getWebServiceAddressTls()); + } + + protected void startAdminClient() throws Exception { + admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); - client2 = PulsarClient.builder().serviceUrl(url2.toString()).build(); + } + + protected void startPulsarClient() throws Exception{ + ClientBuilder clientBuilder1 = PulsarClient.builder().serviceUrl(url1.toString()); + client1 = initClient(clientBuilder1); + ClientBuilder clientBuilder2 = PulsarClient.builder().serviceUrl(url2.toString()); + client2 = initClient(clientBuilder2); } protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { @@ -196,8 +205,12 @@ protected void setup() throws Exception { startBrokers(); + startAdminClient(); + createDefaultTenantsAndClustersAndNamespace(); + startPulsarClient(); + Thread.sleep(100); log.info("--- OneWayReplicatorTestBase::setup completed ---"); } @@ -287,4 +300,17 @@ protected void cleanup() throws Exception { config1 = new ServiceConfiguration(); config2 = new ServiceConfiguration(); } + + protected void waitReplicatorStarted(String topicName) { + Awaitility.await().untilAsserted(() -> { + Optional topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topicOptional2.isPresent()); + PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); + assertFalse(persistentTopic2.getProducers().isEmpty()); + }); + } + + protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception { + return clientBuilder.build(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java new file mode 100644 index 0000000000000..3caf4a1f2398c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTxnTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; +import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.google.common.collect.Sets; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ReplicationTxnTest extends OneWayReplicatorTestBase { + + private boolean transactionBufferSegmentedSnapshotEnabled = false; + private int txnLogPartitions = 4; + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @Override + protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception { + return clientBuilder.enableTransaction(true).build(); + } + + @Override + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + config.setSystemTopicEnabled(true); + config.setTopicLevelPoliciesEnabled(true); + config.setTransactionCoordinatorEnabled(true); + config.setTransactionLogBatchedWriteEnabled(true); + config.setTransactionPendingAckBatchedWriteEnabled(true); + config.setTransactionBufferSegmentedSnapshotEnabled(transactionBufferSegmentedSnapshotEnabled); + } + + @Override + protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { + super.createDefaultTenantsAndClustersAndNamespace(); + + // Create resource that transaction function relies on. + admin1.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(), new TenantInfoImpl(Collections.emptySet(), + Sets.newHashSet(cluster1, cluster2))); + admin1.namespaces().createNamespace(SYSTEM_NAMESPACE.toString(), 4); + pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().createPartitionedTopic( + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, new PartitionedTopicMetadata(txnLogPartitions)); + //admin1.topics().createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString(), 4); + + admin2.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(), new TenantInfoImpl(Collections.emptySet(), + Sets.newHashSet(cluster1, cluster2))); + admin2.namespaces().createNamespace(SYSTEM_NAMESPACE.toString(), 4); + pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().createPartitionedTopic( + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, new PartitionedTopicMetadata(txnLogPartitions)); + } + + private void pubAndSubOneMsg(String topic, String subscription) throws Exception { + Consumer consumer1 = client1.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription) + .isAckReceiptEnabled(true).subscribe(); + Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create(); + producer1.newMessage().value("msg1").send(); + // start txn. + Transaction txn = client1.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + // consume. + Message c1Msg1 = consumer1.receive(5, TimeUnit.SECONDS); + assertNotNull(c1Msg1); + assertEquals(c1Msg1.getValue(), "msg1"); + consumer1.acknowledgeAsync(c1Msg1.getMessageId(), txn).join(); + // send. + producer1.newMessage(txn).value("msg2").send(); + // commit. + txn.commit().get(); + + // Consume the msg with TXN. + Message c1Msg2 = consumer1.receive(5, TimeUnit.SECONDS); + assertNotNull(c1Msg2); + assertEquals(c1Msg2.getValue(), "msg2"); + consumer1.acknowledgeAsync(c1Msg2.getMessageId()).join(); + + // Consume messages on the remote cluster. + Consumer consumer2 = client2.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription).subscribe(); + Message c2Msg1 = consumer2.receive(15, TimeUnit.SECONDS); + assertNotNull(c2Msg1); + MessageMetadata msgMetadata1 = WhiteboxImpl.getInternalState(c2Msg1, "msgMetadata"); + // Verify: the messages replicated has no TXN id. + assertFalse(msgMetadata1.hasTxnidMostBits()); + assertFalse(msgMetadata1.hasTxnidLeastBits()); + consumer2.acknowledge(c2Msg1); + Message c2Msg2 = consumer2.receive(15, TimeUnit.SECONDS); + assertNotNull(c2Msg2); + MessageMetadata msgMetadata2 = WhiteboxImpl.getInternalState(c2Msg2, "msgMetadata"); + // Verify: the messages replicated has no TXN id. + assertFalse(msgMetadata2.hasTxnidMostBits()); + assertFalse(msgMetadata2.hasTxnidLeastBits()); + consumer2.acknowledge(c2Msg2); + + // cleanup. + producer1.close(); + consumer1.close(); + consumer2.close(); + } + + private void verifyNoReplicator(BrokerService broker, TopicName topicName) throws Exception { + String tpStr = topicName.toString(); + CompletableFuture> future = broker.getTopic(tpStr, true); + if (future == null) { + return; + } + PersistentTopic persistentTopic = (PersistentTopic) future.join().get(); + assertTrue(persistentTopic.getReplicators().isEmpty()); + } + + @Test + public void testTxnLogNotBeReplicated() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp"); + final String subscription = "s1"; + admin1.topics().createNonPartitionedTopic(topic); + waitReplicatorStarted(topic); + admin1.topics().createSubscription(topic, subscription, MessageId.earliest); + admin2.topics().createSubscription(topic, subscription, MessageId.earliest); + // Pub & Sub. + pubAndSubOneMsg(topic, subscription); + // To cover more cases, sleep 3s. + Thread.sleep(3000); + + // Verify: messages on the TXN system topic did not been replicated. + // __transaction_log_: it only uses ML, will not create topic. + for (int i = 0; i < txnLogPartitions; i++) { + TopicName txnLog = TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + i); + assertNotNull(pulsar1.getManagedLedgerFactory() + .getManagedLedgerInfo(txnLog.getPersistenceNamingEncoding())); + assertFalse(broker1.getTopics().containsKey(txnLog.toString())); + } + // __transaction_pending_ack: it only uses ML, will not create topic. + TopicName pendingAck = TopicName.get( + MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subscription)); + assertNotNull(pulsar1.getManagedLedgerFactory() + .getManagedLedgerInfo(pendingAck.getPersistenceNamingEncoding())); + assertFalse(broker1.getTopics().containsKey(pendingAck.toString())); + // __transaction_buffer_snapshot. + verifyNoReplicator(broker1, TopicName.get(TopicDomain.persistent.value(), + TopicName.get(topic).getNamespaceObject(), + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)); + verifyNoReplicator(broker1, TopicName.get(TopicDomain.persistent.value(), + TopicName.get(topic).getNamespaceObject(), + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS)); + verifyNoReplicator(broker1, TopicName.get(TopicDomain.persistent.value(), + TopicName.get(topic).getNamespaceObject(), + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES)); + + // cleanup. + cleanupTopics(() -> { + admin1.topics().delete(topic); + admin2.topics().delete(topic); + try { + admin1.topics().delete(pendingAck.toString()); + } catch (Exception ex) {} + try { + admin2.topics().delete(pendingAck.toString()); + } catch (Exception ex) {} + }); + } + + @Test + public void testOngoingMessagesWillNotBeReplicated() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp"); + final String subscription = "s1"; + admin1.topics().createNonPartitionedTopic(topic); + waitReplicatorStarted(topic); + admin1.topics().createSubscription(topic, subscription, MessageId.earliest); + admin2.topics().createSubscription(topic, subscription, MessageId.earliest); + // Pub without commit. + Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create(); + Transaction txn = client1.newTransaction().withTransactionTimeout(1, TimeUnit.HOURS).build().get(); + producer1.newMessage(txn).value("msg1").send(); + // Verify: receive nothing on the remote cluster. + Consumer consumer2 = client2.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription).subscribe(); + Message msg = consumer2.receive(15, TimeUnit.SECONDS); + assertNull(msg); + // Verify: the repl cursor is not end of the topic. + PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(topic, false).join().get(); + GeoPersistentReplicator replicator = + (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); + assertTrue(replicator.getCursor().hasMoreEntries()); + + // cleanup. + producer1.close(); + consumer2.close(); + cleanupTopics(() -> { + admin1.topics().delete(topic); + admin2.topics().delete(topic); + TopicName pendingAck = TopicName.get( + MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subscription)); + try { + admin1.topics().delete(pendingAck.toString()); + } catch (Exception ex) {} + try { + admin2.topics().delete(pendingAck.toString()); + } catch (Exception ex) {} + }); + } +}