From e7103397d96623955ad4f55fc1d0cac7c679a8d7 Mon Sep 17 00:00:00 2001
From: kumarvishal09
Date: Mon, 4 Jun 2018 15:41:50 +0530
Subject: [PATCH] [CARBONDATA-2587][CARBONDATA-2588] Local Dictionary Data Loading support

What changes are proposed in this PR
Added code to support Local Dictionary Data Loading for primitive types.
Added code to support Local Dictionary Data Loading for complex types.

How this PR is tested
Manual testing is done on a 3-node setup. UTs will be raised in a different PR.

This closes #2402
---
 .../DictionaryByteArrayWrapper.java | 4 +
 .../carbondata/core/datastore/ColumnType.java | 6 +-
 .../blocklet/BlockletEncodedColumnPage.java | 176 ++++++++++
 .../datastore/blocklet/EncodedBlocklet.java | 179 ++++++++++
 ...ressedDimensionChunkFileBasedReaderV3.java | 3 +-
 ...exerStorageForNoInvertedIndexForShort.java | 12 +-
 .../core/datastore/page/ColumnPage.java | 71 ++--
 .../datastore/page/ComplexColumnPage.java | 149 ++++---
 .../page/FallbackColumnPageEncoder.java | 84 +++++
 .../page/FallbackEncodedColumnPage.java | 49 +++
 .../datastore/page/LocalDictColumnPage.java | 316 ++++++++++++++++++
 .../page/SafeVarLengthColumnPage.java | 21 +-
 .../page/UnsafeDecimalColumnPage.java | 12 +-
 .../page/UnsafeVarLengthColumnPage.java | 16 +-
 .../page/VarLengthColumnPageBase.java | 38 +--
 .../page/encoding/ColumnPageEncoder.java | 40 ++-
 .../page/encoding/EncodedColumnPage.java | 31 +-
 .../legacy/DictDimensionIndexCodec.java | 2 +-
 .../legacy/DirectDictDimensionIndexCodec.java | 2 +-
 .../HighCardDictDimensionIndexCodec.java | 8 +-
 .../page/statistics/DummyStatsCollector.java | 88 +++++
 .../localdictionary/PageLevelDictionary.java | 126 +++++++
 .../dictionaryholder/DictionaryStore.java | 50 +++
 .../MapBasedDictionaryStore.java | 137 ++++++++
 .../DictionaryThresholdReachedException.java | 87 +++++
 .../ColumnLocalDictionaryGenerator.java | 75 +++++
 .../generator/LocalDictionaryGenerator.java | 48 +++
 .../metadata/schema/table/CarbonTable.java | 2 +-
 .../core/util/CarbonMetadataUtil.java | 89 +++--
 .../core/util/CarbonMetadataUtilTest.java | 65 ----
 format/src/main/thrift/carbondata.thrift | 12 +
 .../processing/datatypes/ArrayDataType.java | 15 +-
 .../processing/datatypes/GenericDataType.java | 4 +-
 .../datatypes/PrimitiveDataType.java | 8 +-
 .../processing/datatypes/StructDataType.java | 15 +-
 .../store/CarbonFactDataHandlerColumnar.java | 42 +--
 .../store/CarbonFactDataHandlerModel.java | 102 +++++-
 .../processing/store/TablePage.java | 38 ++-
 .../store/writer/AbstractFactDataWriter.java | 13 +
 .../store/writer/v3/BlockletDataHolder.java | 32 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java | 88 +++--
 41 files changed, 2001 insertions(+), 354 deletions(-)
 create mode 100644 core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
 create mode 100644 core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
 create mode 100644 core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
 create mode 100644 core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java
 create mode 100644 core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
 create mode 100644 core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
 create mode 100644 core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java
 create mode 100644 
core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/DictionaryStore.java create mode 100644 core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/MapBasedDictionaryStore.java create mode 100644 core/src/main/java/org/apache/carbondata/core/localdictionary/exception/DictionaryThresholdReachedException.java create mode 100644 core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java create mode 100644 core/src/main/java/org/apache/carbondata/core/localdictionary/generator/LocalDictionaryGenerator.java diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java index 03c86acb376..f812f86ae57 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/DictionaryByteArrayWrapper.java @@ -89,4 +89,8 @@ public DictionaryByteArrayWrapper(byte[] data, XXHash32 xxHash32) { result = 31 * result; return result; } + + public byte[] getData() { + return data; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java index 8bbf12df2eb..080444cee19 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java @@ -37,7 +37,9 @@ public enum ColumnType { COMPLEX_ARRAY, - COMPLEX_PRIMITIVE; + COMPLEX_PRIMITIVE, + + PLAIN_LONG_VALUE; public static ColumnType valueOf(int ordinal) { if (ordinal == GLOBAL_DICTIONARY.ordinal()) { @@ -56,6 +58,8 @@ public static ColumnType valueOf(int ordinal) { return COMPLEX_ARRAY; } else if (ordinal == COMPLEX_PRIMITIVE.ordinal()) { return COMPLEX_PRIMITIVE; + } else if (ordinal == PLAIN_LONG_VALUE.ordinal()) { + return PLAIN_LONG_VALUE; } else { throw new RuntimeException("create ColumnType with invalid ordinal: " + ordinal); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java new file mode 100644 index 00000000000..65087878c5e --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.datastore.blocklet; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.page.FallbackColumnPageEncoder; +import org.apache.carbondata.core.datastore.page.FallbackEncodedColumnPage; +import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage; +import org.apache.carbondata.core.localdictionary.PageLevelDictionary; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.format.LocalDictionaryChunk; + +/** + * Maintains the list of encoded page of a column in a blocklet + * and encoded dictionary values only if column is encoded using local + * dictionary + * Handle the fallback if all the pages in blocklet are not + * encoded with local dictionary + */ +public class BlockletEncodedColumnPage { + + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(BlockletEncodedColumnPage.class.getName()); + + /** + * list of encoded page of a column in a blocklet + */ + private List encodedColumnPageList; + + /** + * fallback executor service + */ + private ExecutorService fallbackExecutorService; + + /** + * to check whether pages are local dictionary encoded or not + */ + private boolean isLocalDictEncoded; + + /** + * page level dictionary only when column is encoded with local dictionary + */ + private PageLevelDictionary pageLevelDictionary; + + /** + * fallback future task queue; + */ + private ArrayDeque> fallbackFutureQueue; + + BlockletEncodedColumnPage(ExecutorService fallbackExecutorService) { + this.fallbackExecutorService = fallbackExecutorService; + } + + /** + * Below method will be used to add column page of a column + * + * @param encodedColumnPage + * encoded column page + */ + void addEncodedColumnColumnPage(EncodedColumnPage encodedColumnPage) { + if (null == encodedColumnPageList) { + this.encodedColumnPageList = new ArrayList<>(); + // if dimension page is local dictionary enabled and encoded with local dictionary + if (encodedColumnPage.isLocalDictGeneratedPage()) { + this.isLocalDictEncoded = true; + // get first page dictionary + this.pageLevelDictionary = encodedColumnPage.getPageDictionary(); + } + encodedColumnPageList.add(encodedColumnPage); + return; + } + // if local dictionary is false or column is encoded with local dictionary then + // add a page + if (!isLocalDictEncoded || encodedColumnPage.isLocalDictGeneratedPage()) { + this.encodedColumnPageList.add(encodedColumnPage); + // merge page level dictionary values + if (null != this.pageLevelDictionary) { + pageLevelDictionary.mergerDictionaryValues(encodedColumnPage.getPageDictionary()); + } + } else { + // if older pages were encoded with dictionary and new pages are without dictionary + isLocalDictEncoded = false; + pageLevelDictionary = null; + this.fallbackFutureQueue = new ArrayDeque<>(); + LOGGER.info( + "Local dictionary Fallback is initiated for column: " + encodedColumnPageList.get(0) + .getActualPage().getColumnSpec().getFieldName()); + // submit all the older pages encoded with dictionary for fallback + for (int pageIndex = 0; pageIndex < encodedColumnPageList.size(); pageIndex++) { + 
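// each submitted FallbackColumnPageEncoder re-encodes one page without the local dictionary and records its pageIndex so the result can be slotted back in order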
fallbackFutureQueue.add(fallbackExecutorService.submit(
+            new FallbackColumnPageEncoder(encodedColumnPageList.get(pageIndex), pageIndex)));
+      }
+      // add to page list
+      this.encodedColumnPageList.add(encodedColumnPage);
+    }
+  }
+
+  /**
+   * Return the list of encoded pages for a column in a blocklet
+   *
+   * @return list of encoded pages
+   */
+  public List getEncodedColumnPageList() {
+    // if the fallback queue is not null, fallback was initiated for some pages
+    if (null != this.fallbackFutureQueue) {
+      try {
+        // drain the queue until it is empty
+        while (!fallbackFutureQueue.isEmpty()) {
+          // remove the head element of the queue and wait for its result
+          FallbackEncodedColumnPage fallbackEncodedColumnPage = fallbackFutureQueue.poll().get();
+          // put the re-encoded column page back at its original page index
+          encodedColumnPageList.set(fallbackEncodedColumnPage.getPageIndex(),
+              fallbackEncodedColumnPage.getEncodedColumnPage());
+        }
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException("Problem while encoding the blocklet data during fallback", e);
+      }
+      // setting to null as all the fallback encoded pages have been added to the list
+      fallbackFutureQueue = null;
+    }
+    // in case of dictionary encoded column pages, memory will be freed only after
+    // all the pages are added in a blocklet; as fallback can happen anytime, old pages' memory
+    // cannot be freed earlier, so after encoding is done we can free the page memory
+    if (null != pageLevelDictionary) {
+      // clear the memory footprint for local dictionary encoded pages
+      for (EncodedColumnPage columnPage : encodedColumnPageList) {
+        columnPage.freeMemory();
+      }
+    }
+    return encodedColumnPageList;
+  }
+
+  /**
+   * Below method will be used to get the encoded dictionary
+   * values for local dictionary generated columns
+   *
+   * @return encoded dictionary values if column is local dictionary generated
+   */
+  public LocalDictionaryChunk getEncodedDictionary() {
+    if (null != pageLevelDictionary) {
+      try {
+        return pageLevelDictionary.getLocalDictionaryChunkForBlocklet();
+      } catch (IOException | MemoryException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return null;
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
new file mode 100644
index 00000000000..794c4395000
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.carbondata.core.datastore.blocklet; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.carbondata.core.datastore.page.EncodedTablePage; +import org.apache.carbondata.core.datastore.page.key.TablePageKey; + +/** + * Holds the blocklet level data and metadata to be written in carbondata file + * For dimension pages it will check if all the pages are not encoded with dictionary + * then it will encode those pages for that column again + */ +public class EncodedBlocklet { + + /** + * number of rows in a blocklet + */ + private int blockletSize; + + /** + * list of page metadata + */ + private List pageMetadataList; + + /** + * maintains encoded dimension data for each column + */ + private List encodedDimensionColumnPages; + + /** + * maintains encoded measure data for each column + */ + private List encodedMeasureColumnPages; + + /** + * fallback executor service, will used to re-encode column pages + */ + private ExecutorService executorService; + + /** + * number of pages in a blocklet + */ + private int numberOfPages; + + public EncodedBlocklet(ExecutorService executorService) { + this.executorService = executorService; + } + + /** + * Below method will be used to add page metadata details + * + * @param encodedTablePage + * encoded table page + */ + private void addPageMetadata(EncodedTablePage encodedTablePage) { + // for first table page create new list + if (null == pageMetadataList) { + pageMetadataList = new ArrayList<>(); + } + // update details + blockletSize += encodedTablePage.getPageSize(); + pageMetadataList.add(encodedTablePage.getPageKey()); + this.numberOfPages++; + } + + /** + * Below method will be used to add measure column pages + * + * @param encodedTablePage + * encoded table page + */ + private void addEncodedMeasurePage(EncodedTablePage encodedTablePage) { + // for first page create new list + if (null == encodedMeasureColumnPages) { + encodedMeasureColumnPages = new ArrayList<>(); + // adding measure pages + for (int i = 0; i < encodedTablePage.getNumMeasures(); i++) { + BlockletEncodedColumnPage blockletEncodedColumnPage = new BlockletEncodedColumnPage(null); + blockletEncodedColumnPage.addEncodedColumnColumnPage(encodedTablePage.getMeasure(i)); + encodedMeasureColumnPages.add(blockletEncodedColumnPage); + } + } else { + for (int i = 0; i < encodedTablePage.getNumMeasures(); i++) { + encodedMeasureColumnPages.get(i).addEncodedColumnColumnPage(encodedTablePage.getMeasure(i)); + } + } + } + + /** + * Below method will be used to add dimension column pages + * + * @param encodedTablePage + * encoded table page + */ + private void addEncodedDimensionPage(EncodedTablePage encodedTablePage) { + // for first page create new list + if (null == encodedDimensionColumnPages) { + encodedDimensionColumnPages = new ArrayList<>(); + // adding measure pages + for (int i = 0; i < encodedTablePage.getNumDimensions(); i++) { + BlockletEncodedColumnPage blockletEncodedColumnPage = + new BlockletEncodedColumnPage(executorService); + blockletEncodedColumnPage.addEncodedColumnColumnPage(encodedTablePage.getDimension(i)); + encodedDimensionColumnPages.add(blockletEncodedColumnPage); + } + } else { + for (int i = 0; i < encodedTablePage.getNumDimensions(); i++) { + encodedDimensionColumnPages.get(i) + .addEncodedColumnColumnPage(encodedTablePage.getDimension(i)); + } + } + } + + /** + * Use to add table pages + * + * @param encodedTablePage + * encoded table page + */ + public void 
addEncodedTablePage(EncodedTablePage encodedTablePage) { + addPageMetadata(encodedTablePage); + addEncodedDimensionPage(encodedTablePage); + addEncodedMeasurePage(encodedTablePage); + } + + public int getBlockletSize() { + return blockletSize; + } + + public List getPageMetadataList() { + return pageMetadataList; + } + + public List getEncodedDimensionColumnPages() { + return encodedDimensionColumnPages; + } + + public List getEncodedMeasureColumnPages() { + return encodedMeasureColumnPages; + } + + public int getNumberOfDimension() { + return encodedDimensionColumnPages.size(); + } + + public int getNumberOfMeasure() { + return encodedMeasureColumnPages.size(); + } + + public int getNumberOfPages() { + return this.numberOfPages; + } + + public void clear() { + this.numberOfPages = 0; + this.encodedDimensionColumnPages = null; + this.blockletSize = 0; + this.encodedMeasureColumnPages = null; + this.pageMetadataList = null; + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java index 782a8df462e..fee114d0fc4 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java @@ -242,7 +242,8 @@ protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnP } private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage, - ByteBuffer pageData, DataChunk2 pageMetadata, int offset) { + ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException, + MemoryException { byte[] dataPage; int[] rlePage; int[] invertedIndexes = new int[0]; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java index 911a260d077..99a7e57b8dc 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java @@ -37,11 +37,15 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora private byte[] max; public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] dataPage, - boolean isNoDictonary) { + boolean isNoDictonary, boolean isVarchar) { this.dataPage = dataPage; min = this.dataPage[0]; max = this.dataPage[0]; totalSize += this.dataPage[0].length; + int lVFormatLength = 2; + if (isVarchar) { + lVFormatLength = 4; + } int minCompare = 0; int maxCompare = 0; if (!isNoDictonary) { @@ -60,9 +64,11 @@ public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] dataPage, for (int i = 1; i < this.dataPage.length; i++) { totalSize += this.dataPage[i].length; minCompare = ByteUtil.UnsafeComparer.INSTANCE - .compareTo(min, 2, min.length - 2, this.dataPage[i], 2, this.dataPage[i].length - 2); + .compareTo(min, lVFormatLength, min.length - lVFormatLength, this.dataPage[i], + lVFormatLength, this.dataPage[i].length - lVFormatLength); maxCompare = ByteUtil.UnsafeComparer.INSTANCE - .compareTo(max, 2, max.length - 2, this.dataPage[i], 2, 
this.dataPage[i].length - 2); + .compareTo(max, lVFormatLength, max.length - lVFormatLength, this.dataPage[i], + lVFormatLength, this.dataPage[i].length - lVFormatLength); if (minCompare > 0) { min = this.dataPage[i]; } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java index 4dcf514519f..4ff13308f23 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java @@ -30,6 +30,8 @@ import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert; import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector; import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; +import org.apache.carbondata.core.localdictionary.PageLevelDictionary; +import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -59,7 +61,7 @@ public abstract class ColumnPage { private BitSet nullBitSet; // statistics collector for this column page - private ColumnPageStatsCollector statsCollector; + protected ColumnPageStatsCollector statsCollector; protected static final boolean unsafe = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, @@ -79,32 +81,8 @@ public DataType getDataType() { return dataType; } - private static final SimpleStatsResult statsForComplexType = new SimpleStatsResult() { - @Override public Object getMin() { - return new byte[0]; - } - - @Override public Object getMax() { - return new byte[0]; - } - - @Override public int getDecimalCount() { - return 0; - } - - @Override public DataType getDataType() { - return BYTE_ARRAY; - } - - }; - public SimpleStatsResult getStatistics() { - if (statsCollector != null) { - return statsCollector.getPageStats(); - } else { - // TODO: for sub column of complex type, there no stats yet, return a dummy result - return statsForComplexType; - } + return statsCollector.getPageStats(); } public int getPageSize() { @@ -184,6 +162,19 @@ public static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, DataTyp return newPage(columnSpec, dataType, pageSize); } + public static ColumnPage newLocalDictPage(TableSpec.ColumnSpec columnSpec, DataType dataType, + int pageSize, LocalDictionaryGenerator localDictionaryGenerator) throws MemoryException { + if (unsafe) { + return new LocalDictColumnPage(new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize), + new UnsafeVarLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize), + localDictionaryGenerator); + } else { + return new LocalDictColumnPage(new SafeVarLengthColumnPage(columnSpec, dataType, pageSize), + new SafeVarLengthColumnPage(columnSpec, DataTypes.BYTE_ARRAY, pageSize), + localDictionaryGenerator); + } + } + /** * Create a new page of dataType and number of row = pageSize */ @@ -675,6 +666,9 @@ public byte[] getBooleanPage() { */ public abstract void convertValue(ColumnPageValueConverter codec); + public PageLevelDictionary getPageDictionary() { + throw new UnsupportedOperationException("Operation Not Supported"); + } /** * Compress page data using specified compressor */ @@ -702,7 +696,9 @@ public byte[] compress(Compressor compressor) throws 
MemoryException, IOExceptio return compressor.compressByte(getComplexChildrenLVFlattenedBytePage()); } else if (dataType == DataTypes.BYTE_ARRAY && ( columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT - || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY)) { + || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY + || columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE + || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) { return compressor.compressByte(getComplexParentFlattenedBytePage()); } else if (dataType == DataTypes.BYTE_ARRAY) { return compressor.compressByte(getLVFlattenedBytePage()); @@ -742,8 +738,9 @@ public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compresse } else if (storeDataType == DataTypes.DOUBLE) { double[] doubleData = compressor.unCompressDouble(compressedData, offset, length); return newDoublePage(columnSpec, doubleData); - } else if (storeDataType == DataTypes.BYTE_ARRAY - && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) { + } else if (storeDataType == DataTypes.BYTE_ARRAY && ( + columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE + || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) { byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); return newComplexLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.SHORT_SIZE_IN_BYTE); @@ -756,6 +753,10 @@ public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compresse && columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY) { byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); return newFixedByteArrayPage(columnSpec, lvVarBytes, CarbonCommonConstants.LONG_SIZE_IN_BYTE); + } else if (storeDataType == DataTypes.BYTE_ARRAY + && columnSpec.getColumnType() == ColumnType.PLAIN_LONG_VALUE) { + byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); + return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE); } else if (storeDataType == DataTypes.BYTE_ARRAY) { byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE); @@ -816,4 +817,16 @@ public void setNullBits(BitSet nullBitSet) { public TableSpec.ColumnSpec getColumnSpec() { return columnSpec; } + + public boolean isLocalDictGeneratedPage() { + return false; + } + + public void disableLocalDictEncoding() { + throw new UnsupportedOperationException("Operation not supported"); + } + + public PageLevelDictionary getColumnPageDictionary() { + throw new UnsupportedOperationException("Operation not supported"); + } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java index 07dc837fa39..c6b650f0b92 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java @@ -17,74 +17,129 @@ package org.apache.carbondata.core.datastore.page; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; +import java.util.Map; -import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.core.datastore.ColumnType; - -// Represent a complex column page, e.g. 
Array, Struct type column +import org.apache.carbondata.core.datastore.TableSpec; +import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector; +import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.datatype.DataTypes; + +/** + * holds the complex columndata and its children data + */ public class ComplexColumnPage { - // Holds data for all rows in this page in columnar layout. - // After the complex data expand, it is of type byte[][], the first level array in the byte[][] - // representing a sub-column in the complex type, which can be retrieved by giving the depth - // of the complex type. - // TODO: further optimize it to make it more memory efficient - private List> complexColumnData; - - // depth is the number of column after complex type is expanded. It is from 1 to N - private final int pageSize; - + /** + * number of columns + */ private int depth; + /** + * type of each column + */ private List complexColumnType; - public ComplexColumnPage(int pageSize, List complexColumnType) { - this.pageSize = pageSize; + /** + * column page for each type + */ + private ColumnPage[] columnPages; + + /** + * to maintain the number of record added for each type + */ + private int[] currentRowIdList; + + public ComplexColumnPage(List complexColumnType) { this.depth = complexColumnType.size(); - complexColumnData = new ArrayList<>(depth); - for (int i = 0; i < depth; i++) { - complexColumnData.add(new ArrayList()); - } this.complexColumnType = complexColumnType; + this.columnPages = new ColumnPage[this.depth]; + this.currentRowIdList = new int[depth]; } - public void putComplexData(int rowId, int depth, List value) { - assert (depth <= this.depth); - ArrayList subColumnPage = complexColumnData.get(depth); - subColumnPage.addAll(value); - } - - // iterate on the sub-column after complex type is expanded, return columnar page of - // each sub-column - public Iterator iterator() { - - return new CarbonIterator() { - private int index = 0; - @Override public boolean hasNext() { - return index < depth; - } - - @Override public byte[][] next() { - // convert the subColumnPage from ArrayList to byte[][] - ArrayList subColumnPage = complexColumnData.get(index); - index++; - return subColumnPage.toArray(new byte[subColumnPage.size()][]); + /** + * below method will be used to initlize the column page of complex type + * @param columnToDictMap + * dictionary map + * @param columnNames + * list of columns + * @param pageSize + * number of records + * @throws MemoryException + * if memory is not sufficient + */ + public void initialize(Map columnToDictMap, + List columnNames, int pageSize) throws MemoryException { + for (int i = 0; i < this.columnPages.length; i++) { + LocalDictionaryGenerator localDictionaryGenerator = columnToDictMap.get(columnNames.get(i)); + if (null == localDictionaryGenerator) { + TableSpec.ColumnSpec spec = TableSpec.ColumnSpec + .newInstance(columnNames.get(i), DataTypes.BYTE_ARRAY, complexColumnType.get(i)); + this.columnPages[i] = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize); + this.columnPages[i].setStatsCollector(new DummyStatsCollector()); + } else { + TableSpec.ColumnSpec spec = TableSpec.ColumnSpec + .newInstance(columnNames.get(i), DataTypes.BYTE_ARRAY, complexColumnType.get(i)); + this.columnPages[i] = ColumnPage + .newLocalDictPage(spec, DataTypes.BYTE_ARRAY, pageSize, localDictionaryGenerator); + 
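// sub-columns of complex types do not collect real min/max statistics yet, so a DummyStatsCollector is attached in this branch as well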
this.columnPages[i].setStatsCollector(new DummyStatsCollector());
       }
-    };
+    }
   }
 
+  /**
+   * @return depth, the number of sub-columns after the complex type is expanded
+   */
   public int getDepth() {
     return depth;
   }
 
-  public int getPageSize() {
-    return pageSize;
-  }
-
+  /**
+   * return the type of complex column
+   * @param isDepth
+   * @return complex column type
+   */
   public ColumnType getComplexColumnType(int isDepth) {
     return complexColumnType.get(isDepth);
   }
+
+  /**
+   * method to add complex column data
+   * @param depth
+   *          depth of column
+   * @param dataList
+   *          dataList
+   */
+  public void putComplexData(int depth, List dataList) {
+    assert (depth <= this.depth);
+    int currentNumber = currentRowIdList[depth];
+    for (int i = 0; i < dataList.size(); i++) {
+      columnPages[depth].putData(currentNumber, dataList.get(i));
+      currentNumber++;
+    }
+    currentRowIdList[depth] = currentNumber;
+  }
+
+  /**
+   * to free the used memory
+   */
+  public void freeMemory() {
+    for (int i = 0; i < depth; i++) {
+      columnPages[i].freeMemory();
+    }
+  }
+
+  /**
+   * return the column page
+   * @param depth
+   *          depth of column
+   * @return column page
+   */
+  public ColumnPage getColumnPage(int depth) {
+    assert (depth <= this.depth);
+    return columnPages[depth];
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
new file mode 100644
index 00000000000..32846a125d3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.page;
+
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+
+/**
+ * Below class will be used to encode column pages for which a local dictionary was generated,
+ * but not all the pages in the blocklet were encoded with the local dictionary.
+ * This is required because, within a blocklet, the pages of a column must either all be
+ * local dictionary encoded or all be encoded without a local dictionary.
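+ * For example, if pages 1 to 3 of a column were dictionary encoded and the dictionary
+ * threshold is crossed while page 4 is written, pages 1 to 3 are re-submitted through this
+ * encoder so that every page of that column in the blocklet ends up plainly encoded.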
+ */ +public class FallbackColumnPageEncoder implements Callable { + + /** + * actual local dictionary generated column page + */ + private EncodedColumnPage encodedColumnPage; + + /** + * actual index in the page + * this is required as in a blocklet few pages will be local dictionary + * encoded and few pages will be plain text encoding + * in this case local dictionary encoded page + */ + private int pageIndex; + + public FallbackColumnPageEncoder(EncodedColumnPage encodedColumnPage, int pageIndex) { + this.encodedColumnPage = encodedColumnPage; + this.pageIndex = pageIndex; + } + + @Override public FallbackEncodedColumnPage call() throws Exception { + // disable encoding using local dictionary + encodedColumnPage.getActualPage().disableLocalDictEncoding(); + // new encoded column page + EncodedColumnPage newEncodedColumnPage; + + // get column spec for existing column page + TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec(); + switch (columnSpec.getColumnType()) { + case COMPLEX_ARRAY: + case COMPLEX_PRIMITIVE: + case COMPLEX_STRUCT: + case COMPLEX: + // for complex type column + newEncodedColumnPage = ColumnPageEncoder.encodedColumn( + encodedColumnPage.getActualPage()); + break; + default: + // for primitive column + ColumnPageEncoder columnPageEncoder = DefaultEncodingFactory.getInstance() + .createEncoder(encodedColumnPage.getActualPage().getColumnSpec(), + encodedColumnPage.getActualPage()); + newEncodedColumnPage = columnPageEncoder.encode(encodedColumnPage.getActualPage()); + } + FallbackEncodedColumnPage fallbackEncodedColumnPage = + new FallbackEncodedColumnPage(newEncodedColumnPage, pageIndex); + // here freeing the memory of raw column page as fallback is done and column page will not + // be used. + // This is required to free the memory once it is of no use + encodedColumnPage.freeMemory(); + return fallbackEncodedColumnPage; + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java new file mode 100644 index 00000000000..9ce87b5e14a --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackEncodedColumnPage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.datastore.page; + +import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage; + +/** + * Maintains the fallback encoded page and metadata + */ +public class FallbackEncodedColumnPage { + + /** + * encode page + */ + private EncodedColumnPage encodedColumnPage; + + /** + * page index in a blocklet + */ + private int pageIndex; + + public FallbackEncodedColumnPage(EncodedColumnPage encodedColumnPage, int pageIndex) { + this.encodedColumnPage = encodedColumnPage; + this.pageIndex = pageIndex; + } + + public EncodedColumnPage getEncodedColumnPage() { + return encodedColumnPage; + } + + public int getPageIndex() { + return pageIndex; + } + +} diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java new file mode 100644 index 00000000000..2c7d3a79569 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datastore.page; + +import java.io.IOException; +import java.math.BigDecimal; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.localdictionary.PageLevelDictionary; +import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException; +import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; +import org.apache.carbondata.core.util.ByteUtil; + +/** + * Column page implementation for Local dictionary generated columns + * Its a decorator over two column page + * 1. Which will hold the actual data + * 2. Which will hold the dictionary encoded data + */ +public class LocalDictColumnPage extends ColumnPage { + + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(LocalDictColumnPage.class.getName()); + + /** + * to maintain page level dictionary for column page + */ + private PageLevelDictionary pageLevelDictionary; + + /** + * to hold the actual data of the column + */ + private ColumnPage actualDataColumnPage; + + /** + * to hold the dictionary encoded column page + */ + private ColumnPage encodedDataColumnPage; + + /** + * to check if actual column page memory is already clear + */ + private boolean isActualPageMemoryFreed; + + /** + * Create a new column page with input data type and page size. 
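+   * Values are written to both the actual-data page and the dictionary-encoded page until the
+   * column's dictionary threshold is reached, after which the dictionary page is freed and
+   * only the actual-data page is kept.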
+   */
+  protected LocalDictColumnPage(ColumnPage actualDataColumnPage, ColumnPage encodedColumnpage,
+      LocalDictionaryGenerator localDictionaryGenerator) {
+    super(actualDataColumnPage.getColumnSpec(), actualDataColumnPage.getDataType(),
+        actualDataColumnPage.getPageSize());
+    // if threshold is not reached then create page level dictionary
+    // for encoding with local dictionary
+    if (!localDictionaryGenerator.isThresholdReached()) {
+      pageLevelDictionary = new PageLevelDictionary(localDictionaryGenerator,
+          actualDataColumnPage.getColumnSpec().getFieldName(), actualDataColumnPage.getDataType());
+      this.encodedDataColumnPage = encodedColumnpage;
+    } else {
+      // else free the encoded column page memory as it is of no use
+      encodedColumnpage.freeMemory();
+    }
+    this.actualDataColumnPage = actualDataColumnPage;
+  }
+
+  @Override public byte[][] getByteArrayPage() {
+    if (null != pageLevelDictionary) {
+      return encodedDataColumnPage.getByteArrayPage();
+    } else {
+      return actualDataColumnPage.getByteArrayPage();
+    }
+  }
+
+  /**
+   * Below method will be used to check whether page is local dictionary
+   * generated or not. This will be used while encoding the page
+   *
+   * @return true if the page is local dictionary generated
+   */
+  public boolean isLocalDictGeneratedPage() {
+    return null != pageLevelDictionary;
+  }
+
+  /**
+   * Below method will be used to add column data to page
+   *
+   * @param rowId row number
+   * @param bytes actual data
+   */
+  @Override public void putBytes(int rowId, byte[] bytes) {
+    if (null != pageLevelDictionary) {
+      try {
+        actualDataColumnPage.putBytes(rowId, bytes);
+        int dictionaryValue = pageLevelDictionary.getDictionaryValue(bytes);
+        encodedDataColumnPage.putBytes(rowId, ByteUtil.toBytes(dictionaryValue));
+      } catch (DictionaryThresholdReachedException e) {
+        LOGGER.error(e, "Local Dictionary threshold reached for the column: " + actualDataColumnPage
+            .getColumnSpec().getFieldName());
+        pageLevelDictionary = null;
+        encodedDataColumnPage.freeMemory();
+        encodedDataColumnPage = null;
+      }
+    } else {
+      actualDataColumnPage.putBytes(rowId, bytes);
+    }
+  }
+
+  @Override public void disableLocalDictEncoding() {
+    pageLevelDictionary = null;
+    freeEncodedColumnPage();
+  }
+
+  @Override public PageLevelDictionary getColumnPageDictionary() {
+    return pageLevelDictionary;
+  }
+
+  @Override public void setBytePage(byte[] byteData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setShortPage(short[] shortData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setShortIntPage(byte[] shortIntData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setIntPage(int[] intData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setLongPage(long[] longData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setFloatPage(float[] floatData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setDoublePage(double[] doubleData) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void setByteArrayPage(byte[][] byteArray) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override public void freeMemory() {
+    if (null == pageLevelDictionary) {
+      actualDataColumnPage.freeMemory();
+      isActualPageMemoryFreed = true;
+    }
+  }
+
+  public void 
freeMemoryForce() { + if (!isActualPageMemoryFreed) { + actualDataColumnPage.freeMemory(); + isActualPageMemoryFreed = true; + } + freeEncodedColumnPage(); + } + + private void freeEncodedColumnPage() { + if (null != encodedDataColumnPage) { + encodedDataColumnPage.freeMemory(); + encodedDataColumnPage = null; + } + } + + @Override public void putByte(int rowId, byte value) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public void putShort(int rowId, short value) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public void putInt(int rowId, int value) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public void putLong(int rowId, long value) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public void putDouble(int rowId, double value) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public void putDecimal(int rowId, BigDecimal decimal) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public void putShortInt(int rowId, int value) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public void putBytes(int rowId, byte[] bytes, int offset, int length) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public byte getByte(int rowId) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public short getShort(int rowId) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public int getShortInt(int rowId) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public int getInt(int rowId) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public long getLong(int rowId) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public float getFloat(int rowId) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public double getDouble(int rowId) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public BigDecimal getDecimal(int rowId) { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public byte[] getBytes(int rowId) { + return actualDataColumnPage.getBytes(rowId); + } + + @Override public byte[] getBytePage() { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public short[] getShortPage() { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public byte[] getShortIntPage() { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public int[] getIntPage() { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public long[] getLongPage() { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public float[] getFloatPage() { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public double[] getDoublePage() { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public byte[] getLVFlattenedBytePage() throws IOException { + if (null != encodedDataColumnPage) { + return encodedDataColumnPage.getLVFlattenedBytePage(); + } else { + return 
actualDataColumnPage.getLVFlattenedBytePage(); + } + } + + @Override public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { + if (null != encodedDataColumnPage) { + return encodedDataColumnPage.getComplexChildrenLVFlattenedBytePage(); + } else { + return actualDataColumnPage.getComplexChildrenLVFlattenedBytePage(); + } + } + + @Override public byte[] getComplexParentFlattenedBytePage() throws IOException { + if (null != encodedDataColumnPage) { + return encodedDataColumnPage.getComplexParentFlattenedBytePage(); + } else { + return actualDataColumnPage.getComplexParentFlattenedBytePage(); + } + } + + @Override public byte[] getDecimalPage() { + throw new UnsupportedOperationException("Operation not supported"); + } + + @Override public void convertValue(ColumnPageValueConverter codec) { + throw new UnsupportedOperationException("Operation not supported"); + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java index 7b1ad2071c1..c2eb40ce1f7 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java @@ -21,6 +21,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.metadata.datatype.DataType; @@ -28,11 +30,11 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase { // for string and decimal data - private byte[][] byteArrayData; + private List byteArrayData; SafeVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) { super(columnSpec, dataType, pageSize); - byteArrayData = new byte[pageSize][]; + byteArrayData = new ArrayList<>(); } @Override @@ -42,13 +44,12 @@ public void freeMemory() { @Override public void putBytesAtRow(int rowId, byte[] bytes) { - byteArrayData[rowId] = bytes; + byteArrayData.add(bytes); } @Override public void putBytes(int rowId, byte[] bytes, int offset, int length) { - byteArrayData[rowId] = new byte[length]; - System.arraycopy(bytes, offset, byteArrayData[rowId], 0, length); + byteArrayData.add(bytes); } @Override public void putDecimal(int rowId, BigDecimal decimal) { @@ -62,12 +63,14 @@ public BigDecimal getDecimal(int rowId) { @Override public byte[] getBytes(int rowId) { - return byteArrayData[rowId]; + return byteArrayData.get(rowId); } @Override public void setByteArrayPage(byte[][] byteArray) { - byteArrayData = byteArray; + for (byte[] data : byteArray) { + byteArrayData.add(data); + } } @Override @@ -104,12 +107,12 @@ public byte[] getComplexParentFlattenedBytePage() throws IOException { @Override public byte[][] getByteArrayPage() { - return byteArrayData; + return byteArrayData.toArray(new byte[byteArrayData.size()][]); } @Override void copyBytes(int rowId, byte[] dest, int destOffset, int length) { - System.arraycopy(byteArrayData[rowId], 0, dest, destOffset, length); + System.arraycopy(byteArrayData.get(rowId), 0, dest, destOffset, length); } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java index 1cdefc81630..7449da6c24e 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java @@ -168,7 +168,7 @@ public void putBytes(int rowId, byte[] bytes, int offset, int length) { throw new RuntimeException(e); } CarbonUnsafe.getUnsafe().copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset, baseAddress, - baseOffset + rowOffset[rowId], length); + baseOffset + rowOffset.get(rowId), length); } @Override @@ -193,9 +193,9 @@ public byte getByte(int rowId) { @Override public byte[] getBytes(int rowId) { - int length = rowOffset[rowId + 1] - rowOffset[rowId]; + int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId); byte[] bytes = new byte[length]; - CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], + CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId), bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); return bytes; } @@ -242,9 +242,9 @@ public BigDecimal getDecimal(int rowId) { } else if (dataType == DataTypes.LONG) { value = getLong(rowId); } else { - int length = rowOffset[rowId + 1] - rowOffset[rowId]; + int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId); byte[] bytes = new byte[length]; - CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], bytes, + CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId), bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); return decimalConverter.getDecimal(bytes); } @@ -253,7 +253,7 @@ public BigDecimal getDecimal(int rowId) { @Override void copyBytes(int rowId, byte[] dest, int destOffset, int length) { - CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], dest, + CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId), dest, CarbonUnsafe.BYTE_ARRAY_OFFSET + destOffset, length); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java index 9d6e16162b5..f60e505a512 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java @@ -65,7 +65,7 @@ public void putBytes(int rowId, byte[] bytes, int offset, int length) { throw new RuntimeException(e); } CarbonUnsafe.getUnsafe().copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset, - baseAddress, baseOffset + rowOffset[rowId], length); + baseAddress, baseOffset + rowOffset.get(rowId), length); } @Override @@ -89,20 +89,20 @@ public BigDecimal getDecimal(int rowId) { @Override public byte[] getBytes(int rowId) { - int length = rowOffset[rowId + 1] - rowOffset[rowId]; + int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId); byte[] bytes = new byte[length]; - CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], + CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId), bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); return bytes; } @Override public byte[][] getByteArrayPage() { - byte[][] bytes = new byte[pageSize][]; - for (int rowId = 0; rowId < pageSize; rowId++) { - int length = rowOffset[rowId + 1] - rowOffset[rowId]; + byte[][] bytes = new byte[rowOffset.size() - 1][]; + for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) { + int length = rowOffset.get(rowId + 1) - 
rowOffset.get(rowId); byte[] rowData = new byte[length]; - CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], + CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId), rowData, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); bytes[rowId] = rowData; } @@ -111,7 +111,7 @@ public byte[][] getByteArrayPage() { @Override void copyBytes(int rowId, byte[] dest, int destOffset, int length) { - CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], + CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset.get(rowId), dest, CarbonUnsafe.BYTE_ARRAY_OFFSET + destOffset, length); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java index cb907a570dc..bd49b94183b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java @@ -53,7 +53,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { Object baseAddress; // the offset of row in the unsafe memory, its size is pageSize + 1 - int[] rowOffset; + List rowOffset; // the length of bytes added in the page int totalLength; @@ -66,7 +66,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) { super(columnSpec, dataType, pageSize); - rowOffset = new int[pageSize + 1]; + rowOffset = new ArrayList<>(); totalLength = 0; } @@ -160,9 +160,9 @@ private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec, // set total length and rowOffset in page page.totalLength = offset; - page.rowOffset = new int[rowId + 1]; - for (int i = 0; i < rowId + 1; i++) { - page.rowOffset[i] = rowOffset.get(i); + page.rowOffset = new ArrayList<>(); + for (int i = 0; i < rowOffset.size(); i++) { + page.rowOffset.add(rowOffset.get(i)); } for (int i = 0; i < rowId; i++) { page.putBytes(i, lvEncodedBytes, i * size, size); @@ -240,9 +240,9 @@ private static VarLengthColumnPageBase getVarLengthColumnPage(TableSpec.ColumnSp // set total length and rowOffset in page page.totalLength = offset; - page.rowOffset = new int[rowId + 1]; - for (int i = 0; i < rowId + 1; i++) { - page.rowOffset[i] = rowOffset.get(i); + page.rowOffset = new ArrayList<>(); + for (int i = 0; i < rowOffset.size(); i++) { + page.rowOffset.add(rowOffset.get(i)); } // set data in page @@ -296,9 +296,9 @@ public void putBytes(int rowId, byte[] bytes) { + " exceed this limit at rowId " + rowId); } if (rowId == 0) { - rowOffset[0] = 0; + rowOffset.add(0); } - rowOffset[rowId + 1] = rowOffset[rowId] + bytes.length; + rowOffset.add(rowOffset.get(rowId) + bytes.length); putBytesAtRow(rowId, bytes); totalLength += bytes.length; } @@ -379,7 +379,7 @@ public byte[] getDecimalPage() { int offset = 0; byte[] data = new byte[totalLength]; for (int rowId = 0; rowId < pageSize; rowId++) { - int length = rowOffset[rowId + 1] - rowOffset[rowId]; + int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId); copyBytes(rowId, data, offset, length); offset += length; } @@ -395,9 +395,9 @@ public byte[] getDecimalPage() { public byte[] getLVFlattenedBytePage() throws IOException { // output LV encoded byte array int offset = 0; - byte[] data = new byte[totalLength + pageSize * 4]; - for (int rowId = 0; rowId < pageSize; rowId++) { - int 
length = rowOffset[rowId + 1] - rowOffset[rowId]; + byte[] data = new byte[totalLength + ((rowOffset.size() - 1) * 4)]; + for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) { + int length = rowOffset.get(rowId + 1) - rowOffset.get(rowId); ByteUtil.setInt(data, offset, length); copyBytes(rowId, data, offset + 4, length); offset += 4 + length; @@ -408,9 +408,9 @@ public byte[] getLVFlattenedBytePage() throws IOException { @Override public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { // output LV encoded byte array int offset = 0; - byte[] data = new byte[totalLength + pageSize * 2]; - for (int rowId = 0; rowId < pageSize; rowId++) { - short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]); + byte[] data = new byte[totalLength + ((rowOffset.size() - 1) * 2)]; + for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) { + short length = (short) (rowOffset.get(rowId + 1) - rowOffset.get(rowId)); ByteUtil.setShort(data, offset, length); copyBytes(rowId, data, offset + 2, length); offset += 2 + length; @@ -423,8 +423,8 @@ public byte[] getComplexParentFlattenedBytePage() throws IOException { // output LV encoded byte array int offset = 0; byte[] data = new byte[totalLength]; - for (int rowId = 0; rowId < pageSize; rowId++) { - short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]); + for (int rowId = 0; rowId < rowOffset.size() - 1; rowId++) { + short length = (short) (rowOffset.get(rowId + 1) - rowOffset.get(rowId)); copyBytes(rowId, data, offset, length); offset += length; } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java index 8bff5cc78c9..f53024a7770 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java @@ -22,11 +22,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import org.apache.carbondata.core.datastore.ColumnType; -import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; @@ -39,6 +36,8 @@ import org.apache.carbondata.format.BlockletMinMaxIndex; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.Encoding; +import org.apache.carbondata.format.LocalDictionaryChunk; +import org.apache.carbondata.format.LocalDictionaryChunkMeta; import org.apache.carbondata.format.PresenceMeta; public abstract class ColumnPageEncoder { @@ -56,7 +55,7 @@ public abstract class ColumnPageEncoder { public EncodedColumnPage encode(ColumnPage inputPage) throws IOException, MemoryException { byte[] encodedBytes = encodeData(inputPage); DataChunk2 pageMetadata = buildPageMetadata(inputPage, encodedBytes); - return new EncodedColumnPage(pageMetadata, encodedBytes, inputPage.getStatistics()); + return new EncodedColumnPage(pageMetadata, encodedBytes, inputPage); } private DataChunk2 buildPageMetadata(ColumnPage inputPage, byte[] encodedBytes) @@ -138,22 +137,39 @@ public static EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) throws IOException, MemoryException { EncodedColumnPage[] encodedPages = new 
EncodedColumnPage[input.getDepth()]; int index = 0; - Iterator iterator = input.iterator(); - while (iterator.hasNext()) { - byte[][] subColumnPage = iterator.next(); - encodedPages[index] = encodeChildColumn(subColumnPage, input.getComplexColumnType(index)); + while (index < input.getDepth()) { + ColumnPage subColumnPage = input.getColumnPage(index); + encodedPages[index] = encodedColumn(subColumnPage); index++; } return encodedPages; } - private static EncodedColumnPage encodeChildColumn(byte[][] data, ColumnType complexDataType) + public static EncodedColumnPage encodedColumn(ColumnPage page) throws IOException, MemoryException { - TableSpec.ColumnSpec spec = TableSpec.ColumnSpec - .newInstance("complex_inner_column", DataTypes.BYTE_ARRAY, complexDataType); - ColumnPage page = ColumnPage.wrapByteArrayPage(spec, data); ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null); return encoder.encode(page); } + /** + * Below method to encode the dictionary page + * @param dictionaryPage + * dictionary column page + * @return local dictionary chunk + * @throws IOException + * Problem in encoding + * @throws MemoryException + * problem in encoding + */ + public LocalDictionaryChunk encodeDictionary(ColumnPage dictionaryPage) + throws IOException, MemoryException { + LocalDictionaryChunk localDictionaryChunk = new LocalDictionaryChunk(); + localDictionaryChunk.setDictionary_data(encodeData(dictionaryPage)); + LocalDictionaryChunkMeta localDictionaryChunkMeta = new LocalDictionaryChunkMeta(); + localDictionaryChunkMeta.setEncoders(getEncodingList()); + localDictionaryChunkMeta.setEncoder_meta(buildEncoderMeta(dictionaryPage)); + localDictionaryChunk.setDictionary_meta(localDictionaryChunkMeta); + return localDictionaryChunk; + } + } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java index 43d6fc658e9..6f78d9540f3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedColumnPage.java @@ -19,7 +19,10 @@ import java.nio.ByteBuffer; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.LocalDictColumnPage; import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; +import org.apache.carbondata.core.localdictionary.PageLevelDictionary; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; @@ -34,8 +37,7 @@ public class EncodedColumnPage { // metadata of this page private DataChunk2 pageMetadata; - // stats of this page - private SimpleStatsResult stats; + private ColumnPage actualPage; /** * Constructor @@ -43,7 +45,7 @@ public class EncodedColumnPage { * @param encodedData encoded data for this page */ public EncodedColumnPage(DataChunk2 pageMetadata, byte[] encodedData, - SimpleStatsResult stats) { + ColumnPage actualPage) { if (pageMetadata == null) { throw new IllegalArgumentException("data chunk2 must not be null"); } @@ -52,7 +54,7 @@ public EncodedColumnPage(DataChunk2 pageMetadata, byte[] encodedData, } this.pageMetadata = pageMetadata; this.encodedData = encodedData; - this.stats = stats; + this.actualPage = actualPage; } /** @@ -76,6 +78,25 @@ public int getTotalSerializedSize() { } public SimpleStatsResult getStats() { - return stats; + return 
actualPage.getStatistics(); + } + + public ColumnPage getActualPage() { + return actualPage; + } + + public boolean isLocalDictGeneratedPage() { + return actualPage.isLocalDictGeneratedPage(); + } + + public PageLevelDictionary getPageDictionary() { + return actualPage.getColumnPageDictionary(); + } + + public void freeMemory() { + if (actualPage instanceof LocalDictColumnPage) { + LocalDictColumnPage page = (LocalDictColumnPage) actualPage; + page.freeMemoryForce(); + } } } \ No newline at end of file diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java index d15765454a8..56948171c03 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java @@ -51,7 +51,7 @@ void encodeIndexStorage(ColumnPage inputPage) { if (isInvertedIndex) { indexStorage = new BlockIndexerStorageForShort(data, true, false, isSort); } else { - indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false); + indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false, false); } byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage()); super.compressedDataPage = compressor.compressByte(flattened); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java index 1e5015ba3e1..17a523cc110 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java @@ -52,7 +52,7 @@ void encodeIndexStorage(ColumnPage inputPage) { if (isInvertedIndex) { indexStorage = new BlockIndexerStorageForShort(data, false, false, isSort); } else { - indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false); + indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false, false); } byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage()); super.compressedDataPage = compressor.compressByte(flattened); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java index 741dbfed293..c68f3940392 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java @@ -55,10 +55,12 @@ public ColumnPageEncoder createEncoder(Map parameter) { protected void encodeIndexStorage(ColumnPage input) { IndexStorage indexStorage; byte[][] data = input.getByteArrayPage(); + boolean isDictionary = input.isLocalDictGeneratedPage(); if (isInvertedIndex) { - indexStorage = new BlockIndexerStorageForShort(data, false, true, isSort); + indexStorage = new BlockIndexerStorageForShort(data, isDictionary, !isDictionary, isSort); } else 
{ - indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, true); + indexStorage = + new BlockIndexerStorageForNoInvertedIndexForShort(data, !isDictionary, false); } byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage()); super.compressedDataPage = compressor.compressByte(flattened); @@ -75,8 +77,6 @@ protected List getEncodingList() { } return encodings; } - }; } - } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java new file mode 100644 index 00000000000..a8bc5f16b79 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datastore.page.statistics; + +import java.math.BigDecimal; + +import org.apache.carbondata.core.metadata.datatype.DataType; + +import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE_ARRAY; + +/** + * Column Page dummy stats collector. This will be used for columns for which stats + * generation is not required, for example complex type columns. + */ +public class DummyStatsCollector implements ColumnPageStatsCollector { + + /** + * dummy stats used to sync with encoder + */ + protected static final SimpleStatsResult DUMMY_STATS = new SimpleStatsResult() { + @Override public Object getMin() { + return new byte[0]; + } + + @Override public Object getMax() { + return new byte[0]; + } + + @Override public int getDecimalCount() { + return 0; + } + + @Override public DataType getDataType() { + return BYTE_ARRAY; + } + + }; + + @Override public void updateNull(int rowId) { + + } + + @Override public void update(byte value) { + + } + + @Override public void update(short value) { + + } + + @Override public void update(int value) { + + } + + @Override public void update(long value) { + + } + + @Override public void update(double value) { + + } + + @Override public void update(BigDecimal value) { + + } + + @Override public void update(byte[] value) { + + } + + @Override public SimpleStatsResult getPageStats() { + return DUMMY_STATS; + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java new file mode 100644 index 00000000000..3ea36ef81e1 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/PageLevelDictionary.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.localdictionary; + +import java.io.IOException; +import java.util.BitSet; + +import org.apache.carbondata.core.datastore.ColumnType; +import org.apache.carbondata.core.datastore.TableSpec; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder; +import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec; +import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector; +import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException; +import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.format.LocalDictionaryChunk; + +/** + * Class to maintain page level dictionary. It will store all unique dictionary values + * used in a page. 
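+ * For example, one page of a blocklet may use dictionary values {1, 2} and another page
+ * {2, 3}; the merged blocklet dictionary must then carry the keys for {1, 2, 3}.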
This is required while writing blocklet level dictionary in carbondata + * file + */ +public class PageLevelDictionary { + + /** + * dictionary generator to generate dictionary values for page data + */ + private LocalDictionaryGenerator localDictionaryGenerator; + + /** + * set of dictionary surrogate keys used in this page + */ + private BitSet usedDictionaryValues; + + private String columnName; + + private DataType dataType; + + public PageLevelDictionary(LocalDictionaryGenerator localDictionaryGenerator, String columnName, + DataType dataType) { + this.localDictionaryGenerator = localDictionaryGenerator; + this.usedDictionaryValues = new BitSet(); + this.columnName = columnName; + this.dataType = dataType; + } + + /** + * Below method will be used to get the dictionary value + * + * @param data column data + * @return dictionary value + * @throws DictionaryThresholdReachedException when threshold crossed for column + */ + public int getDictionaryValue(byte[] data) throws DictionaryThresholdReachedException { + int dictionaryValue = localDictionaryGenerator.generateDictionary(data); + this.usedDictionaryValues.set(dictionaryValue); + return dictionaryValue; + } + + /** + * Method to merge the dictionary values used across pages + * + * @param pageLevelDictionary other page level dictionary + */ + public void mergerDictionaryValues(PageLevelDictionary pageLevelDictionary) { + // union of the values used by either page; and() would wrongly drop values + // that only one of the two pages used + usedDictionaryValues.or(pageLevelDictionary.usedDictionaryValues); + } + + /** + * Below method will be used to get the local dictionary chunk for writing + * TODO: Support for numeric data type dictionary exclude columns + * @return encoded local dictionary chunk + * @throws MemoryException + * in case of problem in encoding + * @throws IOException + * in case of problem in encoding + */ + public LocalDictionaryChunk getLocalDictionaryChunkForBlocklet() + throws MemoryException, IOException { + // TODO support for actual data type dictionary ColumnSpec + ColumnType columnType = ColumnType.PLAIN_VALUE; + if (DataTypes.VARCHAR == dataType) { + columnType = ColumnType.PLAIN_LONG_VALUE; + } + TableSpec.ColumnSpec spec = + TableSpec.ColumnSpec.newInstance(columnName, DataTypes.BYTE_ARRAY, columnType); + ColumnPage dictionaryColumnPage = + ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, usedDictionaryValues.cardinality()); + // TODO support data type specific stats collector for numeric data types + dictionaryColumnPage.setStatsCollector(new DummyStatsCollector()); + int rowId = 0; + for (int i = usedDictionaryValues.nextSetBit(0); + i >= 0; i = usedDictionaryValues.nextSetBit(i + 1)) { + dictionaryColumnPage + .putData(rowId++, localDictionaryGenerator.getDictionaryKeyBasedOnValue(i)); + } + // creating an encoder + ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null); + // get encoded dictionary values + LocalDictionaryChunk localDictionaryChunk = encoder.encodeDictionary(dictionaryColumnPage); + // set compressed dictionary values + localDictionaryChunk.setDictionary_values(CompressorFactory.getInstance().getCompressor() + .compressByte(usedDictionaryValues.toByteArray())); + // free the dictionary page memory + dictionaryColumnPage.freeMemory(); + return localDictionaryChunk; + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/DictionaryStore.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/DictionaryStore.java new file mode 100644 index 00000000000..226104bb650 --- /dev/null +++
b/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/DictionaryStore.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.localdictionary.dictionaryholder; + +import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException; + +/** + * Interface for storing the dictionary key and value. + * Concrete implementations can be map based or trie based. + */ +public interface DictionaryStore { + + /** + * Below method will be used to add a dictionary value to the dictionary holder; + * if it is already present in the holder then it will return the existing dictionary value. + * @param key + * dictionary key + * @return dictionary value + */ + int putIfAbsent(byte[] key) throws DictionaryThresholdReachedException; + + /** + * Below method will be used to check whether the dictionary threshold has been reached + * @return true if threshold of store is reached + */ + boolean isThresholdReached(); + + /** + * Below method will be used to get the dictionary key based on value + * @param value + * dictionary value + * @return dictionary key based on value + */ + byte[] getDictionaryKeyBasedOnValue(int value); + +} diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/MapBasedDictionaryStore.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/MapBasedDictionaryStore.java new file mode 100644 index 00000000000..05ca0023215 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/dictionaryholder/MapBasedDictionaryStore.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.localdictionary.dictionaryholder; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.core.cache.dictionary.DictionaryByteArrayWrapper; +import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException; + +/** + * Map based dictionary holder class, it will use a map to hold + * the dictionary key and its value + */ +public class MapBasedDictionaryStore implements DictionaryStore { + + /** + * used to assign a dictionary value to a new key + */ + private int lastAssignValue; + + /** + * to maintain dictionary key value + */ + private final Map dictionary; + + /** + * array maintained for reverse lookup, + * otherwise iterating the map for every reverse lookup would slow down performance. + * It will only maintain the reference + */ + private DictionaryByteArrayWrapper[] referenceDictionaryArray; + + /** + * dictionary threshold to check if threshold is reached + */ + private int dictionaryThreshold; + + /** + * for checking whether threshold is reached or not + */ + private boolean isThresholdReached; + + public MapBasedDictionaryStore(int dictionaryThreshold) { + this.dictionaryThreshold = dictionaryThreshold; + this.dictionary = new ConcurrentHashMap<>(); + this.referenceDictionaryArray = new DictionaryByteArrayWrapper[dictionaryThreshold]; + } + + /** + * Below method will be used to add a dictionary value to the dictionary holder; + * if it is already present in the holder then it will return the existing dictionary value. + * + * @param data dictionary key + * @return dictionary value + */ + @Override public int putIfAbsent(byte[] data) throws DictionaryThresholdReachedException { + // check if threshold has already been reached + checkIfThresholdReached(); + DictionaryByteArrayWrapper key = new DictionaryByteArrayWrapper(data); + // get the dictionary value + Integer value = dictionary.get(key); + // if value is null then the key is not present in the store + if (null == value) { + // acquire the lock + synchronized (dictionary) { + // check threshold + checkIfThresholdReached(); + // get the value again as another thread might have added it + value = dictionary.get(key); + // double checking + if (null == value) { + // increment the value + value = ++lastAssignValue; + // if new value is greater than threshold + if (value > dictionaryThreshold) { + // clear the dictionary + dictionary.clear(); + referenceDictionaryArray = null; + // set the threshold boolean to true + isThresholdReached = true; + // throw exception + checkIfThresholdReached(); + } + // add to reference array + // position is -1 as dictionary value starts from 1 + this.referenceDictionaryArray[value - 1] = key; + dictionary.put(key, value); + } + } + } + return value; + } + + private void checkIfThresholdReached() throws DictionaryThresholdReachedException { + if (isThresholdReached) { + throw new DictionaryThresholdReachedException( + "Unable to generate dictionary value. Dictionary threshold reached"); + } + } + + /** + * Below method will be used to check whether the dictionary threshold has been reached + * + * @return true if the threshold has been reached + */ + @Override public boolean isThresholdReached() { + return isThresholdReached; + } + + /** + * Below method will be used to get the dictionary key based on value + * + * @param value dictionary value + * Caller will take care of passing a proper value + * @return dictionary key based on value + */ + @Override public byte[] getDictionaryKeyBasedOnValue(int value) { + assert referenceDictionaryArray != null; + // reference array index will be -1 of the value as dictionary value starts from 1 + return referenceDictionaryArray[value - 1].getData(); + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/exception/DictionaryThresholdReachedException.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/exception/DictionaryThresholdReachedException.java new file mode 100644 index 00000000000..7d648e0b6ce --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/exception/DictionaryThresholdReachedException.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.localdictionary.exception; + +import java.util.Locale; + +public class DictionaryThresholdReachedException extends Exception { + /** + * default serial version ID. + */ + private static final long serialVersionUID = 1L; + + /** + * The Error message. + */ + private String msg = ""; + + /** + * Constructor + * + * @param msg The error message for this exception. + */ + public DictionaryThresholdReachedException(String msg) { + super(msg); + this.msg = msg; + } + + /** + * Constructor + * + * @param msg exception message + * @param throwable detail exception + */ + public DictionaryThresholdReachedException(String msg, Throwable throwable) { + super(msg, throwable); + this.msg = msg; + } + + /** + * Constructor + * + * @param throwable exception + */ + public DictionaryThresholdReachedException(Throwable throwable) { + super(throwable); + } + + /** + * This method is used to get the localized message. + * + * @param locale - A Locale object that represents a specific geographical, + * political, or cultural region. + * @return - Localized error message.
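+ * Note: the current implementation ignores the locale and returns an empty message.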
+ */ + public String getLocalizedMessage(Locale locale) { + return ""; + } + + /** + * getLocalizedMessage + */ + @Override public String getLocalizedMessage() { + return super.getLocalizedMessage(); + } + + /** + * getMessage + */ + public String getMessage() { + return this.msg; + } +} + diff --git a/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java new file mode 100644 index 00000000000..5ae9e27855e --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/ColumnLocalDictionaryGenerator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.localdictionary.generator; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.localdictionary.dictionaryholder.DictionaryStore; +import org.apache.carbondata.core.localdictionary.dictionaryholder.MapBasedDictionaryStore; +import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException; + +/** + * Class to generate local dictionary for column + */ +public class ColumnLocalDictionaryGenerator implements LocalDictionaryGenerator { + + /** + * dictionary holder to hold dictionary values + */ + private DictionaryStore dictionaryHolder; + + public ColumnLocalDictionaryGenerator(int threshold) { + // adding 1 to threshold for null value + int newThreshold = threshold + 1; + this.dictionaryHolder = new MapBasedDictionaryStore(newThreshold); + // for handling null values + try { + dictionaryHolder.putIfAbsent(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY); + } catch (DictionaryThresholdReachedException e) { + // do nothing + } + } + + /** + * Below method will be used to generate dictionary + * @param data + * data for which dictionary needs to be generated + * @return dictionary value + */ + @Override public int generateDictionary(byte[] data) throws DictionaryThresholdReachedException { + int dictionaryValue = this.dictionaryHolder.putIfAbsent(data); + return dictionaryValue; + } + + /** + * Below method will be used to check if threshold is reached + * for dictionary for particular column + * @return true if dictionary threshold reached for column + */ + @Override public boolean isThresholdReached() { + return this.dictionaryHolder.isThresholdReached(); + } + + /** + * Below method will be used to get the dictionary key based on value + * @param value + * dictionary value + * @return dictionary key based on value + */ + @Override public byte[] getDictionaryKeyBasedOnValue(int value) { + return this.dictionaryHolder.getDictionaryKeyBasedOnValue(value); + } +} diff --git 
a/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/LocalDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/LocalDictionaryGenerator.java new file mode 100644 index 00000000000..553c65b4045 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/localdictionary/generator/LocalDictionaryGenerator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.localdictionary.generator; + +import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException; + +/** + * Interface for generating dictionary for column + */ +public interface LocalDictionaryGenerator { + + /** + * Below method will be used to generate dictionary + * @param data + * data for which dictionary needs to be generated + * @return dictionary value + */ + int generateDictionary(byte[] data) throws DictionaryThresholdReachedException; + + /** + * Below method will be used to check if threshold is reached + * for dictionary for particular column + * @return true if dictionary threshold reached for column + */ + boolean isThresholdReached(); + + /** + * Below method will be used to get the dictionary key based on value + * @param value + * dictionary value + * @return dictionary key based on value + */ + byte[] getDictionaryKeyBasedOnValue(int value); +} diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 2cb19ea9191..68bd749d62f 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -482,7 +482,7 @@ public String getTableUniqueName() { * @return */ public boolean isLocalDictionaryEnabled() { - return isLocalDictionaryEnabled; + return false; } /** diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java index af5121cf58a..58de0304545 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java @@ -23,7 +23,9 @@ import java.util.Set; import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.page.EncodedTablePage; +import org.apache.carbondata.core.datastore.blocklet.BlockletEncodedColumnPage; +import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet; +import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage; import 
org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.datatype.DataType; @@ -44,6 +46,7 @@ import org.apache.carbondata.format.FileFooter3; import org.apache.carbondata.format.FileHeader; import org.apache.carbondata.format.IndexHeader; +import org.apache.carbondata.format.LocalDictionaryChunk; import org.apache.carbondata.format.SegmentInfo; /** @@ -124,18 +127,39 @@ private static long getNumberOfRowForFooter(List infoList) { return numberOfRows; } - public static BlockletIndex getBlockletIndex(List encodedTablePageList, + private static EncodedColumnPage[] getEncodedColumnPages(EncodedBlocklet encodedBlocklet, + boolean isDimension, int pageIndex) { + int size = + isDimension ? encodedBlocklet.getNumberOfDimension() : encodedBlocklet.getNumberOfMeasure(); + EncodedColumnPage[] encodedPages = new EncodedColumnPage[size]; + + for (int i = 0; i < size; i++) { + if (isDimension) { + encodedPages[i] = + encodedBlocklet.getEncodedDimensionColumnPages().get(i).getEncodedColumnPageList() + .get(pageIndex); + } else { + encodedPages[i] = + encodedBlocklet.getEncodedMeasureColumnPages().get(i).getEncodedColumnPageList() + .get(pageIndex); + } + } + return encodedPages; + } + public static BlockletIndex getBlockletIndex(EncodedBlocklet encodedBlocklet, List carbonMeasureList) { BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex(); + // Calculating min/max for each column. - TablePageStatistics stats = new TablePageStatistics(encodedTablePageList.get(0).getDimensions(), - encodedTablePageList.get(0).getMeasures()); + TablePageStatistics stats = + new TablePageStatistics(getEncodedColumnPages(encodedBlocklet, true, 0), + getEncodedColumnPages(encodedBlocklet, false, 0)); byte[][] minCol = stats.getDimensionMinValue().clone(); byte[][] maxCol = stats.getDimensionMaxValue().clone(); - for (EncodedTablePage encodedTablePage : encodedTablePageList) { - stats = new TablePageStatistics(encodedTablePage.getDimensions(), - encodedTablePage.getMeasures()); + for (int pageIndex = 0; pageIndex < encodedBlocklet.getNumberOfPages(); pageIndex++) { + stats = new TablePageStatistics(getEncodedColumnPages(encodedBlocklet, true, pageIndex), + getEncodedColumnPages(encodedBlocklet, false, pageIndex)); byte[][] columnMaxData = stats.getDimensionMaxValue(); byte[][] columnMinData = stats.getDimensionMinValue(); for (int i = 0; i < maxCol.length; i++) { @@ -155,16 +179,16 @@ public static BlockletIndex getBlockletIndex(List encodedTable blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min)); } - stats = new TablePageStatistics(encodedTablePageList.get(0).getDimensions(), - encodedTablePageList.get(0).getMeasures()); + stats = new TablePageStatistics(getEncodedColumnPages(encodedBlocklet, true, 0), + getEncodedColumnPages(encodedBlocklet, false, 0)); byte[][] measureMaxValue = stats.getMeasureMaxValue().clone(); byte[][] measureMinValue = stats.getMeasureMinValue().clone(); byte[] minVal = null; byte[] maxVal = null; - for (int i = 1; i < encodedTablePageList.size(); i++) { + for (int i = 1; i < encodedBlocklet.getNumberOfPages(); i++) { for (int j = 0; j < measureMinValue.length; j++) { - stats = new TablePageStatistics( - encodedTablePageList.get(i).getDimensions(), encodedTablePageList.get(i).getMeasures()); + stats = new TablePageStatistics(getEncodedColumnPages(encodedBlocklet, true, i), + getEncodedColumnPages(encodedBlocklet, false, i)); minVal = stats.getMeasureMinValue()[j]; maxVal = stats.getMeasureMaxValue()[j]; if (compareMeasureData(measureMaxValue[j], maxVal, carbonMeasureList.get(j).getDataType()) @@ -185,10 +209,11 @@ public static BlockletIndex getBlockletIndex(List encodedTable blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min)); } BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex(); - byte[] startKey = encodedTablePageList.get(0).getPageKey().serializeStartKey(); + byte[] startKey = encodedBlocklet.getPageMetadataList().get(0).serializeStartKey(); blockletBTreeIndex.setStart_key(startKey); - byte[] endKey = encodedTablePageList.get( - encodedTablePageList.size() - 1).getPageKey().serializeEndKey(); + byte[] endKey = + encodedBlocklet.getPageMetadataList().get(encodedBlocklet.getPageMetadataList().size() - 1) + .serializeEndKey(); blockletBTreeIndex.setEnd_key(endKey); BlockletIndex blockletIndex = new BlockletIndex(); blockletIndex.setMin_max_index(blockletMinMaxIndex); @@ -300,7 +325,8 @@ public static BlockletInfo3 getBlocletInfo3( /** * return DataChunk3 that contains the input DataChunk2 list */ - public static DataChunk3 getDataChunk3(List dataChunksList) { + public static DataChunk3 getDataChunk3(List dataChunksList, + LocalDictionaryChunk encodedDictionary) { int offset = 0; DataChunk3 dataChunk = new DataChunk3(); List pageOffsets = new ArrayList<>(); @@ -313,6 +339,7 @@ public static DataChunk3 getDataChunk3(List dataChunksList) { pageLengths.add(length); offset += length; } + dataChunk.setLocal_dictionary(encodedDictionary); dataChunk.setData_chunk_list(dataChunksList); dataChunk.setPage_length(pageLengths); dataChunk.setPage_offset(pageOffsets); @@ -323,26 +350,32 @@ public static DataChunk3 getDataChunk3(List dataChunksList) { * return DataChunk3 for the dimension column (specified by `columnIndex`) * in `encodedTablePageList` */ - public static DataChunk3 getDimensionDataChunk3(List encodedTablePageList, - int columnIndex) throws IOException { - List dataChunksList = new ArrayList<>(encodedTablePageList.size()); - for (EncodedTablePage encodedTablePage : encodedTablePageList) { - dataChunksList.add(encodedTablePage.getDimension(columnIndex).getPageMetadata()); + public static DataChunk3 getDimensionDataChunk3(EncodedBlocklet encodedBlocklet, + int columnIndex) { + List dataChunksList = new ArrayList<>(); + BlockletEncodedColumnPage blockletEncodedColumnPage = + encodedBlocklet.getEncodedDimensionColumnPages().get(columnIndex); + for (EncodedColumnPage encodedColumnPage : blockletEncodedColumnPage + .getEncodedColumnPageList()) { + dataChunksList.add(encodedColumnPage.getPageMetadata()); } - return CarbonMetadataUtil.getDataChunk3(dataChunksList); + return CarbonMetadataUtil + .getDataChunk3(dataChunksList, blockletEncodedColumnPage.getEncodedDictionary()); } /** * return DataChunk3 for the measure column (specified by `columnIndex`) * in `encodedTablePageList` */ - public static DataChunk3 getMeasureDataChunk3(List encodedTablePageList, - int columnIndex) throws IOException { - List dataChunksList = new ArrayList<>(encodedTablePageList.size()); - for (EncodedTablePage encodedTablePage : encodedTablePageList) { - dataChunksList.add(encodedTablePage.getMeasure(columnIndex).getPageMetadata()); + public static DataChunk3 getMeasureDataChunk3(EncodedBlocklet encodedBlocklet, int columnIndex) { + List dataChunksList = new ArrayList<>(); + BlockletEncodedColumnPage blockletEncodedColumnPage = + encodedBlocklet.getEncodedMeasureColumnPages().get(columnIndex); + for (EncodedColumnPage
encodedColumnPage : blockletEncodedColumnPage + .getEncodedColumnPageList()) { + dataChunksList.add(encodedColumnPage.getPageMetadata()); } - return CarbonMetadataUtil.getDataChunk3(dataChunksList); + return CarbonMetadataUtil.getDataChunk3(dataChunksList, null); } private static int compareMeasureData(byte[] first, byte[] second, DataType dataType) { diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java index da31ea3e0cf..2909dc44958 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java @@ -173,71 +173,6 @@ public class CarbonMetadataUtilTest { assertEquals(indexHeader, indexheaderResult); } - @Test public void testConvertFileFooter() throws Exception { - int[] cardinality = { 1, 2, 3, 4, 5 }; - - org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema colSchema = - new org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema(); - org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema colSchema1 = - new org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema(); - List - columnSchemaList = new ArrayList<>(); - columnSchemaList.add(colSchema); - columnSchemaList.add(colSchema1); - - SegmentProperties segmentProperties = new SegmentProperties(columnSchemaList, cardinality); - - final EncodedColumnPage measure = new EncodedColumnPage(new DataChunk2(), new byte[]{0,1}, - PrimitivePageStatsCollector.newInstance( - org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE)); - new MockUp() { - @SuppressWarnings("unused") @Mock - public EncodedColumnPage getMeasure(int measureIndex) { - return measure; - } - }; - - new MockUp() { - @SuppressWarnings("unused") @Mock - public byte[] serializeStartKey() { - return new byte[]{1, 2}; - } - - @SuppressWarnings("unused") @Mock - public byte[] serializeEndKey() { - return new byte[]{1, 2}; - } - }; - - TablePageKey key = new TablePageKey(3, segmentProperties, false); - EncodedTablePage encodedTablePage = EncodedTablePage.newInstance(3, new EncodedColumnPage[0], new EncodedColumnPage[0], - key); - - List encodedTablePageList = new ArrayList<>(); - encodedTablePageList.add(encodedTablePage); - - BlockletInfo3 blockletInfoColumnar1 = new BlockletInfo3(); - - List blockletInfoColumnarList = new ArrayList<>(); - blockletInfoColumnarList.add(blockletInfoColumnar1); - - byte[] byteMaxArr = "1".getBytes(); - byte[] byteMinArr = "2".getBytes(); - - BlockletIndex index = getBlockletIndex(encodedTablePageList, segmentProperties.getMeasures()); - List indexList = new ArrayList<>(); - indexList.add(index); - - BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex(); - blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(byteMaxArr)); - blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(byteMinArr)); - FileFooter3 footer = convertFileFooterVersion3(blockletInfoColumnarList, - indexList, - cardinality, 2); - assertEquals(footer.getBlocklet_index_list(), indexList); - - } - @Test public void testGetBlockIndexInfo() throws Exception { byte[] startKey = { 1, 2, 3, 4, 5 }; byte[] endKey = { 9, 3, 5, 5, 5 }; diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift index 1c15f3d2c41..a495b6dfe07 100644 --- a/format/src/main/thrift/carbondata.thrift +++ b/format/src/main/thrift/carbondata.thrift @@ -145,6 +145,7 @@ struct DataChunk3{ 
1: required list data_chunk_list; // List of data chunk 2: optional list page_offset; // Offset of each chunk 3: optional list page_length; // Length of each chunk + 4: optional LocalDictionaryChunk local_dictionary; // to store blocklet local dictionary values } /** @@ -230,4 +231,15 @@ struct BlockletHeader{ 3: optional BlockletIndex blocklet_index; // Index for the following blocklet 4: required BlockletInfo blocklet_info; // Info for the following blocklet 5: optional dictionary.ColumnDictionaryChunk dictionary; // Blocklet local dictionary +} + +struct LocalDictionaryChunk { + 1: required LocalDictionaryChunkMeta dictionary_meta + 2: required binary dictionary_data; // the values in dictionary order, each value is represented in binary format + 3: required binary dictionary_values; // surrogate keys used in the blocklet +} + +struct LocalDictionaryChunkMeta { + 1: required list encoders; // The list of encoders overridden at node level + 2: required list encoder_meta; // Extra information required by encoders } \ No newline at end of file diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java index 4ce80a6f382..da34746cbf7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java @@ -65,10 +65,12 @@ public class ArrayDataType implements GenericDataType { */ private int dataCounter; - private ArrayDataType(int outputArrayIndex, int dataCounter, GenericDataType children) { + private ArrayDataType(int outputArrayIndex, int dataCounter, GenericDataType children, + String name) { this.outputArrayIndex = outputArrayIndex; this.dataCounter = dataCounter; this.children = children; + this.name = name; } @@ -108,7 +110,7 @@ public String getName() { * return column unique id */ @Override - public String getColumnId() { + public String getColumnNames() { return columnId; } @@ -285,7 +287,8 @@ public void fillCardinalityAfterDataLoad(List dimCardWithComplex, @Override public GenericDataType deepCopy() { - return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy()); + return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy(), + this.name); } @Override @@ -293,4 +296,10 @@ public void getChildrenType(List type) { type.add(ColumnType.COMPLEX_ARRAY); children.getChildrenType(type); } + + @Override public void getColumnNames(List columnNameList) { + columnNameList.add(name); + children.getColumnNames(columnNameList); + } + } \ No newline at end of file diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java index 8b1ccf2fe21..049bf575025 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java @@ -100,7 +100,7 @@ void parseComplexValue(ByteBuffer byteArrayInput, DataOutputStream dataOutputStr /** * @return column uuid string */ - String getColumnId(); + String getColumnNames(); /** * set array index to be referred while creating metadata column @@ -159,4 +159,6 @@ void parseComplexValue(ByteBuffer byteArrayInput, DataOutputStream dataOutputStr void getChildrenType(List type); + void getColumnNames(List
columnNameList); + } diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index 3a477ce9915..5d22e55ce57 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -235,7 +235,7 @@ public String getParentname() { * get column unique id */ @Override - public String getColumnId() { + public String getColumnNames() { return columnId; } @@ -536,11 +536,15 @@ public GenericDataType deepCopy() { dataType.nullformat = this.nullformat; dataType.setKeySize(this.keySize); dataType.setSurrogateIndex(this.index); - + dataType.name = this.name; return dataType; } public void getChildrenType(List type) { type.add(ColumnType.COMPLEX_PRIMITIVE); } + + @Override public void getColumnNames(List columnNameList) { + columnNameList.add(name); + } } \ No newline at end of file diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java index b66eef7e01a..4d3ba8705e7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java @@ -60,10 +60,12 @@ public class StructDataType implements GenericDataType { */ private int dataCounter; - private StructDataType(List children, int outputArrayIndex, int dataCounter) { + private StructDataType(List children, int outputArrayIndex, int dataCounter, + String name) { this.children = children; this.outputArrayIndex = outputArrayIndex; this.dataCounter = dataCounter; + this.name = name; } /** @@ -113,7 +115,7 @@ public String getParentname() { * get column unique id */ @Override - public String getColumnId() { + public String getColumnNames() { return columnId; } @@ -318,7 +320,7 @@ public GenericDataType deepCopy() { for (GenericDataType child : children) { childrenClone.add(child.deepCopy()); } - return new StructDataType(childrenClone, this.outputArrayIndex, this.dataCounter); + return new StructDataType(childrenClone, this.outputArrayIndex, this.dataCounter, this.name); } public void getChildrenType(List type) { @@ -327,4 +329,11 @@ public void getChildrenType(List type) { children.get(i).getChildrenType(type); } } + + @Override public void getColumnNames(List columnNameList) { + columnNameList.add(name); + for (int i = 0; i < children.size(); i++) { + children.get(i).getColumnNames(columnNameList); + } + } } \ No newline at end of file diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 5fe32612925..f3cb9c36c13 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -49,7 +49,6 @@ import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.processing.datatypes.GenericDataType; -import org.apache.carbondata.processing.loading.sort.SortScopeOptions; import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter; /** 
@@ -137,44 +136,19 @@ public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel model) { } private void initParameters(CarbonFactDataHandlerModel model) { - SortScopeOptions.SortScope sortScope = model.getSortScope(); this.colGrpModel = model.getSegmentProperties().getColumnGroupModel(); - - // in compaction flow the measure with decimal type will come as spark decimal. - // need to convert it to byte array. - if (model.isCompactionFlow()) { - try { - numberOfCores = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING, - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); - } catch (NumberFormatException exc) { - LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_COMPACTING - + "is wrong.Falling back to the default value " - + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - } - } else { - numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); - } - - if (sortScope != null && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { - numberOfCores = 1; - } - // Overriding it to the task specified cores. - if (model.getWritingCoresCount() > 0) { - numberOfCores = model.getWritingCoresCount(); - } - + this.numberOfCores = model.getNumberOfCores(); blockletProcessingCount = new AtomicInteger(0); - producerExecutorService = Executors.newFixedThreadPool(numberOfCores, - new CarbonThreadFactory("ProducerPool:" + model.getTableName() - + ", range: " + model.getBucketId())); + producerExecutorService = Executors.newFixedThreadPool(model.getNumberOfCores(), + new CarbonThreadFactory( + "ProducerPool_" + System.nanoTime() + ":" + model.getTableName() + ", range: " + model + .getBucketId())); producerExecutorServiceTaskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); LOGGER.info("Initializing writer executors"); - consumerExecutorService = Executors - .newFixedThreadPool(1, new CarbonThreadFactory("ConsumerPool:" + model.getTableName() - + ", range: " + model.getBucketId())); + consumerExecutorService = Executors.newFixedThreadPool(1, new CarbonThreadFactory( + "ConsumerPool_" + System.nanoTime() + ":" + model.getTableName() + ", range: " + model + .getBucketId())); consumerExecutorServiceTaskList = new ArrayList<>(1); semaphore = new Semaphore(numberOfCores); tablePageList = new TablePageList(); diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 27249ab4eb2..5b12229f6c3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -23,10 +23,14 @@ import java.util.List; import java.util.Map; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.core.localdictionary.generator.ColumnLocalDictionaryGenerator; +import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; import 
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
@@ -35,6 +39,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
@@ -49,6 +54,12 @@ // TODO: we should try to minimize this class as refactorying loading process
 public class CarbonFactDataHandlerModel {
 
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonFactDataHandlerModel.class.getName());
+
   /**
    * dbName
    */
@@ -163,6 +174,10 @@ public void setBlockSizeInMB(int blockSize) {
 
   private short writingCoresCount;
 
+  private Map<String, LocalDictionaryGenerator> columnLocalDictGenMap;
+
+  private int numberOfCores;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    */
@@ -272,7 +287,8 @@ public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
     }
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
-
+    setLocalDictToModel(carbonTable, wrapperColumnSchema, carbonFactDataHandlerModel);
+    setNumberOfCores(carbonFactDataHandlerModel);
     return carbonFactDataHandlerModel;
   }
 
@@ -340,8 +356,9 @@ public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoa
         carbonFactDataHandlerModel.getTaskExtension(),
         String.valueOf(loadModel.getFactTimeStamp()), loadModel.getSegmentId()));
-
+    setLocalDictToModel(carbonTable, wrapperColumnSchema, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
+    setNumberOfCores(carbonFactDataHandlerModel);
     return carbonFactDataHandlerModel;
   }
 
@@ -623,5 +640,86 @@ public DataMapWriterListener getDataMapWriterlistener() {
     return dataMapWriterlistener;
   }
 
+  public Map<String, LocalDictionaryGenerator> getColumnLocalDictGenMap() {
+    return columnLocalDictGenMap;
+  }
+
+  /**
+   * This method prepares a map which will have column and local dictionary generator mapping for
+   * all the local dictionary columns.
+   * @param carbonTable
+   * @param wrapperColumnSchema
+   * @param carbonFactDataHandlerModel
+   */
+  private static void setLocalDictToModel(CarbonTable carbonTable,
+      List<ColumnSchema> wrapperColumnSchema,
+      CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+    boolean islocalDictEnabled = carbonTable.isLocalDictionaryEnabled();
+    // the map is populated only if local dictionary is enabled, else it stays empty
+    Map<String, LocalDictionaryGenerator> columnLocalDictGenMap = new HashMap<>();
+    if (islocalDictEnabled) {
+      int localDictionaryThreshold = carbonTable.getLocalDictionaryThreshold();
+      for (ColumnSchema columnSchema : wrapperColumnSchema) {
+        // check whether the column is local dictionary column or not
+        if (columnSchema.isLocalDictColumn()) {
+          columnLocalDictGenMap.put(columnSchema.getColumnName(),
+              new ColumnLocalDictionaryGenerator(localDictionaryThreshold));
+        }
+      }
+    }
+    if (islocalDictEnabled) {
+      LOGGER.info("Local dictionary is enabled for table: " + carbonTable.getTableUniqueName());
+      LOGGER.info(
+          "Local dictionary threshold for table: " + carbonTable.getTableUniqueName() + " is: "
+              + carbonTable.getLocalDictionaryThreshold());
+      Iterator<Map.Entry<String, LocalDictionaryGenerator>> iterator =
+          columnLocalDictGenMap.entrySet().iterator();
+      StringBuilder stringBuilder = new StringBuilder();
+      while (iterator.hasNext()) {
+        Map.Entry<String, LocalDictionaryGenerator> next = iterator.next();
+        stringBuilder.append(next.getKey());
+        stringBuilder.append(',');
+      }
+      LOGGER.info("Local dictionary will be generated for the columns:" + stringBuilder.toString()
+          + " for table: " + carbonTable.getTableUniqueName());
+    }
+    carbonFactDataHandlerModel.setColumnLocalDictGenMap(columnLocalDictGenMap);
+  }
+
+  public void setColumnLocalDictGenMap(
+      Map<String, LocalDictionaryGenerator> columnLocalDictGenMap) {
+    this.columnLocalDictGenMap = columnLocalDictGenMap;
+  }
+
+  private static void setNumberOfCores(CarbonFactDataHandlerModel model) {
+    // in the compaction flow, the number of cores is read from the compaction-specific property
+    if (model.isCompactionFlow()) {
+      try {
+        model.numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING,
+                CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+      } catch (NumberFormatException exc) {
+        LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_COMPACTING
+            + " is wrong. Falling back to the default value "
+            + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+        model.numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+      }
+    } else {
+      model.numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    }
+
+    if (model.sortScope != null && model.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+      model.numberOfCores = 1;
+    }
+    // Overriding it to the task specified cores.
+    if (model.getWritingCoresCount() > 0) {
+      model.numberOfCores = model.getWritingCoresCount();
+    }
+  }
+
+  public int getNumberOfCores() {
+    return numberOfCores;
+  }
 }
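How the generator map prepared by setLocalDictToModel is consumed is only partially visible in this patch, so the following is a hedged sketch rather than the actual API: it assumes the new LocalDictionaryGenerator interface exposes a generateDictionary(byte[]) method that returns a surrogate key and throws DictionaryThresholdReachedException once the configured threshold is crossed; only the ColumnLocalDictionaryGenerator(threshold) constructor appears in the hunk above.

    import java.util.Map;

    import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
    import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;

    public final class LocalDictEncodeSketch {
      /**
       * Returns the dictionary surrogate for one column value, or null when the
       * column has no generator or its threshold is exhausted; null tells the
       * caller to keep the plain byte[] value (the fallback encoding path).
       */
      public static Integer tryEncode(Map<String, LocalDictionaryGenerator> generators,
          String columnName, byte[] value) {
        LocalDictionaryGenerator generator = generators.get(columnName);
        if (null == generator) {
          // column was not configured for local dictionary
          return null;
        }
        try {
          // assumed method name; not shown in this diff
          return generator.generateDictionary(value);
        } catch (DictionaryThresholdReachedException e) {
          // cardinality crossed the table-level threshold: fall back to plain encoding
          return null;
        }
      }
    }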
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index b1b966b4c2b..c634a6d8723 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -45,6 +45,7 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -64,8 +65,8 @@ public class TablePage {
   // one vector to make it efficient for sorting
   private ColumnPage[] dictDimensionPages;
   private ColumnPage[] noDictDimensionPages;
-  private ComplexColumnPage[] complexDimensionPages;
-  private ColumnPage[] measurePages;
+  private ColumnPage[] measurePages;
+  private ComplexColumnPage[] complexDimensionPages;
   // the num of rows in this page, it must be less than short value (65536)
   private int pageSize;
@@ -104,19 +105,26 @@ public class TablePage {
         page.setStatsCollector(KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY));
         dictDimensionPages[tmpNumDictDimIdx++] = page;
       } else {
+        // by default these columns will be encoded using string page
+        LocalDictionaryGenerator localDictionaryGenerator =
+            model.getColumnLocalDictGenMap().get(spec.getFieldName());
+        DataType dataType = DataTypes.STRING;
         if (DataTypes.VARCHAR == spec.getSchemaDataType()) {
-          page = ColumnPage.newPage(spec, DataTypes.VARCHAR, pageSize);
+          dataType = DataTypes.VARCHAR;
+        }
+        if (null != localDictionaryGenerator) {
+          page = ColumnPage.newLocalDictPage(spec, dataType, pageSize, localDictionaryGenerator);
+        } else {
+          page = ColumnPage.newPage(spec, dataType, pageSize);
+        }
+        if (DataTypes.VARCHAR == dataType) {
           page.setStatsCollector(LVLongStringStatsCollector.newInstance());
         } else {
-          // In previous implementation, other data types such as string, date and timestamp
-          // will be encoded using string page
-          page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize);
           page.setStatsCollector(LVShortStringStatsCollector.newInstance());
         }
         noDictDimensionPages[tmpNumNoDictDimIdx++] = page;
       }
     }
-    complexDimensionPages = new ComplexColumnPage[model.getComplexColumnCount()];
     for (int i = 0; i < complexDimensionPages.length; i++) {
       // here we still do not know the depth of the complex column, it will be initialized when
@@ -137,6 +145,7 @@ public class TablePage {
           PrimitivePageStatsCollector.newInstance(dataTypes[i]));
       measurePages[i] = page;
     }
+
     boolean hasNoDictionary = noDictDimensionPages.length > 0;
     this.key = new TablePageKey(pageSize, model.getSegmentProperties(), hasNoDictionary);
@@ -225,8 +234,16 @@ private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
     // initialize the page if first row
     if (rowId == 0) {
       List<ColumnType> complexColumnType = new ArrayList<>();
+      List<String> columnNames = new ArrayList<>();
       complexDataType.getChildrenType(complexColumnType);
-      complexDimensionPages[index] = new ComplexColumnPage(pageSize, complexColumnType);
+      complexDataType.getColumnNames(columnNames);
+      complexDimensionPages[index] = new ComplexColumnPage(complexColumnType);
+      try {
+        complexDimensionPages[index]
+            .initialize(model.getColumnLocalDictGenMap(), columnNames, pageSize);
+      } catch (MemoryException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     int depthInComplexColumn = complexDimensionPages[index].getDepth();
@@ -253,7 +270,7 @@ private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
     }
 
     for (int depth = 0; depth < depthInComplexColumn; depth++) {
-      complexDimensionPages[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
+      complexDimensionPages[index].putComplexData(depth, encodedComplexColumnar.get(depth));
     }
   }
 
@@ -267,6 +284,11 @@ void freeMemory() {
     for (ColumnPage page : measurePages) {
       page.freeMemory();
     }
+    for (ComplexColumnPage page : complexDimensionPages) {
+      if (null != page) {
+        page.freeMemory();
+      }
+    }
   }
 
   // Adds length as a short element (first 2 bytes) to the head of the input byte array
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index b76722bdfa8..3082b9107be 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -149,6 +149,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    */
   private boolean enableDirectlyWriteData2Hdfs = false;
 
+  protected ExecutorService fallbackExecutorService;
+
   public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
     this.model = model;
     blockIndexInfoList = new ArrayList<>();
@@ -197,6 +199,14 @@ public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
     blockletMetadata = new ArrayList<BlockletInfo3>();
     blockletIndex = new ArrayList<>();
     listener = this.model.getDataMapWriterlistener();
+    if (model.getColumnLocalDictGenMap().size() > 0) {
+      int numberOfCores = 1;
+      if (model.getNumberOfCores() > 1) {
+        numberOfCores = model.getNumberOfCores() / 2;
+      }
+      fallbackExecutorService = Executors.newFixedThreadPool(numberOfCores, new CarbonThreadFactory(
+          "FallbackPool:" + model.getTableName() + ", range: " + model.getBucketId()));
+    }
   }
 
   /**
@@ -415,6 +425,9 @@ protected void closeExecutorService() throws CarbonDataWriterException {
     } catch (InterruptedException | ExecutionException | IOException e) {
       throw new CarbonDataWriterException(e);
     }
+    if (null != fallbackExecutorService) {
+      fallbackExecutorService.shutdownNow();
+    }
   }
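The constructor change above dedicates half of the writer cores to local-dictionary fallback encoding, with a floor of one thread. A minimal, self-contained restatement of that sizing rule (FallbackPoolSizing and newFallbackPool are illustrative names, not part of the patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public final class FallbackPoolSizing {
      // Mirrors the constructor logic above: one thread minimum, otherwise
      // half of the writer cores are reserved for dictionary fallback work.
      public static ExecutorService newFallbackPool(int writerCores) {
        int fallbackThreads = writerCores > 1 ? writerCores / 2 : 1;
        return Executors.newFixedThreadPool(fallbackThreads);
      }
    }

For example, 8 writer cores yield a 4-thread fallback pool, while a single-core task still gets 1 thread.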
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
index 36fda3c5634..7607cf008e8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java
@@ -16,29 +16,34 @@
  */
 package org.apache.carbondata.processing.store.writer.v3;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.processing.store.TablePage;
 
 public class BlockletDataHolder {
-  private List<EncodedTablePage> encodedTablePage;
+
+  /**
+   * current data size
+   */
   private long currentSize;
 
-  public BlockletDataHolder() {
-    this.encodedTablePage = new ArrayList<>();
+  private EncodedBlocklet encodedBlocklet;
+
+  public BlockletDataHolder(ExecutorService fallbackpool) {
+    encodedBlocklet = new EncodedBlocklet(fallbackpool);
   }
 
   public void clear() {
-    encodedTablePage.clear();
     currentSize = 0;
+    encodedBlocklet.clear();
   }
 
   public void addPage(TablePage rawTablePage) {
     EncodedTablePage encodedTablePage = rawTablePage.getEncodedTablePage();
-    this.encodedTablePage.add(encodedTablePage);
     currentSize += encodedTablePage.getEncodedSize();
+    encodedBlocklet.addEncodedTablePage(encodedTablePage);
   }
 
   public long getSize() {
@@ -47,19 +52,14 @@ public long getSize() {
   }
 
   public int getNumberOfPagesAdded() {
-    return encodedTablePage.size();
+    return encodedBlocklet.getNumberOfPages();
   }
 
   public int getTotalRows() {
-    int rows = 0;
-    for (EncodedTablePage nh : encodedTablePage) {
-      rows += nh.getPageSize();
-    }
-    return rows;
+    return encodedBlocklet.getBlockletSize();
   }
 
-  public List<EncodedTablePage> getEncodedTablePages() {
-    return encodedTablePage;
+  public EncodedBlocklet getEncodedBlocklet() {
+    return encodedBlocklet;
   }
-
 }
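After this rework BlockletDataHolder is a size-tracking facade over EncodedBlocklet. A hedged sketch of the accumulate-then-flush pattern the writer below follows, using only methods visible in this diff (the pages list, the threshold and the writeBlocklet call are placeholders):

    // Sketch: pages are appended until the blocklet size threshold is hit,
    // then the accumulated EncodedBlocklet is flushed and the holder reused.
    void accumulatePages(java.util.List<TablePage> pages, long blockletSizeThreshold,
        java.util.concurrent.ExecutorService fallbackPool) {
      BlockletDataHolder holder = new BlockletDataHolder(fallbackPool);
      for (TablePage page : pages) {
        holder.addPage(page);                 // accumulates size and encoded pages
        if (holder.getSize() >= blockletSizeThreshold
            && holder.getNumberOfPagesAdded() > 0) {
          EncodedBlocklet blocklet = holder.getEncodedBlocklet();
          // writeBlocklet(blocklet);         // placeholder for writeBlockletToFile()
          holder.clear();                     // reset for the next blocklet
        }
      }
    }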
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index d1deef1593a..e562f26fe7d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -25,8 +25,9 @@
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.blocklet.BlockletEncodedColumnPage;
+import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
@@ -76,7 +77,7 @@ public CarbonFactDataWriterImplV3(CarbonFactDataHandlerModel model) {
       blockletSizeThreshold = fileSizeInBytes;
       LOGGER.info("Blocklet size configure for table is: " + blockletSizeThreshold);
     }
-    blockletDataHolder = new BlockletDataHolder();
+    blockletDataHolder = new BlockletDataHolder(fallbackExecutorService);
   }
 
   @Override protected void writeBlockletInfoToFile()
@@ -110,14 +111,15 @@ public CarbonFactDataWriterImplV3(CarbonFactDataHandlerModel model) {
    */
   @Override public void writeTablePage(TablePage tablePage)
       throws CarbonDataWriterException,IOException {
+    // condition for writing all the pages
     if (!tablePage.isLastPage()) {
       boolean isAdded = false;
       // check if size more than blocklet size then write the page to file
-      if (blockletDataHolder.getSize() + tablePage.getEncodedTablePage().getEncodedSize() >=
-          blockletSizeThreshold) {
+      if (blockletDataHolder.getSize() + tablePage.getEncodedTablePage().getEncodedSize()
+          >= blockletSizeThreshold) {
        // if blocklet size exceeds threshold, write blocklet data
-        if (blockletDataHolder.getEncodedTablePages().size() == 0) {
+        if (blockletDataHolder.getNumberOfPagesAdded() == 0) {
          isAdded = true;
          addPageData(tablePage);
        }
@@ -164,12 +166,13 @@ private void addPageData(TablePage tablePage) throws IOException {
    */
   private void writeBlockletToFile() {
     // get the list of all encoded table page
-    List<EncodedTablePage> encodedTablePageList = blockletDataHolder.getEncodedTablePages();
-    int numDimensions = encodedTablePageList.get(0).getNumDimensions();
-    int numMeasures = encodedTablePageList.get(0).getNumMeasures();
+    EncodedBlocklet encodedBlocklet = blockletDataHolder.getEncodedBlocklet();
+    int numDimensions = encodedBlocklet.getNumberOfDimension();
+    int numMeasures = encodedBlocklet.getNumberOfMeasure();
+    // get data chunks for all the columns
     byte[][] dataChunkBytes = new byte[numDimensions + numMeasures][];
-    long metadataSize = fillDataChunk(encodedTablePageList, dataChunkBytes);
+    long metadataSize = fillDataChunk(encodedBlocklet, dataChunkBytes);
     // calculate the total size of data to be written
     long blockletSize = blockletDataHolder.getSize() + metadataSize;
     // to check if data size will exceed the block size then create a new file
@@ -199,27 +202,22 @@ private void writeBlockletToFile() {
   /**
    * Fill dataChunkBytes and return total size of page metadata
    */
-  private long fillDataChunk(List<EncodedTablePage> encodedTablePageList, byte[][] dataChunkBytes) {
+  private long fillDataChunk(EncodedBlocklet encodedBlocklet, byte[][] dataChunkBytes) {
     int size = 0;
-    int numDimensions = encodedTablePageList.get(0).getNumDimensions();
-    int numMeasures = encodedTablePageList.get(0).getNumMeasures();
+    int numDimensions = encodedBlocklet.getNumberOfDimension();
+    int numMeasures = encodedBlocklet.getNumberOfMeasure();
     int measureStartIndex = numDimensions;
     // calculate the size of data chunks
-    try {
-      for (int i = 0; i < numDimensions; i++) {
-        dataChunkBytes[i] = CarbonUtil.getByteArray(
-            CarbonMetadataUtil.getDimensionDataChunk3(encodedTablePageList, i));
-        size += dataChunkBytes[i].length;
-      }
-      for (int i = 0; i < numMeasures; i++) {
-        dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray(
-            CarbonMetadataUtil.getMeasureDataChunk3(encodedTablePageList, i));
-        size += dataChunkBytes[measureStartIndex].length;
-        measureStartIndex++;
-      }
-    } catch (IOException e) {
-      LOGGER.error(e, "Problem while getting the data chunks");
-      throw new CarbonDataWriterException("Problem while getting the data chunks", e);
+    for (int i = 0; i < numDimensions; i++) {
+      dataChunkBytes[i] =
+          CarbonUtil.getByteArray(CarbonMetadataUtil.getDimensionDataChunk3(encodedBlocklet, i));
+      size += dataChunkBytes[i].length;
+    }
+    for (int i = 0; i < numMeasures; i++) {
+      dataChunkBytes[measureStartIndex] =
+          CarbonUtil.getByteArray(CarbonMetadataUtil.getMeasureDataChunk3(encodedBlocklet, i));
+      size += dataChunkBytes[measureStartIndex].length;
+      measureStartIndex++;
     }
     return size;
   }
@@ -250,33 +248,30 @@ private void writeBlockletToFile(byte[][] dataChunkBytes)
     List<Long> currentDataChunksOffset = new ArrayList<>();
     // to maintain the length of each data chunk in blocklet
     List<Integer> currentDataChunksLength = new ArrayList<>();
-    List<EncodedTablePage> encodedTablePages = blockletDataHolder.getEncodedTablePages();
-    int numberOfDimension = encodedTablePages.get(0).getNumDimensions();
-    int numberOfMeasures = encodedTablePages.get(0).getNumMeasures();
+    EncodedBlocklet encodedBlocklet = blockletDataHolder.getEncodedBlocklet();
+    int numberOfDimension = encodedBlocklet.getNumberOfDimension();
+    int numberOfMeasures = encodedBlocklet.getNumberOfMeasure();
     ByteBuffer buffer = null;
     long dimensionOffset = 0;
     long measureOffset = 0;
-    int numberOfRows = 0;
-    // calculate the number of rows in each blocklet
-    for (EncodedTablePage encodedTablePage : encodedTablePages) {
-      numberOfRows += encodedTablePage.getPageSize();
-    }
     for (int i = 0; i < numberOfDimension; i++) {
       currentDataChunksOffset.add(offset);
       currentDataChunksLength.add(dataChunkBytes[i].length);
       buffer = ByteBuffer.wrap(dataChunkBytes[i]);
       currentOffsetInFile += fileChannel.write(buffer);
       offset += dataChunkBytes[i].length;
-      for (EncodedTablePage encodedTablePage : encodedTablePages) {
-        EncodedColumnPage dimension = encodedTablePage.getDimension(i);
-        buffer = dimension.getEncodedData();
+      BlockletEncodedColumnPage blockletEncodedColumnPage =
+          encodedBlocklet.getEncodedDimensionColumnPages().get(i);
+      for (EncodedColumnPage dimensionPage : blockletEncodedColumnPage
+          .getEncodedColumnPageList()) {
+        buffer = dimensionPage.getEncodedData();
         int bufferSize = buffer.limit();
         currentOffsetInFile += fileChannel.write(buffer);
         offset += bufferSize;
       }
     }
     dimensionOffset = offset;
-    int dataChunkStartIndex = encodedTablePages.get(0).getNumDimensions();
+    int dataChunkStartIndex = encodedBlocklet.getNumberOfDimension();
     for (int i = 0; i < numberOfMeasures; i++) {
       currentDataChunksOffset.add(offset);
       currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length);
       buffer = ByteBuffer.wrap(dataChunkBytes[dataChunkStartIndex]);
       currentOffsetInFile += fileChannel.write(buffer);
       offset += dataChunkBytes[dataChunkStartIndex].length;
       dataChunkStartIndex++;
-      for (EncodedTablePage encodedTablePage : encodedTablePages) {
-        EncodedColumnPage measure = encodedTablePage.getMeasure(i);
-        buffer = measure.getEncodedData();
+      BlockletEncodedColumnPage blockletEncodedColumnPage =
+          encodedBlocklet.getEncodedMeasureColumnPages().get(i);
+      for (EncodedColumnPage measurePage : blockletEncodedColumnPage
+          .getEncodedColumnPageList()) {
+        buffer = measurePage.getEncodedData();
         int bufferSize = buffer.limit();
         currentOffsetInFile += fileChannel.write(buffer);
         offset += bufferSize;
@@ -295,10 +292,11 @@ private void writeBlockletToFile(byte[][] dataChunkBytes)
     measureOffset = offset;
     blockletIndex.add(
         CarbonMetadataUtil.getBlockletIndex(
-            encodedTablePages, model.getSegmentProperties().getMeasures()));
+            encodedBlocklet, model.getSegmentProperties().getMeasures()));
     BlockletInfo3 blockletInfo3 =
-        new BlockletInfo3(numberOfRows, currentDataChunksOffset, currentDataChunksLength,
-            dimensionOffset, measureOffset, blockletDataHolder.getEncodedTablePages().size());
+        new BlockletInfo3(encodedBlocklet.getBlockletSize(), currentDataChunksOffset,
+            currentDataChunksLength, dimensionOffset, measureOffset,
+            encodedBlocklet.getNumberOfPages());
     blockletMetadata.add(blockletInfo3);
   }
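For readers tracing the new on-disk layout: writeBlockletToFile emits data column-major, i.e. for each column one metadata chunk followed by every page of that column across the whole blocklet, dimensions first and then measures. A condensed restatement of the dimension loop above (fileChannel, dataChunkBytes and encodedBlocklet are the locals of that method, so this fragment is illustrative rather than standalone):

    // Column-major write order: metadata chunk for column i, then all of
    // column i's pages from every table page in the blocklet, then column i+1.
    for (int i = 0; i < encodedBlocklet.getNumberOfDimension(); i++) {
      fileChannel.write(ByteBuffer.wrap(dataChunkBytes[i]));
      BlockletEncodedColumnPage columnPages =
          encodedBlocklet.getEncodedDimensionColumnPages().get(i);
      for (EncodedColumnPage page : columnPages.getEncodedColumnPageList()) {
        fileChannel.write(page.getEncodedData());
      }
    }
    // measures follow the same pattern via getEncodedMeasureColumnPages()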