diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
index 458d6569634..7b138f700e2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
@@ -271,4 +271,88 @@ class TableLevelCompactionOptionTest extends QueryTest
     assert(!segmentSequenceIds.contains("0.1"))
     assert(!segmentSequenceIds.contains("3.1"))
   }
+
+  test("AUTO MERGE TRUE: verify 2nd level compaction threshold of 1") {
+    sql("DROP TABLE IF EXISTS tablecompaction_table")
+    sql(
+      """
+        |create table tablecompaction_table(
+        |name string, age int) stored by 'carbondata'
+        |tblproperties('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,1')
+      """.stripMargin)
+
+    for (i <- 0 until 4) {
+      sql("insert into tablecompaction_table select 'a',12")
+    }
+    val segments = sql("SHOW SEGMENTS FOR TABLE tablecompaction_table")
+    val segmentSequenceIds = segments.collect().map { each => each.toSeq.head }
+    assert(segmentSequenceIds.size == 6)
+    assert(segmentSequenceIds.contains("0.1"))
+    assert(segmentSequenceIds.contains("2.1"))
+  }
+
+  test("AUTO MERGE FALSE: verify 2nd level compaction threshold of 1") {
+    sql("DROP TABLE IF EXISTS tablecompaction_table")
+    sql(
+      """
+        |create table tablecompaction_table(
+        |name string, age int) stored by 'carbondata'
+        |tblproperties('COMPACTION_LEVEL_THRESHOLD'='2,1')
+      """.stripMargin)
+
+    for (i <- 0 until 4) {
+      sql("insert into tablecompaction_table select 'a',12")
+    }
+    sql("alter table tablecompaction_table compact 'minor'")
+    val segments = sql("SHOW SEGMENTS FOR TABLE tablecompaction_table")
+    val segmentSequenceIds = segments.collect().map { each => each.toSeq.head }
+    assert(segmentSequenceIds.size == 6)
+    assert(segmentSequenceIds.contains("0.1"))
+    assert(segmentSequenceIds.contains("2.1"))
+  }
+
+  // A 2nd level compaction threshold of 0 is already supported at system level (e.g. '6,0');
+  // the same needs to be supported at table level as well.
+  test("Verify 2nd level compaction threshold of 0") {
+    sql("DROP TABLE IF EXISTS tablecompaction_table")
+    sql(
+      """
+        |create table tablecompaction_table(
+        |name string, age int) stored by 'carbondata'
+        |tblproperties('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,0')
+      """.stripMargin)
+
+    for (i <- 0 until 4) {
+      sql("insert into tablecompaction_table select 'a',12")
+    }
+    val segments = sql("SHOW SEGMENTS FOR TABLE tablecompaction_table")
+    val segmentSequenceIds = segments.collect().map { each => each.toSeq.head }
+    assert(segmentSequenceIds.size == 6)
+    assert(segmentSequenceIds.contains("0.1"))
+    assert(segmentSequenceIds.contains("2.1"))
+  }
+
+  test("System Level: verify 2nd level compaction threshold of 1") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,1")
+    sql("DROP TABLE IF EXISTS tablecompaction_table")
+    sql(
+      """
+        |create table tablecompaction_table(
+        |name string, age int) stored by 'carbondata'
+      """.stripMargin)
+
+    for (i <- 0 until 4) {
+      sql("insert into tablecompaction_table select 'a',12")
+    }
+    sql("alter table tablecompaction_table compact 'minor'")
+    val segments = sql("SHOW SEGMENTS FOR TABLE tablecompaction_table")
+    val segmentSequenceIds = segments.collect().map { each => each.toSeq.head }
+    assert(segmentSequenceIds.size == 6)
+    assert(segmentSequenceIds.contains("0.1"))
+    assert(segmentSequenceIds.contains("2.1"))
+  }
+
 }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index c2f805d1a89..49b17fbe1bd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -455,7 +455,7 @@ object CommonUtil {
     try {
       val levels: Array[String] = regularedStr.split(",")
       val thresholds = regularedStr.split(",").map(levelThresholdStr => levelThresholdStr.toInt)
-      if (!thresholds.forall(t => t < 100 && t > 0)) {
+      if (!thresholds.forall(t => t < 100 && t >= 0)) {
         throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
           s"$regularedStr, only int values separated by comma and between 0 " +
           s"and 100 are supported.")
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 29512834718..5b001bf6db9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -744,6 +744,13 @@ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
     if (size >= 2) {
       level1Size = noOfSegmentLevelsCount[0];
       level2Size = noOfSegmentLevelsCount[1];
+      /*
+       Example: with segments 0.1, 2, 3 and a threshold of 2,1, the second compaction run
+       sets mergeCounter to 1, the mergeCounter == level2Size check passes, and only one
+       segment (0.1) is returned for merging. A single segment cannot be compacted, so a
+       2nd level threshold of 1 is treated as 0, i.e. 2nd level compaction is disabled.
+      */
+      level2Size = level2Size == 1 ? 0 : level2Size;
     } else if (size == 1) {
       level1Size = noOfSegmentLevelsCount[0];
     }
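For reference, a minimal standalone sketch of the validation behaviour the CommonUtil.scala change relaxes. The signature is simplified and IllegalArgumentException stands in for MalformedCarbonCommandException so the snippet runs outside CarbonData; with the lower bound now inclusive, a table-level value such as '2,0' is accepted, matching what the system-level property already allows:

object ThresholdValidationSketch {
  // Simplified stand-in for the table-level compaction threshold check.
  // After this patch the lower bound is inclusive, so 0 ("disable this level") passes.
  def validate(tblPropName: String, regularedStr: String): Array[Int] = {
    val thresholds = regularedStr.split(",").map(_.trim.toInt)
    if (!thresholds.forall(t => t < 100 && t >= 0)) {
      throw new IllegalArgumentException(s"Invalid $tblPropName value found: " +
        s"$regularedStr, only int values separated by comma and between 0 " +
        s"and 100 are supported.")
    }
    thresholds
  }

  def main(args: Array[String]): Unit = {
    println(validate("compaction_level_threshold", "2,0").mkString(","))  // accepted after the fix
    println(validate("compaction_level_threshold", "2,1").mkString(","))  // still accepted
    // validate("compaction_level_threshold", "2,-1")                     // still rejected
  }
}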
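And a small toy model of why the 1 -> 0 normalization in CarbonDataMergerUtil.java matters. This is not CarbonData's actual merge logic (real segment naming and bookkeeping differ); it only illustrates that with four loads and a '2,1' threshold, level 1 produces 0.1 and 2.1, while a level-2 group size of 1 could never combine anything, so treating it as 0 (disabled) yields exactly the segment list the tests assert:

object CompactionGroupingSketch {
  // Toy model: merge consecutive segments in groups of `groupSize`; a merged group is
  // named after its first member with a ".1" suffix, leftovers stay unmerged.
  // A group size below 2 can never combine segments, so that level is a no-op.
  def mergeLevel(segments: Seq[String], groupSize: Int): Seq[String] =
    if (groupSize < 2) segments
    else {
      val mergeableCount = segments.size / groupSize * groupSize
      val (full, rest) = segments.splitAt(mergeableCount)
      full.grouped(groupSize).map(group => group.head + ".1").toSeq ++ rest
    }

  def main(args: Array[String]): Unit = {
    val loads = Seq("0", "1", "2", "3")
    val afterLevel1 = mergeLevel(loads, 2)                             // List(0.1, 2.1)
    val rawLevel2Size = 1
    val level2Size = if (rawLevel2Size == 1) 0 else rawLevel2Size      // the patched normalization
    println(mergeLevel(afterLevel1, level2Size))                       // List(0.1, 2.1)
  }
}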