[CARBONDATA-3088][Compaction] support prefetch for compaction #2906
Conversation
Current compaction performance is low. By adding logs to observe the compaction procedure, we found that `CarbonFactDataHandlerColumnar.addDataToStore(CarbonRow)` waits about 30ms before submitting a new TablePage producer. Since `addDataToStore` is called in a single thread and collects 32000 records to form a TablePage, this wait occurs once per 32000 records (for example, compacting 100 million rows would incur roughly 100,000,000 / 32,000 ≈ 3,100 such waits, on the order of 90 seconds of accumulated idle time).

To reduce the waiting time, we can prepare the 32000 records ahead of time. This can be achieved using prefetch. We maintain two buffers: one provides records to the downstream consumer (`addDataToStore`) while the other prepares records asynchronously. The first is called the working buffer and the second the backup buffer. Once the working buffer is exhausted, the two buffers exchange roles: the backup buffer becomes the new working buffer, and the old working buffer becomes the new backup buffer and is refilled asynchronously (see the sketch below).

Two parameters are involved in this feature:

1. `carbon.detail.batch.size`: an existing parameter with a default value of 100. It controls the batch size of records returned to the client. For normal queries it is fine to keep it at 100, but for compaction, since all records are processed, we suggest setting it to a larger value such as 32000 (the maximum number of rows in a TablePage, which is what the downstream consumer wants).
2. `carbon.compaction.prefetch.enable`: a new parameter with a default value of `false` (we may change it to `true` later). It controls whether records are prefetched for compaction.

By using this prefetch feature, we can improve compaction performance. More test results can be found in the PR description.
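To make the buffer swap concrete, here is a minimal sketch of the double-buffering pattern described above. It is illustrative only, not the actual CarbonData code: the class, the placeholder `fill()` method, and all names are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PrefetchingRowBuffer {
  private final int capacity;                  // e.g. 32000, one TablePage worth of rows
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private List<Object[]> working;              // rows currently served to the consumer
  private Future<List<Object[]>> backup;       // rows being prepared asynchronously
  private int cursor;

  PrefetchingRowBuffer(int capacity) {
    this.capacity = capacity;
    this.working = fill();                     // first batch is loaded synchronously
    this.backup = executor.submit(this::fill); // second batch starts filling right away
  }

  /** Returns the next row, swapping buffers when the working one is exhausted. */
  Object[] next() throws Exception {
    if (cursor == working.size()) {
      working = backup.get();                  // usually returns immediately
      backup = executor.submit(this::fill);    // refill the old buffer asynchronously
      cursor = 0;
    }
    return working.get(cursor++);
  }

  private List<Object[]> fill() {
    // In the real code this would read and convert rows from the underlying
    // query result iterator; here we just produce placeholder rows.
    List<Object[]> rows = new ArrayList<>(capacity);
    for (int i = 0; i < capacity; i++) {
      rows.add(new Object[] { i });
    }
    return rows;
  }
}
```

The swap in `next()` is the key point: as long as the asynchronous `fill()` finishes before the working buffer is drained, `backup.get()` returns immediately and the roughly 30ms wait disappears from the consumer's path.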
Please note that this PR is essentially the modification from PR #2133, plus that we meld the row conversion into the prefetch step:
```java
List<Object[]> converted = new ArrayList<>();
if (detailRawQueryResultIterator.hasNext()) {
  // Convert each raw row eagerly while the batch is buffered.
  for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
    converted.add(convertRow(r));
  }
}
```
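If we read the diff correctly, melding `convertRow` into the buffering step means the conversion cost is paid on the prefetch thread while the backup buffer is being filled, so the consuming thread receives already-converted rows.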
FYI: this is the key difference from PR #2133
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1325/
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1536/
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9584/
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1326/
Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9585/
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1537/
Review thread on core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java (outdated; resolved)
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1328/
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1539/
Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9587/
LGTM
This closes #2906
Test1
3 Huawei ECS instances as workers, each with 16 cores and 32 GB memory. Spark executors use 12 cores and 24 GB. Data: the 74 GB LineItem table from the 100 GB TPC-H dataset.
Test2
1 Huawei RH2288 with 32 cores and 128 GB memory. Spark executors use 30 cores and 90 GB. Data: the 7.3 GB LineItem table from the 10 GB TPC-H dataset.
Note:
Prefetch: `carbon.compaction.prefetch.enable`
BatchSize: `carbon.detail.batch.size`
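For reference, here is a hedged sketch of how these two properties could be set programmatically before triggering compaction. `CarbonProperties.getInstance().addProperty(...)` is the usual way to set carbon properties, but verify the keys and defaults against your CarbonData version.

```java
import org.apache.carbondata.core.util.CarbonProperties;

public class CompactionPrefetchConfig {
  public static void main(String[] args) {
    CarbonProperties props = CarbonProperties.getInstance();
    // Larger batch so each fetch hands the compaction consumer a full
    // TablePage worth of rows (32000 is the TablePage row limit).
    props.addProperty("carbon.detail.batch.size", "32000");
    // Enable the prefetch path added by this PR (default is false).
    props.addProperty("carbon.compaction.prefetch.enable", "true");
  }
}
```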
Be sure to complete all of the following checklist items to help us incorporate your contribution quickly and easily:
- Any interfaces changed?
- Any backward compatibility impacted?
- Document update required?
- Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
- How it is tested? Please attach test report.
- Is it a performance related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
For large changes, please consider breaking them into sub-tasks under an umbrella JIRA.