Skip to content

Commit

Permalink
Merge 5d20d55 into 15d3826
Browse files Browse the repository at this point in the history
  • Loading branch information
ravipesala committed Oct 16, 2018
2 parents 15d3826 + 5d20d55 commit b8a4831
Show file tree
Hide file tree
Showing 91 changed files with 3,644 additions and 512 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1845,6 +1845,18 @@ public final class CarbonCommonConstants {
public static final int CARBON_MINMAX_ALLOWED_BYTE_COUNT_MIN = 10;
public static final int CARBON_MINMAX_ALLOWED_BYTE_COUNT_MAX = 1000;

/**
* When enabled complete row filters will be handled by carbon in case of vector.
* If it is disabled then only page level pruning will be done by carbon and row level filtering
* will be done by spark for vector.
* There is no change in flow for non-vector based queries.
*/
@CarbonProperty
public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR =
"carbon.push.rowfilters.for.vector";

public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT = "true";

// Constants-only utility class: private constructor prevents instantiation.
private CarbonCommonConstants() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.IOException;

import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.format.Encoding;
Expand Down Expand Up @@ -121,6 +122,22 @@ public DimensionColumnPage convertToDimColDataChunkWithOutCache(int index) {
}
}

/**
 * Converts the raw data of the specified page number to a decoded dimension column
 * page and fills the result directly into the supplied vector.
 *
 * @param pageNumber page number to decode and fill the vector
 * @param vectorInfo vector to be filled with column page
 */
public void convertToDimColDataChunkAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
  assert pageNumber < pagesCount;
  try {
    chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
  } catch (IOException | MemoryException e) {
    // Wrap only the checked exceptions declared by the reader, mirroring the
    // measure-side MeasureRawColumnChunk; runtime exceptions propagate as-is
    // instead of being needlessly re-wrapped.
    throw new RuntimeException(e);
  }
}

@Override public void freeMemory() {
super.freeMemory();
if (null != dataChunks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,37 @@ public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
dataChunk.length;
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
DimensionStoreType.FIXED_LENGTH, null);
DimensionStoreType.FIXED_LENGTH, null, false);
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
}

/**
* Constructor
*
* @param dataChunk data chunk
* @param invertedIndex inverted index
* @param invertedIndexReverse reverse inverted index
* @param numberOfRows number of rows
* @param columnValueSize size of each column value
* @param vectorInfo vector to be filled with decoded column page.
*/
/**
 * Creates a fixed-length dimension page, either storing the raw chunk data or
 * decoding it straight into the given vector.
 *
 * @param dataChunk data chunk
 * @param invertedIndex inverted index
 * @param invertedIndexReverse reverse inverted index
 * @param numberOfRows number of rows
 * @param columnValueSize size of each column value
 * @param vectorInfo vector to be filled with decoded column page; when null the
 *        data is kept in the chunk store instead
 */
public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
    int[] invertedIndexReverse, int numberOfRows, int columnValueSize,
    ColumnVectorInfo vectorInfo) {
  boolean explicitSorted = isExplicitSorted(invertedIndex);
  // An explicitly sorted page additionally stores two int arrays (forward and
  // reverse inverted index) per row.
  long totalSize = dataChunk.length;
  if (explicitSorted) {
    totalSize += (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE);
  }
  boolean fillDirectVector = vectorInfo != null;
  dataChunkStore = DimensionChunkStoreFactory.INSTANCE
      .getDimensionChunkStore(columnValueSize, explicitSorted, numberOfRows, totalSize,
          DimensionStoreType.FIXED_LENGTH, null, fillDirectVector);
  if (fillDirectVector) {
    dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunk, vectorInfo);
  } else {
    dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
  }
}

/**
* Below method will be used to fill the data based on offset and row id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

/**
* Contains raw measure data
Expand Down Expand Up @@ -94,7 +95,7 @@ public ColumnPage decodeColumnPage(int pageNumber) {
public ColumnPage convertToColumnPageWithOutCache(int index) {
assert index < pagesCount;
// in case of filter query filter columns blocklet pages will uncompressed
// in case of filter query, the filter columns' blocklet pages are already uncompressed
// so no need to decode again
if (null != columnPages && columnPages[index] != null) {
return columnPages[index];
}
Expand All @@ -105,6 +106,22 @@ public ColumnPage convertToColumnPageWithOutCache(int index) {
}
}

/**
 * Convert raw data with specified page number processed to ColumnPage and fill the
 * vector
 *
 * @param pageNumber page number to decode and fill the vector
 * @param vectorInfo vector to be filled with column page
 */
public void convertToColumnPageAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
  assert pageNumber < pagesCount;
  try {
    chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
  } catch (IOException | MemoryException e) {
    // Callers do not declare these checked exceptions; surface them unchecked.
    throw new RuntimeException(e);
  }
}

@Override public void freeMemory() {
super.freeMemory();
if (null != columnPages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,31 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa

/**
 * Constructor for this class. Delegates to the vector-aware constructor with a
 * null vector, i.e. the data is stored in the chunk store rather than filled
 * into a vector.
 * @param dataChunks data chunk
 * @param invertedIndex inverted index
 * @param invertedIndexReverse reverse inverted index
 * @param numberOfRows number of rows
 * @param dimStoreType storage type to use for this dimension chunk
 * @param dictionary carbon local dictionary for string column.
 */
public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
    int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
    CarbonDictionary dictionary) {
  this(dataChunks, invertedIndex, invertedIndexReverse, numberOfRows, dimStoreType, dictionary,
      null);
}

/**
 * Constructor for this class
 * @param dataChunks data chunk
 * @param invertedIndex inverted index
 * @param invertedIndexReverse reverse inverted index
 * @param numberOfRows number of rows
 * @param dimStoreType storage type to use for this dimension chunk
 * @param dictionary carbon local dictionary for string column.
 * @param vectorInfo vector to be filled with decoded column page.
 */
public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
CarbonDictionary dictionary, ColumnVectorInfo vectorInfo) {
boolean isExplicitSorted = isExplicitSorted(invertedIndex);
long totalSize = 0;
switch (dimStoreType) {
Expand All @@ -54,10 +75,15 @@ public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
}
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(0, isExplicitSorted, numberOfRows, totalSize, dimStoreType,
dictionary);
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
dictionary, vectorInfo != null);
if (vectorInfo != null) {
dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunks, vectorInfo);
} else {
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
}
}


/**
* Below method will be used to fill the data based on offset and row id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

/**
* Interface for reading the data chunk
Expand Down Expand Up @@ -60,4 +61,10 @@ DimensionRawColumnChunk readRawDimensionChunk(FileReader fileReader, int columnI
*/
DimensionColumnPage decodeColumnPage(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber) throws IOException, MemoryException;

/**
 * Decodes the raw data chunk of given page number and fill the vector with decoded data.
 *
 * @param dimensionRawColumnChunk raw chunk holding the page data to decode
 * @param pageNumber page number within the raw chunk to decode
 * @param vectorInfo vector to be filled with the decoded data
 * @throws IOException if reading the page data fails
 * @throws MemoryException if memory required for decoding cannot be acquired
 */
void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
    int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

/**
* Reader interface for reading the measure blocks from file
Expand Down Expand Up @@ -58,4 +59,10 @@ MeasureRawColumnChunk readRawMeasureChunk(FileReader fileReader, int columnIndex
ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
int pageNumber) throws IOException, MemoryException;

/**
 * Decode raw data of the given page number and fill the vector with the decoded data.
 *
 * @param measureRawColumnChunk raw chunk holding the page data to decode
 * @param pageNumber page number within the raw chunk to decode
 * @param vectorInfo vector to be filled with the decoded data
 * @throws IOException if reading the page data fails
 * @throws MemoryException if memory required for decoding cannot be acquired
 */
void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
    int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
*/
package org.apache.carbondata.core.datastore.chunk.reader.dimension;

import java.io.IOException;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonProperties;

/**
Expand Down Expand Up @@ -79,4 +84,10 @@ public AbstractChunkReader(final int[] eachColumnValueSize, final String filePat
this.numberOfRows = numberOfRows;
}

// Default implementation: direct vector filling is not supported by this reader;
// subclasses that support it must override. Always throws
// UnsupportedOperationException, naming the concrete reader class for diagnostics.
@Override
public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
    int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
  throw new UnsupportedOperationException(
      "This operation is not supported in this reader " + this.getClass().getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,6 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fil
ByteBuffer rawData = dimensionRawColumnChunk.getFileReader()
.readByteBuffer(filePath, offset, length);

return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0);
return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0, null);
}
}
Loading

0 comments on commit b8a4831

Please sign in to comment.