Skip to content

Commit

Permalink
[CARBONDATA-2606] [Complex DataType Enhancements] Projection PushDown…
Browse files Browse the repository at this point in the history
… For Complex DataType

Complex data type enhancements.
(1) Projection push down:
(a) Projection push down is handled only for STRUCT data type.
(b) Even if the STRUCT contains an ARRAY type as child then projection push down is not applicable.
(c) When the STRUCT column is given in projection list, then the projection list can be rewritten in an optimized way considering all the complex columns required.

This closes #2396
  • Loading branch information
Indhumathi27 authored and ravipesala committed Jun 26, 2018
1 parent 53a9fa7 commit afcaecf
Show file tree
Hide file tree
Showing 20 changed files with 1,600 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -661,14 +661,47 @@ public CarbonMeasure getMeasureByName(String tableName, String columnName) {
public CarbonDimension getDimensionByName(String tableName, String columnName) {
CarbonDimension carbonDimension = null;
List<CarbonDimension> dimList = tableDimensionsMap.get(tableName);
for (CarbonDimension dim : dimList) {
if (dim.getColName().equalsIgnoreCase(columnName)) {
carbonDimension = dim;
break;
String[] colSplits = columnName.split("\\.");
StringBuffer tempColName = new StringBuffer(colSplits[0]);
for (String colSplit : colSplits) {
if (!tempColName.toString().equalsIgnoreCase(colSplit)) {
tempColName = tempColName.append(".").append(colSplit);
}
carbonDimension = getCarbonDimension(tempColName.toString(), dimList);
if (carbonDimension != null && carbonDimension.getListOfChildDimensions() != null) {
dimList = carbonDimension.getListOfChildDimensions();
}
}
List<CarbonDimension> implicitDimList = tableImplicitDimensionsMap.get(tableName);
for (CarbonDimension dim : implicitDimList) {
if (carbonDimension == null) {
carbonDimension = getCarbonDimension(columnName, implicitDimList);
}

if (colSplits.length > 1) {
List<CarbonDimension> dimLists = tableDimensionsMap.get(tableName);
for (CarbonDimension dims : dimLists) {
if (dims.getColName().equalsIgnoreCase(colSplits[0])) {
// Set the parent Dimension
carbonDimension
.setComplexParentDimension(getDimensionBasedOnOrdinal(dimLists, dims.getOrdinal()));
break;
}
}
}
return carbonDimension;
}

/**
* Get Dimension for columnName from list of dimensions
*
* @param columnName
* @param dimensions
* @return
*/
public static CarbonDimension getCarbonDimension(String columnName,
List<CarbonDimension> dimensions) {
CarbonDimension carbonDimension = null;
for (CarbonDimension dim : dimensions) {
if (dim.getColName().equalsIgnoreCase(columnName)) {
carbonDimension = dim;
break;
Expand All @@ -677,6 +710,15 @@ public CarbonDimension getDimensionByName(String tableName, String columnName) {
return carbonDimension;
}

private CarbonDimension getDimensionBasedOnOrdinal(List<CarbonDimension> dimList, int ordinal) {
for (CarbonDimension dimension : dimList) {
if (dimension.getOrdinal() == ordinal) {
return dimension;
}
}
throw new RuntimeException("No Dimension Matches the ordinal value");
}

/**
* @param tableName
* @param columnName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public class CarbonDimension extends CarbonColumn {
*/
private int complexTypeOrdinal;

/**
* Save the Parent Dimension of the complex Parent Column.
*/
private CarbonDimension complexParentDimension = null;

public CarbonDimension(ColumnSchema columnSchema, int ordinal, int keyOrdinal,
int columnGroupOrdinal, int complexTypeOrdinal) {
this(columnSchema, ordinal, 0, keyOrdinal, columnGroupOrdinal, complexTypeOrdinal);
Expand Down Expand Up @@ -162,4 +167,12 @@ public boolean isSortColumn() {
}
return true;
}

public CarbonDimension getComplexParentDimension() {
return complexParentDimension;
}

public void setComplexParentDimension(CarbonDimension complexParentDimension) {
this.complexParentDimension = complexParentDimension;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -27,6 +28,7 @@
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
Expand Down Expand Up @@ -67,17 +69,41 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
int noDictionaryColumnIndex;
int complexTypeColumnIndex;


boolean isDimensionExists;

private int[] surrogateResult;
private byte[][] noDictionaryKeys;
private byte[][] complexTypeKeyArray;

protected Map<Integer, GenericQueryType> comlexDimensionInfoMap;

/**
* Field of this Map is the parent Column and associated child columns.
* Final Projection shuld be a merged list consist of only parents.
*/
private Map<Integer, List<Integer>> parentToChildColumnsMap = new HashMap<>();

/**
* Map to hold the complex parent ordinal of each query dimension
*/
private List<Integer> queryDimensionToComplexParentOrdinal = new ArrayList<>();

/**
* Fields of this Map of Parent Ordinal with the List is the Child Column Dimension and
* the corresponding data buffer of that column.
*/
private Map<Integer, Map<CarbonDimension, ByteBuffer>> mergedComplexDimensionDataMap =
new HashMap<>();

public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
super(blockExecutionInfos);
queryDimensions = executionInfo.getProjectionDimensions();
queryMeasures = executionInfo.getProjectionMeasures();
initDimensionAndMeasureIndexesForFillingData();
isDimensionExists = queryDimensions.length > 0;
this.comlexDimensionInfoMap = executionInfo.getComlexDimensionInfoMap();

}

/**
Expand All @@ -90,9 +116,7 @@ public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult, in
// scan the record and add to list
List<Object[]> listBasedResult = new ArrayList<>(batchSize);
int rowCounter = 0;
int[] surrogateResult;
byte[][] noDictionaryKeys;
byte[][] complexTypeKeyArray;

while (scannedResult.hasNext() && rowCounter < batchSize) {
Object[] row = new Object[queryDimensions.length + queryMeasures.length];
if (isDimensionExists) {
Expand All @@ -102,6 +126,9 @@ public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult, in
dictionaryColumnIndex = 0;
noDictionaryColumnIndex = 0;
complexTypeColumnIndex = 0;

// get the complex columns data of this row
fillComplexColumnDataBufferForThisRow();
for (int i = 0; i < queryDimensions.length; i++) {
fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
comlexDimensionInfoMap, row, i);
Expand All @@ -119,9 +146,54 @@ public List<Object[]> collectResultInRow(BlockletScannedResult scannedResult, in
return listBasedResult;
}

private void fillComplexColumnDataBufferForThisRow() {
mergedComplexDimensionDataMap.clear();
int noDictionaryComplexColumnIndex = 0;
int complexTypeComplexColumnIndex = 0;
for (int i = 0; i < queryDimensions.length; i++) {
int complexParentOrdinal = queryDimensionToComplexParentOrdinal.get(i);
if (complexParentOrdinal != -1) {
Map<CarbonDimension, ByteBuffer> childColumnByteBuffer;
// Add the parent and the child ordinal to the parentToChildColumnsMap
if (mergedComplexDimensionDataMap.get(complexParentOrdinal) == null) {
childColumnByteBuffer = new HashMap<>();
} else {
childColumnByteBuffer = mergedComplexDimensionDataMap.get(complexParentOrdinal);
}

// send the byte buffer for the complex columns. Currently expected columns for
// complex types are
// a) Complex Columns
// b) No Dictionary columns.
// TODO have to fill out for dictionary columns. Once the support for push down in
// complex dictionary columns comes.
ByteBuffer buffer;
if (!dictionaryEncodingArray[i]) {
if (implictColumnArray[i]) {
throw new RuntimeException("Not Supported Column Type");
} else if (complexDataTypeArray[i]) {
buffer = ByteBuffer.wrap(complexTypeKeyArray[complexTypeComplexColumnIndex++]);
} else {
buffer = ByteBuffer.wrap(noDictionaryKeys[noDictionaryComplexColumnIndex++]);
}
} else if (directDictionaryEncodingArray[i]) {
throw new RuntimeException("Direct Dictionary Column Type Not Supported Yet.");
} else if (complexDataTypeArray[i]) {
buffer = ByteBuffer.wrap(complexTypeKeyArray[complexTypeComplexColumnIndex++]);
} else {
throw new RuntimeException("Not Supported Column Type");
}

childColumnByteBuffer
.put(queryDimensions[i].getDimension(), buffer);
mergedComplexDimensionDataMap.put(complexParentOrdinal, childColumnByteBuffer);
}
}
}

void fillDimensionData(BlockletScannedResult scannedResult, int[] surrogateResult,
byte[][] noDictionaryKeys, byte[][] complexTypeKeyArray,
Map<Integer, GenericQueryType> comlexDimensionInfoMap, Object[] row, int i) {
Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i) {
if (!dictionaryEncodingArray[i]) {
if (implictColumnArray[i]) {
if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
Expand All @@ -136,28 +208,72 @@ void fillDimensionData(BlockletScannedResult scannedResult, int[] surrogateResul
}
} else if (complexDataTypeArray[i]) {
// Complex Type With No Dictionary Encoding.
row[order[i]] = comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
.getDataBasedOnDataType(ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
if (queryDimensionToComplexParentOrdinal.get(i) != -1) {
fillRow(complexDimensionInfoMap, row, i,
ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
} else {
row[order[i]] =
complexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
.getDataBasedOnDataType(
ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
}
} else {
row[order[i]] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
noDictionaryKeys[noDictionaryColumnIndex++],
queryDimensions[i].getDimension().getDataType());
if (queryDimensionToComplexParentOrdinal.get(i) != -1) {
// When the parent Ordinal is not -1 then this is a predicate is being pushed down
// for complex column.
fillRow(complexDimensionInfoMap, row, i,
ByteBuffer.wrap(noDictionaryKeys[noDictionaryColumnIndex++]));
} else {
row[order[i]] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
noDictionaryKeys[noDictionaryColumnIndex++],
queryDimensions[i].getDimension().getDataType());
}
}
} else if (directDictionaryEncodingArray[i]) {
if (directDictionaryGenerators[i] != null) {
row[order[i]] = directDictionaryGenerators[i].getValueFromSurrogate(
surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
}
} else if (complexDataTypeArray[i]) {
row[order[i]] = comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
.getDataBasedOnDataType(
ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
row[order[i]] = complexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
.getDataBasedOnDataType(ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
dictionaryColumnIndex++;
} else {
row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
}
}

private void fillRow(Map<Integer, GenericQueryType> complexDimensionInfoMap, Object[] row, int i,
ByteBuffer wrap) {
if (parentToChildColumnsMap.get(queryDimensionToComplexParentOrdinal.get(i)).size() > 1) {
fillRowForComplexColumn(complexDimensionInfoMap, row, i);
} else {
row[order[i]] = complexDimensionInfoMap.get(queryDimensionToComplexParentOrdinal.get(i))
.getDataBasedOnColumn(wrap, queryDimensions[i].getDimension().getComplexParentDimension(),
queryDimensions[i].getDimension());
}
}

private void fillRowForComplexColumn(Map<Integer, GenericQueryType> complexDimensionInfoMap,
Object[] row, int i) {
// When multiple columns are then the first child elements is only going to make
// parent Object Array. For all other cases it should be null.
// For e.g. a : <b,c,d>. here as a is the parent column and b, c, d are child columns
// during traversal when we encounter the first element in list i.e. column 'b'
// a will be completely filled. In case when column 'c' and 'd' encountered then
// only place null in the output.
int complexParentOrdinal = queryDimensionToComplexParentOrdinal.get(i);
List<Integer> childColumns = parentToChildColumnsMap.get(complexParentOrdinal);
if (childColumns.get(0).equals(queryDimensions[i].getDimension().getOrdinal())) {
// Fill out Parent Column.
row[order[i]] = complexDimensionInfoMap.get(complexParentOrdinal).getDataBasedOnColumnList(
mergedComplexDimensionDataMap.get(queryDimensions[i].getParentDimension().getOrdinal()),
queryDimensions[i].getParentDimension());
} else {
row[order[i]] = null;
}
}

void fillMeasureData(BlockletScannedResult scannedResult, Object[] row) {
if (measureInfo.getMeasureDataTypes().length > 0) {
Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
Expand All @@ -181,18 +297,41 @@ void initDimensionAndMeasureIndexesForFillingData() {
Arrays.sort(primitive);
actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
int index = 0;

dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
directDictionaryEncodingArray = CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);

parentToChildColumnsMap.clear();
queryDimensionToComplexParentOrdinal.clear();
for (int i = 0; i < queryDimensions.length; i++) {
if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
actualIndexInSurrogateKey[index++] =
Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
}
if (null != queryDimensions[i].getDimension().getComplexParentDimension()) {
// Add the parent and the child ordinal to the parentToChildColumnsMap
int complexParentOrdinal =
queryDimensions[i].getDimension().getComplexParentDimension().getOrdinal();
queryDimensionToComplexParentOrdinal.add(complexParentOrdinal);
if (parentToChildColumnsMap.get(complexParentOrdinal) == null) {
// Add the parent and child ordinal in the map
List<Integer> childOrdinals = new ArrayList<>();
childOrdinals.add(queryDimensions[i].getDimension().getOrdinal());
parentToChildColumnsMap.put(complexParentOrdinal, childOrdinals);

} else {
List<Integer> childOrdinals = parentToChildColumnsMap.get(complexParentOrdinal);
childOrdinals.add(queryDimensions[i].getDimension().getOrdinal());
parentToChildColumnsMap.put(complexParentOrdinal, childOrdinals);
}
} else {
queryDimensionToComplexParentOrdinal.add(-1);
}
}

dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
directDictionaryEncodingArray = CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
order = new int[queryDimensions.length + queryMeasures.length];
for (int i = 0; i < queryDimensions.length; i++) {
order[i] = queryDimensions[i].getOrdinal();
Expand All @@ -206,5 +345,4 @@ void initDimensionAndMeasureIndexesForFillingData() {
.getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
import org.apache.carbondata.core.util.DataTypeUtil;
Expand Down Expand Up @@ -97,4 +99,14 @@ public void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[]
return DataTypeUtil.getDataTypeConverter().wrapWithGenericArrayData(data);
}

@Override public Object getDataBasedOnColumn(ByteBuffer dataBuffer, CarbonDimension parent,
CarbonDimension child) {
throw new UnsupportedOperationException("Operation Unsupported for ArrayType");
}

@Override public Object getDataBasedOnColumnList(Map<CarbonDimension, ByteBuffer> childBuffer,
CarbonDimension presentColumn) {
throw new UnsupportedOperationException("Operation Unsupported for ArrayType");
}

}
Loading

0 comments on commit afcaecf

Please sign in to comment.