From 2a1d542429faa370ae6dda48117a974c93cd5240 Mon Sep 17 00:00:00 2001 From: congbo Date: Wed, 17 Mar 2021 23:02:54 +0800 Subject: [PATCH] [Transaction] Transaction buffer metrics. --- .../TransactionMetadataStoreService.java | 2 +- .../service/persistent/PersistentTopic.java | 8 ++ .../prometheus/NamespaceStatsAggregator.java | 6 + .../broker/stats/prometheus/TopicStats.java | 29 +++++ .../transaction/buffer/TransactionBuffer.java | 7 + .../buffer/impl/InMemTransactionBuffer.java | 5 + .../buffer/impl/TopicTransactionBuffer.java | 30 ++++- .../buffer/impl/TransactionBufferDisable.java | 5 + .../buffer/impl/TransactionBufferStats.java | 40 ++++++ .../broker/stats/PrometheusMetricsTest.java | 122 +++++++++++++++++- .../common/policies/data/TopicStats.java | 24 ++++ .../impl/MLTransactionLogImpl.java | 11 +- 12 files changed, 281 insertions(+), 8 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferStats.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 73ff714b55f80..9825d07e3b4ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -129,7 +129,7 @@ public boolean test(NamespaceBundle namespaceBundle) { public void addTransactionMetadataStore(TransactionCoordinatorID tcId) { pulsarService.getBrokerService() - .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + tcId)) + .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + tcId.getId())) .whenComplete((v, e) -> { if (e != null) { LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c1dba05ce5a61..5545068336382 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -101,6 +101,7 @@ import org.apache.pulsar.broker.stats.ReplicationMetrics; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; +import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferStats; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.api.MessageId; @@ -1741,6 +1742,13 @@ public TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklo stats.deduplicationStatus = messageDeduplication.getStatus().toString(); stats.topicEpoch = topicEpoch.orElse(null); stats.offloadedStorageSize = ledger.getOffloadedSize(); + TransactionBufferStats transactionBufferStats = transactionBuffer.getTransactionBufferStats(); + stats.activeTransactions = transactionBufferStats.activeTransactions; + stats.commitTransactionCount = transactionBufferStats.commitTransactionCount; + stats.abortTransactionCount = transactionBufferStats.abortTransactionCount; + stats.registeredTransactionCount = transactionBufferStats.registeredTransactionCount; + stats.publishTxnMessageCount = transactionBufferStats.publishTxnMessageCount; + stats.existedAbortTransactions = transactionBufferStats.existedAbortTransactions; return stats; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 0a55d81aa4bc3..09e8a8e60562e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -117,6 +117,12 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.bytesInCounter = tStatus.bytesInCounter; stats.msgOutCounter = tStatus.msgOutCounter; stats.bytesOutCounter = tStatus.bytesOutCounter; + stats.activeTransactions = tStatus.activeTransactions; + stats.commitTransactionCount = tStatus.commitTransactionCount; + stats.abortTransactionCount = tStatus.abortTransactionCount; + stats.registeredTransactionCount = tStatus.registeredTransactionCount; + stats.publishTxnMessageCount = tStatus.publishTxnMessageCount; + stats.existedAbortTransactions = tStatus.existedAbortTransactions; stats.producersCount = 0; topic.getProducers().values().forEach(producer -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 416136b1a09e6..484ebfd42af7a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -59,6 +59,15 @@ class TopicStats { // Used for tracking duplicate TYPE definitions static Map metricWithTypeDefinition = new HashMap<>(); + // Transaction buffer stats + long activeTransactions; + long commitTransactionCount; + long abortTransactionCount; + long registeredTransactionCount; + long publishTxnMessageCount; + long existedAbortTransactions; + + public void reset() { subscriptionsCount = 0; @@ -87,6 +96,13 @@ public void reset() { storageWriteLatencyBuckets.reset(); storageLedgerWriteLatencyBuckets.reset(); entrySizeBuckets.reset(); + + activeTransactions = 0; + commitTransactionCount = 0; + abortTransactionCount = 0; + registeredTransactionCount = 0; + publishTxnMessageCount = 0; + existedAbortTransactions = 0; } static void resetTypes() { @@ -249,6 +265,19 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter); metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter); + + metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_active_transactions", + stats.activeTransactions); + metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_commit_transaction_count", + stats.commitTransactionCount); + metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_abort_transaction_count", + stats.abortTransactionCount); + metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_registered_transaction_count", + stats.registeredTransactionCount); + metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_existed_abort_transactions", + stats.existedAbortTransactions); + metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_publish_message_count", + stats.publishTxnMessageCount); } static void metricType(SimpleTextOutputStream stream, String name) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java index 02e06cc0457d1..136151ecc360b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferStats; import org.apache.pulsar.client.api.transaction.TxnID; /** @@ -147,4 +148,10 @@ public interface TransactionBuffer { * @return the stable position. */ PositionImpl getMaxReadPosition(); + + /** + * Get the transaction buffer stats. + * @return the transaction buffer stats. + */ + TransactionBufferStats getTransactionBufferStats(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index f5813fd9bcc21..8a52b40f7c4aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -364,4 +364,9 @@ public PositionImpl getMaxReadPosition() { return PositionImpl.latest; } + @Override + public TransactionBufferStats getTransactionBufferStats() { + return new TransactionBufferStats(); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 52fea556734f0..c7b841ebc48f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -86,12 +87,21 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final int takeSnapshotIntervalTime; - private final CompletableFuture transactionBufferFuture; + private final LongAdder publishTxnMessageCount; + + private final LongAdder commitTransactionCount; + + private final LongAdder abortTransactionCount; + + private final LongAdder registeredTransactionCount; public TopicTransactionBuffer(PersistentTopic topic, CompletableFuture transactionBufferFuture) { super(State.None); + this.publishTxnMessageCount = new LongAdder(); + this.commitTransactionCount = new LongAdder(); + this.abortTransactionCount = new LongAdder(); + this.registeredTransactionCount = new LongAdder(); this.topic = topic; - this.transactionBufferFuture = transactionBufferFuture; this.changeToInitializingState(); this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar() .getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName())); @@ -166,6 +176,7 @@ public CompletableFuture appendBufferToTxn(TxnID txnId, long sequenceI public void addComplete(Position position, ByteBuf entryData, Object ctx) { synchronized (TopicTransactionBuffer.this) { handleTransactionMessage(txnId, position); + publishTxnMessageCount.increment(); } completableFuture.complete(position); } @@ -185,6 +196,7 @@ private void handleTransactionMessage(TxnID txnId, Position position) { PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey()); //max read position is less than first ongoing transaction message position, so entryId -1 maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(), firstPosition.getEntryId() - 1); + registeredTransactionCount.increment(); } } @@ -210,6 +222,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { updateMaxReadPosition(txnID); handleLowWaterMark(txnID, lowWaterMark); takeSnapshotByChangeTimes(); + commitTransactionCount.increment(); } completableFuture.complete(null); } @@ -240,6 +253,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { handleLowWaterMark(txnID, lowWaterMark); changeMaxReadPositionAndAddAbortTimes.getAndIncrement(); takeSnapshotByChangeTimes(); + abortTransactionCount.increment(); } completableFuture.complete(null); } @@ -375,6 +389,18 @@ public PositionImpl getMaxReadPosition() { } } + @Override + public TransactionBufferStats getTransactionBufferStats() { + TransactionBufferStats transactionBufferStats = new TransactionBufferStats(); + transactionBufferStats.abortTransactionCount = this.abortTransactionCount.longValue(); + transactionBufferStats.commitTransactionCount = this.commitTransactionCount.longValue(); + transactionBufferStats.registeredTransactionCount = this.registeredTransactionCount.longValue(); + transactionBufferStats.publishTxnMessageCount = this.publishTxnMessageCount.longValue(); + transactionBufferStats.existedAbortTransactions = this.aborts.size(); + transactionBufferStats.activeTransactions = this.ongoingTxns.size(); + return transactionBufferStats; + } + @Override public void run(Timeout timeout) { if (checkIfReady()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index 26d0d72f77765..66b6f5fb31051 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -85,4 +85,9 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) { public PositionImpl getMaxReadPosition() { return PositionImpl.latest; } + + @Override + public TransactionBufferStats getTransactionBufferStats() { + return new TransactionBufferStats(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferStats.java new file mode 100644 index 0000000000000..b4f21cc340c32 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferStats.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +public class TransactionBufferStats { + + /** The active transactions. */ + public long activeTransactions; + + /** The commit transaction count of this transaction buffer. */ + public long commitTransactionCount; + + /** The abort transaction count of this transaction buffer. */ + public long abortTransactionCount; + + /** The registered transaction count of this transaction buffer. */ + public long registeredTransactionCount; + + /** The public transaction message count of this transaction buffer. */ + public long publishTxnMessageCount; + + /** The existed abort transactions of this transaction buffer. */ + public long existedAbortTransactions; +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 853df6a793af4..4aedc3b3c00bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -27,6 +27,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import io.jsonwebtoken.SignatureAlgorithm; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -61,9 +62,18 @@ import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -75,7 +85,9 @@ public class PrometheusMetricsTest extends BrokerTestBase { @BeforeMethod @Override protected void setup() throws Exception { - super.baseSetup(); + ServiceConfiguration configuration = getDefaultConf(); + configuration.setTransactionCoordinatorEnabled(true); + super.baseSetup(configuration); } @AfterMethod(alwaysRun = true) @@ -891,6 +903,114 @@ public String getCommandData() { provider.close(); } + @Test + public void testTransactionBufferMetrics() throws Exception{ + + admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), + new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); + admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + + pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0)); + Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() -> { + MLTransactionMetadataStore store = ((MLTransactionMetadataStore) getPulsar() + .getTransactionMetadataStoreService().getStores().get(TransactionCoordinatorID.get(0))); + return store != null && store.getState() == TransactionMetadataStoreState.State.Ready; + }); + String ns1 = "prop/ns-abc1"; + admin.namespaces().createNamespace(ns1); + String topic = "persistent://" + ns1 + "/test_coordinator_metrics"; + + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(lookupUrl.toString()).enableTransaction(true).build(); + Producer producer= pulsarClient + .newProducer() + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + + byte[] value = "Hello Pulsar !".getBytes(); + // test registered transaction and publish txn message count + Transaction txn = pulsarClient.newTransaction().build().get(); + producer.newMessage(txn).value(value).sendAsync().get(); + producer.newMessage(txn).value(value).sendAsync().get(); + txn.commit().get(); + + // test abort + txn = pulsarClient.newTransaction().build().get(); + producer.newMessage(txn).value(value).sendAsync().get(); + txn.abort().get(); + + txn = pulsarClient.newTransaction().withTransactionTimeout(20, TimeUnit.SECONDS).build().get(); + producer.newMessage(txn).value(value).sendAsync().get(); + txn.abort().get(); + + // test active transaction + txn = pulsarClient.newTransaction().build().get(); + producer.newMessage(txn).value(value).sendAsync().get(); + + pulsar.getBrokerService().updateRates(); + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap metrics = parseMetrics(metricsStr); + + Collection metric = metrics.get("pulsar_transaction_buffer_active_transactions"); + metric.forEach(m -> { + if (m.tags.containsValue(topic)) { + assertEquals(m.value, 1); + } else { + assertEquals(m.value, 0); + } + }); + + metric = metrics.get("pulsar_transaction_buffer_commit_transaction_count"); + metric.forEach(m -> { + if (m.tags.containsValue(topic)) { + assertEquals(m.value, 1); + } else { + assertEquals(m.value, 0); + } + }); + + metric = metrics.get("pulsar_transaction_buffer_abort_transaction_count"); + metric.forEach(m -> { + if (m.tags.containsValue(topic)) { + assertEquals(m.value, 2); + } else { + assertEquals(m.value, 0); + } + }); + + metric = metrics.get("pulsar_transaction_buffer_registered_transaction_count"); + metric.forEach(m -> { + if (m.tags.containsValue(topic)) { + assertEquals(m.value, 4); + } else { + assertEquals(m.value, 0); + } + }); + + metric = metrics.get("pulsar_transaction_buffer_existed_abort_transactions"); + metric.forEach(m -> { + if (m.tags.containsValue(topic)) { + assertEquals(m.value, 2); + } else { + assertEquals(m.value, 0); + } + }); + + metric = metrics.get("pulsar_transaction_buffer_publish_message_count"); + metric.forEach(m -> { + if (m.tags.containsValue(topic)) { + assertEquals(m.value, 5); + } else { + assertEquals(m.value, 0); + } + }); + } + @Test public void testExpiringTokenMetrics() throws Exception { SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java index 33a6f8210f5b4..313f9be1c8fd7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java @@ -91,6 +91,24 @@ public class TopicStats { /** The serialized size of non-contiguous deleted messages ranges. */ public int nonContiguousDeletedMessagesRangesSerializedSize; + /** The active transactions. */ + public long activeTransactions; + + /** The commit transaction count of this transaction buffer. */ + public long commitTransactionCount; + + /** The abort transaction count of this transaction buffer. */ + public long abortTransactionCount; + + /** The registered transaction count of this transaction buffer. */ + public long registeredTransactionCount; + + /** The public transaction message count of this transaction buffer. */ + public long publishTxnMessageCount; + + /** The existed abort transactions of this transaction buffer. */ + public long existedAbortTransactions; + public TopicStats() { this.publishers = Lists.newArrayList(); this.subscriptions = Maps.newHashMap(); @@ -119,6 +137,12 @@ public void reset() { this.nonContiguousDeletedMessagesRanges = 0; this.nonContiguousDeletedMessagesRangesSerializedSize = 0; this.offloadedStorageSize = 0; + this.activeTransactions = 0; + this.commitTransactionCount = 0; + this.abortTransactionCount = 0; + this.registeredTransactionCount = 0; + this.publishTxnMessageCount = 0; + this.existedAbortTransactions = 0; } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index 626a687fd8895..9404d433ce91f 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -34,6 +34,8 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionLog; import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback; @@ -49,7 +51,7 @@ public class MLTransactionLogImpl implements TransactionLog { private final ManagedLedger managedLedger; - public final static String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-"; + public final static String TRANSACTION_LOG_PREFIX = "transaction_log_"; private final ManagedCursor cursor; @@ -64,14 +66,15 @@ public class MLTransactionLogImpl implements TransactionLog { private final long tcId; - private final String topicName; + private final TopicName topicName; public MLTransactionLogImpl(TransactionCoordinatorID tcID, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig) throws Exception { - this.topicName = TRANSACTION_LOG_PREFIX + tcID; + this.topicName = TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, "transaction_log_" + tcID.getId()); this.tcId = tcID.getId(); - this.managedLedger = managedLedgerFactory.open(topicName, managedLedgerConfig); + this.managedLedger = managedLedgerFactory.open(topicName.getPersistenceNamingEncoding(), managedLedgerConfig); this.cursor = managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME, CommandSubscribe.InitialPosition.Earliest); this.currentLoadPosition = (PositionImpl) this.cursor.getMarkDeletedPosition();