-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Open
Labels
lifecycle/staletype/bugThe PR fixed a bug or issue reported a bugThe PR fixed a bug or issue reported a bug
Description
Describe the bug
When produce message to topic and trigger topic compaction, and then the original message expired by retention policy, the message which compacted into the compact ledger will also be lost.
If we set topic policy into __change_events topic and trigger compaction. And then the original policy in __change_events topic has been expired, the topic policy will lost.
The following test will show this case.
@Test(timeOut = 30000)
public void testCompactWithMessageTimeout() throws Exception {
final String topic = "persistent://my-property/use/my-ns/testCompactWithMessageTimeout-" + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 1);
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
// send 10 messages
for (int i = 0; i < 10; ++i) {
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
}
// trigger topic compaction
admin.topics().triggerCompaction(topic);
boolean succeed = retryStrategically((test) -> {
try {
return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
} catch (PulsarAdminException e) {
return false;
}
}, 10, 200);
Assert.assertTrue(succeed);
// unload topic to trigger ledger roll over
admin.topics().unload(topic);
// change ledger retention time and trim expired ledger to ensure the compacted message will be expired
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic).get();
ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
managedLedgerConfig.setRetentionTime(1, TimeUnit.MILLISECONDS);
persistentTopic.getManagedLedger().trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
// send another 10 messages
for (int i = 10; i < 20; ++i) {
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
}
// trigger topic compaction
admin.topics().triggerCompaction(topic);
succeed = retryStrategically((test) -> {
try {
return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
} catch (PulsarAdminException e) {
return false;
}
}, 10, 200);
Assert.assertTrue(succeed);
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.readCompacted(true)
.startMessageId(MessageId.earliest)
.create();
// check message in compacted topic, it should contains the whole messages
for (int i = 0; i < 20; ++i) {
Message<String> msg = reader.readNext();
Assert.assertEquals(msg.getKey(), String.valueOf(i));
Assert.assertEquals(msg.getValue(), String.valueOf(i));
}
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
lifecycle/staletype/bugThe PR fixed a bug or issue reported a bugThe PR fixed a bug or issue reported a bug