From 62d9adcf9203cb478f9d572fb3792f48361736e7 Mon Sep 17 00:00:00 2001 From: manishgupta88 Date: Thu, 19 Apr 2018 20:12:09 +0530 Subject: [PATCH] Code synced from PR #2149 for compaction performance. Using this code compaction will perform IO at blocklet level and uncompression at page level --- ...eVariableLengthDimesionDataChunkStore.java | 200 +++++++++++------- .../scan/scanner/AbstractBlockletScanner.java | 16 +- 2 files changed, 126 insertions(+), 90 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java index 36b2bd80329..c2e5b936258 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java @@ -44,10 +44,20 @@ public class UnsafeVariableLengthDimesionDataChunkStore */ private long dataPointersOffsets; + /** + * Reusable data array + * this will be useful for vector scenario, as it will be created once and filled every time + * if new data length is bigger than exiting data length then create new data with bigger length + * and assign to value + */ + private byte[] value; + public UnsafeVariableLengthDimesionDataChunkStore(long totalSize, boolean isInvertedIdex, int numberOfRows) { super(totalSize, isInvertedIdex, numberOfRows); this.numberOfRows = numberOfRows; + // initials size assigning to some random value + this.value = new byte[20]; } /** @@ -78,70 +88,93 @@ public UnsafeVariableLengthDimesionDataChunkStore(long totalSize, boolean isInve // start position will be used to store the current data position int startOffset = 0; - // position from where offsets will start - long pointerOffsets = this.dataPointersOffsets; // as first position will be start from 2 byte as data is stored first in the memory block // we need to skip first two bytes this is because first two bytes will be length of the data // which we have to skip - CarbonUnsafe.getUnsafe().putInt(dataPageMemoryBlock.getBaseObject(), - dataPageMemoryBlock.getBaseOffset() + pointerOffsets, - CarbonCommonConstants.SHORT_SIZE_IN_BYTE); - // incrementing the pointers as first value is already filled and as we are storing as int - // we need to increment the 4 bytes to set the position of the next value to set - pointerOffsets += CarbonCommonConstants.INT_SIZE_IN_BYTE; + int [] dataOffsets = new int[numberOfRows]; + dataOffsets[0] = CarbonCommonConstants.SHORT_SIZE_IN_BYTE; // creating a byte buffer which will wrap the length of the row - // using byte buffer as unsafe will return bytes in little-endian encoding - ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.SHORT_SIZE_IN_BYTE); - // store length of data - byte[] length = new byte[CarbonCommonConstants.SHORT_SIZE_IN_BYTE]; - // as first offset is already stored, we need to start from the 2nd row in data array + ByteBuffer buffer = ByteBuffer.wrap(data); for (int i = 1; i < numberOfRows; i++) { - // first copy the length of previous row - CarbonUnsafe.getUnsafe().copyMemory(dataPageMemoryBlock.getBaseObject(), - dataPageMemoryBlock.getBaseOffset() + startOffset, length, CarbonUnsafe.BYTE_ARRAY_OFFSET, - CarbonCommonConstants.SHORT_SIZE_IN_BYTE); - buffer.put(length); - buffer.flip(); + buffer.position(startOffset); // so current row position will be // previous row length + 2 bytes used for storing previous row data - startOffset += CarbonCommonConstants.SHORT_SIZE_IN_BYTE + buffer.getShort(); + startOffset += buffer.getShort() + CarbonCommonConstants.SHORT_SIZE_IN_BYTE; // as same byte buffer is used to avoid creating many byte buffer for each row // we need to clear the byte buffer - buffer.clear(); - // now put the offset of current row, here we need to add 2 more bytes as current will - // also have length part so we have to skip length - CarbonUnsafe.getUnsafe().putInt(dataPageMemoryBlock.getBaseObject(), - dataPageMemoryBlock.getBaseOffset() + pointerOffsets, - startOffset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE); - // incrementing the pointers as first value is already filled and as we are storing as int - // we need to increment the 4 bytes to set the position of the next value to set - pointerOffsets += CarbonCommonConstants.INT_SIZE_IN_BYTE; + dataOffsets[i] = startOffset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE; } - + CarbonUnsafe.getUnsafe().copyMemory(dataOffsets, CarbonUnsafe.INT_ARRAY_OFFSET, + dataPageMemoryBlock.getBaseObject(), + dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets, + dataOffsets.length * CarbonCommonConstants.INT_SIZE_IN_BYTE); } /** * Below method will be used to get the row based on row id passed - * + * Getting the row from unsafe works in below logic + * 1. if inverted index is present then get the row id based on reverse inverted index + * 2. get the current row id data offset + * 3. if it's not a last row- get the next row offset + * Subtract the current row offset + 2 bytes(to skip the data length) with next row offset + * 4. if it's last row + * subtract the current row offset + 2 bytes(to skip the data length) with complete data length * @param rowId * @return row */ @Override public byte[] getRow(int rowId) { + // get the actual row id + rowId = getRowId(rowId); + // get offset of data in unsafe + int currentDataOffset = getOffSet(rowId); + // get the data length + short length = getLength(rowId, currentDataOffset); + // create data array + byte[] data = new byte[length]; + // fill the row data + fillRowInternal(length, data, currentDataOffset); + return data; + } + + /** + * Returns the actual row id for data + * if inverted index is present then get the row id based on reverse inverted index + * otherwise return the same row id + * @param rowId row id + * @return actual row id + */ + private int getRowId(int rowId) { // if column was explicitly sorted we need to get the rowid based inverted index reverse if (isExplicitSorted) { rowId = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(), dataPageMemoryBlock.getBaseOffset() + this.invertedIndexReverseOffset + ((long)rowId * CarbonCommonConstants.INT_SIZE_IN_BYTE)); } - // now to get the row from memory block we need to do following thing - // 1. first get the current offset - // 2. if it's not a last row- get the next row offset - // Subtract the current row offset + 2 bytes(to skip the data length) with next row offset - // else subtract the current row offset + 2 bytes(to skip the data length) - // with complete data length - int currentDataOffset = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(), - dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + (rowId + return rowId; + } + + /** + * get data offset based on current row id + * @param rowId row id + * @return data offset + */ + private int getOffSet(int rowId) { + return CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(), + dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((long)rowId * CarbonCommonConstants.INT_SIZE_IN_BYTE)); + } + + /** + * To get the length of data for row id + * if it's not a last row- get the next row offset + * Subtract the current row offset + 2 bytes(to skip the data length) with next row offset + * if it's last row + * subtract the current row offset + 2 bytes(to skip the data length) with complete data length + * @param rowId rowId + * @param currentDataOffset current data offset + * @return length of row + */ + private short getLength(int rowId, int currentDataOffset) { short length = 0; // calculating the length of data if (rowId < numberOfRows - 1) { @@ -154,34 +187,70 @@ public UnsafeVariableLengthDimesionDataChunkStore(long totalSize, boolean isInve // for last record we need to subtract with data length length = (short) (this.dataLength - currentDataOffset); } - byte[] data = new byte[length]; + return length; + } + + /** + * Return the row from unsafe + * @param length length of the data + * @param data data array + * @param currentDataOffset current data offset + */ + private void fillRowInternal(short length, byte[] data, int currentDataOffset) { CarbonUnsafe.getUnsafe().copyMemory(dataPageMemoryBlock.getBaseObject(), dataPageMemoryBlock.getBaseOffset() + currentDataOffset, data, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); - return data; } + /** + * + * Below method will be used to put the row in vector based on row id passed + * Getting the row from unsafe works in below logic + * 1. if inverted index is present then get the row id based on reverse inverted index + * 2. get the current row id data offset + * 3. if it's not a last row- get the next row offset + * Subtract the current row offset + 2 bytes(to skip the data length) with next row offset + * 4. if it's last row + * subtract the current row offset + 2 bytes(to skip the data length) with complete data length + * @param rowId row id + * @param vector vector to be filled + * @param vectorRow vector row id + * + */ @Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) { - byte[] value = getRow(rowId); + // get the row id from reverse inverted index based on row id + rowId = getRowId(rowId); + // get the current row offset + int currentDataOffset = getOffSet(rowId); + // get the row data length + short length = getLength(rowId, currentDataOffset); + // check if value length is less the current data length + // then create a new array else use the same + if (length > value.length) { + value = new byte[length]; + } + // get the row from unsafe + fillRowInternal(length, value, currentDataOffset); DataType dt = vector.getType(); - if ((!(dt == DataTypes.STRING) && value.length == 0) || ByteUtil.UnsafeComparer.INSTANCE - .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) { + if ((!(dt == DataTypes.STRING) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE + .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0, + CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, value, 0, length)) { vector.putNull(vectorRow); } else { if (dt == DataTypes.STRING) { - vector.putBytes(vectorRow, 0, value.length, value); + vector.putBytes(vectorRow, 0, length, value); } else if (dt == DataTypes.BOOLEAN) { vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0])); } else if (dt == DataTypes.SHORT) { - vector.putShort(vectorRow, ByteUtil.toShort(value, 0, value.length)); + vector.putShort(vectorRow, ByteUtil.toShort(value, 0, length)); } else if (dt == DataTypes.INT) { - vector.putInt(vectorRow, ByteUtil.toInt(value, 0, value.length)); + vector.putInt(vectorRow, ByteUtil.toInt(value, 0, length)); } else if (dt == DataTypes.LONG) { - vector.putLong(vectorRow, DataTypeUtil - .getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0, - value.length)); + vector.putLong(vectorRow, + DataTypeUtil.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0, + length)); } else if (dt == DataTypes.TIMESTAMP) { - vector.putLong(vectorRow, ByteUtil.toLong(value, 0, value.length) * 1000L); + vector.putLong(vectorRow, ByteUtil.toLong(value, 0, length) * 1000L); } } } @@ -189,32 +258,13 @@ public UnsafeVariableLengthDimesionDataChunkStore(long totalSize, boolean isInve /** * to compare the two byte array * - * @param index index of first byte array + * @param rowId index of first byte array * @param compareValue value of to be compared * @return compare result */ - @Override public int compareTo(int index, byte[] compareValue) { - // now to get the row from memory block we need to do following thing - // 1. first get the current offset - // 2. if it's not a last row- get the next row offset - // Subtract the current row offset + 2 bytes(to skip the data length) with next row offset - // else subtract the current row offset - // with complete data length get the offset of set of data - int currentDataOffset = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(), - dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((long)index - * CarbonCommonConstants.INT_SIZE_IN_BYTE * 1L)); - short length = 0; - // calculating the length of data - if (index < numberOfRows - 1) { - int OffsetOfNextdata = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(), - dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((index + 1) - * CarbonCommonConstants.INT_SIZE_IN_BYTE)); - length = (short) (OffsetOfNextdata - (currentDataOffset - + CarbonCommonConstants.SHORT_SIZE_IN_BYTE)); - } else { - // for last record we need to subtract with data length - length = (short) (this.dataLength - currentDataOffset); - } + @Override public int compareTo(int rowId, byte[] compareValue) { + int currentDataOffset = getOffSet(rowId);; + short length = getLength(rowId, currentDataOffset); // as this class handles this variable length data, so filter value can be // smaller or bigger than than actual data, so we need to take the smaller length int compareResult; @@ -233,4 +283,4 @@ public UnsafeVariableLengthDimesionDataChunkStore(long totalSize, boolean isInve return length - compareValue.length; } -} +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java index bf26ca32580..03a4b0cca3e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java @@ -93,18 +93,6 @@ public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) { scannedResult.setMeasureChunks(columnPages); scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks); scannedResult.setMsrRawColumnChunks(measureRawColumnChunks); - if (blockExecutionInfo.isPrefetchBlocklet()) { - for (int i = 0; i < dimensionRawColumnChunks.length; i++) { - if (dimensionRawColumnChunks[i] != null) { - dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].convertToDimColDataChunks(); - } - } - for (int i = 0; i < measureRawColumnChunks.length; i++) { - if (measureRawColumnChunks[i] != null) { - columnPages[i] = measureRawColumnChunks[i].convertToColumnPage(); - } - } - } int[] numberOfRows = null; if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) { for (int i = 0; i < dimensionRawColumnChunks.length; i++) { @@ -137,9 +125,7 @@ public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) { } } scannedResult.setNumberOfRows(numberOfRows); - if (!blockExecutionInfo.isPrefetchBlocklet()) { - scannedResult.fillDataChunks(); - } + scannedResult.fillDataChunks(); // adding statistics for carbon scan time QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap() .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);