Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public boolean test(NamespaceBundle namespaceBundle) {

public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
pulsarService.getBrokerService()
.getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + tcId))
.getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + tcId.getId()))
.whenComplete((v, e) -> {
if (e != null) {
LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferStats;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -1741,6 +1742,13 @@ public TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklo
stats.deduplicationStatus = messageDeduplication.getStatus().toString();
stats.topicEpoch = topicEpoch.orElse(null);
stats.offloadedStorageSize = ledger.getOffloadedSize();
TransactionBufferStats transactionBufferStats = transactionBuffer.getTransactionBufferStats();
stats.activeTransactions = transactionBufferStats.activeTransactions;
stats.commitTransactionCount = transactionBufferStats.commitTransactionCount;
stats.abortTransactionCount = transactionBufferStats.abortTransactionCount;
stats.registeredTransactionCount = transactionBufferStats.registeredTransactionCount;
stats.publishTxnMessageCount = transactionBufferStats.publishTxnMessageCount;
stats.existedAbortTransactions = transactionBufferStats.existedAbortTransactions;
return stats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.bytesInCounter = tStatus.bytesInCounter;
stats.msgOutCounter = tStatus.msgOutCounter;
stats.bytesOutCounter = tStatus.bytesOutCounter;
stats.activeTransactions = tStatus.activeTransactions;
stats.commitTransactionCount = tStatus.commitTransactionCount;
stats.abortTransactionCount = tStatus.abortTransactionCount;
stats.registeredTransactionCount = tStatus.registeredTransactionCount;
stats.publishTxnMessageCount = tStatus.publishTxnMessageCount;
stats.existedAbortTransactions = tStatus.existedAbortTransactions;

stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ class TopicStats {
// Used for tracking duplicate TYPE definitions
static Map<String, String> metricWithTypeDefinition = new HashMap<>();

// Transaction buffer stats
long activeTransactions;
long commitTransactionCount;
long abortTransactionCount;
long registeredTransactionCount;
long publishTxnMessageCount;
long existedAbortTransactions;



public void reset() {
subscriptionsCount = 0;
Expand Down Expand Up @@ -87,6 +96,13 @@ public void reset() {
storageWriteLatencyBuckets.reset();
storageLedgerWriteLatencyBuckets.reset();
entrySizeBuckets.reset();

activeTransactions = 0;
commitTransactionCount = 0;
abortTransactionCount = 0;
registeredTransactionCount = 0;
publishTxnMessageCount = 0;
existedAbortTransactions = 0;
}

static void resetTypes() {
Expand Down Expand Up @@ -249,6 +265,19 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin

metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter);
metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter);

metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_active_transactions",
stats.activeTransactions);
metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_commit_transaction_count",
stats.commitTransactionCount);
metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_abort_transaction_count",
stats.abortTransactionCount);
metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_registered_transaction_count",
stats.registeredTransactionCount);
metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_existed_abort_transactions",
stats.existedAbortTransactions);
metric(stream, cluster, namespace, topic, "pulsar_transaction_buffer_publish_message_count",
stats.publishTxnMessageCount);
}

static void metricType(SimpleTextOutputStream stream, String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferStats;
import org.apache.pulsar.client.api.transaction.TxnID;

/**
Expand Down Expand Up @@ -147,4 +148,10 @@ public interface TransactionBuffer {
* @return the stable position.
*/
PositionImpl getMaxReadPosition();

/**
* Get the transaction buffer stats.
* @return the transaction buffer stats.
*/
TransactionBufferStats getTransactionBufferStats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,9 @@ public PositionImpl getMaxReadPosition() {
return PositionImpl.latest;
}

@Override
public TransactionBufferStats getTransactionBufferStats() {
return new TransactionBufferStats();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -86,12 +87,21 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen

private final int takeSnapshotIntervalTime;

private final CompletableFuture<Void> transactionBufferFuture;
private final LongAdder publishTxnMessageCount;

private final LongAdder commitTransactionCount;

private final LongAdder abortTransactionCount;

private final LongAdder registeredTransactionCount;

public TopicTransactionBuffer(PersistentTopic topic, CompletableFuture<Void> transactionBufferFuture) {
super(State.None);
this.publishTxnMessageCount = new LongAdder();
this.commitTransactionCount = new LongAdder();
this.abortTransactionCount = new LongAdder();
this.registeredTransactionCount = new LongAdder();
this.topic = topic;
this.transactionBufferFuture = transactionBufferFuture;
this.changeToInitializingState();
this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
.getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
Expand Down Expand Up @@ -166,6 +176,7 @@ public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceI
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
handleTransactionMessage(txnId, position);
publishTxnMessageCount.increment();
}
completableFuture.complete(position);
}
Expand All @@ -185,6 +196,7 @@ private void handleTransactionMessage(TxnID txnId, Position position) {
PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey());
//max read position is less than first ongoing transaction message position, so entryId -1
maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(), firstPosition.getEntryId() - 1);
registeredTransactionCount.increment();
}
}

Expand All @@ -210,6 +222,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
updateMaxReadPosition(txnID);
handleLowWaterMark(txnID, lowWaterMark);
takeSnapshotByChangeTimes();
commitTransactionCount.increment();
}
completableFuture.complete(null);
}
Expand Down Expand Up @@ -240,6 +253,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
handleLowWaterMark(txnID, lowWaterMark);
changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
takeSnapshotByChangeTimes();
abortTransactionCount.increment();
}
completableFuture.complete(null);
}
Expand Down Expand Up @@ -375,6 +389,18 @@ public PositionImpl getMaxReadPosition() {
}
}

@Override
public TransactionBufferStats getTransactionBufferStats() {
TransactionBufferStats transactionBufferStats = new TransactionBufferStats();
transactionBufferStats.abortTransactionCount = this.abortTransactionCount.longValue();
transactionBufferStats.commitTransactionCount = this.commitTransactionCount.longValue();
transactionBufferStats.registeredTransactionCount = this.registeredTransactionCount.longValue();
transactionBufferStats.publishTxnMessageCount = this.publishTxnMessageCount.longValue();
transactionBufferStats.existedAbortTransactions = this.aborts.size();
transactionBufferStats.activeTransactions = this.ongoingTxns.size();
return transactionBufferStats;
}

@Override
public void run(Timeout timeout) {
if (checkIfReady()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,9 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
public PositionImpl getMaxReadPosition() {
return PositionImpl.latest;
}

@Override
public TransactionBufferStats getTransactionBufferStats() {
return new TransactionBufferStats();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer.impl;

public class TransactionBufferStats {

/** The active transactions. */
public long activeTransactions;

/** The commit transaction count of this transaction buffer. */
public long commitTransactionCount;

/** The abort transaction count of this transaction buffer. */
public long abortTransactionCount;

/** The registered transaction count of this transaction buffer. */
public long registeredTransactionCount;

/** The public transaction message count of this transaction buffer. */
public long publishTxnMessageCount;

/** The existed abort transactions of this transaction buffer. */
public long existedAbortTransactions;
}
Loading