Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable seeking individual topic partitions #829

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

severinson
Copy link

@severinson severinson commented Aug 16, 2022

Currently, seeking on a consumer with a KeyShared subscription fails. This PR removes an unnecessary check to seek the underlying partitionConsumer.

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also help add a test to cover the new changes.

@severinson
Copy link
Author

@codelipenghui I've added tests.

Comment on lines +583 to +585
if mid.partitionIdx < 0 {
return newError(SeekFailed, "partitionIdx is negative")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this break the non-partitioned topic message acknowledgment?
The partition index -1 means the message from a non-partitioned topic.

@freeznet @wolfstudy Please help confirm

Copy link
Author

@severinson severinson Aug 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that really true? The pre-existing code would do the following:

return c.consumers[mid.partitionIdx].Seek(mid)

This suggests that the partition index is 0 for non-partitioned topics. Otherwise seek() on a non-partitioned topic would've always resulted in an index out of bounds panic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Go SDK, the partition index less than 0 is not allowed.

// did we receive a valid partition index?
	if partition < 0 || partition >= len(c.consumers) {
		c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
			partition, len(c.consumers))
		return trackingMessageID{}, false
	}

And why do we need to modify the current logic, where the number of consumers is equivalent to the number of partitions, and each partition corresponds to a unique consumer.

The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.

Copy link
Author

@severinson severinson Aug 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments. I have a few questions.

And why do we need to modify the current logic, where the number of consumers is equivalent to the number of partitions, and each partition corresponds to a unique consumer.

I don't understand what you mean. I'm not changing the current logic. I'm just passing through the message id to the partitionConsumer responsible for the partition.

The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.

All right. Do you know what work still remains to have the Go client support seeking by partition?

@wolfstudy wolfstudy added this to the v0.10.0 milestone Aug 22, 2022
Copy link
Member

@wolfstudy wolfstudy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few changes, please check

Comment on lines +583 to +585
if mid.partitionIdx < 0 {
return newError(SeekFailed, "partitionIdx is negative")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Go SDK, the partition index less than 0 is not allowed.

// did we receive a valid partition index?
	if partition < 0 || partition >= len(c.consumers) {
		c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
			partition, len(c.consumers))
		return trackingMessageID{}, false
	}

And why do we need to modify the current logic, where the number of consumers is equivalent to the number of partitions, and each partition corresponds to a unique consumer.

The Go SDK currently does not support seek operations on partitioned topics, and this logic is still being implemented.

@wolfstudy
Copy link
Member

If you want to directly perform the seek operation on a partitioned topic, you can refer to the behavior here. #782

@codelipenghui
Copy link
Contributor

@severinson After checking more details about this part. Before 2.8.0, the Java client only supported seek with the earliest/latest position. apache/pulsar#10033 has introduced a new API https://github.com/apache/pulsar/pull/10033/files#diff-6010f94fead60a5a3ed8aa58a37fe96fdf2cd9c5c1573989cdf89abcb9f9c256R613 which able to provide seek message ID for each partition or topic(For multi-topic/regex consumer)

I think the go client should also follow the same way.

@RobertIndie RobertIndie modified the milestones: v0.10.0, v0.11.0 Mar 27, 2023
@RobertIndie RobertIndie modified the milestones: v0.11.0, v0.12.0 Jul 4, 2023
@RobertIndie RobertIndie modified the milestones: v0.12.0, v0.13.0 Jan 10, 2024
@RobertIndie RobertIndie modified the milestones: v0.13.0, v0.14.0 Jul 15, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants