Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Transaction admin api get pending ack internal stats #10725

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
Expand All @@ -50,6 +53,7 @@
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionLogStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
Expand Down Expand Up @@ -504,4 +508,69 @@ protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse,
asyncResponse.resume(new RestException(e.getCause()));
}
}

protected void internalGetPendingAckInternalStats(AsyncResponse asyncResponse, boolean authoritative,
TopicName topicName, String subName, boolean metadata) {
try {
if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
validateTopicOwnership(topicName, authoritative);
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(topicName.toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, e) -> {

if (e != null) {
asyncResponse.resume(new RestException(e));
return;
}
if (!optionalTopic.isPresent()) {
asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
return;
}
Topic topicObject = optionalTopic.get();
if (topicObject instanceof PersistentTopic) {
try {
ManagedLedger managedLedger =
((PersistentTopic) topicObject).getPendingAckManagedLedger(subName).get();
TransactionPendingAckInternalStats stats =
new TransactionPendingAckInternalStats();
TransactionLogStats pendingAckLogStats = new TransactionLogStats();
pendingAckLogStats.managedLedgerName = managedLedger.getName();
pendingAckLogStats.managedLedgerInternalStats =
managedLedger.getManagedLedgerInternalStats(metadata).get();
stats.pendingAckLogStats = pendingAckLogStats;
asyncResponse.resume(stats);
} catch (Exception exception) {
if (exception instanceof ExecutionException) {
if (exception.getCause() instanceof ServiceUnitNotReadyException) {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
exception.getCause()));
return;
} else if (exception.getCause() instanceof NotAllowedException) {
asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
exception.getCause()));
return;
} else if (exception.getCause() instanceof SubscriptionNotFoundException) {
asyncResponse.resume(new RestException(NOT_FOUND, exception.getCause()));
return;
}
}
asyncResponse.resume(new RestException(exception));
}
} else {
asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
}
});
} else {
asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
}
} else {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"This Broker is not configured with transactionCoordinatorEnabled=true."));
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e.getCause()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.swagger.annotations.ApiResponses;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -34,6 +35,8 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.broker.admin.impl.TransactionsBase;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;

@Path("/transactions")
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -200,4 +203,28 @@ public void getCoordinatorInternalStats(@Suspended final AsyncResponse asyncResp
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
internalGetCoordinatorInternalStats(asyncResponse, authoritative, metadata, Integer.parseInt(coordinatorId));
}

@GET
@Path("/pendingAckInternalStats/{tenant}/{namespace}/{topic}/{subName}")
@ApiOperation(value = "Get transaction pending ack internal stats.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
+ "or subscription name doesn't exist"),
@ApiResponse(code = 503, message = "This Broker is not configured "
+ "with transactionCoordinatorEnabled=true."),
@ApiResponse(code = 307, message = "Topic is not owned by this broker!"),
@ApiResponse(code = 405, message = "Pending ack handle don't use managedLedger!"),
@ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getPendingAckInternalStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String subName,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
internalGetPendingAckInternalStats(asyncResponse, authoritative,
TopicName.get(TopicDomain.persistent.value(), tenant, namespace, encodedTopic), subName, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
Expand Down Expand Up @@ -1155,5 +1156,13 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID)
return this.pendingAckHandle.getTransactionInPendingAckStats(txnID);
}

public CompletableFuture<ManagedLedger> getPendingAckManageLedger() {
if (this.pendingAckHandle instanceof PendingAckHandleImpl) {
return ((PendingAckHandleImpl) this.pendingAckHandle).getStoreManageLedger();
} else {
return FutureUtil.failedFuture(new NotAllowedException("Pending ack handle don't use managedLedger!"));
}
}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
Expand Down Expand Up @@ -3087,4 +3088,13 @@ protected boolean isTerminated() {
public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) {
return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID);
}

public CompletableFuture<ManagedLedger> getPendingAckManagedLedger(String subName) {
PersistentSubscription subscription = subscriptions.get(subName);
if (subscription == null) {
return FutureUtil.failedFuture(new SubscriptionNotFoundException((topic
+ " not found subscription : " + subName)));
}
return subscription.getPendingAckManageLedger();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class MLPendingAckStore implements PendingAckStore {

public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";

private static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state";
public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state";

private final SpscArrayQueue<Entry> entryQueue;

Expand Down Expand Up @@ -384,6 +384,10 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

}

public CompletableFuture<ManagedLedger> getManagedLedger() {
return CompletableFuture.completedFuture(this.managedLedger);
}

public static String getTransactionPendingAckStoreSuffix(String originTopicName, String subName) {
return TopicName.get(originTopicName) + "-" + subName + PENDING_ACK_STORE_SUFFIX;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -733,4 +734,19 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID)
public CompletableFuture<Void> close() {
return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync);
}

public CompletableFuture<ManagedLedger> getStoreManageLedger() {
if (this.pendingAckStoreFuture.isDone()) {
return this.pendingAckStoreFuture.thenCompose(pendingAckStore -> {
if (pendingAckStore instanceof MLPendingAckStore) {
return ((MLPendingAckStore) pendingAckStore).getManagedLedger();
} else {
return FutureUtil.failedFuture(
new NotAllowedException("Pending ack handle don't use managedLedger!"));
}
});
} else {
return FutureUtil.failedFuture(new ServiceUnitNotReadyException("Pending ack have not init success!"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -41,6 +42,7 @@
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
Expand Down Expand Up @@ -343,6 +345,46 @@ public void testGetCoordinatorInternalStats() throws Exception {
stats.transactionLogStats.managedLedgerName);
}

@Test(timeOut = 20000)
public void testGetPendingAckInternalStats() throws Exception {
initTransaction(1);
TransactionImpl transaction = (TransactionImpl) getTransaction();
final String topic = "persistent://public/default/testGetPendingAckInternalStats";
final String subName = "test";
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic)
.subscriptionName(subName).subscribe();
MessageId messageId = producer.send("Hello pulsar!".getBytes());
consumer.acknowledgeAsync(messageId, transaction).get();

TransactionPendingAckInternalStats stats = admin.transactions()
.getPendingAckInternalStatsAsync(topic, subName, true).get();
ManagedLedgerInternalStats managedLedgerInternalStats = stats.pendingAckLogStats.managedLedgerInternalStats;
assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default",
"testGetPendingAckInternalStats" + "-"
+ subName + MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
stats.pendingAckLogStats.managedLedgerName);

verifyManagedLegerInternalStats(managedLedgerInternalStats, 16);

ManagedLedgerInternalStats finalManagedLedgerInternalStats = managedLedgerInternalStats;
managedLedgerInternalStats.cursors.forEach((s, cursorStats) -> {
assertEquals(s, MLPendingAckStore.PENDING_ACK_STORE_CURSOR_NAME);
assertEquals(cursorStats.readPosition, finalManagedLedgerInternalStats.lastConfirmedEntry);
});

stats = admin.transactions()
.getPendingAckInternalStatsAsync(topic, subName, false).get();
managedLedgerInternalStats = stats.pendingAckLogStats.managedLedgerInternalStats;

assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default",
"testGetPendingAckInternalStats" + "-"
+ subName + MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
stats.pendingAckLogStats.managedLedgerName);
assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
}

private static void verifyCoordinatorStats(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;

public interface Transactions {
Expand Down Expand Up @@ -160,4 +161,28 @@ CompletableFuture<TransactionCoordinatorInternalStats> getCoordinatorInternalSta
TransactionCoordinatorInternalStats getCoordinatorInternalStats(int coordinatorId,
boolean metadata) throws PulsarAdminException;

/**
* Get pending ack internal stats.
*
* @param topic the topic of get pending ack internal stats
* @param subName the subscription name of this pending ack
* @param metadata whether to obtain ledger metadata
*
* @return the future internal stats of pending ack
*/
CompletableFuture<TransactionPendingAckInternalStats> getPendingAckInternalStatsAsync(String topic, String subName,
boolean metadata);

/**
* Get pending ack internal stats.
*
* @param topic the topic of get pending ack internal stats
* @param subName the subscription name of this pending ack
* @param metadata whether to obtain ledger metadata
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param metadata whether to obtain ledger metadata
* @param metadata whether to obtain ledger metadata

double check: whether to obtain ledger metadata? or * @param metadata the ledger metadata

*
* @return the internal stats of pending ack
*/
TransactionPendingAckInternalStats getPendingAckInternalStats(String topic, String subName,
boolean metadata) throws PulsarAdminException;

}
Loading