Skip to content

Commit

Permalink
[CARBONDATA-3607] Remove batch_sort feature #3499
Browse files Browse the repository at this point in the history
Remove batch_sort feature

This closes #3499
  • Loading branch information
jackylk authored and zzcclp committed Dec 12, 2019
1 parent ca52847 commit 2c0a1e0
Show file tree
Hide file tree
Showing 47 changed files with 70 additions and 2,934 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -969,24 +969,13 @@ private CarbonCommonConstants() {
public static final String LOAD_SORT_SCOPE = "carbon.load.sort.scope";

/**
* If set to BATCH_SORT, the sorting scope is smaller and more index tree will be created,
* thus loading is faster but query maybe slower.
* If set to LOCAL_SORT, the sorting scope is bigger and one index tree per data node will be
* created, thus loading is slower but query is faster.
* If set to GLOBAL_SORT, the sorting scope is bigger and one index tree per task will be
* created, thus loading is slower but query is faster.
*/
public static final String LOAD_SORT_SCOPE_DEFAULT = "NO_SORT";

/**
* Size of batch data to keep in memory, as a thumb rule it supposed
* to be less than 45% of sort.inmemory.size.inmb otherwise it may spill intermediate data to disk
*/
@CarbonProperty
public static final String LOAD_BATCH_SORT_SIZE_INMB = "carbon.load.batch.sort.size.inmb";

public static final String LOAD_BATCH_SORT_SIZE_INMB_DEFAULT = "0";

/**
* The number of partitions to use when shuffling data for sort. If the user doesn't configure it or
configures it to less than 1, it uses the number of map tasks as reduce tasks. In general, we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,6 @@ public final class CarbonLoadOptionConstants {
@CarbonProperty(dynamicConfigurable = true)
public static final String CARBON_TABLE_LOAD_SORT_SCOPE = "carbon.table.load.sort.scope.";

/**
* option to specify the batch sort size inmb
*/
@CarbonProperty(dynamicConfigurable = true)
public static final String CARBON_OPTIONS_BATCH_SORT_SIZE_INMB =
"carbon.options.batch.sort.size.inmb";

/**
* Option to enable/ disable single_pass
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public static SortScope getSortScope(String sortScope) {
sortScope = CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT;
}
switch (sortScope.toUpperCase()) {
case "BATCH_SORT":
return SortScope.BATCH_SORT;
case "LOCAL_SORT":
return SortScope.LOCAL_SORT;
case "GLOBAL_SORT":
Expand All @@ -41,7 +39,7 @@ public static SortScope getSortScope(String sortScope) {
}

public enum SortScope {
NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT
NO_SORT, LOCAL_SORT, GLOBAL_SORT
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -1829,12 +1829,8 @@ public static boolean isValidSortOption(String sortScope) {
return false;
}
switch (sortScope.toUpperCase()) {
case "BATCH_SORT":
return true;
case "LOCAL_SORT":
return true;
case "NO_SORT":
return true;
case "GLOBAL_SORT":
return true;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION;
import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE;
import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH;
import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB;
import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT;
import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS;
import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD;
Expand Down Expand Up @@ -172,10 +171,9 @@ private boolean validateKeyValue(String key, String value) throws InvalidConfigu
isValid = CarbonUtil.isValidSortOption(value);
if (!isValid) {
throw new InvalidConfigurationException("The sort scope " + key
+ " can have only either NO_SORT, BATCH_SORT, LOCAL_SORT or GLOBAL_SORT.");
+ " can have only either NO_SORT, LOCAL_SORT or GLOBAL_SORT.");
}
break;
case CARBON_OPTIONS_BATCH_SORT_SIZE_INMB:
case CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS:
case NUM_CORES_LOADING:
case NUM_CORES_COMPACTING:
Expand Down Expand Up @@ -238,7 +236,7 @@ private boolean validateKeyValue(String key, String value) throws InvalidConfigu
isValid = CarbonUtil.isValidSortOption(value);
if (!isValid) {
throw new InvalidConfigurationException("The sort scope " + key
+ " can have only either NO_SORT, BATCH_SORT, LOCAL_SORT or GLOBAL_SORT.");
+ " can have only either NO_SORT, LOCAL_SORT or GLOBAL_SORT.");
}
} else {
throw new InvalidConfigurationException(
Expand Down
3 changes: 1 addition & 2 deletions docs/configuration-parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ This section provides the details of all the configurations required for the Car
| carbon.number.of.cores.while.loading | 2 | Number of cores to be used while loading data. This also determines the number of threads to be used to read the input files (csv) in parallel.**NOTE:** This configured value is used in every data loading step to parallelize the operations. Configuring a higher value can lead to increased early thread pre-emption by OS and there by reduce the overall performance. |
| enable.unsafe.sort | true | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. This configuration enables to use unsafe functions in CarbonData. **NOTE:** For operations like data loading, which generates more short lived Java objects, Java GC can be a bottle neck. Using unsafe can overcome the GC overhead and improve the overall performance. |
| enable.offheap.sort | true | CarbonData supports storing data in off-heap memory for certain operations during data loading and query. This helps to avoid the Java GC and thereby improve the overall performance. This configuration enables using off-heap memory for sorting of data during data loading.**NOTE:** ***enable.unsafe.sort*** configuration needs to be configured to true for using off-heap |
| carbon.load.sort.scope | LOCAL_SORT | CarbonData can support various sorting options to match the balance between load and query performance. LOCAL_SORT:All the data given to an executor in the single load is fully sorted and written to carbondata files. Data loading performance is reduced a little as the entire data needs to be sorted in the executor. BATCH_SORT:Sorts the data in batches of configured size and writes to carbondata files. Data loading performance increases as the entire data need not be sorted. But query performance will get reduced due to false positives in block pruning and also due to more number of carbondata files written. Due to more number of carbondata files, if identified blocks > cluster parallelism, query performance and concurrency will get reduced. GLOBAL SORT:Entire data in the data load is fully sorted and written to carbondata files. Data loading performance would get reduced as the entire data needs to be sorted. But the query performance increases significantly due to very less false positives and concurrency is also improved. **NOTE 1:** This property will be taken into account only when SORT COLUMNS are specified explicitly while creating table, otherwise it is always NO SORT **NOTE 2:** When BATCH_SORT is configured, it is recommended to keep ***carbon.load.batch.sort.size.inmb*** > ***carbon.blockletgroup.size.in.mb***.|
| carbon.load.batch.sort.size.inmb | 0 | When ***carbon.load.sort.scope*** is configured as ***BATCH_SORT***, this configuration needs to be added to specify the batch size for sorting and writing to carbondata files. **NOTE:** It is recommended to keep the value around 45% of ***carbon.sort.storage.inmemory.size.inmb*** to avoid spill to disk. Also it is recommended to keep the value higher than ***carbon.blockletgroup.size.in.mb***. Refer to *carbon.load.sort.scope* for more information on sort options and the advantages/disadvantages of each option. |
| carbon.load.sort.scope | LOCAL_SORT | CarbonData can support various sorting options to match the balance between load and query performance. LOCAL_SORT: All the data given to an executor in the single load is fully sorted and written to carbondata files. Data loading performance is reduced a little as the entire data needs to be sorted in the executor. GLOBAL_SORT: Entire data in the data load is fully sorted and written to carbondata files. Data loading performance would get reduced as the entire data needs to be sorted. But the query performance increases significantly due to far fewer false positives, and concurrency is also improved. **NOTE 1:** This property will be taken into account only when SORT COLUMNS are specified explicitly while creating the table; otherwise it is always NO_SORT |
| carbon.global.sort.rdd.storage.level | MEMORY_ONLY | Storage level to persist dataset of RDD/dataframe when loading data with 'sort_scope'='global_sort', if user's executor has less memory, set this parameter to 'MEMORY_AND_DISK_SER' or other storage level to correspond to different environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). |
| carbon.load.global.sort.partitions | 0 | The number of partitions to use when shuffling data for global sort. Default value 0 means to use same number of map tasks as reduce tasks. **NOTE:** In general, it is recommended to have 2-3 tasks per CPU core in your cluster. |
| carbon.sort.size | 100000 | Number of records to hold in memory to sort and write intermediate sort temp files. **NOTE:** Memory required for data loading will increase if you turn this value bigger. Besides, each thread will cache this amount of records. The number of threads is configured by *carbon.number.of.cores.while.loading*. |
Expand Down
1 change: 0 additions & 1 deletion docs/ddl-of-carbondata.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ CarbonData DDL statements are documented here,which includes:

* LOCAL_SORT: data will be locally sorted (task level sorting)
* NO_SORT: default scope. It will load the data in unsorted manner, it will significantly increase load performance.
* BATCH_SORT: It increases the load performance but decreases the query performance if identified blocks > parallelism.
* GLOBAL_SORT: It increases the query performance, especially high concurrent point query.
If you care about strict isolation of loading resources: because the system uses Spark's GroupBy to sort data, the resources can be controlled by Spark.

Expand Down
2 changes: 1 addition & 1 deletion docs/dml-of-carbondata.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ CarbonData DML statements are documented here,which includes:
Requirement: Sort Columns must be set while creating table. If Sort Columns is null, Sort Scope is always NO_SORT.

```
OPTIONS('SORT_SCOPE'='BATCH_SORT')
OPTIONS('SORT_SCOPE'='GLOBAL_SORT')
```

Priority order for choosing Sort Scope is:
Expand Down
2 changes: 1 addition & 1 deletion docs/sdk-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options);
* c. local_dictionary_threshold -- positive value, default is 10000
* d. local_dictionary_enable -- true / false. Default is false
* e. sort_columns -- comma separated column. "c1,c2". Default no columns are sorted.
* j. sort_scope -- "local_sort", "no_sort", "batch_sort". default value is "no_sort"
* j. sort_scope -- "local_sort", "no_sort". default value is "no_sort"
* k. long_string_columns -- comma separated string columns which are more than 32k length.
* default value is null.
* l. inverted_index -- comma separated string columns for which inverted index needs to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
"mapreduce.carbontable.empty.data.bad.record";
public static final String SKIP_EMPTY_LINE = "mapreduce.carbontable.skip.empty.line";
public static final String SORT_SCOPE = "mapreduce.carbontable.load.sort.scope";
public static final String BATCH_SORT_SIZE_INMB =
"mapreduce.carbontable.batch.sort.size.inmb";
public static final String GLOBAL_SORT_PARTITIONS =
"mapreduce.carbontable.global.sort.partitions";
public static final String BAD_RECORD_PATH = "mapreduce.carbontable.bad.record.path";
Expand Down Expand Up @@ -391,15 +389,6 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio
CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
null)));

model.setBatchSortSizeInMb(
conf.get(
BATCH_SORT_SIZE_INMB,
carbonProperty.getProperty(
CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
carbonProperty.getProperty(
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));

String badRecordsPath = conf.get(BAD_RECORD_PATH);
if (StringUtils.isEmpty(badRecordsPath)) {
badRecordsPath =
Expand Down
Loading

0 comments on commit 2c0a1e0

Please sign in to comment.