Skip to content

Commit

Permalink
Merge 2a94b69 into 476e6b2
Browse files Browse the repository at this point in the history
  • Loading branch information
dhatchayani committed Sep 17, 2018
2 parents 476e6b2 + 2a94b69 commit 29e4533
Show file tree
Hide file tree
Showing 81 changed files with 2,298 additions and 615 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ private void addMeasures(List<CarbonMeasure> measures) {
}
}

/**
 * Collects the dimensions of this table that are stored without a global
 * dictionary: plain no-dictionary columns, complex columns, and complex
 * primitive (leaf) columns.
 *
 * @return array containing every no-dictionary and complex {@link DimensionSpec}
 */
public DimensionSpec[] getNoDictAndComplexDimensions() {
  List<DimensionSpec> result = new ArrayList<>();
  for (DimensionSpec spec : dimensionSpec) {
    ColumnType type = spec.getColumnType();
    boolean isNoDictOrComplex = type == ColumnType.PLAIN_VALUE
        || type == ColumnType.COMPLEX_PRIMITIVE
        || type == ColumnType.COMPLEX;
    if (isNoDictOrComplex) {
      result.add(spec);
    }
  }
  return result.toArray(new DimensionSpec[result.size()]);
}

public DimensionSpec getDimensionSpec(int dimensionIndex) {
return dimensionSpec[dimensionIndex];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,41 +227,39 @@ private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
.decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
}

/**
 * Tells whether this page was written with exactly one meta-based encoder,
 * i.e. an encoding that carries its own metadata and is decoded via
 * {@code decodeDimensionByMeta} rather than the legacy path.
 *
 * @param pageMetadata thrift metadata of the column page
 * @return true when the page has a single meta-style encoding, false otherwise
 */
private boolean isEncodedWithMeta(DataChunk2 pageMetadata) {
  List<Encoding> encodings = pageMetadata.getEncoders();
  // Meta-encoded pages always carry exactly one encoder entry.
  if (encodings == null || encodings.size() != 1) {
    return false;
  }
  switch (encodings.get(0)) {
    case DIRECT_COMPRESS:
    case DIRECT_STRING:
    case ADAPTIVE_INTEGRAL:
    case ADAPTIVE_DELTA_INTEGRAL:
    case ADAPTIVE_FLOATING:
    case ADAPTIVE_DELTA_FLOATING:
      return true;
    default:
      return false;
  }
}

protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnPage,
ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
throws IOException, MemoryException {
if (isEncodedWithMeta(pageMetadata)) {
List<Encoding> encodings = pageMetadata.getEncoders();
if (CarbonUtil.isEncodedWithMeta(encodings)) {
ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
null != rawColumnPage.getLocalDictionary());
decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(),
isEncodedWithAdaptiveMeta(pageMetadata));
int[] invertedIndexes = new int[0];
int[] invertedIndexesReverse = new int[0];
// in case of no dictionary measure data types, if it is included in sort columns
// then inverted index to be uncompressed
if (encodings.contains(Encoding.INVERTED_INDEX)) {
offset += pageMetadata.data_page_length;
if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
// get the reverse index
invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
}
return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata),
CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX));
} else {
// following code is for backward compatibility
return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
}
}

private boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
public boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
List<Encoding> encodings = pageMetadata.getEncoders();
if (encodings != null && encodings.size() == 1) {
if (encodings != null && !encodings.isEmpty()) {
Encoding encoding = encodings.get(0);
switch (encoding) {
case ADAPTIVE_INTEGRAL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;


public class ColumnPageWrapper implements DimensionColumnPage {
Expand All @@ -36,14 +39,23 @@ public class ColumnPageWrapper implements DimensionColumnPage {

private CarbonDictionary localDictionary;

private boolean isAdaptiveComplexPrimitivePage;
private boolean isAdaptivePrimitivePage;

private int[] invertedIndex;

private int[] invertedReverseIndex;

private boolean isExplicitSorted;

public ColumnPageWrapper(ColumnPage columnPage, CarbonDictionary localDictionary,
boolean isAdaptiveComplexPrimitivePage) {
int[] invertedIndex, int[] invertedReverseIndex, boolean isAdaptivePrimitivePage,
boolean isExplicitSorted) {
this.columnPage = columnPage;
this.localDictionary = localDictionary;
this.isAdaptiveComplexPrimitivePage = isAdaptiveComplexPrimitivePage;

this.invertedIndex = invertedIndex;
this.invertedReverseIndex = invertedReverseIndex;
this.isAdaptivePrimitivePage = isAdaptivePrimitivePage;
this.isExplicitSorted = isExplicitSorted;
}

@Override
Expand All @@ -58,26 +70,79 @@ public int fillSurrogateKey(int rowId, int chunkIndex, int[] outputSurrogateKey)

@Override
public int fillVector(ColumnVectorInfo[] vectorInfo, int chunkIndex) {
throw new UnsupportedOperationException("internal error");
ColumnVectorInfo columnVectorInfo = vectorInfo[chunkIndex];
CarbonColumnVector vector = columnVectorInfo.vector;
int offset = columnVectorInfo.offset;
int vectorOffset = columnVectorInfo.vectorOffset;
int len = offset + columnVectorInfo.size;
for (int i = offset; i < len; i++) {
fillRow(i, vector, vectorOffset++);
}
return chunkIndex + 1;
}

/**
 * Writes the value at the given page row into the output column vector.
 *
 * Null rows are materialized as a sentinel byte array (the member-default
 * value for complex primitive columns, the empty byte array otherwise).
 * For non-null rows the row id is first remapped through the inverted
 * reverse index when the page is explicitly sorted.
 *
 * @param rowId     logical row inside this column page
 * @param vector    destination vector to fill
 * @param vectorRow destination position inside the vector
 */
private void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
  if (columnPage.getNullBits().get(rowId)) {
    // Null row: emit the type-appropriate null sentinel bytes.
    byte[] nullValue =
        columnPage.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE
            ? CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY
            : CarbonCommonConstants.EMPTY_BYTE_ARRAY;
    QueryUtil.putDataToVector(vector, nullValue, vectorRow, nullValue.length);
    return;
  }
  // For explicitly sorted pages the stored order differs from row order;
  // translate through the reverse inverted index before reading.
  int physicalRowId = isExplicitSorted ? invertedReverseIndex[rowId] : rowId;
  byte[] data = getChunkData(physicalRowId, true);
  QueryUtil.putDataToVector(vector, data, vectorRow, data.length);
}

@Override
public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int chunkIndex) {
throw new UnsupportedOperationException("internal error");
ColumnVectorInfo columnVectorInfo = vectorInfo[chunkIndex];
CarbonColumnVector vector = columnVectorInfo.vector;
int offset = columnVectorInfo.offset;
int vectorOffset = columnVectorInfo.vectorOffset;
int len = offset + columnVectorInfo.size;
for (int i = offset; i < len; i++) {
fillRow(filteredRowId[i], vector, vectorOffset++);
}
return chunkIndex + 1;
}

// Public read path: delegates to the internal overload with
// isRowIdChanged=false so null rows are checked against the original row id.
@Override public byte[] getChunkData(int rowId) {
return getChunkData(rowId, false);
}

private byte[] getChunkData(int rowId, boolean isRowIdChanged) {
ColumnType columnType = columnPage.getColumnSpec().getColumnType();
DataType srcDataType = columnPage.getColumnSpec().getSchemaDataType();
DataType targetDataType = columnPage.getDataType();
if (null != localDictionary) {
return localDictionary
.getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 0, 3));
} else if (columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptiveComplexPrimitive()) {
if (columnPage.getNullBits().get(rowId)) {
} else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptivePrimitive()) || (
columnType == ColumnType.PLAIN_VALUE && DataTypeUtil.isPrimitiveColumn(srcDataType))) {
if (!isRowIdChanged && columnPage.getNullBits().get(rowId)
&& columnType == ColumnType.COMPLEX_PRIMITIVE) {
// if this row is null, return default null represent in byte array
return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
}
if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
// if this row is null, return default null represent in byte array
return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
}
if (srcDataType == DataTypes.DOUBLE || srcDataType == DataTypes.FLOAT) {
double doubleData = columnPage.getDouble(rowId);
if (srcDataType == DataTypes.FLOAT) {
Expand Down Expand Up @@ -118,15 +183,20 @@ public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int ch
} else {
throw new RuntimeException("unsupported type: " + targetDataType);
}
} else if ((columnType == ColumnType.COMPLEX_PRIMITIVE) && !this.isAdaptiveComplexPrimitive()) {
} else if ((columnType == ColumnType.COMPLEX_PRIMITIVE && !this.isAdaptivePrimitive())) {
if (!isRowIdChanged && columnPage.getNullBits().get(rowId)) {
return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
}
if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN)) {
byte[] out = new byte[1];
out[0] = (columnPage.getByte(rowId));
return out;
return ByteUtil.toBytes(ByteUtil.toBoolean(out));
} else if (srcDataType == DataTypes.BYTE_ARRAY) {
return columnPage.getBytes(rowId);
} else if (srcDataType == DataTypes.DOUBLE) {
} else if (srcDataType == DataTypes.DOUBLE) {
return ByteUtil.toXorBytes(columnPage.getDouble(rowId));
} else if (srcDataType == targetDataType) {
return columnPage.getBytes(rowId);
} else {
throw new RuntimeException("unsupported type: " + targetDataType);
}
Expand All @@ -135,15 +205,14 @@ public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int ch
}
}


@Override
public int getInvertedIndex(int rowId) {
throw new UnsupportedOperationException("internal error");
return invertedIndex[rowId];
}

@Override
public int getInvertedReverseIndex(int rowId) {
throw new UnsupportedOperationException("internal error");
return invertedReverseIndex[rowId];
}

@Override
Expand All @@ -153,12 +222,13 @@ public boolean isNoDicitionaryColumn() {

@Override
public boolean isExplicitSorted() {
return false;
return isExplicitSorted;
}

@Override
public int compareTo(int rowId, byte[] compareValue) {
throw new UnsupportedOperationException("internal error");
byte[] chunkData = this.getChunkData((int) rowId);
return ByteUtil.UnsafeComparer.INSTANCE.compareTo(chunkData, compareValue);
}

@Override
Expand All @@ -169,8 +239,8 @@ public void freeMemory() {
}
}

public boolean isAdaptiveComplexPrimitive() {
return isAdaptiveComplexPrimitivePage;
public boolean isAdaptivePrimitive() {
return isAdaptivePrimitivePage;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;

/**
* Below class is responsible to store variable length dimension data chunk in
Expand Down Expand Up @@ -236,28 +233,7 @@ public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
}
// get the row from unsafe
fillRowInternal(length, value, currentDataOffset);
DataType dt = vector.getType();
if ((!(dt == DataTypes.STRING) && length == 0) || ByteUtil.UnsafeComparer.INSTANCE
.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, value, 0, length)) {
vector.putNull(vectorRow);
} else {
if (dt == DataTypes.STRING) {
vector.putBytes(vectorRow, 0, length, value);
} else if (dt == DataTypes.BOOLEAN) {
vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
} else if (dt == DataTypes.SHORT) {
vector.putShort(vectorRow, ByteUtil.toXorShort(value, 0, length));
} else if (dt == DataTypes.INT) {
vector.putInt(vectorRow, ByteUtil.toXorInt(value, 0, length));
} else if (dt == DataTypes.LONG) {
vector.putLong(vectorRow,
DataTypeUtil.getDataBasedOnRestructuredDataType(value, vector.getBlockDataType(), 0,
length));
} else if (dt == DataTypes.TIMESTAMP) {
vector.putLong(vectorRow, ByteUtil.toXorLong(value, 0, length) * 1000L);
}
}
QueryUtil.putDataToVector(vector, value, vectorRow, length);
}

/**
Expand Down
Loading

0 comments on commit 29e4533

Please sign in to comment.