Skip to content

Commit

Permalink
[CARBONDATA-3304] Distinguish the thread names created by thread pool…
Browse files Browse the repository at this point in the history
… of CarbonThreadFactory
  • Loading branch information
qiuchenjian committed Mar 1, 2019
1 parent e7b5745 commit 0bce001
Show file tree
Hide file tree
Showing 17 changed files with 36 additions and 30 deletions.
Expand Up @@ -21,7 +21,6 @@
import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
Expand Down Expand Up @@ -74,12 +73,9 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension
*/
public UnsafeAbstractDimensionDataChunkStore(long totalSize, boolean isInvertedIdex,
int numberOfRows, int dataLength) {
try {
// allocating the data page
this.dataPageMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize);
} catch (MemoryException e) {
throw new RuntimeException(e);
}
// allocating the data page
this.dataPageMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize);

this.dataLength = dataLength;
this.isExplicitSorted = isInvertedIdex;
}
Expand Down
Expand Up @@ -185,8 +185,7 @@ public long getUsableMemory() {
/**
* It tries to allocate memory of `size` bytes, keep retry until it allocates successfully.
*/
public static MemoryBlock allocateMemoryWithRetry(String taskId, long size)
throws MemoryException {
public static MemoryBlock allocateMemoryWithRetry(String taskId, long size) {
return allocateMemoryWithRetry(INSTANCE.memoryType, taskId, size);
}

Expand Down
Expand Up @@ -262,7 +262,8 @@ public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
DataTypeUtil.clearFormatter();
final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
final ExecutorService executorService = Executors.newFixedThreadPool(1,
new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));
new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName(),
true));
// It should be started in new thread as the underlying iterator uses blocking queue.
Future future = executorService.submit(new Thread() {
@Override public void run() {
Expand Down
Expand Up @@ -126,7 +126,8 @@ public static void deleteLocalDataLoadFolderLocation(String tempLocationKey, Str
}
// submit local folder clean up in another thread so that main thread execution is not blocked
ExecutorService localFolderDeletionService = Executors
.newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
.newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName,
true));
try {
localFolderDeletionService.submit(new Callable<Void>() {
@Override public Void call() throws Exception {
Expand Down
Expand Up @@ -121,7 +121,7 @@ private DictionaryClient createDictionaryClient() {
if (executorService == null) {
executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
"DictionaryClientPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
.getTableName()));
.getTableName(), true));
}
DictionaryOnePassService
.setDictionaryServiceProvider(configuration.getDictionaryServiceProvider());
Expand Down
Expand Up @@ -94,7 +94,8 @@ public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
throw new CarbonDataLoadingException(e);
}
this.executorService = Executors.newFixedThreadPool(iterators.length,
new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName()));
new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName(),
true));
this.threadStatusObserver = new ThreadStatusObserver(executorService);

try {
Expand Down
Expand Up @@ -86,7 +86,8 @@ public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) {
throw new CarbonDataLoadingException(e);
}
this.executorService = Executors.newFixedThreadPool(iterators.length,
new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName()));
new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName(),
true));
this.threadStatusObserver = new ThreadStatusObserver(executorService);

try {
Expand Down
Expand Up @@ -138,7 +138,8 @@ public void initialize() throws MemoryException, CarbonSortKeyAndGroupByExceptio
CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
this.dataSorterAndWriterExecutorService = Executors
.newFixedThreadPool(parameters.getNumberOfCores(),
new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName()));
new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName(),
true));
semaphore = new Semaphore(parameters.getNumberOfCores());
}

Expand Down Expand Up @@ -206,8 +207,8 @@ private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGrou
}
bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get());
} catch (Exception e) {
if (e.getMessage().contains("cannot handle this row. create new page"))
{
if (e.getMessage().contains("cannot handle this row. create new page")) {
LOGGER.info("cannot handle this row. create new page");
rowPage.makeCanAddFail();
// so that same rowBatch will be handled again in new page
i--;
Expand Down Expand Up @@ -243,8 +244,8 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
}
rowPage.addRow(row, reUsableByteArrayDataOutputStream.get());
} catch (Exception e) {
if (e.getMessage().contains("cannot handle this row. create new page"))
{
if (e.getMessage().contains("cannot handle this row. create new page")) {
LOGGER.info("cannot handle this row. create new page");
rowPage.makeCanAddFail();
addRow(row);
} else {
Expand Down
Expand Up @@ -103,7 +103,7 @@ public UnsafeIntermediateFileMerger(SortParameters mergerParameters, File[] inte
}
double intermediateMergeCostTime =
(System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
LOGGER.info("============================== Intermediate Merge of " + fileConterConst
LOGGER.info("Intermediate Merge of " + fileConterConst
+ " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
} catch (Exception e) {
LOGGER.error("Problem while intermediate merging", e);
Expand Down
Expand Up @@ -75,7 +75,8 @@ public UnsafeIntermediateMerger(SortParameters parameters) {
this.rowPages = new ArrayList<UnsafeCarbonRowPage>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
this.mergedPages = new ArrayList<>();
this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName()));
new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName(),
true));
this.procFiles = new ArrayList<>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
this.mergerTask = new ArrayList<>();

Expand Down Expand Up @@ -182,7 +183,7 @@ public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByExcep
* @param spillDisk whether to spill the merged result to disk
*/
private void startIntermediateMerging(UnsafeCarbonRowPage[] rowPages, int totalRows,
boolean spillDisk) throws CarbonSortKeyAndGroupByException {
boolean spillDisk) {
UnsafeInMemoryIntermediateDataMerger merger =
new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows, parameters, spillDisk);
mergedPages.add(merger);
Expand Down
Expand Up @@ -134,7 +134,7 @@ private String[] getStoreLocation() {
} else {
executorService = Executors.newFixedThreadPool(iterators.length,
new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
.getCarbonTableIdentifier().getTableName()));
.getCarbonTableIdentifier().getTableName(), true));
Future[] futures = new Future[iterators.length];
for (int i = 0; i < iterators.length; i++) {
futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i));
Expand Down
Expand Up @@ -115,7 +115,7 @@ public CarbonFactDataHandlerModel getDataHandlerModel() {
.recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
System.currentTimeMillis());
rangeExecutorService = Executors.newFixedThreadPool(iterators.length,
new CarbonThreadFactory("WriterForwardPool: " + tableName));
new CarbonThreadFactory("WriterForwardPool: " + tableName, true));
List<Future<Void>> rangeExecutorServiceSubmitList = new ArrayList<>(iterators.length);
int i = 0;
// do this concurrently
Expand Down
Expand Up @@ -71,7 +71,7 @@ public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
rowParser = new RowParserImpl(getOutput(), configuration);
executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
"InputProcessorPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier()
.getTableName()));
.getTableName(), true));
// if logger is enabled then raw data will be required.
this.isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(configuration);
}
Expand Down
Expand Up @@ -112,7 +112,8 @@ public void initialize() throws CarbonSortKeyAndGroupByException {
CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation());
this.dataSorterAndWriterExecutorService = Executors
.newFixedThreadPool(parameters.getNumberOfCores(),
new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName()));
new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName(),
true));
semaphore = new Semaphore(parameters.getNumberOfCores());
}

Expand Down
Expand Up @@ -61,7 +61,8 @@ public SortIntermediateFileMerger(SortParameters parameters) {
// processed file list
this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
new CarbonThreadFactory("SafeIntermediateMergerPool:" + parameters.getTableName()));
new CarbonThreadFactory("SafeIntermediateMergerPool:" + parameters.getTableName(),
true));
mergerTask = new ArrayList<>();
}

Expand Down
Expand Up @@ -124,7 +124,8 @@ public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, Str
this.compressorName = sortParameters.getSortTempCompressorName();
this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
this.executorService = Executors
.newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
.newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName,
true));
this.convertToActualField = convertToActualField;
}

Expand Down
Expand Up @@ -186,7 +186,8 @@ public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
}

this.executorService = Executors.newFixedThreadPool(1,
new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName()));
new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName(),
true));
executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// in case of compaction we will pass the cardinality.
this.localCardinality = this.model.getColCardinality();
Expand All @@ -208,7 +209,8 @@ public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
numberOfCores = model.getNumberOfCores() / 2;
}
fallbackExecutorService = Executors.newFixedThreadPool(numberOfCores, new CarbonThreadFactory(
"FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId()));
"FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId(),
true));
}
}

Expand Down

0 comments on commit 0bce001

Please sign in to comment.