Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Introduce continuous offset for pulsar #9039

Merged
merged 12 commits into from
Dec 24, 2020

Conversation

aloyszhang
Copy link
Contributor

@aloyszhang aloyszhang commented Dec 23, 2020

Fixes #9038

Motivation

As described in PIP-70.
One of the use case for Broker entry metadata is providing continuous message sequence-Id for messages in one topic-partition which is useful for Protocol Hanlder like KOP.

This PR enable Pulsar to support continuous offset for message based on Broker entry metadata.

Modifications

Introduce a new field for broker entry metadta named offset;
Introduce a new interceptor type ManagedLedgerInterceptor which intercept entry in ManagedLedger
Each partition will be assigned a ManagedLedgerInterceptor when ManagedLedger created;
Each Entry will be intercept for adding a monotone increasing offset in Broker entry metadata and the offet is added by batchSize of entry;
Support find position by a given offset.

@aloyszhang
Copy link
Contributor Author

/pulsarbot run-failure-checks

/**
* Find position by sequenceId.
* */
CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
CompletableFuture<Position> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

/**
* Find position by sequenceId.
* */
CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);

Can we use the java.util.Predicate?

Copy link
Contributor Author

@aloyszhang aloyszhang Dec 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

condition of OpFindNewest is com.google.common.base.Predicate type, so the parameter type here should also be com.google.common.base.Predicate

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend using java.util.Predicate in the interface. You can just write a wrapper to convert a java.util.Predicate to a guava Predicate.

@@ -550,6 +566,11 @@ public Position addEntry(byte[] data) throws InterruptedException, ManagedLedger
return addEntry(data, 0, data.length);
}

@Override
public Position addEntry(byte[] data, int batchSize) throws InterruptedException, ManagedLedgerException {
return addEntry(data, batchSize,0, data.length);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return addEntry(data, batchSize,0, data.length);
return addEntry(data, batchSize, 0, data.length);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

addOperation.failed(
new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed."));
ReferenceCountUtil.release(addOperation.data);
log.error("[{}] Failed to interceptor entry before add to bookie.", name, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.error("[{}] Failed to interceptor entry before add to bookie.", name, e);
log.error("[{}] Failed to intercept adding an entry to bookie.", name, e);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

/**
* Interceptor for ManagedLedger.
* */
public interface ManagedLedgerInterceptor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

* Interceptor for ManagedLedger.
* */
public interface ManagedLedgerInterceptor {
OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add javadoc to the methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class);
private static final String OFFSET = "offset";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kafka offset is not a very good term to be used for describing the index of a message within a log stream. Instead, I would suggest calling it index or logIndex similar to the term used in the Raft algorithm (https://cs.stackexchange.com/questions/97542/raft-algorithm-whats-the-meaning-of-concept-index).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modify offset to index

if (appendBrokerEntryMetadata(headersAndPayload, publishContext)) {
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
}
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the batchSize passed to asyncAddEntry? I failed to see how did you do that in this pull request.

At the same time, I think batchSize is not a good term. If I understand this correctly, it should be numberOfMessages, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@sijie sijie added area/broker type/feature The PR added a new feature or issue requested a new feature labels Dec 23, 2020
@Anonymitaet
Copy link
Member

@aloyszhang thanks for your great work.

Shall the changes be documented to the Pulsar docs?

If so, could you please help add the docs accordingly? Then you can ping me to review, thanks

BewareMyPower added a commit that referenced this pull request Sep 18, 2021
### Motivation

When the Pulsar cluster enables broker entry metadata, sometimes there're some corrupted entries. See streamnative/kop#442 for example. It's because the broker entry metadata has been added twice.

This bug might be introduced from #9039 https://github.com/apache/pulsar/blob/9b7c3275c904ac1e6a8ef67487a10a0506bb2c58/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1516-L1518

It happened during a managed ledger's rollover while there're some pending `OpAddEntry`s in `updateLedgersIdsComplete`, only the ledger id should be updated and the data of `OpAddEntry` should not be modified.

### Modifications

Only call `beforeAddEntry` for once at the beginning of `internalAsyncAddEntry`.
codelipenghui pushed a commit that referenced this pull request Sep 19, 2021
### Motivation

When the Pulsar cluster enables broker entry metadata, sometimes there're some corrupted entries. See streamnative/kop#442 for example. It's because the broker entry metadata has been added twice.

This bug might be introduced from #9039 https://github.com/apache/pulsar/blob/9b7c3275c904ac1e6a8ef67487a10a0506bb2c58/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1516-L1518

It happened during a managed ledger's rollover while there're some pending `OpAddEntry`s in `updateLedgersIdsComplete`, only the ledger id should be updated and the data of `OpAddEntry` should not be modified.

### Modifications

Only call `beforeAddEntry` for once at the beginning of `internalAsyncAddEntry`.

(cherry picked from commit 9d44617)
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Oct 19, 2021
### Motivation

When the Pulsar cluster enables broker entry metadata, sometimes there're some corrupted entries. See streamnative/kop#442 for example. It's because the broker entry metadata has been added twice.

This bug might be introduced from apache#9039 https://github.com/apache/pulsar/blob/9b7c3275c904ac1e6a8ef67487a10a0506bb2c58/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1516-L1518

It happened during a managed ledger's rollover while there're some pending `OpAddEntry`s in `updateLedgersIdsComplete`, only the ledger id should be updated and the data of `OpAddEntry` should not be modified.

### Modifications

Only call `beforeAddEntry` for once at the beginning of `internalAsyncAddEntry`.

(cherry picked from commit 9d44617)
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
### Motivation

When the Pulsar cluster enables broker entry metadata, sometimes there're some corrupted entries. See streamnative/kop#442 for example. It's because the broker entry metadata has been added twice.

This bug might be introduced from apache#9039 https://github.com/apache/pulsar/blob/9b7c3275c904ac1e6a8ef67487a10a0506bb2c58/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1516-L1518

It happened during a managed ledger's rollover while there're some pending `OpAddEntry`s in `updateLedgersIdsComplete`, only the ledger id should be updated and the data of `OpAddEntry` should not be modified.

### Modifications

Only call `beforeAddEntry` for once at the beginning of `internalAsyncAddEntry`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Introduce continuous offset for pulsar
4 participants