Skip to content

Commit

Permalink
Added support for inverted index and delete delta for direct scan que…
Browse files Browse the repository at this point in the history
…ries
  • Loading branch information
ravipesala committed Oct 18, 2018
1 parent 3034af0 commit eed6e00
Show file tree
Hide file tree
Showing 20 changed files with 958 additions and 25 deletions.
Expand Up @@ -1855,7 +1855,7 @@ public final class CarbonCommonConstants {
public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR =
"carbon.push.rowfilters.for.vector";

public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT = "true";
public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT = "false";

private CarbonCommonConstants() {
}
Expand Down
Expand Up @@ -276,8 +276,12 @@ protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnP
if (isExplicitSorted) {
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
// get the reverse index
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
if (vectorInfo == null) {
// get the reverse index
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
} else {
vectorInfo.invertedIndex = invertedIndexes;
}
}
}
BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
Expand Down
Expand Up @@ -17,11 +17,15 @@

package org.apache.carbondata.core.datastore.chunk.store.impl.safe;

import java.util.BitSet;

import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;

Expand All @@ -48,7 +52,14 @@ public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int colum
public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
ColumnVectorInfo vectorInfo) {
CarbonColumnVector vector = vectorInfo.vector;
BitSet deletedRows = vectorInfo.deletedRows;
BitSet nullBits = new BitSet(numOfRows);
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, invertedIndex, nullBits, deletedRows, false);
fillVector(data, vectorInfo, vector);
if (vector instanceof ConvertableVector) {
((ConvertableVector) vector).convert();
}
}

private void fillVector(byte[] data, ColumnVectorInfo vectorInfo, CarbonColumnVector vector) {
Expand Down
Expand Up @@ -25,6 +25,8 @@
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;

Expand Down Expand Up @@ -109,7 +111,13 @@ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] d
BitSet deletedRows = vectorInfo.deletedRows;
AbstractNonDictionaryVectorFiller vectorFiller =
NonDictionaryVectorFillerFactory.getVectorFiller(dt, lengthSize, numberOfRows);
BitSet nullBits = new BitSet(numberOfRows);
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, invertedIndex, nullBits, deletedRows, false);
vectorFiller.fillVector(data, vector, buffer);
if (vector instanceof ConvertableVector) {
((ConvertableVector) vector).convert();
}
}

protected abstract int getLengthSize();
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;

Expand Down Expand Up @@ -245,6 +246,8 @@ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorIn
int pageSize = columnPage.getPageSize();
BitSet deletedRows = vectorInfo.deletedRows;
DataType dataType = vector.getType();
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true);
if (dataType == DataTypes.FLOAT) {
float floatFactor = factor.floatValue();
if (type == DataTypes.BOOLEAN || type == DataTypes.BYTE) {
Expand Down
Expand Up @@ -39,6 +39,8 @@
import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;

Expand Down Expand Up @@ -302,16 +304,26 @@ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorIn
DataType type = columnPage.getDataType();
int pageSize = columnPage.getPageSize();
BitSet deletedRows = vectorInfo.deletedRows;
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
true);
fillVector(columnPage, vector, dataType, type, pageSize, vectorInfo);
if (deletedRows == null || deletedRows.isEmpty()) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
}
}
if (vector instanceof ConvertableVector) {
((ConvertableVector) vector).convert();
}
}

private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataType dataType,
DataType type, int pageSize, ColumnVectorInfo vectorInfo) {
int newScale = 0;
if (vectorInfo.measure != null) {
newScale = vectorInfo.measure.getMeasure().getScale();
}
if (type == DataTypes.BOOLEAN || type == DataTypes.BYTE) {
byte[] byteData = columnPage.getByteData();
if (dataType == DataTypes.SHORT) {
Expand Down Expand Up @@ -339,6 +351,9 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataTy
int precision = vectorInfo.measure.getMeasure().getPrecision();
for (int i = 0; i < pageSize; i++) {
BigDecimal decimal = decimalConverter.getDecimal(max - byteData[i]);
if (decimal.scale() < newScale) {
decimal = decimal.setScale(newScale);
}
vector.putDecimal(i, decimal, precision);
}
} else {
Expand Down Expand Up @@ -369,6 +384,9 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataTy
int precision = vectorInfo.measure.getMeasure().getPrecision();
for (int i = 0; i < pageSize; i++) {
BigDecimal decimal = decimalConverter.getDecimal(max - shortData[i]);
if (decimal.scale() < newScale) {
decimal = decimal.setScale(newScale);
}
vector.putDecimal(i, decimal, precision);
}
} else {
Expand Down Expand Up @@ -396,6 +414,9 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataTy
int precision = vectorInfo.measure.getMeasure().getPrecision();
for (int i = 0; i < pageSize; i++) {
BigDecimal decimal = decimalConverter.getDecimal(max - shortIntData[i]);
if (decimal.scale() < newScale) {
decimal = decimal.setScale(newScale);
}
vector.putDecimal(i, decimal, precision);
}
} else {
Expand All @@ -422,6 +443,9 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataTy
int precision = vectorInfo.measure.getMeasure().getPrecision();
for (int i = 0; i < pageSize; i++) {
BigDecimal decimal = decimalConverter.getDecimal(max - intData[i]);
if (decimal.scale() < newScale) {
decimal = decimal.setScale(newScale);
}
vector.putDecimal(i, decimal, precision);
}
} else {
Expand All @@ -444,6 +468,9 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataTy
int precision = vectorInfo.measure.getMeasure().getPrecision();
for (int i = 0; i < pageSize; i++) {
BigDecimal decimal = decimalConverter.getDecimal(max - longData[i]);
if (decimal.scale() < newScale) {
decimal = decimal.setScale(newScale);
}
vector.putDecimal(i, decimal, precision);
}
}
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;

Expand Down Expand Up @@ -248,6 +249,8 @@ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorIn
int pageSize = columnPage.getPageSize();
BitSet deletedRows = vectorInfo.deletedRows;
DataType dataType = vector.getType();
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true);
if (dataType == DataTypes.FLOAT) {
if (type == DataTypes.BOOLEAN || type == DataTypes.BYTE) {
byte[] byteData = columnPage.getByteData();
Expand Down
Expand Up @@ -38,6 +38,8 @@
import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.Encoding;

Expand Down Expand Up @@ -275,12 +277,19 @@ public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorIn
DataType type = columnPage.getDataType();
int pageSize = columnPage.getPageSize();
BitSet deletedRows = vectorInfo.deletedRows;
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
true);
fillVector(columnPage, vector, dataType, type, pageSize, vectorInfo);
if (deletedRows == null || deletedRows.isEmpty()) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
}
}
if (vector instanceof ConvertableVector) {
((ConvertableVector) vector).convert();
}

}

private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataType dataType,
Expand Down
Expand Up @@ -38,6 +38,8 @@
import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory;
import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector;
import org.apache.carbondata.format.Encoding;

/**
Expand Down Expand Up @@ -204,12 +206,18 @@ public double decodeDouble(double value) {
DataType type = columnPage.getDataType();
int pageSize = columnPage.getPageSize();
BitSet deletedRows = vectorInfo.deletedRows;
vector = ColumnarVectorWrapperDirectFactory
.getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows,
true);
fillVector(columnPage, vector, dataType, type, pageSize, vectorInfo);
if (deletedRows == null || deletedRows.isEmpty()) {
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
vector.putNull(i);
}
}
if (vector instanceof ConvertableVector) {
((ConvertableVector) vector).convert();
}
}

private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataType dataType,
Expand Down
Expand Up @@ -106,13 +106,18 @@ public static class DecimalIntConverter implements DecimalConverter {
// inefficient.
CarbonColumnVector vector = info.vector;
int precision = info.measure.getMeasure().getPrecision();
int newMeasureScale = info.measure.getMeasure().getScale();
if (valuesToBeConverted instanceof byte[]) {
byte[] data = (byte[]) valuesToBeConverted;
for (int i = 0; i < size; i++) {
if (nullBitset.get(i)) {
vector.putNull(i);
} else {
vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
BigDecimal value = BigDecimal.valueOf(data[i], scale);
if (value.scale() < newMeasureScale) {
value = value.setScale(newMeasureScale);
}
vector.putDecimal(i, value, precision);
}
}
} else if (valuesToBeConverted instanceof short[]) {
Expand All @@ -121,7 +126,11 @@ public static class DecimalIntConverter implements DecimalConverter {
if (nullBitset.get(i)) {
vector.putNull(i);
} else {
vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
BigDecimal value = BigDecimal.valueOf(data[i], scale);
if (value.scale() < newMeasureScale) {
value = value.setScale(newMeasureScale);
}
vector.putDecimal(i, value, precision);
}
}
} else if (valuesToBeConverted instanceof int[]) {
Expand All @@ -130,7 +139,11 @@ public static class DecimalIntConverter implements DecimalConverter {
if (nullBitset.get(i)) {
vector.putNull(i);
} else {
vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
BigDecimal value = BigDecimal.valueOf(data[i], scale);
if (value.scale() < newMeasureScale) {
value = value.setScale(newMeasureScale);
}
vector.putDecimal(i, value, precision);
}
}
} else if (valuesToBeConverted instanceof long[]) {
Expand All @@ -139,7 +152,11 @@ public static class DecimalIntConverter implements DecimalConverter {
if (nullBitset.get(i)) {
vector.putNull(i);
} else {
vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
BigDecimal value = BigDecimal.valueOf(data[i], scale);
if (value.scale() < newMeasureScale) {
value = value.setScale(newMeasureScale);
}
vector.putDecimal(i, value, precision);
}
}
}
Expand Down Expand Up @@ -225,14 +242,22 @@ public class DecimalUnscaledConverter implements DecimalConverter {
BitSet nullBitset) {
CarbonColumnVector vector = info.vector;
int precision = info.measure.getMeasure().getPrecision();
int newMeasureScale = info.measure.getMeasure().getScale();
if (scale < newMeasureScale) {
scale = newMeasureScale;
}
if (valuesToBeConverted instanceof byte[][]) {
byte[][] data = (byte[][]) valuesToBeConverted;
for (int i = 0; i < size; i++) {
if (nullBitset.get(i)) {
vector.putNull(i);
} else {
BigInteger bigInteger = new BigInteger(data[i]);
vector.putDecimal(i, new BigDecimal(bigInteger, scale), precision);
BigDecimal value = new BigDecimal(bigInteger, scale);
if (value.scale() < newMeasureScale) {
value = value.setScale(newMeasureScale);
}
vector.putDecimal(i, value, precision);
}
}
}
Expand Down Expand Up @@ -263,13 +288,18 @@ public static class LVBytesDecimalConverter implements DecimalConverter {
BitSet nullBitset) {
CarbonColumnVector vector = info.vector;
int precision = info.measure.getMeasure().getPrecision();
int newMeasureScale = info.measure.getMeasure().getScale();
if (valuesToBeConverted instanceof byte[][]) {
byte[][] data = (byte[][]) valuesToBeConverted;
for (int i = 0; i < size; i++) {
if (nullBitset.get(i)) {
vector.putNull(i);
} else {
vector.putDecimal(i, DataTypeUtil.byteToBigDecimal(data[i]), precision);
BigDecimal value = DataTypeUtil.byteToBigDecimal(data[i]);
if (value.scale() < newMeasureScale) {
value = value.setScale(newMeasureScale);
}
vector.putDecimal(i, value, precision);
}
}
}
Expand Down
Expand Up @@ -17,16 +17,7 @@
package org.apache.carbondata.core.scan.collector;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.scan.collector.impl.AbstractScannedResultCollector;
import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedResultCollector;
import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedVectorResultCollector;
import org.apache.carbondata.core.scan.collector.impl.RawBasedResultCollector;
import org.apache.carbondata.core.scan.collector.impl.RestructureBasedDictionaryResultCollector;
import org.apache.carbondata.core.scan.collector.impl.RestructureBasedRawResultCollector;
import org.apache.carbondata.core.scan.collector.impl.RestructureBasedVectorResultCollector;
import org.apache.carbondata.core.scan.collector.impl.RowIdBasedResultCollector;
import org.apache.carbondata.core.scan.collector.impl.RowIdRawBasedResultCollector;
import org.apache.carbondata.core.scan.collector.impl.RowIdRestructureBasedRawResultCollector;
import org.apache.carbondata.core.scan.collector.impl.*;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;

import org.apache.log4j.Logger;
Expand Down
Expand Up @@ -35,6 +35,7 @@ public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
public DirectDictionaryGenerator directDictionaryGenerator;
public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller;
public GenericQueryType genericQueryType;
public int[] invertedIndex;
public BitSet deletedRows;
public DecimalConverterFactory.DecimalConverter decimalConverter;

Expand Down

0 comments on commit eed6e00

Please sign in to comment.