[CARBONDATA-2755][Complex DataType Enhancements] Compaction Complex Types.

Enable compaction of complex data types (STRUCT and ARRAY).
Major and minor compaction now run over complex data types (STRUCT and ARRAY).
sounakr authored and dhatchayani committed Dec 11, 2018
1 parent 382ce43 commit 46b6b9c
Showing 10 changed files with 1,142 additions and 63 deletions.
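For context, here is what the change enables, sketched in Scala after the test removed below: a table with a STRUCT column can now be compacted instead of raising UnsupportedOperationException. This assumes the suite's Carbon-enabled Spark harness, with sql(...) and resourcesPath in scope; it is illustrative, not part of the commit.

sql("DROP TABLE IF EXISTS table1")
sql(
  "create table table1 (roll int,person Struct<detail:int,age:string,height:double>) stored " +
  "by 'carbondata'")
// load twice so there are at least two segments to merge
for (_ <- 1 to 2) {
  sql(
    "load data inpath '" + resourcesPath +
    "/Struct.csv' into table table1 options('delimiter'=','," +
    "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
    "'complex_delimiter_level_2'='&')")
}
// with this commit, both of these complete instead of throwing
// "Compaction is unsupported for Table containing Complex Columns"
sql("alter table table1 compact 'minor'")
sql("alter table table1 compact 'major'")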
@@ -59,29 +59,40 @@ public static CarbonRow fromMergerRow(Object[] row, SegmentProperties segmentPro
}
converted[DICTIONARY_DIMENSION] = dictDimensions;

Object[] noDictAndComplexKeys =
new Object[segmentProperties.getNumberOfNoDictionaryDimension() + segmentProperties
.getComplexDimensions().size()];

byte[][] noDictionaryKeys = ((ByteArrayWrapper) row[0]).getNoDictionaryKeys();
Object[] noDictKeys = new Object[noDictionaryKeys.length];
for (int i = 0; i < noDictionaryKeys.length; i++) {
// in case of compaction rows are collected from result collector and are in byte[].
// Convert the no dictionary columns to original data,
// as load expects the no dictionary column with original data.
if (DataTypeUtil.isPrimitiveColumn(noDicAndComplexColumns[i].getDataType())) {
noDictKeys[i] = DataTypeUtil
noDictAndComplexKeys[i] = DataTypeUtil
.getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeys[i],
noDicAndComplexColumns[i].getDataType());
// for timestamp the above method will give the original data, so it should be
// converted again to the format to be loaded (without micros)
if (null != noDictKeys[i]
if (null != noDictAndComplexKeys[i]
&& noDicAndComplexColumns[i].getDataType() == DataTypes.TIMESTAMP) {
noDictKeys[i] = (long) noDictKeys[i] / 1000L;
noDictAndComplexKeys[i] = (long) noDictAndComplexKeys[i] / 1000L;
}
} else {
noDictKeys[i] = noDictionaryKeys[i];
noDictAndComplexKeys[i] = noDictionaryKeys[i];
}
}

// For Complex Type Columns
byte[][] complexKeys = ((ByteArrayWrapper) row[0]).getComplexTypesKeys();
for (int i = segmentProperties.getNumberOfNoDictionaryDimension();
i < segmentProperties.getNumberOfNoDictionaryDimension() + segmentProperties
.getComplexDimensions().size(); i++) {
noDictAndComplexKeys[i] = complexKeys[i];
}

// no dictionary and complex dimension
converted[NO_DICTIONARY_AND_COMPLEX] = noDictKeys;
converted[NO_DICTIONARY_AND_COMPLEX] = noDictAndComplexKeys;

// measure
int measureCount = row.length - 1;
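To make the new row shape concrete, an illustrative Scala sketch (column counts and values are made up) of the combined array the rewritten method builds: converted no-dictionary values occupy the leading slots, complex-type keys stay as raw byte arrays in the trailing slots, and a TIMESTAMP drops its micros via the division shown above.

val numNoDict = 2   // made-up: two no-dictionary columns
val numComplex = 1  // made-up: one complex (struct/array) column
val slots = new Array[AnyRef](numNoDict + numComplex)
// slots 0 .. numNoDict-1: no-dictionary values converted back to original data
val timestampWithMicros = 1544486400000000L                    // made-up value
slots(0) = java.lang.Long.valueOf(timestampWithMicros / 1000L) // micros -> millis, as above
slots(1) = "some-string-value"                                 // non-timestamp columns pass through
// slots numNoDict .. end: complex keys copied through as byte[]
slots(numNoDict) = Array[Byte](1, 2, 3)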
@@ -90,6 +90,16 @@ public byte[][] getNoDictionaryKeys() {
return this.noDictionaryKeys;
}


/**
* to get the complex column data
*
* @return complex type keys
*/
public byte[][] getComplexTypesKeys() {
return this.complexTypesKeys;
}

/**
* to generate the hash code
*/

Large diffs are not rendered by default.

@@ -858,38 +858,6 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
arrayException.getMessage)
}

test("test block compaction") {
sql("DROP TABLE IF EXISTS table1")
sql(
"create table table1 (roll int,person Struct<detail:int,age:string,height:double>) stored " +
"by 'carbondata'")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
val exception = intercept[UnsupportedOperationException](
sql("alter table table1 compact 'major'"))
assertResult(
"Compaction is unsupported for Table containing Complex Columns")(
exception.getMessage)
val exception1 = intercept[UnsupportedOperationException](
sql("alter table table1 compact 'minor'"))
assertResult(
"Compaction is unsupported for Table containing Complex Columns")(
exception1.getMessage)
val exception2 = intercept[UnsupportedOperationException](
sql("alter table table1 compact 'custom' where segment.id in (0,1)"))
assertResult(
"Compaction is unsupported for Table containing Complex Columns")(
exception2.getMessage)
}

test("test complex datatype double for encoding") {
sql("DROP TABLE IF EXISTS table1")
sql(
@@ -87,13 +87,6 @@ case class CarbonAlterTableCompactionCommand(
if (!table.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}

if (table.getTableInfo.getFactTable.getListOfColumns.asScala
.exists(m => m.getDataType.isComplexType)) {
throw new UnsupportedOperationException(
"Compaction is unsupported for Table containing Complex Columns")
}

if (CarbonUtil.hasAggregationDataMap(table) ||
(table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
// If the compaction request is of 'streaming' type then we need to generate loadCommands
@@ -111,8 +111,6 @@ public class PrimitiveDataType implements GenericDataType<Object> {

private boolean isDictionary;

private boolean isEmptyBadRecord;

private String nullformat;

private boolean isDirectDictionary;
@@ -133,13 +131,12 @@ private PrimitiveDataType(int outputArrayIndex, int dataCounter) {
* @param isDictionary
*/
public PrimitiveDataType(String name, DataType dataType, String parentName, String columnId,
boolean isDictionary, String nullFormat, boolean isEmptyBadRecord) {
boolean isDictionary, String nullFormat) {
this.name = name;
this.parentname = parentName;
this.columnId = columnId;
this.isDictionary = isDictionary;
this.nullformat = nullFormat;
this.isEmptyBadRecord = isEmptyBadRecord;
this.dataType = dataType;
}

Expand All @@ -154,19 +151,17 @@ public PrimitiveDataType(String name, DataType dataType, String parentName, Stri
* @param useOnePass
* @param localCache
* @param nullFormat
* @param isEmptyBadRecords
*/
public PrimitiveDataType(CarbonColumn carbonColumn, String parentName, String columnId,
CarbonDimension carbonDimension, AbsoluteTableIdentifier absoluteTableIdentifier,
DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache,
String nullFormat, Boolean isEmptyBadRecords) {
String nullFormat) {
this.name = carbonColumn.getColName();
this.parentname = parentName;
this.columnId = columnId;
this.carbonDimension = carbonDimension;
this.isDictionary = isDictionaryDimension(carbonDimension);
this.nullformat = nullFormat;
this.isEmptyBadRecord = isEmptyBadRecords;
this.dataType = carbonColumn.getDataType();

DictionaryColumnUniqueIdentifier identifier =
@@ -566,7 +561,6 @@ public GenericDataType<Object> deepCopy() {
dataType.parentname = this.parentname;
dataType.columnId = this.columnId;
dataType.dictionaryGenerator = this.dictionaryGenerator;
dataType.isEmptyBadRecord = this.isEmptyBadRecord;
dataType.nullformat = this.nullformat;
dataType.setKeySize(this.keySize);
dataType.setSurrogateIndex(this.index);
@@ -184,7 +184,7 @@ private static GenericDataType createComplexType(CarbonColumn carbonColumn, Stri
} else {
return new PrimitiveDataType(carbonColumn, parentName, carbonColumn.getColumnId(),
(CarbonDimension) carbonColumn, absoluteTableIdentifier, client, useOnePass,
localCache, nullFormat, isEmptyBadRecords);
localCache, nullFormat);
}
}

@@ -326,6 +326,11 @@ public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> col
updatedCardinalityList.add(value);
}
updatedColumnSchemaList.add(dimension.getColumnSchema());

if (dimension.getNumberOfChild() > 0) {
fillColumnSchemaListForComplexDims(dimension.getListOfChildDimensions(),
updatedColumnSchemaList, updatedCardinalityList, columnCardinalityMap);
}
}
// add measures to the column schema list
List<CarbonMeasure> masterSchemaMeasures =
@@ -337,6 +342,34 @@ public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> col
.toPrimitive(updatedCardinalityList.toArray(new Integer[updatedCardinalityList.size()]));
}

/**
* This method gets the child dimensions of a complex dimension and
* updates the cardinality for all complex dimensions
*
* @param carbonDimensionsList
* @param updatedColumnSchemaList
* @param updatedCardinalityList
* @param columnCardinalityMap
*/
private static void fillColumnSchemaListForComplexDims(List<CarbonDimension> carbonDimensionsList,
List<ColumnSchema> updatedColumnSchemaList, List<Integer> updatedCardinalityList,
Map<String, Integer> columnCardinalityMap) {
for (CarbonDimension carbonDimension : carbonDimensionsList) {
Integer value = columnCardinalityMap.get(carbonDimension.getColumnId());
if (null == value) {
updatedCardinalityList.add(getDimensionDefaultCardinality(carbonDimension));
} else {
updatedCardinalityList.add(value);
}
updatedColumnSchemaList.add(carbonDimension.getColumnSchema());
List<CarbonDimension> childDims = carbonDimension.getListOfChildDimensions();
if (null != childDims && childDims.size() > 0) {
fillColumnSchemaListForComplexDims(childDims, updatedColumnSchemaList,
updatedCardinalityList, columnCardinalityMap);
}
}
}
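
The walk is depth-first, appending a parent's schema before descending into its children. A self-contained Scala sketch of the same traversal, using a hypothetical Dim stand-in for CarbonDimension:

import scala.collection.mutable.ListBuffer

case class Dim(name: String, children: List[Dim] = Nil) // hypothetical stand-in for CarbonDimension

def flatten(dims: List[Dim], out: ListBuffer[String]): Unit = {
  for (d <- dims) {
    out += d.name                                      // parent first, as with updatedColumnSchemaList
    if (d.children.nonEmpty) flatten(d.children, out)  // then its children, recursively
  }
}

val out = ListBuffer[String]()
flatten(List(Dim("person", List(
  Dim("person.detail"),
  Dim("person.addr", List(Dim("person.addr.city")))))), out)
// out: person, person.detail, person.addr, person.addr.city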

/**
* This method will return the default cardinality based on dimension type
*
@@ -44,7 +44,10 @@
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
import org.apache.carbondata.processing.datatypes.StructDataType;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
@@ -370,10 +373,10 @@ public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoa
int[] formattedCardinality = CarbonUtil
.getFormattedCardinality(segmentProperties.getDimColumnsCardinality(), wrapperColumnSchema);
carbonFactDataHandlerModel.setColCardinality(formattedCardinality);
//TO-DO Need to handle complex types here .
Map<Integer, GenericDataType> complexIndexMap =
new HashMap<Integer, GenericDataType>(segmentProperties.getComplexDimensions().size());
carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);

carbonFactDataHandlerModel.setComplexIndexMap(
convertComplexDimensionToGenericDataType(segmentProperties,
loadModel.getSerializationNullFormat()));
DataType[] measureDataTypes = new DataType[segmentProperties.getMeasures().size()];
int i = 0;
for (CarbonMeasure msr : segmentProperties.getMeasures()) {
@@ -407,6 +410,84 @@ public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoa
return carbonFactDataHandlerModel;
}

/**
* This routine converts each complex dimension into a GenericDataType tree.
*
* @param segmentProperties
* @param isNullFormat
* @return
*/
private static Map<Integer, GenericDataType> convertComplexDimensionToGenericDataType(
SegmentProperties segmentProperties, String isNullFormat) {
List<CarbonDimension> complexDimensions = segmentProperties.getComplexDimensions();
Map<Integer, GenericDataType> complexIndexMap = new HashMap<>(complexDimensions.size());
int dimensionCount = -1;
if (segmentProperties.getDimensions().size() == 0) {
dimensionCount = 0;
} else {
dimensionCount = segmentProperties.getDimensions().size() - segmentProperties
.getNumberOfNoDictionaryDimension() - segmentProperties.getComplexDimensions().size();
}
for (CarbonDimension carbonDimension : complexDimensions) {
if (carbonDimension.isComplex()) {
GenericDataType genericDataType;
DataType dataType = carbonDimension.getDataType();
if (DataTypes.isArrayType(dataType)) {
genericDataType =
new ArrayDataType(carbonDimension.getColName(), "", carbonDimension.getColumnId());
} else if (DataTypes.isStructType(dataType)) {
genericDataType =
new StructDataType(carbonDimension.getColName(), "", carbonDimension.getColumnId());
} else {
// A primitive type cannot be the root of a complex column.
throw new RuntimeException("Primitive Type should not be coming in first loop");
}
if (carbonDimension.getNumberOfChild() > 0) {
addChildrenForComplex(carbonDimension.getListOfChildDimensions(), genericDataType,
isNullFormat);
}
genericDataType.setOutputArrayIndex(0);
complexIndexMap.put(dimensionCount++, genericDataType);
}

}
return complexIndexMap;
}

private static void addChildrenForComplex(List<CarbonDimension> listOfChildDimensions,
GenericDataType genericDataType, String isNullFormat) {
for (CarbonDimension carbonDimension : listOfChildDimensions) {
String parentColName =
carbonDimension.getColName().substring(0, carbonDimension.getColName().lastIndexOf("."));
DataType dataType = carbonDimension.getDataType();
if (DataTypes.isArrayType(dataType)) {
GenericDataType arrayGeneric =
new ArrayDataType(carbonDimension.getColName(), parentColName,
carbonDimension.getColumnId());
if (carbonDimension.getNumberOfChild() > 0) {
addChildrenForComplex(carbonDimension.getListOfChildDimensions(), arrayGeneric,
isNullFormat);
}
genericDataType.addChildren(arrayGeneric);
} else if (DataTypes.isStructType(dataType)) {
GenericDataType structGeneric =
new StructDataType(carbonDimension.getColName(), parentColName,
carbonDimension.getColumnId());
if (carbonDimension.getNumberOfChild() > 0) {
addChildrenForComplex(carbonDimension.getListOfChildDimensions(), structGeneric,
isNullFormat);
}
genericDataType.addChildren(structGeneric);
} else {
// Primitive Data Type
genericDataType.addChildren(
new PrimitiveDataType(carbonDimension.getColumnSchema().getColumnName(),
dataType, parentColName, carbonDimension.getColumnId(),
carbonDimension.getColumnSchema().hasEncoding(Encoding.DICTIONARY), isNullFormat));
}
}
}
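
Taken together, for the person struct used in the test suite these two helpers build a StructDataType with one PrimitiveDataType per leaf. An illustrative Scala sketch using only the constructors visible in this diff (column ids and the '\N' null format are made up; assumes the Carbon core and processing jars on the classpath):

import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.processing.datatypes.{PrimitiveDataType, StructDataType}

// person Struct<detail:int, age:string, height:double>
val person = new StructDataType("person", "", "id-0") // made-up column id
person.addChildren(new PrimitiveDataType("detail", DataTypes.INT, "person", "id-1", false, "\\N"))
person.addChildren(new PrimitiveDataType("age", DataTypes.STRING, "person", "id-2", false, "\\N"))
person.addChildren(new PrimitiveDataType("height", DataTypes.DOUBLE, "person", "id-3", false, "\\N"))
person.setOutputArrayIndex(0)
// the root is then registered keyed after the dictionary dimensions:
// complexIndexMap.put(dimensionCount, person)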

/**
* This method will get the store location for the given path, segment id and partition id
*
@@ -321,9 +321,6 @@ public static Map<String, GenericDataType> getComplexTypesMap(DataField[] dataFi
String nullFormat =
configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
.toString();
boolean isEmptyBadRecord = Boolean.parseBoolean(
configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
.toString());

Map<String, GenericDataType> complexTypesMap = new LinkedHashMap<String, GenericDataType>();
String[] hierarchies = complexTypeString.split(CarbonCommonConstants.SEMICOLON_SPC_CHARACTER);
@@ -347,8 +344,8 @@ public static Map<String, GenericDataType> getComplexTypesMap(DataField[] dataFi
} else {
g.addChildren(
new PrimitiveDataType(levelInfo[0], DataTypeUtil.valueOf(levelInfo[1]),
levelInfo[2], levelInfo[4], levelInfo[3].contains("true"), nullFormat,
isEmptyBadRecord));
levelInfo[2], levelInfo[4], levelInfo[3].contains("true"), nullFormat
));
}
}
}
