Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Transaction coordinator fence mechanism. #11357

Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ config, localMetadataStore, getZkClient(),
transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
transactionBufferClient, transactionTimer);
transactionMetadataStoreService.start();

transactionBufferProvider = TransactionBufferProvider
.newProvider(config.getTransactionBufferProviderClassName());
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.admin.impl;

import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.zafarkhaja.semver.Version;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -130,6 +131,7 @@
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -674,7 +676,11 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalUnloadNonPartitionedTopic(asyncResponse, authoritative);
if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
internalUnloadTransactionCoordinator(asyncResponse, authoritative);
} else {
internalUnloadNonPartitionedTopic(asyncResponse, authoritative);
}
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(meta -> {
Expand Down Expand Up @@ -926,6 +932,29 @@ private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, bool
});
}

/**
 * Unloads the transaction coordinator whose id is the partition index of {@code topicName}.
 *
 * <p>The caller is first checked for the {@code UNLOAD} topic operation; a failure there is reported
 * through {@code resumeAsyncResponseExceptionally} and nothing else is attempted. Otherwise topic
 * ownership is validated asynchronously and the matching transaction metadata store is removed via
 * the {@code TransactionMetadataStoreService}. On success the response is resumed with
 * {@code 204 No Content}; on failure it is resumed with the underlying error.
 *
 * @param asyncResponse the pending REST response to resume once the unload finishes or fails
 * @param authoritative forwarded unchanged to {@code validateTopicOwnershipAsync}
 */
private void internalUnloadTransactionCoordinator(AsyncResponse asyncResponse, boolean authoritative) {
    try {
        validateTopicOperation(topicName, TopicOperation.UNLOAD);
    } catch (Exception e) {
        log.error("[{}] Failed to unload tc {},{}", clientAppId(), topicName.getPartitionIndex(), e.getMessage());
        resumeAsyncResponseExceptionally(asyncResponse, e);
        return;
    }
    validateTopicOwnershipAsync(topicName, authoritative)
            .thenCompose(v -> pulsar()
                    .getTransactionMetadataStoreService()
                    .removeTransactionMetadataStore(TransactionCoordinatorID.get(topicName.getPartitionIndex())))
            .thenRun(() -> {
                log.info("[{}] Successfully unloaded tc {}", clientAppId(), topicName.getPartitionIndex());
                asyncResponse.resume(Response.noContent().build());
            }).exceptionally(ex -> {
                log.error("[{}] Failed to unload tc {}, {}", clientAppId(), topicName.getPartitionIndex(),
                        ex.getMessage());
                // The throwable normally arrives wrapped (cause set), but fall back to the throwable
                // itself when there is no cause so the response is never resumed with null.
                Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                asyncResponse.resume(cause);
                return null;
            });
}

protected void internalDeleteTopic(boolean authoritative, boolean force, boolean deleteSchema) {
if (force) {
internalDeleteTopicForcefully(authoritative, deleteSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.BundlesQuotas;
Expand Down Expand Up @@ -160,6 +161,7 @@
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1671,6 +1673,17 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
: CompletableFuture.completedFuture(null)));
}
});
if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
&& serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
TransactionMetadataStoreService metadataStoreService =
this.getPulsar().getTransactionMetadataStoreService();
// if the store belongs to this bundle, remove and close the store
this.getPulsar().getTransactionMetadataStoreService().getStores().values().stream().filter(store ->
serviceUnit.includes(TopicName.TRANSACTION_COORDINATOR_ASSIGN
.getPartition((int) (store.getTransactionCoordinatorID().getId()))))
.map(TransactionMetadataStore::getTransactionCoordinatorID)
.forEach(tcId -> closeFutures.add(metadataStoreService.removeTransactionMetadataStore(tcId)));
}

return FutureUtil.waitForAll(closeFutures).thenApply(v -> closeFutures.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnect;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
Expand Down Expand Up @@ -1911,23 +1912,62 @@ protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCrea
}

@Override
protected void handleNewTxn(CommandNewTxn command) {
protected void handleTcClientConnect(CommandTcClientConnect command) {
final long requestId = command.getRequestId();
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
if (log.isDebugEnabled()) {
log.debug("Receive new txn request {} to transaction meta store {} from {}.",
log.debug("Receive tc client connect request {} to transaction meta store {} from {}.",
requestId, tcId, remoteAddress);
}

if (!checkTransactionEnableAndSenError(requestId)) {
return;
}

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
if (transactionMetadataStoreService == null) {
CoordinatorException.CoordinatorNotFoundException ex =
new CoordinatorException.CoordinatorNotFoundException(
"Transaction manager is not started or not enabled");
ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));

transactionMetadataStoreService.handleTcClientConnect(tcId).thenAccept(connection -> {
if (log.isDebugEnabled()) {
log.debug("Handle tc client connect request {} to transaction meta store {} from {} success.",
requestId, tcId, remoteAddress);
}
commandSender.sendSuccessResponse(requestId);
}).exceptionally(e -> {
log.error("Handle tc client connect request {} to transaction meta store {} from {} fail.",
requestId, tcId, remoteAddress, e.getCause());
commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
return null;
});
}

/**
 * Verifies that the transaction coordinator is enabled in the broker configuration.
 *
 * <p>When it is disabled, an error response built from a
 * {@code BrokerServiceException.NotAllowedException} is sent for the given request through
 * {@code commandSender}, so callers only need to return when this method yields {@code false}.
 *
 * @param requestId id of the client request the error response (if any) is sent for
 * @return {@code true} if the transaction coordinator is enabled; {@code false} if it is disabled
 *         and an error response has been sent
 */
private boolean checkTransactionEnableAndSenError(long requestId) {
    if (service.getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
        return true;
    }
    // Message previously read "is not not enabled"; the duplicated word was a typo.
    BrokerServiceException.NotAllowedException ex =
            new BrokerServiceException.NotAllowedException(
                    "Transaction manager is not enabled");
    commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
    return false;
}

@Override
protected void handleNewTxn(CommandNewTxn command) {
final long requestId = command.getRequestId();
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
if (log.isDebugEnabled()) {
log.debug("Receive new txn request {} to transaction meta store {} from {}.",
requestId, tcId, remoteAddress);
}

if (!checkTransactionEnableAndSenError(requestId)) {
return;
}

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds())
.whenComplete(((txnID, ex) -> {
if (ex == null) {
Expand All @@ -1940,21 +1980,31 @@ protected void handleNewTxn(CommandNewTxn command) {
if (log.isDebugEnabled()) {
log.debug("Send response error for new txn request {}", requestId, ex);
}

ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
}

@Override
protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());
final long requestId = command.getRequestId();
if (log.isDebugEnabled()) {
command.getPartitionsList().forEach(partion ->
log.debug("Receive add published partition to txn request {} "
+ "from {} with txnId {}, topic: [{}]", requestId, remoteAddress, txnID, partion));
}

if (!checkTransactionEnableAndSenError(requestId)) {
return;
}

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
command.getPartitionsList())
.whenComplete(((v, ex) -> {
Expand All @@ -1968,10 +2018,18 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
if (log.isDebugEnabled()) {
log.debug("Send response error for add published partition to txn request {}", requestId,
ex);
}

if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
} else {
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex.getCause()),
ex.getCause().getMessage()));
}
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
}
}));
}

Expand All @@ -1980,16 +2038,35 @@ protected void handleEndTxn(CommandEndTxn command) {
final long requestId = command.getRequestId();
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());

if (!checkTransactionEnableAndSenError(requestId)) {
return;
}

service.pulsar().getTransactionMetadataStoreService()
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();

transactionMetadataStoreService
.endTransaction(txnID, txnAction, false)
.thenRun(() -> ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits())))
.exceptionally(throwable -> {
log.error("Send response error for end txn request.", throwable);
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(throwable.getCause()), throwable.getMessage()));
return null; });
.whenComplete((v, ex) -> {
if (ex == null) {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
log.error("Send response error for end txn request.", ex);

if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
} else {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex.getCause()),
ex.getCause().getMessage()));
}
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
});
}

@Override
Expand Down Expand Up @@ -2176,7 +2253,16 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
requestId, remoteAddress, txnID);
}

service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID,
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());

if (!checkTransactionEnableAndSenError(requestId)) {
return;
}

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();

transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList()))
.whenComplete(((v, ex) -> {
if (ex == null) {
Expand All @@ -2192,9 +2278,17 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
log.debug("Send response error for add published partition to txn request {}",
requestId, ex);
}
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));

if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
} else {
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex.getCause()),
ex.getCause().getMessage()));
}
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats.prometheus;

import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand All @@ -26,6 +27,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
Expand Down Expand Up @@ -62,10 +64,13 @@ public static void generate(PulsarService pulsar, SimpleTextOutputStream stream,
topic.getSubscriptions().values().forEach(subscription -> {
try {
localManageLedgerStats.get().reset();
ManagedLedger managedLedger =
((PersistentSubscription) subscription).getPendingAckManageLedger().get();
generateManageLedgerStats(managedLedger,
stream, cluster, namespace, name, subscription.getName());
if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))) {
ManagedLedger managedLedger =
((PersistentSubscription) subscription)
.getPendingAckManageLedger().get();
generateManageLedgerStats(managedLedger,
stream, cluster, namespace, name, subscription.getName());
}
} catch (Exception e) {
log.warn("Transaction pending ack generate managedLedgerStats fail!", e);
}
Expand Down
Loading