Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java Client] Use epoch to version producer's cnx to prevent early delivery of messages #12779

Merged

Conversation

michaeljmarshall
Copy link
Member

…livery of messages

Motivation

I discovered a race condition in Pulsar’s Java Client ProducerImpl that can lead to messages persisted out-of-order for a single producer sending to a non-partitioned topic. This PR removes the race condition by keeping track of the current cnx and introducing a new update to the state while in a lock on the ProducerImpl.this.

Reproducing the issue

I can consistently reproduce the issue by interrupting an active producer sending messages to Pulsar. I interrupt the producer by restarting the broker hosting the producer's topic or by unloading the producer's topic/namespace. Because of the nature of this race condition, the out of order issue does not happen every time, but it does happen frequently enough to have noticed it. In order to increase the probability of the error, my test set up included producing to 100 topics on a 3 broker pulsar cluster at a total rate of 50k msgs/sec. I determined that the messages were out of order in two ways. First, by producing messages from a single thread and putting my own, monotonically increasing sequence id as the messages payload. Second, by inspecting the message's sequence id assigned by the pulsar java client. Inspecting the messages using the reader api revealed messages in sequences like 1,4,2,3,4. The 4 in that sequence is duplicated because when the client receives an ack for an unexpected message, it re-sends all pending messages.

Description of the problem

The problem comes from the way that the pulsar producer changes state when a connection is closed and when a connection is opened. Here are the state changes that take place when a connection closes and when one opens:

Connection closed:

  1. Set cnx to null, as long as the current cnx is the connection being closed.
  2. Set the state to Connecting.
  3. Schedule a runnable to get a new cnx.
  4. Once the task from step 3 is run, asynchronously get a cnx and then call ProducerImpl#connectionOpened.

ProducerImpl#connectionOpened:

  1. Set cnx to the new connection.
  2. Send Producer command to the broker to register the new producer.
  3. Once the producer is registered, schedule a runnable to redeliver pendingMessages.
  4. Once the task from step 3 is run, go to Ready, as long as the current state is Uninitialized, Connecting, or RegisteringSchema.

There is nothing that prevents a connection from being closed while another connection is currently being established in the connectionOpened method. This is exactly what exposes the race condition fixed by this PR. In the race, a connection is established and we call connectionOpened and successfully register the producer with the broker. Then, that connection is closed before step 4 of connectionOpened and the state changes from Connecting to Connecting. Because step 4 only checks that the current state is Uninitialized, Connecting, or RegisteringSchema, it updates the state to Ready. When adding some extra logging, I could see that the we changed the state to Ready while cnx() returned null. At this point, messages wouldn't yet deliver. When the new connection is established, the connectionOpened is called, and we set the new cnx. Since our state is still Ready, we start delivering any new messages received by the client. These are out of order. We asynchronously register the producer with the broker and then messages start to persist.

Modifications

  1. Update where the producer’s epoch value is incremented. It was previously updated before getting a connection. However, this seems prone to races. By updating it when we have the connection and within a lock on the producer, we ensure that the connection gets the next highest epoch number.
  2. Set the state to Connecting in the connectionOpened method. This is important because the connection could have been closed after the check for cnx() != null in the recoverProcessOpSendMsgFrom method but before that method gets to the point of setting state to Ready. Since we update the state within the lock on ProducerImpl.this, we won't delivery any messages. There is still a chance that the broker will have state Ready and cnx() == null.
  3. Use the producer's epoch value to ensure the cnx reference passed to recoverProcessOpSendMsgFrom is still the current cnx. Ensure that cnx() != null. These checks ensure that it is safe to update state to Ready.

Alternatives

Instead of using the epoch value, we could have checked that cnx() == cnx in the recoverProcessOpSendMsgFrom. This check would be valid in all cases except where the next connection is the same connection. I think this would happen if a topic were unloaded from a broker and then loaded back on to the same broker. The epoch value gives a consistent way to know that the cnx is the current cnx.

We could have added a new state called RegisteringProducer. I investigated this option, but it seemed more complicated and less elegant than this solution. I chose the simpler solution here.

Verifying this change

I tried to implement a unit test that would consistently reproduce this issue. I was only able to do so when I introduced a 50 millisecond delay in the scheduling of the runnable within resendMessages. Using that method, this PR prevented the race. I also built a custom client with this PR, and I can confirm that I observed the log line indicating Producer epoch mismatch or the current connection is null.. This log confirms that the race would have happened but was avoided.

Also, I provided extra detail in this PR description since I wasn't able to add a new test to specifically verify this change. I think the change is pretty straightforward, and I try to add enough context to make the change easy to understand.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): no
  • The public API: no
  • The schema: no
  • The default values of configurations: no
  • The wire protocol: no
  • The rest endpoints: no
  • The admin cli options: no
  • Anything that affects deployment: no

Documentation

  • no-need-doc

    This is an internal change to the client. We should make sure to include this fix in the release notes, but no documentation changes need to be made.

@michaeljmarshall
Copy link
Member Author

I sent a note to the mailing list to discuss this change: https://lists.apache.org/thread/6y6jfdx432j2gqxgk9cnhdw48fq1m6b1.

@michaeljmarshall
Copy link
Member Author

/pulsarbot run-failure-checks

Copy link
Contributor

@Jason918 Jason918 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test to cover this case?
Not sure if it's easy to do.

@@ -37,7 +37,8 @@
protected final Backoff backoff;
private static final AtomicLongFieldUpdater<ConnectionHandler> EPOCH_UPDATER = AtomicLongFieldUpdater
.newUpdater(ConnectionHandler.class, "epoch");
private volatile long epoch = 0L;
// Start with -1L because it gets incremented before sending on the first connection
private volatile long epoch = -1L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the starting value of epoch mean anything? In my understanding, it's just used for comparison, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe the starting value inherently means anything. However, the test ProducerCreationTest#testGeneratedNameProducerReconnect asserts that the epoch value is 2 after a single producer reconnect. I could have updated the test or the starting value. I chose to update the starting value here to maintain the original behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I see. I am OK with this new starting value.
But IMHO, it's better to assert that the epoch value increased after a single producer reconnect to avoid more flaky test.

@michaeljmarshall
Copy link
Member Author

Can you add a test to cover this case?
Not sure if it's easy to do.

@Jason918 - as I mentioned in the PR description, I was only able to get a unit test to consistently reproduce the underlying issue by modifying the client code (scheduling a delay for one of the callbacks). I am pretty sure we have tests that verify producer (re)connection, which will verify that this code path works for the happy path. Also, I verified that this change correctly removes the race condition by testing in the k8s environment when I discovered the race. At this point, I'm not exactly sure how to add a test, but I am open to suggestions.

@Jason918
Copy link
Contributor

Can you add a test to cover this case?
Not sure if it's easy to do.

@Jason918 - as I mentioned in the PR description, I was only able to get a unit test to consistently reproduce the underlying issue by modifying the client code (scheduling a delay for one of the callbacks). I am pretty sure we have tests that verify producer (re)connection, which will verify that this code path works for the happy path. Also, I verified that this change correctly removes the race condition by testing in the k8s environment when I discovered the race. At this point, I'm not exactly sure how to add a test, but I am open to suggestions.

Great. Thanks for the detailed explanation.

@michaeljmarshall
Copy link
Member Author

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@eolivelli eolivelli merged commit ab652b8 into apache:master Dec 7, 2021
@michaeljmarshall michaeljmarshall deleted the use-epoch-to-version-producer-cnx branch December 7, 2021 19:19
@michaeljmarshall michaeljmarshall added the type/bug The PR fixed a bug or issue reported a bug label Dec 7, 2021
michaeljmarshall added a commit that referenced this pull request Dec 7, 2021
#12779)

* [Java Client] Use epoch to version producer's cnx to prevent early delivery of messages

* Update initial epoch value: it now gets incremented before the first connection

(cherry picked from commit ab652b8)
michaeljmarshall added a commit to datastax/pulsar that referenced this pull request Dec 8, 2021
apache#12779)

* [Java Client] Use epoch to version producer's cnx to prevent early delivery of messages

* Update initial epoch value: it now gets incremented before the first connection

(cherry picked from commit ab652b8)
lhotari pushed a commit that referenced this pull request Dec 9, 2021
#12779)

* [Java Client] Use epoch to version producer's cnx to prevent early delivery of messages

* Update initial epoch value: it now gets incremented before the first connection

(cherry picked from commit ab652b8)
@lhotari lhotari added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Dec 9, 2021
@eolivelli eolivelli changed the title [Java Client] Use epoch to version producer's cnx to prevent early de… [Java Client] Use epoch to version producer's cnx to prevent early delivery of messages Dec 15, 2021
fxbing pushed a commit to fxbing/pulsar that referenced this pull request Dec 19, 2021
apache#12779)

* [Java Client] Use epoch to version producer's cnx to prevent early delivery of messages

* Update initial epoch value: it now gets incremented before the first connection
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client cherry-picked/branch-2.8 Archived: 2.8 is end of life cherry-picked/branch-2.9 Archived: 2.9 is end of life doc-not-needed Your PR changes do not impact docs release/2.8.2 release/2.9.1 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants