Skip to content

Commit

Permalink
Improve datamap prune performance further more
Browse files Browse the repository at this point in the history
  • Loading branch information
ravipesala committed Mar 1, 2019
1 parent 1ac9320 commit 66fc90e
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 65 deletions.
Expand Up @@ -644,7 +644,9 @@ private List<Blocklet> prune(FilterResolverIntf filterExp) {
int hitBlocklets = 0;
if (filterExp == null) {
for (int i = 0; i < numEntries; i++) {
DataMapRow safeRow = memoryDMStore.getDataMapRow(schema, i).convertToSafeRow();
DataMapRow row = memoryDMStore.getDataMapRow(schema, i);
DataMapRow safeRow = row.convertToSafeRow(FILE_PATH_INDEX, 0);
safeRow.setInt(row.getInt(ROW_COUNT_INDEX), ROW_COUNT_INDEX);
blocklets.add(createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath, -1),
getBlockletId(safeRow), false));
}
Expand All @@ -669,7 +671,9 @@ private List<Blocklet> prune(FilterResolverIntf filterExp) {
addBlockBasedOnMinMaxValue(filterExecuter, minMaxRow.maxVals, minMaxRow.minVals,
minMaxRow.flags, fileName, blockletId);
if (isValid) {
blocklets.add(createBlocklet(unSafeRow.convertToSafeRow(), fileName, blockletId,
DataMapRow row = unSafeRow.convertToSafeRow(LOCATIONS, unSafeRow.getCurrentPointer());
row.setInt(unSafeRow.getInt(ROW_COUNT_INDEX), ROW_COUNT_INDEX);
blocklets.add(createBlocklet(row, fileName, blockletId,
useMinMaxForPruning));
hitBlocklets += getBlockletNumOfEntry(entryIndex);
}
Expand Down Expand Up @@ -840,7 +844,7 @@ private ExtendedBlocklet createBlockletFromRelativeBlockletId(int absoluteBlockl
}
}
DataMapRow safeRow =
memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), rowIndex).convertToSafeRow();
memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), rowIndex).convertToSafeRow(0, 0);
String filePath = getFilePath();
return createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath, -1),
relativeBlockletId,
Expand Down Expand Up @@ -872,7 +876,7 @@ public String getTableTaskInfo(int index) {

private DataMapRowImpl getRow(DataMapRow row, int index, int position) {
DataMapRow minMaxRow = row.getRow(index, position);
DataMapRowImpl safeRow = (DataMapRowImpl)minMaxRow.convertToSafeRow();
DataMapRowImpl safeRow = (DataMapRowImpl)minMaxRow.convertToSafeRow(0, 0);
safeRow.setTotalLengthInBytes(minMaxRow.getPosition() + safeRow.getTotalSizeInBytes());
return safeRow;
}
Expand Down
Expand Up @@ -225,7 +225,7 @@ public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
}
int absoluteBlockletId = Integer.parseInt(blockletId);
DataMapRow safeRow = memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), absoluteBlockletId)
.convertToSafeRow();
.convertToSafeRow(FILE_PATH_INDEX, 0);
short relativeBlockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
String filePath = getFilePath();
return createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath, -1),
Expand Down
Expand Up @@ -114,7 +114,7 @@ public int getColumnCount() {
*
* @return
*/
public DataMapRow convertToSafeRow() {
public DataMapRow convertToSafeRow(int ordinal, int position) {
return this;
}

Expand All @@ -127,4 +127,8 @@ public void setSchemas(CarbonRowSchema[] schemas) {
public int getPosition() {
return 0;
}

public int getCurrentPointer() {
return 0;
}
}
Expand Up @@ -39,6 +39,8 @@ public class UnsafeDataMapRow extends DataMapRow {

private int pointer;

private int currentPointer;

public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointer) {
super(schemas);
this.block = block;
Expand Down Expand Up @@ -71,6 +73,7 @@ public UnsafeDataMapRow(CarbonRowSchema[] schemas, MemoryBlock block, int pointe
byte[] data = new byte[length];
getUnsafe().copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + position, data,
BYTE_ARRAY_OFFSET, data.length);
currentPointer += position + data.length;
return data;
}

Expand Down Expand Up @@ -200,71 +203,82 @@ private int getLengthInBytes(int ordinal, int position) {
*
* @return
*/
public DataMapRow convertToSafeRow() {
public DataMapRow convertToSafeRow(int ordinal, int position) {
DataMapRowImpl row = new DataMapRowImpl(schemas);
int runningLength = 0;
for (int i = 0; i < schemas.length; i++) {
int i = 0;
if (position > 0) {
runningLength = position;
i = ordinal;
}
for (; i < schemas.length; i++) {
CarbonRowSchema schema = schemas[i];
boolean readData = i >= ordinal;
switch (schema.getSchemaType()) {
case FIXED:
DataType dataType = schema.getDataType();
if (dataType == DataTypes.BYTE) {
row.setByte(
getUnsafe().getByte(
block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
if (readData) {
row.setByte(getUnsafe()
.getByte(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
}
runningLength += schema.getLength();
} else if (dataType == DataTypes.BOOLEAN) {
row.setBoolean(
getUnsafe().getBoolean(
block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
if (readData) {
row.setBoolean(getUnsafe().getBoolean(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength), i);
}
runningLength += schema.getLength();
} else if (dataType == DataTypes.SHORT) {
row.setShort(
getUnsafe().getShort(
block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
if (readData) {
row.setShort(getUnsafe()
.getShort(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
}
runningLength += schema.getLength();
} else if (dataType == DataTypes.INT) {
row.setInt(
getUnsafe().getInt(
block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
if (readData) {
row.setInt(getUnsafe()
.getInt(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
}
runningLength += schema.getLength();
} else if (dataType == DataTypes.LONG) {
row.setLong(
getUnsafe().getLong(
block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
if (readData) {
row.setLong(getUnsafe()
.getLong(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
}
runningLength += schema.getLength();
} else if (dataType == DataTypes.FLOAT) {
row.setFloat(
getUnsafe().getFloat(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
if (readData) {
row.setFloat(getUnsafe()
.getFloat(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
}
runningLength += schema.getLength();
} else if (dataType == DataTypes.DOUBLE) {
row.setDouble(
getUnsafe().getDouble(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength),
i);
if (readData) {
row.setDouble(getUnsafe().getDouble(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength), i);
}
runningLength += schema.getLength();
} else if (dataType == DataTypes.BYTE_ARRAY) {
byte[] data = new byte[schema.getLength()];
getUnsafe().copyMemory(
block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength,
data,
BYTE_ARRAY_OFFSET,
data.length);
row.setByteArray(data, i);
runningLength += data.length;
int len = schema.getLength();
if (readData) {
byte[] data = new byte[len];
getUnsafe().copyMemory(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength, data, BYTE_ARRAY_OFFSET,
data.length);
row.setByteArray(data, i);
}
runningLength += len;
} else {
throw new UnsupportedOperationException(
"unsupported data type for unsafe storage: " + schema.getDataType());
Expand All @@ -274,28 +288,37 @@ public DataMapRow convertToSafeRow() {
int length = getUnsafe()
.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength);
runningLength += 2;
byte[] data = new byte[length];
getUnsafe().copyMemory(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength,
data, BYTE_ARRAY_OFFSET, data.length);
runningLength += data.length;
row.setByteArray(data, i);
if (readData) {
byte[] data = new byte[length];
getUnsafe()
.copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength,
data, BYTE_ARRAY_OFFSET, data.length);
row.setByteArray(data, i);
}
runningLength += length;
break;
case VARIABLE_INT:
int length2 = getUnsafe()
.getInt(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength);
runningLength += 4;
byte[] data2 = new byte[length2];
getUnsafe().copyMemory(block.getBaseObject(),
block.getBaseOffset() + pointer + runningLength,
data2, BYTE_ARRAY_OFFSET, data2.length);
runningLength += data2.length;
row.setByteArray(data2, i);
if (readData) {
byte[] data2 = new byte[length2];
getUnsafe()
.copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength,
data2, BYTE_ARRAY_OFFSET, data2.length);
row.setByteArray(data2, i);
}
runningLength += length2;
break;
case STRUCT:
DataMapRow structRow = ((UnsafeDataMapRow) getRow(i)).convertToSafeRow();
row.setRow(structRow, i);
runningLength += structRow.getTotalSizeInBytes();
DataMapRow mapRow = getRow(i, runningLength);
if (readData) {
DataMapRow structRow = mapRow.convertToSafeRow(0, 0);
row.setRow(structRow, i);
runningLength += structRow.getTotalSizeInBytes();
} else {
runningLength += mapRow.getTotalSizeInBytes();
}
break;
default:
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -333,4 +356,8 @@ private int getPosition(int ordinal) {
@Override public int getPosition() {
return pointer;
}

public int getCurrentPointer() {
return currentPointer;
}
}

0 comments on commit 66fc90e

Please sign in to comment.