diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 25b31b4f2b7bd..31b335e1aea68 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -566,7 +566,6 @@ **/ResourceUsage.proto **/TransactionPendingAck.proto - **/DelayedMessageIndexBucketSegment.proto @@ -611,7 +610,6 @@ ${project.basedir}/src/main/proto/TransactionPendingAck.proto ${project.basedir}/src/main/proto/ResourceUsage.proto - ${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto generated-sources/lightproto/java generated-sources/lightproto/java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index 040bbbc586f49..18a4c322f7b27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.delayed.bucket; import com.google.protobuf.InvalidProtocolBufferException; -import io.netty.buffer.ByteBuf; +import java.io.IOException; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; @@ -36,8 +36,8 @@ import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; import org.apache.pulsar.common.util.FutureUtil; @Slf4j @@ -126,8 +126,7 @@ private CompletableFuture addSnapshotSegments(LedgerHandle ledgerHandle, private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) { try { - ByteBuf entryBuffer = ledgerEntry.getEntryBuffer(); - return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer()); + return SnapshotMetadata.parseFrom(ledgerEntry.getEntry()); } catch (InvalidProtocolBufferException e) { throw new BucketSnapshotSerializationException(e); } @@ -135,14 +134,15 @@ private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) { private List parseSnapshotSegmentEntries(Enumeration entryEnumeration) { List snapshotMetadataList = new ArrayList<>(); - while (entryEnumeration.hasMoreElements()) { - LedgerEntry ledgerEntry = entryEnumeration.nextElement(); - SnapshotSegment snapshotSegment = new SnapshotSegment(); - ByteBuf entryBuffer = ledgerEntry.getEntryBuffer(); - snapshotSegment.parseFrom(entryBuffer, entryBuffer.readableBytes()); - snapshotMetadataList.add(snapshotSegment); + try { + while (entryEnumeration.hasMoreElements()) { + LedgerEntry ledgerEntry = entryEnumeration.nextElement(); + snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry())); + } + return snapshotMetadataList; + } catch (IOException e) { + throw new BucketSnapshotSerializationException(e); } - return snapshotMetadataList; } @NotNull diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java index a1693b1553d97..4d7d3aa512be6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java @@ -31,8 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.roaringbitmap.RoaringBitmap; @@ -133,8 +132,8 @@ long getAndUpdateBucketId() { } CompletableFuture asyncSaveBucketSnapshot( - ImmutableBucket bucket, SnapshotMetadata snapshotMetadata, - List bucketSnapshotSegments) { + ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata, + List bucketSnapshotSegments) { final String bucketKey = bucket.bucketKey(); final String cursorName = Codec.decode(cursor.getName()); final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index c90064c9137bb..b17387e276e2b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -53,8 +53,8 @@ import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker; -import org.apache.pulsar.broker.delayed.proto.DelayedIndex; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.common.policies.data.stats.TopicMetricBean; import org.apache.pulsar.common.util.FutureUtil; @@ -286,7 +286,8 @@ private void afterCreateImmutableBucket(Pair immu // Put indexes back into the shared queue and downgrade to memory mode synchronized (BucketDelayedDeliveryTracker.this) { immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> { - for (SnapshotSegment snapshotSegment : snapshotSegments) { + for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment : + snapshotSegments) { for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) { sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(), delayedIndex.getLedgerId(), delayedIndex.getEntryId()); @@ -449,7 +450,7 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot(List>> getRemainFutures = + List>> getRemainFutures = buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList(); return FutureUtil.waitForAll(getRemainFutures) @@ -600,11 +601,11 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa bucket.asyncDeleteBucketSnapshot(stats); return; } - DelayedIndex + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1); this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(), bucket); - for (DelayedIndex index : indexList) { + for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) { sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), index.getEntryId()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java index 7464ef9cd3f63..51c89bed47af2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java @@ -20,8 +20,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; public interface BucketSnapshotStorage { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java index 006938e9ed271..5655a26878296 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java @@ -24,8 +24,8 @@ import java.util.PriorityQueue; import javax.annotation.concurrent.NotThreadSafe; import lombok.AllArgsConstructor; -import org.apache.pulsar.broker.delayed.proto.DelayedIndex; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; @NotThreadSafe class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue { @@ -40,8 +40,8 @@ static class Node { } private static final Comparator COMPARATOR_NODE = (node1, node2) -> DelayedIndexQueue.COMPARATOR.compare( - node1.segmentList.get(node1.segmentListCursor).getIndexeAt(node1.segmentCursor), - node2.segmentList.get(node2.segmentListCursor).getIndexeAt(node2.segmentCursor)); + node1.segmentList.get(node1.segmentListCursor).getIndexes(node1.segmentCursor), + node2.segmentList.get(node2.segmentListCursor).getIndexes(node2.segmentCursor)); private final PriorityQueue kpq; @@ -77,7 +77,7 @@ private DelayedIndex getValue(boolean needAdvanceCursor) { Objects.requireNonNull(node); SnapshotSegment snapshotSegment = node.segmentList.get(node.segmentListCursor); - DelayedIndex delayedIndex = snapshotSegment.getIndexeAt(node.segmentCursor); + DelayedIndex delayedIndex = snapshotSegment.getIndexes(node.segmentCursor); if (!needAdvanceCursor) { return delayedIndex; } @@ -104,16 +104,4 @@ private DelayedIndex getValue(boolean needAdvanceCursor) { return delayedIndex; } - - @Override - public void popToObject(DelayedIndex delayedIndex) { - DelayedIndex value = getValue(true); - delayedIndex.copyFrom(value); - } - - @Override - public long peekTimestamp() { - DelayedIndex value = getValue(false); - return value.getTimestamp(); - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java index f1209a3137a45..dee476c376ec9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java @@ -20,10 +20,10 @@ import java.util.Comparator; import java.util.Objects; -import org.apache.pulsar.broker.delayed.proto.DelayedIndex; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; interface DelayedIndexQueue { - Comparator COMPARATOR = (o1, o2) -> { + Comparator COMPARATOR = (o1, o2) -> { if (!Objects.equals(o1.getTimestamp(), o2.getTimestamp())) { return Long.compare(o1.getTimestamp(), o2.getTimestamp()); } else if (!Objects.equals(o1.getLedgerId(), o2.getLedgerId())) { @@ -35,11 +35,7 @@ interface DelayedIndexQueue { boolean isEmpty(); - DelayedIndex peek(); + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek(); - DelayedIndex pop(); - - void popToObject(DelayedIndex delayedIndex); - - long peekTimestamp(); + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 0932f51f350ce..57de5c84fcd82 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -32,9 +32,9 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.mutable.MutableLong; -import org.apache.pulsar.broker.delayed.proto.DelayedIndex; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; import org.apache.pulsar.common.util.FutureUtil; import org.roaringbitmap.InvalidRoaringFormat; import org.roaringbitmap.RoaringBitmap; @@ -43,7 +43,7 @@ class ImmutableBucket extends Bucket { @Setter - private List snapshotSegments; + private List snapshotSegments; boolean merging = false; @@ -55,7 +55,7 @@ class ImmutableBucket extends Bucket { super(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId); } - public Optional> getSnapshotSegments() { + public Optional> getSnapshotSegments() { return Optional.ofNullable(snapshotSegments); } @@ -84,7 +84,7 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b } }), BucketSnapshotPersistenceException.class, MaxRetryTimes) .thenApply(snapshotMetadata -> { - List metadataList = + List metadataList = snapshotMetadata.getMetadataListList(); // Skip all already reach schedule time snapshot segments @@ -125,9 +125,10 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b return Collections.emptyList(); } - SnapshotSegment snapshotSegment = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0); - List indexList = snapshotSegment.getIndexesList(); + List indexList = + snapshotSegment.getIndexesList(); this.setCurrentSegmentEntryId(nextSegmentEntryId); if (isRecover) { this.asyncUpdateSnapshotLength(); @@ -170,7 +171,7 @@ private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex, setNumberBucketDelayedMessages(numberMessages.getValue()); } - CompletableFuture> getRemainSnapshotSegment() { + CompletableFuture> getRemainSnapshotSegment() { int nextSegmentEntryId = currentSegmentEntryId + 1; if (nextSegmentEntryId > lastSegmentEntryId) { return CompletableFuture.completedFuture(Collections.emptyList()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index b7e9e68f1bdc7..f404d5d02c15a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -30,10 +30,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.broker.delayed.proto.DelayedIndex; -import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; import org.roaringbitmap.RoaringBitmap; @@ -75,16 +75,14 @@ Pair createImmutableBucketAndAsyncPersistent( List bucketSnapshotSegments = new ArrayList<>(); List segmentMetadataList = new ArrayList<>(); Map bitMap = new HashMap<>(); - SnapshotSegment snapshotSegment = new SnapshotSegment(); + SnapshotSegment.Builder snapshotSegmentBuilder = SnapshotSegment.newBuilder(); SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder(); List firstScheduleTimestamps = new ArrayList<>(); long currentTimestampUpperLimit = 0; long currentFirstTimestamp = 0L; while (!delayedIndexQueue.isEmpty()) { - DelayedIndex delayedIndex = snapshotSegment.addIndexe(); - delayedIndexQueue.popToObject(delayedIndex); - + DelayedIndex delayedIndex = delayedIndexQueue.peek(); long timestamp = delayedIndex.getTimestamp(); if (currentTimestampUpperLimit == 0) { currentFirstTimestamp = timestamp; @@ -102,13 +100,16 @@ Pair createImmutableBucketAndAsyncPersistent( sharedQueue.add(timestamp, ledgerId, entryId); } + delayedIndexQueue.pop(); numMessages++; bitMap.computeIfAbsent(ledgerId, k -> new RoaringBitmap()).add(entryId, entryId + 1); - if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit + snapshotSegmentBuilder.addIndexes(delayedIndex); + + if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit || (maxIndexesPerBucketSnapshotSegment != -1 - && snapshotSegment.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) { + && snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) { segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp); segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp); currentTimestampUpperLimit = 0; @@ -128,8 +129,8 @@ Pair createImmutableBucketAndAsyncPersistent( segmentMetadataList.add(segmentMetadataBuilder.build()); segmentMetadataBuilder.clear(); - bucketSnapshotSegments.add(snapshotSegment); - snapshotSegment = new SnapshotSegment(); + bucketSnapshotSegments.add(snapshotSegmentBuilder.build()); + snapshotSegmentBuilder.clear(); } } @@ -152,8 +153,8 @@ Pair createImmutableBucketAndAsyncPersistent( // Add the first snapshot segment last message to snapshotSegmentLastMessageTable checkArgument(!bucketSnapshotSegments.isEmpty()); - SnapshotSegment firstSnapshotSegment = bucketSnapshotSegments.get(0); - DelayedIndex lastDelayedIndex = firstSnapshotSegment.getIndexeAt(firstSnapshotSegment.getIndexesCount() - 1); + SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0); + DelayedIndex lastDelayedIndex = snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1); Pair result = Pair.of(bucket, lastDelayedIndex); CompletableFuture future = asyncSaveBucketSnapshot(bucket, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java index 4faee3b17f17f..b8d54bd78b428 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.delayed.bucket; import javax.annotation.concurrent.NotThreadSafe; -import org.apache.pulsar.broker.delayed.proto.DelayedIndex; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; @NotThreadSafe @@ -41,28 +41,17 @@ public boolean isEmpty() { } @Override - public DelayedIndex peek() { - DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(queue.peekN1()) - .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3()); + public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek() { + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex = + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setTimestamp(queue.peekN1()) + .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3()).build(); return delayedIndex; } @Override - public DelayedIndex pop() { - DelayedIndex peek = peek(); + public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop() { + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek = peek(); queue.pop(); return peek; } - - @Override - public void popToObject(DelayedIndex delayedIndex) { - delayedIndex.setTimestamp(queue.peekN1()) - .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3()); - queue.pop(); - } - - @Override - public long peekTimestamp() { - return queue.peekN1(); - } } diff --git a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto deleted file mode 100644 index 633d6a8f1615c..0000000000000 --- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -syntax = "proto2"; - -package pulsar.delay; -option java_package = "org.apache.pulsar.broker.delayed.proto"; -option optimize_for = SPEED; -option java_multiple_files = true; - -message DelayedIndex { - required uint64 timestamp = 1; - required uint64 ledger_id = 2; - required uint64 entry_id = 3; -} - -message SnapshotSegment { - repeated DelayedIndex indexes = 1; - map delayed_index_bit_map = 2; -} diff --git a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto similarity index 85% rename from pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto rename to pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto index 01b770c567d09..6996b860c5249 100644 --- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto +++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto @@ -21,7 +21,12 @@ syntax = "proto2"; package pulsar.delay; option java_package = "org.apache.pulsar.broker.delayed.proto"; option optimize_for = SPEED; -option java_multiple_files = true; + +message DelayedIndex { + required uint64 timestamp = 1; + required uint64 ledger_id = 2; + required uint64 entry_id = 3; +} message SnapshotSegmentMetadata { map delayed_index_bit_map = 1; @@ -29,6 +34,10 @@ message SnapshotSegmentMetadata { required uint64 min_schedule_timestamp = 3; } +message SnapshotSegment { + repeated DelayedIndex indexes = 1; +} + message SnapshotMetadata { repeated SnapshotSegmentMetadata metadata_list = 1; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java index d26f38fa2bc2b..7cb6b8d5865bb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java @@ -29,10 +29,7 @@ import java.util.concurrent.ExecutionException; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage; -import org.apache.pulsar.broker.delayed.proto.DelayedIndex; -import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -63,8 +60,9 @@ protected void cleanup() throws Exception { @Test public void testCreateSnapshot() throws ExecutionException, InterruptedException { - SnapshotMetadata snapshotMetadata = SnapshotMetadata.newBuilder().build(); - List bucketSnapshotSegments = new ArrayList<>(); + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build(); + List bucketSnapshotSegments = new ArrayList<>(); CompletableFuture future = bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME); @@ -74,23 +72,24 @@ public void testCreateSnapshot() throws ExecutionException, InterruptedException @Test public void testGetSnapshot() throws ExecutionException, InterruptedException { - SnapshotSegmentMetadata segmentMetadata = - SnapshotSegmentMetadata.newBuilder() + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder() .setMinScheduleTimestamp(System.currentTimeMillis()) .setMaxScheduleTimestamp(System.currentTimeMillis()) .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build(); - SnapshotMetadata snapshotMetadata = - SnapshotMetadata.newBuilder() + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder() .addMetadataList(segmentMetadata) .build(); - List bucketSnapshotSegments = new ArrayList<>(); + List bucketSnapshotSegments = new ArrayList<>(); long timeMillis = System.currentTimeMillis(); - DelayedIndex delayedIndex = new DelayedIndex().setLedgerId(100L).setEntryId(10L) - .setTimestamp(timeMillis); - SnapshotSegment snapshotSegment = new SnapshotSegment(); - snapshotSegment.addIndexe().copyFrom(delayedIndex); + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex = + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L) + .setTimestamp(timeMillis).build(); + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build(); bucketSnapshotSegments.add(snapshotSegment); bucketSnapshotSegments.add(snapshotSegment); @@ -100,13 +99,13 @@ public void testGetSnapshot() throws ExecutionException, InterruptedException { Long bucketId = future.get(); Assert.assertNotNull(bucketId); - CompletableFuture> bucketSnapshotSegment = + CompletableFuture> bucketSnapshotSegment = bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3); - List snapshotSegments = bucketSnapshotSegment.get(); + List snapshotSegments = bucketSnapshotSegment.get(); Assert.assertEquals(2, snapshotSegments.size()); - for (SnapshotSegment segment : snapshotSegments) { - for (DelayedIndex index : segment.getIndexesList()) { + for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment segment : snapshotSegments) { + for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : segment.getIndexesList()) { Assert.assertEquals(100L, index.getLedgerId()); Assert.assertEquals(10L, index.getEntryId()); Assert.assertEquals(timeMillis, index.getTimestamp()); @@ -122,17 +121,17 @@ public void testGetSnapshotMetadata() throws ExecutionException, InterruptedExce map.put(100L, ByteString.copyFrom("test1", StandardCharsets.UTF_8)); map.put(200L, ByteString.copyFrom("test2", StandardCharsets.UTF_8)); - SnapshotSegmentMetadata segmentMetadata = - SnapshotSegmentMetadata.newBuilder() + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder() .setMaxScheduleTimestamp(timeMillis) .setMinScheduleTimestamp(timeMillis) .putAllDelayedIndexBitMap(map).build(); - SnapshotMetadata snapshotMetadata = - SnapshotMetadata.newBuilder() + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder() .addMetadataList(segmentMetadata) .build(); - List bucketSnapshotSegments = new ArrayList<>(); + List bucketSnapshotSegments = new ArrayList<>(); CompletableFuture future = bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, @@ -140,10 +139,10 @@ public void testGetSnapshotMetadata() throws ExecutionException, InterruptedExce Long bucketId = future.get(); Assert.assertNotNull(bucketId); - SnapshotMetadata bucketSnapshotMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata bucketSnapshotMetadata = bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId).get(); - SnapshotSegmentMetadata metadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata metadata = bucketSnapshotMetadata.getMetadataList(0); Assert.assertEquals(timeMillis, metadata.getMaxScheduleTimestamp()); @@ -153,9 +152,9 @@ public void testGetSnapshotMetadata() throws ExecutionException, InterruptedExce @Test public void testDeleteSnapshot() throws ExecutionException, InterruptedException { - SnapshotMetadata snapshotMetadata = - SnapshotMetadata.newBuilder().build(); - List bucketSnapshotSegments = new ArrayList<>(); + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build(); + List bucketSnapshotSegments = new ArrayList<>(); CompletableFuture future = bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME); @@ -174,22 +173,24 @@ public void testDeleteSnapshot() throws ExecutionException, InterruptedException @Test public void testGetBucketSnapshotLength() throws ExecutionException, InterruptedException { - SnapshotSegmentMetadata segmentMetadata = - SnapshotSegmentMetadata.newBuilder() + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder() .setMinScheduleTimestamp(System.currentTimeMillis()) .setMaxScheduleTimestamp(System.currentTimeMillis()) .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build(); - SnapshotMetadata snapshotMetadata = - SnapshotMetadata.newBuilder() + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder() .addMetadataList(segmentMetadata) .build(); - List bucketSnapshotSegments = new ArrayList<>(); + List bucketSnapshotSegments = new ArrayList<>(); long timeMillis = System.currentTimeMillis(); - DelayedIndex delayedIndex = new DelayedIndex().setLedgerId(100L).setEntryId(10L).setTimestamp(timeMillis); - SnapshotSegment snapshotSegment = new SnapshotSegment(); - snapshotSegment.addIndexe().copyFrom(delayedIndex); + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex = + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L) + .setTimestamp(timeMillis).build(); + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build(); bucketSnapshotSegments.add(snapshotSegment); bucketSnapshotSegments.add(snapshotSegment); @@ -206,22 +207,24 @@ public void testGetBucketSnapshotLength() throws ExecutionException, Interrupted @Test public void testConcurrencyGet() throws ExecutionException, InterruptedException { - SnapshotSegmentMetadata segmentMetadata = - SnapshotSegmentMetadata.newBuilder() + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder() .setMinScheduleTimestamp(System.currentTimeMillis()) .setMaxScheduleTimestamp(System.currentTimeMillis()) .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build(); - SnapshotMetadata snapshotMetadata = - SnapshotMetadata.newBuilder() + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder() .addMetadataList(segmentMetadata) .build(); - List bucketSnapshotSegments = new ArrayList<>(); + List bucketSnapshotSegments = new ArrayList<>(); long timeMillis = System.currentTimeMillis(); - DelayedIndex delayedIndex = new DelayedIndex().setLedgerId(100L).setEntryId(10L).setTimestamp(timeMillis); - SnapshotSegment snapshotSegment = new SnapshotSegment(); - snapshotSegment.addIndexe().copyFrom(delayedIndex); + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex = + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L) + .setTimestamp(timeMillis).build(); + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build(); bucketSnapshotSegments.add(snapshotSegment); bucketSnapshotSegments.add(snapshotSegment); @@ -234,7 +237,7 @@ public void testConcurrencyGet() throws ExecutionException, InterruptedException List> futures = new ArrayList<>(); for (int i = 0; i < 100; i++) { CompletableFuture future0 = CompletableFuture.runAsync(() -> { - List list = + List list = bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join(); Assert.assertTrue(list.size() > 0); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java index dc1c0e09ca276..9e924bdeda341 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java @@ -36,8 +36,8 @@ import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage; -import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; import org.apache.pulsar.common.util.FutureUtil; @Slf4j @@ -139,9 +139,13 @@ public CompletableFuture> getBucketSnapshotSegment(long bu long lastEntryId = Math.min(lastSegmentEntryId, this.bucketSnapshots.get(bucketId).size()); for (int i = (int) firstSegmentEntryId; i <= lastEntryId ; i++) { ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(i); - SnapshotSegment snapshotSegment = new SnapshotSegment(); - snapshotSegment.parseFrom(byteBuf, byteBuf.readableBytes()); - snapshotSegments.add(snapshotSegment); + SnapshotSegment snapshotSegment; + try { + snapshotSegment = SnapshotSegment.parseFrom(byteBuf.nioBuffer()); + snapshotSegments.add(snapshotSegment); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } } return snapshotSegments; }, executorService); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java index 5dc3dcc7cb9a7..8f87f0d49a20c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java @@ -23,8 +23,8 @@ import java.util.List; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.delayed.proto.DelayedIndex; -import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; import org.testng.Assert; import org.testng.annotations.Test; @@ -35,19 +35,27 @@ public class DelayedIndexQueueTest { @Test public void testCompare() { DelayedIndex delayedIndex = - new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L); + DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L) + .build(); DelayedIndex delayedIndex2 = - new DelayedIndex().setTimestamp(2).setLedgerId(2L).setEntryId(2L); + DelayedIndex.newBuilder().setTimestamp(2).setLedgerId(2L).setEntryId(2L) + .build(); Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0); delayedIndex = - new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L); + DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L) + .build(); delayedIndex2 = - new DelayedIndex().setTimestamp(1).setLedgerId(2L).setEntryId(2L); + DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(2L).setEntryId(2L) + .build(); Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0); - delayedIndex = new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L); - delayedIndex2 = new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(2L); + delayedIndex = + DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L) + .build(); + delayedIndex2 = + DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(2L) + .build(); Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0); } @@ -55,19 +63,21 @@ public void testCompare() { public void testCombinedSegmentDelayedIndexQueue() { List listA = new ArrayList<>(); for (int i = 0; i < 10; i++) { - DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(i).setLedgerId(1L).setEntryId(1L); + DelayedIndex delayedIndex = + DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L) + .build(); listA.add(delayedIndex); } - SnapshotSegment snapshotSegmentA1 = new SnapshotSegment(); - snapshotSegmentA1.addAllIndexes(listA); + SnapshotSegment snapshotSegmentA1 = SnapshotSegment.newBuilder().addAllIndexes(listA).build(); List listA2 = new ArrayList<>(); for (int i = 10; i < 20; i++) { - DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(i).setLedgerId(1L).setEntryId(1L); + DelayedIndex delayedIndex = + DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L) + .build(); listA2.add(delayedIndex); } - SnapshotSegment snapshotSegmentA2 = new SnapshotSegment(); - snapshotSegmentA2.addAllIndexes(listA2); + SnapshotSegment snapshotSegmentA2 = SnapshotSegment.newBuilder().addAllIndexes(listA2).build(); List segmentListA = new ArrayList<>(); segmentListA.add(snapshotSegmentA1); @@ -75,32 +85,36 @@ public void testCombinedSegmentDelayedIndexQueue() { List listB = new ArrayList<>(); for (int i = 0; i < 9; i++) { - DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(1L); + DelayedIndex delayedIndex = + DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L) + .build(); - DelayedIndex delayedIndex2 = new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(2L); + DelayedIndex delayedIndex2 = + DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L) + .build(); listB.add(delayedIndex); listB.add(delayedIndex2); } - SnapshotSegment snapshotSegmentB = new SnapshotSegment(); - snapshotSegmentB.addAllIndexes(listB); + SnapshotSegment snapshotSegmentB = SnapshotSegment.newBuilder().addAllIndexes(listB).build(); List segmentListB = new ArrayList<>(); segmentListB.add(snapshotSegmentB); - segmentListB.add(new SnapshotSegment()); + segmentListB.add(SnapshotSegment.newBuilder().build()); List listC = new ArrayList<>(); for (int i = 10; i < 30; i+=2) { DelayedIndex delayedIndex = - new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(1L); + DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L) + .build(); DelayedIndex delayedIndex2 = - new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(2L); + DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L) + .build(); listC.add(delayedIndex); listC.add(delayedIndex2); } - SnapshotSegment snapshotSegmentC = new SnapshotSegment(); - snapshotSegmentC.addAllIndexes(listC); + SnapshotSegment snapshotSegmentC = SnapshotSegment.newBuilder().addAllIndexes(listC).build(); List segmentListC = new ArrayList<>(); segmentListC.add(snapshotSegmentC); @@ -109,14 +123,11 @@ public void testCombinedSegmentDelayedIndexQueue() { int count = 0; while (!delayedIndexQueue.isEmpty()) { - DelayedIndex pop = new DelayedIndex(); - delayedIndexQueue.popToObject(pop); + DelayedIndex pop = delayedIndexQueue.pop(); log.info("{} , {}, {}", pop.getTimestamp(), pop.getLedgerId(), pop.getEntryId()); count++; if (!delayedIndexQueue.isEmpty()) { DelayedIndex peek = delayedIndexQueue.peek(); - long timestamp = delayedIndexQueue.peekTimestamp(); - Assert.assertEquals(timestamp, peek.getTimestamp()); Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0); } } @@ -136,13 +147,10 @@ public void TripleLongPriorityDelayedIndexQueueTest() { int count = 0; while (!delayedIndexQueue.isEmpty()) { - DelayedIndex pop = new DelayedIndex(); - delayedIndexQueue.popToObject(pop); + DelayedIndex pop = delayedIndexQueue.pop(); count++; if (!delayedIndexQueue.isEmpty()) { DelayedIndex peek = delayedIndexQueue.peek(); - long timestamp = delayedIndexQueue.peekTimestamp(); - Assert.assertEquals(timestamp, peek.getTimestamp()); Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0); } }