Skip to content

Commit

Permalink
Introduce BitSetRecyclable that leverage with netty recycler
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui committed Feb 19, 2020
1 parent 814371c commit 335cdff
Show file tree
Hide file tree
Showing 9 changed files with 1,208 additions and 35 deletions.
1 change: 1 addition & 0 deletions buildtools/src/main/resources/pulsar/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@
<suppress checks=".*" files=".+[\\/]com[\\/]scurrilous[\\/]circe[\\/].+\.java" />

<suppress checks=".*" files="MLDataFormats.java" />
<suppress checks=".*" files="BitSetRecyclable.java" />
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -156,7 +155,7 @@ public class ManagedCursorImpl implements ManagedCursor {

// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
private final ConcurrentSkipListMap<PositionImpl, BitSet> batchDeletedIndexes;
private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private RateLimiter markDeleteLimiter;
Expand Down Expand Up @@ -413,7 +412,7 @@ private void recoverBatchDeletedIndexes (List<MLDataFormats.BatchedEntryDeletion
array[i] = batchDeletedIndexInfo.getDeleteSetList().get(i);
}
this.batchDeletedIndexes.put(PositionImpl.get(batchDeletedIndexInfo.getPosition().getLedgerId(),
batchDeletedIndexInfo.getPosition().getEntryId()), BitSet.valueOf(array));
batchDeletedIndexInfo.getPosition().getEntryId()), BitSetRecyclable.create().resetWords(array));
}
});
} finally {
Expand Down Expand Up @@ -927,6 +926,7 @@ public void operationComplete() {
null, null);
individualDeletedMessages.clear();
if (config.isDeletionAtBatchIndexLevelEnabled()) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
}

Expand Down Expand Up @@ -1502,7 +1502,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
}
callback.markDeleteFailed(
new ManagedLedgerException("Reset cursor in progress - unable to mark delete position "
+ ((PositionImpl) position).toString()),
+ position.toString()),
ctx);
}

Expand All @@ -1514,10 +1514,12 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie

if (config.isDeletionAtBatchIndexLevelEnabled()) {
if (newPosition.ackSet != null) {
batchDeletedIndexes.put(newPosition, newPosition.ackSet);
batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet));
newPosition = ledger.getPreviousPosition(newPosition);
}
batchDeletedIndexes.subMap(PositionImpl.earliest, newPosition).clear();
Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, newPosition);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
} else if (newPosition.ackSet != null) {
callback.markDeleteFailed(new ManagedLedgerException("Batch ack set not support"), ctx);
return;
Expand Down Expand Up @@ -1615,7 +1617,9 @@ public void operationComplete() {
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId());
if (config.isDeletionAtBatchIndexLevelEnabled()) {
batchDeletedIndexes.subMap(PositionImpl.earliest, false, PositionImpl.get(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()), true).clear();
Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, false, PositionImpl.get(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()), true);
subMap.values().forEach(BitSetRecyclable::recycle);
subMap.clear();
}
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -1752,7 +1756,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())
|| position.compareTo(markDeletePosition) <= 0) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
batchDeletedIndexes.remove(position);
batchDeletedIndexes.remove(position).recycle();
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
Expand All @@ -1761,7 +1765,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
}
if (position.ackSet == null) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
batchDeletedIndexes.remove(position);
batchDeletedIndexes.remove(position).recycle();
}
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
// the RangeSet recognize the "continuity" between adjacent Positions
Expand All @@ -1775,14 +1779,16 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
individualDeletedMessages);
}
} else if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSet bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSet.valueOf(position.ackSet.toLongArray()));
bitSet.and(position.ackSet);
BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSetRecyclable.create().resetWords(position.ackSet));
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
bitSet.and(givenBitSet);
givenBitSet.recycle();
if (bitSet.isEmpty()) {
PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
++messagesConsumedCounter;
batchDeletedIndexes.remove(position);
batchDeletedIndexes.remove(position).recycle();
}
}
}
Expand Down Expand Up @@ -2327,9 +2333,9 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio
MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats.BatchedEntryDeletionIndexInfo
.newBuilder();
List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = Lists.newArrayList();
Iterator<Map.Entry<PositionImpl, BitSet>> iterator = batchDeletedIndexes.entrySet().iterator();
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) {
Map.Entry<PositionImpl, BitSet> entry = iterator.next();
Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();
nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());
Expand Down Expand Up @@ -2732,7 +2738,7 @@ public void trimDeletedEntries(List<Entry> entries) {

@Override
public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
BitSet bitSet = batchDeletedIndexes.get(position);
BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
return bitSet == null ? null : bitSet.toLongArray();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;

import java.util.BitSet;

public class PositionImpl implements Position, Comparable<PositionImpl> {

protected long ledgerId;
protected long entryId;
protected BitSet ackSet;
protected long[] ackSet;

public static final PositionImpl earliest = new PositionImpl(-1, -1);
public static final PositionImpl latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
Expand All @@ -52,7 +50,7 @@ public PositionImpl(long ledgerId, long entryId) {
this.entryId = entryId;
}

public PositionImpl(long ledgerId, long entryId, BitSet ackSet) {
public PositionImpl(long ledgerId, long entryId, long[] ackSet) {
this.ledgerId = ledgerId;
this.entryId = entryId;
this.ackSet = ackSet;
Expand All @@ -67,7 +65,7 @@ public static PositionImpl get(long ledgerId, long entryId) {
return new PositionImpl(ledgerId, entryId);
}

public static PositionImpl get(long ledgerId, long entryId, BitSet ackSet) {
public static PositionImpl get(long ledgerId, long entryId, long[] ackSet) {
return new PositionImpl(ledgerId, entryId, ackSet);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3138,7 +3138,7 @@ private void deleteBatchIndex(ManagedCursor cursor, Position position, int batch
deleteIndexes.forEach(intRange -> {
bitSet.clear(intRange.getStart(), intRange.getEnd() + 1);
});
pos.ackSet = bitSet;
pos.ackSet = bitSet.toLongArray();

cursor.asyncDelete(pos,
new DeleteCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ void messageAcked(CommandAck ack) {
if (ack.getMessageIdCount() == 1) {
MessageIdData msgId = ack.getMessageId(0);
if (msgId.getAckSetCount() > 0) {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), BitSet.valueOf(SafeCollectionUtils.longListToArray(msgId.getAckSetList())));
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), SafeCollectionUtils.longListToArray(msgId.getAckSetList()));
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
}
Expand All @@ -386,7 +386,7 @@ void messageAcked(CommandAck ack) {
MessageIdData msgId = ack.getMessageId(i);
PositionImpl position;
if (msgId.getAckSetCount() > 0) {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), BitSet.valueOf(SafeCollectionUtils.longListToArray(msgId.getAckSetList())));
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), SafeCollectionUtils.longListToArray(msgId.getAckSetList()));
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;

/**
Expand Down Expand Up @@ -190,7 +189,7 @@ private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchInde
if (cnx == null) {
return false;
}
ConcurrentBitSet bitSet = new ConcurrentBitSet(batchSize);
BitSetRecyclable bitSet = BitSetRecyclable.create();
bitSet.set(0, batchSize);
if (ackType == AckType.Cumulative) {
bitSet.clear(0, batchIndex + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -119,6 +118,7 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
Expand Down Expand Up @@ -901,12 +901,12 @@ public static ByteBuf newMultiMessageAck(long consumerId,
return res;
}

public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSet ackSet, AckType ackType,
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
ValidationError validationError, Map<String, Long> properties) {
return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, properties, 0, 0);
}

public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSet ackSet, AckType ackType,
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
ValidationError validationError, Map<String, Long> properties, long txnIdLeastBits,
long txnIdMostBits) {
CommandAck.Builder ackBuilder = CommandAck.newBuilder();
Expand Down Expand Up @@ -937,6 +937,7 @@ public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSe

ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK).setAck(ack));
ack.recycle();
ackSet.recycle();
ackBuilder.recycle();
messageIdDataBuilder.recycle();
messageIdData.recycle();
Expand Down
Loading

0 comments on commit 335cdff

Please sign in to comment.