Skip to content

Commit

Permalink
Add support for batch local index ack
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui committed Jan 22, 2020
1 parent f9674f7 commit 94fba9e
Show file tree
Hide file tree
Showing 39 changed files with 1,964 additions and 50 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ delayedDeliveryEnabled=true
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000

# Whether to enable acknowledgment of individual indexes within a batch message.
batchIndexAcknowledgeEnable = true

# Enable tracking of replicated subscriptions state across clusters.
enableReplicatedSubscriptions=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;

/**
* A ManagedCursor is a persisted cursor inside a ManagedLedger.
Expand Down Expand Up @@ -319,6 +320,21 @@ void markDelete(Position position, Map<String, Long> properties)
*/
void asyncDelete(Iterable<Position> position, DeleteCallback callback, Object ctx);

/**
* Delete a group of batch indexes in a batch message asynchronously.
*
* <p/>
* Mark a group of batch indexes for deletion. Once all indexes of a batch message have been deleted, the whole
* batch message is marked for deletion.
*
* @param position the position of the batch message whose indexes are to be deleted
* @param batchSize the number of messages contained in the batch message
* @param deleteIndexRanges the index ranges to delete within the batch message
* @param callback callback object
* @param ctx opaque context
*/
void asyncDelete(Position position, int batchSize, List<IntRange> deleteIndexRanges, DeleteCallback callback, Object ctx);

/**
* Get the read position. This points to the next message to be read from the cursor.
*
Expand Down Expand Up @@ -608,4 +624,9 @@ Set<? extends Position> asyncReplayEntries(
* Trim delete entries for the given entries
*/
void trimDeletedEntries(List<Entry> entries);

/**
* Get the list of deleted batch index ranges for a batch message.
*
* @param position the position of the batch message
* @return the deleted index ranges of the batch message; may be {@code null} when no deleted indexes are
*         tracked for the given position
*/
List<IntRange> getDeletedBatchIndexes(Position position);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ManagedLedgerConfig {

private boolean createIfMissing = true;
private int maxUnackedRangesToPersist = 10000;
private int maxBatchDeletedIndexToPersist = 10000;
private int maxUnackedRangesToPersistInZk = 1000;
private int maxEntriesPerLedger = 50000;
private int maxSizePerLedgerMb = 100;
Expand Down Expand Up @@ -471,6 +472,13 @@ public int getMaxUnackedRangesToPersist() {
return maxUnackedRangesToPersist;
}

/**
* @return max batch deleted index that will be persisted and recovered.
*/
public int getMaxBatchDeletedIndexToPersist() {
return maxBatchDeletedIndexToPersist;
}

/**
* @param maxUnackedRangesToPersist
* max unacked message ranges that will be persisted and recovered.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@

import java.time.Clock;
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -90,6 +92,8 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
Expand Down Expand Up @@ -148,6 +152,10 @@ public class ManagedCursorImpl implements ManagedCursor {
return position;
};
private final LongPairRangeSet<PositionImpl> individualDeletedMessages;

// Tracks which indexes have been deleted in batch messages that are not yet completely deleted
// position (ledgerId, entryId) -> bit set of deleted batch indexes
private final ConcurrentHashMap<PositionImpl, ConcurrentBitSet> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private RateLimiter markDeleteLimiter;
Expand Down Expand Up @@ -219,6 +227,7 @@ public interface VoidCallback {
this.individualDeletedMessages = config.isUnackedRangesOpenCacheSetEnabled()
? new ConcurrentOpenLongPairRangeSet<PositionImpl>(4096, positionRangeConverter)
: new LongPairRangeSet.DefaultRangeSet<>(positionRangeConverter);
this.batchDeletedIndexes = new ConcurrentHashMap<>();
this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
STATE_UPDATER.set(this, State.Uninitialized);
PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0);
Expand Down Expand Up @@ -366,6 +375,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
if (positionInfo.getBatchDeletedIndexesCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchDeletedIndexesList());
}
recoveredCursor(position, recoveredProperties, lh);
callback.operationComplete();
}, null);
Expand All @@ -385,6 +397,24 @@ private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> i
}
}

/**
 * Replaces the in-memory batch-deleted-index map with the state read from a persisted cursor position.
 *
 * <p>Entries that carry no index range are ignored. Each persisted range is applied to a fresh bit set
 * through {@code set(lowerIndex, upperIndex)}.
 *
 * @param batchDeletedIndexInfoList the persisted per-position deleted index ranges
 */
private void recoverBatchDeletedIndexes (List<MLDataFormats.BatchDeletedIndexInfo> batchDeletedIndexInfoList) {
    lock.writeLock().lock();
    try {
        this.batchDeletedIndexes.clear();
        for (MLDataFormats.BatchDeletedIndexInfo persisted : batchDeletedIndexInfoList) {
            if (persisted.getBatchDeletedIndexesCount() <= 0) {
                // Nothing recorded for this position
                continue;
            }
            ConcurrentBitSet deleted = new ConcurrentBitSet(0);
            for (MLDataFormats.BatchDeletedIndexRange range : persisted.getBatchDeletedIndexesList()) {
                deleted.set(range.getLowerIndex(), range.getUpperIndex());
            }
            MLDataFormats.NestedPositionInfo where = persisted.getPosition();
            this.batchDeletedIndexes.put(PositionImpl.get(where.getLedgerId(), where.getEntryId()), deleted);
        }
    } finally {
        lock.writeLock().unlock();
    }
}

private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
Expand Down Expand Up @@ -890,6 +920,7 @@ public void operationComplete() {
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
null, null);
individualDeletedMessages.clear();
batchDeletedIndexes.clear();

PositionImpl oldReadPosition = readPosition;
if (oldReadPosition.compareTo(newPosition) >= 0) {
Expand Down Expand Up @@ -1556,6 +1587,9 @@ public void operationComplete() {
try {
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId());
batchDeletedIndexes.entrySet().removeIf(entry ->
entry.getKey().compareTo(
PositionImpl.get(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId())) <= 0);
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -1697,6 +1731,8 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
continue;
}

batchDeletedIndexes.remove(position);

// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
// the RangeSet recognize the "continuity" between adjacent Positions
PositionImpl previousPosition = ledger.getPreviousPosition(position);
Expand Down Expand Up @@ -1781,6 +1817,58 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}
}

/**
 * Marks a group of indexes of one batch message as deleted.
 *
 * <p>If the position is already covered by the mark-delete position or by the individually deleted
 * messages, any tracked index state for it is dropped and the callback completes immediately. Otherwise
 * the given ranges (start and end both inclusive) are recorded, and once every index in
 * {@code [0, batchSize)} is set the whole entry is deleted through the single-position
 * {@code asyncDelete}.
 *
 * @param position the position of the batch message
 * @param batchSize the number of messages in the batch
 * @param deleteIndexRanges the index ranges to mark as deleted
 * @param callback invoked on completion or failure
 * @param ctx opaque context handed back to the callback
 */
@Override
public void asyncDelete(Position position, int batchSize, List<IntRange> deleteIndexRanges,
        AsyncCallbacks.DeleteCallback callback, Object ctx) {
    if (isClosed()) {
        // Hand the caller's context back, like every other completion path of this method
        callback.deleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
        return;
    }
    lock.writeLock().lock();
    try {
        PositionImpl pos = (PositionImpl) checkNotNull(position);
        if (markDeletePosition.compareTo(pos) >= 0
                || individualDeletedMessages.contains(pos.ledgerId, pos.entryId)) {
            // The whole entry is already deleted: per-index state is no longer needed
            batchDeletedIndexes.remove(pos);
            callback.deleteComplete(ctx);
            return;
        }

        ConcurrentBitSet bitSet = batchDeletedIndexes.computeIfAbsent(pos,
                key -> new ConcurrentBitSet(batchSize));

        for (IntRange deleteIndexRange : deleteIndexRanges) {
            // IntRange end is inclusive, BitSet.set(from, to) is exclusive
            bitSet.set(deleteIndexRange.getStart(), deleteIndexRange.getEnd() + 1);
        }

        // ">=" rather than "==": a range reaching past batchSize moves the first clear bit beyond
        // batchSize, and the entry must still be recognized as fully acknowledged.
        if (bitSet.nextClearBit(0) >= batchSize) {
            asyncDelete(pos, new AsyncCallbacks.DeleteCallback() {
                @Override
                public void deleteComplete(Object ctx) {
                    callback.deleteComplete(ctx);
                }

                @Override
                public void deleteFailed(ManagedLedgerException exception, Object ctx) {
                    callback.deleteFailed(getManagedLedgerException(exception), ctx);
                }
            }, ctx);
        } else {
            callback.deleteComplete(ctx);
        }
    } catch (Exception e) {
        log.warn("[{}] [{}] Error while updating batchDeletedMessages [{}]", ledger.getName(), name,
                e.getMessage(), e);
        callback.deleteFailed(getManagedLedgerException(e), ctx);
    } finally {
        lock.writeLock().unlock();
    }
}

/**
* Given a list of entries, filter out the entries that have already been individually deleted.
*
Expand Down Expand Up @@ -2019,6 +2107,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
info.addAllProperties(buildPropertiesMap(properties));
if (persistIndividualDeletedMessageRanges) {
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
info.addAllBatchDeletedIndexes(buildBatchDeletedIndexInfoList());
}

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -2239,11 +2328,49 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
}
}

/**
 * Builds the persistable form of the batch-deleted-index map.
 *
 * <p>For every tracked position the set bits of its bit set are encoded as a list of ranges whose lower
 * index is the first set bit of a run and whose upper index is the first clear bit after that run. At
 * most {@code config.getMaxBatchDeletedIndexToPersist()} positions are included.
 *
 * @return one {@code BatchDeletedIndexInfo} per tracked position, or an empty list when nothing is tracked
 */
private List<MLDataFormats.BatchDeletedIndexInfo> buildBatchDeletedIndexInfoList() {
    lock.readLock().lock();
    try {
        if (batchDeletedIndexes.isEmpty()) {
            return Collections.emptyList();
        }
        MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
                .newBuilder();
        MLDataFormats.BatchDeletedIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats.BatchDeletedIndexInfo
                .newBuilder();
        MLDataFormats.BatchDeletedIndexRange.Builder batchDeletedIndexRangeBuilder = MLDataFormats.BatchDeletedIndexRange
                .newBuilder();
        List<MLDataFormats.BatchDeletedIndexInfo> result = Lists.newArrayList();
        for (Map.Entry<PositionImpl, ConcurrentBitSet> entry : batchDeletedIndexes.entrySet()) {
            // The builder is reused across positions and build() does not reset it: without this clear
            // the repeated field would still hold the ranges of every previously processed position.
            batchDeletedIndexInfoBuilder.clearBatchDeletedIndexes();

            nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
            nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
            batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());

            BitSet bitSet = entry.getValue();
            int nextSetBit = bitSet.nextSetBit(0);
            while (nextSetBit != -1) {
                // [nextSetBit, nextClearBit) is one contiguous run of deleted indexes
                int nextClearBit = bitSet.nextClearBit(nextSetBit);
                batchDeletedIndexRangeBuilder.setLowerIndex(nextSetBit);
                batchDeletedIndexRangeBuilder.setUpperIndex(nextClearBit);
                batchDeletedIndexInfoBuilder.addBatchDeletedIndexes(batchDeletedIndexRangeBuilder.build());
                nextSetBit = bitSet.nextSetBit(nextClearBit);
            }
            result.add(batchDeletedIndexInfoBuilder.build());
            if (result.size() >= config.getMaxBatchDeletedIndexToPersist()) {
                break;
            }
        }
        return result;
    } finally {
        lock.readLock().unlock();
    }
}

void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
PositionImpl position = mdEntry.newPosition;
PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges())
.addAllBatchDeletedIndexes(buildBatchDeletedIndexInfoList())
.addAllProperties(buildPropertiesMap(mdEntry.properties)).build();


Expand Down Expand Up @@ -2624,5 +2751,27 @@ public void trimDeletedEntries(List<Entry> entries) {
|| individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()));
}

/**
 * Returns the deleted index ranges currently tracked for a batch message.
 *
 * @param position the position of the batch message
 * @return the deleted ranges (start and end both inclusive), or {@code null} when no deleted indexes are
 *         tracked for the position
 */
@Override
public List<IntRange> getDeletedBatchIndexes(Position position) {
    if (batchDeletedIndexes.isEmpty()) {
        return null;
    }
    PositionImpl pos = (PositionImpl) checkNotNull(position);
    // Single lookup: this method runs without the cursor lock, so a containsKey() followed by get()
    // could observe the entry being removed in between and dereference null.
    BitSet bitSet = batchDeletedIndexes.get(pos);
    if (bitSet == null) {
        return null;
    }
    List<IntRange> result = Lists.newArrayList();
    IntRange.Builder builder = IntRange.newBuilder();
    int nextSetBit = bitSet.nextSetBit(0);
    while (nextSetBit != -1) {
        int nextClearBit = bitSet.nextClearBit(nextSetBit);
        // nextClearBit is one past the run, IntRange end is inclusive
        result.add(builder.setStart(nextSetBit).setEnd(nextClearBit - 1).build());
        nextSetBit = bitSet.nextSetBit(nextClearBit);
    }
    return result;
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
}
18 changes: 17 additions & 1 deletion managed-ledger/src/main/proto/MLDataFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ message PositionInfo {
// Additional custom properties associated with
// the current cursor position
repeated LongProperty properties = 4;

// Store which index in the batch message has been deleted
repeated BatchDeletedIndexInfo batchDeletedIndexes = 5;
}

message NestedPositionInfo {
Expand All @@ -78,6 +81,16 @@ message MessageRange {
required NestedPositionInfo upperEndpoint = 2;
}

// One contiguous run of deleted indexes inside a batch message.
message BatchDeletedIndexRange {
// First deleted index of the run (inclusive).
required uint32 lowerIndex = 1;
// First index after the run (exclusive), as written and read back by ManagedCursorImpl.
required uint32 upperIndex = 2;
}

// Deleted indexes of a single batch message.
message BatchDeletedIndexInfo {
// Position (ledger id, entry id) of the batch message.
required NestedPositionInfo position = 1;
// Ranges of indexes in the batch message that have been deleted.
repeated BatchDeletedIndexRange batchDeletedIndexes = 2;
}

// Generic string and long tuple
message LongProperty {
required string name = 1;
Expand All @@ -98,5 +111,8 @@ message ManagedCursorInfo {
// the current cursor position
repeated LongProperty properties = 5;

optional int64 lastActive = 6;
optional int64 lastActive = 6;

// Store which index in the batch message has been deleted
repeated BatchDeletedIndexInfo batchDeletedIndexes = 7;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.testng.annotations.Test;

@Test
Expand Down Expand Up @@ -179,6 +180,10 @@ public void delete(Iterable<Position> positions) throws InterruptedException, Ma
public void asyncDelete(Iterable<Position> position, DeleteCallback callback, Object ctx) {
}

@Override
public void asyncDelete(Position position, int batchSize, List<IntRange> deleteIndexRanges, DeleteCallback callback, Object ctx) {
// No-op stub: nothing is deleted and the callback is never invoked.
}

@Override
public void clearBacklog() throws InterruptedException, ManagedLedgerException {
// No-op stub.
}
Expand Down Expand Up @@ -326,6 +331,11 @@ public void trimDeletedEntries(List<Entry> entries) {

}

@Override
public List<IntRange> getDeletedBatchIndexes(Position position) {
// Stub: reports no deleted batch indexes for any position.
return null;
}

}

@Test
Expand Down
Loading

0 comments on commit 94fba9e

Please sign in to comment.