Cannot seek on partitioned topic #213

Open
Samreay opened this issue May 16, 2024 · 3 comments


Samreay commented May 16, 2024

As per apache/pulsar#3643, it seems Pulsar should support seeking on partitioned topics.

This functionality seems to be partially missing from the C++ and Python clients.

Reproduction

First, run a pulsar standalone instance:

```shell
docker run -it -p 6650:6650 -p 8080:8080 --tmpfs /pulsar/data apachepulsar/pulsar:3.1.0 bin/pulsar standalone
```

Make sure the `pulsar-client` and `httpx` packages are installed, then run the following code:

```python
import httpx
import pulsar

# Create "example" as a partitioned topic with a single partition via the
# admin REST API (the JSON request body is the number of partitions)
r = httpx.put(
    "http://localhost:8080/admin/v2/persistent/public/default/example/partitions",
    json=1,
)
r.raise_for_status()

client = pulsar.Client("pulsar://localhost:6650")
consumer = client.subscribe("persistent://public/default/example", "sub")

try:
    # Seek to the latest message: raises OperationNotSupported
    consumer.seek(pulsar.MessageId.latest)
finally:
    client.close()
```

Expected Behaviour

Seeking should work across partitions: seeking to latest should move every partition to its latest position, and seeking by a MessageId (which contains a partition number) should seek only that partition.

Curiously, passing an explicit integer timestamp does not raise this exception, which seems to contradict the documentation's statement that seeking is simply not supported.

Workarounds

Is there any way in the provided python API to seek on a per-partition basis as a workaround?

BewareMyPower self-assigned this May 16, 2024
BewareMyPower (contributor) commented May 16, 2024

> Seeking by a MessageId (which contains a partition number) should seek that partition.

That is not the expected behavior. In the Java client, a MessageId that does not contain a topic name cannot be sought either. Messages received from a multi-topics consumer carry the topic name, so their IDs can be sought: the multi-topics consumer uses that name to find the internal consumer to seek. This behavior was introduced in Java client 3.0.0 by apache/pulsar#19158.
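The routing rule described above can be illustrated with a toy Python model. Every class here (`ToyMessageId`, `ToyPartitionConsumer`, `ToyMultiTopicsConsumer`) is hypothetical and only mimics the behavior described in this comment; it is not the Java or Python client API. A seek by a MessageId that carries a topic name is forwarded to the matching internal consumer, while one without a topic name is rejected.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass(frozen=True)
class ToyMessageId:
    """Hypothetical stand-in for a MessageId. topic_name is only set for IDs
    of messages received through a multi-topics consumer."""
    ledger_id: int
    entry_id: int
    topic_name: Optional[str] = None


class ToyPartitionConsumer:
    """Hypothetical internal consumer for one partition; records its last seek."""

    def __init__(self, topic: str) -> None:
        self.topic = topic
        self.last_seek: Optional[ToyMessageId] = None

    def seek(self, msg_id: ToyMessageId) -> None:
        self.last_seek = msg_id


class ToyMultiTopicsConsumer:
    """Hypothetical multi-topics consumer holding one internal consumer per topic."""

    def __init__(self, topics: Iterable[str]) -> None:
        self.consumers: Dict[str, ToyPartitionConsumer] = {
            t: ToyPartitionConsumer(t) for t in topics
        }

    def seek(self, msg_id: ToyMessageId) -> None:
        # Without a topic name there is no way to pick the internal consumer.
        if msg_id.topic_name is None:
            raise NotImplementedError("MessageId has no topic name; cannot seek")
        consumer = self.consumers.get(msg_id.topic_name)
        if consumer is None:
            raise KeyError(f"no internal consumer for {msg_id.topic_name!r}")
        # Forward the seek to exactly one internal consumer.
        consumer.seek(msg_id)
```

In this model the topic name is what makes the seek possible: the ledger and entry IDs alone do not say which internal consumer owns the message.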

> Is there any way in the provided python API to seek on a per-partition basis as a workaround?

Currently, you can seek to a very large timestamp to simulate seek(MessageId.latest).
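A minimal sketch of that workaround, assuming `consumer` is a `pulsar.Consumer` whose `seek()` accepts an integer publish time in milliseconds since the epoch (the integer form that, per the report above, does not raise). The helper names and the one-day margin are hypothetical choices, not part of the client API; the margin is meant to absorb clock skew between this machine and the producers whose clocks stamped the existing publish times.

```python
import time

# Hypothetical margin (one day) to tolerate clock skew between this machine
# and the clocks that stamped the publish times of existing messages.
DEFAULT_MARGIN_MS = 24 * 60 * 60 * 1000


def far_future_timestamp_ms(margin_ms: int = DEFAULT_MARGIN_MS) -> int:
    """Return the current wall-clock time in ms since the epoch, plus a margin."""
    return int(time.time() * 1000) + margin_ms


def seek_to_latest_by_timestamp(consumer, margin_ms: int = DEFAULT_MARGIN_MS) -> int:
    """Approximate consumer.seek(MessageId.latest) by seeking to a publish
    time later than every existing message. Returns the timestamp used."""
    ts = far_future_timestamp_ms(margin_ms)
    # An int argument makes seek() reset the subscription by publish time
    # instead of by MessageId.
    consumer.seek(ts)
    return ts


# Usage with a real client (requires a running broker):
#   client = pulsar.Client("pulsar://localhost:6650")
#   consumer = client.subscribe("persistent://public/default/example", "sub")
#   seek_to_latest_by_timestamp(consumer)
```

The seek is a one-time cursor reset, so messages published after it are still delivered normally; the future timestamp only decides where the cursor starts.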

Samreay (author) commented May 16, 2024

Hi @BewareMyPower, I'm not sure I follow: are multi-topics consumers involved in this discussion at all? In the example above, all I do is create a single topic and then try to seek to latest on it. Isn't that exactly the behaviour discussed in apache/pulsar#3643 (comment)?

> Currently, you can seek to a very large timestamp to simulate seek(MessageId.latest).

Will swap over to doing this in the interim, cheers :)

BewareMyPower (contributor) commented May 16, 2024

> but are multi-topic consumers entangled in this discussion?

Topics and partitions are always confusing in Pulsar. I prefer to use "topic" to mean either a "non-partitioned topic" or "a partition of a partitioned topic". In the Java client, a multi-topics consumer can also be created when the subscribed topic is a single partitioned topic.
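To make the terminology concrete: in Pulsar, each partition of a partitioned topic is itself stored as a topic whose name is the partitioned topic's name plus `-partition-<index>`, so the one-partition topic in the reproduction is backed by `persistent://public/default/example-partition-0`. The helper below is hypothetical (not a client API) and simply lists the topics behind a name given its partition count; treating a count of 0 as a non-partitioned topic follows Pulsar's partitioned-topic metadata convention.

```python
from typing import List


def partition_topic_names(topic: str, num_partitions: int) -> List[str]:
    """Return the topics that actually hold the messages of `topic`.

    A partition count of 0 denotes a non-partitioned topic, which is its own
    single topic; otherwise partition i lives in "<topic>-partition-<i>".
    """
    if num_partitions < 0:
        raise ValueError("num_partitions must be >= 0")
    if num_partitions == 0:
        return [topic]
    return [f"{topic}-partition-{i}" for i in range(num_partitions)]
```

Even with a single partition, subscribing to the partitioned name means the client works with the internal partition topic underneath, which is why the multi-topics consumer enters the discussion.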

BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this issue May 20, 2024
### Motivation

See apache/pulsar-client-python#213

### Modifications

Add a new `forEachValue` overload to `SynchronizedHashMap` that allows
users to count the number of remaining running tasks through a
`SharedFuture`. Leverage this overload in seek operations when the
argument is a timestamp, or a MessageId that represents earliest or
latest. When the argument is a MessageId whose `getTopicName()` method
returns a valid topic name, seek on the internal consumer of that topic.

Add `testMultiTopicsSeekAll` and `testMultiTopicsSeekSingle` to
`ConsumerSeekTest` to cover these cases.
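The task counting described in the commit message can be sketched in Python. This is a hypothetical analogue (the names `for_each_with_completion`, `start_task` and `on_all_done` are invented), not the C++ client's `forEachValue`/`SharedFuture` code: it starts one asynchronous task per internal consumer and fires a single completion callback once the count of remaining tasks reaches zero. Failing fast on the first error is an assumption of this sketch; the commit message does not describe error handling.

```python
import threading
from typing import Any, Callable, Iterable, Optional

# A completion callback receives None on success or the exception on failure.
Callback = Callable[[Optional[BaseException]], None]


def for_each_with_completion(
    items: Iterable[Any],
    start_task: Callable[[Any, Callback], None],
    on_all_done: Callback,
) -> None:
    """Start one asynchronous task per item and report the combined result once.

    on_all_done(None) runs after every task has succeeded; on_all_done(err)
    runs as soon as the first task fails. Later completions are ignored.
    """
    pending = list(items)
    if not pending:
        on_all_done(None)  # nothing to wait for
        return

    lock = threading.Lock()
    remaining = len(pending)  # tasks that have not reported success yet
    finished = False          # whether on_all_done has already been triggered

    def task_done(error: Optional[BaseException]) -> None:
        nonlocal remaining, finished
        with lock:
            if finished:
                return
            if error is None:
                remaining -= 1
                if remaining > 0:
                    return
            finished = True
        # Invoke the user callback outside the lock to avoid re-entrancy deadlocks.
        on_all_done(error)

    for item in pending:
        start_task(item, task_done)
```

In a seek, each item would be an internal consumer, `start_task` would start a seek on it, and `task_done` plays the role of that seek's completion callback.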
BewareMyPower added a commit to apache/pulsar-client-cpp that referenced this issue Jun 4, 2024