Skip to content

Commit

Permalink
fix global sort compaction issue
Browse files Browse the repository at this point in the history
  • Loading branch information
akashrn5 committed Jul 20, 2020
1 parent 05dc1e6 commit 0dfb90b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
Expand Up @@ -424,6 +424,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
// generate LoadModel which can be used global_sort flow
val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
sparkSession, table)
// set fact time stamp, else the carbondata file will be created with fact timestamp as 0.
outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
outputModel.setLoadMetadataDetails(carbonLoadModel.getLoadMetadataDetails)
outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
loadResult = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
sparkSession,
Expand Down
Expand Up @@ -483,6 +483,25 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
checkAnswer(sql("select * from sink"), Row("k", null, null,null,null, null, null, mutable.WrappedArray.make(Array(null)), Row(null), Map("null" -> "null")))
}

test("test global sort compaction, clean files, update delete") {
sql("DROP TABLE IF EXISTS carbon_global_sort_update")
sql(
"""
| CREATE TABLE carbon_global_sort_update(id INT, name STRING, city STRING, age INT)
| STORED AS carbondata TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'sort_columns' = 'name, city')
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_global_sort_update")
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_global_sort_update")
sql("alter table carbon_global_sort_update compact 'major'")
sql("clean files for table carbon_global_sort_update")
assert(sql("select * from carbon_global_sort_update").count() == 24)
val updatedRows = sql("update carbon_global_sort_update d set (id) = (id + 3) where d.name = 'd'").collect()
assert(updatedRows.head.get(0) == 2)
val deletedRows = sql("delete from carbon_global_sort_update d where d.id = 12").collect()
assert(deletedRows.head.get(0) == 2)
assert(sql("select * from carbon_global_sort_update").count() == 22)
}

private def resetConf() {
CarbonProperties.getInstance()
.removeProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)
Expand Down

0 comments on commit 0dfb90b

Please sign in to comment.