Introduce continuous offset for pulsar #9038

Closed
aloyszhang opened this issue Dec 23, 2020 · 0 comments · Fixed by #9039

aloyszhang commented Dec 23, 2020

Is your feature request related to a problem? Please describe.
Currently, Pulsar uses <LedgerId, EntryId> to locate an Entry, and a BatchIndex to locate a message inside an Entry.
However, there is no continuous offset across the messages in the Entries, so an offset cannot be used to look up an Entry.

For protocol handlers such as KOP, the mapping between a Kafka offset and a Pulsar position is implemented by splitting the 64 bits of the offset into three parts, which represent ledgerId, entryId and batchIndex respectively. This makes the maximum ledgerId far lower than Long.MAX_VALUE, so the ledgerId space is used up quickly; see streamnative/kop#175.
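
To make the limitation concrete, here is a minimal sketch of such a bit-splitting scheme. The class name `PackedOffset` and the 20/32/12 bit widths are assumptions chosen for illustration; they are not taken from the KOP source.

```java
/** Hypothetical illustration of packing a Pulsar position into one 64-bit offset. */
final class PackedOffset {
    // Illustrative widths only (assumption): 20 + 32 + 12 = 64 bits.
    static final int LEDGER_BITS = 20;
    static final int ENTRY_BITS = 32;
    static final int BATCH_BITS = 12;

    // Largest value each part can hold; the ledgerId cap is 1,048,575.
    static final long MAX_LEDGER_ID = (1L << LEDGER_BITS) - 1;
    static final long MAX_ENTRY_ID = (1L << ENTRY_BITS) - 1;
    static final long MAX_BATCH_INDEX = (1L << BATCH_BITS) - 1;

    static long pack(long ledgerId, long entryId, long batchIndex) {
        checkRange("ledgerId", ledgerId, MAX_LEDGER_ID);
        checkRange("entryId", entryId, MAX_ENTRY_ID);
        checkRange("batchIndex", batchIndex, MAX_BATCH_INDEX);
        return (ledgerId << (ENTRY_BITS + BATCH_BITS))
                | (entryId << BATCH_BITS)
                | batchIndex;
    }

    // Unsigned shifts are used because the highest ledgerId bit occupies the sign bit.
    static long ledgerId(long packed) {
        return packed >>> (ENTRY_BITS + BATCH_BITS);
    }

    static long entryId(long packed) {
        return (packed >>> BATCH_BITS) & MAX_ENTRY_ID;
    }

    static long batchIndex(long packed) {
        return packed & MAX_BATCH_INDEX;
    }

    private static void checkRange(String name, long value, long max) {
        if (value < 0 || value > max) {
            throw new IllegalArgumentException(name + " out of range: " + value);
        }
    }
}
```

With these widths, any ledgerId above `MAX_LEDGER_ID` is rejected even though Pulsar ledger IDs are 64-bit values. Widening one part only narrows another, which is why a packed offset cannot cover the full ledgerId range.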

Describe the solution you'd like
We should introduce a continuous offset for Pulsar, which can be built on the broker entry metadata introduced in PIP-70.

Additional context
We can introduce an offset for each message first. With this feature in place, we can do more later:

  1. implement a more stable offset manager for KOP
  2. provide more precise message-level metrics, such as backlog
  3. ...
@aloyszhang aloyszhang added the type/feature The PR added a new feature or issue requested a new feature label Dec 23, 2020
@codelipenghui codelipenghui added this to the 2.8.0 milestone Dec 23, 2020
codelipenghui pushed a commit that referenced this issue Dec 24, 2020
Fixes #9038 
### Motivation

As described in [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata), one use case for broker entry metadata is providing a continuous message sequence ID for the messages in one topic partition, which is useful for protocol handlers such as KOP.

This PR enables Pulsar to support a continuous offset for messages, based on broker entry metadata.

### Modifications

Introduce a new field named `offset` in the broker entry metadata;
Introduce a new interceptor type, `ManagedLedgerInterceptor`, which intercepts entries in `ManagedLedger`;
Each partition is assigned a `ManagedLedgerInterceptor` when its `ManagedLedger` is created;
Each entry is intercepted to add a monotonically increasing offset to its broker entry metadata, and the offset advances by the batchSize of the entry;
Support finding a position by a given offset.
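
The offset assignment and lookup described above can be sketched in plain Java. This is not the code from the PR: `OffsetSketch`, `Position`, `onAddEntry` and `findPosition` are hypothetical names, the in-memory `TreeMap` stands in for searching entries in a ledger, and the sketch assumes that the offset stored with an entry is the offset of the last message in that entry.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of per-partition offset assignment and lookup. */
final class OffsetSketch {
    /** Stand-in for a <LedgerId, EntryId> position. */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Offset of the last message written so far; -1 means nothing written yet.
    private long lastOffset = -1;

    // Maps the stored offset of each entry to its position (assumption: kept in memory).
    private final TreeMap<Long, Position> index = new TreeMap<>();

    /** Advances the offset by batchSize and returns the value stored with the entry. */
    synchronized long onAddEntry(long ledgerId, long entryId, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        lastOffset += batchSize;
        index.put(lastOffset, new Position(ledgerId, entryId));
        return lastOffset;
    }

    /** Returns the entry containing the given message offset, or null if there is none. */
    synchronized Position findPosition(long offset) {
        if (offset < 0) {
            return null;
        }
        // The first entry whose stored (last-message) offset is >= the target contains it.
        Map.Entry<Long, Position> hit = index.ceilingEntry(offset);
        return hit == null ? null : hit.getValue();
    }
}
```

Because the counter advances by the batch size rather than by one per entry, every message in the partition gets a distinct, gap-free offset, and a lookup only needs the first entry whose stored offset is not smaller than the target.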