[CARBONDATA-4285] Fix alter add complex columns with global sort compaction failure

Why is this PR needed?
Alter add complex columns followed by global sort compaction fails for two reasons:

ArrayIndexOutOfBounds exception: the global sort compaction flow currently builds the default complex delimiter list with only 3 entries, while the map type needs one extra complex delimiter to handle key-value pairs.

Bad record handling: when complex columns are added after data has already been inserted, the previously loaded segments contain null values for those columns. During compaction these nulls are treated as bad records and the compaction fails.

What changes were proposed in this PR?
In the global sort compaction flow, build the default complex delimiter list with 4 entries, as is already done in the load flow.
Skip bad record handling in the compaction flow. There is no need to check for bad records during compaction because they were already checked while loading; compaction only re-inserts data from previously loaded segments.
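
For illustration, a minimal standalone sketch of why a map column needs a fourth complex delimiter (the delimiter characters and the level-to-type mapping below are placeholders, not the CarbonData ComplexDelimitersEnum values):

object ComplexDelimiterSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical default delimiter list: one entry per nesting level plus one
    // extra for map key-value separation.
    val complexDelim = Array("\u0001", "\u0002", "\u0003", "\u0004").mkString(",")
    val split = complexDelim.split(",")

    // The old compaction flow built only three entries, so a map column, which
    // needs the extra key-value delimiter at index 3, hit an
    // ArrayIndexOutOfBoundsException.
    require(split.length > 3, "map columns need a fourth complex delimiter")

    // Roughly how Map("a" -> "1", "b" -> "2") is flattened: entries joined by one
    // level's delimiter, key and value joined by the extra delimiter (the exact
    // level-to-type assignment here is an assumption, not the CarbonData spec).
    val flattened = Seq("a" + split(3) + "1", "b" + split(3) + "2").mkString(split(0))
    println(flattened.replace(split(3), "<kv>").replace(split(0), "<entry>"))
  }
}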

This closes #4218
maheshrajus authored and kunal642 committed Sep 20, 2021
1 parent 2d1907b commit 22342f847d7db515e5f8c17525522085f49bd2a5
Showing 8 changed files with 128 additions and 16 deletions.
@@ -365,11 +365,16 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio
     if (null == complexDelim) {
       complexDelim = ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_1.value() + ","
           + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_2.value() + ","
-          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value();
+          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value() + ","
+          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value();
     }
     String[] split = complexDelim.split(",");
     model.setComplexDelimiter(split[0]);
-    if (split.length > 2) {
+    if (split.length > 3) {
       model.setComplexDelimiter(split[1]);
       model.setComplexDelimiter(split[2]);
+      model.setComplexDelimiter(split[3]);
+    } else if (split.length > 2) {
+      model.setComplexDelimiter(split[1]);
+      model.setComplexDelimiter(split[2]);
     } else if (split.length > 1) {
@@ -73,7 +73,8 @@ object DataLoadProcessBuilderOnSpark {
       dataFrame: Option[DataFrame],
       model: CarbonLoadModel,
       hadoopConf: Configuration,
-      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]],
+      isCompactionFlow: Boolean = false)
   : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     var isLoadFromCSV = false
     val originRDD = if (dataFrame.isDefined) {
@@ -121,8 +122,13 @@ object DataLoadProcessBuilderOnSpark {
     // 2. Convert
     val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
       ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
-      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
-        convertStepRowCounter)
+      DataLoadProcessorStepOnSpark.convertFunc(rows,
+        index,
+        modelBroadcast,
+        partialSuccessAccum,
+        convertStepRowCounter,
+        false,
+        isCompactionFlow)
     }.filter(_ != null) // Filter the bad record

     // 3. Sort
@@ -221,10 +221,11 @@ object DataLoadProcessorStepOnSpark {
       modelBroadcast: Broadcast[CarbonLoadModel],
       partialSuccessAccum: LongAccumulator,
       rowCounter: LongAccumulator,
-      keepActualData: Boolean = false): Iterator[CarbonRow] = {
+      keepActualData: Boolean = false,
+      isCompactionFlow: Boolean = false): Iterator[CarbonRow] = {
     val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
     val conf = DataLoadProcessBuilder.createConfiguration(model)
-    val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf)
+    val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf, isCompactionFlow)
     if (keepActualData) {
       conf.getDataFields.foreach(_.setUseActualData(keepActualData))
     }
@@ -507,7 +507,8 @@ class CarbonTableCompactor(
         Option(dataFrame),
         outputModel,
         SparkSQLUtil.sessionState(sparkSession).newHadoopConf(),
-        segmentMetaDataAccumulator)
+        segmentMetaDataAccumulator,
+        isCompactionFlow = true)
         .map { row =>
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
         }
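
Taken together, the new flag travels from the compactor down to the bad-record handling in the convert step. A minimal, self-contained sketch of that propagation (all names below are stand-ins, not the real CarbonData classes):

object CompactionFlagPropagationSketch {

  // Stand-in for BadRecordsLogger: during compaction the record is not treated
  // as a bad record, because it was already validated at original load time.
  final case class BadRecordsLogger(isCompactionFlow: Boolean) {
    def handleBadRecord(raw: String, reason: String): Unit =
      if (!isCompactionFlow) println(s"bad record: $raw ($reason)")
  }

  def createBadRecordLogger(isCompactionFlow: Boolean): BadRecordsLogger =
    BadRecordsLogger(isCompactionFlow)

  // Stand-in for the convert step: builds the logger with the flag it received.
  def convertFunc(rows: Iterator[String], isCompactionFlow: Boolean): Iterator[String] = {
    val logger = createBadRecordLogger(isCompactionFlow)
    rows.map { row =>
      if (row == null) logger.handleBadRecord("<null>", "null value for newly added column")
      row
    }
  }

  // Stand-in for loadDataUsingGlobalSort: defaults to false so normal loads keep
  // full bad-record handling; only the compactor passes true.
  def loadDataUsingGlobalSort(rows: Iterator[String], isCompactionFlow: Boolean = false): Unit =
    convertFunc(rows, isCompactionFlow).foreach(_ => ())

  def main(args: Array[String]): Unit = {
    loadDataUsingGlobalSort(Iterator("row1", null, "row2"), isCompactionFlow = true)
    loadDataUsingGlobalSort(Iterator("row1", null, "row2")) // prints a bad-record line
  }
}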
@@ -594,4 +594,84 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
}

test("test the complex columns with global sort compaction") {
sql("DROP TABLE IF EXISTS alter_global1")
sql("CREATE TABLE alter_global1(intField INT) STORED AS carbondata " +
"TBLPROPERTIES('sort_columns'='intField','sort_scope'='global_sort')")
sql("insert into alter_global1 values(1)")
sql("insert into alter_global1 values(2)")
sql("insert into alter_global1 values(3)")
sql( "ALTER TABLE alter_global1 ADD COLUMNS(str1 array<int>)")
sql("insert into alter_global1 values(4, array(1))")
sql("insert into alter_global1 values(5, null)")
sql( "ALTER TABLE alter_global1 ADD COLUMNS(str2 array<string>)")
sql("insert into alter_global1 values(6, array(1), array('', 'hi'))")
sql("insert into alter_global1 values(7, array(1), array('bye', 'hi'))")
sql("ALTER TABLE alter_global1 ADD COLUMNS(str3 array<date>, str4 struct<s1:timestamp>)")
sql(
"insert into alter_global1 values(8, array(1), array('bye', 'hi'), array('2017-02-01'," +
"'2018-09-11'),named_struct('s1', '2017-02-01 00:01:00'))")
val expected = Seq(Row(1, null, null, null, null),
Row(2, null, null, null, null),
Row(3, null, null, null, null),
Row(4, make(Array(1)), null, null, null),
Row(5, null, null, null, null),
Row(6, make(Array(1)), make(Array("", "hi")), null, null),
Row(7, make(Array(1)), make(Array("bye", "hi")), null, null),
Row(8, make(Array(1)), make(Array("bye", "hi")),
make(Array(Date.valueOf("2017-02-01"), Date.valueOf("2018-09-11"))),
Row(Timestamp.valueOf("2017-02-01 00:01:00"))))
checkAnswer(sql("select * from alter_global1"), expected)
val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_global1")
assert(addedColumns.size == 4)
sql("alter table alter_global1 compact 'minor'")
checkAnswer(sql("select * from alter_global1"), expected)
sql("DROP TABLE IF EXISTS alter_global1")
}

test("test the multi-level complex columns with global sort compaction") {
sql("DROP TABLE IF EXISTS alter_global2")
sql("CREATE TABLE alter_global2(intField INT) STORED AS carbondata " +
"TBLPROPERTIES('sort_columns'='intField','sort_scope'='global_sort')")
sql("insert into alter_global2 values(1)")
// multi-level nested array
sql(
"ALTER TABLE alter_global2 ADD COLUMNS(arr1 array<array<int>>, arr2 array<struct<a1:string," +
"map1:Map<string, string>>>) ")
sql(
"insert into alter_global2 values(1, array(array(1,2)), array(named_struct('a1','st'," +
"'map1', map('a','b'))))")
// multi-level nested struct
sql("ALTER TABLE alter_global2 ADD COLUMNS(struct1 struct<s1:string, arr: array<int>>," +
" struct2 struct<num:double,contact:map<string,array<int>>>) ")
sql("insert into alter_global2 values(1, " +
"array(array(1,2)), array(named_struct('a1','st','map1', map('a','b'))), " +
"named_struct('s1','hi','arr',array(1,2)), named_struct('num',2.3,'contact',map('ph'," +
"array(1,2))))")
// multi-level nested map
sql(
"ALTER TABLE alter_global2 ADD COLUMNS(map1 map<string,array<string>>, map2 map<string," +
"struct<d:int, s:struct<im:string>>>)")
sql("insert into alter_global2 values(1, " +
"array(array(1,2)), array(named_struct('a1','st','map1', map('a','b'))), " +
"named_struct('s1','hi','arr',array(1,2)), named_struct('num',2.3,'contact',map('ph'," +
"array(1,2))),map('a',array('hi')), map('a',named_struct('d',23,'s',named_struct('im'," +
"'sh'))))")
val expected = Seq(Row(1, null, null, null, null, null, null),
Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
null, null, null, null),
Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
Row("hi", make(Array(1, 2))), Row(2.3, Map("ph" -> make(Array(1, 2)))), null, null),
Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
Row("hi", make(Array(1, 2))), Row(2.3, Map("ph" -> make(Array(1, 2)))),
Map("a" -> make(Array("hi"))), Map("a" -> Row(23, Row("sh"))))
)
checkAnswer(sql("select * from alter_global2"), expected)
val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_global2")
assert(addedColumns.size == 6)
sql("alter table alter_global2 compact 'minor'")
checkAnswer(sql("select * from alter_global2"), expected)
sql("DROP TABLE IF EXISTS alter_global2")
}
}
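
The expected rows in these tests wrap array values with a make(...) helper from the test utilities; presumably it lifts a plain Array into the WrappedArray representation Spark returns for array columns so that checkAnswer can compare them. A minimal sketch under that assumption:

import scala.collection.mutable

object MakeHelperSketch {
  // Assumed shape of the make(...) helper used in the expected rows above.
  def make(values: Array[_]): mutable.WrappedArray[_] = mutable.WrappedArray.make(values)

  def main(args: Array[String]): Unit = {
    // WrappedArray compares element-wise, so it equals the value Spark hands
    // back for an array<int> column containing [1, 2].
    println(make(Array(1, 2)) == mutable.WrappedArray.make(Array(1, 2))) // true
  }
}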
@@ -88,11 +88,13 @@ public class BadRecordsLogger {

   private boolean isDataLoadFail;
 
+  private boolean isCompactionFlow;
+
   // private final Object syncObject =new Object();
 
   public BadRecordsLogger(String key, String fileName, String storePath,
       boolean badRecordsLogRedirect, boolean badRecordLoggerEnable,
-      boolean badRecordConvertNullDisable, boolean isDataLoadFail) {
+      boolean badRecordConvertNullDisable, boolean isDataLoadFail, boolean isCompactionFlow) {
     // Initially no bad rec
     taskKey = key;
     this.fileName = fileName;
@@ -101,6 +103,11 @@ public BadRecordsLogger(String key, String fileName, String storePath,
     this.badRecordLoggerEnable = badRecordLoggerEnable;
     this.badRecordConvertNullDisable = badRecordConvertNullDisable;
     this.isDataLoadFail = isDataLoadFail;
+    this.isCompactionFlow = isCompactionFlow;
   }
 
+  public boolean isCompFlow() {
+    return isCompactionFlow;
+  }
+
   /**
@@ -33,6 +33,16 @@ public class BadRecordsLoggerProvider {
    * @return
    */
   public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration) {
+    return createBadRecordLogger(configuration, false);
+  }
+
+  /**
+   * method returns the BadRecordsLogger instance
+   * @param configuration
+   * @return
+   */
+  public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration,
+      Boolean isCompactionFlow) {
     boolean badRecordsLogRedirect = false;
     boolean badRecordConvertNullDisable = false;
     boolean isDataLoadFail = false;
@@ -72,7 +82,7 @@ public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration
     return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
         identifier.getTableName() + '_' + System.currentTimeMillis(),
         getBadLogStoreLocation(configuration), badRecordsLogRedirect,
-        badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
+        badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail, isCompactionFlow);
   }
 
   public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration) {
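
Keeping the original single-argument createBadRecordLogger and delegating with false means existing load-path callers stay untouched, while the compaction path opts in explicitly. The same backwards-compatible overload pattern in miniature (stand-in names, not the CarbonData classes):

object OverloadSketch {
  final case class Logger(isCompactionFlow: Boolean)

  // Old entry point keeps its signature and forwards a safe default.
  def createLogger(config: String): Logger =
    createLogger(config, isCompactionFlow = false)

  // New entry point carries the extra flag.
  def createLogger(config: String, isCompactionFlow: Boolean): Logger =
    Logger(isCompactionFlow)

  def main(args: Array[String]): Unit = {
    println(createLogger("load"))                                // Logger(false)
    println(createLogger("compaction", isCompactionFlow = true)) // Logger(true)
  }
}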
@@ -134,13 +134,15 @@ public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
       if (reason.equalsIgnoreCase(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE)) {
         reason = String.format(reason, this.fields[i].getColumn().getColName());
       }
-      badRecordLogger.addBadRecordsToBuilder(row.getRawData(), reason);
-      if (badRecordLogger.isDataLoadFail()) {
-        String error = "Data load failed due to bad record: " + reason;
-        if (!badRecordLogger.isBadRecordLoggerEnable()) {
-          error += "Please enable bad record logger to know the detail reason.";
+      if (!badRecordLogger.isCompFlow()) {
+        badRecordLogger.addBadRecordsToBuilder(row.getRawData(), reason);
+        if (badRecordLogger.isDataLoadFail()) {
+          String error = "Data load failed due to bad record: " + reason;
+          if (!badRecordLogger.isBadRecordLoggerEnable()) {
+            error += "Please enable bad record logger to know the detail reason.";
+          }
+          throw new BadRecordFoundException(error);
         }
-        throw new BadRecordFoundException(error);
       }
       logHolder.clear();
       logHolder.setLogged(true);
