Skip to content

Commit

Permalink
[Transaction] Add the batch size in transaction ack command. (#8659)
Browse files Browse the repository at this point in the history
## Motivation
Now in Failover sub we ack with transaction will not get the batch size from consumer pendingAcks, so we should ack request carry the batch size.

## implement
We ack with transaction will carry the bath size for individual ack delete the consumer pendingAcks.
  • Loading branch information
congbobo184 committed Nov 21, 2020
1 parent 1b790ba commit 281163b
Show file tree
Hide file tree
Showing 21 changed files with 294 additions and 114 deletions.
2 changes: 1 addition & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1171,5 +1171,5 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
### --- Transaction config variables --- ###

# Enable transaction coordinator in broker
transactionCoordinatorEnabled=true
transactionCoordinatorEnabled=false
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
Original file line number Diff line number Diff line change
Expand Up @@ -1849,7 +1849,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_TRANSACTION,
doc = "Enable transaction coordinator in broker"
)
private boolean transactionCoordinatorEnabled = true;
private boolean transactionCoordinatorEnabled = false;

@FieldContext(
category = CATEGORY_TRANSACTION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ public void start() throws Exception {
});
broker.start();

broker.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
if (config.isTransactionCoordinatorEnabled()) {
broker.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
}

final String cluster = config.getClusterName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,6 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
List<PositionImpl> checkBatchPositions = null;
if (isTransactionEnabled()) {
checkBatchPositions = new ArrayList<>();
}
for (int i = 0; i < ack.getMessageIdCount(); i++) {
MessageIdData msgId = ack.getMessageId(i);
PositionImpl position;
Expand All @@ -366,21 +363,9 @@ private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,L
SafeCollectionUtils.longListToArray(msgId.getAckSetList()));
if (isTransactionEnabled()) {
//sync the batch position bit set point, in order to delete the position in pending acks
checkBatchPositions.add(position);
LongPair batchSizePair = this.pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId());
if (batchSizePair == null) {
String error = "Batch position [" + position + "] could not find " +
"it's batch size from consumer pendingAcks!";
log.warn(error);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(error));
}
((PersistentSubscription) subscription)
.syncBatchPositionBitSetForPendingAck(new MutablePair<>(position, batchSizePair.first));
//check if the position can remove from the consumer pending acks.
// the bit set is empty in pending ack handle.
if (((PersistentSubscription) subscription).checkIsCanDeleteConsumerPendingAck(position)) {
removePendingAcks(position);
if (Subscription.isIndividualAckMode(subType)) {
((PersistentSubscription) subscription)
.syncBatchPositionBitSetForPendingAck(position);
}
}
} else {
Expand All @@ -393,14 +378,28 @@ private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,L
checkAckValidationError(ack, position);
}
subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
return CompletableFuture.completedFuture(null);
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
completableFuture.complete(null);
if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
//check if the position can remove from the consumer pending acks.
// the bit set is empty in pending ack handle.
if (((PositionImpl) position).getAckSet() != null) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck((PositionImpl) position)) {
removePendingAcks((PositionImpl) position);
}
}
}));
}
return completableFuture;
}


//this method is for individual ack carry the transaction
private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
// Individual ack
List<MutablePair<PositionImpl, Long>> positionsAcked = new ArrayList<>();
List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>();

if (!isTransactionEnabled()) {
return FutureUtil.failedFuture(
Expand All @@ -417,15 +416,11 @@ private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
}

LongPair batchSizePair = this.pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId());
if (batchSizePair == null) {
String error = "Batch position [" + position + "] could not find " +
"it's batch size from consumer pendingAcks!";
log.error(error);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(error));
if (msgId.hasBatchIndex()) {
positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
} else {
positionsAcked.add(new MutablePair<>(position, 0));
}
positionsAcked.add(new MutablePair<>(position, batchSizePair.first));

checkCanRemovePendingAcksAndHandle(position, msgId);

Expand All @@ -434,12 +429,17 @@ private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {

CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(),
ack.getTxnidLeastBits(), positionsAcked);
positionsAcked.forEach(positionLongMutablePair -> {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) {
removePendingAcks(positionLongMutablePair.left);
}
});
if (Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) ->
positionsAcked.forEach(positionLongMutablePair -> {
if (positionLongMutablePair.getLeft().getAckSet() != null) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) {
removePendingAcks(positionLongMutablePair.left);
}
}
}));
}
return completableFuture;
}

Expand All @@ -465,7 +465,7 @@ private boolean isTransactionEnabled() {
private CompletableFuture<Void> transactionIndividualAcknowledge(
long txnidMostBits,
long txnidLeastBits,
List<MutablePair<PositionImpl, Long>> positionList) {
List<MutablePair<PositionImpl, Integer>> positionList) {
if (subscription instanceof PersistentSubscription) {
TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
return ((PersistentSubscription) subscription).transactionIndividualAcknowledge(txnID, positionList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ protected void readMoreEntries(Consumer consumer) {
return;
}

if (messagesToRedeliver.size() > 0) {
if (messagesToRedeliver != null && messagesToRedeliver.size() > 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages", name, messagesToRedeliver.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.broker.service.Consumer;
Expand All @@ -71,7 +72,6 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,7 +133,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
this.pendingAckHandle = new PendingAckHandleImpl(this);
} else {
this.pendingAckHandle = null;
this.pendingAckHandle = new PendingAckHandleDisabled();
}
IS_FENCED_UPDATER.set(this, FALSE);
}
Expand Down Expand Up @@ -388,21 +388,11 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
}

public CompletableFuture<Void> transactionIndividualAcknowledge(TxnID txnId,
List<MutablePair<PositionImpl, Long>> positions) {
if (pendingAckHandle == null) {
return FutureUtil.failedFuture(
new TransactionConflictException("Broker does't support Transaction pending ack!"));
}

List<MutablePair<PositionImpl, Integer>> positions) {
return pendingAckHandle.individualAcknowledgeMessage(txnId, positions);
}

public CompletableFuture<Void> transactionCumulativeAcknowledge(TxnID txnId, List<PositionImpl> positions) {
if (pendingAckHandle == null) {
return FutureUtil.failedFuture(
new TransactionConflictException("Broker does't support Transaction pending ack!"));
}

return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions);
}

Expand Down Expand Up @@ -1027,9 +1017,6 @@ public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapsho

@Override
public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction) {
if (pendingAckHandle == null) {
return FutureUtil.failedFuture(new Exception("Broker does't support Transaction pending ack!"));
}
TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
if (TxnAction.COMMIT.getNumber() == txnAction) {
return pendingAckHandle.commitTxn(txnID, Collections.emptyMap());
Expand All @@ -1050,7 +1037,7 @@ public ManagedCursor getCursor() {
return cursor;
}

public void syncBatchPositionBitSetForPendingAck(MutablePair<PositionImpl, Long> position) {
public void syncBatchPositionBitSetForPendingAck(PositionImpl position) {
this.pendingAckHandle.syncBatchPositionAckSetForTransaction(position);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public interface PendingAckHandle {
* @throws NotAllowedException if Use this method incorrectly eg. not use
* PositionImpl or cumulative ack with a list of positions.
*/
CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl, Long>> positions);
CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions);

/**
* Acknowledge message(s) for an ongoing transaction.
Expand Down Expand Up @@ -103,7 +103,7 @@ public interface PendingAckHandle {
*
* @param position {@link Position} which position need to sync and carry it batch size
*/
void syncBatchPositionAckSetForTransaction(MutablePair<PositionImpl, Long> position);
void syncBatchPositionAckSetForTransaction(PositionImpl position);

/**
* Judge the all ack set point have acked by normal ack and transaction pending ack.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.pendingack.impl;

import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.util.FutureUtil;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* The disabled implementation of {@link PendingAckHandle}.
*/
public class PendingAckHandleDisabled implements PendingAckHandle {

@Override
public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
List<MutablePair<PositionImpl, Integer>> positions) {
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
}

@Override
public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions) {
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
}

@Override
public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties) {
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
}

@Override
public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer) {
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
}

@Override
public void syncBatchPositionAckSetForTransaction(PositionImpl position) {
//no operation
}

@Override
public boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position) {
return false;
}

@Override
public void clearIndividualPosition(Position position) {
//no operation
}
}
Loading

0 comments on commit 281163b

Please sign in to comment.