[CARBONDATA-2606] Fix Complex array Pushdown and block auto merge compaction

1. Check whether a complex column contains an ArrayType at any of its n levels, and add the parent column to the projection when it does (a minimal sketch of this check appears just before the diffs below).
2. Block auto merge compaction for tables containing complex datatype columns.
3. Fix decimal datatype scale and precision for a two-level struct type.
4. Fix DICTIONARY_INCLUDE for complex datatypes: previously, specifying any complex column other than the first in DICTIONARY_INCLUDE made the insert fail.
5. Fix bad-record handling and dateformat for the complex primitive type DATE.

This closes #2535
Indhumathi27 authored and kunal642 committed Jul 25, 2018
1 parent 50b74d2 commit 06d38ff
Showing 11 changed files with 199 additions and 26 deletions.
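
Fix 1 reduces to detecting an ArrayType anywhere below a struct column. A minimal Scala sketch of that recursive check, written against Spark's DataType tree rather than CarbonData's internal expression walk (so this is illustrative, not the committed code):

import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

// Returns true if an ArrayType occurs at any nesting level below dt.
// When it does, the parent column is projected as a whole instead of
// pushing the struct-child projection down to the scan.
def containsArrayAtAnyLevel(dt: DataType): Boolean = dt match {
  case _: ArrayType => true
  case StructType(fields) => fields.exists(f => containsArrayAtAnyLevel(f.dataType))
  case _ => false
}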
@@ -125,6 +125,8 @@ public int fillVector(int[] filteredRowId, ColumnVectorInfo[] vectorInfo, int ch
return out;
} else if (srcDataType == DataTypes.BYTE_ARRAY) {
return columnPage.getBytes(rowId);
} else if (srcDataType == DataTypes.DOUBLE) {
return ByteUtil.toBytes(columnPage.getDouble(rowId));
} else {
throw new RuntimeException("unsupported type: " + targetDataType);
}
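
The new DOUBLE branch above serializes the page value to bytes so double values read from the page can be handed back alongside the existing BYTE_ARRAY path. An illustrative stand-in for ByteUtil.toBytes(double), assuming big-endian IEEE-754 encoding (the exact layout is whatever ByteUtil defines):

import java.nio.ByteBuffer

// 8 big-endian bytes of the IEEE-754 representation of the double.
def doubleToBytes(value: Double): Array[Byte] =
  ByteBuffer.allocate(java.lang.Double.BYTES).putDouble(value).array()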
@@ -19,6 +19,9 @@ import org.apache.carbondata.core.util.CarbonProperties

class TestComplexDataType extends QueryTest with BeforeAndAfterAll {

val badRecordAction = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION)

override def beforeAll(): Unit = {
sql("DROP TABLE IF EXISTS table1")
sql("DROP TABLE IF EXISTS test")
@@ -27,6 +30,13 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
override def afterAll(): Unit = {
sql("DROP TABLE IF EXISTS table1")
sql("DROP TABLE IF EXISTS test")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, badRecordAction)
}

test("test Projection PushDown for Struct - Integer type") {
@@ -883,6 +893,110 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
"create table table1 (person struct<height:double>) stored by 'carbondata'")
sql("insert into table1 values('1000000000')")
checkExistence(sql("select * from table1"),true,"1.0E9")
sql("DROP TABLE IF EXISTS table1")
sql(
"create table table1 (person struct<height:double>) stored by 'carbondata'")
sql("insert into table1 values('12345678912')")
checkExistence(sql("select * from table1"),true,"1.2345678912E10")
sql("DROP TABLE IF EXISTS table1")
sql(
"create table table1 (person struct<b:array<double>>) stored by 'carbondata'")
sql("insert into table1 values('10000000:2000000000:2900000000')")
checkExistence(sql("select * from table1"),true,"2.9E9")
}

test("test block compaction - auto merge") {
sql("DROP TABLE IF EXISTS table1")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
sql(
"create table table1 (roll int,person Struct<detail:int,age:string,height:double>) stored " +
"by 'carbondata'")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
sql(
"load data inpath '" + resourcesPath +
"/Struct.csv' into table table1 options('delimiter'=','," +
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
"'complex_delimiter_level_2'='&')")
checkExistence(sql("show segments for table table1"),false, "Compacted")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
}

test("decimal with two level struct type") {
sql("DROP TABLE IF EXISTS test")
sql(
"create table test(id int,a struct<c:struct<d:decimal(20,10)>>) stored by 'carbondata' " +
"tblproperties('dictionary_include'='a')")
checkExistence(sql("desc test"),true,"struct<c:struct<d:decimal(20,10)>>")
checkExistence(sql("describe formatted test"),true,"struct<c:struct<d:decimal(20,10)>>")
sql("insert into test values(1,'3999.999')")
checkExistence(sql("select * from test"),true,"3999.9990000000")
}

test("test dictionary include for second struct and array column") {
sql("DROP TABLE IF EXISTS test")
sql(
"create table test(id int,a struct<b:int,c:int>, d struct<e:int,f:int>, d1 struct<e1:int," +
"f1:int>) stored by 'carbondata' tblproperties('dictionary_include'='d1')")
sql("insert into test values(1,'2$3','4$5','6$7')")
checkAnswer(sql("select * from test"),Seq(Row(1,Row(2,3),Row(4,5),Row(6,7))))
sql("DROP TABLE IF EXISTS test")
sql(
"create table test(a array<int>, b array<int>) stored by 'carbondata' tblproperties" +
"('dictionary_include'='b')")
sql("insert into test values(1,2) ")
checkAnswer(sql("select b[0] from test"),Seq(Row(2)))
}

test("date with struct and array") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
sql("DROP TABLE IF EXISTS test")
sql("create table test(a struct<b:date>) stored by 'carbondata'")
val exception1 = intercept[Exception] {
sql("insert into test select 'a' ")
}
assert(exception1.getMessage
.contains(
"Data load failed due to bad record: The value with column name a.b and column data type " +
"DATE is not a valid DATE type.Please enable bad record logger to know the detail reason."))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
sql("DROP TABLE IF EXISTS test")
sql("create table test(a array<date>) stored by 'carbondata'")
val exception2 = intercept[Exception] {
sql("insert into test select 'a' ")
}
assert(exception2.getMessage
.contains(
"Data load failed due to bad record: The value with column name a.val and column data type " +
"DATE is not a valid DATE type.Please enable bad record logger to know the detail reason."))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
"MM-dd-yyyy")
sql("DROP TABLE IF EXISTS test")
sql("create table test(a struct<d1:date,d2:date>) stored by 'carbondata'")
sql("insert into test values ('02-18-2012$12-9-2016')")
checkAnswer(sql("select * from test "), Row(Row(java.sql.Date.valueOf("2012-02-18"),java.sql.Date.valueOf("2016-12-09"))))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
}
}
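
The date tests above lean on two conventions: complex_delimiter_level_1 ('$' in these loads) separates struct children, and CARBON_DATE_FORMAT drives parsing of each piece. A sketch of what the load path effectively does for '02-18-2012$12-9-2016' (illustrative, not CarbonData internals):

import java.text.SimpleDateFormat

val format = new SimpleDateFormat("MM-dd-yyyy") // CARBON_DATE_FORMAT set by the test
val Array(d1, d2) = "02-18-2012$12-9-2016".split('$')
val parsed = (new java.sql.Date(format.parse(d1).getTime),
  new java.sql.Date(format.parse(d2).getTime))
// parsed == (2012-02-18, 2016-12-09), matching the checkAnswer row above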
@@ -128,7 +128,10 @@ private[spark] object SparkTypeConverter {
case "struct" => s"${
childDim.getColName.substring(dimName.length + 1)
}:struct<${ getStructChildren(table, childDim.getColName) }>"
case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
case dType => s"${
childDim.getColName
.substring(dimName.length + 1)
}:${ addDecimalScaleAndPrecision(childDim, dType) }"
}
}
}
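
The change above routes primitive struct children through addDecimalScaleAndPrecision so a nested decimal keeps its precision and scale (decimal(20,10)) instead of collapsing to a bare 'decimal' in the schema string. A hedged sketch of what such a helper must do; the committed method takes the child dimension itself, so this simplified signature is hypothetical:

// Re-attach the precision/scale that the generic type name loses for
// nested struct children.
def addDecimalScaleAndPrecision(dType: String, precision: Int, scale: Int): String =
  if (dType.equalsIgnoreCase("decimal")) s"decimal($precision,$scale)" else dType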
@@ -583,13 +583,19 @@ object CarbonDataRDDFactory {
if (carbonTable.isHivePartitionTable) {
carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
}
val compactedSegments = new util.ArrayList[String]()
handleSegmentMerging(sqlContext,
carbonLoadModel,
carbonTable,
compactedSegments,
operationContext)
carbonLoadModel.setMergedSegmentIds(compactedSegments)
// Block compaction for table containing complex datatype
if (carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
.exists(m => m.getDataType.isComplexType)) {
LOGGER.warn("Compaction is skipped as table contains complex columns")
} else {
val compactedSegments = new util.ArrayList[String]()
handleSegmentMerging(sqlContext,
carbonLoadModel,
carbonTable,
compactedSegments,
operationContext)
carbonLoadModel.setMergedSegmentIds(compactedSegments)
}
} catch {
case e: Exception =>
throw new Exception(
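
The same guard is mirrored in CarbonLoadDataCommand below: both load paths now skip auto merge when any fact-table column is complex. Reduced to its core, using the table API the diff itself uses:

import scala.collection.JavaConverters._

// Auto merge compaction runs only for tables without complex columns.
val hasComplexColumn = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
  .exists(_.getDataType.isComplexType)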
@@ -97,7 +97,7 @@ case class CarbonDatasourceHadoopRelation(
breakable({
while (ifGetArrayItemExists.containsChild != null) {
if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
arrayTypeExists = s.childSchema.toString().contains("ArrayType")
arrayTypeExists = ifGetArrayItemExists.childSchema.toString().contains("ArrayType")
break
}
if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
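
Seen from SQL, the corrected check means a struct containing an array at any depth is read as a whole parent column, with the array access evaluated above the scan. A hypothetical example (table name and data are illustrative; the insert string follows the same level-2 ':' delimiter convention the tests use):

sql("create table t (person struct<hobbies:array<string>>) stored by 'carbondata'")
sql("insert into t values('cricket:chess')")
// The scan projects the full person column; hobbies[1] is resolved afterwards.
sql("select person.hobbies[1] from t").show()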
@@ -823,15 +823,21 @@ case class CarbonLoadDataCommand(
}
try {
carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
val compactedSegments = new util.ArrayList[String]()
// Trigger auto compaction
CarbonDataRDDFactory.handleSegmentMerging(
sparkSession.sqlContext,
carbonLoadModel,
table,
compactedSegments,
operationContext)
carbonLoadModel.setMergedSegmentIds(compactedSegments)
// Block compaction for table containing complex datatype
if (table.getTableInfo.getFactTable.getListOfColumns.asScala
.exists(m => m.getDataType.isComplexType)) {
LOGGER.warn("Compaction is skipped as table contains complex columns")
} else {
val compactedSegments = new util.ArrayList[String]()
// Trigger auto compaction
CarbonDataRDDFactory.handleSegmentMerging(
sparkSession.sqlContext,
carbonLoadModel,
table,
compactedSegments,
operationContext)
carbonLoadModel.setMergedSegmentIds(compactedSegments)
}
} catch {
case e: Exception =>
throw new Exception(
@@ -172,8 +172,10 @@ public void setSurrogateIndex(int surrIndex) {

@Override
public void fillCardinality(List<Integer> dimCardWithComplex) {
dimCardWithComplex.add(0);
children.fillCardinality(dimCardWithComplex);
if (children.getIsColumnDictionary()) {
dimCardWithComplex.add(0);
children.fillCardinality(dimCardWithComplex);
}
}

@Override
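
This array-type change and the matching struct-type change later in the commit apply one rule: a cardinality slot is added only when a dictionary-encoded child exists, keeping the cardinality list aligned with the dictionary column layout. The shared logic, sketched with a minimal stand-in for the GenericDataType interface the diff uses:

// Minimal stand-in for the GenericDataType children in the diff.
trait Child {
  def getIsColumnDictionary: Boolean
  def fillCardinality(card: java.util.List[Integer]): Unit
}

// Sketch of the rule both fillCardinality overrides now follow.
def fillCardinalityIfDictionary(dimCard: java.util.List[Integer], children: Seq[Child]): Unit =
  if (children.exists(_.getIsColumnDictionary)) {
    dimCard.add(Integer.valueOf(0))
    children.foreach(_.fillCardinality(dimCard))
  }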
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -173,7 +175,8 @@ public PrimitiveDataType(CarbonColumn carbonColumn, String parentName, String co
if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
|| carbonColumn.getDataType() == DataTypes.DATE) {
dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
.getDirectDictionaryGenerator(carbonDimension.getDataType()));
.getDirectDictionaryGenerator(carbonDimension.getDataType(),
getDateFormat(carbonDimension)));
isDirectDictionary = true;
} else if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
CacheProvider cacheProvider = CacheProvider.getInstance();
@@ -204,6 +207,25 @@ public PrimitiveDataType(CarbonColumn carbonColumn, String parentName, String co
}
}

/**
 * Resolves the date format for a DATE dimension: the format set on the
 * column wins, otherwise the format configured in carbon properties is used.
 * @param carbonDimension dimension whose dateformat is resolved
 * @return the date format string to use for parsing
 */
private String getDateFormat(CarbonDimension carbonDimension) {
String format;
String dateFormat = null;
if (this.carbonDimension.getDataType() == DataTypes.DATE) {
dateFormat = carbonDimension.getDateFormat();
}
if (dateFormat != null && !dateFormat.trim().isEmpty()) {
format = dateFormat;
} else {
format = CarbonUtil.getFormatFromProperty(dataType);
}
return format;
}

private boolean isDictionaryDimension(CarbonDimension carbonDimension) {
if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
return true;
@@ -326,6 +348,10 @@ public int getSurrogateIndex() {
byte[] value = null;
if (isDirectDictionary) {
int surrogateKey;
if (!(input instanceof Long)) {
SimpleDateFormat parser = new SimpleDateFormat(getDateFormat(carbonDimension));
parser.parse(parsedValue);
}
// If the input is a long value then this means that logical type was provided by
// the user using AvroCarbonWriter. In this case directly generate surrogate key
// using dictionaryGenerator.
@@ -389,6 +415,8 @@ public int getSurrogateIndex() {
updateNullValue(dataOutputStream, logHolder);
} catch (CarbonDataLoadingException e) {
throw e;
} catch (ParseException ex) {
updateNullValue(dataOutputStream, logHolder);
} catch (Throwable ex) {
// TODO have to implemented the Bad Records LogHolder.
// Same like NonDictionaryFieldConverterImpl.
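
Taken together, the PrimitiveDataType changes mean a complex DATE child is now parsed with the resolved date format before surrogate generation, and a ParseException is handled as a bad record (stored as null) instead of aborting the load. A compact sketch of that flow; the method name is illustrative:

import java.text.{ParseException, SimpleDateFormat}

// None marks a bad record: the writer stores a null and the bad-record
// logger can report the row.
def validateDate(value: String, dateFormat: String): Option[java.util.Date] =
  try Some(new SimpleDateFormat(dateFormat).parse(value))
  catch { case _: ParseException => None }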
@@ -178,9 +178,17 @@ public void setSurrogateIndex(int surrIndex) {

@Override
public void fillCardinality(List<Integer> dimCardWithComplex) {
dimCardWithComplex.add(0);
for (int i = 0; i < children.size(); i++) {
children.get(i).fillCardinality(dimCardWithComplex);
boolean isDictionaryColumn = false;
for (GenericDataType child : children) {
if (child.getIsColumnDictionary()) {
isDictionaryColumn = true;
}
}
if (isDictionaryColumn) {
dimCardWithComplex.add(0);
for (int i = 0; i < children.size(); i++) {
children.get(i).fillCardinality(dimCardWithComplex);
}
}
}

@@ -165,7 +165,9 @@ private void setComplexMapSurrogateIndex(int dimensionCount) {
List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
complexDataType.getAllPrimitiveChildren(primitiveTypes);
for (GenericDataType eachPrimitive : primitiveTypes) {
eachPrimitive.setSurrogateIndex(surrIndex++);
if (eachPrimitive.getIsColumnDictionary()) {
eachPrimitive.setSurrogateIndex(surrIndex++);
}
}
} else {
surrIndex++;
@@ -248,7 +248,9 @@ public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
complexDataType.getValue().getAllPrimitiveChildren(primitiveTypes);
for (GenericDataType eachPrimitive : primitiveTypes) {
eachPrimitive.setSurrogateIndex(surrIndex++);
if (eachPrimitive.getIsColumnDictionary()) {
eachPrimitive.setSurrogateIndex(surrIndex++);
}
}
}

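
The last two hunks apply the same guard in both places that hand out surrogate indexes to the primitive children of a complex column: only dictionary-encoded primitives consume an index, so non-dictionary children no longer shift the numbering. In outline, assuming a Scala collection of children:

// surrIndex advances only for dictionary-encoded primitive children.
var surrIndex = 0
for (primitive <- primitiveTypes if primitive.getIsColumnDictionary) {
  primitive.setSurrogateIndex(surrIndex)
  surrIndex += 1
}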
