Skip to content

Commit

Permalink
Revert "[improve][broker] Optimization protobuf code in the bucket de…
Browse files Browse the repository at this point in the history
…layed tracker (#20158)"

This reverts commit 3da39b2.
  • Loading branch information
coderzc committed Apr 23, 2023
1 parent 3da39b2 commit e3e1c08
Show file tree
Hide file tree
Showing 15 changed files with 172 additions and 210 deletions.
2 changes: 0 additions & 2 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,6 @@
<excludes>
<exclude>**/ResourceUsage.proto</exclude>
<exclude>**/TransactionPendingAck.proto</exclude>
<exclude>**/DelayedMessageIndexBucketSegment.proto</exclude>
</excludes>
</configuration>
<executions>
Expand Down Expand Up @@ -611,7 +610,6 @@
<sources>
<source>${project.basedir}/src/main/proto/TransactionPendingAck.proto</source>
<source>${project.basedir}/src/main/proto/ResourceUsage.proto</source>
<source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto</source>
</sources>
<targetSourcesSubDir>generated-sources/lightproto/java</targetSourcesSubDir>
<targetTestSourcesSubDir>generated-sources/lightproto/java</targetTestSourcesSubDir>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed.bucket;

import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
Expand All @@ -36,8 +36,8 @@
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
Expand Down Expand Up @@ -126,23 +126,23 @@ private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,

private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
try {
ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
} catch (InvalidProtocolBufferException e) {
throw new BucketSnapshotSerializationException(e);
}
}

private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entryEnumeration) {
List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
while (entryEnumeration.hasMoreElements()) {
LedgerEntry ledgerEntry = entryEnumeration.nextElement();
SnapshotSegment snapshotSegment = new SnapshotSegment();
ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
snapshotSegment.parseFrom(entryBuffer, entryBuffer.readableBytes());
snapshotMetadataList.add(snapshotSegment);
try {
while (entryEnumeration.hasMoreElements()) {
LedgerEntry ledgerEntry = entryEnumeration.nextElement();
snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
}
return snapshotMetadataList;
} catch (IOException e) {
throw new BucketSnapshotSerializationException(e);
}
return snapshotMetadataList;
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.RoaringBitmap;
Expand Down Expand Up @@ -133,8 +132,8 @@ long getAndUpdateBucketId() {
}

CompletableFuture<Long> asyncSaveBucketSnapshot(
ImmutableBucket bucket, SnapshotMetadata snapshotMetadata,
List<SnapshotSegment> bucketSnapshotSegments) {
ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments) {
final String bucketKey = bucket.bucketKey();
final String cursorName = Codec.decode(cursor.getName());
final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -286,7 +286,8 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
// Put indexes back into the shared queue and downgrade to memory mode
synchronized (BucketDelayedDeliveryTracker.this) {
immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
for (SnapshotSegment snapshotSegment : snapshotSegments) {
for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment :
snapshotSegments) {
for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) {
sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(),
delayedIndex.getLedgerId(), delayedIndex.getEntryId());
Expand Down Expand Up @@ -449,7 +450,7 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed"));
}

List<CompletableFuture<List<SnapshotSegment>>> getRemainFutures =
List<CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>> getRemainFutures =
buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList();

return FutureUtil.waitForAll(getRemainFutures)
Expand Down Expand Up @@ -600,11 +601,11 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
bucket.asyncDeleteBucketSnapshot(stats);
return;
}
DelayedIndex
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
lastDelayedIndex = indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), bucket);
for (DelayedIndex index : indexList) {
for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;

public interface BucketSnapshotStorage {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.PriorityQueue;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.AllArgsConstructor;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;

@NotThreadSafe
class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
Expand All @@ -40,8 +40,8 @@ static class Node {
}

private static final Comparator<Node> COMPARATOR_NODE = (node1, node2) -> DelayedIndexQueue.COMPARATOR.compare(
node1.segmentList.get(node1.segmentListCursor).getIndexeAt(node1.segmentCursor),
node2.segmentList.get(node2.segmentListCursor).getIndexeAt(node2.segmentCursor));
node1.segmentList.get(node1.segmentListCursor).getIndexes(node1.segmentCursor),
node2.segmentList.get(node2.segmentListCursor).getIndexes(node2.segmentCursor));

private final PriorityQueue<Node> kpq;

Expand Down Expand Up @@ -77,7 +77,7 @@ private DelayedIndex getValue(boolean needAdvanceCursor) {
Objects.requireNonNull(node);

SnapshotSegment snapshotSegment = node.segmentList.get(node.segmentListCursor);
DelayedIndex delayedIndex = snapshotSegment.getIndexeAt(node.segmentCursor);
DelayedIndex delayedIndex = snapshotSegment.getIndexes(node.segmentCursor);
if (!needAdvanceCursor) {
return delayedIndex;
}
Expand All @@ -104,16 +104,4 @@ private DelayedIndex getValue(boolean needAdvanceCursor) {

return delayedIndex;
}

@Override
public void popToObject(DelayedIndex delayedIndex) {
DelayedIndex value = getValue(true);
delayedIndex.copyFrom(value);
}

@Override
public long peekTimestamp() {
DelayedIndex value = getValue(false);
return value.getTimestamp();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import java.util.Comparator;
import java.util.Objects;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;

interface DelayedIndexQueue {
Comparator<DelayedIndex> COMPARATOR = (o1, o2) -> {
Comparator<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> COMPARATOR = (o1, o2) -> {
if (!Objects.equals(o1.getTimestamp(), o2.getTimestamp())) {
return Long.compare(o1.getTimestamp(), o2.getTimestamp());
} else if (!Objects.equals(o1.getLedgerId(), o2.getLedgerId())) {
Expand All @@ -35,11 +35,7 @@ interface DelayedIndexQueue {

boolean isEmpty();

DelayedIndex peek();
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek();

DelayedIndex pop();

void popToObject(DelayedIndex delayedIndex);

long peekTimestamp();
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.InvalidRoaringFormat;
import org.roaringbitmap.RoaringBitmap;
Expand All @@ -43,7 +43,7 @@
class ImmutableBucket extends Bucket {

@Setter
private List<SnapshotSegment> snapshotSegments;
private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;

boolean merging = false;

Expand All @@ -55,7 +55,7 @@ class ImmutableBucket extends Bucket {
super(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId);
}

public Optional<List<SnapshotSegment>> getSnapshotSegments() {
public Optional<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getSnapshotSegments() {
return Optional.ofNullable(snapshotSegments);
}

Expand Down Expand Up @@ -84,7 +84,7 @@ private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(b
}
}), BucketSnapshotPersistenceException.class, MaxRetryTimes)
.thenApply(snapshotMetadata -> {
List<SnapshotSegmentMetadata> metadataList =
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata> metadataList =
snapshotMetadata.getMetadataListList();

// Skip all already reach schedule time snapshot segments
Expand Down Expand Up @@ -125,9 +125,10 @@ private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(b
return Collections.emptyList();
}

SnapshotSegment snapshotSegment =
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
bucketSnapshotSegments.get(0);
List<DelayedIndex> indexList = snapshotSegment.getIndexesList();
List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
snapshotSegment.getIndexesList();
this.setCurrentSegmentEntryId(nextSegmentEntryId);
if (isRecover) {
this.asyncUpdateSnapshotLength();
Expand Down Expand Up @@ -170,7 +171,7 @@ private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
setNumberBucketDelayedMessages(numberMessages.getValue());
}

CompletableFuture<List<SnapshotSegment>> getRemainSnapshotSegment() {
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() {
int nextSegmentEntryId = currentSegmentEntryId + 1;
if (nextSegmentEntryId > lastSegmentEntryId) {
return CompletableFuture.completedFuture(Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
Expand Down Expand Up @@ -75,16 +75,14 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
Map<Long, RoaringBitmap> bitMap = new HashMap<>();
SnapshotSegment snapshotSegment = new SnapshotSegment();
SnapshotSegment.Builder snapshotSegmentBuilder = SnapshotSegment.newBuilder();
SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder();

List<Long> firstScheduleTimestamps = new ArrayList<>();
long currentTimestampUpperLimit = 0;
long currentFirstTimestamp = 0L;
while (!delayedIndexQueue.isEmpty()) {
DelayedIndex delayedIndex = snapshotSegment.addIndexe();
delayedIndexQueue.popToObject(delayedIndex);

DelayedIndex delayedIndex = delayedIndexQueue.peek();
long timestamp = delayedIndex.getTimestamp();
if (currentTimestampUpperLimit == 0) {
currentFirstTimestamp = timestamp;
Expand All @@ -102,13 +100,16 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
sharedQueue.add(timestamp, ledgerId, entryId);
}

delayedIndexQueue.pop();
numMessages++;

bitMap.computeIfAbsent(ledgerId, k -> new RoaringBitmap()).add(entryId, entryId + 1);

if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit
snapshotSegmentBuilder.addIndexes(delayedIndex);

if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegment.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
&& snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
currentTimestampUpperLimit = 0;
Expand All @@ -128,8 +129,8 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
segmentMetadataList.add(segmentMetadataBuilder.build());
segmentMetadataBuilder.clear();

bucketSnapshotSegments.add(snapshotSegment);
snapshotSegment = new SnapshotSegment();
bucketSnapshotSegments.add(snapshotSegmentBuilder.build());
snapshotSegmentBuilder.clear();
}
}

Expand All @@ -152,8 +153,8 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(

// Add the first snapshot segment last message to snapshotSegmentLastMessageTable
checkArgument(!bucketSnapshotSegments.isEmpty());
SnapshotSegment firstSnapshotSegment = bucketSnapshotSegments.get(0);
DelayedIndex lastDelayedIndex = firstSnapshotSegment.getIndexeAt(firstSnapshotSegment.getIndexesCount() - 1);
SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0);
DelayedIndex lastDelayedIndex = snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1);
Pair<ImmutableBucket, DelayedIndex> result = Pair.of(bucket, lastDelayedIndex);

CompletableFuture<Long> future = asyncSaveBucketSnapshot(bucket,
Expand Down
Loading

0 comments on commit e3e1c08

Please sign in to comment.