[CARBONDATA-3008] Optimize default value for multiple temp dir
The feature of supporting multiple temp dirs for data loading was
introduced about 1.5 years ago to solve the single-disk hot-spot
problem during loading. After a year's verification in real production
environments, the feature has proven effective and correct. So in this
commit, we change the default behavior of this feature from disabled
to enabled.

Moreover, we remove the parameter 'carbon.use.multiple.temp.dir' and
keep only the parameter 'carbon.use.local.dir', which is now enabled by
default. If the cluster is not configured with yarn-local-dirs, the
Java temp dir will be used instead.
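For clusters that need the old behavior, the property can still be disabled explicitly; a minimal sketch using the CarbonProperties API (the same call pattern as the test changes below):

```scala
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

// Opt out of the new default: loading temp files then go to the JVM temp dir only
CarbonProperties.getInstance()
  .addProperty(CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR, "false")
```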

This closes #2824
xuchuanyin authored and jackylk committed Oct 24, 2018
1 parent 8af7372 commit b21a6d4
Showing 11 changed files with 60 additions and 177 deletions.
@@ -1371,16 +1371,15 @@ public final class CarbonCommonConstants {
public static final String CARBON_SECURE_DICTIONARY_SERVER_DEFAULT = "true";

/**
* whether to use multi directories when loading data,
* the main purpose is to avoid single-disk-hot-spot
* for loading, whether to use yarn's local dir; the main purpose is to avoid single-disk hot spot
*/
@CarbonProperty
public static final String CARBON_USE_MULTI_TEMP_DIR = "carbon.use.multiple.temp.dir";
public static final String CARBON_LOADING_USE_YARN_LOCAL_DIR = "carbon.use.local.dir";

/**
* default value for multi temp dir
* default value for whether to use yarn's local dir during data loading
*/
public static final String CARBON_USE_MULTI_TEMP_DIR_DEFAULT = "false";
public static final String CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT = "true";
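For reference, reading the renamed property follows the usual CarbonProperties pattern; a minimal Scala sketch (not code from this commit), mirroring the removed isUseMultiTempDir helper further down in this diff:

```scala
// Sketch: read the renamed property with its new default ("true").
// Mirrors the getProperty(key, default) + equalsIgnoreCase pattern used
// by the removed isUseMultiTempDir helper below.
val useYarnLocalDir: Boolean = CarbonProperties.getInstance()
  .getProperty(
    CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
    CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT)
  .equalsIgnoreCase("true")
```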

/**
* name of compressor to compress sort temp files
@@ -1134,23 +1134,6 @@ public int getNoDeleteDeltaFilesThresholdForIUDCompaction() {
return numberOfDeltaFilesThreshold;
}

/**
* Returns whether to use multi temp dirs
* @return boolean
*/
public boolean isUseMultiTempDir() {
String usingMultiDirStr = getProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR,
CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT);
boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr);
if (!validateBoolean) {
LOGGER.warn("The carbon.use.multiple.temp.dir configuration value is invalid."
+ "Configured value: \"" + usingMultiDirStr + "\"."
+ "Data Load will not use multiple temp directories.");
usingMultiDirStr = CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT;
}
return usingMultiDirStr.equalsIgnoreCase("true");
}

/**
* Return valid storage level
* @return String
3 changes: 2 additions & 1 deletion docs/configuration-parameters.md
@@ -83,8 +83,9 @@ This section provides the details of all the configurations required for the Car
| carbon.enable.calculate.size | true | **For Load Operation**: Enabling this property will let carbondata calculate the size of the carbon data file (.carbondata) and the carbon index file (.carbonindex) for each load and update the table status file. **For Describe Formatted**: Enabling this property will let carbondata calculate the total size of the carbon data files and the carbon index files for each table and display it in the describe formatted command. **NOTE:** This is useful to determine the overall size of the carbondata table and also get an idea of how the table is growing in order to make backup strategy decisions. |
| carbon.cutOffTimestamp | (none) | CarbonData has the capability to generate the dictionary values for the timestamp columns from the data itself without the need to store the computed dictionary values. This configuration sets the start date for calculating the timestamp. Java counts the number of milliseconds from the start of "1970-01-01 00:00:00". This property is used to customize the start position, for example "2000-01-01 00:00:00". **NOTE:** The date must be in the form ***carbon.timestamp.format***. CarbonData supports storing data for up to 68 years. For example, if the cut-off time is 1970-01-01 05:30:00, then data up to 2038-01-01 05:30:00 will be supported by CarbonData. |
| carbon.timegranularity | SECOND | The configuration is used to specify the data granularity level such as DAY, HOUR, MINUTE, or SECOND. This helps to store more than 68 years of data into CarbonData. |
| carbon.use.local.dir | false | CarbonData, during data loading, writes files to local temp directories before copying the files to HDFS. This configuration is used to specify whether CarbonData can write locally to the tmp directory of the container or to the YARN application directory. |
| carbon.use.local.dir | true | CarbonData, during data loading, writes files to local temp directories before copying the files to HDFS. This configuration is used to specify whether CarbonData can write locally to the tmp directory of the container or to the YARN application directory. |
| carbon.use.multiple.temp.dir | false | When multiple disks are present in the system, YARN is generally configured with multiple disks to be used as temp directories for managing the containers. This configuration specifies whether to use multiple YARN local directories during data loading for disk IO load balancing. Enable ***carbon.use.local.dir*** for this configuration to take effect. **NOTE:** Data loading is an IO-intensive operation whose performance can be limited by the disk IO threshold, particularly during multi-table concurrent data load. Configuring this parameter balances the disk IO across multiple disks, thereby improving the overall load performance. |
| carbon.sort.temp.compressor | (none) | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure the memory footprint is within limits. These temporary files can be compressed before writing in order to save storage space. This configuration specifies the name of the compressor to be used to compress the intermediate sort temp files during the sort procedure in data loading. The valid values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. By default, empty means that CarbonData will not compress the sort temp files. **NOTE:** The compressor is useful if you encounter a disk bottleneck. Since the data needs to be compressed and decompressed, it involves additional CPU cycles, but this is compensated by the higher IO throughput due to less data being written to or read from the disks. |
| carbon.load.skewedDataOptimization.enabled | false | During data loading, CarbonData divides the number of blocks equally so as to ensure all executors process the same number of blocks. This mechanism satisfies most scenarios and ensures maximum parallel processing for optimal data loading performance. In some business scenarios, the size of blocks may vary significantly, and hence some executors would have to do more work if they get blocks containing more data. This configuration enables a size-based block allocation strategy for data loading: when loading, CarbonData will use a file-size-based block allocation strategy for task distribution, making sure that all the executors process the same amount of data. **NOTE:** This configuration is useful if the size of your input data files varies widely, say 1MB to 1GB. For this configuration to work effectively, knowing the data pattern and size is important and necessary. |
| carbon.load.min.size.enabled | false | During data loading, CarbonData divides the number of files among the available executors to parallelize the loading operation. When the input data files are very small, this causes many small carbondata files to be generated. This configuration determines whether to enable the node minimum input data size allocation strategy for data loading. It will make sure that each node loads a minimum amount of data, thereby reducing the number of carbondata files. **NOTE:** This configuration is useful if the size of the input data files is very small, like 1MB to 256MB. Refer to the load option ***load_min_size_inmb*** to configure the minimum size to be considered for splitting files among executors. |
| enable.data.loading.statistics | false | CarbonData has extensive logging which is useful for debugging performance issues or hard-to-locate issues. Setting this configuration to ***true*** logs additional data loading statistics information to more accurately locate the issues being debugged. **NOTE:** Enabling this logs more debug information, thereby increasing the log file size significantly in a short span of time. It is advised to configure the log file size and log retention parameters in the log4j properties appropriately. Extensive logging also increases IO, and hence overall data loading performance might be reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |
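To make the table concrete, here is a minimal, hypothetical carbon.properties excerpt for the loading-related settings above (values are illustrative, not recommendations):

```properties
# YARN local dirs are now used for loading temp files by default (this commit)
carbon.use.local.dir=true
# Optional: compress intermediate sort temp files; empty (default) means no compression
carbon.sort.temp.compressor=SNAPPY
# Optional: size-based block allocation for widely varying input file sizes
carbon.load.skewedDataOptimization.enabled=true
```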
1 change: 0 additions & 1 deletion docs/performance-tuning.md
@@ -170,7 +170,6 @@
| spark.executor.instances/spark.executor.cores/spark.executor.memory | spark/conf/spark-defaults.conf | Querying | The number of executors, CPU cores, and memory used for CarbonData query. | In the bank scenario, we provide 4 CPU cores and 15 GB for each executor, which gives good performance. More is not necessarily better for these two values; they need to be configured properly in case of limited resources. For example, in the bank scenario each node has enough CPU (32 cores) but less memory (64 GB), so we cannot give more CPU with less memory: with 4 cores and 12GB per executor, GC sometimes happens during the query, which impacts query performance severely, from 3 seconds to more than 15 seconds. In this scenario, increase the memory or decrease the CPU cores. |
| carbon.detail.batch.size | spark/carbonlib/carbon.properties | Querying | The buffer size to store records returned from the block scan. | In limit scenarios this parameter is very important. For example, if your query limit is 1000 but this value is set to 3000, we get 3000 records from the scan while Spark only takes 1000 rows, so the remaining 2000 are useless. In one finance test case, after we set it to 100, performance in the limit-1000 scenario increased about 2 times compared to setting this value to 12000. |
| carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use YARN local directories for multi-table load disk load balancing | If this is set to true, CarbonData will use YARN local directories for multi-table load disk load balancing, which will improve data load performance. |
| carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use multiple YARN local directories during table data loading for disk load balancing | After enabling 'carbon.use.local.dir', if this is set to true, CarbonData will use all YARN local directories during data load for disk load balancing, which will improve data load performance. Please enable this property when you encounter a disk hot-spot problem during data loading. |
| carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD', and empty. By default, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
| carbon.load.skewedDataOptimization.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable size based block allocation strategy for data loading. | When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data -- It's useful if the size of your input data files varies widely, say 1MB to 1GB. |
| carbon.load.min.size.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable the node minimum input data size allocation strategy for data loading. | When loading, CarbonData will use the node minimum input data size allocation strategy for task distribution. It will make sure the nodes load the minimum amount of data, which is useful if your input data files are very small, say 1MB to 256MB, and avoids generating a large number of small files. |
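As a worked example of the executor-sizing row above, a spark-defaults.conf sketch for the bank scenario (the instance count is hypothetical and must be sized to your cluster):

```properties
# 4 cores and 15 GB per executor, per the bank-scenario guidance above
spark.executor.cores=4
spark.executor.memory=15g
# hypothetical executor count -- size to your cluster
spark.executor.instances=10
```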
2 changes: 0 additions & 2 deletions docs/usecases.md
@@ -72,7 +72,6 @@ Apart from these, the following CarbonData configuration was suggested to be con
| Data Loading | table_blocksize | 256 | To efficiently schedule multiple tasks during query |
| Data Loading | carbon.sort.intermediate.files.limit | 100 | Increased to 100 as the number of cores is higher. Merging can be performed in the background. If there are fewer files to merge, sort threads would be idle |
| Data Loading | carbon.use.local.dir | TRUE | The yarn application directory will usually be on a single disk. YARN would be configured with multiple disks to be used as temp or to assign randomly to applications. Using the yarn temp directories allows carbon to use multiple disks and improve IO performance |
| Data Loading | carbon.use.multiple.temp.dir | TRUE | multiple disks to write sort files will lead to better IO and reduce the IO bottleneck |
| Compaction | carbon.compaction.level.threshold | 6,6 | Since there are frequent small loads, compacting more segments will give better query results |
| Compaction | carbon.enable.auto.load.merge | true | Since data loading is small, auto compaction keeps the number of segments low and compaction can complete in time |
| Compaction | carbon.number.of.cores.while.compacting | 4 | Higher number of cores can improve the compaction speed |
@@ -127,7 +126,6 @@ Use all columns as no-dictionary as the cardinality is high.
| Data Loading | table_blocksize | 512 | To efficiently schedule multiple tasks during query. This size depends on the data scenario. If the data is such that the filters would select fewer blocklets to scan, a higher value works well. If the number of blocklets to scan is higher, it is better to reduce the size so that more tasks can be scheduled in parallel. |
| Data Loading | carbon.sort.intermediate.files.limit | 100 | Increased to 100 as the number of cores is higher. Merging can be performed in the background. If there are fewer files to merge, sort threads would be idle |
| Data Loading | carbon.use.local.dir | TRUE | The yarn application directory will usually be on a single disk. YARN would be configured with multiple disks to be used as temp or to assign randomly to applications. Using the yarn temp directories allows carbon to use multiple disks and improve IO performance |
| Data Loading | carbon.use.multiple.temp.dir | TRUE | multiple disks to write sort files will lead to better IO and reduce the IO bottleneck |
| Data Loading | sort.inmemory.size.in.mb | 92160 | Memory allocated for in-memory sorting. When more memory is available on the node, configuring this will retain more sort blocks in memory so that the merge sort is faster due to little or no IO |
| Compaction | carbon.major.compaction.size | 921600 | Sum of several loads to combine into single segment |
| Compaction | carbon.number.of.cores.while.compacting | 12 | A higher number of cores can improve the compaction speed. The data size is huge, so compaction needs more threads to speed up the process |
@@ -65,15 +65,14 @@ class TestLoadDataWithYarnLocalDirs extends QueryTest with BeforeAndAfterAll {
}

private def enableMultipleDir = {
CarbonProperties.getInstance().addProperty("carbon.use.local.dir", "true")
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR, "true")
CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR, "true")
}

private def disableMultipleDir = {
CarbonProperties.getInstance().addProperty("carbon.use.local.dir", "false")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR,
CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT)
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT)
}

test("test carbon table data loading for multiple temp dir") {
@@ -17,8 +17,6 @@

package org.apache.carbondata.spark.load

import scala.util.Random

import com.univocity.parsers.common.TextParsingException
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
@@ -30,8 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -43,7 +40,7 @@ import org.apache.carbondata.processing.sort.sortdata.SortParameters
import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
import org.apache.carbondata.spark.util.CommonUtil

object DataLoadProcessorStepOnSpark {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -238,7 +235,7 @@ object DataLoadProcessorStepOnSpark {
var dataWriter: DataWriterProcessorStepImpl = null
try {
model = modelBroadcast.value.getCopyWithTaskNo(index.toString)
val storeLocation = Array(getTempStoreLocation(index))
val storeLocation = CommonUtil.getTempStoreLocations(index.toString)
val conf = DataLoadProcessBuilder.createConfiguration(model, storeLocation)

tableName = model.getTableName
@@ -291,27 +288,6 @@
}
}

private def getTempStoreLocation(index: Int): String = {
var storeLocation = ""
// this property is used to determine whether temp location for carbon is inside
// container temp dir or is yarn application directory.
val carbonUseLocalDir = CarbonProperties.getInstance()
.getProperty("carbon.use.local.dir", "false")
if (carbonUseLocalDir.equalsIgnoreCase("true")) {
val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
if (null != storeLocations && storeLocations.nonEmpty) {
storeLocation = storeLocations(Random.nextInt(storeLocations.length))
}
if (storeLocation == null) {
storeLocation = System.getProperty("java.io.tmpdir")
}
} else {
storeLocation = System.getProperty("java.io.tmpdir")
}
storeLocation = storeLocation + '/' + System.nanoTime() + '_' + index
storeLocation
}
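The removed helper above picked a single random directory; its replacement, CommonUtil.getTempStoreLocations, returns multiple locations instead. A rough sketch of that generalization, under the assumption that it keeps the old fallback behavior (the real implementation is not shown in this diff; treat names and details as assumptions):

```scala
// Hypothetical sketch only -- the actual logic lives in CommonUtil.getTempStoreLocations.
// Assumes the imports the removed code used: org.apache.spark.SparkEnv and
// org.apache.carbondata.spark.util.Util.
private def getTempStoreLocationsSketch(index: String): Array[String] = {
  val yarnDirs = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
  val baseDirs =
    if (yarnDirs != null && yarnDirs.nonEmpty) yarnDirs
    else Array(System.getProperty("java.io.tmpdir"))
  // one unique sub-directory per base dir, as the old helper created for its single dir
  baseDirs.map(_ + '/' + System.nanoTime() + '_' + index)
}
```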

private def wrapException(e: Throwable, model: CarbonLoadModel): Unit = {
e match {
case e: CarbonDataLoadingException => throw e
