Skip to content

Commit

Permalink
[CARBONDATA-3015] Support Lazy load in carbon vector
Browse files Browse the repository at this point in the history
Even though we prune the pages as per min/max there is a high chance of false positives in case of filters on high cardinality columns.
So to avoid that we can use the lazy loading design. It does not read/decompresses data and fill the vector immediately
when the call comes for data filling from spark/presto.
First only reads the required filter columns give back to execution engine, execution engine starts filtering on the filtered column vector
and if it finds some data need to be read from projection columns then only it starts reads the projection columns and fills the vector on demand.
It is the concept of presto and same is integrated with spark 2.3. Older versions of spark cannot use this advantage as ColumnVector interfaces are non-extendable.
For the above purpose added new classes 'LazyBlockletLoad' and 'LazyPageLoad' and changed the carbon vector interfaces.

This closes #2823
  • Loading branch information
ravipesala authored and kumarvishal09 committed Oct 26, 2018
1 parent 019f5cd commit 170c2f5
Show file tree
Hide file tree
Showing 22 changed files with 630 additions and 280 deletions.
Expand Up @@ -1735,7 +1735,7 @@ private CarbonCommonConstants() {
public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR =
"carbon.push.rowfilters.for.vector";

public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT = "true";
public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT = "false";

//////////////////////////////////////////////////////////////////////////////////////////
// Unused constants and parameters start here
Expand Down
Expand Up @@ -52,7 +52,7 @@ class NonDictionaryVectorFillerFactory {
public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize,
int numberOfRows) {
if (type == DataTypes.STRING) {
if (lengthSize > 2) {
if (lengthSize > DataTypes.SHORT.getSizeInBytes()) {
return new LongStringVectorFiller(lengthSize, numberOfRows);
} else {
return new StringVectorFiller(lengthSize, numberOfRows);
Expand Down
Expand Up @@ -47,7 +47,6 @@ public class SafeFixLengthColumnPage extends ColumnPage {

SafeFixLengthColumnPage(ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
super(columnPageEncoderMeta, pageSize);
this.fixedLengthdata = new byte[pageSize][];
}

/**
Expand Down Expand Up @@ -456,6 +455,9 @@ private void ensureArraySize(int requestSize, DataType dataType) {
doubleData = newArray;
}
} else if (dataType == DataTypes.BYTE_ARRAY) {
if (fixedLengthdata == null) {
fixedLengthdata = new byte[pageSize][];
}
if (requestSize >= fixedLengthdata.length) {
byte[][] newArray = new byte[arrayElementCount * 2][];
int index = 0;
Expand Down
Expand Up @@ -345,6 +345,11 @@ private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
decimalConverter.fillVector(columnPage.getByteArrayPage(), pageSize, vectorInfo,
columnPage.getNullBits());
} else if (vectorDataType == DataTypes.FLOAT) {
float[] floatPage = columnPage.getFloatPage();
for (int i = 0; i < pageSize; i++) {
vector.putFloats(0, pageSize, floatPage, 0);
}
} else {
double[] doubleData = columnPage.getDoublePage();
vector.putDoubles(0, pageSize, doubleData, 0);
Expand Down
Expand Up @@ -39,6 +39,8 @@
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.scanner.LazyBlockletLoader;
import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.stats.QueryStatisticsModel;
Expand Down Expand Up @@ -145,6 +147,8 @@ public abstract class BlockletScannedResult {

protected QueryStatisticsModel queryStatisticsModel;

protected LazyBlockletLoader lazyBlockletLoader;

public BlockletScannedResult(BlockExecutionInfo blockExecutionInfo,
QueryStatisticsModel queryStatisticsModel) {
this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
Expand Down Expand Up @@ -185,6 +189,14 @@ public void setMsrRawColumnChunks(MeasureRawColumnChunk[] msrRawColumnChunks) {
this.msrRawColumnChunks = msrRawColumnChunks;
}

public LazyBlockletLoader getLazyBlockletLoader() {
return lazyBlockletLoader;
}

public void setLazyBlockletLoader(LazyBlockletLoader lazyBlockletLoader) {
this.lazyBlockletLoader = lazyBlockletLoader;
}

/**
* Below method will be used to get the chunk based in measure ordinal
*
Expand Down Expand Up @@ -396,25 +408,24 @@ public void fillDataChunks(ColumnVectorInfo[] dictionaryInfo, ColumnVectorInfo[]
if (pageCounter >= pageFilteredRowCount.length) {
return;
}
long startTime = System.currentTimeMillis();

for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
dimRawColumnChunks[dictionaryColumnChunkIndexes[i]]
.convertToDimColDataChunkAndFillVector(pageIdFiltered[pageCounter], dictionaryInfo[i]);
dictionaryInfo[i].vector.setLazyPage(
new LazyPageLoader(lazyBlockletLoader, dictionaryColumnChunkIndexes[i], false,
pageIdFiltered[pageCounter], dictionaryInfo[i]));
}
for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
dimRawColumnChunks[noDictionaryColumnChunkIndexes[i]]
.convertToDimColDataChunkAndFillVector(pageIdFiltered[pageCounter], noDictionaryInfo[i]);
noDictionaryInfo[i].vector.setLazyPage(
new LazyPageLoader(lazyBlockletLoader, noDictionaryColumnChunkIndexes[i], false,
pageIdFiltered[pageCounter], noDictionaryInfo[i]));
}

for (int i = 0; i < measuresOrdinal.length; i++) {
msrRawColumnChunks[measuresOrdinal[i]]
.convertToColumnPageAndFillVector(pageIdFiltered[pageCounter], msrVectorInfo[i]);
msrVectorInfo[i].vector.setLazyPage(
new LazyPageLoader(lazyBlockletLoader, measuresOrdinal[i], true,
pageIdFiltered[pageCounter], msrVectorInfo[i]));
}
QueryStatistic pageUncompressTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
.get(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME);
pageUncompressTime.addCountStatistic(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME,
pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));

}

// free the memory for the last page chunk
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.math.BigDecimal;

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.scan.scanner.LazyPageLoader;

public interface CarbonColumnVector {

Expand Down Expand Up @@ -108,4 +109,6 @@ public interface CarbonColumnVector {

CarbonColumnVector getDictionaryVector();

void setLazyPage(LazyPageLoader lazyPage);

}
Expand Up @@ -25,6 +25,7 @@
import org.apache.carbondata.core.metadata.datatype.DecimalType;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.scanner.LazyPageLoader;

public class CarbonColumnVectorImpl implements CarbonColumnVector {

Expand Down Expand Up @@ -349,5 +350,7 @@ public void setBlockDataType(DataType blockDataType) {
}
}


@Override public void setLazyPage(LazyPageLoader lazyPage) {
lazyPage.loadPage();
}
}
Expand Up @@ -22,112 +22,118 @@
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.scanner.LazyPageLoader;

public abstract class AbstractCarbonColumnarVector
implements CarbonColumnVector, ConvertableVector {

@Override
public void putShorts(int rowId, int count, short value) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void putInts(int rowId, int count, int value) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void putLongs(int rowId, int count, long value) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void putDoubles(int rowId, int count, double value) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void putBytes(int rowId, int count, byte[] value) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void putNulls(int rowId, int count) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void putNotNull(int rowId) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void putNotNull(int rowId, int count) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public boolean isNull(int rowId) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void putObject(int rowId, Object obj) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public Object getData(int rowId) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void reset() {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public DataType getType() {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public DataType getBlockDataType() {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void setBlockDataType(DataType blockDataType) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void setFilteredRowsExist(boolean filteredRowsExist) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void setDictionary(CarbonDictionary dictionary) {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public boolean hasDictionary() {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public CarbonColumnVector getDictionaryVector() {
throw new UnsupportedOperationException("Not allowed from here");
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}

@Override
public void convert() {
// Do nothing
}

@Override
public void setLazyPage(LazyPageLoader lazyPage) {
throw new UnsupportedOperationException("Not allowed from here " + getClass().getName());
}
}

0 comments on commit 170c2f5

Please sign in to comment.