Skip to content

Commit

Permalink
[fix][broker] Fix avoid future of clear delayed message can't complete (
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Apr 15, 2023
1 parent 091ee25 commit 6152cc8
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.FutureUtil;

public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {

Expand Down Expand Up @@ -72,6 +78,28 @@ public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers d
delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
}

/**
* Clean up residual snapshot data.
* If tracker has not been created or has been closed, then we can't clean up the snapshot with `tracker.clear`,
* this method can clean up the residual snapshots without creating a tracker.
*/
public CompletableFuture<Void> cleanResidualSnapshots(ManagedCursor cursor) {
Map<String, String> cursorProperties = cursor.getCursorProperties();
List<CompletableFuture<Void>> futures = new ArrayList<>();
FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
cursorProperties.forEach((k, v) -> {
if (k != null && v != null && k.startsWith(BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX)) {
CompletableFuture<Void> future = sequencer.sequential(() -> {
return cursor.removeCursorProperty(k)
.thenCompose(__ -> bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v)));
});
futures.add(future);
}
});

return FutureUtil.waitForAll(futures);
}

@Override
public void close() throws Exception {
if (bucketSnapshotStorage != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.pulsar.broker.delayed.bucket;

import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -41,7 +41,6 @@
@AllArgsConstructor
abstract class Bucket {

static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket";
static final String DELIMITER = "_";
static final int MaxRetryTimes = 3;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed.bucket;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -66,6 +66,8 @@
@ThreadSafe
public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {

public static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket";

static final CompletableFuture<Long> NULL_LONG_PROMISE = CompletableFuture.completedFuture(null);

static final int AsyncOperationTimeoutSeconds = 60;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
Expand Down Expand Up @@ -1098,19 +1099,15 @@ public CompletableFuture<Void> clearDelayedMessages() {
return CompletableFuture.completedFuture(null);
}

if (delayedDeliveryTracker.isEmpty() && topic.getBrokerService()
.getDelayedDeliveryTrackerFactory() instanceof BucketDelayedDeliveryTrackerFactory) {
synchronized (this) {
if (delayedDeliveryTracker.isEmpty()) {
delayedDeliveryTracker = Optional
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}
}
}

if (delayedDeliveryTracker.isPresent()) {
return this.delayedDeliveryTracker.get().clear();
} else {
DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory =
topic.getBrokerService().getDelayedDeliveryTrackerFactory();
if (delayedDeliveryTrackerFactory instanceof BucketDelayedDeliveryTrackerFactory
bucketDelayedDeliveryTrackerFactory) {
return bucketDelayedDeliveryTrackerFactory.cleanResidualSnapshots(cursor);
}
return CompletableFuture.completedFuture(null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand Down Expand Up @@ -1169,15 +1170,21 @@ private void asyncDeleteCursorWithClearDelayedMessage(String subscriptionName,
}

Dispatcher dispatcher = persistentSubscription.getDispatcher();
final Dispatcher temporaryDispatcher;
if (dispatcher == null) {
log.info("[{}][{}] Dispatcher is null, try to create temporary dispatcher to clear delayed message", topic,
subscriptionName);
dispatcher = temporaryDispatcher =
new PersistentDispatcherMultipleConsumers(this, persistentSubscription.cursor,
persistentSubscription);
} else {
temporaryDispatcher = null;
DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory =
brokerService.getDelayedDeliveryTrackerFactory();
if (delayedDeliveryTrackerFactory instanceof BucketDelayedDeliveryTrackerFactory
bucketDelayedDeliveryTrackerFactory) {
ManagedCursor cursor = persistentSubscription.getCursor();
bucketDelayedDeliveryTrackerFactory.cleanResidualSnapshots(cursor).whenComplete((__, ex) -> {
if (ex != null) {
unsubscribeFuture.completeExceptionally(ex);
} else {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
}
});
}
return;
}

dispatcher.clearDelayedMessages().whenComplete((__, ex) -> {
Expand All @@ -1186,9 +1193,6 @@ private void asyncDeleteCursorWithClearDelayedMessage(String subscriptionName,
} else {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
}
if (temporaryDispatcher != null) {
temporaryDispatcher.close();
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,4 +305,52 @@ public void testBucketDelayedIndexMetrics() throws Exception {
assertTrue(namespaceMetric.isPresent());
assertEquals(6, namespaceMetric.get().value);
}

@Test
public void testDelete() throws Exception {
String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelete");

@Cleanup
Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Shared)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

for (int i = 0; i < 1000; i++) {
producer.newMessage()
.value("msg")
.deliverAfter(1, TimeUnit.HOURS)
.send();
}

Dispatcher dispatcher = pulsar.getBrokerService().getTopicReference(topic).get().getSubscription("sub").getDispatcher();
Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1000));

Map<String, String> cursorProperties =
((PersistentDispatcherMultipleConsumers) dispatcher).getCursor().getCursorProperties();
List<Long> bucketIds = cursorProperties.entrySet().stream()
.filter(x -> x.getKey().startsWith(CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket")).map(
x -> Long.valueOf(x.getValue())).toList();

assertTrue(bucketIds.size() > 0);

admin.topics().delete(topic, true);

for (Long bucketId : bucketIds) {
try {
LedgerHandle ledgerHandle =
pulsarTestContext.getBookKeeperClient()
.openLedger(bucketId, BookKeeper.DigestType.CRC32C, new byte[]{});
Assert.fail("Should fail");
} catch (BKException.BKNoSuchLedgerExistsException e) {
// ignore it
}
}
}
}

0 comments on commit 6152cc8

Please sign in to comment.