Skip to content

[fix] [broker] When sending a message, broker don't verifies the logic of recordSequenceId > highestSequenceId.#21887

Closed
luky116 wants to merge 1 commit intoapache:masterfrom
luky116:fixCheckSequenceId
Closed

[fix] [broker] When sending a message, broker don't verifies the logic of recordSequenceId > highestSequenceId.#21887
luky116 wants to merge 1 commit intoapache:masterfrom
luky116:fixCheckSequenceId

Conversation

@luky116
Copy link

@luky116 luky116 commented Jan 12, 2024

Fixes #21886

Main Issue: #21886

PIP:

Motivation

This is a question I had after reading the pulsar source code. I’m not sure if it’s a real problem.

In function org.apache.pulsar.broker.service.ServerCnx#handleSend#line 1838,There is a logic to verify sequenceId, the code is as follows:

// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
            headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
} else {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
            send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
}

Modifications

I find org.apache.pulsar.broker.service.Producer#publishMessage function makes a judgment of lowestSequenceId > highestSequenceId

public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
            ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, Position position) {
        if (lowestSequenceId > highestSequenceId) {

so org.apache.pulsar.broker.service.ServerCnx#handleSend function can be changed to:

// Persist the message
if (send.hasHighestSequenceId()) {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
            headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
} else {
    producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
            send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
}

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@luky116 luky116 changed the title When sending a message, pulsar verifies the logic of recordSequenceId… When sending a message, broker don't verifies the logic of recordSequenceId > highestSequenceId. Jan 12, 2024
@github-actions
Copy link

@luky116 Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@luky116 luky116 changed the title When sending a message, broker don't verifies the logic of recordSequenceId > highestSequenceId. [fix][broker]When sending a message, broker don't verifies the logic of recordSequenceId > highestSequenceId. Jan 12, 2024
@luky116 luky116 changed the title [fix][broker]When sending a message, broker don't verifies the logic of recordSequenceId > highestSequenceId. [fix] [broker] When sending a message, broker don't verifies the logic of recordSequenceId > highestSequenceId. Jan 12, 2024
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Jan 12, 2024

// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
if (send.hasHighestSequenceId()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not make sense without adding a test to show which issue it fixes.

IMO, the original code is just defensive programming that handles the exceptional case that sequence_id > highest_sequence_id when the client constructs the CommandSend request. Though with the correct implementation, the client should not set a sequence id that is greater than the highest sequence id.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it,thank you

@luky116 luky116 closed this Jan 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] When sending a message, broker don't verifies the logic of recordSequenceId > highestSequenceId.

2 participants