[PIP 101] Add seek by index feature for consumer #12234

Closed
Jason918 opened this issue Sep 29, 2021 · 8 comments

Comments

@Jason918
Contributor

Motivation

Currently we can reset the read position of a cursor by message ID or by timestamp. Since the index was introduced in broker entry metadata in 2.8.0, resetting a cursor by index would be very helpful for other protocol handlers (KoP or RoP).

Goal

Add seek by index feature for consumer.

API Changes

Add the following methods to org.apache.pulsar.client.api.Consumer:

  • void seekByIndex(long index);
  • CompletableFuture<Void> seekByIndexAsync(long index);

Here are the docs for this new interface:
Reset the subscription associated with this consumer to a specific message index.
For example, given that the indexes of the messages in this topic are in the range [A, B), where A <= B:

  • if seekByIndex(X) is called with X in the range [A, B), the message with index X is the next message the consumer will receive.
  • if seekByIndex(X) is called with X < A, it's the same as Consumer#seek(MessageId.earliest): the subscription is reset to the earliest message available in the topic.
  • if seekByIndex(X) is called with X >= B, it's the same as Consumer#seek(MessageId.latest): the subscription is reset to the latest message in the topic.
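The three cases above reduce to a simple range check. This is only an illustrative sketch: `SeekByIndexSemantics`, `Target`, and `resolve` are hypothetical names (not part of the Pulsar client API), and `a`/`b` stand for the bounds A and B used in the docs.

```java
// Hypothetical illustration of the seekByIndex contract; not part of the Pulsar API.
final class SeekByIndexSemantics {

    /** Where the subscription ends up after seekByIndex(x). */
    enum Target { EARLIEST, AT_INDEX, LATEST }

    private SeekByIndexSemantics() { }

    /**
     * @param x requested index
     * @param a smallest index available in the topic (inclusive)
     * @param b one past the largest index in the topic (exclusive); requires a <= b
     */
    static Target resolve(long x, long a, long b) {
        if (a > b) {
            throw new IllegalArgumentException("expected a <= b, got a=" + a + ", b=" + b);
        }
        if (x < a) {
            // Same as Consumer#seek(MessageId.earliest)
            return Target.EARLIEST;
        }
        if (x >= b) {
            // Same as Consumer#seek(MessageId.latest)
            return Target.LATEST;
        }
        // The message with index x is the next one delivered
        return Target.AT_INDEX;
    }
}
```

Note that when A == B (no indexed messages), every X falls into either the "earliest" or the "latest" case, so the contract stays well defined for an empty range.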

Note: "org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor" must be added to "brokerEntryMetadataInterceptors" in the broker configuration to enable index metadata in the broker.

Implementation

For client-broker communication, we can reuse the existing CommandSeek, which is already used for seek(messageId) and seek(timestamp), by adding an optional field "index" to it.

In the broker, the implementation of seek by index is basically the same as seek by timestamp. The key is to find the messageId with the given index. A new class, PersistentMessageFinderByIndex, is introduced to do this. The difference between PersistentMessageFinderByIndex and the original PersistentMessageFinder is that PersistentMessageFinderByIndex uses BrokerEntryMetadata#index of the message being searched instead of BrokerEntryMetadata#brokerTimestamp. Once we find the messageId, we can reset the subscription with PersistentSubscription#resetCursor.
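A minimal sketch of the lookup step, assuming one message per entry and that indexes grow monotonically in log order. Entries are modeled as an in-memory list; `IndexFinderSketch`, `Entry`, and `findResetPosition` are hypothetical names, and a real implementation would search the managed ledger rather than a list. It uses a lower-bound binary search for the first entry whose index is >= the target.

```java
import java.util.List;

// Hypothetical sketch of an index-based finder; not the actual PersistentMessageFinderByIndex.
final class IndexFinderSketch {

    static final class Entry {
        final long entryId;
        final long index; // stand-in for BrokerEntryMetadata#index

        Entry(long entryId, long index) {
            this.entryId = entryId;
            this.index = index;
        }
    }

    private IndexFinderSketch() { }

    /**
     * Returns the list offset the cursor should be reset to, so that the next
     * delivered entry is the first one whose index is >= target.
     * Returns 0 ("earliest") if target is below every index and
     * entries.size() ("latest") if every index is below target.
     */
    static int findResetPosition(List<Entry> entries, long target) {
        int lo = 0;
        int hi = entries.size(); // search space is [lo, hi)
        while (lo < hi) {
            int mid = lo + (hi - lo) / 2;
            if (entries.get(mid).index < target) {
                lo = mid + 1; // mid and everything before it precede the target
            } else {
                hi = mid;     // mid may be the answer; keep it in range
            }
        }
        return lo;
    }
}
```

Because the lower bound naturally returns 0 below the range and the list size above it, one search covers all three cases of the seekByIndex contract.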

Rejected Alternatives

No alternatives yet.

@BewareMyPower
Contributor

Let's move the discussion here. @MarvinCai

But not all users will use Index or even BrokerEntryMetadata, while all messages have a Timestamp and MessageID, so that's a bit different in my opinion. Adding a method that relies on a property that might not exist for all users is kind of weird.

I think it's a documentation issue. If we document the seekByIndex method well, users won't be confused. Just as with exposing BrokerEntryMetadata in #11553, the methods added to Message also require the broker to enable BrokerEntryMetadata.

/**
 * Get index from broker entry metadata.
 * Note that only if the feature is enabled in the broker then the value is available.
 *
 * @since 2.9.0
 * @return index from broker entry metadata, or empty if the feature is not enabled in the broker.
 */
Optional<Long> getIndex();

(Though I don't think this doc is good enough: we should make it clear which configs must be set in the broker.)

Similarly, the current seek operation by message ID also has some restrictions:

* <p>Note: this operation can only be done on non-partitioned topics. For these, one can rather perform
* the seek() on the individual partitions.

In conclusion, it provides a solution for users who want message-count-based seek semantics, without affecting existing APIs.

@Jason918
Contributor Author

@MarvinCai

Adding a method that relies on a property that might not exist for all users is kind of weird.

IMHO, it's quite common for some client features to depend on server-side configuration, e.g. authentication, authorization, and transactions. None of these features are enabled by default, and users have to change the broker configuration to use them from the client.

@eolivelli
Contributor

eolivelli commented Oct 1, 2021

I am fine with this new feature, mostly because we added "Optional<Long> getIndex();" to the Message API.
So now this is a visible feature and it is part of the official API.

We could also consider enabling the 'index' (AppendIndexMetadataInterceptor) by default if the impact on data size is negligible.

@eolivelli
Contributor

It would be better if the Consumer received a meaningful error when the value is not available because AppendIndexMetadataInterceptor is not enabled.
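A minimal sketch of that suggestion, assuming the broker can read BrokerEntryMetadata#index from an entry of the topic before searching. `SeekByIndexValidation`, `checkIndexAvailable`, and `IndexNotEnabledException` are hypothetical names, not existing Pulsar classes; the idea is only that a missing index fails the future with an actionable message instead of seeking somewhere arbitrary.

```java
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;

// Hypothetical validation step for seek-by-index; names are illustrative only.
final class SeekByIndexValidation {

    /** Hypothetical error type signalling that index metadata is not enabled. */
    static final class IndexNotEnabledException extends RuntimeException {
        IndexNotEnabledException(String message) {
            super(message);
        }
    }

    private SeekByIndexValidation() { }

    /**
     * @param sampledIndex BrokerEntryMetadata#index read from an entry of the topic,
     *                     or empty if that entry carries no index
     * @return a successfully completed future, or one failed with IndexNotEnabledException
     */
    static CompletableFuture<Void> checkIndexAvailable(OptionalLong sampledIndex) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (!sampledIndex.isPresent()) {
            // Tell the user how to fix the broker config rather than failing silently.
            result.completeExceptionally(new IndexNotEnabledException(
                    "seekByIndex requires org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"
                            + " in brokerEntryMetadataInterceptors"));
        } else {
            result.complete(null);
        }
        return result;
    }
}
```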

@eolivelli eolivelli changed the title [PIP] Add seek by index feature for consumer [PIP 101] Add seek by index feature for consumer Oct 1, 2021
@eolivelli
Contributor

I have assigned PIP 101 and added the link to the wiki page:
https://github.com/apache/pulsar/wiki

@merlimat @michaeljmarshall

@github-actions

The issue had no activity for 30 days, mark with Stale label.

@Jason918
Contributor Author

Closing this for now, as there are not enough votes.
It's already implemented in #12032.
Anyone with a more solid use case can reopen this.

@eolivelli
Contributor

I am interested in this, as it helps people transitioning from Kafka,
but we should make the AppendIndex interceptor enabled by default.
