From 0bce0019b4ba359b1a6864ea950f8d002179e742 Mon Sep 17 00:00:00 2001 From: qiuchenjian <807169000@qq.com> Date: Wed, 27 Feb 2019 22:04:36 +0800 Subject: [PATCH] [CARBONDATA-3304] Distinguish the thread names created by thread pool of CarbonThreadFactory --- .../unsafe/UnsafeAbstractDimensionDataChunkStore.java | 10 +++------- .../carbondata/core/memory/UnsafeMemoryManager.java | 3 +-- .../hadoop/api/CarbonTableOutputFormat.java | 3 ++- .../processing/loading/TableProcessingOperations.java | 3 ++- .../loading/converter/impl/RowConverterImpl.java | 2 +- .../sort/impl/ParallelReadMergeSorterImpl.java | 3 ++- .../sort/impl/UnsafeParallelReadMergeSorterImpl.java | 3 ++- .../loading/sort/unsafe/UnsafeSortDataRows.java | 11 ++++++----- .../unsafe/merger/UnsafeIntermediateFileMerger.java | 2 +- .../sort/unsafe/merger/UnsafeIntermediateMerger.java | 5 +++-- .../steps/CarbonRowDataWriterProcessorStepImpl.java | 2 +- .../loading/steps/DataWriterProcessorStepImpl.java | 2 +- .../loading/steps/InputProcessorStepImpl.java | 2 +- .../processing/sort/sortdata/SortDataRows.java | 3 ++- .../sort/sortdata/SortIntermediateFileMerger.java | 3 ++- .../sort/sortdata/SortTempFileChunkHolder.java | 3 ++- .../store/writer/AbstractFactDataWriter.java | 6 ++++-- 17 files changed, 36 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java index 01501793a79..ca1bfa7fb28 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java @@ -21,7 +21,6 @@ import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore; import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.memory.MemoryBlock; -import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; @@ -74,12 +73,9 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension */ public UnsafeAbstractDimensionDataChunkStore(long totalSize, boolean isInvertedIdex, int numberOfRows, int dataLength) { - try { - // allocating the data page - this.dataPageMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize); - } catch (MemoryException e) { - throw new RuntimeException(e); - } + // allocating the data page + this.dataPageMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize); + this.dataLength = dataLength; this.isExplicitSorted = isInvertedIdex; } diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index c59698f175f..f4c4f8571ae 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -185,8 +185,7 @@ public long getUsableMemory() { /** * It tries to allocate memory of `size` bytes, keep retry until it allocates successfully. */ - public static MemoryBlock allocateMemoryWithRetry(String taskId, long size) - throws MemoryException { + public static MemoryBlock allocateMemoryWithRetry(String taskId, long size) { return allocateMemoryWithRetry(INSTANCE.memoryType, taskId, size); } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 85fb315d863..9ba5e97ae14 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -262,7 +262,8 @@ public RecordWriter getRecordWriter( DataTypeUtil.clearFormatter(); final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor(); final ExecutorService executorService = Executors.newFixedThreadPool(1, - new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName())); + new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName(), + true)); // It should be started in new thread as the underlying iterator uses blocking queue. Future future = executorService.submit(new Thread() { @Override public void run() { diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java index f08de59aad9..d67979a1285 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java @@ -126,7 +126,8 @@ public static void deleteLocalDataLoadFolderLocation(String tempLocationKey, Str } // submit local folder clean up in another thread so that main thread execution is not blocked ExecutorService localFolderDeletionService = Executors - .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName)); + .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName, + true)); try { localFolderDeletionService.submit(new Callable() { @Override public Void call() throws Exception { diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java index 458b3abaf1b..896a4822e24 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java @@ -121,7 +121,7 @@ private DictionaryClient createDictionaryClient() { if (executorService == null) { executorService = Executors.newCachedThreadPool(new CarbonThreadFactory( "DictionaryClientPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier() - .getTableName())); + .getTableName(), true)); } DictionaryOnePassService .setDictionaryServiceProvider(configuration.getDictionaryServiceProvider()); diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java index 55b336ed884..3f81c0073a1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java @@ -94,7 +94,8 @@ public Iterator[] sort(Iterator[] iterators) throw new CarbonDataLoadingException(e); } this.executorService = Executors.newFixedThreadPool(iterators.length, - new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName())); + new CarbonThreadFactory("SafeParallelSorterPool:" + sortParameters.getTableName(), + true)); this.threadStatusObserver = new ThreadStatusObserver(executorService); try { diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java index 6e11ca66f03..b00eb09d1e4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java @@ -86,7 +86,8 @@ public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) { throw new CarbonDataLoadingException(e); } this.executorService = Executors.newFixedThreadPool(iterators.length, - new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName())); + new CarbonThreadFactory("UnsafeParallelSorterPool:" + sortParameters.getTableName(), + true)); this.threadStatusObserver = new ThreadStatusObserver(executorService); try { diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java index e8e1c082cde..3555d7eaa0b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java @@ -138,7 +138,8 @@ public void initialize() throws MemoryException, CarbonSortKeyAndGroupByExceptio CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation()); this.dataSorterAndWriterExecutorService = Executors .newFixedThreadPool(parameters.getNumberOfCores(), - new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName())); + new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName(), + true)); semaphore = new Semaphore(parameters.getNumberOfCores()); } @@ -206,8 +207,8 @@ private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGrou } bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get()); } catch (Exception e) { - if (e.getMessage().contains("cannot handle this row. create new page")) - { + if (e.getMessage().contains("cannot handle this row. create new page")) { + LOGGER.info("cannot handle this row. create new page"); rowPage.makeCanAddFail(); // so that same rowBatch will be handled again in new page i--; @@ -243,8 +244,8 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { } rowPage.addRow(row, reUsableByteArrayDataOutputStream.get()); } catch (Exception e) { - if (e.getMessage().contains("cannot handle this row. create new page")) - { + if (e.getMessage().contains("cannot handle this row. create new page")) { + LOGGER.info("cannot handle this row. create new page"); rowPage.makeCanAddFail(); addRow(row); } else { diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java index 041544b6eb7..f7e38b3235c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java @@ -103,7 +103,7 @@ public UnsafeIntermediateFileMerger(SortParameters mergerParameters, File[] inte } double intermediateMergeCostTime = (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0; - LOGGER.info("============================== Intermediate Merge of " + fileConterConst + LOGGER.info("Intermediate Merge of " + fileConterConst + " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)"); } catch (Exception e) { LOGGER.error("Problem while intermediate merging", e); diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java index 1389ff7d295..53d77078f33 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java @@ -75,7 +75,8 @@ public UnsafeIntermediateMerger(SortParameters parameters) { this.rowPages = new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN); this.mergedPages = new ArrayList<>(); this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(), - new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName())); + new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName(), + true)); this.procFiles = new ArrayList<>(CarbonCommonConstants.CONSTANT_SIZE_TEN); this.mergerTask = new ArrayList<>(); @@ -182,7 +183,7 @@ public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByExcep * @param spillDisk whether to spill the merged result to disk */ private void startIntermediateMerging(UnsafeCarbonRowPage[] rowPages, int totalRows, - boolean spillDisk) throws CarbonSortKeyAndGroupByException { + boolean spillDisk) { UnsafeInMemoryIntermediateDataMerger merger = new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows, parameters, spillDisk); mergedPages.add(merger); diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java index 68e8e224534..ebd72355a56 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -134,7 +134,7 @@ private String[] getStoreLocation() { } else { executorService = Executors.newFixedThreadPool(iterators.length, new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier() - .getCarbonTableIdentifier().getTableName())); + .getCarbonTableIdentifier().getTableName(), true)); Future[] futures = new Future[iterators.length]; for (int i = 0; i < iterators.length; i++) { futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i)); diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java index 7beca485d63..d1b1e76c777 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java @@ -115,7 +115,7 @@ public CarbonFactDataHandlerModel getDataHandlerModel() { .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID, System.currentTimeMillis()); rangeExecutorService = Executors.newFixedThreadPool(iterators.length, - new CarbonThreadFactory("WriterForwardPool: " + tableName)); + new CarbonThreadFactory("WriterForwardPool: " + tableName, true)); List> rangeExecutorServiceSubmitList = new ArrayList<>(iterators.length); int i = 0; // do this concurrently diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java index f540b3e3da3..c44c3f51872 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java @@ -71,7 +71,7 @@ public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration, rowParser = new RowParserImpl(getOutput(), configuration); executorService = Executors.newCachedThreadPool(new CarbonThreadFactory( "InputProcessorPool:" + configuration.getTableIdentifier().getCarbonTableIdentifier() - .getTableName())); + .getTableName(), true)); // if logger is enabled then raw data will be required. this.isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(configuration); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java index 996b844e2f7..01493795027 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java @@ -112,7 +112,8 @@ public void initialize() throws CarbonSortKeyAndGroupByException { CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation()); this.dataSorterAndWriterExecutorService = Executors .newFixedThreadPool(parameters.getNumberOfCores(), - new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName())); + new CarbonThreadFactory("SortDataRowPool:" + parameters.getTableName(), + true)); semaphore = new Semaphore(parameters.getNumberOfCores()); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java index 1f4f1e72c52..70794436710 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java @@ -61,7 +61,8 @@ public SortIntermediateFileMerger(SortParameters parameters) { // processed file list this.procFiles = new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN); this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(), - new CarbonThreadFactory("SafeIntermediateMergerPool:" + parameters.getTableName())); + new CarbonThreadFactory("SafeIntermediateMergerPool:" + parameters.getTableName(), + true)); mergerTask = new ArrayList<>(); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java index 82e6b3742f0..8414068a02d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java @@ -124,7 +124,8 @@ public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, Str this.compressorName = sortParameters.getSortTempCompressorName(); this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); this.executorService = Executors - .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName)); + .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName, + true)); this.convertToActualField = convertToActualField; } diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 472f1433fb0..eb1b15db76d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -186,7 +186,8 @@ public AbstractFactDataWriter(CarbonFactDataHandlerModel model) { } this.executorService = Executors.newFixedThreadPool(1, - new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName())); + new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName(), + true)); executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); // in case of compaction we will pass the cardinality. this.localCardinality = this.model.getColCardinality(); @@ -208,7 +209,8 @@ public AbstractFactDataWriter(CarbonFactDataHandlerModel model) { numberOfCores = model.getNumberOfCores() / 2; } fallbackExecutorService = Executors.newFixedThreadPool(numberOfCores, new CarbonThreadFactory( - "FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId())); + "FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId(), + true)); } }