[pulsar-function] Stop calling the deprated method Thread.stop when stopping the function thread in ThreadRuntime. #11401

Merged: 2 commits, Jul 26, 2021

Conversation

zliang-min

Motivation

Currently, when the ThreadRuntime stops a function instance, it interrupts the fnThread and, if the thread is still alive 10 seconds later, calls the stop method on it (see here).

Thread.stop is a deprecated method, and the problem is clearly documented in its Javadoc. To quote:

This method is inherently unsafe. Stopping a thread with Thread.stop causes it to unlock all of the monitors that it has locked (as a natural consequence of the unchecked ThreadDeath exception propagating up the stack). If any of the objects previously protected by these monitors were in an inconsistent state, the damaged objects become visible to other threads, potentially resulting in arbitrary behavior.

This behavior is exactly what caused an issue with BatchSourceExecutor. When it stops, BatchSourceExecutor terminates the discoveryThread and waits up to 10 seconds for the termination to complete (see here). So, if the discoveryThread takes long enough to terminate, the awaitTermination method can throw an IllegalMonitorStateException because fnThread.stop is called in the meantime. Below is the stack trace I got for this case:

15:41:16,333 INFO [public/default/gimi-test-0] [instance: 0] ThreadPoolTaskScheduler - Shutting down ExecutorService
15:41:26,334 ERROR [public/default/gimi-test-0] [instance: 0] JavaInstanceRunnable - Failed to close source org.apache.pulsar.functions.source.batch.BatchSourceExecutor
java.lang.IllegalMonitorStateException: null
        at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:149) ~[?:?]
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1302) ~[?:?]
        at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:439) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1458) ~[?:?]
        at java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709) ~[?:?]
        at org.apache.pulsar.functions.source.batch.BatchSourceExecutor.stop(BatchSourceExecutor.java:228) ~[pulsar-functions-instance.jar:2.9.0-SNAPSHOT]
        at org.apache.pulsar.functions.source.batch.BatchSourceExecutor.close(BatchSourceExecutor.java:210) ~[pulsar-functions-instance.jar:2.9.0-SNAPSHOT]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.close(JavaInstanceRunnable.java:411) [pulsar-functions-instance.jar:2.9.0-SNAPSHOT]
        at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:298) [pulsar-functions-instance.jar:2.9.0-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:829) [?:?]

The same issue has been discussed here as well.

The consequence of this error is that BatchSourceExecutor loses the chance to clean up resources such as the consumer of the intermediate topic, so it leaks a consumer that keeps consuming from the topic without processing the incoming messages.

This PR is to solve this issue.

Modifications

The change is very simple: I removed the fnThread.stop() method call because it's deprecated, and replaced it with a warning log. So we just let the function instance take its time to clean things up.
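
As a rough illustration of the approach described above, here is a minimal sketch of stopping a thread without Thread.stop: interrupt it, wait a bounded time, and log a warning if it is still alive. The class name GracefulThreadStopper, the method stopGracefully, and the use of java.util.logging are assumptions made for illustration; this is not the actual ThreadRuntime code.

```java
import java.util.logging.Logger;

/** Hypothetical helper, not part of Pulsar: stops a thread without Thread.stop. */
final class GracefulThreadStopper {
    private static final Logger LOG = Logger.getLogger(GracefulThreadStopper.class.getName());

    /**
     * Interrupts the thread and waits up to timeoutMillis for it to exit.
     * Returns true if the thread terminated, false if it is still alive.
     */
    static boolean stopGracefully(Thread fnThread, long timeoutMillis) {
        if (timeoutMillis <= 0) {
            // join(0) would wait forever, so a positive timeout is required.
            throw new IllegalArgumentException("timeoutMillis must be positive");
        }
        fnThread.interrupt();
        try {
            fnThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status and fall through to the check.
            Thread.currentThread().interrupt();
        }
        if (fnThread.isAlive()) {
            // Instead of forcing termination, only report that the thread is slow to exit.
            LOG.warning("Thread " + fnThread.getName() + " is still alive after "
                    + timeoutMillis + " ms; letting it finish its cleanup");
            return false;
        }
        return true;
    }
}
```

The caller can use the boolean result to decide whether any follow-up is needed; the thread itself is never forcibly terminated.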

Verifying this change

I manually verified this change by:

  1. Updated the BatchDataGeneratorSource to add a sleep inside the discover method so that it runs for a long time (see here), and built the connector.
  2. Started the above connector in cluster mode. Below is the source config file I used to start the source:
tenant: "public"
namespace: "default"
name: "gimi-test"
# archive: builtin://batch-data-generator
archive: file:///tmp/pulsar/connectors/pulsar-io-batch-data-generator-2.9.0-SNAPSHOT.jar
classname: org.apache.pulsar.io.batchdatagenerator.BatchDataGeneratorSource
topicName: "persistent://public/default/gimi"
parallelism: 1
batchSourceConfig:
  discoveryTriggererClassName: org.apache.pulsar.io.batchdiscovery.CronTriggerer
  discoveryTriggererConfig:
    __CRON__: "*/30 * * * * *"
  3. Once discover started, called the stop source API to stop the function instance, e.g. http POST localhost:8080/admin/v3/sources/public/default/gimi-test/stop.
  4. Checked the function log. Before the fix, the IllegalMonitorStateException error appears; after the fix, it no longer does.
  5. Called the stats API on the intermediate topic, e.g. http localhost:8080/admin/v2/persistent/public/default/gimi-test-intermediate/stats. Before the fix, the consumer from the function instance is still there; after the fix, the consumer has been cleaned up.
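
The stop and stats calls in the steps above could also be scripted. The following is a minimal sketch that only builds the two requests with the JDK's java.net.http API (Java 11 or later); the class and method names are hypothetical, and the base URL assumes the admin endpoint at localhost:8080 used in the examples above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds the two admin requests used during verification. */
final class VerificationRequests {
    // Assumption: the admin endpoint listens on localhost:8080, as in the steps above.
    static final String ADMIN_BASE = "http://localhost:8080";

    /** POST .../admin/v3/sources/{tenant}/{namespace}/{name}/stop */
    static HttpRequest stopSource(String tenant, String namespace, String name) {
        URI uri = URI.create(ADMIN_BASE + "/admin/v3/sources/"
                + tenant + "/" + namespace + "/" + name + "/stop");
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** GET .../admin/v2/persistent/{tenant}/{namespace}/{topic}/stats */
    static HttpRequest topicStats(String tenant, String namespace, String topic) {
        URI uri = URI.create(ADMIN_BASE + "/admin/v2/persistent/"
                + tenant + "/" + namespace + "/" + topic + "/stats");
        return HttpRequest.newBuilder(uri).GET().build();
    }
}
```

Each request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), and the stats response inspected for leftover consumers.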

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): no
  • The public API: no
  • The schema: no
  • The default values of configurations: no
  • The wire protocol: no
  • The rest endpoints: no
  • The admin cli options: no
  • Anything that affects deployment: no

Documentation

No doc change is needed, since it's an internal change.

@zliang-min
Author

@jerrypeng please review.

@Anonymitaet
Member

Thanks for providing doc-related info!

Contributor

@jerrypeng jerrypeng left a comment

@zliang-min Thanks for working on this!

I think this is a good first step toward improving the ThreadRuntime. I think the follow-up to this should be adding a couple of configs that let users determine the behavior when waiting for an instance to terminate. We should give the cluster admin an option to determine how long to wait for an instance to terminate before timing out, and also what to do after the timeout has expired. Some options for what to do when the timeout has expired:

  1. ignore and continue
  2. worker restarts itself
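
To make the proposal concrete, here is one way the two settings could be modeled. This is purely a sketch: the names StopTimeoutAction and InstanceStopPolicy are hypothetical and do not correspond to any existing Pulsar configuration.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical: what to do when an instance thread outlives the stop timeout. */
enum StopTimeoutAction { IGNORE_AND_CONTINUE, RESTART_WORKER }

/** Hypothetical admin-facing policy combining the timeout and the follow-up action. */
final class InstanceStopPolicy {
    private final Duration stopTimeout;
    private final StopTimeoutAction onTimeout;

    InstanceStopPolicy(Duration stopTimeout, StopTimeoutAction onTimeout) {
        this.stopTimeout = Objects.requireNonNull(stopTimeout, "stopTimeout");
        this.onTimeout = Objects.requireNonNull(onTimeout, "onTimeout");
        if (stopTimeout.isNegative() || stopTimeout.isZero()) {
            throw new IllegalArgumentException("stopTimeout must be positive");
        }
    }

    /** How long to wait for the instance thread to exit after interrupting it. */
    Duration stopTimeout() {
        return stopTimeout;
    }

    /** True only when the thread outlived the timeout and the policy asks for a restart. */
    boolean shouldRestartWorker(boolean threadStillAlive) {
        return threadStillAlive && onTimeout == StopTimeoutAction.RESTART_WORKER;
    }
}
```

With IGNORE_AND_CONTINUE the worker would just log and move on, which matches the behavior introduced by this PR.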

@nlu90
Member

nlu90 commented Jul 21, 2021

With this change, will there be any dangling thread in the heap dump that's not cleaned up?

@Anonymitaet Anonymitaet added the doc-not-needed Your PR changes do not impact docs label Jul 22, 2021
@jerrypeng
Contributor

@nlu90 In Java there is no guaranteed way to kill a thread. The thread has to exit cleanly. To make sure threads are not leaked, I suggested follow-up work in my previous comment.
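
For illustration, a thread that exits cleanly typically checks its interrupt status and runs its cleanup in a finally block. The sketch below shows that cooperative pattern; CooperativeWorker is a hypothetical class, and the sleep stands in for one unit of real work.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical worker that stops cooperatively when interrupted. */
final class CooperativeWorker implements Runnable {
    private final AtomicBoolean cleanedUp = new AtomicBoolean(false);

    @Override
    public void run() {
        try {
            // Keep working until another thread requests a stop via interrupt().
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(50); // stand-in for one unit of work
            }
        } catch (InterruptedException e) {
            // sleep() clears the flag when it throws; restore it for any outer code.
            Thread.currentThread().interrupt();
        } finally {
            // Cleanup always runs because the thread unwinds normally.
            cleanedUp.set(true);
        }
    }

    boolean isCleanedUp() {
        return cleanedUp.get();
    }
}
```

Because the thread unwinds through its own finally block, locks and other resources are released in a consistent state, which is what Thread.stop cannot guarantee.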

@zliang-min
Author

/pulsarbot rerun-checks

@nlu90
Member

nlu90 commented Jul 23, 2021

@jerrypeng Your suggested changes are safer. Maybe we can add them in this PR?

@jerrypeng
Contributor

@nlu90 What I suggested is a much larger change and should perhaps be released as part of several PRs.

The current PR does not make things worse, only better, so I think we should merge it as is. It solves an issue we have seen.

@jerrypeng
Contributor

/pulsarbot rerun-checks

@jerrypeng jerrypeng merged commit be93e14 into apache:master Jul 26, 2021
pkumar-singh pushed a commit to pkumar-singh/pulsar that referenced this pull request Aug 10, 2021
…topping the function thread in ThreadRuntime. (apache#11401)

* Stop calling the deprated method Thread.stop when stopping the function thread in ThreadRuntime.

* Updated warning message per review comment.

(cherry picked from commit be93e14)
Signed-off-by: Gimi Liang <zliang@splunk.com>
dlg99 pushed a commit to dlg99/pulsar that referenced this pull request Oct 21, 2021
…topping the function thread in ThreadRuntime. (apache#11401)

* Stop calling the deprated method Thread.stop when stopping the function thread in ThreadRuntime.

* Updated warning message per review comment.
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…topping the function thread in ThreadRuntime. (apache#11401)

* Stop calling the deprated method Thread.stop when stopping the function thread in ThreadRuntime.

* Updated warning message per review comment.
Labels
area/function doc-not-needed Your PR changes do not impact docs
4 participants