[CARBONDATA-3353] Fixed MinMax Based Pruning for Measure column in case of Legacy store

Why is this PR needed?

Problem:
For a table created and loaded with a legacy store that has a measure column, min was written as max and vice versa while building the page min/max, so the blocklet-level min/max is wrong. With the current version, when a query filters on a measure column, measure filter pruning skips some blocks and returns wrong results.
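
A minimal sketch of the failure mode, for illustration only (plain Java, names hypothetical, not CarbonData code): a standard range check never selects a block whose min and max were written swapped.

// Illustration only: a range-based pruning check against swapped min/max.
public class SwappedMinMaxDemo {
  // A block must be scanned when the filter value lies inside [min, max].
  static boolean isScanRequired(double min, double max, double filterValue) {
    return filterValue >= min && filterValue <= max;
  }

  public static void main(String[] args) {
    double actualMin = 10.0, actualMax = 100.0, filterValue = 50.0;
    // Correct metadata: the block is rightly selected for scanning.
    System.out.println(isScanRequired(actualMin, actualMax, filterValue)); // true
    // Legacy store: min and max were swapped on write, so the same block is
    // wrongly pruned and matching rows are lost from the query result.
    System.out.println(isScanRequired(actualMax, actualMin, filterValue)); // false
  }
}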

Solution:
Skip min/max based pruning for measure columns in case of a legacy store.
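
The diff below does this by marking the measure entries of the min/max flag array as not set (the new FilterUtil.setMinMaxFlagForLegacyStore) and by treating a block as always scan-required when its flag is off. A condensed standalone sketch of that idea (simplified names, not the actual CarbonData API):

// Hedged sketch of the fix: measure min/max from a legacy store is flagged
// "not set", and pruning then falls back to scanning the block instead of
// trusting the bad metadata.
import java.util.Arrays;

public class LegacyMinMaxFlagDemo {
  // Dimension (and complex dimension) entries come first in the min/max
  // arrays, followed by measure entries; only the dimension part is reliable.
  static boolean[] buildMinMaxFlag(int dimensionValueCount, int measureCount) {
    boolean[] minMaxFlag = new boolean[dimensionValueCount + measureCount];
    Arrays.fill(minMaxFlag, 0, dimensionValueCount, true);
    Arrays.fill(minMaxFlag, dimensionValueCount, minMaxFlag.length, false);
    return minMaxFlag;
  }

  static boolean isScanRequired(boolean minMaxSet, double min, double max, double filterValue) {
    // If min/max is unreliable, never prune: the block must be scanned.
    if (!minMaxSet) {
      return true;
    }
    return filterValue >= min && filterValue <= max;
  }

  public static void main(String[] args) {
    boolean[] flag = buildMinMaxFlag(2, 1); // 2 dimension entries, 1 measure
    // The measure block with swapped min/max is no longer pruned away.
    System.out.println(isScanRequired(flag[2], 100.0, 10.0, 50.0)); // true
  }
}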

This closes #3176
Indhumathi27 authored and ravipesala committed Apr 17, 2019
1 parent 32af97e commit a803304
Showing 10 changed files with 51 additions and 131 deletions.
@@ -67,7 +67,6 @@
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.BlockletDataMapUtil;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.path.CarbonTablePath;

@@ -219,7 +218,7 @@ protected DataMapRowImpl loadBlockInfoForOldStore(CarbonRowSchema[] taskSummaryS
DataMapRowImpl summaryRow = null;
CarbonRowSchema[] schema = getFileFooterEntrySchema();
boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(minMaxFlag, true);
FilterUtil.setMinMaxFlagForLegacyStore(minMaxFlag, segmentProperties);
long totalRowCount = 0;
for (DataFileFooter fileFooter : indexInfo) {
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
@@ -232,19 +231,9 @@ protected DataMapRowImpl loadBlockInfoForOldStore(CarbonRowSchema[] taskSummaryS
if (null != blockMetaInfo) {
BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
byte[][] minValues =
BlockletDataMapUtil.updateMinValues(segmentProperties, minMaxIndex.getMinValues());
byte[][] maxValues =
BlockletDataMapUtil.updateMaxValues(segmentProperties, minMaxIndex.getMaxValues());
// update min max values in case of old store for measures as measure min/max in
// old stores in written opposite
byte[][] updatedMinValues =
CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, true);
byte[][] updatedMaxValues =
CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, false);
summaryRow = loadToUnsafeBlock(schema, taskSummarySchema, fileFooter, segmentProperties,
getMinMaxCacheColumns(), blockInfo.getFilePath(), summaryRow,
blockMetaInfo, updatedMinValues, updatedMaxValues, minMaxFlag);
blockMetaInfo, minMaxIndex.getMinValues(), minMaxIndex.getMaxValues(), minMaxFlag);
totalRowCount += fileFooter.getNumberOfRows();
}
}
@@ -238,6 +238,12 @@ private List<AbstractIndex> getDataBlocks(QueryModel queryModel) throws IOExcept
LOGGER.warn("Skipping Direct Vector Filling as it is not Supported "
+ "for Legacy store prior to V3 store");
queryModel.setDirectVectorFill(false);
// Skip minmax based pruning for measure column in case of legacy store
boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
FilterUtil.setMinMaxFlagForLegacyStore(minMaxFlag, segmentProperties);
for (BlockletInfo blockletInfo : fileFooter.getBlockletList()) {
blockletInfo.getBlockletIndex().getMinMaxIndex().setIsMinMaxSet(minMaxFlag);
}
}
readAndFillBlockletInfo(tableBlockInfos, blockInfo,
blockletDetailInfo, fileFooter, segmentProperties);
@@ -386,15 +392,6 @@ private void fillBlockletInfoToTableBlock(List<TableBlockInfo> tableBlockInfos,
byte[][] maxValues = blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues();
byte[][] minValues = blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues();
if (blockletDetailInfo.isLegacyStore()) {
minValues = BlockletDataMapUtil.updateMinValues(segmentProperties,
blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues());
maxValues = BlockletDataMapUtil.updateMaxValues(segmentProperties,
blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues());
// update min and max values in case of old store for measures as min and max is written
// opposite for measures in old store ( store <= 1.1 version)
byte[][] tempMaxValues = maxValues;
maxValues = CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, false);
minValues = CarbonUtil.updateMinMaxValues(fileFooter, tempMaxValues, minValues, true);
info.setDataBlockFromOldStore(true);
}
blockletInfo.getBlockletIndex().getMinMaxIndex().setMaxValues(maxValues);
@@ -2326,4 +2326,12 @@ public static byte[] getDefaultNullValue(CarbonDimension currentBlockDimension,
return defaultValue;
}

public static void setMinMaxFlagForLegacyStore(boolean[] minMaxFlag,
SegmentProperties segmentProperties) {
int index = segmentProperties.getEachDimColumnValueSize().length + segmentProperties
.getEachComplexDimColumnValueSize().length;
Arrays.fill(minMaxFlag, 0, index, true);
Arrays.fill(minMaxFlag, index, minMaxFlag.length, false);
}

}
@@ -524,10 +524,13 @@ public BitSet isScanRequired(byte[][] blkMaxVal, byte[][] blkMinVal, boolean[] i
isMinMaxSet[chunkIndex]);
}
} else if (isMeasurePresentInCurrentBlock) {
chunkIndex = msrColumnEvaluatorInfo.getColumnIndexInMinMaxByteArray();
isScanRequired = isScanRequired(blkMaxVal[chunkIndex], blkMinVal[chunkIndex],
msrColumnExecutorInfo.getFilterKeys(),
msrColumnEvaluatorInfo.getType());
if (isMinMaxSet[chunkIndex]) {
chunkIndex = msrColumnEvaluatorInfo.getColumnIndexInMinMaxByteArray();
isScanRequired = isScanRequired(blkMaxVal[chunkIndex], blkMinVal[chunkIndex],
msrColumnExecutorInfo.getFilterKeys(), msrColumnEvaluatorInfo.getType());
} else {
isScanRequired = true;
}
}

if (isScanRequired) {
@@ -122,9 +122,13 @@ private void ifDefaultValueMatchesFilter() {
byte[] maxValue = null;
if (isMeasurePresentInCurrentBlock[0] || isDimensionPresentInCurrentBlock[0]) {
if (isMeasurePresentInCurrentBlock[0]) {
maxValue = blockMaxValue[measureChunkIndex[0]];
isScanRequired =
isScanRequired(maxValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
if (isMinMaxSet[measureChunkIndex[0]]) {
maxValue = blockMaxValue[measureChunkIndex[0]];
isScanRequired = isScanRequired(maxValue, msrFilterRangeValues,
msrColEvalutorInfoList.get(0).getType());
} else {
isScanRequired = true;
}
} else {
maxValue = blockMaxValue[dimensionChunkIndex[0]];
DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
@@ -120,9 +120,13 @@ private void ifDefaultValueMatchesFilter() {
byte[] maxValue = null;
if (isMeasurePresentInCurrentBlock[0] || isDimensionPresentInCurrentBlock[0]) {
if (isMeasurePresentInCurrentBlock[0]) {
maxValue = blockMaxValue[measureChunkIndex[0]];
isScanRequired =
isScanRequired(maxValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
if (isMinMaxSet[measureChunkIndex[0]]) {
maxValue = blockMaxValue[measureChunkIndex[0]];
isScanRequired = isScanRequired(maxValue, msrFilterRangeValues,
msrColEvalutorInfoList.get(0).getType());
} else {
isScanRequired = true;
}
} else {
maxValue = blockMaxValue[dimensionChunkIndex[0]];
DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
@@ -120,9 +120,13 @@ private void ifDefaultValueMatchesFilter() {
boolean isScanRequired = false;
if (isMeasurePresentInCurrentBlock[0] || isDimensionPresentInCurrentBlock[0]) {
if (isMeasurePresentInCurrentBlock[0]) {
minValue = blockMinValue[measureChunkIndex[0]];
isScanRequired =
isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
if (isMinMaxSet[measureChunkIndex[0]]) {
minValue = blockMinValue[measureChunkIndex[0]];
isScanRequired = isScanRequired(minValue, msrFilterRangeValues,
msrColEvalutorInfoList.get(0).getType());
} else {
isScanRequired = true;
}
} else {
minValue = blockMinValue[dimensionChunkIndex[0]];
DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
@@ -120,9 +120,13 @@ private void ifDefaultValueMatchesFilter() {
boolean isScanRequired = false;
if (isMeasurePresentInCurrentBlock[0] || isDimensionPresentInCurrentBlock[0]) {
if (isMeasurePresentInCurrentBlock[0]) {
minValue = blockMinValue[measureChunkIndex[0]];
isScanRequired =
isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
if (isMinMaxSet[measureChunkIndex[0]]) {
minValue = blockMinValue[measureChunkIndex[0]];
isScanRequired = isScanRequired(minValue, msrFilterRangeValues,
msrColEvalutorInfoList.get(0).getType());
} else {
isScanRequired = true;
}
} else {
minValue = blockMinValue[dimensionChunkIndex[0]];
DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType();
47 changes: 0 additions & 47 deletions core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -88,8 +88,6 @@
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.comparator.Comparator;
import org.apache.carbondata.core.util.comparator.SerializableComparator;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockletHeader;
import org.apache.carbondata.format.DataChunk2;
@@ -2825,51 +2823,6 @@ private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
return maxSize;
}

/**
* This method will be used to update the min and max values and this will be used in case of
* old store where min and max values for measures are written opposite
* (i.e max values in place of min and min in place of max values)
*
* @param dataFileFooter
* @param maxValues
* @param minValues
* @param isMinValueComparison
* @return
*/
public static byte[][] updateMinMaxValues(DataFileFooter dataFileFooter, byte[][] maxValues,
byte[][] minValues, boolean isMinValueComparison) {
byte[][] updatedMinMaxValues = new byte[maxValues.length][];
if (isMinValueComparison) {
System.arraycopy(minValues, 0, updatedMinMaxValues, 0, minValues.length);
} else {
System.arraycopy(maxValues, 0, updatedMinMaxValues, 0, maxValues.length);
}
for (int i = 0; i < maxValues.length; i++) {
// update min and max values only for measures
if (!dataFileFooter.getColumnInTable().get(i).isDimensionColumn()) {
DataType dataType = dataFileFooter.getColumnInTable().get(i).getDataType();
SerializableComparator comparator = Comparator.getComparator(dataType);
int compare;
if (isMinValueComparison) {
compare = comparator
.compare(DataTypeUtil.getMeasureObjectFromDataType(maxValues[i], dataType),
DataTypeUtil.getMeasureObjectFromDataType(minValues[i], dataType));
if (compare < 0) {
updatedMinMaxValues[i] = maxValues[i];
}
} else {
compare = comparator
.compare(DataTypeUtil.getMeasureObjectFromDataType(minValues[i], dataType),
DataTypeUtil.getMeasureObjectFromDataType(maxValues[i], dataType));
if (compare > 0) {
updatedMinMaxValues[i] = minValues[i];
}
}
}
}
return updatedMinMaxValues;
}

/**
* Generate the blockid as per the block path
*
@@ -1011,52 +1011,6 @@ public void testSplitSchemaStringToMapWithMultiplesOfSplitLen() {
Assert.assertTrue(schemaString.length() > schema.length());
}

@Test
public void testUpdateMinMaxValues() {
// create dimension and measure column schema
ColumnSchema dimensionColumnSchema = createColumnSchema(DataTypes.STRING, true);
ColumnSchema measureColumnSchema = createColumnSchema(DataTypes.DOUBLE, false);
List<ColumnSchema> columnSchemas = new ArrayList<>(2);
columnSchemas.add(dimensionColumnSchema);
columnSchemas.add(measureColumnSchema);
// create data file footer object
DataFileFooter fileFooter = new DataFileFooter();
fileFooter.setColumnInTable(columnSchemas);
// initialise the expected values
int expectedMaxValue = 5;
int expectedMinValue = 2;
double expectedMeasureMaxValue = 28.74;
double expectedMeasureMinValue = -21.46;
// initialise the minValues
byte[][] minValues = new byte[2][];
minValues[0] = new byte[] { 2 };
ByteBuffer buffer = ByteBuffer.allocate(8);
minValues[1] = (byte[]) buffer.putDouble(28.74).flip().array();
buffer = ByteBuffer.allocate(8);
// initialise the maxValues
byte[][] maxValues = new byte[2][];
maxValues[0] = new byte[] { 5 };
maxValues[1] = (byte[]) buffer.putDouble(-21.46).flip().array();
byte[][] updateMaxValues =
CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, false);
byte[][] updateMinValues =
CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, true);
// compare max values
assert (expectedMaxValue == ByteBuffer.wrap(updateMaxValues[0]).get());
assert (expectedMeasureMaxValue == ByteBuffer.wrap(updateMaxValues[1]).getDouble());

// compare min values
assert (expectedMinValue == ByteBuffer.wrap(updateMinValues[0]).get());
assert (expectedMeasureMinValue == ByteBuffer.wrap(updateMinValues[1]).getDouble());
}

private ColumnSchema createColumnSchema(DataType dataType, boolean isDimensionColumn) {
ColumnSchema columnSchema = new ColumnSchema();
columnSchema.setDataType(dataType);
columnSchema.setDimensionColumn(isDimensionColumn);
return columnSchema;
}

private String generateString(int length) {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < length; i++) {