From 66fc90ea675bacad033956401d7d99018f570124 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Fri, 1 Mar 2019 09:46:12 +0530 Subject: [PATCH] Improve datamap prune performance further more --- .../blockletindex/BlockDataMap.java | 12 +- .../blockletindex/BlockletDataMap.java | 2 +- .../core/indexstore/row/DataMapRow.java | 6 +- .../core/indexstore/row/UnsafeDataMapRow.java | 145 +++++++++++------- 4 files changed, 100 insertions(+), 65 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index 6b9165d1e95..4956ba298bb 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -644,7 +644,9 @@ private List prune(FilterResolverIntf filterExp) { int hitBlocklets = 0; if (filterExp == null) { for (int i = 0; i < numEntries; i++) { - DataMapRow safeRow = memoryDMStore.getDataMapRow(schema, i).convertToSafeRow(); + DataMapRow row = memoryDMStore.getDataMapRow(schema, i); + DataMapRow safeRow = row.convertToSafeRow(FILE_PATH_INDEX, 0); + safeRow.setInt(row.getInt(ROW_COUNT_INDEX), ROW_COUNT_INDEX); blocklets.add(createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath, -1), getBlockletId(safeRow), false)); } @@ -669,7 +671,9 @@ private List prune(FilterResolverIntf filterExp) { addBlockBasedOnMinMaxValue(filterExecuter, minMaxRow.maxVals, minMaxRow.minVals, minMaxRow.flags, fileName, blockletId); if (isValid) { - blocklets.add(createBlocklet(unSafeRow.convertToSafeRow(), fileName, blockletId, + DataMapRow row = unSafeRow.convertToSafeRow(LOCATIONS, unSafeRow.getCurrentPointer()); + row.setInt(unSafeRow.getInt(ROW_COUNT_INDEX), ROW_COUNT_INDEX); + blocklets.add(createBlocklet(row, fileName, blockletId, useMinMaxForPruning)); hitBlocklets += getBlockletNumOfEntry(entryIndex); } @@ -840,7 +844,7 @@ private ExtendedBlocklet createBlockletFromRelativeBlockletId(int absoluteBlockl } } DataMapRow safeRow = - memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), rowIndex).convertToSafeRow(); + memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), rowIndex).convertToSafeRow(0, 0); String filePath = getFilePath(); return createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath, -1), relativeBlockletId, @@ -872,7 +876,7 @@ public String getTableTaskInfo(int index) { private DataMapRowImpl getRow(DataMapRow row, int index, int position) { DataMapRow minMaxRow = row.getRow(index, position); - DataMapRowImpl safeRow = (DataMapRowImpl)minMaxRow.convertToSafeRow(); + DataMapRowImpl safeRow = (DataMapRowImpl)minMaxRow.convertToSafeRow(0, 0); safeRow.setTotalLengthInBytes(minMaxRow.getPosition() + safeRow.getTotalSizeInBytes()); return safeRow; } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 9c7823c84e8..158f34b1dd4 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -225,7 +225,7 @@ public ExtendedBlocklet getDetailedBlocklet(String blockletId) { } int absoluteBlockletId = Integer.parseInt(blockletId); DataMapRow safeRow = memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), absoluteBlockletId) - .convertToSafeRow(); + .convertToSafeRow(FILE_PATH_INDEX, 0); short relativeBlockletId = safeRow.getShort(BLOCKLET_ID_INDEX); String filePath = getFilePath(); return createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath, -1), diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java index 2dbb61dcad4..9500a49e934 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java @@ -114,7 +114,7 @@ public int getColumnCount() { * * @return */ - public DataMapRow convertToSafeRow() { + public DataMapRow convertToSafeRow(int ordinal, int position) { return this; } @@ -127,4 +127,8 @@ public void setSchemas(CarbonRowSchema[] schemas) { public int getPosition() { return 0; } + + public int getCurrentPointer() { + return 0; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java index bab6aa9bd21..7846f21b82b 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java @@ -39,6 +39,8 @@ public class UnsafeDataMapRow extends DataMapRow { private int pointer; + private int currentPointer; + public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointer) { super(schemas); this.block = block; @@ -71,6 +73,7 @@ public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointe byte[] data = new byte[length]; getUnsafe().copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + position, data, BYTE_ARRAY_OFFSET, data.length); + currentPointer += position + data.length; return data; } @@ -200,71 +203,82 @@ private int getLengthInBytes(int ordinal, int position) { * * @return */ - public DataMapRow convertToSafeRow() { + public DataMapRow convertToSafeRow(int ordinal, int position) { DataMapRowImpl row = new DataMapRowImpl(schemas); int runningLength = 0; - for (int i = 0; i < schemas.length; i++) { + int i = 0; + if (position > 0) { + runningLength = position; + i = ordinal; + } + for (; i < schemas.length; i++) { CarbonRowSchema schema = schemas[i]; + boolean readData = i >= ordinal; switch (schema.getSchemaType()) { case FIXED: DataType dataType = schema.getDataType(); if (dataType == DataTypes.BYTE) { - row.setByte( - getUnsafe().getByte( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); + if (readData) { + row.setByte(getUnsafe() + .getByte(block.getBaseObject(), + block.getBaseOffset() + pointer + runningLength), + i); + } runningLength += schema.getLength(); } else if (dataType == DataTypes.BOOLEAN) { - row.setBoolean( - getUnsafe().getBoolean( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); + if (readData) { + row.setBoolean(getUnsafe().getBoolean(block.getBaseObject(), + block.getBaseOffset() + pointer + runningLength), i); + } runningLength += schema.getLength(); } else if (dataType == DataTypes.SHORT) { - row.setShort( - getUnsafe().getShort( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); + if (readData) { + row.setShort(getUnsafe() + .getShort(block.getBaseObject(), + block.getBaseOffset() + pointer + runningLength), + i); + } runningLength += schema.getLength(); } else if (dataType == DataTypes.INT) { - row.setInt( - getUnsafe().getInt( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); + if (readData) { + row.setInt(getUnsafe() + .getInt(block.getBaseObject(), + block.getBaseOffset() + pointer + runningLength), + i); + } runningLength += schema.getLength(); } else if (dataType == DataTypes.LONG) { - row.setLong( - getUnsafe().getLong( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); + if (readData) { + row.setLong(getUnsafe() + .getLong(block.getBaseObject(), + block.getBaseOffset() + pointer + runningLength), + i); + } runningLength += schema.getLength(); } else if (dataType == DataTypes.FLOAT) { - row.setFloat( - getUnsafe().getFloat(block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); + if (readData) { + row.setFloat(getUnsafe() + .getFloat(block.getBaseObject(), + block.getBaseOffset() + pointer + runningLength), + i); + } runningLength += schema.getLength(); } else if (dataType == DataTypes.DOUBLE) { - row.setDouble( - getUnsafe().getDouble(block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength), - i); + if (readData) { + row.setDouble(getUnsafe().getDouble(block.getBaseObject(), + block.getBaseOffset() + pointer + runningLength), i); + } runningLength += schema.getLength(); } else if (dataType == DataTypes.BYTE_ARRAY) { - byte[] data = new byte[schema.getLength()]; - getUnsafe().copyMemory( - block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength, - data, - BYTE_ARRAY_OFFSET, - data.length); - row.setByteArray(data, i); - runningLength += data.length; + int len = schema.getLength(); + if (readData) { + byte[] data = new byte[len]; + getUnsafe().copyMemory(block.getBaseObject(), + block.getBaseOffset() + pointer + runningLength, data, BYTE_ARRAY_OFFSET, + data.length); + row.setByteArray(data, i); + } + runningLength += len; } else { throw new UnsupportedOperationException( "unsupported data type for unsafe storage: " + schema.getDataType()); @@ -274,28 +288,37 @@ public DataMapRow convertToSafeRow() { int length = getUnsafe() .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength); runningLength += 2; - byte[] data = new byte[length]; - getUnsafe().copyMemory(block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength, - data, BYTE_ARRAY_OFFSET, data.length); - runningLength += data.length; - row.setByteArray(data, i); + if (readData) { + byte[] data = new byte[length]; + getUnsafe() + .copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength, + data, BYTE_ARRAY_OFFSET, data.length); + row.setByteArray(data, i); + } + runningLength += length; break; case VARIABLE_INT: int length2 = getUnsafe() .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength); runningLength += 4; - byte[] data2 = new byte[length2]; - getUnsafe().copyMemory(block.getBaseObject(), - block.getBaseOffset() + pointer + runningLength, - data2, BYTE_ARRAY_OFFSET, data2.length); - runningLength += data2.length; - row.setByteArray(data2, i); + if (readData) { + byte[] data2 = new byte[length2]; + getUnsafe() + .copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength, + data2, BYTE_ARRAY_OFFSET, data2.length); + row.setByteArray(data2, i); + } + runningLength += length2; break; case STRUCT: - DataMapRow structRow = ((UnsafeDataMapRow) getRow(i)).convertToSafeRow(); - row.setRow(structRow, i); - runningLength += structRow.getTotalSizeInBytes(); + DataMapRow mapRow = getRow(i, runningLength); + if (readData) { + DataMapRow structRow = mapRow.convertToSafeRow(0, 0); + row.setRow(structRow, i); + runningLength += structRow.getTotalSizeInBytes(); + } else { + runningLength += mapRow.getTotalSizeInBytes(); + } break; default: throw new UnsupportedOperationException( @@ -333,4 +356,8 @@ private int getPosition(int ordinal) { @Override public int getPosition() { return pointer; } + + public int getCurrentPointer() { + return currentPointer; + } }