[CARBONDATA-3012] Added support for full scan queries for vector direct fill.

After decompressing a page in the V3 reader, we can fill the data directly into a vector without any condition checks inside loops.
The complete column page data is therefore set into the column vector in a single batch and handed back to Spark/Presto.
For this purpose, a new method is added to ColumnPageDecoder:

ColumnPage decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo,
      BitSet nullBits, boolean isLVEncoded)
The above method takes the vector and fills it in a single loop, without any checks inside the loop.
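
To make the intent concrete, here is a minimal, hypothetical Java sketch of the difference (the array-backed vector and method names are illustrative only, not CarbonData's actual API): the old row-wise path branches on every value, while direct fill marks nulls once and then copies the whole page in bulk.

// Illustrative sketch only; names and types are hypothetical, not CarbonData's API.
class DirectFillSketch {
  // Old style: a null check is evaluated on every row while copying.
  static void rowWiseFill(int[] decoded, java.util.BitSet nullBits, int[] vector, boolean[] isNull) {
    for (int i = 0; i < decoded.length; i++) {
      if (nullBits.get(i)) {
        isNull[i] = true;
      } else {
        vector[i] = decoded[i];
      }
    }
  }

  // New style: nulls are marked once, outside the copy loop, and the page
  // data then moves with a single branch-free bulk copy.
  static void directFill(int[] decoded, java.util.BitSet nullBits, int[] vector, boolean[] isNull) {
    for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
      isNull[i] = true;
    }
    System.arraycopy(decoded, 0, vector, 0, decoded.length);
  }
}

Keeping the main loop branch-free is what allows the copy to degenerate into a simple memcpy-style transfer instead of per-row work.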

A new method is also added to DimensionDataChunkStore:

 void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
      ColumnVectorInfo vectorInfo);
This method likewise fills the vector in a single loop, without any checks inside the loop.
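
In the page constructors changed below, the choice between the old store path and the new direct-fill path is a single null check on the supplied vector info; this snippet is condensed from the FixedLengthDimensionColumnPage diff that follows:

if (vectorInfo == null) {
  // no vector supplied: store the decoded page in the chunk store as before
  dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
} else {
  // vector supplied: push the decoded page straight into the vector
  dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunk, vectorInfo);
}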

This closes #2818
ravipesala authored and kumarvishal09 committed Oct 25, 2018
1 parent e0baa9b commit 3d3b6ff
Showing 67 changed files with 2,616 additions and 463 deletions.
@@ -33,6 +33,7 @@
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.format.Encoding;
@@ -121,6 +122,22 @@ public DimensionColumnPage convertToDimColDataChunkWithOutCache(int index) {
}
}

/**
* Convert the raw data of the specified page number to a DimensionColumnPage and fill
* the vector
*
* @param pageNumber page number to decode and fill the vector
* @param vectorInfo vector to be filled with column page
*/
public void convertToDimColDataChunkAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
assert pageNumber < pagesCount;
try {
chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override public void freeMemory() {
super.freeMemory();
if (null != dataChunks) {
@@ -46,10 +46,37 @@ public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
dataChunk.length;
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
DimensionStoreType.FIXED_LENGTH, null);
DimensionStoreType.FIXED_LENGTH, null, false);
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
}

/**
* Constructor
*
* @param dataChunk data chunk
* @param invertedIndex inverted index
* @param invertedIndexReverse reverse inverted index
* @param numberOfRows number of rows
* @param columnValueSize size of each column value
* @param vectorInfo vector to be filled with decoded column page.
*/
public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, int columnValueSize,
ColumnVectorInfo vectorInfo) {
boolean isExplicitSorted = isExplicitSorted(invertedIndex);
long totalSize = isExplicitSorted ?
dataChunk.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) :
dataChunk.length;
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
DimensionStoreType.FIXED_LENGTH, null, vectorInfo != null);
if (vectorInfo == null) {
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
} else {
dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunk, vectorInfo);
}
}

/**
* Below method will be used to fill the data based on offset and row id
*
@@ -24,6 +24,7 @@
import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

/**
* Contains raw measure data
@@ -105,6 +106,22 @@ public ColumnPage convertToColumnPageWithOutCache(int index) {
}
}

/**
* Convert the raw data of the specified page number to a ColumnPage and fill the
* vector
*
* @param pageNumber page number to decode and fill the vector
* @param vectorInfo vector to be filled with column page
*/
public void convertToColumnPageAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
assert pageNumber < pagesCount;
try {
chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
} catch (IOException | MemoryException e) {
throw new RuntimeException(e);
}
}

@Override public void freeMemory() {
super.freeMemory();
if (null != columnPages) {
@@ -30,10 +30,31 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa

/**
* Constructor for this class
* @param dataChunks data chunk
* @param invertedIndex inverted index
* @param invertedIndexReverse reverse inverted index
* @param numberOfRows number of rows
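* @param dimStoreType dimension store type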
* @param dictionary carbon local dictionary for string column.
*/
public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
CarbonDictionary dictionary) {
this(dataChunks, invertedIndex, invertedIndexReverse, numberOfRows, dimStoreType, dictionary,
null);
}

/**
* Constructor for this class
* @param dataChunks data chunk
* @param invertedIndex inverted index
* @param invertedIndexReverse reverse inverted index
* @param numberOfRows number of rows
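* @param dimStoreType dimension store type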
* @param dictionary carbon local dictionary for string column.
* @param vectorInfo vector to be filled with decoded column page.
*/
public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
CarbonDictionary dictionary, ColumnVectorInfo vectorInfo) {
boolean isExplicitSorted = isExplicitSorted(invertedIndex);
long totalSize = 0;
switch (dimStoreType) {
@@ -54,8 +75,12 @@ public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
}
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(0, isExplicitSorted, numberOfRows, totalSize, dimStoreType,
dictionary);
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
dictionary, vectorInfo != null);
if (vectorInfo != null) {
dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunks, vectorInfo);
} else {
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
}
}

/**
@@ -22,6 +22,7 @@
import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

/**
* Interface for reading the data chunk
@@ -60,4 +61,10 @@ DimensionRawColumnChunk readRawDimensionChunk(FileReader fileReader, int columnI
*/
DimensionColumnPage decodeColumnPage(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber) throws IOException, MemoryException;

/**
* Decodes the raw data chunk of the given page number and fills the vector with the decoded data.
*/
void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
}
@@ -22,6 +22,7 @@
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

/**
* Reader interface for reading the measure blocks from file
@@ -58,4 +59,10 @@ MeasureRawColumnChunk readRawMeasureChunk(FileReader fileReader, int columnIndex
ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
int pageNumber) throws IOException, MemoryException;

/**
* Decodes the raw data and fills the vector.
*/
void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;

}
@@ -16,10 +16,15 @@
*/
package org.apache.carbondata.core.datastore.chunk.reader.dimension;

import java.io.IOException;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonProperties;

/**
@@ -79,4 +84,10 @@ public AbstractChunkReader(final int[] eachColumnValueSize, final String filePat
this.numberOfRows = numberOfRows;
}

@Override
public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
throw new UnsupportedOperationException(
"This operation is not supported in this reader " + this.getClass().getName());
}
}
@@ -171,6 +171,6 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fil
ByteBuffer rawData = dimensionRawColumnChunk.getFileReader()
.readByteBuffer(filePath, offset, length);

return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0);
return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0, null);
}
}
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -39,6 +40,7 @@
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
@@ -207,6 +209,12 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fil
*/
@Override public DimensionColumnPage decodeColumnPage(
DimensionRawColumnChunk rawColumnPage, int pageNumber) throws IOException, MemoryException {
return decodeColumnPage(rawColumnPage, pageNumber, null);
}

private DimensionColumnPage decodeColumnPage(
DimensionRawColumnChunk rawColumnPage, int pageNumber,
ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
// data chunk of blocklet column
DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
// get the data buffer
@@ -221,49 +229,65 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fil
int offset = (int) rawColumnPage.getOffSet() + dimensionChunksLength
.get(rawColumnPage.getColumnIndex()) + dataChunk3.getPage_offset().get(pageNumber);
// first read the data and uncompressed it
return decodeDimension(rawColumnPage, rawData, pageMetadata, offset);
return decodeDimension(rawColumnPage, rawData, pageMetadata, offset, vectorInfo);
}

@Override
public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
DimensionColumnPage columnPage =
decodeColumnPage(dimensionRawColumnChunk, pageNumber, vectorInfo);
columnPage.freeMemory();
}

private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
ByteBuffer pageData, int offset, boolean isLocalDictEncodedPage)
private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
boolean isLocalDictEncodedPage, ColumnVectorInfo vectorInfo, BitSet nullBitSet)
throws IOException, MemoryException {
List<Encoding> encodings = pageMetadata.getEncoders();
List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
pageMetadata.getChunk_meta());
ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas,
compressorName);
return decoder
.decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
compressorName, vectorInfo != null);
if (vectorInfo != null) {
decoder
.decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
nullBitSet, isLocalDictEncodedPage);
return null;
} else {
return decoder
.decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
}
}

protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnPage,
ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo)
throws IOException, MemoryException {
List<Encoding> encodings = pageMetadata.getEncoders();
if (CarbonUtil.isEncodedWithMeta(encodings)) {
ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
null != rawColumnPage.getLocalDictionary());
decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
int[] invertedIndexes = new int[0];
int[] invertedIndexesReverse = new int[0];
// in case of no dictionary measure data types, if it is included in sort columns
// then inverted index to be uncompressed
if (encodings.contains(Encoding.INVERTED_INDEX)) {
boolean isExplicitSorted =
CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX);
int dataOffset = offset;
if (isExplicitSorted) {
offset += pageMetadata.data_page_length;
if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
// get the reverse index
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
// get the reverse index
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, dataOffset,
null != rawColumnPage.getLocalDictionary(), vectorInfo, nullBitSet);
decodedPage.setNullBits(nullBitSet);
return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata),
CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX));
invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata), isExplicitSorted);
} else {
// following code is for backward compatibility
return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset, vectorInfo);
}
}

@@ -283,8 +307,8 @@ public boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
}

private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException,
MemoryException {
ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo)
throws IOException, MemoryException {
byte[] dataPage;
int[] rlePage;
int[] invertedIndexes = new int[0];
@@ -296,8 +320,10 @@ private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawCol
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
offset += pageMetadata.rowid_page_length;
// get the reverse index
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
if (vectorInfo == null) {
// get the reverse index
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
}
// if rle is applied then read the rle block chunk and then uncompress
//then actual data based on rle block
@@ -324,13 +350,13 @@ private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawCol
columnDataChunk =
new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
pageMetadata.getNumberOfRowsInpage(), dimStoreType,
rawColumnPage.getLocalDictionary());
rawColumnPage.getLocalDictionary(), vectorInfo);
} else {
// to store fixed length column chunk values
columnDataChunk =
new FixedLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
pageMetadata.getNumberOfRowsInpage(),
eachColumnValueSize[rawColumnPage.getColumnIndex()]);
eachColumnValueSize[rawColumnPage.getColumnIndex()], vectorInfo);
}
return columnDataChunk;
}
