Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getTxnID method in Transaction.java #11438

Merged
merged 5 commits into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.transaction;

import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import com.google.common.collect.Sets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,12 +34,18 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand All @@ -53,23 +61,26 @@ public class TransactionTest extends TransactionTestBase {

private static final String TENANT = "tnx";
private static final String NAMESPACE1 = TENANT + "/ns1";
private static final int NUM_BROKERS = 1;
private static final int NUM_PARTITIONS = 1;

@BeforeMethod
protected void setup() throws Exception {
this.setBrokerCount(1);
this.setBrokerCount(NUM_BROKERS);
this.internalSetup();

String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
.serviceUrl("http://localhost:" + webServicePort).build());
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1);

admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
pulsarClient.close();
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
Expand Down Expand Up @@ -139,4 +150,21 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
.enableBatchIndexAcknowledgment(true)
.subscribe();
}

@Test
public void testGetTxnID() throws Exception {
// wait tc init success to ready state
Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKERS, NUM_PARTITIONS));
Transaction transaction = pulsarClient.newTransaction()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to create the transaction again?

.build().get();
TxnID txnID = transaction.getTxnID();
Assert.assertEquals(txnID.getLeastSigBits(), 0);
Assert.assertEquals(txnID.getMostSigBits(), 0);
transaction.abort();
transaction = pulsarClient.newTransaction()
.build().get();
txnID = transaction.getTxnID();
Assert.assertEquals(txnID.getLeastSigBits(), 1);
Assert.assertEquals(txnID.getMostSigBits(), 0);
Comment on lines +167 to +168
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will always get transaction ID (1,0) here? I mean we can just check the txnID is not null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact it should always be this

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import io.netty.channel.EventLoopGroup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -49,17 +47,22 @@
import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.MockZooKeeperSession;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;

@Slf4j
public abstract class TransactionTestBase extends TestRetrySupport {
Expand Down Expand Up @@ -291,5 +294,16 @@ protected final void internalCleanup() {
log.warn("Failed to clean up mocked pulsar service:", e);
}
}

public boolean waitForCoordinatorToBeAvailable(int numOfBroker, int numOfTCPerBroker){
// wait tc init success to ready state
Awaitility.await().untilAsserted(() -> {
TransactionMetadataStore transactionMetadataStore =
getPulsarServiceList().get(numOfBroker - 1).getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(numOfTCPerBroker - 1));
assertNotNull(transactionMetadataStore);
assertEquals(((MLTransactionMetadataStore) transactionMetadataStore).getState(),
TransactionMetadataStoreState.State.Ready);
});
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ public interface Transaction {
*/
CompletableFuture<Void> abort();

/**
* Get TxnID of the transaction.
* @return {@link TxnID} the txnID.
*/
TxnID getTxnID();
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public CompletableFuture<Void> abort() {
});
}

@Override
public TxnID getTxnID() {
return new TxnID(txnIdMostBits, txnIdLeastBits);
}

private CompletableFuture<Void> checkIfOpen() {
if (state == State.OPEN) {
return CompletableFuture.completedFuture(null);
Expand Down