From f6ecfd7d65960f3a9e43aafd9744ad12748898a3 Mon Sep 17 00:00:00 2001 From: liangyepianzhou Date: Fri, 23 Jul 2021 15:28:53 +0800 Subject: [PATCH 1/5] Add getTxnID method in Transaction.java --- .../broker/transaction/TransactionTest.java | 20 +++++++++++++++++++ .../client/api/transaction/Transaction.java | 5 +++++ .../impl/transaction/TransactionImpl.java | 5 +++++ 3 files changed, 30 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index ce8b3ae9abd44..7808f7f08d450 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -32,6 +32,8 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -139,4 +141,22 @@ public Consumer getConsumer(String topicName, String subName) throws Pul .enableBatchIndexAcknowledgment(true) .subscribe(); } + + @Test + public void testGetTxnID() throws Exception { + Awaitility.await().atMost(4, TimeUnit.SECONDS).until(()->{ + try { + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(0, TimeUnit.SECONDS).build().get(); + } catch (Exception e){ + return false; + } + return true; + }); + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(0, TimeUnit.SECONDS).build().get(); + TxnID txnID = transaction.getTxnID(); + Assert.assertEquals(txnID.getLeastSigBits(), 1); + Assert.assertEquals(txnID.getMostSigBits(), 0); + } } \ No newline at end of file diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java index af2df9edf9127..2267241181a11 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java @@ -43,4 +43,9 @@ public interface Transaction { */ CompletableFuture abort(); + /** + * Get TxnID of the transaction. + * @return TxnID + */ + TxnID getTxnID(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index 1b6655e7c2397..60c7829b11f41 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -208,6 +208,11 @@ public CompletableFuture abort() { }); } + @Override + public TxnID getTxnID() { + return new TxnID(txnIdMostBits, txnIdLeastBits); + } + private CompletableFuture checkIfOpen() { if (state == State.OPEN) { return CompletableFuture.completedFuture(null); From f5c5499c24952eb512615c96e23a45282c093cb0 Mon Sep 17 00:00:00 2001 From: liangyepianzhou Date: Fri, 23 Jul 2021 16:01:08 +0800 Subject: [PATCH 2/5] Modify the Awaitility.await() in TransactionTest and doc in Transaction --- .../broker/transaction/TransactionTest.java | 30 +++++++++++++------ .../client/api/transaction/Transaction.java | 2 +- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 7808f7f08d450..944aad5f9a549 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.transaction; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; import com.google.common.collect.Sets; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -40,6 +42,10 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -144,18 +150,24 @@ public Consumer getConsumer(String topicName, String subName) throws Pul @Test public void testGetTxnID() throws Exception { - Awaitility.await().atMost(4, TimeUnit.SECONDS).until(()->{ - try { - Transaction transaction = pulsarClient.newTransaction() - .withTransactionTimeout(0, TimeUnit.SECONDS).build().get(); - } catch (Exception e){ - return false; - } - return true; + // wait tc init success to ready state + Awaitility.await().untilAsserted(() -> { + TransactionMetadataStore transactionMetadataStore = + getPulsarServiceList().get(0).getTransactionMetadataStoreService() + .getStores().get(TransactionCoordinatorID.get(0)); + assertNotNull(transactionMetadataStore); + assertEquals(((MLTransactionMetadataStore) transactionMetadataStore).getState(), + TransactionMetadataStoreState.State.Ready); }); Transaction transaction = pulsarClient.newTransaction() - .withTransactionTimeout(0, TimeUnit.SECONDS).build().get(); + .build().get(); TxnID txnID = transaction.getTxnID(); + Assert.assertEquals(txnID.getLeastSigBits(), 0); + Assert.assertEquals(txnID.getMostSigBits(), 0); + transaction.abort(); + transaction = pulsarClient.newTransaction() + .build().get(); + txnID = transaction.getTxnID(); Assert.assertEquals(txnID.getLeastSigBits(), 1); Assert.assertEquals(txnID.getMostSigBits(), 0); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java index 2267241181a11..fd4cf0bc1665c 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java @@ -45,7 +45,7 @@ public interface Transaction { /** * Get TxnID of the transaction. - * @return TxnID + * @return {@link TxnID} the txnID. */ TxnID getTxnID(); } From 8fd8e652859d6638f911f028e770d402fee6a502 Mon Sep 17 00:00:00 2001 From: liangyepianzhou Date: Fri, 23 Jul 2021 17:47:57 +0800 Subject: [PATCH 3/5] Optimize waitForCoordinatorToBeAvailable method in TransactionTest --- .../broker/transaction/TransactionTest.java | 20 ++++++------- .../transaction/TransactionTestBase.java | 28 ++++++++++++++----- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 944aad5f9a549..013d4a007447f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -61,15 +61,18 @@ public class TransactionTest extends TransactionTestBase { private static final String TENANT = "tnx"; private static final String NAMESPACE1 = TENANT + "/ns1"; + private static final int NUM_BROKER = 1; + private static final int NUM_TC_PER_ = 1; @BeforeMethod protected void setup() throws Exception { - this.setBrokerCount(1); + this.setBrokerCount(NUM_BROKER); this.internalSetup(); - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); + String[] brokerServiceUrlArr = getPulsarServiceList().get(NUM_BROKER - 1).getBrokerServiceUrl().split(":"); String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); + admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder() + .serviceUrl("http://localhost:" + webServicePort).build()); admin.tenants().createTenant(TENANT, new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); admin.namespaces().createNamespace(NAMESPACE1); @@ -77,7 +80,7 @@ protected void setup() throws Exception { admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_TC_PER_); pulsarClient.close(); pulsarClient = PulsarClient.builder() .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) @@ -151,14 +154,7 @@ public Consumer getConsumer(String topicName, String subName) throws Pul @Test public void testGetTxnID() throws Exception { // wait tc init success to ready state - Awaitility.await().untilAsserted(() -> { - TransactionMetadataStore transactionMetadataStore = - getPulsarServiceList().get(0).getTransactionMetadataStoreService() - .getStores().get(TransactionCoordinatorID.get(0)); - assertNotNull(transactionMetadataStore); - assertEquals(((MLTransactionMetadataStore) transactionMetadataStore).getState(), - TransactionMetadataStoreState.State.Ready); - }); + Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKER, NUM_TC_PER_)); Transaction transaction = pulsarClient.newTransaction() .build().get(); TxnID txnID = transaction.getTxnID(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index a2091f443607f..651b2ddb792b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -21,19 +21,17 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; - +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.EventLoopGroup; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import io.netty.channel.EventLoopGroup; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -49,17 +47,22 @@ import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor; import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl; -import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.MockZooKeeperSession; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; @Slf4j public abstract class TransactionTestBase extends TestRetrySupport { @@ -291,5 +294,16 @@ protected final void internalCleanup() { log.warn("Failed to clean up mocked pulsar service:", e); } } - + public boolean waitForCoordinatorToBeAvailable(int numOfBroker, int numOfTCPerBroker){ + // wait tc init success to ready state + Awaitility.await().untilAsserted(() -> { + TransactionMetadataStore transactionMetadataStore = + getPulsarServiceList().get(numOfBroker - 1).getTransactionMetadataStoreService() + .getStores().get(TransactionCoordinatorID.get(numOfTCPerBroker - 1)); + assertNotNull(transactionMetadataStore); + assertEquals(((MLTransactionMetadataStore) transactionMetadataStore).getState(), + TransactionMetadataStoreState.State.Ready); + }); + return true; + } } From 1b377cd0d38b079ff3794687b93b0d3a1935abd7 Mon Sep 17 00:00:00 2001 From: liangyepianzhou Date: Fri, 23 Jul 2021 20:09:23 +0800 Subject: [PATCH 4/5] Fixed the details in TransactionTest --- .../apache/pulsar/broker/transaction/TransactionTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 013d4a007447f..744a3e05fdfa5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -62,14 +62,14 @@ public class TransactionTest extends TransactionTestBase { private static final String TENANT = "tnx"; private static final String NAMESPACE1 = TENANT + "/ns1"; private static final int NUM_BROKER = 1; - private static final int NUM_TC_PER_ = 1; + private static final int NUM_TC_PER = 1; @BeforeMethod protected void setup() throws Exception { this.setBrokerCount(NUM_BROKER); this.internalSetup(); - String[] brokerServiceUrlArr = getPulsarServiceList().get(NUM_BROKER - 1).getBrokerServiceUrl().split(":"); + String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1]; admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder() .serviceUrl("http://localhost:" + webServicePort).build()); @@ -80,7 +80,7 @@ protected void setup() throws Exception { admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_TC_PER_); + admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_TC_PER); pulsarClient.close(); pulsarClient = PulsarClient.builder() .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) @@ -154,7 +154,7 @@ public Consumer getConsumer(String topicName, String subName) throws Pul @Test public void testGetTxnID() throws Exception { // wait tc init success to ready state - Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKER, NUM_TC_PER_)); + Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKER, NUM_TC_PER)); Transaction transaction = pulsarClient.newTransaction() .build().get(); TxnID txnID = transaction.getTxnID(); From ee4daff3faa8861c8a8cc0678900349ef2d7629d Mon Sep 17 00:00:00 2001 From: liangyepianzhou Date: Fri, 23 Jul 2021 20:22:51 +0800 Subject: [PATCH 5/5] Fixed the problem of naming in TransactionTest --- .../pulsar/broker/transaction/TransactionTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 744a3e05fdfa5..26a02605c281b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -61,12 +61,12 @@ public class TransactionTest extends TransactionTestBase { private static final String TENANT = "tnx"; private static final String NAMESPACE1 = TENANT + "/ns1"; - private static final int NUM_BROKER = 1; - private static final int NUM_TC_PER = 1; + private static final int NUM_BROKERS = 1; + private static final int NUM_PARTITIONS = 1; @BeforeMethod protected void setup() throws Exception { - this.setBrokerCount(NUM_BROKER); + this.setBrokerCount(NUM_BROKERS); this.internalSetup(); String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); @@ -80,7 +80,7 @@ protected void setup() throws Exception { admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_TC_PER); + admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS); pulsarClient.close(); pulsarClient = PulsarClient.builder() .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) @@ -154,7 +154,7 @@ public Consumer getConsumer(String topicName, String subName) throws Pul @Test public void testGetTxnID() throws Exception { // wait tc init success to ready state - Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKER, NUM_TC_PER)); + Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKERS, NUM_PARTITIONS)); Transaction transaction = pulsarClient.newTransaction() .build().get(); TxnID txnID = transaction.getTxnID();