
[CARBONDATA-3219] Support range partition the input data for local_sort/global sort data loading #2971

Closed
wants to merge 2 commits

Conversation

QiangCai (Contributor) commented Dec 3, 2018

For a global_sort/local_sort table, the load data command adds a RANGE_COLUMN option:

load data inpath '<path>' into table <table name>
options('RANGE_COLUMN'='<a column>')

  1. When we know the total size of the input data, we can specify the number of partitions directly:

load data inpath '<path>' into table <table name>
options('RANGE_COLUMN'='<a column>', 'global_sort_partitions'='10')

  2. When we don't know the total size of the input data, we can give the size of each partition instead:

load data inpath '<path>' into table <table name>
options('RANGE_COLUMN'='<a column>', 'scale_factor'='10')

With scale_factor, it calculates the number of partitions as follows:

splitSize = Math.max(blocklet_size, (block_size - blocklet_size)) * scale_factor
numPartitions = Math.ceil(total_size / splitSize)
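
As a worked illustration with assumed values (not defaults from this PR): blocklet_size = 64 MB, block_size = 1024 MB, scale_factor = 10, and 100 GB (102400 MB) of input data give

splitSize = Math.max(64, 1024 - 64) * 10 = 9600 (MB)
numPartitions = Math.ceil(102400 / 9600) = 11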

Limitations:

  1. Only the load data command is supported, not insert into.
  2. Only a single range column is supported, not multiple range columns.
  3. Data skew may still exist.

@@ -305,4 +307,107 @@ object DataLoadProcessorStepOnSpark {
e)
}
}

def sortAdnWriteFunc(
Contributor

Please change the method name from sortAdnWriteFunc to sortAndWriteFunc

@CarbonDataQA

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1617/

@CarbonDataQA

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1828/

@CarbonDataQA

Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9877/

@CarbonDataQA

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1622/

@CarbonDataQA

Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9882/

@CarbonDataQA

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1833/

dataFrame: Option[DataFrame],
model: CarbonLoadModel,
hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
val originRDD = if (dataFrame.isDefined) {
Contributor

This method shares some code with loadDataUsingGlobalSort; I recommend refactoring these two methods.

Contributor Author

That would be better, but after refactoring the code logic would not be clear. These two flows already reuse the processing steps.

// here it assumes the compression ratio of CarbonData is about 33%,
// so it multiplies by 3 to get the split size of the CSV files.
val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * 3
numPartitions = Math.ceil(totalSize / splitSize).toInt
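
As a numeric illustration of this heuristic, with the same assumed sizes as above (blockletSize = 64 MB, blockSize = 1024 MB, not values from this PR): since CarbonData output is assumed to be about one third of the CSV input, each partition covers roughly three blocks' worth of CSV:

splitSize = Math.max(64, 1024 - 64) * 3 = 2880 (MB of CSV per partition)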
Contributor

If inserting using a DataFrame, I think totalSize will be 0.

Contributor Author

Yes, insert will use global sort.

@@ -188,6 +188,8 @@
optionsFinal.put(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
Maps.getOrDefault(options, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT));

optionsFinal.put("range_column", Maps.getOrDefault(options, "range_column", null));
Contributor

Does makeCreateTableString of CarbonDataFrameWriter need to add "range_column"?

Contributor Author

For now it only tries to support the load data command.

@CarbonDataQA

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1627/

@CarbonDataQA

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1838/

@CarbonDataQA

Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9887/

@CarbonDataQA

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1630/

@CarbonDataQA

Build Success with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9890/

@CarbonDataQA

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1841/

QiangCai changed the title from "[TEST] Test loading performance of range_sort" to "[TEST] Test loading performance with range_column" on Dec 5, 2018
@CarbonDataQA

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1668/

@CarbonDataQA

Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9928/

@CarbonDataQA

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1880/

@CarbonDataQA

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1680/

@CarbonDataQA

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1892/

@CarbonDataQA

Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9940/

@CarbonDataQA

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1698/

@CarbonDataQA

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/1908/

@CarbonDataQA

Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/9958/

@CarbonDataQA

Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/1943/

@CarbonDataQA

Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10196/

@CarbonDataQA

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2152/

@CarbonDataQA

Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10396/

@CarbonDataQA

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2366/

@CarbonDataQA

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2153/

@CarbonDataQA

Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10407/

@CarbonDataQA

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2157/

@CarbonDataQA

Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10411/

@CarbonDataQA

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2370/

@CarbonDataQA

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2158/

@CarbonDataQA

Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2371/

@CarbonDataQA

Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10412/

QiangCai (Contributor Author) commented Jan 4, 2019

@ravipesala
After compaction, it will become local_sort.
In my opinion, we can use the range column to partition the input data.
This reduces the scope of sorting during data loading, which improves loading performance.
In some cases, it can also improve query performance (like global_sort).
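
To illustrate the idea in Spark terms, here is a minimal sketch under assumed names and row shape, not this PR's actual implementation: range-partition the rows by the range-column value, then sort within each partition, so only per-partition sorts are needed instead of one global sort.

import org.apache.spark.RangePartitioner
import org.apache.spark.rdd.RDD

object RangeLoadSketch {
  // Minimal sketch (assumed row shape, not this PR's code): key each row by
  // its range-column value, range-partition by that key, and sort within
  // each partition in a single shuffle.
  def rangePartitionAndSort(
      rows: RDD[(String, Array[String])], // (rangeColumnValue, restOfRow)
      numPartitions: Int): RDD[(String, Array[String])] = {
    // RangePartitioner samples the keys of this load to compute partition
    // boundaries, so every load (segment) gets its own boundaries.
    val partitioner = new RangePartitioner(numPartitions, rows)
    // One shuffle: rows land in their range partition already sorted locally,
    // so no global sort over the whole dataset is needed.
    rows.repartitionAndSortWithinPartitions(partitioner)
  }
}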

@CarbonDataQA

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2159/

@CarbonDataQA

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2372/

@CarbonDataQA

Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10413/

QiangCai (Contributor Author) commented Jan 4, 2019

@ravipesala @kumarvishal09
Please review again.

@CarbonDataQA

Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2164/

@CarbonDataQA

Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/2377/

@CarbonDataQA

Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/10418/

ravipesala (Contributor)

@QiangCai My question is how the user benefits from choosing a different range column for each load. I feel the range column should be at the table level, not at the load level.
And regarding compaction: yes, currently after compaction it becomes local sort, but there is a way we can support range-column compaction, similar to how we do compaction for partitions. That work can be done in the future. But if you allow the user to choose the range column for each load, then this type of compaction cannot be done.

QiangCai (Contributor Author) commented Jan 7, 2019

@ravipesala
I agree with you about adding it to the table properties.
Even if it becomes a table property, the user may still be able to change it, right?
Range_column is different from a partitioned table:
for range_column, the range boundaries are different for every segment (as with global_sort);
for a partitioned table, the range boundaries are the same for all segments.
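
For example (illustrative values only): with the range column id, one load might sample boundaries like [100, 500, 900] while the next load samples [80, 420, 1100], since each load computes boundaries from its own data; a partitioned table instead uses the same partition values for every segment.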

ravipesala (Contributor)

@QiangCai We should restrict changing that property in the table properties.
I am just explaining how we can do compaction on the range column; since there are similarities with partitioning, I mentioned it here.
I feel the range boundaries can be recalculated during compaction using the min/max of the range column, and then we can go for a merge sort.

QiangCai (Contributor Author) commented Jan 8, 2019

@ravipesala
In my opinion, it is unnecessary to restrict changing it.
Users will keep the range_column unchanged as much as possible.
So I only added this option to the load command.

ravipesala (Contributor)

LGTM. @QiangCai I feel it is better to keep it in table properties, as it is not supposed to change for each load. We can discuss further and raise another PR if needed; I am merging this now. Thanks for working on it.

asfgit closed this in 45951c7 on Jan 8, 2019
asfgit pushed a commit that referenced this pull request Jan 21, 2019
…rt/global sort data loading

This closes #2971
qiuchenjian pushed a commit to qiuchenjian/carbondata that referenced this pull request Jun 14, 2019
…rt/global sort data loading

This closes apache#2971