From c12c9b177f075b1913cbe527ae1e9c72d34c7fbf Mon Sep 17 00:00:00 2001 From: sounakr Date: Fri, 9 Feb 2018 17:12:50 +0530 Subject: [PATCH] [CARBONDATA-2181] Thread Leak during compaction processing on restructured table. --- .../impl/ColumnGroupDimensionDataChunk.java | 4 +- .../chunk/impl/DimensionRawColumnChunk.java | 7 +- .../impl/FixedLengthDimensionDataChunk.java | 3 +- .../VariableLengthDimensionDataChunk.java | 3 +- ...ressedDimensionChunkFileBasedReaderV1.java | 4 +- ...ressedDimensionChunkFileBasedReaderV2.java | 4 +- ...ressedDimensionChunkFileBasedReaderV3.java | 3 +- .../store/DimensionChunkStoreFactory.java | 4 +- ...UnsafeAbstractDimensionDataChunkStore.java | 4 +- ...afeFixedLengthDimensionDataChunkStore.java | 3 +- ...eVariableLengthDimesionDataChunkStore.java | 3 +- .../executer/ExcludeFilterExecuterImpl.java | 9 ++- .../processor/AbstractDataBlockIterator.java | 68 +++++++++++-------- .../scan/scanner/AbstractBlockletScanner.java | 9 ++- .../core/scan/scanner/BlockletScanner.java | 3 +- .../core/scan/scanner/impl/FilterScanner.java | 3 +- .../ColumnGroupDimensionDataChunkTest.java | 3 +- .../FixedLengthDimensionDataChunkTest.java | 3 +- .../IncludeFilterExecuterImplTest.java | 9 +-- .../carbondata/core/util/CarbonUtilTest.java | 25 ++++--- .../merger/CompactionResultSortProcessor.java | 7 ++ 21 files changed, 116 insertions(+), 65 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java index b76ae534cd6..8076e496383 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java @@ -18,6 +18,7 @@ import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory; import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; @@ -33,7 +34,8 @@ public class ColumnGroupDimensionDataChunk extends AbstractDimensionDataChunk { * @param columnValueSize chunk attributes * @param numberOfRows */ - public ColumnGroupDimensionDataChunk(byte[] dataChunk, int columnValueSize, int numberOfRows) { + public ColumnGroupDimensionDataChunk(byte[] dataChunk, int columnValueSize, int numberOfRows) + throws MemoryException { this.dataChunkStore = DimensionChunkStoreFactory.INSTANCE .getDimensionChunkStore(columnValueSize, false, numberOfRows, dataChunk.length, DimensionStoreType.FIXEDLENGTH); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java index 705c13cc39e..51eb12543d1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java @@ -49,7 +49,8 @@ public DimensionRawColumnChunk(int columnIndex, ByteBuffer rawData, long offSet, * Convert all raw data with all pages to processed DimensionColumnDataChunk's * @return */ - public DimensionColumnDataChunk[] convertToDimColDataChunks() { + public DimensionColumnDataChunk[] convertToDimColDataChunks() + throws IOException, MemoryException, RuntimeException { if (dataChunks == null) { dataChunks = new DimensionColumnDataChunk[pagesCount]; } @@ -58,8 +59,8 @@ public DimensionColumnDataChunk[] convertToDimColDataChunks() { if (dataChunks[i] == null) { dataChunks[i] = chunkReader.convertToDimensionChunk(this, i); } - } catch (IOException | MemoryException e) { - throw new RuntimeException(e); + } catch (IOException | MemoryException | RuntimeException e) { + throw e; } } return dataChunks; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java index 6629d318072..56f20493170 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java @@ -19,6 +19,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory; import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; @@ -40,7 +41,7 @@ public class FixedLengthDimensionDataChunk extends AbstractDimensionDataChunk { * @param columnValueSize size of each column value */ public FixedLengthDimensionDataChunk(byte[] dataChunk, int[] invertedIndex, - int[] invertedIndexReverse, int numberOfRows, int columnValueSize) { + int[] invertedIndexReverse, int numberOfRows, int columnValueSize) throws MemoryException { long totalSize = null != invertedIndex ? dataChunk.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) : dataChunk.length; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java index 6c47bf5365d..dc8b6140e9f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java @@ -19,6 +19,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory; import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; @@ -36,7 +37,7 @@ public class VariableLengthDimensionDataChunk extends AbstractDimensionDataChunk * @param numberOfRows */ public VariableLengthDimensionDataChunk(byte[] dataChunks, int[] invertedIndex, - int[] invertedIndexReverse, int numberOfRows) { + int[] invertedIndexReverse, int numberOfRows) throws MemoryException { long totalSize = null != invertedIndex ? (dataChunks.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) + ( numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE)) : diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java index 27a4d89e43e..1e999d4af52 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java @@ -28,6 +28,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk; import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader; import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -99,7 +100,8 @@ public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo } @Override public DimensionColumnDataChunk convertToDimensionChunk( - DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException { + DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException, + MemoryException { int blockIndex = dimensionRawColumnChunk.getColumnIndex(); byte[] dataPage = null; int[] invertedIndexes = null; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java index b43f89c63d6..8e3315686db 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java @@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk; import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format; import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; @@ -116,7 +117,8 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fil } public DimensionColumnDataChunk convertToDimensionChunk( - DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException { + DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException, + MemoryException { byte[] dataPage = null; int[] invertedIndexes = null; int[] invertedIndexesReverse = null; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java index 566e9b7a883..857592f93b5 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java @@ -242,7 +242,8 @@ protected DimensionColumnDataChunk decodeDimension(DimensionRawColumnChunk rawCo } private DimensionColumnDataChunk decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage, - ByteBuffer pageData, DataChunk2 pageMetadata, int offset) { + ByteBuffer pageData, DataChunk2 pageMetadata, int offset) + throws IOException, MemoryException { byte[] dataPage; int[] rlePage; int[] invertedIndexes = null; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java index 92927e7a4f5..ff24ea37441 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java @@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeVariableLengthDimensionDataChunkStore; import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeFixedLengthDimensionDataChunkStore; import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeVariableLengthDimesionDataChunkStore; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.util.CarbonProperties; /** @@ -60,7 +61,8 @@ private DimensionChunkStoreFactory() { * @return dimension store type */ public DimensionDataChunkStore getDimensionChunkStore(int columnValueSize, - boolean isInvertedIndex, int numberOfRows, long totalSize, DimensionStoreType storeType) { + boolean isInvertedIndex, int numberOfRows, long totalSize, DimensionStoreType storeType) + throws MemoryException { if (isUnsafe) { if (storeType == DimensionStoreType.FIXEDLENGTH) { diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java index 940ca1a09bf..076d7019098 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java @@ -72,12 +72,12 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension * @param numberOfRows total number of rows */ public UnsafeAbstractDimensionDataChunkStore(long totalSize, boolean isInvertedIdex, - int numberOfRows) { + int numberOfRows) throws MemoryException { try { // allocating the data page this.dataPageMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize); } catch (MemoryException e) { - throw new RuntimeException(e); + throw e; } this.isExplicitSorted = isInvertedIdex; } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java index 8c8d08f97fa..3a0a5c56b65 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java @@ -19,6 +19,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.memory.CarbonUnsafe; +import org.apache.carbondata.core.memory.MemoryException; /** * Below class is responsible to store fixed length dimension data chunk in @@ -40,7 +41,7 @@ public class UnsafeFixedLengthDimensionDataChunkStore * @param numberOfRows total number of rows */ public UnsafeFixedLengthDimensionDataChunkStore(long totalDataSize, int columnValueSize, - boolean isInvertedIdex, int numberOfRows) { + boolean isInvertedIdex, int numberOfRows) throws MemoryException { super(totalDataSize, isInvertedIdex, numberOfRows); this.columnValueSize = columnValueSize; } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java index 36b2bd80329..b89efd16ca0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java @@ -21,6 +21,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.memory.CarbonUnsafe; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; @@ -45,7 +46,7 @@ public class UnsafeVariableLengthDimesionDataChunkStore private long dataPointersOffsets; public UnsafeVariableLengthDimesionDataChunkStore(long totalSize, boolean isInvertedIdex, - int numberOfRows) { + int numberOfRows) throws MemoryException { super(totalSize, isInvertedIdex, numberOfRows); this.numberOfRows = numberOfRows; } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java index 465bee60e23..4fbccca3867 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java @@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.filter.FilterUtil; @@ -93,8 +94,12 @@ public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder, boolean useBi } DimensionRawColumnChunk dimensionRawColumnChunk = blockChunkHolder.getDimensionRawDataChunk()[blockIndex]; - DimensionColumnDataChunk[] dimensionColumnDataChunks = - dimensionRawColumnChunk.convertToDimColDataChunks(); + DimensionColumnDataChunk[] dimensionColumnDataChunks; + try { + dimensionColumnDataChunks = dimensionRawColumnChunk.convertToDimColDataChunks(); + } catch (IOException | MemoryException | RuntimeException e) { + throw new IOException(e); + } BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount()); for (int i = 0; i < dimensionColumnDataChunks.length; i++) { BitSet bitSet = getFilteredIndexes(dimensionColumnDataChunks[i], diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java index eb5e3f9689a..0dc0d4bfcf3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java @@ -29,6 +29,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.DataRefNode; import org.apache.carbondata.core.datastore.FileHolder; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.collector.ResultCollectorFactory; import org.apache.carbondata.core.scan.collector.ScannedResultCollector; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; @@ -139,27 +140,33 @@ protected boolean updateScanner() { private AbstractScannedResult getNextScannedResult() throws Exception { AbstractScannedResult result = null; - if (blockExecutionInfo.isPrefetchBlocklet()) { - if (dataBlockIterator.hasNext() || nextBlock.get() || nextRead.get()) { - if (future == null) { - future = execute(); - } - result = future.get(); - nextBlock.set(false); - if (dataBlockIterator.hasNext() || nextRead.get()) { - nextBlock.set(true); - future = execute(); + try { + if (blockExecutionInfo.isPrefetchBlocklet()) { + if (dataBlockIterator.hasNext() || nextBlock.get() || nextRead.get()) { + if (future == null) { + future = execute(); + } + result = future.get(); + nextBlock.set(false); + if (dataBlockIterator.hasNext() || nextRead.get()) { + nextBlock.set(true); + future = execute(); + } } - } - } else { - if (dataBlockIterator.hasNext()) { - BlocksChunkHolder blocksChunkHolder = getBlocksChunkHolder(); - if (blocksChunkHolder != null) { - result = blockletScanner.scanBlocklet(blocksChunkHolder); + } else { + if (dataBlockIterator.hasNext()) { + BlocksChunkHolder blocksChunkHolder = getBlocksChunkHolder(); + if (blocksChunkHolder != null) { + result = blockletScanner.scanBlocklet(blocksChunkHolder); + } } } + + return result; + } catch (InterruptedException | ExecutionException | MemoryException e) { + LOGGER.error(e, e.getMessage()); + throw new RuntimeException(e); } - return result; } private BlocksChunkHolder getBlocksChunkHolder() throws IOException { @@ -187,20 +194,25 @@ private BlocksChunkHolder getBlocksChunkHolderInternal() throws IOException { private Future execute() { return executorService.submit(new Callable() { @Override public AbstractScannedResult call() throws Exception { - if (futureIo == null) { - futureIo = executeRead(); - } - BlocksChunkHolder blocksChunkHolder = futureIo.get(); - futureIo = null; - nextRead.set(false); - if (blocksChunkHolder != null) { - if (dataBlockIterator.hasNext()) { - nextRead.set(true); + try { + if (futureIo == null) { futureIo = executeRead(); } - return blockletScanner.scanBlocklet(blocksChunkHolder); + BlocksChunkHolder blocksChunkHolder = futureIo.get(); + futureIo = null; + nextRead.set(false); + if (blocksChunkHolder != null) { + if (dataBlockIterator.hasNext()) { + nextRead.set(true); + futureIo = executeRead(); + } + return blockletScanner.scanBlocklet(blocksChunkHolder); + } + return null; + } catch (IOException | InterruptedException | ExecutionException | MemoryException e) { + LOGGER.error(e, "Failed in Scanned the result"); + throw e; } - return null; } }); } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java index bf26ca32580..310e563051e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java @@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; @@ -52,7 +53,7 @@ public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) { } @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) - throws IOException, FilterUnsupportedException { + throws IOException, MemoryException, FilterUnsupportedException { long startTime = System.currentTimeMillis(); AbstractScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo); QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap() @@ -96,7 +97,11 @@ public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) { if (blockExecutionInfo.isPrefetchBlocklet()) { for (int i = 0; i < dimensionRawColumnChunks.length; i++) { if (dimensionRawColumnChunks[i] != null) { - dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].convertToDimColDataChunks(); + try { + dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].convertToDimColDataChunks(); + } catch (IOException | MemoryException | RuntimeException e) { + throw e; + } } } for (int i = 0; i < measureRawColumnChunks.length; i++) { diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java index 0ed0d43ef3a..23e264ff50f 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java @@ -18,6 +18,7 @@ import java.io.IOException; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; import org.apache.carbondata.core.scan.result.AbstractScannedResult; @@ -44,7 +45,7 @@ public interface BlockletScanner { * result after processing */ AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) - throws IOException, FilterUnsupportedException; + throws IOException, MemoryException, FilterUnsupportedException; /** * Just reads the blocklet from file, does not uncompress it. diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java index e77093bf6df..810b9c63eac 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java @@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; @@ -96,7 +97,7 @@ public FilterScanner(BlockExecutionInfo blockExecutionInfo, * @throws FilterUnsupportedException */ @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) - throws IOException, FilterUnsupportedException { + throws IOException, MemoryException, FilterUnsupportedException { return fillScannedResult(blocksChunkHolder); } diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunkTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunkTest.java index bdb83cd8bf8..c41da1e49d5 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunkTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunkTest.java @@ -27,6 +27,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; import org.apache.carbondata.core.scan.executor.util.QueryUtil; @@ -40,7 +41,7 @@ public class ColumnGroupDimensionDataChunkTest { static ColumnGroupDimensionDataChunk columnGroupDimensionDataChunk; static KeyGenerator keyGenerator; - @BeforeClass public static void setup() { + @BeforeClass public static void setup() throws MemoryException { int[] bitLength = CarbonUtil.getDimensionBitLength(new int[] { 10, 10, 10 }, new int[] { 3 }); // create a key generator keyGenerator = new MultiDimKeyVarLengthGenerator(bitLength); diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunkTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunkTest.java index 04b7a80a1f4..3483acf6cc7 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunkTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunkTest.java @@ -21,6 +21,7 @@ import java.util.Arrays; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; import org.junit.Assert; @@ -33,7 +34,7 @@ public class FixedLengthDimensionDataChunkTest { static FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk; static byte[] data; - @BeforeClass public static void setup() { + @BeforeClass public static void setup() throws MemoryException { data = "dummy string".getBytes(); int invertedIndex[] = { 1, 3, 5, 7, 8 }; diff --git a/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java b/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java index 29dda5287bc..fc617d231f2 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java @@ -20,6 +20,7 @@ import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.util.CarbonUtil; import org.junit.Assert; @@ -99,7 +100,7 @@ private byte[] transferIntToByteArr(int value, int size) { } @Test - public void testPerformance() { + public void testPerformance() throws MemoryException { // dimension's data number in a blocklet, usually default is 32000 int dataChunkSize = 32000; @@ -122,7 +123,7 @@ public void testPerformance() { * Tests the filterKeys.length = 0 and filterKeys.length = 1 */ @Test - public void testBoundary() { + public void testBoundary() throws MemoryException { // dimension's data number in a blocklet, usually default is 32000 int dataChunkSize = 32000; @@ -149,7 +150,7 @@ public void testBoundary() { * @return */ private void comparePerformance(int dataChunkSize, int filteredValueCnt, - int queryTimes, int repeatTimes) { + int queryTimes, int repeatTimes) throws MemoryException { long start; long oldTime = 0; long newTime = 0; @@ -264,7 +265,7 @@ private BitSet setFilterdIndexToBitSetWithColumnIndexNew(FixedLengthDimensionDat } @Test - public void testRangBinarySearch() { + public void testRangBinarySearch() throws MemoryException { long oldTime = 0; long newTime = 0; diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java index 984efdba972..376843d83b0 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel; import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.ValueEncoderMeta; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; @@ -263,7 +264,7 @@ public void testToDeleteFoldersAndFilesSintlyWithInterruptedException() assertTrue(!file2.exists()); } - @Test public void testToGetNextLesserValue() { + @Test public void testToGetNextLesserValue() throws MemoryException { byte[] dataChunks = { 5, 6, 7, 8, 9 }; byte[] compareValues = { 7 }; FixedLengthDimensionDataChunk fixedLengthDataChunk = @@ -272,7 +273,7 @@ public void testToDeleteFoldersAndFilesSintlyWithInterruptedException() assertEquals(result, 1); } - @Test public void testToGetNextLesserValueToTarget() { + @Test public void testToGetNextLesserValueToTarget() throws MemoryException { byte[] dataChunks = { 7, 7, 7, 8, 9 }; byte[] compareValues = { 7 }; FixedLengthDimensionDataChunk fixedLengthDataChunk = @@ -281,7 +282,7 @@ public void testToDeleteFoldersAndFilesSintlyWithInterruptedException() assertEquals(result, -1); } - @Test public void testToGetnextGreaterValue() { + @Test public void testToGetnextGreaterValue() throws MemoryException { byte[] dataChunks = { 5, 6, 7, 8, 9 }; byte[] compareValues = { 7 }; FixedLengthDimensionDataChunk fixedLengthDataChunk = @@ -298,7 +299,7 @@ public void testToDeleteFoldersAndFilesSintlyWithInterruptedException() } } - @Test public void testToGetnextGreaterValueToTarget() { + @Test public void testToGetnextGreaterValueToTarget() throws MemoryException { byte[] dataChunks = { 5, 6, 7, 7, 7 }; byte[] compareValues = { 7 }; FixedLengthDimensionDataChunk fixedLengthDataChunk = @@ -802,7 +803,7 @@ public DataInputStream getDataInputStream(String path, FileFactory.FileType file assertThat(result, is(equalTo(new boolean[] { true, true, false }))); } - @Test public void testToGetFirstIndexUsingBinarySearchWithCompareTo1() { + @Test public void testToGetFirstIndexUsingBinarySearchWithCompareTo1() throws MemoryException { byte[] dataChunks = { 10, 20, 30, 40, 50, 60 }; byte[] compareValue = { 5 }; FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk = @@ -812,7 +813,8 @@ public DataInputStream getDataInputStream(String path, FileFactory.FileType file assertEquals(-2, result); } - @Test public void testToGetFirstIndexUsingBinarySearchWithCompareToLessThan0() { + @Test public void testToGetFirstIndexUsingBinarySearchWithCompareToLessThan0() + throws MemoryException { byte[] dataChunks = { 10, 20, 30, 40, 50, 60 }; byte[] compareValue = { 30 }; FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk = @@ -822,7 +824,7 @@ public DataInputStream getDataInputStream(String path, FileFactory.FileType file assertEquals(2, result); } - @Test public void testToGetFirstIndexUsingBinarySearchWithCompareTo0() { + @Test public void testToGetFirstIndexUsingBinarySearchWithCompareTo0() throws MemoryException { byte[] dataChunks = { 10, 10, 10, 40, 50, 60 }; byte[] compareValue = { 10 }; FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk = @@ -832,7 +834,8 @@ public DataInputStream getDataInputStream(String path, FileFactory.FileType file assertEquals(0, result); } - @Test public void testToGetFirstIndexUsingBinarySearchWithMatchUpLimitTrue() { + @Test public void testToGetFirstIndexUsingBinarySearchWithMatchUpLimitTrue() + throws MemoryException { byte[] dataChunks = { 10, 10, 10, 40, 50, 60 }; byte[] compareValue = { 10 }; FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk = @@ -843,7 +846,7 @@ public DataInputStream getDataInputStream(String path, FileFactory.FileType file } @Test - public void testBinaryRangeSearch() { + public void testBinaryRangeSearch() throws MemoryException { byte[] dataChunk = new byte[10]; FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk; @@ -940,7 +943,7 @@ public void testBinaryRangeSearch() { } @Test - public void IndexUsingBinarySearchLengthTwo() { + public void IndexUsingBinarySearchLengthTwo() throws MemoryException { byte[] dataChunk = new byte[10]; FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk; @@ -982,7 +985,7 @@ public void IndexUsingBinarySearchLengthTwo() { } @Test - public void IndexUsingBinarySearchLengthThree() { + public void IndexUsingBinarySearchLengthThree() throws MemoryException { byte[] dataChunk = new byte[10]; FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk; diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 2480a39337b..a0254449089 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -166,6 +166,12 @@ public boolean execute(List resultIteratorList) { } isCompactionSuccess = true; } catch (Exception e) { + try { + intermediateFileMerger.finish(); + sortDataRows.close(); + } catch (CarbonSortKeyAndGroupByException ex) { + LOGGER.error(ex, "Compaction failed: " + e.getMessage()); + } LOGGER.error(e, "Compaction failed: " + e.getMessage()); } finally { if (partitionNames != null) { @@ -177,6 +183,7 @@ public boolean execute(List resultIteratorList) { carbonLoadModel.getTaskNo(), partitionNames); } catch (IOException e) { + LOGGER.error(e, "Compaction failed: " + e.getMessage()); isCompactionSuccess = false; }