[CARBONDATA-3041]Optimize load minimum size strategy for data loading
This PR modifies the following points:
  1. Delete the system property carbon.load.min.size.enabled and turn load_min_size_inmb into a table property; the property can also be specified in the load option.
  2. Support ALTER TABLE xxx SET TBLPROPERTIES('load_min_size_inmb'='256').
  3. If a table is created with the load_min_size_inmb property, display it via the DESC FORMATTED command.
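A minimal sketch of the new usage (the table name, columns, and file path below are illustrative, not part of this change):

```
-- 1. set the minimum per-node load size as a table property
CREATE TABLE IF NOT EXISTS sales (id INT, country STRING, salary INT)
STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES('load_min_size_inmb'='256')

-- 2. change it later via ALTER TABLE
ALTER TABLE sales SET TBLPROPERTIES('load_min_size_inmb'='512')

-- 3. the property is listed by DESC FORMATTED
DESC FORMATTED sales

-- the size can also be given for a single load through the load option
LOAD DATA LOCAL INPATH '/tmp/sales.csv' INTO TABLE sales OPTIONS('load_min_size_inmb'='512')
```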

This closes #2864
ndwangsen authored and xuchuanyin committed Oct 29, 2018
1 parent db5da53 commit e2c517e
Showing 14 changed files with 555 additions and 465 deletions.
@@ -980,8 +980,7 @@ private CarbonCommonConstants() {
*/
@CarbonProperty
public static final String CARBON_LOAD_MIN_SIZE_INMB = "load_min_size_inmb";
public static final String CARBON_LOAD_MIN_NODE_SIZE_INMB_DEFAULT = "256";

public static final String CARBON_LOAD_MIN_SIZE_INMB_DEFAULT = "0";
/**
* the node minimum load data default value
*/
@@ -151,14 +151,4 @@ public final class CarbonLoadOptionConstants {
public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE
= "carbon.load.sortmemory.spill.percentage";
public static final String CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT = "0";

/**
* if loading data is too small, the original loading method will produce many small files.
* enable set the node load minimum amount of data,avoid producing many small files.
* This option is especially useful when you encounter a lot of small amounts of data.
*/
@CarbonProperty
public static final String ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE
= "carbon.load.min.size.enabled";
public static final String ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE_DEFAULT = "false";
}
@@ -1248,17 +1248,6 @@ public boolean isLoadSkewedDataOptimizationEnabled() {
return skewedEnabled.equalsIgnoreCase("true");
}

/**
* whether optimization for the node loads the minimum amount of data is enabled
* @return true, if enabled; false for not enabled.
*/
public boolean isLoadMinSizeOptimizationEnabled() {
String loadMinSize = getProperty(
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE,
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE_DEFAULT);
return loadMinSize.equalsIgnoreCase("true");
}

/**
* returns true if carbon property
* @param key
1 change: 0 additions & 1 deletion docs/configuration-parameters.md
@@ -86,7 +86,6 @@ This section provides the details of all the configurations required for the Car
| carbon.use.local.dir | true | CarbonData,during data loading, writes files to local temp directories before copying the files to HDFS. This configuration is used to specify whether CarbonData can write locally to tmp directory of the container or to the YARN application directory. |
| carbon.sort.temp.compressor | (none) | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits. These temporary files can be compressed and written in order to save the storage space. This configuration specifies the name of compressor to be used to compress the intermediate sort temp files during sort procedure in data loading. The valid values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. By default, empty means that Carbondata will not compress the sort temp files. **NOTE:** Compressor will be useful if you encounter disk bottleneck.Since the data needs to be compressed and decompressed,it involves additional CPU cycles,but is compensated by the high IO throughput due to less data to be written or read from the disks. |
| carbon.load.skewedDataOptimization.enabled | false | During data loading,CarbonData would divide the number of blocks equally so as to ensure all executors process same number of blocks. This mechanism satisfies most of the scenarios and ensures maximum parallel processing for optimal data loading performance.In some business scenarios, there might be scenarios where the size of blocks vary significantly and hence some executors would have to do more work if they get blocks containing more data. This configuration enables size based block allocation strategy for data loading. When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data.**NOTE:** This configuration is useful if the size of your input data files varies widely, say 1MB to 1GB.For this configuration to work effectively,knowing the data pattern and size is important and necessary. |
| carbon.load.min.size.enabled | false | During Data Loading, CarbonData would divide the number of files among the available executors to parallelize the loading operation. When the input data files are very small, this action causes to generate many small carbondata files. This configuration determines whether to enable node minumun input data size allocation strategy for data loading.It will make sure that the node load the minimum amount of data there by reducing number of carbondata files.**NOTE:** This configuration is useful if the size of the input data files are very small, like 1MB to 256MB. Refer to the load option ***load_min_size_inmb*** to configure the minimum size to be considered for splitting files among executors. |
| enable.data.loading.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional data loading statistics information to more accurately locate the issues being debugged. **NOTE:** Enabling this would log more debug information to log files, there by increasing the log files size significantly in short span of time.It is advised to configure the log files size, retention of log files parameters in log4j properties appropriately. Also extensive logging is an increased IO operation and hence over all data loading performance might get reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |
| carbon.dictionary.chunk.size | 10000 | CarbonData generates dictionary keys and writes them to separate dictionary file during data loading. To optimize the IO, this configuration determines the number of dictionary keys to be persisted to dictionary file at a time. **NOTE:** Writing to file also serves as a commit point to the dictionary generated.Increasing more values in memory causes more data loss during system or application failure.It is advised to alter this configuration judiciously. |
| dictionary.worker.threads | 1 | CarbonData supports Optimized data loading by relying on a dictionary server. Dictionary server helps to maintain dictionary values independent of the data loading and there by avoids reading the same input data multiples times. This configuration determines the number of concurrent dictionary generation or request that needs to be served by the dictionary server. **NOTE:** This configuration takes effect when ***carbon.options.single.pass*** is configured as true.Please refer to *carbon.options.single.pass*to understand how dictionary server optimizes data loading. |
19 changes: 17 additions & 2 deletions docs/ddl-of-carbondata.md
@@ -33,7 +33,9 @@ CarbonData DDL statements are documented here,which includes:
* [Hive/Parquet folder Structure](#support-flat-folder-same-as-hiveparquet)
* [Extra Long String columns](#string-longer-than-32000-characters)
* [Compression for Table](#compression-for-table)
* [Bad Records Path](#bad-records-path)
* [Bad Records Path](#bad-records-path)
  * [Load Minimum Data Size](#load-minimum-data-size)

* [CREATE TABLE AS SELECT](#create-table-as-select)
* [CREATE EXTERNAL TABLE](#create-external-table)
* [External Table on Transactional table location](#create-external-table-on-managed-table-data-location)
@@ -104,6 +106,7 @@ CarbonData DDL statements are documented here,which includes:
| [LONG_STRING_COLUMNS](#string-longer-than-32000-characters) | Columns which are greater than 32K characters |
| [BUCKETNUMBER](#bucketing) | Number of buckets to be created |
| [BUCKETCOLUMNS](#bucketing) | Columns which are to be placed in buckets |
| [LOAD_MIN_SIZE_INMB](#load-minimum-data-size) | Minimum input data size per node for data loading |

Following are the guidelines for TBLPROPERTIES, CarbonData's additional table options can be set via carbon.properties.

@@ -474,7 +477,19 @@ CarbonData DDL statements are documented here,which includes:
be later viewed in table description for reference.

```
TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords'')
TBLPROPERTIES('BAD_RECORD_PATH'='/opt/badrecords')
```
- ##### Load minimum data size
     This property indicates the minimum input data size per node for data loading.
     By default it is not enabled; setting a non-zero integer value enables the feature.
     This property is useful if you have a large cluster and only want a small portion of the nodes to process data loading.
     For example, suppose you have a cluster with 10 nodes and the input data is about 1GB. Without this property, each node will process about 100MB of input data, resulting in at least 10 data files. With this property set to 512, only 2 nodes will be chosen to process the input data, each with about 512MB of input, resulting in about 2 to 4 files depending on the compression ratio.
     Moreover, this property can also be specified in the load option.
     Note that once this feature is enabled, CarbonData ignores data locality while assigning input data to nodes in order to balance the load, which can increase network traffic.

```
TBLPROPERTIES('LOAD_MIN_SIZE_INMB'='256')
```
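
     The same minimum size can also be supplied for a single load through the load option; the table name and file path below are illustrative:

     ```
     LOAD DATA LOCAL INPATH '/tmp/data.csv' INTO TABLE carbontable OPTIONS('load_min_size_inmb'='256')
     ```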

## CREATE TABLE AS SELECT
@@ -21,7 +21,6 @@ import org.apache.spark.sql.Row
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest

@@ -37,6 +36,9 @@ class TestTableLoadMinSize extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS table_loadminsize1")
sql("DROP TABLE IF EXISTS table_loadminsize2")
sql("DROP TABLE IF EXISTS table_loadminsize3")
sql("DROP TABLE IF EXISTS table_loadminsize4")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
}

test("Value test: set table load min size in not int value") {
@@ -49,12 +51,6 @@
TBLPROPERTIES('table_blocksize'='128 MB')
""")

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

CarbonProperties.getInstance()
.addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, "true")

sql(s"""
LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize1 OPTIONS('load_min_size_inmb'='256 MB')
""")
@@ -81,12 +77,6 @@
TBLPROPERTIES('table_blocksize'='128 MB')
""")

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

CarbonProperties.getInstance()
.addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, "true")

sql(s"""
LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize2 OPTIONS('load_min_size_inmb'='256')
""")
@@ -114,12 +104,6 @@
TBLPROPERTIES('table_blocksize'='128 MB')
""")

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

CarbonProperties.getInstance()
.addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, "true")

sql(s"""
LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize3
""")
@@ -136,14 +120,50 @@

}

test("Function test:: set load_min_size_inmb to table property") {

sql(
"""
CREATE TABLE IF NOT EXISTS table_loadminsize4
(ID Int, date Timestamp, country String,
name String, phonetype String, serialname String, salary Int)
STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES('table_blocksize'='128 MB', 'load_min_size_inmb'='256')
""")

sql(
"""
desc formatted table_loadminsize4
""").show(false)

sql(
"""
alter table table_loadminsize4 set TBLPROPERTIES('load_min_size_inmb'='512')
""").show(false)

sql(s"""
LOAD DATA LOCAL INPATH '$testData1' into table table_loadminsize4
""")

checkAnswer(
sql("""
SELECT country, count(salary) AS amount
FROM table_loadminsize4
WHERE country IN ('china','france')
GROUP BY country
"""),
Seq(Row("china", 96), Row("france", 1))
)

}


override def afterAll {
sql("DROP TABLE IF EXISTS table_loadminsize1")
sql("DROP TABLE IF EXISTS table_loadminsize2")
sql("DROP TABLE IF EXISTS table_loadminsize3")
sql("DROP TABLE IF EXISTS table_loadminsize4")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
CarbonProperties.getInstance()
.addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE, CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_NODE_DATA_MIN_SIZE_DEFAULT)
}
}
@@ -833,4 +833,32 @@ object CommonUtil {
})
}
}

/**
* This method validates the minimum data load size per node that the user specified for the table
*
* @param tableProperties table property specified by user
* @param propertyName property name
*/
def validateLoadMinSize(tableProperties: Map[String, String], propertyName: String): Unit = {
var size: Integer = 0
if (tableProperties.get(propertyName).isDefined) {
val loadSizeStr: String =
parsePropertyValueStringInMB(tableProperties(propertyName))
try {
size = Integer.parseInt(loadSizeStr)
} catch {
case e: NumberFormatException =>
throw new MalformedCarbonCommandException(s"Invalid $propertyName value found: " +
s"$loadSizeStr, only int value greater " +
s"than 0 is supported.")
}
// if the value is not a positive integer, fall back to the default value (0, i.e. disabled)
if (size > 0) {
tableProperties.put(propertyName, loadSizeStr)
} else {
tableProperties.put(propertyName, CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)
}
}
}
}
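Both the CREATE TABLE parser and the ALTER TABLE path call this validation (see the parser and AlterTableUtil changes in this commit), so a non-integer value should be rejected up front; a hedged example (the table name is an assumption):

```
-- expected to fail with MalformedCarbonCommandException:
-- "Invalid load_min_size_inmb value found: ..., only int value greater than 0 is supported."
ALTER TABLE sales SET TBLPROPERTIES('load_min_size_inmb'='notanumber')
```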
@@ -429,6 +429,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
CommonUtil.validateTableLevelCompactionProperties(tableProperties)
// validate flat folder property.
CommonUtil.validateFlatFolder(tableProperties)
// validate load_min_size_inmb property
CommonUtil.validateLoadMinSize(tableProperties,
CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)

TableModel(
ifNotExistPresent,
@@ -1171,21 +1171,27 @@ object CarbonDataRDDFactory {
.ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
val skewedDataOptimization = CarbonProperties.getInstance()
.isLoadSkewedDataOptimizationEnabled()
val loadMinSizeOptimization = CarbonProperties.getInstance()
.isLoadMinSizeOptimizationEnabled()
// get user ddl input the node loads the smallest amount of data
val expectedMinSizePerNode = carbonLoadModel.getLoadMinSize()
val blockAssignStrategy = if (skewedDataOptimization) {
CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST
} else if (loadMinSizeOptimization) {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
var loadMinSize = carbonLoadModel.getLoadMinSize()
if (loadMinSize.equalsIgnoreCase(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)) {
loadMinSize = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)
}

val blockAssignStrategy = if (!loadMinSize.equalsIgnoreCase(
CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)) {
CarbonLoaderUtil.BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST
} else if (skewedDataOptimization) {
CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST
} else {
CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST
}
LOGGER.info(s"Allocating block to nodes using strategy: $blockAssignStrategy")

val nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(blockList.toSeq.asJava, -1,
activeNodes.toList.asJava, blockAssignStrategy, expectedMinSizePerNode).asScala.toSeq
activeNodes.toList.asJava, blockAssignStrategy, loadMinSize).asScala.toSeq
val timeElapsed: Long = System.currentTimeMillis - startTime
LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
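With this change, a size given in the load option takes precedence; when the option is left at its default ('0'), the table property is consulted, and if neither is set the original block-number-first assignment is kept. A hedged SQL illustration of that precedence (the table name and file path are assumptions):

```
-- the table property supplies the default per-node minimum for every load
ALTER TABLE sales SET TBLPROPERTIES('load_min_size_inmb'='256')

-- a value in the load option overrides the table property for this load only
LOAD DATA LOCAL INPATH '/tmp/part1.csv' INTO TABLE sales OPTIONS('load_min_size_inmb'='512')
```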
@@ -123,6 +123,12 @@ private[sql] case class CarbonDescribeFormattedCommand(
tblProps.get(CarbonCommonConstants.LONG_STRING_COLUMNS), ""))
}

// load min size info
if (tblProps.containsKey(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)) {
results ++= Seq(("Minimum input data size per node for data loading",
tblProps.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB), ""))
}

var isLocalDictEnabled = tblProps.asScala
.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
if (isLocalDictEnabled.isDefined) {
@@ -326,6 +326,9 @@ object AlterTableUtil {
// validate the local dictionary properties
validateLocalDictionaryProperties(lowerCasePropertiesMap, tblPropertiesMap, carbonTable)

// validate the load min size properties
validateLoadMinSizeProperties(carbonTable, lowerCasePropertiesMap)

// below map will be used for cache invalidation. As tblProperties map is getting modified
// in the next few steps the original map need to be retained for any decision making
val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*)
@@ -399,7 +402,8 @@ object AlterTableUtil {
"LOCAL_DICTIONARY_ENABLE",
"LOCAL_DICTIONARY_THRESHOLD",
"LOCAL_DICTIONARY_INCLUDE",
"LOCAL_DICTIONARY_EXCLUDE")
"LOCAL_DICTIONARY_EXCLUDE",
"LOAD_MIN_SIZE_INMB")
supportedOptions.contains(propKey.toUpperCase)
}

@@ -748,4 +752,18 @@ object AlterTableUtil {
false
}
}

private def validateLoadMinSizeProperties(carbonTable: CarbonTable,
propertiesMap: mutable.Map[String, String]): Unit = {
// validate load min size property
if (propertiesMap.get(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB).isDefined) {
// load min size is not allowed for child tables and dataMaps
if (carbonTable.isChildDataMap) {
throw new MalformedCarbonCommandException(s"Table property ${
CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB} is not allowed for child datamaps")
}
CommonUtil.validateLoadMinSize(propertiesMap,
CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB)
}
}
}