diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java index 83c72372721..36d5f98cbc1 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java @@ -70,7 +70,7 @@ public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) { * max number of threads for a job */ private void initThreadPoolSize() { - thread_pool_size = CarbonProperties.getInstance().getNumberOfCores(); + thread_pool_size = CarbonProperties.getInstance().getNumberOfLoadingCores(); } /** diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java index 33a91d88e4d..003ab5a9cd5 100644 --- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java +++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java @@ -78,7 +78,7 @@ public Integer size(DictionaryMessage key) { } @Override public void writeDictionaryData() { - int numOfCores = CarbonProperties.getInstance().getNumberOfCores(); + int numOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores(); long start = System.currentTimeMillis(); ExecutorService executorService = Executors.newFixedThreadPool(numOfCores); for (final DictionaryGenerator generator : columnMap.values()) { diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java index 32eb60de43e..ee87a75215e 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java @@ -60,12 +60,16 @@ public CarbonDeleteFilesDataReader() { initThreadPoolSize(); } + public CarbonDeleteFilesDataReader(int thread_pool_size) { + this.thread_pool_size = thread_pool_size; + } + /** * This method will initialize the thread pool size to be used for creating the * max number of threads for a job */ private void initThreadPoolSize() { - thread_pool_size = CarbonProperties.getInstance().getNumberOfCores(); + thread_pool_size = CarbonProperties.getInstance().getNumberOfLoadingCores(); } /** diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index d2f812ee7ef..f4a75a85264 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -949,20 +949,15 @@ public int[] getIntArray(String commaSeparatedLevels) { return compactionSize; } - /** - * Number of cores should be used while loading data. - * - * @return - */ - public int getNumberOfCores() { + private int getNumberOfCores(String key) { int numberOfCores; try { numberOfCores = Integer.parseInt( CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.NUM_CORES_LOADING, + key, CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); } catch (NumberFormatException exc) { - LOGGER.warn("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING + LOGGER.warn("Configured value for property " + key + " is wrong. Falling back to the default value " + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); @@ -970,6 +965,30 @@ public int getNumberOfCores() { return numberOfCores; } + /** + * Number of cores should be used while loading data. + * @return the number of cores to be used while loading data + */ + public int getNumberOfLoadingCores() { + return getNumberOfCores(CarbonCommonConstants.NUM_CORES_LOADING); + } + + /** + * Number of cores to be used while compacting. + * @return the number of cores to be used while compacting + */ + public int getNumberOfCompactingCores() { + return getNumberOfCores(CarbonCommonConstants.NUM_CORES_COMPACTING); + } + + /** + * Number of cores to be used while alter partition. + * @return the number of cores to be used while alter partition + */ + public int getNumberOfAltPartitionCores() { + return getNumberOfCores(CarbonCommonConstants.NUM_CORES_ALT_PARTITION); + } + /** * Get the sort chunk memory size * @return diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala index d4a2d7f18a4..832cb00c7a5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala @@ -197,9 +197,7 @@ case class CarbonAlterTableDropPartitionCommand( partitionId: String, dropWithData: Boolean, oldPartitionIds: List[Int]): Unit = { - val numberOfCores = CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.NUM_CORES_ALT_PARTITION, - CarbonCommonConstants.DEFAULT_NUMBER_CORES) + val numberOfCores = CarbonProperties.getInstance().getNumberOfAltPartitionCores val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt) try { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala index 18c47a9c6bc..f17cdd605b8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala @@ -215,10 +215,8 @@ case class CarbonAlterTableSplitPartitionCommand( carbonLoadModel: CarbonLoadModel, partitionId: String, oldPartitionIdList: List[Int]): Unit = { - val numberOfCores = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION, - CarbonCommonConstants.DEFAULT_NUMBER_CORES) - val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt) + val numberOfCores = CarbonProperties.getInstance().getNumberOfAltPartitionCores + val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores) try { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java index 616edebd1ed..d3501c72c0c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java @@ -127,6 +127,8 @@ public class CarbonDataLoadConfiguration { */ private String columnCompressor; + private int numberOfLoadingCores; + public CarbonDataLoadConfiguration() { } @@ -460,4 +462,12 @@ public String getColumnCompressor() { public void setColumnCompressor(String columnCompressor) { this.columnCompressor = columnCompressor; } + + public int getNumberOfLoadingCores() { + return numberOfLoadingCores; + } + + public void setNumberOfLoadingCores(int numberOfLoadingCores) { + this.numberOfLoadingCores = numberOfLoadingCores; + } } diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index 4926cd89440..89d09fe23ad 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -312,6 +312,8 @@ public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel lo if (loadModel.getSdkWriterCores() > 0) { configuration.setWritingCoresCount(loadModel.getSdkWriterCores()); } + configuration.setNumberOfLoadingCores(CarbonProperties.getInstance().getNumberOfLoadingCores()); + configuration.setColumnCompressor(loadModel.getColumnCompressor()); return configuration; } diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index fc139a67350..a9a60851ad8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -1265,7 +1265,8 @@ public static void startCompactionDeleteDeltaFiles(List deleteDeltaFiles String blockName, String fullBlockFilePath) throws IOException { DeleteDeltaBlockDetails deleteDeltaBlockDetails = null; - CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader(); + int numberOfcores = CarbonProperties.getInstance().getNumberOfCompactingCores(); + CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader(numberOfcores); try { deleteDeltaBlockDetails = dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName); diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 7c7b8ee465e..d14f6267143 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -450,11 +451,12 @@ private void initSortDataRows() throws Exception { * @return */ private SortParameters createSortParameters() { + int numberOfCompactingCores = CarbonProperties.getInstance().getNumberOfCompactingCores(); return SortParameters .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName, dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount, noDictionaryCount, segmentId, carbonLoadModel.getTaskNo(), noDictionaryColMapping, - sortColumnMapping, isVarcharDimMapping, true); + sortColumnMapping, isVarcharDimMapping, true, numberOfCompactingCores / 2); } /** diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java index 7908f4f3d5b..200c5f404c2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java @@ -442,10 +442,12 @@ public static SortParameters createSortParameters(CarbonDataLoadConfiguration co parameters.setTempFileLocation(sortTempDirs); LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ",")); - int numberOfCores = carbonProperties.getNumberOfCores() / 2; + int numberOfCores = 1; // In case of loading from partition we should use the cores specified by it if (configuration.getWritingCoresCount() > 0) { numberOfCores = configuration.getWritingCoresCount(); + } else { + numberOfCores = configuration.getNumberOfLoadingCores() / 2; } parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1); @@ -486,7 +488,8 @@ public void setRangeId(int rangeId) { public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName, String tableName, int dimColCount, int complexDimColCount, int measureColCount, int noDictionaryCount, String segmentId, String taskNo, boolean[] noDictionaryColMaping, - boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow) { + boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow, + int numberOfCores) { SortParameters parameters = new SortParameters(); CarbonProperties carbonProperties = CarbonProperties.getInstance(); parameters.setCarbonTable(carbonTable); @@ -526,7 +529,6 @@ public static SortParameters createSortParameters(CarbonTable carbonTable, Strin parameters.setTempFileLocation(sortTempDirs); LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ",")); - int numberOfCores = carbonProperties.getNumberOfCores() / 2; parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1); parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 92c48bc4c71..f70e749c9ab 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -688,18 +688,9 @@ private void initNumberOfCores() { // in compaction flow the measure with decimal type will come as spark decimal. // need to convert it to byte array. if (this.isCompactionFlow()) { - try { - this.numberOfCores = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING, - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); - } catch (NumberFormatException exc) { - LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_COMPACTING - + "is wrong.Falling back to the default value " - + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - this.numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - } + this.numberOfCores = CarbonProperties.getInstance().getNumberOfCompactingCores(); } else { - this.numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); + this.numberOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores(); } if (this.sortScope != null && this.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 437f62819a8..1d1f4510ff1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -686,7 +686,7 @@ public static List>[] partitionInputReaderIterators( if (sdkWriterCores > 0) { numberOfCores = sdkWriterCores; } else { - numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); + numberOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores(); } // Get the minimum of number of cores and iterators size to get the number of parallel threads // to be launched.