Cosmos BulkExecutor: Fixing leak of threads with "cosmos-daemon-BulkExecutor-*" prefix #31082
API change check: API changes are not detected in this pull request.
…ulkWriter and BulkExecutor
sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java
LGTM, thanks @FabianMeiswinkel
some minor comments, worth fixing the changelog.
...osmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/BulkExecutorTest.java
LGTM, thanks for the quick fix :)
LGTM, thanks
…xecutor-*" prefix (Azure#31082)
* Cosmos BulkExecutor: Stopping the flush task to free up the daemon thread
* Fixing thread leak due to race condition on bulk completion between BulkWriter and BulkExecutor
* Iterating on fix
* Adding unit test coverage
* Updating changelog
* Fixing changelog
* Reacted to code review feedback
* Update CosmosAsyncContainer.java
* Update CosmosAsyncContainer.java
Description
A thread/memory leak was reported in issue #31077.
When a Spark structured streaming job ingests data into Cosmos DB in micro-batches, a race condition can prevent BulkExecutor instances and their corresponding threads from being cleaned up properly. This can eventually result in workers crashing with the following error: 'java.io.IOException: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: unable to create new native thread'
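One way to check whether such threads are piling up on a worker is to count live threads by name prefix. The following is a minimal diagnostic sketch and is not part of this PR: the class `ThreadLeakProbe` and its method are hypothetical names, and the only detail taken from the PR is the `cosmos-daemon-BulkExecutor-` prefix from the title.

```java
import java.util.Set;

// Diagnostic sketch (hypothetical helper, not SDK code):
// counts live JVM threads whose name starts with a given prefix.
final class ThreadLeakProbe {

    static long countThreadsWithPrefix(String prefix) {
        // getAllStackTraces() returns a snapshot keyed by every live thread.
        Set<Thread> liveThreads = Thread.getAllStackTraces().keySet();
        return liveThreads.stream()
            .filter(t -> t.getName().startsWith(prefix))
            .count();
    }

    public static void main(String[] args) {
        // Prefix taken from the PR title.
        String prefix = "cosmos-daemon-BulkExecutor-";
        System.out.println(prefix + "* threads: " + countThreadsWithPrefix(prefix));
    }
}
```

If this count is sampled between micro-batches and keeps growing, that would be consistent with the leak described above, though other causes are possible.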
The race condition occurs when the BulkWriter (in the Spark connector) signals that all incoming operations have been consumed, here:
azure-sdk-for-java/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala
Line 524 in ea0b2e8
The BulkExecutor (in the Java SDK) would react to this signal and cleanly close and clean up everything, except when the BulkWriter disposes the subscription here:
azure-sdk-for-java/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala
Line 537 in ea0b2e8
To fix this, we now make sure that clean-up happens regardless of how the subscription is terminated (complete, cancel, or error).
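The idea of termination-independent clean-up can be illustrated with a small sketch. This is not the actual BulkExecutor code: `FlushingPipeline` and all of its members are hypothetical, and it uses only the JDK rather than the reactive library the SDK is built on. It assumes a component that owns a daemon flush thread and routes all three termination paths into one idempotent clean-up, so a cancel that races with completion still stops the thread exactly once.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a component that owns a background flush thread.
// Illustrative only; this is NOT the SDK's BulkExecutor.
final class FlushingPipeline {
    private final ScheduledExecutorService flushScheduler;
    private final AtomicBoolean cleanedUp = new AtomicBoolean(false);
    private final AtomicInteger cleanupRuns = new AtomicInteger();

    FlushingPipeline(String threadName) {
        this.flushScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        });
        // The periodic task keeps the worker thread alive until shutdown.
        this.flushScheduler.scheduleWithFixedDelay(
            this::flush, 10, 10, TimeUnit.MILLISECONDS);
    }

    private void flush() {
        // Placeholder: a real implementation would push buffered operations.
    }

    // Every termination path funnels into the same clean-up.
    void onComplete() { cleanUp(); }
    void onError(Throwable error) { cleanUp(); }
    void cancel() { cleanUp(); }

    private void cleanUp() {
        // compareAndSet makes clean-up idempotent when signals race.
        if (cleanedUp.compareAndSet(false, true)) {
            cleanupRuns.incrementAndGet();
            flushScheduler.shutdownNow(); // stops the flush task and its thread
        }
    }

    boolean isShutDown() { return flushScheduler.isShutdown(); }
    int cleanupRuns() { return cleanupRuns.get(); }
}
```

In this sketch, calling `cancel()` and then `onComplete()` on the same instance shuts the scheduler down once; the second call is a no-op. In Project Reactor, the `doFinally` operator serves a comparable purpose, since it runs on complete, error, and cancel alike.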