Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] When using the admin.topics().skipMessages API, the actual number of skipped messages may be less than the expected number when the value of skipNumber is greater than 8. #20262

Closed
2 tasks done
crossoverJie opened this issue May 8, 2023 · 1 comment · Fixed by #20326
Assignees
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@crossoverJie
Copy link
Contributor

crossoverJie commented May 8, 2023

Search before asking

  • I searched in the issues and found nothing similar.

Version

latest version

Minimal reproduce step

    @Test(dataProvider = "topicName")
    public void testSkipMessages(String topicName) throws Exception {
        final String subName = topicName;
        assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());

        final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName;
        // Force to create a topic
        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 0);
        assertEquals(admin.topics().getList("prop-xyz/ns1"),
                List.of("persistent://prop-xyz/ns1/" + topicName));

        // create consumer and subscription
        @Cleanup
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsar.getWebServiceAddress())
                .statsInterval(0, TimeUnit.SECONDS)
                .build();
        AtomicInteger total = new AtomicInteger();
        Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName)
                .messageListener(new MessageListener<byte[]>() {
                    @SneakyThrows
                    @Override
                    public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                        if (total.get() %2 !=0){
                            consumer.acknowledge(msg);
                        }
                        total.incrementAndGet();
                    }
                })
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Exclusive).subscribe();

        assertEquals(admin.topics().getSubscriptions(persistentTopicName), List.of(subName));

        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 100);
        TimeUnit.SECONDS.sleep(3);
        TopicStats topicStats = admin.topics().getStats(persistentTopicName);
        long msgBacklog = topicStats.getSubscriptions().get(subName).getMsgBacklog();
        log.info("back={}",msgBacklog);
        int skipNumber = 20;
        admin.topics().skipMessages(persistentTopicName, subName, skipNumber);
        topicStats = admin.topics().getStats(persistentTopicName);
        assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), msgBacklog - skipNumber);
    }

When I artificially created 50 hollow messages and skipped 20 messages, only 14 messages were actually skipped.

What did you expect to see?

The expected number of skipped messages is equal to the actual number of skipped messages.

What did you see instead?

The actual number of skips is less than the expected number of skips.
image

Anything else?

} else {
if (log.isDebugEnabled()) {
log.debug("[{}] deletePosition {} moved ahead without clearing deleteMsgs {} for cursor {}",
ledger.getName(), markDeletePosition, r.lowerEndpoint(), name);
}
}
return true;
} finally {
if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
((PositionImplRecyclable) r.lowerEndpoint()).recycle();
((PositionImplRecyclable) r.upperEndpoint()).recycle();
}
}
}, recyclePositionRangeConverter);

image

The reason for this issue is that the recycle() function reuses objects, causing the object referenced by r to change during runtime. When the loop count is greater than 8, the else block is entered, leading to an incorrect calculation of the message count.

I think there are two methods to fix this issue.

The first method is to perform a copy before assignment, similar to the following:

        public void setStartPosition(PositionImpl startPosition) {
            PositionImpl cp = new PositionImpl(startPosition.ledgerId, startPosition.entryId);
            this.startPosition = cp;
        }

        public void setEndPosition(PositionImpl endPosition) {
            PositionImpl cp = new PositionImpl(endPosition.ledgerId, endPosition.entryId);
            this.endPosition = cp;
        }
	
	// state.endPosition = r.lowerEndpoint();
	state.setEndPosition(r.lowerEndpoint());

	// state.startPosition = r.upperEndpoint();
	state.setStartPosition(r.upperEndpoint());

The second method is to remove recyclePositionRangeConverter.

image

any other suggestions for a better solution?

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@crossoverJie crossoverJie added the type/bug The PR fixed a bug or issue reported a bug label May 8, 2023
@crossoverJie
Copy link
Contributor Author

@rdhabalia
This issue is related to this PR, please take a look.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant