Merge eed6e00 into 0e85887
ravipesala committed Oct 18, 2018
2 parents 0e85887 + eed6e00 commit cc196af
Showing 99 changed files with 4,591 additions and 525 deletions.
@@ -1845,6 +1845,18 @@ public final class CarbonCommonConstants {
public static final int CARBON_MINMAX_ALLOWED_BYTE_COUNT_MIN = 10;
public static final int CARBON_MINMAX_ALLOWED_BYTE_COUNT_MAX = 1000;

/**
* When enabled, complete row filtering is handled by Carbon for vector-based queries.
* When disabled, Carbon performs only page-level pruning for vector-based queries and
* row-level filtering is left to Spark.
* There is no change in the flow for non-vector-based queries.
*/
@CarbonProperty
public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR =
"carbon.push.rowfilters.for.vector";

public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT = "false";

private CarbonCommonConstants() {
}
}
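For context, a minimal sketch of how the new flag would be switched on; it assumes the long-standing CarbonProperties.getInstance().addProperty(...) entry point, which is not part of this diff:

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public final class PushRowFiltersExample {
  public static void main(String[] args) {
    // Opt in to complete row filtering inside Carbon for vector reads;
    // the shipped default is "false", i.e. Spark does the row-level filtering.
    CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR, "true");
  }
}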
@@ -33,6 +33,7 @@
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.format.Encoding;
@@ -121,6 +122,22 @@ public DimensionColumnPage convertToDimColDataChunkWithOutCache(int index) {
}
}

/**
* Convert the raw data of the specified page number to a DimensionColumnDataChunk and fill
* the given vector with the decoded data
*
* @param pageNumber page number of the data to decode and fill into the vector
* @param vectorInfo vector to be filled with the decoded column page
*/
public void convertToDimColDataChunkAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
assert pageNumber < pagesCount;
try {
chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override public void freeMemory() {
super.freeMemory();
if (null != dataChunks) {
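A hedged caller-side sketch of the new fill-vector path; the helper name and setup are illustrative, not from this commit, and it assumes the existing getPagesCount() accessor on the raw chunk:

import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

final class DimensionVectorFillSketch {
  // Decode every page of a raw dimension chunk straight into the supplied
  // vector instead of materializing DimensionColumnPage objects first.
  static void fillAllPages(DimensionRawColumnChunk rawChunk, ColumnVectorInfo vectorInfo) {
    for (int page = 0; page < rawChunk.getPagesCount(); page++) {
      rawChunk.convertToDimColDataChunkAndFillVector(page, vectorInfo);
    }
  }
}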
@@ -46,10 +46,37 @@ public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
dataChunk.length;
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
DimensionStoreType.FIXED_LENGTH, null);
DimensionStoreType.FIXED_LENGTH, null, false);
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
}

/**
* Constructor
*
* @param dataChunk data chunk
* @param invertedIndex inverted index
* @param invertedIndexReverse reverse inverted index
* @param numberOfRows number of rows
* @param columnValueSize size of each column value
* @param vectorInfo vector to be filled with decoded column page.
*/
public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, int columnValueSize,
ColumnVectorInfo vectorInfo) {
boolean isExplicitSorted = isExplicitSorted(invertedIndex);
long totalSize = isExplicitSorted ?
dataChunk.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) :
dataChunk.length;
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
DimensionStoreType.FIXED_LENGTH, null, vectorInfo != null);
if (vectorInfo == null) {
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
} else {
dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunk, vectorInfo);
}
}

/**
* Below method will be used to fill the data based on offset and row id
*
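The new constructor makes storage optional: with a vectorInfo the decoded values are streamed into the caller's vector via fillVector() and putArray() is skipped entirely. A hedged sketch of the two modes; the helper and the null inverted index are illustrative only:

import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

final class FixedLengthPageSketch {
  // vectorInfoOrNull == null keeps the old putArray() materialization;
  // a non-null value streams the decoded data into the vector instead.
  static FixedLengthDimensionColumnPage build(byte[] dataChunk, int numberOfRows,
      int columnValueSize, ColumnVectorInfo vectorInfoOrNull) {
    return new FixedLengthDimensionColumnPage(dataChunk, null, null, numberOfRows,
        columnValueSize, vectorInfoOrNull);
  }
}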
@@ -24,6 +24,7 @@
import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

/**
* Contains raw measure data
@@ -94,7 +95,7 @@ public ColumnPage decodeColumnPage(int pageNumber) {
public ColumnPage convertToColumnPageWithOutCache(int index) {
assert index < pagesCount;
// in case of a filter query, the blocklet pages of the filter columns are already uncompressed,
// so there is no need to decode them again
// so no need to decodeAndFillVector again
if (null != columnPages && columnPages[index] != null) {
return columnPages[index];
}
Expand All @@ -105,6 +106,22 @@ public ColumnPage convertToColumnPageWithOutCache(int index) {
}
}

/**
* Convert the raw data of the specified page number to a ColumnPage and fill the
* given vector with the decoded data
*
* @param pageNumber page number of the data to decode and fill into the vector
* @param vectorInfo vector to be filled with the decoded column page
*/
public void convertToColumnPageAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
assert pageNumber < pagesCount;
try {
chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
} catch (IOException | MemoryException e) {
throw new RuntimeException(e);
}
}

@Override public void freeMemory() {
super.freeMemory();
if (null != columnPages) {
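The measure side mirrors the dimension API. A hedged sketch of choosing between the classic materialized page and the new direct vector fill; the helper name is illustrative:

import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

final class MeasureDecodeSketch {
  // With a vector available the data lands directly in it and no ColumnPage
  // is returned; otherwise fall back to the existing decode path.
  static ColumnPage decode(MeasureRawColumnChunk rawChunk, int pageNumber,
      ColumnVectorInfo vectorInfoOrNull) {
    if (vectorInfoOrNull == null) {
      return rawChunk.decodeColumnPage(pageNumber);
    }
    rawChunk.convertToColumnPageAndFillVector(pageNumber, vectorInfoOrNull);
    return null; // values were written straight into the vector
  }
}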
@@ -30,10 +30,31 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa

/**
* Constructor for this class
* @param dataChunks data chunk
* @param invertedIndex inverted index
* @param invertedIndexReverse reverse inverted index
* @param numberOfRows number of rows
* @param dimStoreType dimension store type
* @param dictionary carbon local dictionary for string column.
*/
public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
CarbonDictionary dictionary) {
this(dataChunks, invertedIndex, invertedIndexReverse, numberOfRows, dimStoreType, dictionary,
null);
}

/**
* Constructor for this class
* @param dataChunks data chunk
* @param invertedIndex inverted index
* @param invertedIndexReverse reverse inverted index
* @param numberOfRows number of rows
* @param dimStoreType dimension store type
* @param dictionary carbon local dictionary for string column.
* @param vectorInfo vector to be filled with decoded column page.
*/
public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
CarbonDictionary dictionary, ColumnVectorInfo vectorInfo) {
boolean isExplicitSorted = isExplicitSorted(invertedIndex);
long totalSize = 0;
switch (dimStoreType) {
@@ -54,10 +75,15 @@ public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
}
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(0, isExplicitSorted, numberOfRows, totalSize, dimStoreType,
dictionary);
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
dictionary, vectorInfo != null);
if (vectorInfo != null) {
dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunks, vectorInfo);
} else {
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
}
}

/**
* Below method will be used to fill the data based on offset and row id
*
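On the design here: the original six-argument constructor now simply delegates with a null vectorInfo, so every existing caller keeps the putArray() behaviour unchanged. A hedged construction sketch; the helper, the null inverted index, and the VARIABLE_SHORT_LENGTH store type are chosen for illustration and assumed to exist outside this diff:

import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

final class VariableLengthPageSketch {
  // A non-null vectorInfo routes the decoded values through fillVector();
  // null falls back to the legacy putArray() materialization.
  static VariableLengthDimensionColumnPage build(byte[] dataChunks, int numberOfRows,
      ColumnVectorInfo vectorInfoOrNull) {
    return new VariableLengthDimensionColumnPage(dataChunks, null, null, numberOfRows,
        DimensionStoreType.VARIABLE_SHORT_LENGTH, null, vectorInfoOrNull);
  }
}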
@@ -22,6 +22,7 @@
import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

/**
* Interface for reading the data chunk
@@ -60,4 +61,10 @@ DimensionRawColumnChunk readRawDimensionChunk(FileReader fileReader, int columnI
*/
DimensionColumnPage decodeColumnPage(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber) throws IOException, MemoryException;

/**
* Decodes the raw data chunk of the given page number and fills the vector with the decoded data.
*/
void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
}
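To show how the two interface methods are meant to divide the work, a hedged dispatch sketch at the reader level; the helper name is illustrative, not from this commit:

import java.io.IOException;

import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

final class DimensionDecodeDispatchSketch {
  // Vector available: the reader writes into it and no page is materialized.
  // Otherwise: decode to a DimensionColumnPage as before.
  static DimensionColumnPage decode(DimensionColumnChunkReader reader,
      DimensionRawColumnChunk rawChunk, int pageNumber, ColumnVectorInfo vectorInfoOrNull)
      throws IOException, MemoryException {
    if (vectorInfoOrNull != null) {
      reader.decodeColumnPageAndFillVector(rawChunk, pageNumber, vectorInfoOrNull);
      return null; // data already delivered to the vector
    }
    return reader.decodeColumnPage(rawChunk, pageNumber);
  }
}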
@@ -22,6 +22,7 @@
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;

/**
* Reader interface for reading the measure blocks from file
@@ -58,4 +59,10 @@ MeasureRawColumnChunk readRawMeasureChunk(FileReader fileReader, int columnIndex
ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
int pageNumber) throws IOException, MemoryException;

/**
* Decodes the raw data and fills the vector with the decoded data
*/
void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;

}
@@ -16,10 +16,15 @@
*/
package org.apache.carbondata.core.datastore.chunk.reader.dimension;

import java.io.IOException;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonProperties;

/**
@@ -79,4 +84,10 @@ public AbstractChunkReader(final int[] eachColumnValueSize, final String filePat
this.numberOfRows = numberOfRows;
}

@Override
public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
throw new UnsupportedOperationException(
"This operation is not supported in this reader " + this.getClass().getName());
}
}
@@ -171,6 +171,6 @@ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileReader fil
ByteBuffer rawData = dimensionRawColumnChunk.getFileReader()
.readByteBuffer(filePath, offset, length);

return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0);
return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0, null);
}
}
