Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decompression payload if needed in KeyShared subscription #7416

Merged
merged 4 commits into from
Jul 16, 2020

Conversation

codelipenghui
Copy link
Contributor

Fixes #7414

Motivation

Decompression payload if needed in KeyShared subscription

Verifying this change

Unit test added

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@codelipenghui codelipenghui self-assigned this Jul 1, 2020
@codelipenghui codelipenghui added release/2.6.1 type/bug The PR fixed a bug or issue reported a bug labels Jul 1, 2020
@codelipenghui codelipenghui added this to the 2.7.0 milestone Jul 1, 2020
@feeblefakie
Copy link

@codelipenghui Oh, great. it is pretty fast.
Was it related to the changes in between 2.5.0 and 2.6.0 ?
Just asking because it doesn’t look so.

@codelipenghui
Copy link
Contributor Author

@feeblefakie it's introduced by #7107.

@feeblefakie
Copy link

@codelipenghui Thank you ! 🙇‍♂️

// If the message was part of a batch (eg: a batch of 1 message), we need
// to read the key from the first single-message-metadata entry
PulsarApi.CompressionType compressionType = metadata.getCompression();
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid decompressing in the broker in any case. This is very dangerous from a CPU usage point of view.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to handle #7107 that fixed. Maybe we can use the first single message key as the batch message key, but this requires all clients catchup.

So, I think we can onboard this PR first ensure correctness, and I will create an issue for the clients catchup work.

And I have add check if (!metadata.hasOrderingKey() && !metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch()), if the clients set the key of the batch message to the key of the first message, it can reduce overhead.

I have test with the KeyBasedBatcher. It already set the batch message key.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better way would be to ensure that producers (in every lang) are attaching the routing key on the message metadata protobuf (at the batch level). That is not compressed so we should always be safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense, so it's better to revert #7107 and other lang clients should attach the routing key on the message metadata.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat I have addressed your comment, please take a look, thanks.

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

3 similar comments
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

1 similar comment
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui codelipenghui merged commit ed3583a into apache:master Jul 16, 2020
@codelipenghui codelipenghui deleted the penghui/fix-7414 branch July 16, 2020 01:25
codelipenghui added a commit to streamnative/pulsar-archived that referenced this pull request Jul 17, 2020
Decompression payload if needed in KeyShared subscription

(cherry picked from commit ed3583a)
merlimat pushed a commit to merlimat/pulsar that referenced this pull request Jul 21, 2020
Decompression payload if needed in KeyShared subscription
wolfstudy pushed a commit that referenced this pull request Jul 29, 2020
Decompression payload if needed in KeyShared subscription

(cherry picked from commit ed3583a)
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
Decompression payload if needed in KeyShared subscription
abhilashmandaliya pushed a commit to ashishshinde/pulsar that referenced this pull request Nov 19, 2020
Decompression payload if needed in KeyShared subscription

(cherry picked from commit ed3583a)
lhotari added a commit to lhotari/pulsar that referenced this pull request Aug 27, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
release/2.6.1 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Broker throws an exception and then gets partially broken when LZ4/Snappy is used with Key_Shared in 2.6.0.
3 participants