diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index ce8b3ae9abd44..26a02605c281b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.transaction; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; import com.google.common.collect.Sets; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -32,12 +34,18 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -53,15 +61,18 @@ public class TransactionTest extends TransactionTestBase { private static final String TENANT = "tnx"; private static final String NAMESPACE1 = TENANT + "/ns1"; + private static final int NUM_BROKERS = 1; + private static final int NUM_PARTITIONS = 1; @BeforeMethod protected void setup() throws Exception { - this.setBrokerCount(1); + this.setBrokerCount(NUM_BROKERS); this.internalSetup(); String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); + admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder() + .serviceUrl("http://localhost:" + webServicePort).build()); admin.tenants().createTenant(TENANT, new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); admin.namespaces().createNamespace(NAMESPACE1); @@ -69,7 +80,7 @@ protected void setup() throws Exception { admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS); pulsarClient.close(); pulsarClient = PulsarClient.builder() .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) @@ -139,4 +150,21 @@ public Consumer getConsumer(String topicName, String subName) throws Pul .enableBatchIndexAcknowledgment(true) .subscribe(); } + + @Test + public void testGetTxnID() throws Exception { + // wait tc init success to ready state + Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKERS, NUM_PARTITIONS)); + Transaction transaction = pulsarClient.newTransaction() + .build().get(); + TxnID txnID = transaction.getTxnID(); + Assert.assertEquals(txnID.getLeastSigBits(), 0); + Assert.assertEquals(txnID.getMostSigBits(), 0); + transaction.abort(); + transaction = pulsarClient.newTransaction() + .build().get(); + txnID = transaction.getTxnID(); + Assert.assertEquals(txnID.getLeastSigBits(), 1); + Assert.assertEquals(txnID.getMostSigBits(), 0); + } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index a2091f443607f..651b2ddb792b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -21,19 +21,17 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; - +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.EventLoopGroup; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import io.netty.channel.EventLoopGroup; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -49,17 +47,22 @@ import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor; import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl; -import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.MockZooKeeperSession; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; @Slf4j public abstract class TransactionTestBase extends TestRetrySupport { @@ -291,5 +294,16 @@ protected final void internalCleanup() { log.warn("Failed to clean up mocked pulsar service:", e); } } - + public boolean waitForCoordinatorToBeAvailable(int numOfBroker, int numOfTCPerBroker){ + // wait tc init success to ready state + Awaitility.await().untilAsserted(() -> { + TransactionMetadataStore transactionMetadataStore = + getPulsarServiceList().get(numOfBroker - 1).getTransactionMetadataStoreService() + .getStores().get(TransactionCoordinatorID.get(numOfTCPerBroker - 1)); + assertNotNull(transactionMetadataStore); + assertEquals(((MLTransactionMetadataStore) transactionMetadataStore).getState(), + TransactionMetadataStoreState.State.Ready); + }); + return true; + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java index af2df9edf9127..fd4cf0bc1665c 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java @@ -43,4 +43,9 @@ public interface Transaction { */ CompletableFuture abort(); + /** + * Get TxnID of the transaction. + * @return {@link TxnID} the txnID. + */ + TxnID getTxnID(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index 1b6655e7c2397..60c7829b11f41 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -208,6 +208,11 @@ public CompletableFuture abort() { }); } + @Override + public TxnID getTxnID() { + return new TxnID(txnIdMostBits, txnIdLeastBits); + } + private CompletableFuture checkIfOpen() { if (state == State.OPEN) { return CompletableFuture.completedFuture(null);