[CARBONDATA-2896][Refactor] Adaptive Encoding for Primitive data types
Loading configurations and settings
(1) Parse the data like that of a measure, changing FieldEncoderFactory to take up the measure flow (sketched below)
(2) While creating loading configurations, no-dictionary sort columns must be taken care of in all the needed flows
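A minimal sketch of the routing idea in (1), with hypothetical names rather than the actual FieldEncoderFactory API: no-dictionary primitive columns take the measure-style parsing flow and keep typed values, while other dimensions stay byte-oriented.

public class FieldFlowSketch {
  enum ParseFlow { MEASURE_LIKE, DIMENSION_BYTES }

  // assumption for illustration: these primitive types qualify for the measure flow
  static ParseFlow pickFlow(String dataType, boolean hasDictionary) {
    switch (dataType) {
      case "SHORT": case "INT": case "LONG": case "DOUBLE":
        return hasDictionary ? ParseFlow.DIMENSION_BYTES : ParseFlow.MEASURE_LIKE;
      default:
        return ParseFlow.DIMENSION_BYTES;
    }
  }

  public static void main(String[] args) {
    System.out.println(pickFlow("INT", false));    // MEASURE_LIKE
    System.out.println(pickFlow("STRING", false)); // DIMENSION_BYTES
  }
}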

Sort rows preparation
(1) Prepare the row to be sorted with the original data for no-dictionary columns
(2) Use data-type-based comparators for the no-dictionary sort columns in all the flows: intermediate sort, final sort, unsafe sort (see the comparator sketch below)
(3) Handle reading and writing of rows with no-dictionary primitive data types to the intermediate files and in the final file merger, as we will now read and write the original data
(4) Get the no-dictionary sort data types from the load configurations that were set in the LOAD step
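The comparator sketch referenced in (2), with hypothetical helper names: each no-dictionary sort column gets a comparator matching its data type, and only non-primitive columns fall back to unsigned byte-wise comparison.

import java.util.Comparator;

public class SortComparatorSketch {
  static Comparator<Object> comparatorFor(String dataType) {
    switch (dataType) {
      case "INT":    return Comparator.comparingInt(o -> (Integer) o);
      case "LONG":   return Comparator.comparingLong(o -> (Long) o);
      case "DOUBLE": return Comparator.comparingDouble(o -> (Double) o);
      default:
        // non-primitive no-dictionary columns still compare as raw bytes
        return (a, b) -> {
          byte[] x = (byte[]) a, y = (byte[]) b;
          for (int i = 0; i < Math.min(x.length, y.length); i++) {
            int cmp = (x[i] & 0xFF) - (y[i] & 0xFF);
            if (cmp != 0) return cmp;
          }
          return x.length - y.length;
        };
    }
  }

  public static void main(String[] args) {
    System.out.println(comparatorFor("INT").compare(10, 2) > 0); // true: 10 sorts after 2
  }
}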

Adding to the column page and applying adaptive encoding
(1) Add the data of no-dictionary primitive data types as original data
(2) Apply adaptive encoding to the page (sketched below)
(3) Reuse the adaptive encoding techniques that already exist for measure columns
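A minimal sketch of the adaptive idea reused from the measure flow: based on the page's min/max statistics, store the values in the narrowest integral type that can hold them (the delta variants store offsets from a page statistic instead of raw values). Names here are illustrative, not the actual encoder classes.

public class AdaptiveEncodingSketch {
  static String pickStoredType(long min, long max) {
    if (min >= Byte.MIN_VALUE && max <= Byte.MAX_VALUE) return "BYTE";
    if (min >= Short.MIN_VALUE && max <= Short.MAX_VALUE) return "SHORT";
    if (min >= Integer.MIN_VALUE && max <= Integer.MAX_VALUE) return "INT";
    return "LONG";
  }

  public static void main(String[] args) {
    // a page of longs whose values all fit in two bytes
    System.out.println(pickStoredType(-300, 4096)); // SHORT
  }
}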

Writing inverted index to adaptive encoded page
(1) Prepare the inverted index list based on data-type-based comparison (see the sketch below)
(2) Apply RLE on the inverted index
(3) Write the inverted index to the encoded page
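The sketch referenced in (1): row ids are ordered by the typed value (ints here) rather than by raw bytes, producing the inverted index, and the reverse index maps a row id back to its sorted position, mirroring CarbonUtil.getInvertedReverseIndex in the diff below. RLE is then applied to the index before it is written.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class InvertedIndexSketch {
  // invertedIndex[sortedPos] = rowId, ordered by the column value
  static int[] buildInvertedIndex(int[] pageData) {
    List<Integer> rowIds = new ArrayList<>();
    for (int i = 0; i < pageData.length; i++) rowIds.add(i);
    rowIds.sort(Comparator.comparingInt(r -> pageData[r]));
    return rowIds.stream().mapToInt(Integer::intValue).toArray();
  }

  // invertedReverse[rowId] = sortedPos, used when filling query results
  static int[] reverse(int[] invertedIndex) {
    int[] rev = new int[invertedIndex.length];
    for (int pos = 0; pos < invertedIndex.length; pos++) rev[invertedIndex[pos]] = pos;
    return rev;
  }

  public static void main(String[] args) {
    int[] page = {30, 10, 20};
    System.out.println(java.util.Arrays.toString(buildInvertedIndex(page)));          // [1, 2, 0]
    System.out.println(java.util.Arrays.toString(reverse(buildInvertedIndex(page)))); // [2, 0, 1]
  }
}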

Create decoder while querying
(1) Create the proper decoder for the no-dictionary column pages
(2) Uncompress the column page and also the inverted index

Filter flow changes
(1) Filter values will be in bytes, so convert the data to bytes for comparison
(2) Change isScanRequired to compare min/max values based on the data type (sketched below)
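The isScanRequired idea in (2), sketched under the assumption that the page min/max and the filter value have already been decoded to typed values; a byte-wise comparison would give wrong pruning for signed numerics.

public class MinMaxPruneSketch {
  static boolean isScanRequired(long pageMin, long pageMax, long filterValue) {
    return filterValue >= pageMin && filterValue <= pageMax;
  }

  public static void main(String[] args) {
    System.out.println(isScanRequired(100L, 500L, 50L));  // false: page can be pruned
    System.out.println(isScanRequired(100L, 500L, 250L)); // true: page must be scanned
  }
}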

Fill output row in case of queries
(1) Change the noDictionaryKeys to Object; for no-dictionary primitive data types they can now hold data-type-based values

Bloom filter changes
(1) Change the bloom filter load
(2) While rebuilding the datamap, the load expects the original data, so a conversion is applied
(3) Fill the no-dictionary primitive data as original data

Compaction Changes
Compaction gets its rows from the result collectors, but the result collectors return no-dictionary columns as bytes.
So a conversion is needed to restore the original data based on the data type.
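A sketch of that conversion, assuming primitive values were stored with the sign bit flipped (XOR) so that unsigned byte order matches numeric order, the trick behind CarbonData's ByteUtil.toXorBytes; the helper here is illustrative, not the actual API.

import java.nio.ByteBuffer;

public class BytesToOriginalSketch {
  static Object toOriginal(byte[] bytes, String dataType) {
    switch (dataType) {
      case "INT":  return ByteBuffer.wrap(bytes).getInt() ^ Integer.MIN_VALUE;
      case "LONG": return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
      default:     return bytes; // strings and the like stay as bytes
    }
  }

  public static void main(String[] args) {
    byte[] stored = ByteBuffer.allocate(4).putInt(7 ^ Integer.MIN_VALUE).array();
    System.out.println(toOriginal(stored, "INT")); // 7
  }
}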
dhatchayani committed Sep 17, 2018
1 parent 476e6b2 commit d687986
Showing 81 changed files with 2,298 additions and 615 deletions.
@@ -104,6 +104,23 @@ private void addMeasures(List<CarbonMeasure> measures) {
}
}

/**
* No dictionary and complex dimensions of the table
*
* @return the no-dictionary and complex dimension specs of the table
*/
public DimensionSpec[] getNoDictAndComplexDimensions() {
List<DimensionSpec> noDictAndComplexDimensions = new ArrayList<>();
for (int i = 0; i < dimensionSpec.length; i++) {
if (dimensionSpec[i].getColumnType() == ColumnType.PLAIN_VALUE
|| dimensionSpec[i].getColumnType() == ColumnType.COMPLEX_PRIMITIVE
|| dimensionSpec[i].getColumnType() == ColumnType.COMPLEX) {
noDictAndComplexDimensions.add(dimensionSpec[i]);
}
}
return noDictAndComplexDimensions.toArray(new DimensionSpec[noDictAndComplexDimensions.size()]);
}

public DimensionSpec getDimensionSpec(int dimensionIndex) {
return dimensionSpec[dimensionIndex];
}
@@ -227,41 +227,39 @@ private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
.decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
}

private boolean isEncodedWithMeta(DataChunk2 pageMetadata) {
List<Encoding> encodings = pageMetadata.getEncoders();
if (encodings != null && encodings.size() == 1) {
Encoding encoding = encodings.get(0);
switch (encoding) {
case DIRECT_COMPRESS:
case DIRECT_STRING:
case ADAPTIVE_INTEGRAL:
case ADAPTIVE_DELTA_INTEGRAL:
case ADAPTIVE_FLOATING:
case ADAPTIVE_DELTA_FLOATING:
return true;
}
}
return false;
}

protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnPage,
ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
throws IOException, MemoryException {
if (isEncodedWithMeta(pageMetadata)) {
List<Encoding> encodings = pageMetadata.getEncoders();
if (CarbonUtil.isEncodedWithMeta(encodings)) {
ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
null != rawColumnPage.getLocalDictionary());
decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(),
isEncodedWithAdaptiveMeta(pageMetadata));
int[] invertedIndexes = new int[0];
int[] invertedIndexesReverse = new int[0];
// in case of no dictionary primitive (measure-like) data types, if the column is
// included in sort columns then the inverted index needs to be uncompressed
if (encodings.contains(Encoding.INVERTED_INDEX)) {
offset += pageMetadata.data_page_length;
if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
// get the reverse index
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
}
return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata),
CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX));
} else {
// following code is for backward compatibility
return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
}
}

private boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
public boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
List<Encoding> encodings = pageMetadata.getEncoders();
if (encodings != null && encodings.size() == 1) {
if (encodings != null && !encodings.isEmpty()) {
Encoding encoding = encodings.get(0);
switch (encoding) {
case ADAPTIVE_INTEGRAL:
@@ -24,10 +24,13 @@
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;


public class ColumnPageWrapper implements DimensionColumnPage {
@@ -36,14 +39,23 @@ public class ColumnPageWrapper implements DimensionColumnPage {

private CarbonDictionary localDictionary;

private boolean isAdaptiveComplexPrimitivePage;
private boolean isAdaptivePrimitivePage;

private int[] invertedIndex;

private int[] invertedReverseIndex;

private boolean isExplicitSorted;

public ColumnPageWrapper(ColumnPage columnPage, CarbonDictionary localDictionary,
boolean isAdaptiveComplexPrimitivePage) {
int[] invertedIndex, int[] invertedReverseIndex, boolean isAdaptivePrimitivePage,
boolean isExplicitSorted) {
this.columnPage = columnPage;
this.localDictionary = localDictionary;
this.isAdaptiveComplexPrimitivePage = isAdaptiveComplexPrimitivePage;

this.invertedIndex = invertedIndex;
this.invertedReverseIndex = invertedReverseIndex;
this.isAdaptivePrimitivePage = isAdaptivePrimitivePage;
this.isExplicitSorted = isExplicitSorted;
}

@Override
@@ -58,26 +70,79 @@ public int fillSurrogateKey(int rowId, int chunkIndex, int[] outputSurrogateKey)

@Override
public int fillVector(ColumnVectorInfo[] vectorInfo, int chunkIndex) {
throw new UnsupportedOperationException("internal error");
ColumnVectorInfo columnVectorInfo = vectorInfo[chunkIndex];
CarbonColumnVector vector = columnVectorInfo.vector;
int offset = columnVectorInfo.offset;
int vectorOffset = columnVectorInfo.vectorOffset;
int len = offset + columnVectorInfo.size;
for (int i = offset; i < len; i++) {
fillRow(i, vector, vectorOffset++);
}
return chunkIndex + 1;
}

/**
* Fill the data of the given row into the vector
*
* @param rowId     row id of the data to be filled
* @param vector    destination column vector
* @param vectorRow position in the vector at which to fill
*/
private void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
if (columnPage.getNullBits().get(rowId)
&& columnPage.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
// if this row is null, return default null represent in byte array
byte[] value = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
QueryUtil.putDataToVector(vector, value, vectorRow, value.length);
} else if (columnPage.getNullBits().get(rowId)) {
// if this row is null, return default null represent in byte array
byte[] value = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
QueryUtil.putDataToVector(vector, value, vectorRow, value.length);
} else {
if (isExplicitSorted) {
rowId = invertedReverseIndex[rowId];
}
byte[] value = getChunkData(rowId, true);
int length = value.length;
QueryUtil.putDataToVector(vector, value, vectorRow, length);
}
}

@Override
public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int chunkIndex) {
throw new UnsupportedOperationException("internal error");
ColumnVectorInfo columnVectorInfo = vectorInfo[chunkIndex];
CarbonColumnVector vector = columnVectorInfo.vector;
int offset = columnVectorInfo.offset;
int vectorOffset = columnVectorInfo.vectorOffset;
int len = offset + columnVectorInfo.size;
for (int i = offset; i < len; i++) {
fillRow(filteredRowId[i], vector, vectorOffset++);
}
return chunkIndex + 1;
}

@Override public byte[] getChunkData(int rowId) {
return getChunkData(rowId, false);
}

private byte[] getChunkData(int rowId, boolean isRowIdChanged) {
ColumnType columnType = columnPage.getColumnSpec().getColumnType();
DataType srcDataType = columnPage.getColumnSpec().getSchemaDataType();
DataType targetDataType = columnPage.getDataType();
if (null != localDictionary) {
return localDictionary
.getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 0, 3));
} else if (columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptiveComplexPrimitive()) {
if (columnPage.getNullBits().get(rowId)) {
} else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptivePrimitive()) || (
columnType == ColumnType.PLAIN_VALUE && DataTypeUtil.isPrimitiveColumn(srcDataType))) {
if (!isRowIdChanged && columnPage.getNullBits().get(rowId)
&& columnType == ColumnType.COMPLEX_PRIMITIVE) {
// if this row is null, return default null represent in byte array
return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
}
if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
// if this row is null, return default null represent in byte array
return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
}
if (srcDataType == DataTypes.DOUBLE || srcDataType == DataTypes.FLOAT) {
double doubleData = columnPage.getDouble(rowId);
if (srcDataType == DataTypes.FLOAT) {
@@ -118,15 +183,20 @@ public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int ch
} else {
throw new RuntimeException("unsupported type: " + targetDataType);
}
} else if ((columnType == ColumnType.COMPLEX_PRIMITIVE) && !this.isAdaptiveComplexPrimitive()) {
} else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && !this.isAdaptivePrimitive())) {
if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
}
if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN)) {
byte[] out = new byte[1];
out[0] = (columnPage.getByte(rowId));
return out;
return ByteUtil.toBytes(ByteUtil.toBoolean(out));
} else if (srcDataType == DataTypes.BYTE_ARRAY) {
return columnPage.getBytes(rowId);
} else if (srcDataType == DataTypes.DOUBLE) {
} else if (srcDataType == DataTypes.DOUBLE) {
return ByteUtil.toXorBytes(columnPage.getDouble(rowId));
} else if (srcDataType == targetDataType) {
return columnPage.getBytes(rowId);
} else {
throw new RuntimeException("unsupported type: " + targetDataType);
}
@@ -135,15 +205,14 @@ public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int ch
}
}


@Override
public int getInvertedIndex(int rowId) {
throw new UnsupportedOperationException("internal error");
return invertedIndex[rowId];
}

@Override
public int getInvertedReverseIndex(int rowId) {
throw new UnsupportedOperationException("internal error");
return invertedReverseIndex[rowId];
}

@Override
@@ -153,12 +222,13 @@ public boolean isNoDicitionaryColumn() {

@Override
public boolean isExplicitSorted() {
return false;
return isExplicitSorted;
}

@Override
public int compareTo(int rowId, byte[] compareValue) {
throw new UnsupportedOperationException("internal error");
byte[] chunkData = this.getChunkData((int) rowId);
return ByteUtil.UnsafeComparer.INSTANCE.compareTo(chunkData, compareValue);
}

@Override
@@ -169,8 +239,8 @@ public void freeMemory() {
}
}

public boolean isAdaptiveComplexPrimitive() {
return isAdaptiveComplexPrimitivePage;
public boolean isAdaptivePrimitive() {
return isAdaptivePrimitivePage;
}

}
@@ -21,11 +21,8 @@

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;

/**
* Below class is responsible to store variable length dimension data chunk in
@@ -236,28 +233,7 @@ public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
}
// get the row from unsafe
fillRowInternal(length, value, currentDataOffset);
DataType dt = vector.getType();
if ((!(dt == DataTypes.STRING) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE
.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, value, 0, length)) {
vector.putNull(vectorRow);
} else {
if (dt == DataTypes.STRING) {
vector.putBytes(vectorRow, 0, length, value);
} else if (dt == DataTypes.BOOLEAN) {
vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
} else if (dt == DataTypes.SHORT) {
vector.putShort(vectorRow, ByteUtil.toXorShort(value, 0, length));
} else if (dt == DataTypes.INT) {
vector.putInt(vectorRow, ByteUtil.toXorInt(value, 0, length));
} else if (dt == DataTypes.LONG) {
vector.putLong(vectorRow,
DataTypeUtil.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0,
length));
} else if (dt == DataTypes.TIMESTAMP) {
vector.putLong(vectorRow, ByteUtil.toXorLong(value, 0, length) * 1000L);
}
}
QueryUtil.putDataToVector(vector, value, vectorRow, length);
}

/**
