From 30378c59ed6ad190cd1566353464ea414a2265d4 Mon Sep 17 00:00:00 2001
From: ajantha-bhat
Date: Tue, 3 Jul 2018 19:05:09 +0530
Subject: [PATCH] [CARBONDATA-2607] [Complex Column Enhancements] Complex
 Primitive DataType Adaptive Encoding

This PR improves how complex types are stored so that reads become more
efficient. The changes are:

1. Primitive types inside complex types are now stored as separate pages.
   Previously a complex column used a single byte-array column page; now
   every sub-level of a complex data type is stored as a separate page with
   its respective data type.
2. No-dictionary primitive data types inside complex columns are processed
   through adaptive encoding. Previously only snappy compression was applied.
3. For no-dictionary primitives inside complex columns, only the value is
   stored, except for String and Varchar, which are still stored as byte
   arrays. Previously all sub-levels were stored in length-value (LV) format
   inside a single byte array. Now only Struct and Array column pages are
   stored as byte arrays; all other primitives except String and Varchar are
   stored at their fixed data-type length.
4. The safe and unsafe fixed-length column pages now support a growing
   dynamic array implementation. This is needed for the Array data type,
   whose element count is not known up front.

Illustrative sketches of the per-child page layout, the adaptive codec
selection, and the growable page are appended after the diff.

Co-authored-by: sounakr
---
 ...ressedDimensionChunkFileBasedReaderV3.java |  24 +-
 .../AbstractMeasureChunkReaderV2V3Format.java |  21 -
 ...mpressedMeasureChunkFileBasedReaderV2.java |   3 +-
 ...mpressedMeasureChunkFileBasedReaderV3.java |   3 +-
 ...sedMsrChunkFileBasedPageLevelReaderV3.java |   3 +-
 .../chunk/store/ColumnPageWrapper.java        |  90 ++-
 .../core/datastore/page/ColumnPage.java       |   8 +-
 .../datastore/page/ComplexColumnPage.java     | 129 ++--
 .../page/FallbackColumnPageEncoder.java       |   4 +-
 .../page/SafeFixLengthColumnPage.java         |  79 ++-
 .../page/UnsafeFixLengthColumnPage.java       | 130 +++-
 .../page/encoding/ColumnPageEncoder.java      |  54 +-
 .../page/encoding/ColumnPageEncoderMeta.java  |   4 +-
 .../page/encoding/DefaultEncodingFactory.java |  52 +-
 .../page/encoding/EncodingFactory.java        |   6 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java  |   2 +-
 .../PrimitivePageStatsCollector.java          |  23 +-
 .../core/datastore/row/ComplexColumnInfo.java |  57 ++
 .../scan/complextypes/PrimitiveQueryType.java |   2 +-
 .../core/scan/executor/util/QueryUtil.java    |  22 +
 .../apache/carbondata/core/util/ByteUtil.java |  10 +-
 .../carbondata/core/util/CarbonUtil.java      |   3 +-
 .../carbondata/core/util/DataTypeUtil.java    |  25 +-
 .../page/encoding/TestEncodingFactory.java    |  16 +-
 .../src/test/resources/adap.csv              |   3 +
 .../src/test/resources/adap_double1.csv      |   3 +
 .../src/test/resources/adap_double2.csv      |   3 +
 .../src/test/resources/adap_double3.csv      |   3 +
 .../src/test/resources/adap_double4.csv      |   3 +
 .../src/test/resources/adap_int1.csv         |   3 +
 .../src/test/resources/adap_int2.csv         |   3 +
 .../src/test/resources/adap_int3.csv         |   3 +
 .../complexType/TestAdaptiveComplexType.scala | 554 ++++++++++++++++++
 .../TestAdaptiveEncodingForNullValues.scala   | 168 ++++++
 ...dingSafeColumnPageForComplexDataType.scala |  55 ++
 ...ngUnsafeColumnPageForComplexDataType.scala |  59 ++
 .../processing/datatypes/ArrayDataType.java   |  16 +-
 .../processing/datatypes/GenericDataType.java |   7 +-
 .../datatypes/PrimitiveDataType.java          |  34 +-
 .../processing/datatypes/StructDataType.java  |  18 +-
 .../processing/store/TablePage.java           |  13 +-
 .../util/CarbonDataProcessorUtil.java         |   8 +-
 42 files changed, 1523 insertions(+), 203 deletions(-)
 create mode 100644
core/src/main/java/org/apache/carbondata/core/datastore/row/ComplexColumnInfo.java create mode 100644 integration/spark-common-test/src/test/resources/adap.csv create mode 100644 integration/spark-common-test/src/test/resources/adap_double1.csv create mode 100644 integration/spark-common-test/src/test/resources/adap_double2.csv create mode 100644 integration/spark-common-test/src/test/resources/adap_double3.csv create mode 100644 integration/spark-common-test/src/test/resources/adap_double4.csv create mode 100644 integration/spark-common-test/src/test/resources/adap_int1.csv create mode 100644 integration/spark-common-test/src/test/resources/adap_int2.csv create mode 100644 integration/spark-common-test/src/test/resources/adap_int3.csv create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveComplexType.scala create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingForNullValues.scala create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingSafeColumnPageForComplexDataType.scala create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingUnsafeColumnPageForComplexDataType.scala diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java index eb5917bb640..32f84f77a7a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java @@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; +import org.apache.carbondata.core.scan.executor.util.QueryUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.DataChunk3; @@ -223,6 +224,10 @@ private boolean isEncodedWithMeta(DataChunk2 pageMetadata) { switch (encoding) { case DIRECT_COMPRESS: case DIRECT_STRING: + case ADAPTIVE_INTEGRAL: + case ADAPTIVE_DELTA_INTEGRAL: + case ADAPTIVE_FLOATING: + case ADAPTIVE_DELTA_FLOATING: return true; } } @@ -234,13 +239,30 @@ protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnP throws IOException, MemoryException { if (isEncodedWithMeta(pageMetadata)) { ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset); - return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary()); + decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence)); + return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), + isEncodedWithAdaptiveMeta(pageMetadata)); } else { // following code is for backward compatibility return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset); } } + private boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) { + List encodings = pageMetadata.getEncoders(); + if (encodings != null && 
encodings.size() == 1) { + Encoding encoding = encodings.get(0); + switch (encoding) { + case ADAPTIVE_INTEGRAL: + case ADAPTIVE_DELTA_INTEGRAL: + case ADAPTIVE_FLOATING: + case ADAPTIVE_DELTA_FLOATING: + return true; + } + } + return false; + } + private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage, ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException, MemoryException { diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java index 7d59d474e02..64d2fff4bac 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java @@ -17,13 +17,10 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure; import java.io.IOException; -import java.util.BitSet; import java.util.List; import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; -import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; /** @@ -96,24 +93,6 @@ public MeasureRawColumnChunk[] readRawMeasureChunks(FileReader fileReader, return dataChunks; } - /** - * Below method will be used to convert the thrift presence meta to wrapper - * presence meta - * - * @param presentMetadataThrift - * @return wrapper presence meta - */ - protected BitSet getNullBitSet(org.apache.carbondata.format.PresenceMeta presentMetadataThrift) { - Compressor compressor = CompressorFactory.getInstance().getCompressor(); - final byte[] present_bit_stream = presentMetadataThrift.getPresent_bit_stream(); - if (null != present_bit_stream) { - return BitSet - .valueOf(compressor.unCompressByte(present_bit_stream)); - } else { - return new BitSet(1); - } - } - /** * Below method will be used to read measure chunk data in group. 
* This method will be useful to avoid multiple IO while reading the diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java index 04d6e2e1ca4..9864ab8c8fb 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java @@ -28,6 +28,7 @@ import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.ValueEncoderMeta; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; +import org.apache.carbondata.core.scan.executor.util.QueryUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; @@ -125,7 +126,7 @@ public ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk, copyPoint += measureColumnChunkLength.get(blockIndex); ColumnPage page = decodeMeasure(measureRawColumnChunk, measureColumnChunk, copyPoint); - page.setNullBits(getNullBitSet(measureColumnChunk.presence)); + page.setNullBits(QueryUtil.getNullBitSet(measureColumnChunk.presence)); return page; } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java index 833636338e3..e389ac6d340 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java @@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; +import org.apache.carbondata.core.scan.executor.util.QueryUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.DataChunk3; @@ -198,7 +199,7 @@ public ColumnPage decodeColumnPage( measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) + dataChunk3.getPage_offset().get(pageNumber); ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset); - decodedPage.setNullBits(getNullBitSet(pageMetadata.presence)); + decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence)); return decodedPage; } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java index 2ebaa16e2a1..052f745304a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.memory.MemoryException; import 
org.apache.carbondata.core.metadata.blocklet.BlockletInfo; +import org.apache.carbondata.core.scan.executor.util.QueryUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.DataChunk3; @@ -146,7 +147,7 @@ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileReader fileRea .readByteBuffer(filePath, offset, pageMetadata.data_page_length); ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0); - decodedPage.setNullBits(getNullBitSet(pageMetadata.presence)); + decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence)); return decodedPage; } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java index 59e593afa63..f4712ba4e47 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java @@ -17,22 +17,34 @@ package org.apache.carbondata.core.datastore.chunk.store; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; import org.apache.carbondata.core.scan.result.vector.CarbonDictionary; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; + public class ColumnPageWrapper implements DimensionColumnPage { private ColumnPage columnPage; private CarbonDictionary localDictionary; - public ColumnPageWrapper(ColumnPage columnPage, CarbonDictionary localDictionary) { + private boolean isAdaptiveComplexPrimitivePage; + + public ColumnPageWrapper(ColumnPage columnPage, CarbonDictionary localDictionary, + boolean isAdaptiveComplexPrimitivePage) { this.columnPage = columnPage; this.localDictionary = localDictionary; + this.isAdaptiveComplexPrimitivePage = isAdaptiveComplexPrimitivePage; + } @Override @@ -58,21 +70,81 @@ public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int ch throw new UnsupportedOperationException("internal error"); } - @Override - public byte[] getChunkData(int rowId) { + @Override public byte[] getChunkData(int rowId) { + ColumnType columnType = columnPage.getColumnSpec().getColumnType(); + DataType srcDataType = columnPage.getColumnSpec().getSchemaDataType(); + DataType targetDataType = columnPage.getDataType(); if (null != localDictionary) { - return localDictionary.getDictionaryValue(CarbonUtil - .getSurrogateInternal(columnPage.getBytes(rowId), 0, 3)); + return localDictionary + .getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 0, 3)); + } else if (columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptiveComplexPrimitive()) { + if (columnPage.getNullBits().get(rowId)) { + // if this row is null, return default null represent in byte array + return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } + if (srcDataType == DataTypes.DOUBLE || srcDataType == DataTypes.FLOAT) { + double doubleData = 
columnPage.getDouble(rowId); + if (srcDataType == DataTypes.FLOAT) { + float out = (float) doubleData; + return ByteUtil.toBytes(out); + } else { + return ByteUtil.toBytes(doubleData); + } + } else if (DataTypes.isDecimal(srcDataType)) { + throw new RuntimeException("unsupported type: " + srcDataType); + } else if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN) || ( + srcDataType == DataTypes.SHORT) || (srcDataType == DataTypes.SHORT_INT) || (srcDataType + == DataTypes.INT) || (srcDataType == DataTypes.LONG) || (srcDataType + == DataTypes.TIMESTAMP)) { + long longData = columnPage.getLong(rowId); + if ((srcDataType == DataTypes.BYTE)) { + byte out = (byte) longData; + return ByteUtil.toBytes(out); + } else if (srcDataType == DataTypes.BOOLEAN) { + byte out = (byte) longData; + return ByteUtil.toBytes(ByteUtil.toBoolean(out)); + } else if (srcDataType == DataTypes.SHORT) { + short out = (short) longData; + return ByteUtil.toBytes(out); + } else if (srcDataType == DataTypes.SHORT_INT) { + int out = (int) longData; + return ByteUtil.toBytes(out); + } else if (srcDataType == DataTypes.INT) { + int out = (int) longData; + return ByteUtil.toBytes(out); + } else { + // timestamp and long + return ByteUtil.toBytes(longData); + } + } else if ((targetDataType == DataTypes.STRING) || (targetDataType == DataTypes.VARCHAR) || ( + targetDataType == DataTypes.BYTE_ARRAY)) { + return columnPage.getBytes(rowId); + } else { + throw new RuntimeException("unsupported type: " + targetDataType); + } + } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE) && !this.isAdaptiveComplexPrimitive()) { + if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN)) { + byte[] out = new byte[1]; + out[0] = (columnPage.getByte(rowId)); + return out; + } else if (srcDataType == DataTypes.BYTE_ARRAY) { + return columnPage.getBytes(rowId); + } else { + throw new RuntimeException("unsupported type: " + targetDataType); + } + } else { + return columnPage.getBytes(rowId); } - return columnPage.getBytes(rowId); } + @Override public int getInvertedIndex(int rowId) { throw new UnsupportedOperationException("internal error"); } - @Override public int getInvertedReverseIndex(int rowId) { + @Override + public int getInvertedReverseIndex(int rowId) { throw new UnsupportedOperationException("internal error"); } @@ -96,4 +168,8 @@ public void freeMemory() { } + public boolean isAdaptiveComplexPrimitive() { + return isAdaptiveComplexPrimitivePage; + } + } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java index 6077ddf4d8e..8745545cb77 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java @@ -58,7 +58,7 @@ public abstract class ColumnPage { private final TableSpec.ColumnSpec columnSpec; // The index of the rowId whose value is null, will be set to 1 - private BitSet nullBitSet; + protected BitSet nullBitSet; // statistics collector for this column page protected ColumnPageStatsCollector statsCollector; @@ -193,6 +193,8 @@ public static ColumnPage newPage(TableSpec.ColumnSpec columnSpec, DataType dataT dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) { instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize); + } else if (dataType == DataTypes.TIMESTAMP) { + instance = new UnsafeFixLengthColumnPage(columnSpec, DataTypes.LONG, pageSize); } 
else if (DataTypes.isDecimal(dataType)) { instance = new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize); } else if (dataType == DataTypes.STRING @@ -211,7 +213,7 @@ public static ColumnPage newPage(TableSpec.ColumnSpec columnSpec, DataType dataT instance = newShortIntPage(columnSpec, new byte[pageSize * 3]); } else if (dataType == DataTypes.INT) { instance = newIntPage(columnSpec, new int[pageSize]); - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { instance = newLongPage(columnSpec, new long[pageSize]); } else if (dataType == DataTypes.FLOAT) { instance = newFloatPage(columnSpec, new float[pageSize]); @@ -494,7 +496,7 @@ public void putBoolean(int rowId, boolean value) { /** * Set null at rowId */ - private void putNull(int rowId) { + protected void putNull(int rowId) { if (dataType == DataTypes.BOOLEAN) { putBoolean(rowId, false); } else if (dataType == DataTypes.BYTE) { diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java index a170c8b8a44..7a95d23c580 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java @@ -20,12 +20,15 @@ import java.util.List; import java.util.Map; -import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector; +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; +import org.apache.carbondata.core.datastore.row.ComplexColumnInfo; import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.DataTypeUtil; /** * holds the complex columndata and its children data @@ -35,12 +38,12 @@ public class ComplexColumnPage { /** * number of columns */ - private int depth; + private int complexColumnIndex; /** * type of each column */ - private List complexColumnType; + private List complexColumnInfoList; /** * column page for each type @@ -52,36 +55,41 @@ public class ComplexColumnPage { */ private int[] currentRowIdList; - public ComplexColumnPage(List complexColumnType) { - this.depth = complexColumnType.size(); - this.complexColumnType = complexColumnType; - this.columnPages = new ColumnPage[this.depth]; - this.currentRowIdList = new int[depth]; + public ComplexColumnPage(List complexColumnInfoList) { + this.complexColumnIndex = complexColumnInfoList.size(); + this.complexColumnInfoList = complexColumnInfoList; + this.columnPages = new ColumnPage[this.complexColumnIndex]; + this.currentRowIdList = new int[complexColumnIndex]; } /** * below method will be used to initlize the column page of complex type * @param columnToDictMap * dictionary map - * @param columnNames - * list of columns * @param pageSize * number of records * @throws MemoryException * if memory is not sufficient */ - public void initialize(Map columnToDictMap, - List columnNames, int pageSize) throws MemoryException { + public void initialize(Map columnToDictMap, int pageSize) + throws MemoryException { + DataType dataType; for (int i = 0; i < this.columnPages.length; 
i++) { - LocalDictionaryGenerator localDictionaryGenerator = columnToDictMap.get(columnNames.get(i)); + LocalDictionaryGenerator localDictionaryGenerator = + columnToDictMap.get(complexColumnInfoList.get(i).getColumnNames()); + TableSpec.ColumnSpec spec = getColumnSpec(i, localDictionaryGenerator); if (null == localDictionaryGenerator) { - TableSpec.ColumnSpec spec = TableSpec.ColumnSpec - .newInstance(columnNames.get(i), DataTypes.BYTE_ARRAY, complexColumnType.get(i)); - this.columnPages[i] = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize); - this.columnPages[i].setStatsCollector(new DummyStatsCollector()); + dataType = complexColumnInfoList.get(i).getColumnDataTypes(); + if (isColumnPageBasedOnDataType(i)) { + // no dictionary primitive types need adaptive encoding, + // hence store as actual value instead of byte array + this.columnPages[i] = ColumnPage.newPage(spec, dataType, pageSize); + this.columnPages[i].setStatsCollector(PrimitivePageStatsCollector.newInstance(dataType)); + } else { + this.columnPages[i] = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize); + this.columnPages[i].setStatsCollector(new DummyStatsCollector()); + } } else { - TableSpec.ColumnSpec spec = TableSpec.ColumnSpec - .newInstance(columnNames.get(i), DataTypes.BYTE_ARRAY, complexColumnType.get(i)); this.columnPages[i] = ColumnPage .newLocalDictPage(spec, DataTypes.BYTE_ARRAY, pageSize, localDictionaryGenerator, true); this.columnPages[i].setStatsCollector(new DummyStatsCollector()); @@ -89,57 +97,92 @@ public void initialize(Map columnToDictMap, } } - /** - * - * @return depth - */ - public int getDepth() { - return depth; + private TableSpec.ColumnSpec getColumnSpec(int columnPageIndex, + LocalDictionaryGenerator localDictionaryGenerator) { + if ((localDictionaryGenerator == null) && isColumnPageBasedOnDataType(columnPageIndex)) { + return TableSpec.ColumnSpec + .newInstance(complexColumnInfoList.get(columnPageIndex).getColumnNames(), + complexColumnInfoList.get(columnPageIndex).getColumnDataTypes(), + complexColumnInfoList.get(columnPageIndex).getComplexColumnType()); + } else { + return TableSpec.ColumnSpec + .newInstance(complexColumnInfoList.get(columnPageIndex).getColumnNames(), + DataTypes.BYTE_ARRAY, + complexColumnInfoList.get(columnPageIndex).getComplexColumnType()); + } + } + + private boolean isColumnPageBasedOnDataType(int columnPageIndex) { + DataType dataType = complexColumnInfoList.get(columnPageIndex).getColumnDataTypes(); + if ((complexColumnInfoList.get(columnPageIndex).isNoDictionary() && + !((DataTypes.isStructType(dataType) || + DataTypes.isArrayType(dataType) || + (dataType == DataTypes.STRING) || + (dataType == DataTypes.VARCHAR) || + (dataType == DataTypes.DATE) || + DataTypes.isDecimal(dataType))))) { + // For all these above condition the ColumnPage should be Taken as BYTE_ARRAY + // for all other cases make Column Page Based on each DataType. 
+ return true; + } else { + return false; + } } /** - * return the type of complex column - * @param isDepth - * @return co plex column type + * + * @return complexColumnIndex */ - public ColumnType getComplexColumnType(int isDepth) { - return complexColumnType.get(isDepth); + public int getComplexColumnIndex() { + return complexColumnIndex; } /** * method to add complex column data * @param depth - * depth of column + * complexColumnIndex of column * @param dataList * dataList */ public void putComplexData(int depth, List dataList) { - assert (depth <= this.depth); - int currentNumber = currentRowIdList[depth]; - for (int i = 0; i < dataList.size(); i++) { - columnPages[depth].putData(currentNumber, dataList.get(i)); - currentNumber++; + assert (depth <= this.complexColumnIndex); + int positionNumber = currentRowIdList[depth]; + for (byte[] value : dataList) { + if (columnPages[depth].getDataType() != DataTypes.BYTE_ARRAY) { + if ((value == null) || (value.length == 0)) { + columnPages[depth].putNull(positionNumber); + columnPages[depth].statsCollector.updateNull(positionNumber); + columnPages[depth].nullBitSet.set(positionNumber); + } else { + columnPages[depth].putData(positionNumber, DataTypeUtil + .getDataBasedOnDataTypeForNoDictionaryColumn(value, + columnPages[depth].getColumnSpec().getSchemaDataType(), false)); + } + } else { + columnPages[depth].putData(positionNumber, value); + } + positionNumber++; } - currentRowIdList[depth] = currentNumber; + currentRowIdList[depth] = positionNumber; } /** * to free the used memory */ public void freeMemory() { - for (int i = 0; i < depth; i++) { + for (int i = 0; i < complexColumnIndex; i++) { columnPages[i].freeMemory(); } } /** * return the column page - * @param depth - * depth of column + * @param complexColumnIndex + * complexColumnIndex of column * @return colum page */ - public ColumnPage getColumnPage(int depth) { - assert (depth <= this.depth); - return columnPages[depth]; + public ColumnPage getColumnPage(int complexColumnIndex) { + assert (complexColumnIndex <= this.complexColumnIndex); + return columnPages[complexColumnIndex]; } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java index 32846a125d3..e16eb935e79 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java @@ -59,9 +59,11 @@ public FallbackColumnPageEncoder(EncodedColumnPage encodedColumnPage, int pageIn TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec(); switch (columnSpec.getColumnType()) { case COMPLEX_ARRAY: - case COMPLEX_PRIMITIVE: case COMPLEX_STRUCT: case COMPLEX: + throw new RuntimeException("Unsupported DataType. 
Only COMPLEX_PRIMITIVE should come"); + + case COMPLEX_PRIMITIVE: // for complex type column newEncodedColumnPage = ColumnPageEncoder.encodedColumn( encodedColumnPage.getActualPage()); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java index 23046148f59..8f0d934bde3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java @@ -40,9 +40,12 @@ public class SafeFixLengthColumnPage extends ColumnPage { private byte[] shortIntData; private byte[][] fixedLengthdata; + // total number of entries in array + private int arrayElementCount = 0; SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) { super(columnSpec, dataType, pageSize); + this.fixedLengthdata = new byte[pageSize][]; } SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, @@ -50,13 +53,14 @@ public class SafeFixLengthColumnPage extends ColumnPage { super(columnSpec, dataType, pageSize); this.fixedLengthdata = new byte[pageSize][]; } - /** * Set byte value at rowId */ @Override public void putByte(int rowId, byte value) { + ensureArraySize(rowId, DataTypes.BYTE); byteData[rowId] = value; + arrayElementCount++; } /** @@ -64,7 +68,9 @@ public void putByte(int rowId, byte value) { */ @Override public void putShort(int rowId, short value) { + ensureArraySize(rowId, DataTypes.SHORT); shortData[rowId] = value; + arrayElementCount++; } /** @@ -72,7 +78,9 @@ public void putShort(int rowId, short value) { */ @Override public void putInt(int rowId, int value) { + ensureArraySize(rowId, DataTypes.INT); intData[rowId] = value; + arrayElementCount++; } /** @@ -80,7 +88,9 @@ public void putInt(int rowId, int value) { */ @Override public void putLong(int rowId, long value) { + ensureArraySize(rowId, DataTypes.LONG); longData[rowId] = value; + arrayElementCount++; } /** @@ -88,7 +98,9 @@ public void putLong(int rowId, long value) { */ @Override public void putDouble(int rowId, double value) { + ensureArraySize(rowId, DataTypes.DOUBLE); doubleData[rowId] = value; + arrayElementCount++; } /** @@ -101,8 +113,10 @@ public void putBytes(int rowId, byte[] bytes) { @Override public void putShortInt(int rowId, int value) { + ensureArraySize(rowId, DataTypes.SHORT_INT); byte[] converted = ByteUtil.to3Bytes(value); System.arraycopy(converted, 0, shortIntData, rowId * 3, 3); + arrayElementCount++; } @Override @@ -291,8 +305,7 @@ public void setShortIntPage(byte[] shortIntData) { /** * Set int values to page */ - @Override - public void setIntPage(int[] intData) { + @Override public void setIntPage(int[] intData) { this.intData = intData; } @@ -346,27 +359,27 @@ public void freeMemory() { @Override public void convertValue(ColumnPageValueConverter codec) { if (dataType == DataTypes.BYTE) { - for (int i = 0; i < pageSize; i++) { + for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, byteData[i]); } } else if (dataType == DataTypes.SHORT) { - for (int i = 0; i < pageSize; i++) { + for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, shortData[i]); } } else if (dataType == DataTypes.INT) { - for (int i = 0; i < pageSize; i++) { + for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, intData[i]); } } else if (dataType == DataTypes.LONG) { - for (int i = 0; i < pageSize; i++) { + for (int i 
= 0; i < arrayElementCount; i++) { codec.encode(i, longData[i]); } } else if (dataType == DataTypes.FLOAT) { - for (int i = 0; i < pageSize; i++) { + for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, floatData[i]); } } else if (dataType == DataTypes.DOUBLE) { - for (int i = 0; i < pageSize; i++) { + for (int i = 0; i < arrayElementCount; i++) { codec.encode(i, doubleData[i]); } } else { @@ -375,4 +388,52 @@ public void convertValue(ColumnPageValueConverter codec) { } } + protected void ensureArraySize(int requestSize, DataType dataType) { + if (dataType == DataTypes.BYTE) { + if (requestSize >= byteData.length) { + byte[] newArray = new byte[arrayElementCount + 16]; + System.arraycopy(byteData, 0, newArray, 0, arrayElementCount); + byteData = newArray; + } + } else if (dataType == DataTypes.SHORT) { + if (requestSize >= shortData.length) { + short[] newArray = new short[arrayElementCount + 16]; + System.arraycopy(shortData, 0, newArray, 0, arrayElementCount); + shortData = newArray; + } + } else if (dataType == DataTypes.SHORT_INT) { + if (requestSize >= shortIntData.length / 3) { + byte[] newArray = new byte[(arrayElementCount * 3) + (16 * 3)]; + System.arraycopy(shortIntData, 0, newArray, 0, arrayElementCount * 3); + shortIntData = newArray; + } + } else if (dataType == DataTypes.INT) { + if (requestSize >= intData.length) { + int[] newArray = new int[arrayElementCount + 16]; + System.arraycopy(intData, 0, newArray, 0, arrayElementCount); + intData = newArray; + } + } else if (dataType == DataTypes.LONG) { + if (requestSize >= longData.length) { + long[] newArray = new long[arrayElementCount + 16]; + System.arraycopy(longData, 0, newArray, 0, arrayElementCount); + longData = newArray; + } + } else if (dataType == DataTypes.FLOAT) { + if (requestSize >= floatData.length) { + float[] newArray = new float[arrayElementCount + 16]; + System.arraycopy(floatData, 0, newArray, 0, arrayElementCount); + floatData = newArray; + } + } else if (dataType == DataTypes.DOUBLE) { + if (requestSize >= doubleData.length) { + double[] newArray = new double[arrayElementCount + 16]; + System.arraycopy(doubleData, 0, newArray, 0, arrayElementCount); + doubleData = newArray; + } + } else { + throw new UnsupportedOperationException( + "not support value conversion on " + dataType + " page"); + } + } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java index 7965e9376c3..a4cea5d0555 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java @@ -46,6 +46,12 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { private int eachRowSize; + // the length of the bytes added in the page + private int totalLength; + + // size of the allocated memory, in bytes + private int capacity; + private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); private static final int byteBits = DataTypes.BYTE.getSizeBits(); @@ -69,14 +75,17 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size); baseAddress = memoryBlock.getBaseObject(); baseOffset = memoryBlock.getBaseOffset(); + capacity = size; } else if (dataType == DataTypes.SHORT_INT) { int size = pageSize * 3; memoryBlock = 
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size); baseAddress = memoryBlock.getBaseObject(); baseOffset = memoryBlock.getBaseOffset(); + capacity = size; } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.STRING) { throw new UnsupportedOperationException("invalid data type: " + dataType); } + totalLength = 0; } UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, @@ -84,6 +93,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { throws MemoryException { this(columnSpec, dataType, pageSize); this.eachRowSize = eachRowSize; + totalLength = 0; if (dataType == DataTypes.BYTE_ARRAY) { memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) pageSize * eachRowSize); @@ -92,43 +102,89 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { } } + private void checkDataFileSize() { + // 16 is a Watermark in order to stop from overflowing. + if (totalLength > (Integer.MAX_VALUE - 16)) { + // since we later store a column page in a byte array, so its maximum size is 2GB + throw new RuntimeException("Carbondata only support maximum 2GB size for one column page"); + } + } + @Override public void putByte(int rowId, byte value) { + try { + ensureMemory(ByteUtil.SIZEOF_BYTE); + } catch (MemoryException e) { + throw new RuntimeException(e); + } long offset = ((long)rowId) << byteBits; CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, value); + totalLength += ByteUtil.SIZEOF_BYTE; } + + @Override public void putShort(int rowId, short value) { + try { + ensureMemory(shortBits); + } catch (MemoryException e) { + throw new RuntimeException(e); + } long offset = ((long)rowId) << shortBits; CarbonUnsafe.getUnsafe().putShort(baseAddress, baseOffset + offset, value); + totalLength += ByteUtil.SIZEOF_SHORT; } @Override public void putShortInt(int rowId, int value) { + try { + ensureMemory(ByteUtil.SIZEOF_SHORT_INT); + } catch (MemoryException e) { + throw new RuntimeException(e); + } byte[] data = ByteUtil.to3Bytes(value); long offset = rowId * 3L; CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, data[0]); CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 1, data[1]); CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 2, data[2]); + totalLength += ByteUtil.SIZEOF_SHORT_INT; } @Override public void putInt(int rowId, int value) { + try { + ensureMemory(ByteUtil.SIZEOF_INT); + } catch (MemoryException e) { + throw new RuntimeException(e); + } long offset = ((long)rowId) << intBits; CarbonUnsafe.getUnsafe().putInt(baseAddress, baseOffset + offset, value); + totalLength += ByteUtil.SIZEOF_INT; } @Override public void putLong(int rowId, long value) { + try { + ensureMemory(ByteUtil.SIZEOF_LONG); + } catch (MemoryException e) { + throw new RuntimeException(e); + } long offset = ((long)rowId) << longBits; CarbonUnsafe.getUnsafe().putLong(baseAddress, baseOffset + offset, value); + totalLength += ByteUtil.SIZEOF_LONG; } @Override public void putDouble(int rowId, double value) { + try { + ensureMemory(ByteUtil.SIZEOF_DOUBLE); + } catch (MemoryException e) { + throw new RuntimeException(e); + } long offset = ((long)rowId) << doubleBits; CarbonUnsafe.getUnsafe().putDouble(baseAddress, baseOffset + offset, value); + totalLength += ByteUtil.SIZEOF_DOUBLE; } @Override @@ -307,42 +363,49 @@ public byte[] getComplexParentFlattenedBytePage() { public void setBytePage(byte[] byteData) { CarbonUnsafe.getUnsafe().copyMemory(byteData, CarbonUnsafe.BYTE_ARRAY_OFFSET, 
baseAddress, baseOffset, byteData.length << byteBits); + capacity = byteData.length; } @Override public void setShortPage(short[] shortData) { CarbonUnsafe.getUnsafe().copyMemory(shortData, CarbonUnsafe.SHORT_ARRAY_OFFSET, baseAddress, baseOffset, shortData.length << shortBits); + capacity = shortData.length; } @Override public void setShortIntPage(byte[] shortIntData) { CarbonUnsafe.getUnsafe().copyMemory(shortIntData, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseAddress, baseOffset, shortIntData.length); + capacity = shortIntData.length; } @Override public void setIntPage(int[] intData) { CarbonUnsafe.getUnsafe().copyMemory(intData, CarbonUnsafe.INT_ARRAY_OFFSET, baseAddress, baseOffset, intData.length << intBits); + capacity = intData.length; } @Override public void setLongPage(long[] longData) { CarbonUnsafe.getUnsafe().copyMemory(longData, CarbonUnsafe.LONG_ARRAY_OFFSET, baseAddress, baseOffset, longData.length << longBits); + capacity = longData.length; } @Override public void setFloatPage(float[] floatData) { CarbonUnsafe.getUnsafe().copyMemory(floatData, CarbonUnsafe.FLOAT_ARRAY_OFFSET, baseAddress, baseOffset, floatData.length << floatBits); + capacity = floatData.length; } @Override public void setDoublePage(double[] doubleData) { CarbonUnsafe.getUnsafe().copyMemory(doubleData, CarbonUnsafe.DOUBLE_ARRAY_OFFSET, baseAddress, baseOffset, doubleData.length << doubleBits); + capacity = doubleData.length; } @Override @@ -359,48 +422,65 @@ public void freeMemory() { } } - @Override - public void convertValue(ColumnPageValueConverter codec) { - int pageSize = getPageSize(); + @Override public void convertValue(ColumnPageValueConverter codec) { + int endLoop = getEndLoop(); if (dataType == DataTypes.BYTE) { - for (long i = 0; i < pageSize; i++) { + for (long i = 0; i < endLoop; i++) { long offset = i << byteBits; - codec.encode((int)i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset)); + codec.encode((int) i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset)); } } else if (dataType == DataTypes.SHORT) { - for (long i = 0; i < pageSize; i++) { + for (long i = 0; i < endLoop; i++) { long offset = i << shortBits; - codec.encode((int)i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset)); + codec.encode((int) i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset)); } } else if (dataType == DataTypes.INT) { - for (long i = 0; i < pageSize; i++) { + for (long i = 0; i < endLoop; i++) { long offset = i << intBits; - codec.encode((int)i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset)); + codec.encode((int) i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset)); } } else if (dataType == DataTypes.LONG) { - for (long i = 0; i < pageSize; i++) { + for (long i = 0; i < endLoop; i++) { long offset = i << longBits; - codec.encode((int)i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset)); + codec.encode((int) i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset)); } } else if (dataType == DataTypes.FLOAT) { - for (long i = 0; i < pageSize; i++) { + for (long i = 0; i < endLoop; i++) { long offset = i << floatBits; - codec.encode((int)i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset)); + codec.encode((int) i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset)); } } else if (dataType == DataTypes.DOUBLE) { - for (long i = 0; i < pageSize; i++) { + for (long i = 0; i < endLoop; i++) { long offset = i << doubleBits; - codec.encode((int)i, 
CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset)); + codec.encode((int) i, CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset)); } } else { throw new UnsupportedOperationException("invalid data type: " + dataType); } } + private int getEndLoop() { + if (dataType == DataTypes.BYTE) { + return totalLength / ByteUtil.SIZEOF_BYTE; + } else if (dataType == DataTypes.SHORT) { + return totalLength / ByteUtil.SIZEOF_SHORT; + } else if (dataType == DataTypes.INT) { + return totalLength / ByteUtil.SIZEOF_INT; + } else if (dataType == DataTypes.LONG) { + return totalLength / ByteUtil.SIZEOF_LONG; + } else if (dataType == DataTypes.FLOAT) { + return totalLength / DataTypes.FLOAT.getSizeInBytes(); + } else if (dataType == DataTypes.DOUBLE) { + return totalLength / DataTypes.DOUBLE.getSizeInBytes(); + } else { + throw new UnsupportedOperationException("invalid data type: " + dataType); + } + } + @Override public byte[] compress(Compressor compressor) throws MemoryException, IOException { if (UnsafeMemoryManager.isOffHeap()) { // use raw compression and copy to byte[] - int inputSize = pageSize * dataType.getSizeInBytes(); + int inputSize = totalLength; int compressedMaxSize = compressor.maxCompressedLength(inputSize); MemoryBlock compressed = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, compressedMaxSize); @@ -416,4 +496,22 @@ public void convertValue(ColumnPageValueConverter codec) { return super.compress(compressor); } } + + /** + * reallocate memory if capacity length than current size + request size + */ + protected void ensureMemory(int requestSize) throws MemoryException { + checkDataFileSize(); + if (totalLength + requestSize > capacity) { + int newSize = Math.max(2 * capacity, totalLength + requestSize); + MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize); + CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset, + newBlock.getBaseObject(), newBlock.getBaseOffset(), totalLength); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); + memoryBlock = newBlock; + baseAddress = newBlock.getBaseObject(); + baseOffset = newBlock.getBaseOffset(); + capacity = newSize; + } + } } \ No newline at end of file diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java index f53024a7770..b5a63f81591 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java @@ -24,6 +24,10 @@ import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.ColumnType; +import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; @@ -40,8 +44,17 @@ import org.apache.carbondata.format.LocalDictionaryChunkMeta; import org.apache.carbondata.format.PresenceMeta; +import static org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory.selectCodecByAlgorithmForFloating; +import static org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory.selectCodecByAlgorithmForIntegral; + 
public abstract class ColumnPageEncoder { + /** + * logger + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(ColumnPageEncoder.class.getName()); + protected abstract byte[] encodeData(ColumnPage input) throws MemoryException, IOException; protected abstract List getEncodingList(); @@ -135,9 +148,9 @@ protected void fillLegacyFields(DataChunk2 dataChunk) */ public static EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) throws IOException, MemoryException { - EncodedColumnPage[] encodedPages = new EncodedColumnPage[input.getDepth()]; + EncodedColumnPage[] encodedPages = new EncodedColumnPage[input.getComplexColumnIndex()]; int index = 0; - while (index < input.getDepth()) { + while (index < input.getComplexColumnIndex()) { ColumnPage subColumnPage = input.getColumnPage(index); encodedPages[index] = encodedColumn(subColumnPage); index++; @@ -147,10 +160,43 @@ public static EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) public static EncodedColumnPage encodedColumn(ColumnPage page) throws IOException, MemoryException { - ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null); - return encoder.encode(page); + ColumnPageEncoder pageEncoder = createCodecForDimension(page); + if (pageEncoder == null) { + ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null); + return encoder.encode(page); + } else { + LOGGER.debug("Encoder result ---> Source data type: " + pageEncoder.getEncoderMeta(page) + .getColumnSpec().getSchemaDataType() + " Destination data type: " + pageEncoder + .getEncoderMeta(page).getStoreDataType() + " for the column: " + pageEncoder + .getEncoderMeta(page).getColumnSpec().getFieldName()); + + return pageEncoder.encode(page); + } } + private static ColumnPageEncoder createCodecForDimension(ColumnPage inputPage) { + TableSpec.ColumnSpec columnSpec = inputPage.getColumnSpec(); + if (columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) { + if (inputPage.getDataType() == DataTypes.BYTE_ARRAY + || inputPage.getDataType() == DataTypes.STRING) { + // use legacy encoder + return null; + } else if ((inputPage.getDataType() == DataTypes.BYTE) || (inputPage.getDataType() + == DataTypes.SHORT) || (inputPage.getDataType() == DataTypes.INT) || ( + inputPage.getDataType() == DataTypes.LONG)) { + return selectCodecByAlgorithmForIntegral(inputPage.getStatistics(), true) + .createEncoder(null); + } else if ((inputPage.getDataType() == DataTypes.FLOAT) || (inputPage.getDataType() + == DataTypes.DOUBLE)) { + return selectCodecByAlgorithmForFloating(inputPage.getStatistics(), true) + .createEncoder(null); + } + } + // use legacy encoder + return null; + } + + /** * Below method to encode the dictionary page * @param dictionaryPage diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java index 659feb015d4..4e041863ef4 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java @@ -118,7 +118,7 @@ private void writeMinMax(DataOutput out) throws IOException { out.writeInt((int) getMaxValue()); out.writeInt((int) getMinValue()); out.writeLong(0L); // unique value is obsoleted, maintain for compatibility - } else if (dataType == DataTypes.LONG) { + } else if 
(dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { out.writeLong((Long) getMaxValue()); out.writeLong((Long) getMinValue()); out.writeLong(0L); // unique value is obsoleted, maintain for compatibility @@ -167,7 +167,7 @@ private void readMinMax(DataInput in) throws IOException { this.setMaxValue(in.readInt()); this.setMinValue(in.readInt()); in.readLong(); // for non exist value which is obsoleted, it is backward compatibility; - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { this.setMaxValue(in.readLong()); this.setMinValue(in.readLong()); in.readLong(); // for non exist value which is obsoleted, it is backward compatibility; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java index 816b01f839d..dc79b1321aa 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java @@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DictDimensionIndexCodec; import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DirectDictDimensionIndexCodec; import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec; +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -120,12 +121,11 @@ private ColumnPageEncoder createEncoderForMeasure(ColumnPage columnPage) { dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) { - return selectCodecByAlgorithmForIntegral(stats).createEncoder(null); + return selectCodecByAlgorithmForIntegral(stats, false).createEncoder(null); } else if (DataTypes.isDecimal(dataType)) { return createEncoderForDecimalDataTypeMeasure(columnPage); - } else if (dataType == DataTypes.FLOAT || - dataType == DataTypes.DOUBLE) { - return selectCodecByAlgorithmForFloating(stats).createEncoder(null); + } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) { + return selectCodecByAlgorithmForFloating(stats, false).createEncoder(null); } else if (dataType == DataTypes.BYTE_ARRAY) { return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null); } else { @@ -161,13 +161,13 @@ private static DataType fitLongMinMax(long max, long min) { } private static DataType fitMinMax(DataType dataType, Object max, Object min) { - if (dataType == DataTypes.BYTE) { + if (dataType == DataTypes.BYTE || dataType == DataTypes.BOOLEAN) { return fitLongMinMax((byte) max, (byte) min); } else if (dataType == DataTypes.SHORT) { return fitLongMinMax((short) max, (short) min); } else if (dataType == DataTypes.INT) { return fitLongMinMax((int) max, (int) min); - } else if (dataType == DataTypes.LONG) { + } else if ((dataType == DataTypes.LONG) || (dataType == DataTypes.TIMESTAMP)) { return fitLongMinMax((long) max, (long) min); } else if (dataType == DataTypes.DOUBLE) { return fitLongMinMax((long) (double) max, (long) (double) min); @@ -209,13 +209,13 @@ private static DataType fitDeltaForDecimalType(DataType dataType, Object max, Ob 
private static DataType fitDelta(DataType dataType, Object max, Object min) { // use long data type to calculate delta to avoid overflow long value; - if (dataType == DataTypes.BYTE) { + if (dataType == DataTypes.BYTE || dataType == DataTypes.BOOLEAN) { value = (long) (byte) max - (long) (byte) min; } else if (dataType == DataTypes.SHORT) { value = (long) (short) max - (long) (short) min; } else if (dataType == DataTypes.INT) { value = (long) (int) max - (long) (int) min; - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { value = (long) max - (long) min; // The subtraction overflowed iff the operands have opposing signs // and the result's sign differs from the minuend. @@ -249,7 +249,8 @@ private static DataType compareMinMaxAndSelectDataType(long value) { * choose between adaptive encoder or delta adaptive encoder, based on whose target data type * size is smaller */ - static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats) { + static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats, + boolean isComplexPrimitive) { DataType srcDataType = stats.getDataType(); DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin()); DataType deltaDataType; @@ -259,12 +260,15 @@ static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats } else { deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin()); } - // in case of decimal data type check if the decimal converter type is Int or Long and based on - // that get size in bytes - if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == srcDataType - .getSizeInBytes()) { - // no effect to use adaptive or delta, use compression only - return new DirectCompressCodec(stats.getDataType()); + // for complex primitive, if source and destination data type is same, use adaptive encoding. + if (!isComplexPrimitive) { + // in case of decimal datatype, check if the decimal converter type is Int or Long and based + // on that get size in bytes + if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == srcDataType + .getSizeInBytes()) { + // no effect to use adaptive or delta, use compression only + return new DirectCompressCodec(stats.getDataType()); + } } if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) { // choose adaptive encoding @@ -277,19 +281,27 @@ static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats // choose between upscale adaptive encoder or upscale delta adaptive encoder, // based on whose target data type size is smaller - static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats) { + static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats, + boolean isComplexPrimitive) { DataType srcDataType = stats.getDataType(); double maxValue = (double) stats.getMax(); double minValue = (double) stats.getMin(); int decimalCount = stats.getDecimalCount(); + // For Complex Type primitive we should always choose adaptive path + // as LV format will be reduced to only V format. Therefore inorder + // to do that decimal count should be actual count instead of -1. 
+ if (isComplexPrimitive && decimalCount == -1 && stats instanceof PrimitivePageStatsCollector) { + decimalCount = ((PrimitivePageStatsCollector)stats).getDecimalForComplexPrimitive(); + } + //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max, //but we can't use -1 to getDatatype, we should use -10000000. double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue)); if (decimalCount == 0) { // short, int, long - return selectCodecByAlgorithmForIntegral(stats); - } else if (decimalCount < 0) { + return selectCodecByAlgorithmForIntegral(stats, false); + } else if (decimalCount < 0 && !isComplexPrimitive) { return new DirectCompressCodec(DataTypes.DOUBLE); } else { // double @@ -299,7 +311,9 @@ static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats (long) (Math.pow(10, decimalCount) * (maxValue - minValue))); if (adaptiveDataType.getSizeInBytes() > deltaDataType.getSizeInBytes()) { return new AdaptiveDeltaFloatingCodec(srcDataType, deltaDataType, stats); - } else if (adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes()) { + } else if (adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes() || ( + (isComplexPrimitive) && (adaptiveDataType.getSizeInBytes() == DataTypes.DOUBLE + .getSizeInBytes()))) { return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats); } else { return new DirectCompressCodec(DataTypes.DOUBLE); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java index a661a49874d..8bc67c00f3c 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java @@ -132,7 +132,8 @@ public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) { dataType == DataTypes.INT || dataType == DataTypes.LONG) { // create the codec based on algorithm and create decoder by recovering the metadata - ColumnPageCodec codec = DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats); + ColumnPageCodec codec = + DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats, false); if (codec instanceof AdaptiveIntegralCodec) { AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec; ColumnPageEncoderMeta meta = @@ -153,7 +154,8 @@ public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) { } } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) { // create the codec based on algorithm and create decoder by recovering the metadata - ColumnPageCodec codec = DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats); + ColumnPageCodec codec = + DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats, false); if (codec instanceof AdaptiveFloatingCodec) { AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec; ColumnPageEncoderMeta meta = diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java index a543f7e5812..e98397dda72 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java 
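Note on the selection logic above: with isComplexPrimitive set, the DirectCompressCodec shortcut is skipped, so a complex-primitive page goes through the adaptive fixed-length path even when neither the adaptive nor the delta target type is smaller than the source type. A minimal, self-contained sketch of the fit decision (illustrative only, not CarbonData API; it mirrors the fitLongMinMax logic referenced above, with the SHORT_INT bounds taken as the standard 3-byte two's-complement range, consistent with SIZEOF_SHORT_INT = 3 added in ByteUtil below):

public final class AdaptiveFitSketch {
  // Pick the smallest integral type that can hold both min and max of a page.
  static String fit(long max, long min) {
    if (min >= Byte.MIN_VALUE && max <= Byte.MAX_VALUE) {
      return "BYTE";
    } else if (min >= Short.MIN_VALUE && max <= Short.MAX_VALUE) {
      return "SHORT";
    } else if (min >= -8388608L && max <= 8388607L) {
      return "SHORT_INT"; // 3 bytes
    } else if (min >= Integer.MIN_VALUE && max <= Integer.MAX_VALUE) {
      return "INT";
    }
    return "LONG";
  }

  public static void main(String[] args) {
    System.out.println(fit(800L, 200L));         // SHORT     (adap_int1.csv range)
    System.out.println(fit(4000000L, 2000000L)); // SHORT_INT (adap_int2.csv range)
    System.out.println(fit(52000000L, 200L));    // INT       (adap_int3.csv range)
  }
}

The sample ranges match the test fixtures added later in this patch, which is why those tests are named INT-->SHORT, INT-->SHORT INT and INT-->INT.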
@@ -57,7 +57,7 @@ public AdaptiveDeltaIntegralCodec(DataType srcDataType, DataType targetDataType, this.max = (short) stats.getMax(); } else if (srcDataType == DataTypes.INT) { this.max = (int) stats.getMax(); - } else if (srcDataType == DataTypes.LONG) { + } else if (srcDataType == DataTypes.LONG || srcDataType == DataTypes.TIMESTAMP) { this.max = (long) stats.getMax(); } else if (srcDataType == DataTypes.DOUBLE) { this.max = (long) (double) stats.getMax(); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java index 76cb0029435..bbac772f622 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java @@ -41,6 +41,9 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si // scale of the double value, apply adaptive encoding if this is positive private int decimal; + // scale of the double value, only for complex primitive. + private int decimalCountForComplexPrimitive; + private boolean isFirst = true; private BigDecimal zeroDecimal; @@ -64,7 +67,7 @@ public static PrimitivePageStatsCollector newInstance(ColumnPageEncoderMeta meta } else if (dataType == DataTypes.INT) { instance.minInt = (int) meta.getMinValue(); instance.maxInt = (int) meta.getMaxValue(); - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { instance.minLong = (long) meta.getMinValue(); instance.maxLong = (long) meta.getMaxValue(); } else if (dataType == DataTypes.DOUBLE) { @@ -96,7 +99,8 @@ public static PrimitivePageStatsCollector newInstance(ValueEncoderMeta meta) { } else if (dataType == DataTypes.INT) { instance.minInt = (int) meta.getMinValue(); instance.maxInt = (int) meta.getMaxValue(); - } else if (dataType == DataTypes.LEGACY_LONG || dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LEGACY_LONG || dataType == DataTypes.LONG + || dataType == DataTypes.TIMESTAMP) { instance.minLong = (long) meta.getMinValue(); instance.maxLong = (long) meta.getMaxValue(); } else if (dataType == DataTypes.DOUBLE) { @@ -128,7 +132,8 @@ private PrimitivePageStatsCollector(DataType dataType) { } else if (dataType == DataTypes.INT) { minInt = Integer.MAX_VALUE; maxInt = Integer.MIN_VALUE; - } else if (dataType == DataTypes.LEGACY_LONG || dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LEGACY_LONG || dataType == DataTypes.LONG + || dataType == DataTypes.TIMESTAMP) { minLong = Long.MAX_VALUE; maxLong = Long.MIN_VALUE; } else if (dataType == DataTypes.DOUBLE) { @@ -153,7 +158,7 @@ public void updateNull(int rowId) { update((short) value); } else if (dataType == DataTypes.INT) { update((int) value); - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { update(value); } else if (dataType == DataTypes.DOUBLE) { update(0d); @@ -236,6 +241,7 @@ public void update(double value) { } if (decimal >= 0) { int decimalCount = getDecimalCount(value); + decimalCountForComplexPrimitive = decimalCount; if (decimalCount > 5) { // If deciaml count is too big, we do not do adaptive encoding. 
// So set decimal to negative value @@ -246,6 +252,11 @@ public void update(double value) { } } + public int getDecimalForComplexPrimitive() { + decimal = decimalCountForComplexPrimitive; + return decimalCountForComplexPrimitive; + } + @Override public void update(BigDecimal decimalValue) { if (isFirst) { @@ -294,7 +305,7 @@ public Object getMin() { return minShort; } else if (dataType == DataTypes.INT) { return minInt; - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { return minLong; } else if (dataType == DataTypes.DOUBLE) { return minDouble; @@ -312,7 +323,7 @@ public Object getMax() { return maxShort; } else if (dataType == DataTypes.INT) { return maxInt; - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { return maxLong; } else if (dataType == DataTypes.DOUBLE) { return maxDouble; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/ComplexColumnInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/ComplexColumnInfo.java new file mode 100644 index 00000000000..e8d705036a8 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/ComplexColumnInfo.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.datastore.row; + +import org.apache.carbondata.core.datastore.ColumnType; +import org.apache.carbondata.core.metadata.datatype.DataType; + +/** + * Wrapper object to hold the complex column details + */ +public class ComplexColumnInfo { + private ColumnType complexColumnType; + private DataType columnDataTypes; + private String columnNames; + private boolean isNoDictionary; + + public ComplexColumnInfo(ColumnType complexColumnType, DataType columnDataTypes, + String columnNames, boolean isNoDictionary) { + this.complexColumnType = complexColumnType; + this.columnDataTypes = columnDataTypes; + this.columnNames = columnNames; + this.isNoDictionary = isNoDictionary; + } + + public ColumnType getComplexColumnType() { + return complexColumnType; + } + + public DataType getColumnDataTypes() { + return columnDataTypes; + } + + public String getColumnNames() { + return columnNames; + } + + public boolean isNoDictionary() { + return isNoDictionary; + } +} + + diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java index 948b765e94e..b3f13d745e3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java @@ -63,7 +63,7 @@ public PrimitiveQueryType(String name, String parentname, int blockIndex, this.name = name; this.parentname = parentname; this.isDirectDictionary = isDirectDictionary; - this.isDictionary = (dictionary != null && isDirectDictionary == false); + this.isDictionary = (dictionary != null && !isDirectDictionary); this.directDictGenForDate = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(DataTypes.DATE); } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java index c8b0f6ec8bf..2e870514ca7 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -37,6 +38,8 @@ import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -873,4 +876,23 @@ private static void getChildDimensionOrdinal(CarbonDimension queryDimensions, } } } + + /** + * Below method will be used to convert the thrift presence meta to wrapper + * presence meta + * + * @param presentMetadataThrift + * @return wrapper presence meta + */ + public static BitSet getNullBitSet( + org.apache.carbondata.format.PresenceMeta presentMetadataThrift) { + Compressor compressor = CompressorFactory.getInstance().getCompressor(); + final byte[] present_bit_stream = 
presentMetadataThrift.getPresent_bit_stream(); + if (null != present_bit_stream) { + return BitSet + .valueOf(compressor.unCompressByte(present_bit_stream)); + } else { + return new BitSet(1); + } + } } diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java index 1df60c118fe..322c80a549e 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java @@ -29,11 +29,17 @@ */ public final class ByteUtil { - public static final int SIZEOF_LONG = 8; + public static final int SIZEOF_BYTE = 1; + + public static final int SIZEOF_SHORT = 2; + + public static final int SIZEOF_SHORT_INT = 3; public static final int SIZEOF_INT = 4; - public static final int SIZEOF_SHORT = 2; + public static final int SIZEOF_LONG = 8; + + public static final int SIZEOF_DOUBLE = 8; public static final String UTF8_CSN = StandardCharsets.UTF_8.name(); diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 789897e1932..dd34bc60d27 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -2471,7 +2471,7 @@ public static byte[] getValueAsBytes(DataType dataType, Object value) { b.putLong((int) value); b.flip(); return b.array(); - } else if (dataType == DataTypes.LONG) { + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { b = ByteBuffer.allocate(8); b.putLong((long) value); b.flip(); @@ -2486,7 +2486,6 @@ public static byte[] getValueAsBytes(DataType dataType, Object value) { } else if (dataType == DataTypes.BYTE_ARRAY) { return (byte[]) value; } else if (dataType == DataTypes.STRING - || dataType == DataTypes.TIMESTAMP || dataType == DataTypes.DATE || dataType == DataTypes.VARCHAR) { return (byte[]) value; diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index 0306c020e1c..54b7441ddd3 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -429,16 +429,29 @@ public static boolean isFixedSizeDataType(DataType dataType) { } } + /** + * Wrapper for actual getDataBasedOnDataTypeForNoDictionaryColumn. 
+ * + * @param dataInBytes + * @param actualDataType + * @return + */ + public static Object getDataBasedOnDataTypeForNoDictionaryColumn(byte[] dataInBytes, + DataType actualDataType) { + return getDataBasedOnDataTypeForNoDictionaryColumn(dataInBytes, actualDataType, true); + } + /** * Below method will be used to convert the data passed to its actual data * type * - * @param dataInBytes data - * @param actualDataType actual data type + * @param dataInBytes data + * @param actualDataType actual data type + * @param isTimeStampConversion * @return actual data after conversion */ public static Object getDataBasedOnDataTypeForNoDictionaryColumn(byte[] dataInBytes, - DataType actualDataType) { + DataType actualDataType, boolean isTimeStampConversion) { if (null == dataInBytes || Arrays .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, dataInBytes)) { return null; @@ -467,7 +480,11 @@ public static Object getDataBasedOnDataTypeForNoDictionaryColumn(byte[] dataInBy if (isEmptyByteArray(dataInBytes)) { return null; } - return ByteUtil.toLong(dataInBytes, 0, dataInBytes.length) * 1000L; + if (isTimeStampConversion) { + return ByteUtil.toLong(dataInBytes, 0, dataInBytes.length) * 1000L; + } else { + return ByteUtil.toLong(dataInBytes, 0, dataInBytes.length); + } } else if (actualDataType == DataTypes.DOUBLE) { if (isEmptyByteArray(dataInBytes)) { return null; diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java index 52a4de34a9e..0b8bcc71210 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java @@ -38,25 +38,25 @@ public class TestEncodingFactory extends TestCase { // for Byte primitivePageStatsCollector.update((long) Byte.MAX_VALUE); ColumnPageCodec columnPageCodec = - DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector); + DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false); assert (columnPageCodec instanceof AdaptiveIntegralCodec); assert (DataTypes.BYTE == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType()); // for Short primitivePageStatsCollector.update((long) Short.MAX_VALUE); columnPageCodec = - DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector); + DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false); assert (columnPageCodec instanceof AdaptiveIntegralCodec); assert (DataTypes.SHORT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType()); // for int primitivePageStatsCollector.update((long) Integer.MAX_VALUE); columnPageCodec = - DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector); + DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false); assert (columnPageCodec instanceof AdaptiveIntegralCodec); assert (DataTypes.INT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType()); // for long primitivePageStatsCollector.update(Long.MAX_VALUE); columnPageCodec = - DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector); + DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false); assert (columnPageCodec instanceof DirectCompressCodec); assert 
("DirectCompressCodec".equals(columnPageCodec.getName())); } @@ -67,25 +67,25 @@ public class TestEncodingFactory extends TestCase { // for Byte primitivePageStatsCollector.update((long) 200); ColumnPageCodec columnPageCodec = - DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector); + DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false); assert (columnPageCodec instanceof AdaptiveDeltaIntegralCodec); assert (DataTypes.BYTE == ((AdaptiveDeltaIntegralCodec) columnPageCodec).getTargetDataType()); // for Short primitivePageStatsCollector.update((long) 634767); columnPageCodec = - DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector); + DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false); assert (columnPageCodec instanceof AdaptiveIntegralCodec); assert (DataTypes.SHORT_INT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType()); // for int primitivePageStatsCollector.update((long) (Integer.MAX_VALUE + 200)); columnPageCodec = - DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector); + DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false); assert (columnPageCodec instanceof AdaptiveIntegralCodec); assert (DataTypes.INT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType()); // for int primitivePageStatsCollector.update(Long.MAX_VALUE); columnPageCodec = - DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector); + DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false); assert (columnPageCodec instanceof DirectCompressCodec); assert ("DirectCompressCodec".equals(columnPageCodec.getName())); } diff --git a/integration/spark-common-test/src/test/resources/adap.csv b/integration/spark-common-test/src/test/resources/adap.csv new file mode 100644 index 00000000000..de553d307e0 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/adap.csv @@ -0,0 +1,3 @@ +1,500$abc$20:30:40 +2,600$abc$20:30:40 +3,600$abc$20:30:40 diff --git a/integration/spark-common-test/src/test/resources/adap_double1.csv b/integration/spark-common-test/src/test/resources/adap_double1.csv new file mode 100644 index 00000000000..148c73f58d0 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/adap_double1.csv @@ -0,0 +1,3 @@ +1,1.323$abc$2.2:3.3:4.4 +2,1.323$abc$2.2:3.3:4.4 +3,1.323$abc$2.2:3.3:4.4 diff --git a/integration/spark-common-test/src/test/resources/adap_double2.csv b/integration/spark-common-test/src/test/resources/adap_double2.csv new file mode 100644 index 00000000000..9c712880d32 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/adap_double2.csv @@ -0,0 +1,3 @@ +1,1.323$abc$20.2:30.3:40.4 +2,2.323$abc$20.2:30.3:40.4 +3,4.323$abc$20.2:30.3:40.4 diff --git a/integration/spark-common-test/src/test/resources/adap_double3.csv b/integration/spark-common-test/src/test/resources/adap_double3.csv new file mode 100644 index 00000000000..c85574cd125 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/adap_double3.csv @@ -0,0 +1,3 @@ +1,1.323$abc$20.2:30.3:500.423 +2,2.323$abc$20.2:30.3:500.423 +3,50.323$abc$20.2:30.3:500.423 diff --git a/integration/spark-common-test/src/test/resources/adap_double4.csv b/integration/spark-common-test/src/test/resources/adap_double4.csv new file mode 100644 index 00000000000..a1e822b2d1d --- /dev/null +++ 
b/integration/spark-common-test/src/test/resources/adap_double4.csv @@ -0,0 +1,3 @@ +1,1.323$abc$20.2:30.3:50000.423 +2,2.323$abc$20.2:30.3:50000.423 +3,50000.323$abc$20.2:30.3:50000.423 diff --git a/integration/spark-common-test/src/test/resources/adap_int1.csv b/integration/spark-common-test/src/test/resources/adap_int1.csv new file mode 100644 index 00000000000..5db704e1c23 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/adap_int1.csv @@ -0,0 +1,3 @@ +1,500$abc$200:300:400 +2,700$abc$200:300:400 +3,800$abc$200:300:400 diff --git a/integration/spark-common-test/src/test/resources/adap_int2.csv b/integration/spark-common-test/src/test/resources/adap_int2.csv new file mode 100644 index 00000000000..b67b8cc66c2 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/adap_int2.csv @@ -0,0 +1,3 @@ +1,50000$abc$2000000:3000000:4000000 +2,70000$abc$2000000:3000000:4000000 +3,100000$abc$2000000:3000000:4000000 diff --git a/integration/spark-common-test/src/test/resources/adap_int3.csv b/integration/spark-common-test/src/test/resources/adap_int3.csv new file mode 100644 index 00000000000..ea0fed60fa3 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/adap_int3.csv @@ -0,0 +1,3 @@ +1,500000$abc$200:300:52000000 +2,7000000$abc$200:300:52000000 +3,10000000$abc$200:300:52000000
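For reading the fixtures above: each CSV row carries a primitive column and one complex column, with '$' as complex_delimiter_level_1 and ':' as complex_delimiter_level_2, matching the load options used in the tests below. A tiny plain-Java sketch of that decomposition (illustrative only; the class and variable names are made up, and the real parsing is done by the loader):

import java.util.Arrays;

public final class ComplexDelimiterSketch {
  public static void main(String[] args) {
    String row = "1,500$abc$20:30:40";                 // a line from adap.csv
    String[] columns = row.split(",", 2);              // roll and the struct payload
    String[] structFields = columns[1].split("\\$");   // id, name, marks
    String[] marks = structFields[2].split(":");       // array elements
    System.out.println(columns[0]);                    // 1
    System.out.println(Arrays.toString(structFields)); // [500, abc, 20:30:40]
    System.out.println(Arrays.toString(marks));        // [20, 30, 40]
  }
}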
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveComplexType.scala new file mode 100644 index 00000000000..6b0a13f1a06 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveComplexType.scala @@ -0,0 +1,554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.integration.spark.testsuite.complexType + +import java.sql.Timestamp + +import scala.collection.mutable + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +trait TestAdaptiveComplexType extends QueryTest { + + test("test INT with struct and array, Encoding INT-->BYTE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:int,name:string,marks:array<int>>) " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500, "abc", mutable.WrappedArray.make(Array(20, 30, 40)))), + Row(2, Row(600, "abc", mutable.WrappedArray.make(Array(20, 30, 40)))), + Row(3, Row(600, "abc", mutable.WrappedArray.make(Array(20, 30, 40)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:int,name:string,marks:array<int>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'500$abc$20:30:40')") + sql("insert into adaptive values(2,'600$abc$20:30:40')") + sql("insert into adaptive values(3,'600$abc$20:30:40')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500, "abc", mutable.WrappedArray.make(Array(20, 30, 40)))), + Row(2, Row(600, "abc", mutable.WrappedArray.make(Array(20, 30, 40)))), + Row(3, Row(600, "abc", mutable.WrappedArray.make(Array(20, 30, 40)))))) + } + + test("test INT with struct and array, Encoding INT-->SHORT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:int,name:string,marks:array<int>>) " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_int1.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(2, Row(700, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(3, Row(800, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:int,name:string,marks:array<int>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'500$abc$200:300:400')") + sql("insert into adaptive values(2,'700$abc$200:300:400')") + sql("insert into adaptive values(3,'800$abc$200:300:400')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(2, Row(700, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(3, Row(800, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))))) + } + + test("test INT with struct and array, Encoding INT-->SHORT INT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:int,name:string,marks:array<int>>) " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_int2.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(50000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(2, Row(70000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(3, Row(100000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:int,name:string,marks:array<int>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'50000$abc$2000000:3000000:4000000')") + sql("insert into adaptive values(2,'70000$abc$2000000:3000000:4000000')") + sql("insert into adaptive values(3,'100000$abc$2000000:3000000:4000000')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(50000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(2, Row(70000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(3, Row(100000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))))) + } + + test("test INT with struct and array, Encoding INT-->INT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:int,name:string,marks:array<int>>) " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_int3.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(2, Row(7000000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(3, Row(10000000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:int,name:string,marks:array<int>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'500000$abc$200:300:52000000')") + sql("insert into adaptive values(2,'700000$abc$200:300:52000000')") + sql("insert into adaptive values(3,'10000000$abc$200:300:52000000')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(2, Row(700000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(3, Row(10000000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))))) + } + + + test("test SMALLINT with struct and array SMALLINT --> BYTE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:smallint,name:string,marks:array<smallint>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'100$abc$20:30:40')") + sql("insert into adaptive values(2,'200$abc$30:40:50')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(100, "abc", mutable.WrappedArray.make(Array(20, 30, 40)))), + Row(2, Row(200, "abc", mutable.WrappedArray.make(Array(30, 40, 50)))))) + } + + test("test SMALLINT with struct and array SMALLINT --> SHORT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:smallint,name:string,marks:array<smallint>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'500$abc$200:300:400')") + sql("insert into adaptive values(2,'8000$abc$300:400:500')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(2, Row(8000, "abc", mutable.WrappedArray.make(Array(300, 400, 500)))))) + } + + test("test BigInt with struct and array BIGINT --> BYTE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:bigint,name:string,marks:array<bigint>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'1$abc$20:30:40')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(1, "abc", mutable.WrappedArray.make(Array(20, 30, 40)))))) + } + + test("test BigInt with struct and array BIGINT --> SHORT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:bigint,name:string,marks:array<bigint>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'500$abc$200:300:400')") + sql("insert into adaptive values(2,'8000$abc$300:400:500')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(2, Row(8000, "abc", mutable.WrappedArray.make(Array(300, 400, 500)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:bigint,name:string,marks:array<bigint>>)" + + " " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_int1.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(2, Row(700, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(3, Row(800, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))))) + } + + test("test BigInt with struct and array BIGINT --> SHORT INT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:bigint,name:string,marks:array<bigint>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'50000$abc$2000000:3000000:4000000')") + sql("insert into adaptive values(2,'70000$abc$2000000:3000000:4000000')") + sql("insert into adaptive values(3,'100000$abc$2000000:3000000:4000000')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(50000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(2, Row(70000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(3, Row(100000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:bigint,name:string,marks:array<bigint>>)" + + " " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_int2.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(50000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(2, Row(70000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(3, Row(100000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))))) + } + + test("test BIGINT with struct and array, Encoding INT-->INT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:bigint,name:string,marks:array<bigint>>)" + + " " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_int3.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(2, Row(7000000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(3, Row(10000000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:bigint,name:string,marks:array<bigint>>)" + + " " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'500000$abc$200:300:52000000')") + sql("insert into adaptive values(2,'700000$abc$200:300:52000000')") + sql("insert into adaptive values(3,'10000000$abc$200:300:52000000')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(2, Row(700000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(3, Row(10000000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))))) + } + + test("test Double with Struct and Array DOUBLE --> BYTE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:double,name:string,marks:array<double>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'1.323$abc$2.2:3.3:4.4')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(1.323, "abc", mutable.WrappedArray.make(Array(2.2, 3.3, 4.4)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:double,name:string,marks:array<double>>)" + + " " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_double1.csv' into table adaptive options('delimiter'='," + + "'," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(1.323, "abc", mutable.WrappedArray.make(Array(2.2, 3.3, 4.4)))), + Row(2, Row(1.323, "abc", mutable.WrappedArray.make(Array(2.2, 3.3, 4.4)))), + Row(3, Row(1.323, "abc", mutable.WrappedArray.make(Array(2.2, 3.3, 4.4)))))) + } + + test("test Double with Struct and Array DOUBLE --> SHORT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:double,name:string,marks:array<double>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'1.323$abc$20.2:30.3:40.4')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(1.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 40.4)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:double,name:string,marks:array<double>>)" + + " " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_double2.csv' into table adaptive options('delimiter'='," + + "'," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(1.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 40.4)))), + Row(2, Row(2.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 40.4)))), + Row(3, Row(4.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 40.4)))))) + } + + test("test Double with Struct and Array DOUBLE --> SHORT INT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:double,name:string,marks:array<double>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'10.323$abc$20.2:30.3:500.423')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(10.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 500.423)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:double,name:string,marks:array<double>>)" + + " " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_double3.csv' into table adaptive options('delimiter'='," + + "'," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(1.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 500.423)))), + Row(2, Row(2.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 500.423)))), + Row(3, Row(50.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 500.423)))))) + } + + test("test Double with Struct and Array DOUBLE --> INT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:double,name:string,marks:array<double>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'1000.323$abc$20.2:30.3:50000.423')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(1000.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 50000.423)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:double,name:string,marks:array<double>>)" + + " " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_double4.csv' into table adaptive options('delimiter'='," + + "'," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(1.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 50000.423)))), + Row(2, Row(2.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 50000.423)))), + Row(3, Row(50000.323, "abc", mutable.WrappedArray.make(Array(20.2, 30.3, 50000.423)))))) + } + + test("test Double with Struct and Array DOUBLE --> DOUBLE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:double,name:string,marks:array<double>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'1.797693134862315$abc$2.2:30.3:1.797693134862315')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, + Row(1.797693134862315, + "abc", + mutable.WrappedArray.make(Array(2.2, 30.3, 1.797693134862315)))))) + + } + + test("test Decimal with Struct") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:decimal(3,2),name:string>) stored by " + + "'carbondata'") + sql("insert into adaptive values(1,'3.2$abc')") + sql("select * from adaptive").show(false) + } + + test("test Decimal with Array") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<name:string,marks:array<decimal(3,2)>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'abc$20.2:30.3:40.4')") + sql("select * from adaptive").show(false) + } + + test("test Timestamp with Struct") { + sql("Drop table if exists adaptive") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + sql( + "create table adaptive(roll int, student struct<id:timestamp,name:string>) stored by " + + "'carbondata'") + sql("insert into adaptive values(1,'2017/01/01 00:00:00$abc')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(Timestamp.valueOf("2017-01-01 00:00:00.0"), "abc")))) + } + + test("test Timestamp with Array") { + sql("Drop table if exists adaptive") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + sql( + "create table adaptive(roll int, student struct<name:string,marks:array<timestamp>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'abc$2017/01/01:2018/01/01')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, + Row("abc", + mutable.WrappedArray + .make(Array(Timestamp.valueOf("2017-01-01 00:00:00.0"), + Timestamp.valueOf("2018-01-01 00:00:00.0"))))))) + } + + test("test DATE with Array") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<name:string,marks:array<date>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'abc$2017-01-01')") + sql("select * from adaptive").show(false) + } + + test("test LONG with Array and Struct Encoding LONG --> BYTE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:long,name:string,marks:array<long>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'11111$abc$20:30:40')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(11111, "abc", mutable.WrappedArray.make(Array(20, 30, 40)))))) + } + + test("test LONG with Array and Struct Encoding LONG --> SHORT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:long,name:string,marks:array<long>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'11111$abc$200:300:400')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(11111, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:long,name:string,marks:array<long>>) " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_int1.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(2, Row(700, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(3, Row(800, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))))) + } + + test("test LONG with struct and array, Encoding LONG-->SHORT INT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:long,name:string,marks:array<long>>) " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_int2.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(50000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(2, Row(70000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(3, Row(100000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:long,name:string,marks:array<long>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'50000$abc$2000000:3000000:4000000')") + sql("insert into adaptive values(2,'70000$abc$2000000:3000000:4000000')") + sql("insert into adaptive values(3,'100000$abc$2000000:3000000:4000000')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(50000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(2, Row(70000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))), + Row(3, Row(100000, "abc", mutable.WrappedArray.make(Array(2000000, 3000000, 4000000)))))) + } + + test("test LONG with struct and array, Encoding LONG-->INT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:long,name:string,marks:array<long>>) " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_int3.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(2, Row(7000000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(3, Row(10000000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:long,name:string,marks:array<long>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'500000$abc$200:300:52000000')") + sql("insert into adaptive values(2,'700000$abc$200:300:52000000')") + sql("insert into adaptive values(3,'10000000$abc$200:300:52000000')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(2, Row(700000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))), + Row(3, Row(10000000, "abc", mutable.WrappedArray.make(Array(200, 300, 52000000)))))) + } + + test("test LONG with struct and array, Encoding LONG-->LONG") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:long,name:string,marks:array<long>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'500000$abc$200:300:52000000000')") + sql("insert into adaptive values(2,'700000$abc$200:300:52000000000')") + sql("insert into adaptive values(3,'10000000$abc$200:300:52000000000')") + sql("select * from adaptive").show(false) + } + + test("test SHORT with Array and Struct Encoding SHORT -->BYTE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:short,name:string,marks:array<short>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'11$abc$20:30:40')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(11, "abc", mutable.WrappedArray.make(Array(20, 30, 40)))))) + } + + test("test SHORT with Array and Struct Encoding SHORT --> SHORT") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:short,name:string,marks:array<short>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'11111$abc$200:300:400')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(11111, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))))) + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:short,name:string,marks:array<short>>) " + + "stored by 'carbondata'") + sql( + s"load data inpath '$resourcesPath/adap_int1.csv' into table adaptive options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,student','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'=':')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(500, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(2, Row(700, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))), + Row(3, Row(800, "abc", mutable.WrappedArray.make(Array(200, 300, 400)))))) + } + + test("test Boolean with Struct and Array") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:boolean,name:string,marks:array<boolean>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'true$abc$false:true:false')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(true, "abc", mutable.WrappedArray.make(Array(false, true, false)))))) + } + +}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingForNullValues.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingForNullValues.scala new file mode 100644 index 00000000000..528fb697b3b --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingForNullValues.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.dataload + +import scala.collection.mutable + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +/** + * Test class of Adaptive Encoding UnSafe Column Page with Null values + * + */ + +class TestAdaptiveEncodingForNullValues + extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS adaptive") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, + "true") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS adaptive") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, + "true") + } + + test("test INT with struct and array, Encoding INT-->BYTE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:int,name:string,marks:array<int>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'null$abc$null:null:null')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(null, "abc", mutable.WrappedArray.make(Array(null, null, null)))))) + } + + + test("test SMALLINT with struct and array SMALLINT --> BYTE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:smallint,name:string,marks:array<smallint>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'null$abc$null:null:null')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(null, "abc", mutable.WrappedArray.make(Array(null, null, null)))))) + } + + + test("test BigInt with struct and array BIGINT --> BYTE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:bigint,name:string,marks:array<bigint>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'null$abc$null:null:null')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(null, "abc", mutable.WrappedArray.make(Array(null, null, null)))))) + } + + test("test Double with Struct and Array DOUBLE --> BYTE") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:double,name:string,marks:array<double>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'null$abc$null:null:null')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(null, "abc", mutable.WrappedArray.make(Array(null, null, null)))))) + } + + test("test Decimal with Struct") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:decimal(3,2),name:string,marks:array<decimal(3,2)>>) stored by " + + "'carbondata'") + sql("insert into adaptive values(1,'null$abc$null:null:null')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(null, "abc", mutable.WrappedArray.make(Array(null, null, null)))))) + } + + test("test Timestamp with Struct") { + sql("Drop table if exists adaptive") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + sql( + "create table adaptive(roll int, student struct<id:timestamp,name:string>) stored by " + + "'carbondata'") + sql("insert into adaptive values(1,'null$abc')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(null, "abc")))) + } + + test("test Timestamp with Array") { + sql("Drop table if exists adaptive") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + sql( + "create table adaptive(roll int, student struct<name:string,marks:array<timestamp>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'abc$null:null:null')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row("abc", mutable.WrappedArray.make(Array(null, null, null)))))) + } + + test("test DATE with Array") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<name:string,marks:array<date>>) stored by 'carbondata'") + sql("insert into adaptive values(1,'abc$null:null:null')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row("abc", mutable.WrappedArray.make(Array(null, null, null)))))) + } + + test("test LONG with Array and Struct") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:long,name:string,marks:array<long>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'null$abc$null:null:null')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(null, "abc", mutable.WrappedArray.make(Array(null, null, null)))))) + } + + test("test SHORT with Array and Struct") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:short,name:string,marks:array<short>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'null$abc$null:null:null')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(null, "abc", mutable.WrappedArray.make(Array(null, null, null)))))) + } + + test("test Boolean with Struct and Array") { + sql("Drop table if exists adaptive") + sql( + "create table adaptive(roll int, student struct<id:boolean,name:string,marks:array<boolean>>) " + + "stored by 'carbondata'") + sql("insert into adaptive values(1,'null$abc$null:null:null')") + checkAnswer(sql("select * from adaptive"), + Seq(Row(1, Row(null, "abc", mutable.WrappedArray.make(Array(null, null, null)))))) + } +} diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingSafeColumnPageForComplexDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingSafeColumnPageForComplexDataType.scala new file mode 100644 index 00000000000..75d08bbe85c --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingSafeColumnPageForComplexDataType.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.dataload + +import java.io.File +import java.sql.Timestamp + +import scala.collection.mutable + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.integration.spark.testsuite.complexType.TestAdaptiveComplexType + +/** + * Test class of Adaptive Encoding Safe Column Page with Complex Data type + * + */ + +class TestAdaptiveEncodingSafeColumnPageForComplexDataType + extends QueryTest with BeforeAndAfterAll with TestAdaptiveComplexType { + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS adaptive") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, + "false") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS adaptive") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, + "true") + } + +} diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingUnsafeColumnPageForComplexDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingUnsafeColumnPageForComplexDataType.scala new file mode 100644 index 00000000000..83751956987 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAdaptiveEncodingUnsafeColumnPageForComplexDataType.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.dataload + +import java.io.{File, PrintWriter} +import java.sql.Timestamp + +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.integration.spark.testsuite.complexType.TestAdaptiveComplexType + +/** + * Test class of Adaptive Encoding Unsafe Column Page with Complex Data type + * + */ + +class TestAdaptiveEncodingUnsafeColumnPageForComplexDataType + extends QueryTest with BeforeAndAfterAll with TestAdaptiveComplexType { + + override def beforeAll(): Unit = { + + new File(CarbonProperties.getInstance().getSystemFolderLocation).delete() + sql("DROP TABLE IF EXISTS adaptive") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, + "true") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS adaptive") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, + "true") + } + + +} diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java index da34746cbf7..05754eb9a84 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java @@ -24,9 +24,11 @@ import java.util.List; import org.apache.carbondata.core.datastore.ColumnType; +import org.apache.carbondata.core.datastore.row.ComplexColumnInfo; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.loading.complexobjects.ArrayObject; import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; @@ -292,14 +294,10 @@ public GenericDataType deepCopy() { } @Override - public void getChildrenType(List<ColumnType> type) { - type.add(ColumnType.COMPLEX_ARRAY); - children.getChildrenType(type); + public void getComplexColumnInfo(List<ComplexColumnInfo> columnInfoList) { + columnInfoList.add( + new ComplexColumnInfo(ColumnType.COMPLEX_ARRAY, DataTypeUtil.valueOf("array"), + name, false)); + children.getComplexColumnInfo(columnInfoList); } - - @Override public void getColumnNames(List<String> columnNameList) { - columnNameList.add(name); - children.getColumnNames(columnNameList); - } - } \ No newline at end of file
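The removed getChildrenType/getColumnNames pair required two traversals to collect types and names separately; getComplexColumnInfo replaces both with a single pre-order walk in which every node appends its own descriptor before recursing into its children, so the list order matches the physical order of the child pages. A self-contained Java sketch of that traversal shape, with invented Node/Leaf/ArrayNode names standing in for the Carbon classes:

import java.util.ArrayList;
import java.util.List;

interface Node { void flatten(List<String> out); }

class Leaf implements Node {
  private final String type;
  Leaf(String type) { this.type = type; }
  public void flatten(List<String> out) { out.add("PRIMITIVE:" + type); }
}

class ArrayNode implements Node {
  private final Node child;
  ArrayNode(Node child) { this.child = child; }
  public void flatten(List<String> out) {
    out.add("ARRAY");       // the parent level registers its own page first ...
    child.flatten(out);     // ... then recurses, like children.getComplexColumnInfo(...)
  }
}

public class FlattenSketch {
  public static void main(String[] args) {
    List<String> out = new ArrayList<>();
    new ArrayNode(new Leaf("int")).flatten(out);
    System.out.println(out);  // [ARRAY, PRIMITIVE:int]
  }
}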
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java index 049bf575025..68315d32c76 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import java.util.List; -import org.apache.carbondata.core.datastore.ColumnType; +import org.apache.carbondata.core.datastore.row.ComplexColumnInfo; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; @@ -157,8 +157,5 @@ void parseComplexValue(ByteBuffer byteArrayInput, DataOutputStream dataOutputStr */ GenericDataType deepCopy(); - void getChildrenType(List<ColumnType> type); - - void getColumnNames(List<String> columnNameList); - + void getComplexColumnInfo(List<ComplexColumnInfo> columnInfoList); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index 5d22e55ce57..c738bacc48e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.ColumnType; +import org.apache.carbondata.core.datastore.row.ComplexColumnInfo; import org.apache.carbondata.core.devapi.BiDictionary; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.dictionary.client.DictionaryClient; @@ -43,6 +44,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; @@ -112,6 +114,8 @@ public class PrimitiveDataType implements GenericDataType { private boolean isDirectDictionary; + private DataType dataType; + private PrimitiveDataType(int outputArrayIndex, int dataCounter) { this.outputArrayIndex = outputArrayIndex; this.dataCounter = dataCounter; @@ -121,25 +125,25 @@ private PrimitiveDataType(int outputArrayIndex, int dataCounter) { * constructor * * @param name - * @param parentname + * @param parentName * @param columnId - * @param dimensionOrdinal * @param isDictionary */ - public PrimitiveDataType(String name, String parentname, String columnId, int dimensionOrdinal, - boolean isDictionary, String nullformat, boolean isEmptyBadRecord) { + public PrimitiveDataType(String name, DataType dataType, String parentName, String columnId, + boolean isDictionary, String nullFormat, boolean isEmptyBadRecord) { this.name = name; - this.parentname = parentname; + this.parentname = parentName; this.columnId = columnId; this.isDictionary = isDictionary; - this.nullformat = nullformat; + this.nullformat = nullFormat; this.isEmptyBadRecord = isEmptyBadRecord; + this.dataType = dataType; } /** * Constructor * @param carbonColumn - * @param parentname + * @param parentName * @param columnId * @param carbonDimension * @param absoluteTableIdentifier @@ -149,17 +153,18 @@ public PrimitiveDataType(String name, String parentname, String columnId, int di * @param nullFormat * @param isEmptyBadRecords */ - public PrimitiveDataType(CarbonColumn carbonColumn, String parentname, String columnId, + public PrimitiveDataType(CarbonColumn carbonColumn, String parentName, String columnId, CarbonDimension carbonDimension, AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass, Map<Object, Dictionary> localCache, String nullFormat, Boolean
isEmptyBadRecords) { this.name = carbonColumn.getColName(); - this.parentname = parentname; + this.parentname = parentName; this.columnId = columnId; this.carbonDimension = carbonDimension; this.isDictionary = isDictionaryDimension(carbonDimension); this.nullformat = nullFormat; this.isEmptyBadRecord = isEmptyBadRecords; + this.dataType = carbonColumn.getDataType(); DictionaryColumnUniqueIdentifier identifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, @@ -537,14 +542,15 @@ public GenericDataType deepCopy() { dataType.setKeySize(this.keySize); dataType.setSurrogateIndex(this.index); dataType.name = this.name; + dataType.dataType = this.dataType; return dataType; } - public void getChildrenType(List<ColumnType> type) { - type.add(ColumnType.COMPLEX_PRIMITIVE); + @Override + public void getComplexColumnInfo(List<ComplexColumnInfo> columnInfoList) { + columnInfoList.add( + new ComplexColumnInfo(ColumnType.COMPLEX_PRIMITIVE, dataType, + name, !isDictionary)); } - @Override public void getColumnNames(List<String> columnNameList) { - columnNameList.add(name); - } } \ No newline at end of file diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java index 4d3ba8705e7..29acf951e31 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java @@ -24,9 +24,11 @@ import java.util.List; import org.apache.carbondata.core.datastore.ColumnType; +import org.apache.carbondata.core.datastore.row.ComplexColumnInfo; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.loading.complexobjects.StructObject; import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; @@ -323,17 +325,13 @@ public GenericDataType deepCopy() { return new StructDataType(childrenClone, this.outputArrayIndex, this.dataCounter, this.name); } - public void getChildrenType(List<ColumnType> type) { - type.add(ColumnType.COMPLEX_STRUCT); - for (int i = 0; i < children.size(); i++) { - children.get(i).getChildrenType(type); - } - } - - @Override public void getColumnNames(List<String> columnNameList) { - columnNameList.add(name); + @Override + public void getComplexColumnInfo(List<ComplexColumnInfo> columnInfoList) { + columnInfoList.add( + new ComplexColumnInfo(ColumnType.COMPLEX_STRUCT, DataTypeUtil.valueOf("struct"), + name, false)); for (int i = 0; i < children.size(); i++) { - children.get(i).getColumnNames(columnNameList); + children.get(i).getComplexColumnInfo(columnInfoList); } } } \ No newline at end of file
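With all three implementations in place, one traversal yields, per flattened child, everything the writer needs: the column type, the actual data type, the column name, and whether the leaf is a no-dictionary column. Note the flag values in the constructors above: primitives pass !isDictionary, while struct and array levels always pass false and carry the synthetic "struct"/"array" data types. A minimal Java sketch of the resulting list for a column student struct<name:string,marks:array<short>>; the Info class is an illustrative stand-in for ComplexColumnInfo, and the dotted child names follow an assumed parent.child convention rather than a confirmed Carbon naming rule:

import java.util.ArrayList;
import java.util.List;

public class ComplexColumnInfoSketch {
  // Illustrative stand-in for org.apache.carbondata.core.datastore.row.ComplexColumnInfo
  static final class Info {
    final String columnType, dataType, name;
    final boolean noDictionary;
    Info(String columnType, String dataType, String name, boolean noDictionary) {
      this.columnType = columnType; this.dataType = dataType;
      this.name = name; this.noDictionary = noDictionary;
    }
    @Override public String toString() {
      return columnType + "/" + dataType + "/" + name + "/noDict=" + noDictionary;
    }
  }

  public static void main(String[] args) {
    // Pre-order flattening of student struct<name:string,marks:array<short>>,
    // assuming the primitive children are no-dictionary (hence noDict=true on leaves).
    List<Info> infos = new ArrayList<>();
    infos.add(new Info("COMPLEX_STRUCT", "struct", "student", false));
    infos.add(new Info("COMPLEX_PRIMITIVE", "string", "student.name", true));
    infos.add(new Info("COMPLEX_ARRAY", "array", "student.marks", false));
    infos.add(new Info("COMPLEX_PRIMITIVE", "short", "student.marks.val", true));
    infos.forEach(System.out::println);
  }
}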
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java index 2e65772ef9c..c46b2c2a5b8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java @@ -43,6 +43,7 @@ import org.apache.carbondata.core.datastore.page.statistics.LVShortStringStatsCollector; import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.datastore.row.ComplexColumnInfo; import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; @@ -234,20 +235,18 @@ private void addComplexColumn(int index, int rowId, byte[] complexColumns) { // initialize the page if first row if (rowId == 0) { - List<ColumnType> complexColumnType = new ArrayList<>(); - List<String> columnNames = new ArrayList<>(); - complexDataType.getChildrenType(complexColumnType); - complexDataType.getColumnNames(columnNames); - complexDimensionPages[index] = new ComplexColumnPage(complexColumnType); + List<ComplexColumnInfo> complexColumnInfoList = new ArrayList<>(); + complexDataType.getComplexColumnInfo(complexColumnInfoList); + complexDimensionPages[index] = new ComplexColumnPage(complexColumnInfoList); try { complexDimensionPages[index] - .initialize(model.getColumnLocalDictGenMap(), columnNames, pageSize); + .initialize(model.getColumnLocalDictGenMap(), pageSize); } catch (MemoryException e) { throw new RuntimeException(e); } } - int depthInComplexColumn = complexDimensionPages[index].getDepth(); + int depthInComplexColumn = complexDimensionPages[index].getComplexColumnIndex(); // this is the result columnar data which will be added to page, // size of this list is the depth of complex column, we will fill it by input data List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>(depthInComplexColumn); diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index c1d5d900d6a..10888f64c4b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -44,6 +44,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datatypes.ArrayDataType; import org.apache.carbondata.processing.datatypes.GenericDataType; @@ -383,9 +384,10 @@ public static Map<String, GenericDataType> getComplexTypesMap(DataField[] dataFi } else if (levelInfo[1].toLowerCase().contains(CarbonCommonConstants.STRUCT)) { g.addChildren(new StructDataType(levelInfo[0], levelInfo[2], levelInfo[3])); } else { - g.addChildren(new PrimitiveDataType(levelInfo[0], levelInfo[2], levelInfo[4], - Integer.parseInt(levelInfo[5]), levelInfo[3].contains("true"), nullFormat, - isEmptyBadRecord)); + g.addChildren( + new PrimitiveDataType(levelInfo[0], DataTypeUtil.valueOf(levelInfo[1]), + levelInfo[2], levelInfo[4], levelInfo[3].contains("true"), nullFormat, + isEmptyBadRecord)); } } }
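On the write path, the net effect of these changes is that TablePage sizes its columnar output by the number of flattened children (getComplexColumnIndex(), the renamed getDepth()) and fills one buffer per child page instead of a single byte-array page for the whole complex column. A minimal sketch of that allocation pattern in plain Java, with the count hard-coded for the struct<name:string,marks:array<short>> example rather than obtained from a real ComplexColumnPage:

import java.util.ArrayList;
import java.util.List;

public class ComplexBufferSketch {
  public static void main(String[] args) {
    // struct<name:string,marks:array<short>> flattens to
    // [struct, string, array, short] -> 4 child pages
    int complexColumnIndex = 4;
    List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>(complexColumnIndex);
    for (int i = 0; i < complexColumnIndex; i++) {
      encodedComplexColumnar.add(new ArrayList<byte[]>());  // one columnar buffer per child
    }
    System.out.println(encodedComplexColumnar.size());      // 4
  }
}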