Commit e691a1d

[CARBONDATA-2755][Complex DataType Enhancements] Compaction Complex Types. Enabling Compaction of Complex DataTypes.

Major and minor compaction will now run over complex data types.
sounakr authored and dhatchayani committed Dec 5, 2018
1 parent 382ce43 commit e691a1d
Showing 7 changed files with 1,140 additions and 48 deletions.
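In practice this lifts the previous hard block: compaction statements now run against tables containing ARRAY/STRUCT columns. A minimal usage sketch (the table name is illustrative):

```scala
// Before this commit, both statements failed with
// UnsupportedOperationException("Compaction is unsupported for Table containing Complex Columns").
sql("alter table complex_table compact 'minor'")
sql("alter table complex_table compact 'major'")
```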
@@ -59,29 +59,40 @@ public static CarbonRow fromMergerRow(Object[] row, SegmentProperties segmentPro
}
converted[DICTIONARY_DIMENSION] = dictDimensions;

Object[] noDictAndComplexKeys =
new Object[segmentProperties.getNumberOfNoDictionaryDimension() + segmentProperties
.getComplexDimensions().size()];

byte[][] noDictionaryKeys = ((ByteArrayWrapper) row[0]).getNoDictionaryKeys();
Object[] noDictKeys = new Object[noDictionaryKeys.length];
for (int i = 0; i < noDictionaryKeys.length; i++) {
// In case of compaction, rows are collected from the result collector as byte[].
// Convert the no-dictionary columns back to their original data types,
// as the load flow expects no-dictionary columns in their original form.
if (DataTypeUtil.isPrimitiveColumn(noDicAndComplexColumns[i].getDataType())) {
noDictKeys[i] = DataTypeUtil
noDictAndComplexKeys[i] = DataTypeUtil
.getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeys[i],
noDicAndComplexColumns[i].getDataType());
// for timestamp the above method returns the original value, so it must be
// converted back to the format expected by the load (millis, without micros)
if (null != noDictKeys[i]
if (null != noDictAndComplexKeys[i]
&& noDicAndComplexColumns[i].getDataType() == DataTypes.TIMESTAMP) {
noDictKeys[i] = (long) noDictKeys[i] / 1000L;
noDictAndComplexKeys[i] = (long) noDictAndComplexKeys[i] / 1000L;
}
} else {
noDictKeys[i] = noDictionaryKeys[i];
noDictAndComplexKeys[i] = noDictionaryKeys[i];
}
}

// For complex type columns, copy over the raw bytes unchanged
byte[][] complexKeys = ((ByteArrayWrapper) row[0]).getComplexTypesKeys();
for (int i = segmentProperties.getNumberOfNoDictionaryDimension();
i < segmentProperties.getNumberOfNoDictionaryDimension() + segmentProperties
.getComplexDimensions().size(); i++) {
noDictAndComplexKeys[i] =
complexKeys[i - segmentProperties.getNumberOfNoDictionaryDimension()];
}

// no dictionary and complex dimension
converted[NO_DICTIONARY_AND_COMPLEX] = noDictKeys;
converted[NO_DICTIONARY_AND_COMPLEX] = noDictAndComplexKeys;

// measure
int measureCount = row.length - 1;
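The hunk above widens the merger row so complex columns travel alongside the no-dictionary columns. A schematic of the combined array, assuming `segmentProperties` is in scope as in `fromMergerRow` (a sketch, not the implementation):

```scala
// n no-dictionary dimensions followed by m complex dimensions:
//   indices 0 .. n-1    decoded no-dictionary values (original data types,
//                       TIMESTAMP trimmed from micros to millis)
//   indices n .. n+m-1  complex values kept as raw byte arrays
val n = segmentProperties.getNumberOfNoDictionaryDimension
val m = segmentProperties.getComplexDimensions.size
val noDictAndComplexKeys = new Array[AnyRef](n + m)
```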
@@ -90,6 +90,16 @@ public byte[][] getNoDictionaryKeys() {
return this.noDictionaryKeys;
}


/**
* to get the complex column data
*
* @return the complex type values
*/
public byte[][] getComplexTypesKeys() {
return this.complexTypesKeys;
}

/**
* to generate the hash code
*/
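The wrapper now exposes complex keys next to the existing accessors; a short usage sketch mirroring `fromMergerRow` above (assuming `row` and `ByteArrayWrapper` are in scope):

```scala
val wrapper = row(0).asInstanceOf[ByteArrayWrapper]
val noDictKeys: Array[Array[Byte]] = wrapper.getNoDictionaryKeys
val complexKeys: Array[Array[Byte]] = wrapper.getComplexTypesKeys
```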

(Diff for one large file is not rendered.)

@@ -858,38 +858,6 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
arrayException.getMessage)
}

test("test block compaction") {
sql("DROP TABLE IF EXISTS table1")
sql(
"create table table1 (roll int,person Struct<detail:int,age:string,height:double>) stored " +
"by 'carbondata'")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
val exception = intercept[UnsupportedOperationException](
sql("alter table table1 compact 'major'"))
assertResult(
"Compaction is unsupported for Table containing Complex Columns")(
exception.getMessage)
val exception1 = intercept[UnsupportedOperationException](
sql("alter table table1 compact 'minor'"))
assertResult(
"Compaction is unsupported for Table containing Complex Columns")(
exception1.getMessage)
val exception2 = intercept[UnsupportedOperationException](
sql("alter table table1 compact 'custom' where segment.id in (0,1)"))
assertResult(
"Compaction is unsupported for Table containing Complex Columns")(
exception2.getMessage)
}

test("test complex datatype double for encoding") {
sql("DROP TABLE IF EXISTS table1")
sql(
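With the restriction lifted, a positive test along these lines could replace the removed one; a sketch assuming the same Struct.csv fixture, with an illustrative assertion:

```scala
test("test compaction with complex columns") {
  sql("DROP TABLE IF EXISTS table1")
  sql(
    "create table table1 (roll int,person Struct<detail:int,age:string,height:double>) stored " +
    "by 'carbondata'")
  // two loads create two segments, making the table eligible for compaction
  for (_ <- 1 to 2) {
    sql(
      "load data inpath '" + resourcesPath +
      "/Struct.csv' into table table1 options('delimiter'=','," +
      "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
      "'complex_delimiter_level_2'='&')")
  }
  val before = sql("select roll, person.detail from table1").collect()
  // no UnsupportedOperationException any more; the merged segment must
  // return exactly the same rows
  sql("alter table table1 compact 'major'")
  checkAnswer(sql("select roll, person.detail from table1"), before.toSeq)
}
```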
@@ -87,13 +87,6 @@ case class CarbonAlterTableCompactionCommand(
if (!table.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}

if (table.getTableInfo.getFactTable.getListOfColumns.asScala
.exists(m => m.getDataType.isComplexType)) {
throw new UnsupportedOperationException(
"Compaction is unsupported for Table containing Complex Columns")
}

if (CarbonUtil.hasAggregationDataMap(table) ||
(table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
// If the compaction request is of 'streaming' type then we need to generate loadCommands
@@ -326,6 +326,11 @@ public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> col
updatedCardinalityList.add(value);
}
updatedColumnSchemaList.add(dimension.getColumnSchema());

if (dimension.getNumberOfChild() > 0) {
fillColumnSchemaListForComplexDims(dimension.getListOfChildDimensions(),
updatedColumnSchemaList, updatedCardinalityList, columnCardinalityMap);
}
}
// add measures to the column schema list
List<CarbonMeasure> masterSchemaMeasures =
@@ -337,6 +342,25 @@ public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> col
.toPrimitive(updatedCardinalityList.toArray(new Integer[updatedCardinalityList.size()]));
}

private static void fillColumnSchemaListForComplexDims(List<CarbonDimension> carbonDimensionsList,
List<ColumnSchema> updatedColumnSchemaList, List<Integer> updatedCardinalityList,
Map<String, Integer> columnCardinalityMap) {
for (CarbonDimension carbonDimension : carbonDimensionsList) {
Integer value = columnCardinalityMap.get(carbonDimension.getColumnId());
if (null == value) {
updatedCardinalityList.add(getDimensionDefaultCardinality(carbonDimension));
} else {
updatedCardinalityList.add(value);
}
updatedColumnSchemaList.add(carbonDimension.getColumnSchema());
List<CarbonDimension> childDims = carbonDimension.getListOfChildDimensions();
if (null != childDims && childDims.size() > 0) {
fillColumnSchemaListForComplexDims(childDims, updatedColumnSchemaList,
updatedCardinalityList, columnCardinalityMap);
}
}
}

/**
* This method will return the default cardinality based on dimension type
*
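fillColumnSchemaListForComplexDims walks each complex column depth-first, emitting the parent before its children, which keeps the schema list and the cardinality list index-aligned. A self-contained mini-model of the traversal order (column names, including the array child, are illustrative):

```scala
// depth-first flattening for: person struct<detail:int, addresses:array<string>>
case class Dim(name: String, children: List[Dim] = Nil)

def flatten(d: Dim): List[String] = d.name :: d.children.flatMap(flatten)

val person = Dim("person", List(
  Dim("person.detail"),
  Dim("person.addresses", List(Dim("person.addresses.element")))))

println(flatten(person))
// List(person, person.detail, person.addresses, person.addresses.element)
```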
@@ -44,7 +44,10 @@
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
import org.apache.carbondata.processing.datatypes.StructDataType;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
@@ -371,9 +374,25 @@ public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoa
.getFormattedCardinality(segmentProperties.getDimColumnsCardinality(), wrapperColumnSchema);
carbonFactDataHandlerModel.setColCardinality(formattedCardinality);
// TODO: need to handle complex types here.
Map<Integer, GenericDataType> complexIndexMap =
new HashMap<Integer, GenericDataType>(segmentProperties.getComplexDimensions().size());
carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);

int simpleDimensionCount = -1;
if (segmentProperties.getDimensions().size() == 0) {
simpleDimensionCount = 0;
} else {
simpleDimensionCount = segmentProperties.getDimensions().size() - segmentProperties
.getNumberOfNoDictionaryDimension() - segmentProperties.getComplexDimensions().size();
}
// empty values count as bad records only when explicitly enabled
boolean isEmptyBadRecords = Boolean.parseBoolean(loadModel.getIsEmptyDataBadRecord());
carbonFactDataHandlerModel.setComplexIndexMap(
convertComplexDimensionToGenericDataType(segmentProperties.getComplexDimensions(),
simpleDimensionCount, loadModel.getSerializationNullFormat(), isEmptyBadRecords));
DataType[] measureDataTypes = new DataType[segmentProperties.getMeasures().size()];
int i = 0;
for (CarbonMeasure msr : segmentProperties.getMeasures()) {
@@ -407,6 +426,81 @@ public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoa
return carbonFactDataHandlerModel;
}
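Note how the map handed to setComplexIndexMap is keyed: the counter starts at simpleDimensionCount (dimensions that are neither no-dictionary nor complex) and advances once per complex column. A worked example with made-up counts:

```scala
// a table with 5 dimensions: 2 dictionary, 1 no-dictionary, 2 complex
val simpleDimensionCount = 5 - 1 - 2 // = 2
// convertComplexDimensionToGenericDataType then fills:
//   key 2 -> GenericDataType of the first complex column
//   key 3 -> GenericDataType of the second complex column
```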

/**
* This routine takes the complex dimensions and converts them into their
* generic DataType representations.
*
* @param complexDimensions complex dimensions to convert
* @param dimensionCount number of simple dimensions, used as the first map key
* @param isNullFormat serialization null format of the load
* @param isEmptyBadRecords whether empty values should be treated as bad records
* @return map from dimension index to the converted GenericDataType
*/
private static Map<Integer, GenericDataType> convertComplexDimensionToGenericDataType(
List<CarbonDimension> complexDimensions, int dimensionCount, String isNullFormat,
boolean isEmptyBadRecords) {
Map<Integer, GenericDataType> complexIndexMap =
new HashMap<Integer, GenericDataType>(complexDimensions.size());

for (CarbonDimension carbonDimension : complexDimensions) {

if (carbonDimension.isComplex()) {
GenericDataType g;
if (carbonDimension.getColumnSchema().getDataType().getName().equalsIgnoreCase("ARRAY")) {
g = new ArrayDataType(carbonDimension.getColName(), "", carbonDimension.getColumnId());
} else if (carbonDimension.getColumnSchema().getDataType().getName()
.equalsIgnoreCase("STRUCT")) {
g = new StructDataType(carbonDimension.getColName(), "", carbonDimension.getColumnId());
} else {
// A primitive type can only appear as a child of a complex column,
// never as a top-level complex dimension.
throw new RuntimeException("Primitive type is not expected as a top-level complex dimension");
}
if (carbonDimension.getNumberOfChild() > 0) {
addChildrenForComplex(carbonDimension.getListOfChildDimensions(), g, isNullFormat,
isEmptyBadRecords);
}
g.setOutputArrayIndex(0);
complexIndexMap.put(dimensionCount++, g);
}

}
return complexIndexMap;
}

private static void addChildrenForComplex(List<CarbonDimension> listOfChildDimensions,
GenericDataType genericDataType, String isNullFormat, boolean isEmptyBadRecord) {
for (CarbonDimension carbonDimension : listOfChildDimensions) {
if (carbonDimension.getColumnSchema().getDataType().getName().equalsIgnoreCase("ARRAY")) {
GenericDataType arrayGeneric = new ArrayDataType(carbonDimension.getColName(),
carbonDimension.getColName()
.substring(0, carbonDimension.getColName().lastIndexOf(".")),
carbonDimension.getColumnId());
if (carbonDimension.getNumberOfChild() > 0) {
addChildrenForComplex(carbonDimension.getListOfChildDimensions(), arrayGeneric,
isNullFormat, isEmptyBadRecord);
}
genericDataType.addChildren(arrayGeneric);
} else if (carbonDimension.getColumnSchema().getDataType().getName()
.equalsIgnoreCase("STRUCT")) {
GenericDataType structGeneric = new StructDataType(carbonDimension.getColName(),
carbonDimension.getColName()
.substring(0, carbonDimension.getColName().lastIndexOf(".")),
carbonDimension.getColumnId());
if (carbonDimension.getNumberOfChild() > 0) {
addChildrenForComplex(carbonDimension.getListOfChildDimensions(), structGeneric,
isNullFormat, isEmptyBadRecord);
}
genericDataType.addChildren(structGeneric);
} else {
// Primitive Data Type
genericDataType.addChildren(
new PrimitiveDataType(carbonDimension.getColumnSchema().getColumnName(),
carbonDimension.getDataType(), carbonDimension.getColName()
.substring(0, carbonDimension.getColName().lastIndexOf(".")),
carbonDimension.getColumnId(),
carbonDimension.getColumnSchema().hasEncoding(Encoding.DICTIONARY), isNullFormat,
isEmptyBadRecord));
}
}
}
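Put together, for the person Struct<detail:int,age:string,height:double> column used in the tests, convertComplexDimensionToGenericDataType and addChildrenForComplex build a tree roughly like the sketch below; the column ids, null format and flags are illustrative values:

```scala
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.processing.datatypes.{PrimitiveDataType, StructDataType}

// the top-level struct has an empty parent path
val person = new StructDataType("person", "", "id-person")
// children carry the parent path "person"; the trailing arguments are
// isDictionary, the null format and isEmptyBadRecord
person.addChildren(
  new PrimitiveDataType("detail", DataTypes.INT, "person", "id-detail", false, "\\N", false))
person.addChildren(
  new PrimitiveDataType("age", DataTypes.STRING, "person", "id-age", false, "\\N", false))
person.addChildren(
  new PrimitiveDataType("height", DataTypes.DOUBLE, "person", "id-height", false, "\\N", false))
person.setOutputArrayIndex(0)
```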

/**
* This method will get the store location for the given path, segment id and partition id
*
