Query pipeline refactoring pushing down type conversion to reduce memory consumption / GC pressure #27440
Conversation
LGTM, thanks a lot Fabian, great findings and improvements 👍
Looks great @FabianMeiswinkel - thanks for the optimization.
One comment on public API surface change, please take a look. Thanks!
Description
Summary
This PR is another iteration on reducing the memory footprint / JVM GC pressure when running queries with large record counts (as in many OLTP Spark scenarios). In version 4.6.1 of the Spark connector (and earlier) we had an issue because pre-fetching/buffering was too aggressive (or even unbounded), which would cause extremely high memory consumption and GC pressure when the consumer was slower than the rate at which data was retrieved from the Cosmos backend. With version 4.6.2 pre-fetching has been changed so that data is only minimally buffered, which resulted in stable memory consumption / GC pressure when the consumer is slower than data flows in from the backend.
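The effect of bounding the pre-fetch can be illustrated with a small Reactor sketch (the SDK is Reactor-based). The names below (BoundedPrefetchSketch, PAGE_PREFETCH_LIMIT) are made up for illustration and are not actual SDK identifiers.

```java
// Illustrative only: a minimal Reactor sketch of bounded pre-fetching, assuming a
// Flux<byte[]> of serialized response pages. PAGE_PREFETCH_LIMIT is hypothetical.
import reactor.core.publisher.Flux;

public final class BoundedPrefetchSketch {

    // Keep only a small, fixed number of pages requested ahead of the consumer.
    private static final int PAGE_PREFETCH_LIMIT = 4;

    public static Flux<byte[]> boundedPipeline(Flux<byte[]> pagesFromBackend) {
        // limitRate caps how many pages are requested from upstream at a time, so a
        // slow consumer no longer causes an unbounded buffer of pages to pile up.
        return pagesFromBackend.limitRate(PAGE_PREFETCH_LIMIT);
    }
}
```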
This PR further improves the memory footprint / GC pressure by pushing the conversion from ObjectNode/JSON into the targeted type down into the query pipeline (and allowing an ObjectNode/JSON --> Spark Row transformation). This helps reduce memory pressure because - slightly simplified - an ObjectNode holds data + schema, while the strongly typed representation (Spark Row, POJO) doesn't need to encode the schema info for every record. Because the conversion is done earlier, the JVM GC can clean up the ObjectNode/JSON representation sooner - without any thread switches - which reduces the risk that ObjectNode instances get promoted to long-lived GC generations (which are more expensive to clean up).
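A minimal sketch of the push-down idea, assuming a hypothetical itemFactory function (the actual SDK uses different internal types and factory methods):

```java
// Sketch: convert each ObjectNode into the target type T as soon as the page is
// deserialized, so the ObjectNode becomes garbage within the same method/thread
// and can still be collected from a young GC generation.
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public final class ConversionPushDownSketch {

    public static <T> List<T> convertPage(
        List<ObjectNode> rawPage,
        Function<ObjectNode, T> itemFactory) { // hypothetical conversion function

        List<T> converted = new ArrayList<>(rawPage.size());
        for (ObjectNode node : rawPage) {
            // Convert immediately - the ObjectNode (data + schema) is no longer
            // referenced after this iteration.
            converted.add(itemFactory.apply(node));
        }
        return converted;
    }
}
```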
The main improvement in this trivial use case (SELECT * FROM c, followed by a count()) is the reduction in time spent in GC.
A run against the customer's repro notebook shows a similar pattern - the improvements in latency, memory footprint and CPU are slightly better because client resources are under more pressure. But there as well, the significant reduction in time spent in GC is the main win.
Important design change
Before this change the query pipeline was assembled via PipelinedDocumentQueryExecutionContext.createAsync - the pipeline created multiple pipeline components (for example for aggregates, order by, group by etc.) that would process the query. All pipeline components would operate on Document (a Resource -> JsonSerializable subclass). For some of the pipeline components, processing is currently only possible on Document because they modify the JSON payload etc.
So, to allow the push-down of the type conversion on the hot path but still support those query constructs requiring a pipeline component that operates on Document, I have split the query pipeline into a fast path with transformation push-down and a slow path that keeps operating on Document and only transforms into T at the end.
Pipeline construction happens in PipelinedQueryExecutionContextBase.createAsync. PipelinedDocumentQueryExecutionContext runs all pipeline components on Document (slow path) - PipelinedQueryExecutionContext is the fast path, where all allowed pipeline components can handle any T. A simplified sketch of the path selection is shown below.
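The fast-/slow-path selection can be summarized roughly as follows; the predicate name requiresDocumentPipeline is a simplified stand-in for the real query-plan inspection, not the actual SDK API:

```java
// Hedged sketch of the fast-path / slow-path split described above. The class names
// mirror the ones mentioned in the description; the selection logic is simplified.
final class PipelineSelectionSketch {

    static String selectPipeline(boolean requiresDocumentPipeline) {
        if (requiresDocumentPipeline) {
            // Slow path: components such as group-by still need to rewrite the JSON
            // payload, so the whole pipeline operates on Document and the conversion
            // into T only happens at the very end.
            return "PipelinedDocumentQueryExecutionContext";
        }
        // Fast path: conversion into T is pushed down to where the backend response
        // is drained, and all remaining components handle T directly.
        return "PipelinedQueryExecutionContext";
    }
}
```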
High-level design
Reducing the overly aggressive pre-fetching stabilizes the memory footprint / GC pressure.
Pushing down the type conversion into the query pipeline - doing it immediately after retrieving the response from the backend - reduces pressure on the GC, but requires fairly extensive refactoring of the query pipeline.
Perf test
Baseline (4.6.2)
Results
Executors (Time in GC)
Relatively high GC pressure (about 10% time spent in GC)
Ganglia metrics
Heap stats (snapshot of 1 executor)
Shows relatively high memory allocation due to ObjectNodes and its children
This PR
Results
Executors (Time in GC)
Significantly lower GC pressure - saves nearly 70% of time spent in GC
Ganglia metrics
Heap stats (snapshot of 1 executor)
Lower memory allocation for ObjectNodes (and their children) - instead, SparkRowItem, the Spark row representation, is allocated. The memory footprint is lower because SparkRowItem doesn't need to encode schema info for every single row.
Client telemetry comparison (CPU)
About 3 percent lower CPU usage overall
Client telemetry comparison (Memory)
Also a 3-5 percent smaller memory footprint
All SDK Contribution checklist:
General Guidelines and Best Practices
Testing Guidelines