Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: don't set interrupt flag again after catching interrupt exception in Pulsar Client #5643

Merged
merged 1 commit into from
Nov 13, 2019

Conversation

jerrypeng
Copy link
Contributor

Motivation

Currently, if the a consumer gets interrupted will waiting on receive(), the interrupt exception just wrapped in a PulsarClientException and the thread is interrupted again. Setting the interrupt flag again for this thread is unnecessary as we already throw an exception and it can also unfavorable behavior. For example in the following code, if I have a thread...

Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    try {
                        consumer.receive();
                    } catch (Exception e) {
                        if (e instanceof PulsarClientException) {
                            break;
                        }
                    }
                }

                try {
                    consumer.close();
                    pulsarClient.close();
                } catch (Exception e) {
                    System.out.println("Closing error: " + e);
                }
            }
        });

The thread has a while loop that calls consumer receive(). If the thread gets interrupted, I would like to break from the while loop and close the consumer and client. However, closing the client and consumer cannot happen successfully because we set the interrupt flag for this thread after catching an interrupt exception causing any subsequent code e.g. close() to be interrupted as well which is not an ideal behavior.

This also causes non-ideal behavior in the pulsar-flink source as the above situation happens when a Flink job with a Pulsar source gets terminated.

Modifications

remove Thread.currentThread().interrupt();

@jerrypeng jerrypeng added the type/bug The PR fixed a bug or issue reported a bug label Nov 13, 2019
@jerrypeng jerrypeng added this to the 2.4.2 milestone Nov 13, 2019
@jerrypeng jerrypeng self-assigned this Nov 13, 2019
@wolfstudy wolfstudy merged commit b7b9926 into apache:master Nov 13, 2019
wolfstudy pushed a commit that referenced this pull request Nov 20, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants