Skip to content

Commit

Permalink
Merge 442c2b8 into 851dd2c
Browse files Browse the repository at this point in the history
  • Loading branch information
Sssan520 committed Nov 16, 2018
2 parents 851dd2c + 442c2b8 commit b349f3c
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 35 deletions.
Expand Up @@ -70,7 +70,7 @@ public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) {
* max number of threads for a job
*/
/**
 * Initializes the thread pool size used when spawning threads for a job,
 * taking the value from the data-loading cores property.
 */
private void initThreadPoolSize() {
  // NOTE(review): the stripped diff left both the old getNumberOfCores() call and the
  // new one in place; only the loading-cores lookup is kept here.
  thread_pool_size = CarbonProperties.getInstance().getNumberOfLoadingCores();
}

/**
Expand Down
Expand Up @@ -78,7 +78,7 @@ public Integer size(DictionaryMessage key) {
}

@Override public void writeDictionaryData() {
int numOfCores = CarbonProperties.getInstance().getNumberOfCores();
int numOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores();
long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(numOfCores);
for (final DictionaryGenerator generator : columnMap.values()) {
Expand Down
Expand Up @@ -60,12 +60,16 @@ public CarbonDeleteFilesDataReader() {
initThreadPoolSize();
}

/**
 * Creates a reader with an explicitly supplied thread pool size, bypassing the
 * carbon.properties lookup performed by the no-arg constructor.
 *
 * @param threadPoolSize number of threads to use for reading delete delta files
 */
public CarbonDeleteFilesDataReader(int threadPoolSize) {
  // camelCase parameter per Java convention; field name kept as declared elsewhere.
  this.thread_pool_size = threadPoolSize;
}

/**
 * This method will initialize the thread pool size to be used for creating the
 * max number of threads for a job, based on the data-loading cores property.
 */
private void initThreadPoolSize() {
  // NOTE(review): stripped diff kept both the removed getNumberOfCores() line and the
  // replacement; only the loading-cores lookup belongs here.
  thread_pool_size = CarbonProperties.getInstance().getNumberOfLoadingCores();
}

/**
Expand Down
Expand Up @@ -949,27 +949,46 @@ public int[] getIntArray(String commaSeparatedLevels) {
return compactionSize;
}

/**
* Number of cores should be used while loading data.
*
* @return
*/
public int getNumberOfCores() {
private int getNumberOfCores(String key) {
int numberOfCores;
try {
numberOfCores = Integer.parseInt(
CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.NUM_CORES_LOADING,
key,
CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
} catch (NumberFormatException exc) {
LOGGER.warn("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
LOGGER.warn("Configured value for property " + key
+ " is wrong. Falling back to the default value "
+ CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
}
return numberOfCores;
}

/**
 * Number of cores to be used by the data-loading flow.
 *
 * @return the configured loading-core count (falls back to the default on bad config)
 */
public int getNumberOfLoadingCores() {
  final int loadingCores = getNumberOfCores(CarbonCommonConstants.NUM_CORES_LOADING);
  return loadingCores;
}

/**
 * Number of cores to be used by the compaction flow.
 *
 * @return the configured compacting-core count (falls back to the default on bad config)
 */
public int getNumberOfCompactingCores() {
  final int compactingCores = getNumberOfCores(CarbonCommonConstants.NUM_CORES_COMPACTING);
  return compactingCores;
}

/**
 * Number of cores to be used by the alter-partition flow.
 *
 * @return the configured alter-partition core count (falls back to the default on bad config)
 */
public int getNumberOfAltPartitionCores() {
  final int altPartitionCores = getNumberOfCores(CarbonCommonConstants.NUM_CORES_ALT_PARTITION);
  return altPartitionCores;
}

/**
* Get the sort chunk memory size
* @return
Expand Down
Expand Up @@ -197,9 +197,7 @@ case class CarbonAlterTableDropPartitionCommand(
partitionId: String,
dropWithData: Boolean,
oldPartitionIds: List[Int]): Unit = {
val numberOfCores = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
CarbonCommonConstants.DEFAULT_NUMBER_CORES)
val numberOfCores = CarbonProperties.getInstance().getNumberOfAltPartitionCores
val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
try {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
Expand Down
Expand Up @@ -215,10 +215,8 @@ case class CarbonAlterTableSplitPartitionCommand(
carbonLoadModel: CarbonLoadModel,
partitionId: String,
oldPartitionIdList: List[Int]): Unit = {
val numberOfCores = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
CarbonCommonConstants.DEFAULT_NUMBER_CORES)
val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
val numberOfCores = CarbonProperties.getInstance().getNumberOfAltPartitionCores
val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores)
try {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
Expand Down
Expand Up @@ -127,6 +127,8 @@ public class CarbonDataLoadConfiguration {
*/
private String columnCompressor;

private int numberOfLoadingCores;

/**
 * Default constructor; configuration values are populated afterwards via setters.
 */
public CarbonDataLoadConfiguration() {
}

Expand Down Expand Up @@ -460,4 +462,12 @@ public String getColumnCompressor() {
/**
 * Records the column compressor name on this configuration.
 *
 * @param columnCompressor compressor name to use
 */
public void setColumnCompressor(String columnCompressor) {
  this.columnCompressor = columnCompressor;
}

/**
 * @return the number of cores configured for the data-loading flow
 */
public int getNumberOfLoadingCores() {
  return this.numberOfLoadingCores;
}

/**
 * Records the number of cores to use for the data-loading flow.
 *
 * @param numberOfLoadingCores core count to record
 */
public void setNumberOfLoadingCores(int numberOfLoadingCores) {
  this.numberOfLoadingCores = numberOfLoadingCores;
}
}
Expand Up @@ -312,6 +312,8 @@ public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel lo
if (loadModel.getSdkWriterCores() > 0) {
configuration.setWritingCoresCount(loadModel.getSdkWriterCores());
}
configuration.setNumberOfLoadingCores(CarbonProperties.getInstance().getNumberOfLoadingCores());

configuration.setColumnCompressor(loadModel.getColumnCompressor());
return configuration;
}
Expand Down
Expand Up @@ -1265,7 +1265,8 @@ public static void startCompactionDeleteDeltaFiles(List<String> deleteDeltaFiles
String blockName, String fullBlockFilePath) throws IOException {

DeleteDeltaBlockDetails deleteDeltaBlockDetails = null;
CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
int numberOfcores = CarbonProperties.getInstance().getNumberOfCompactingCores();
CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader(numberOfcores);
try {
deleteDeltaBlockDetails =
dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
Expand Down Expand Up @@ -450,11 +451,12 @@ private void initSortDataRows() throws Exception {
* @return
*/
private SortParameters createSortParameters() {
int numberOfCompactingCores = CarbonProperties.getInstance().getNumberOfCompactingCores();
return SortParameters
.createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
noDictionaryCount, segmentId, carbonLoadModel.getTaskNo(), noDictionaryColMapping,
sortColumnMapping, isVarcharDimMapping, true);
sortColumnMapping, isVarcharDimMapping, true, numberOfCompactingCores / 2);
}

/**
Expand Down
Expand Up @@ -442,10 +442,12 @@ public static SortParameters createSortParameters(CarbonDataLoadConfiguration co
parameters.setTempFileLocation(sortTempDirs);
LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));

int numberOfCores = carbonProperties.getNumberOfCores() / 2;
int numberOfCores = 1;
// In case of loading from partition we should use the cores specified by it
if (configuration.getWritingCoresCount() > 0) {
numberOfCores = configuration.getWritingCoresCount();
} else {
numberOfCores = configuration.getNumberOfLoadingCores() / 2;
}
parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);

Expand Down Expand Up @@ -486,7 +488,8 @@ public void setRangeId(int rangeId) {
public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
String tableName, int dimColCount, int complexDimColCount, int measureColCount,
int noDictionaryCount, String segmentId, String taskNo, boolean[] noDictionaryColMaping,
boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow) {
boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow,
int numberOfCores) {
SortParameters parameters = new SortParameters();
CarbonProperties carbonProperties = CarbonProperties.getInstance();
parameters.setCarbonTable(carbonTable);
Expand Down Expand Up @@ -526,7 +529,6 @@ public static SortParameters createSortParameters(CarbonTable carbonTable, Strin
parameters.setTempFileLocation(sortTempDirs);
LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));

int numberOfCores = carbonProperties.getNumberOfCores() / 2;
parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);

parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
Expand Down
Expand Up @@ -688,18 +688,9 @@ private void initNumberOfCores() {
// in compaction flow the measure with decimal type will come as spark decimal.
// need to convert it to byte array.
if (this.isCompactionFlow()) {
try {
this.numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING,
CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
} catch (NumberFormatException exc) {
LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_COMPACTING
+ "is wrong.Falling back to the default value "
+ CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
this.numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
}
this.numberOfCores = CarbonProperties.getInstance().getNumberOfCompactingCores();
} else {
this.numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
this.numberOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores();
}

if (this.sortScope != null && this.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
Expand Down
Expand Up @@ -686,7 +686,7 @@ public static List<CarbonIterator<Object[]>>[] partitionInputReaderIterators(
if (sdkWriterCores > 0) {
numberOfCores = sdkWriterCores;
} else {
numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
numberOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores();
}
// Get the minimum of number of cores and iterators size to get the number of parallel threads
// to be launched.
Expand Down

0 comments on commit b349f3c

Please sign in to comment.