Skip to content

Commit

Permalink
[CARBONDATA-2443][SDK] Multi level complex type support for AVRO base…
Browse files Browse the repository at this point in the history
…d SDK

Problem:
Schema inference fails for a complex type containing a boolean array when the store was created using the SDK writer.

Analysis:
When we create an external table and infer the schema from a store created using the SDK writer, the operation fails if a complex type field has a boolean array data type. This is because, during schema creation by the SDK writer, a child column named val is added for the children of an array type.
During parsing, the logic that prefixes the child column name with the parent name is missing for the boolean type, which causes this problem.

Solution:
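Handle the boolean type during parsing, so that the parent name is prefixed to the child column name as it is for the other data types.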

This closes #2294
  • Loading branch information
manishgupta88 authored and ravipesala committed May 10, 2018
1 parent ff5166e commit 35a7b5e
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 28 deletions.
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
Expand Down Expand Up @@ -114,12 +115,12 @@ public void setSortColumns(List<ColumnSchema> sortColumns) {
this.sortColumns = sortColumns;
}

public ColumnSchema addColumn(StructField field, boolean isSortColumn) {
return addColumn(field, null, isSortColumn, false);
public ColumnSchema addColumn(StructField field, AtomicInteger valIndex, boolean isSortColumn) {
return addColumn(field, null, valIndex, isSortColumn, false);
}

private ColumnSchema addColumn(StructField field, String parentName, boolean isSortColumn,
boolean isComplexChild) {
private ColumnSchema addColumn(StructField field, String parentName, AtomicInteger valIndex,
boolean isSortColumn, boolean isComplexChild) {
Objects.requireNonNull(field);
checkRepeatColumnName(field);
ColumnSchema newColumn = new ColumnSchema();
Expand Down Expand Up @@ -184,33 +185,25 @@ private ColumnSchema addColumn(StructField field, String parentName, boolean isS
if (field.getDataType().isComplexType()) {
String parentFieldName = newColumn.getColumnName();
if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
String colName = getColNameForArray(parentFieldName);
addColumn(new StructField(colName,
((ArrayType) field.getDataType()).getElementType()), field.getFieldName(), false, true);
String colName = getColNameForArray(valIndex);
addColumn(new StructField(colName, ((ArrayType) field.getDataType()).getElementType()),
field.getFieldName(), valIndex, false, true);
} else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")
&& ((StructType) field.getDataType()).getFields().size() > 0) {
// This field has children.
List<StructField> fields = ((StructType) field.getDataType()).getFields();
for (int i = 0; i < fields.size(); i++) {
addColumn(fields.get(i), parentFieldName, false, true);
addColumn(fields.get(i), parentFieldName, valIndex, false, true);
}
}
}
return newColumn;
}

private String getColNameForArray(String parentFieldName) {
if (!parentFieldName.endsWith(".val")) {
return "val";
} else {
String[] splits = parentFieldName.split("val");
if (splits.length == 1) {
return "val" + 1;
} else {
return "val" + (Integer.parseInt(parentFieldName
.substring(parentFieldName.lastIndexOf("val") + 3, parentFieldName.length())) + 1);
}
}
private String getColNameForArray(AtomicInteger valIndex) {
String colName = "val" + valIndex.get();
valIndex.incrementAndGet();
return colName;
}

/**
Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.StructField;
Expand All @@ -32,15 +33,16 @@ public class TableSchemaBuilderSuite {
@Test(expected = NullPointerException.class)
public void testNullField() {
TableSchemaBuilder builder = TableSchema.builder();
builder.addColumn(null, true);
builder.addColumn(null, new AtomicInteger(0), true);
}

@Test
public void testBuilder() {
TableSchemaBuilder builder = TableSchema.builder();
ColumnSchema columnSchema = builder.addColumn(new StructField("a", DataTypes.INT), true);
ColumnSchema columnSchema =
builder.addColumn(new StructField("a", DataTypes.INT), new AtomicInteger(0), true);
builder.setSortColumns(Arrays.asList(columnSchema));
builder.addColumn(new StructField("b", DataTypes.DOUBLE), false);
builder.addColumn(new StructField("b", DataTypes.DOUBLE), new AtomicInteger(0), false);
TableSchema schema = builder.build();
Assert.assertEquals(2, schema.getListOfColumns().size());
List<ColumnSchema> columns = schema.getListOfColumns();
Expand All @@ -51,9 +53,10 @@ public void testBuilder() {
@Test(expected = IllegalArgumentException.class)
public void testRepeatedColumn() {
TableSchemaBuilder builder = TableSchema.builder();
ColumnSchema columnSchema = builder.addColumn(new StructField("a", DataTypes.INT), true);
ColumnSchema columnSchema =
builder.addColumn(new StructField("a", DataTypes.INT), new AtomicInteger(0), true);
builder.setSortColumns(Arrays.asList(columnSchema));
builder.addColumn(new StructField("a", DataTypes.DOUBLE), false);
builder.addColumn(new StructField("a", DataTypes.DOUBLE), new AtomicInteger(0), false);
TableSchema schema = builder.build();
}
}
Expand Up @@ -1147,6 +1147,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
field.columnComment)
case "boolean" => Field(field.column, Some("Boolean"), field.name, Some(null), field.parent,
field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
field.columnComment)
// checking if the nested data type contains the child type as decimal(10,0),
// if it is present then extracting the precision and scale. resetting the data type
// with Decimal.
Expand Down Expand Up @@ -1214,6 +1217,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
case "Decimal" => Field(parentName + "." + field.column, Some("Decimal"),
Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName,
field.storeType, field.schemaOrdinal, field.precision, field.scale)
case "Boolean" => Field(parentName + "." + field.column, Some("Boolean"),
Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
case _ => field
}
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
Expand Down Expand Up @@ -426,6 +427,10 @@ private CarbonTable buildCarbonTable() {

private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder,
List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList) {
// A counter used for complex array types. Its current value is appended to "val" to name the
// child of each array type (val0, val1, ...), so that every array child gets a distinct name
// at any nesting level.
AtomicInteger valIndex = new AtomicInteger(0);
for (Field field : fields) {
if (null != field) {
int isSortColumn = sortColumnsList.indexOf(field.getFieldName());
Expand All @@ -443,7 +448,8 @@ private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuil
// Loop through the inner columns and for a StructData
DataType complexType =
DataTypes.createArrayType(field.getChildren().get(0).getDataType());
tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false);
tableSchemaBuilder
.addColumn(new StructField(field.getFieldName(), complexType), valIndex, false);
} else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) {
// Loop through the inner columns and for a StructData
List<StructField> structFieldsArray =
Expand All @@ -453,12 +459,13 @@ private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuil
.add(new StructField(childFld.getFieldName(), childFld.getDataType()));
}
DataType complexType = DataTypes.createStructType(structFieldsArray);
tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false);
tableSchemaBuilder
.addColumn(new StructField(field.getFieldName(), complexType), valIndex, false);
}
} else {
ColumnSchema columnSchema = tableSchemaBuilder
.addColumn(new StructField(field.getFieldName(), field.getDataType()),
isSortColumn > -1);
valIndex, isSortColumn > -1);
columnSchema.setSortColumn(true);
if (isSortColumn > -1) {
sortColumnsSchemaList[isSortColumn] = columnSchema;
Expand Down

0 comments on commit 35a7b5e

Please sign in to comment.