Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-54] Support acknowledgment at batch index level #6052

Merged
merged 27 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f69fc22
Add support for batch local index ack
codelipenghui Jan 14, 2020
0bd8eab
Fix comments
codelipenghui Jan 30, 2020
6c8725c
Fix comments
codelipenghui Jan 30, 2020
0799dfd
Fix checkstyle
codelipenghui Jan 30, 2020
cab0948
Fix unit test
codelipenghui Jan 31, 2020
e320c6f
Generate PulsarApi.java
codelipenghui Feb 13, 2020
76217ac
Fix checkstyle
codelipenghui Feb 13, 2020
dd28770
Apply comments
codelipenghui Feb 18, 2020
fa2b9f7
Apply comments
codelipenghui Feb 18, 2020
a75e9f6
Fix checkstyle
codelipenghui Feb 18, 2020
aa388f3
Fix tests.
codelipenghui Feb 18, 2020
af497bd
Introduce BitSetRecyclable that leverage with netty recycler
codelipenghui Feb 19, 2020
26816c0
Check not null for recycle.
codelipenghui Feb 19, 2020
ae832ff
Fix tests.
codelipenghui Feb 19, 2020
a297843
fix tests.
codelipenghui Feb 20, 2020
75aec39
Fix test.
codelipenghui Apr 24, 2020
cca1a5a
Remove unused tests.
codelipenghui Apr 24, 2020
36d899a
Remove unused tests.
codelipenghui Apr 24, 2020
994d5c9
Merge remote-tracking branch 'apache/master' into batch_index_ack
codelipenghui May 19, 2020
ef9929a
Fix tests.
codelipenghui May 19, 2020
64af907
Merge branch 'master' into batch_index_ack
codelipenghui May 20, 2020
d6433aa
Merge remote-tracking branch 'apache/master' into batch_index_ack
codelipenghui May 22, 2020
f248dc8
Fix tests
codelipenghui May 27, 2020
e3f9e72
Merge remote-tracking branch 'apache/master' into batch_index_ack
codelipenghui May 28, 2020
15f3979
Fix tests.
codelipenghui May 28, 2020
4d65aa6
Merge remote-tracking branch 'apache/master' into batch_index_ack
codelipenghui May 29, 2020
9789f39
Merge branch 'master' into batch_index_ack
codelipenghui May 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions buildtools/src/main/resources/pulsar/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@
<suppress checks=".*" files=".+[\\/]com[\\/]scurrilous[\\/]circe[\\/].+\.java" />

<suppress checks=".*" files="MLDataFormats.java" />
<suppress checks=".*" files="BitSetRecyclable.java" />
</suppressions>
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ delayedDeliveryEnabled=true
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000

# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false

# Enable tracking of replicated subscriptions state across clusters.
enableReplicatedSubscriptions=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,4 +643,9 @@ Set<? extends Position> asyncReplayEntries(
* Trim delete entries for the given entries
*/
void trimDeletedEntries(List<Entry> entries);

/**
* Get deleted batch indexes list for a batch message.
*/
long[] getDeletedBatchIndexesAsLongArray(PositionImpl position);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ManagedLedgerConfig {

private boolean createIfMissing = true;
private int maxUnackedRangesToPersist = 10000;
private int maxBatchDeletedIndexToPersist = 10000;
private boolean deletionAtBatchIndexLevelEnabled = true;
private int maxUnackedRangesToPersistInZk = 1000;
private int maxEntriesPerLedger = 50000;
private int maxSizePerLedgerMb = 100;
Expand Down Expand Up @@ -430,6 +432,13 @@ public int getMaxUnackedRangesToPersist() {
return maxUnackedRangesToPersist;
}

/**
* @return max batch deleted index that will be persisted and recoverd.
*/
public int getMaxBatchDeletedIndexToPersist() {
return maxBatchDeletedIndexToPersist;
}

/**
* @param maxUnackedRangesToPersist
* max unacked message ranges that will be persisted and receverd.
Expand Down Expand Up @@ -585,4 +594,12 @@ public void setBookKeeperEnsemblePlacementPolicyProperties(
Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties) {
this.bookKeeperEnsemblePlacementPolicyProperties = bookKeeperEnsemblePlacementPolicyProperties;
}

public boolean isDeletionAtBatchIndexLevelEnabled() {
return deletionAtBatchIndexLevelEnabled;
}

public void setDeletionAtBatchIndexLevelEnabled(boolean deletionAtBatchIndexLevelEnabled) {
this.deletionAtBatchIndexLevelEnabled = deletionAtBatchIndexLevelEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@

import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -90,6 +93,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
Expand Down Expand Up @@ -156,6 +160,10 @@ public class ManagedCursorImpl implements ManagedCursor {
return position;
};
private final LongPairRangeSet<PositionImpl> individualDeletedMessages;

// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private RateLimiter markDeleteLimiter;
Expand Down Expand Up @@ -232,6 +240,11 @@ public interface VoidCallback {
this.individualDeletedMessages = config.isUnackedRangesOpenCacheSetEnabled()
? new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter)
: new LongPairRangeSet.DefaultRangeSet<>(positionRangeConverter);
if (config.isDeletionAtBatchIndexLevelEnabled()) {
this.batchDeletedIndexes = new ConcurrentSkipListMap<>();
} else {
this.batchDeletedIndexes = null;
}
this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
STATE_UPDATER.set(this, State.Uninitialized);
PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0);
Expand Down Expand Up @@ -379,6 +392,10 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
recoveredCursor(position, recoveredProperties, lh);
callback.operationComplete();
}, null);
Expand All @@ -398,6 +415,25 @@ private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> i
}
}

private void recoverBatchDeletedIndexes (List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchDeletedIndexInfoList) {
lock.writeLock().lock();
try {
this.batchDeletedIndexes.clear();
batchDeletedIndexInfoList.forEach(batchDeletedIndexInfo -> {
if (batchDeletedIndexInfo.getDeleteSetCount() > 0) {
long[] array = new long[batchDeletedIndexInfo.getDeleteSetCount()];
for (int i = 0; i < batchDeletedIndexInfo.getDeleteSetList().size(); i++) {
array[i] = batchDeletedIndexInfo.getDeleteSetList().get(i);
}
this.batchDeletedIndexes.put(PositionImpl.get(batchDeletedIndexInfo.getPosition().getLedgerId(),
batchDeletedIndexInfo.getPosition().getEntryId()), BitSetRecyclable.create().resetWords(array));
}
});
} finally {
lock.writeLock().unlock();
}
}

private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
Expand Down Expand Up @@ -920,6 +956,10 @@ public void operationComplete() {
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
null, null);
individualDeletedMessages.clear();
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
}

PositionImpl oldReadPosition = readPosition;
if (oldReadPosition.compareTo(newPosition) >= 0) {
Expand Down Expand Up @@ -1507,8 +1547,22 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
if (log.isDebugEnabled()) {
log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
}

PositionImpl newPosition = (PositionImpl) position;

if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
if (newPosition.ackSet != null) {
batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet));
newPosition = ledger.getPreviousPosition(newPosition);
}
Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, newPosition);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
} else if (newPosition.ackSet != null) {
newPosition = ledger.getPreviousPosition(newPosition);
newPosition.ackSet = null;
}

if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug(
Expand Down Expand Up @@ -1600,6 +1654,11 @@ public void operationComplete() {
try {
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId());
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, false, PositionImpl.get(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()), true);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
}
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -1722,35 +1781,62 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb

for (Position pos : positions) {
PositionImpl position = (PositionImpl) checkNotNull(pos);

if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
}
callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
}

if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())
|| position.compareTo(markDeletePosition) <= 0) {
|| position.compareTo(markDeletePosition) <= 0) {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
}
continue;
}

// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
// the RangeSet recognize the "continuity" between adjacent Positions
PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
if (position.ackSet == null) {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
}
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
// the RangeSet recognize the "continuity" between adjacent Positions
PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
individualDeletedMessages);
}
} else if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSetRecyclable.create().resetWords(position.ackSet));
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
bitSet.and(givenBitSet);
givenBitSet.recycle();
if (bitSet.isEmpty()) {
PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
++messagesConsumedCounter;
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
}
}
}

Expand Down Expand Up @@ -2062,6 +2148,9 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
info.addAllProperties(buildPropertiesMap(properties));
if (persistIndividualDeletedMessageRanges) {
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
if (config.isDeletionAtBatchIndexLevelEnabled()) {
info.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList());
}
}

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -2306,11 +2395,38 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
}
}

private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
}
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
.newBuilder();
MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats.BatchedEntryDeletionIndexInfo
.newBuilder();
List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = Lists.newArrayList();
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) {
Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();
nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());
long[] array = entry.getValue().toLongArray();
List<Long> deleteSet = new ArrayList<>(array.length);
for (long l : array) {
deleteSet.add(l);
}
batchDeletedIndexInfoBuilder.addAllDeleteSet(deleteSet);
result.add(batchDeletedIndexInfoBuilder.build());
}
return result;
}

void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
PositionImpl position = mdEntry.newPosition;
PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges())
.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList())
.addAllProperties(buildPropertiesMap(mdEntry.properties)).build();


Expand Down Expand Up @@ -2693,6 +2809,16 @@ private ManagedCursorImpl cursorImpl() {
return this;
}

@Override
public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
return bitSet == null ? null : bitSet.toLongArray();
} else {
return null;
}
}

void updateReadStats(int readEntriesCount, long readEntriesSize) {
this.entriesReadCount += readEntriesCount;
this.entriesReadSize += readEntriesSize;
Expand Down Expand Up @@ -2723,7 +2849,5 @@ private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {

return Math.min(maxEntriesBasedOnSize, maxEntries);
}


private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {

protected long ledgerId;
protected long entryId;
protected long[] ackSet;

public static final PositionImpl earliest = new PositionImpl(-1, -1);
public static final PositionImpl latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
Expand All @@ -49,6 +50,12 @@ public PositionImpl(long ledgerId, long entryId) {
this.entryId = entryId;
}

public PositionImpl(long ledgerId, long entryId, long[] ackSet) {
this.ledgerId = ledgerId;
this.entryId = entryId;
this.ackSet = ackSet;
}

public PositionImpl(PositionImpl other) {
this.ledgerId = other.ledgerId;
this.entryId = other.entryId;
Expand All @@ -58,6 +65,10 @@ public static PositionImpl get(long ledgerId, long entryId) {
return new PositionImpl(ledgerId, entryId);
}

public static PositionImpl get(long ledgerId, long entryId, long[] ackSet) {
return new PositionImpl(ledgerId, entryId, ackSet);
}

public static PositionImpl get(PositionImpl other) {
return new PositionImpl(other);
}
Expand Down
Loading