From 8d600959a796ce4cff2319680c1497371d9ba302 Mon Sep 17 00:00:00 2001 From: Zhang Zhichao <441586683@qq.com> Date: Tue, 8 Aug 2017 17:42:40 +0800 Subject: [PATCH 1/4] [CARBONDATA-1366]Change rdd storage level to 'MEMORY_AND_DISK_SER' to improve loading performance when sort_scope=global_sort Change rdd storage level to 'MEMORY_AND_DISK_SER' to improve loading performance when sort_scope=global_sort --- .../carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index 534ab8877bf..01b40ea2935 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -112,7 +112,7 @@ object DataLoadProcessBuilderOnSpark { // Because if the number of partitions greater than 1, there will be action operator(sample) in // sortBy operator. So here we cache the rdd to avoid do input and convert again. if (numPartitions > 1) { - convertRDD.persist(StorageLevel.MEMORY_AND_DISK) + convertRDD.persist(StorageLevel.MEMORY_AND_DISK_SER) } import scala.reflect.classTag From 12ae0817afc41ef9b5c76d80e1f13b14ca00950a Mon Sep 17 00:00:00 2001 From: Zhang Zhichao <441586683@qq.com> Date: Wed, 9 Aug 2017 14:29:12 +0800 Subject: [PATCH 2/4] add an option 'carbon.global.sort.rdd.storage.level' the default value of new option 'carbon.global.sort.rdd.storage.level' is 'MEMORY_ONLY' --- .../core/constants/CarbonCommonConstants.java | 11 ++++++ .../core/util/CarbonProperties.java | 18 +++++++++ .../carbondata/core/util/CarbonUtil.java | 39 +++++++++++++++++++ .../load/DataLoadProcessBuilderOnSpark.scala | 4 +- 4 files changed, 71 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index c9f9373b5a6..71bd6beabad 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1304,6 +1304,17 @@ public final class CarbonCommonConstants { */ public static final String CARBON_USE_MULTI_TEMP_DIR_DEFAULT = "false"; + /** + * Which storage level to persist rdd when sort_scope=global_sort + */ + @CarbonProperty + public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL = "carbon.global.sort.rdd.storage.level"; + + /** + * default value for carbon.global.sort.rdd.storage.level + */ + public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT = "MEMORY_ONLY"; + private CarbonCommonConstants() { } } diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 12776de3ef7..ee3f548ce36 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -875,6 +875,24 @@ public boolean isUseMultiTempDir() { return usingMultiDirStr.equalsIgnoreCase("true"); } + /** + * Return valid storage level + * @return String + */ + public String getGlobalSortRddStorageLevel() { + String storageLevel = getProperty(CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL, + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT); + boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel); + if (!validateStorageLevel) { + LOGGER.info("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL + + " configuration value is invalid. It will use default storage level(" + + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT + + ") to persist rdd."); + storageLevel = CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT; + } + return storageLevel.toUpperCase(); + } + /** * returns true if carbon property * @param key diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 762841557d4..878163b9fc8 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1713,6 +1713,45 @@ public static boolean isValidSortOption(String sortScope) { } } + /** + * validate the storage level + * @param storageLevel + * @return boolean + */ + public static boolean isValidStorageLevel(String storageLevel) { + if (null == storageLevel || storageLevel.trim().equals("")) { + return false; + } + switch (storageLevel.toUpperCase()) { + case "DISK_ONLY": + return true; + case "DISK_ONLY_2": + return true; + case "MEMORY_ONLY": + return true; + case "MEMORY_ONLY_2": + return true; + case "MEMORY_ONLY_SER": + return true; + case "MEMORY_ONLY_SER_2": + return true; + case "MEMORY_AND_DISK": + return true; + case "MEMORY_AND_DISK_2": + return true; + case "MEMORY_AND_DISK_SER": + return true; + case "MEMORY_AND_DISK_SER_2": + return true; + case "OFF_HEAP": + return true; + case "NONE": + return true; + default: + return false; + } + } + /** * validate teh batch size * diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index 01b40ea2935..fed8a965b50 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -32,6 +32,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.statusmanager.LoadMetadataDetails +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.csvload.{CSVInputFormat, StringArrayWritable} import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder @@ -112,7 +113,8 @@ object DataLoadProcessBuilderOnSpark { // Because if the number of partitions greater than 1, there will be action operator(sample) in // sortBy operator. So here we cache the rdd to avoid do input and convert again. if (numPartitions > 1) { - convertRDD.persist(StorageLevel.MEMORY_AND_DISK_SER) + convertRDD.persist(StorageLevel.fromString( + CarbonProperties.getInstance().getGlobalSortRddStorageLevel())) } import scala.reflect.classTag From 1cb14fa76dd6862982c4183103b48ac81b428713 Mon Sep 17 00:00:00 2001 From: Zhang Zhichao <441586683@qq.com> Date: Thu, 10 Aug 2017 08:36:49 +0800 Subject: [PATCH 3/4] fix java style error --- .../carbondata/core/constants/CarbonCommonConstants.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 71bd6beabad..3061f3e9303 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1308,7 +1308,8 @@ public final class CarbonCommonConstants { * Which storage level to persist rdd when sort_scope=global_sort */ @CarbonProperty - public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL = "carbon.global.sort.rdd.storage.level"; + public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL = + "carbon.global.sort.rdd.storage.level"; /** * default value for carbon.global.sort.rdd.storage.level From 1f61fb27d2263a872d8ecd7bbc5279c6a0eebef7 Mon Sep 17 00:00:00 2001 From: Zhang Zhichao <441586683@qq.com> Date: Thu, 10 Aug 2017 13:57:34 +0800 Subject: [PATCH 4/4] change codes according to jacky's suggestions --- .../core/constants/CarbonCommonConstants.java | 6 +++++- .../apache/carbondata/core/util/CarbonProperties.java | 4 ++-- .../org/apache/carbondata/core/util/CarbonUtil.java | 11 ----------- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 3061f3e9303..f06eb83103a 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1312,7 +1312,11 @@ public final class CarbonCommonConstants { "carbon.global.sort.rdd.storage.level"; /** - * default value for carbon.global.sort.rdd.storage.level + * The default value(MEMORY_ONLY) is designed for executors with big memory, if user's executor + * has less memory, set the CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL to MEMORY_AND_DISK_SER or + * other storage level to correspond to different environment. + * You can get more recommendations about storage level in spark website: + * http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence. */ public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT = "MEMORY_ONLY"; diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index ee3f548ce36..2620ecbf11f 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -867,7 +867,7 @@ public boolean isUseMultiTempDir() { CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT); boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr); if (!validateBoolean) { - LOGGER.info("The carbon.use.multiple.temp.dir configuration value is invalid." + LOGGER.error("The carbon.use.multiple.temp.dir configuration value is invalid." + "Configured value: \"" + usingMultiDirStr + "\"." + "Data Load will not use multiple temp directories."); usingMultiDirStr = CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT; @@ -884,7 +884,7 @@ public String getGlobalSortRddStorageLevel() { CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT); boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel); if (!validateStorageLevel) { - LOGGER.info("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL + LOGGER.error("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL + " configuration value is invalid. It will use default storage level(" + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT + ") to persist rdd."); diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 878163b9fc8..edc4c286717 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1724,27 +1724,16 @@ public static boolean isValidStorageLevel(String storageLevel) { } switch (storageLevel.toUpperCase()) { case "DISK_ONLY": - return true; case "DISK_ONLY_2": - return true; case "MEMORY_ONLY": - return true; case "MEMORY_ONLY_2": - return true; case "MEMORY_ONLY_SER": - return true; case "MEMORY_ONLY_SER_2": - return true; case "MEMORY_AND_DISK": - return true; case "MEMORY_AND_DISK_2": - return true; case "MEMORY_AND_DISK_SER": - return true; case "MEMORY_AND_DISK_SER_2": - return true; case "OFF_HEAP": - return true; case "NONE": return true; default: