[CARBONDATA-4320] Fix clean files removing wrong delta files
Why is this PR needed?
When multiple delete delta files exist in a partition of a partitioned
table, some valid delta files were being missed and deleted by clean
files, thus changing the query results afterwards.

What changes were proposed in this PR?
Fixed the logic that decides which delta files to delete. Clean files now
compares deleteDeltaStartTimestamp with deleteDeltaEndTimestamp so that
all valid delta files are taken into account.
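
As an illustration (with hypothetical timestamps): after horizontal
compaction a block may record deleteDeltaStartTimestamp = 100 and
deleteDeltaEndTimestamp = 300 while its deltaFileStamps lists, say, 100,
200 and 300. The old logic preserved only the file ending in
100.deletedelta, so the files stamped 200 and 300 were wrongly collected
for deletion.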

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes, one test case has been added.

This closes #4246
vikramahuja1001 authored and akashrn5 committed Jan 13, 2022
1 parent 308906e commit 05aff876d4e7ae7dcea2cecda176b470eb658ff8
Showing 2 changed files with 98 additions and 5 deletions.
@@ -732,11 +732,21 @@ public static long cleanUpDeltaFiles(CarbonTable table, boolean isDryRun) throws
          .collect(Collectors.toList()));
    }
    SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
    for (SegmentUpdateDetails block : updateDetails) {
      totalDeltaFiles.stream().filter(fileName -> fileName.getName().endsWith(block
          .getDeleteDeltaStartTimestamp() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
          .collect(Collectors.toList()).forEach(fileName -> totalDeltaFiles.remove(fileName));
    }

    // Case 1: deleteDeltaStartTimestamp == deleteDeltaEndTimestamp. Only one
    // delta file is present and deltaFileStamps is null.
    // Case 2: deleteDeltaStartTimestamp != deleteDeltaEndTimestamp. More than
    // one delta file is present, so read the deltaFileStamps variable directly.
    Arrays.stream(updateDetails).forEach(block -> {
      if (block.getDeleteDeltaStartTimestamp().equals(block.getDeleteDeltaEndTimestamp())) {
        totalDeltaFiles.removeIf(deltaFile -> deltaFile.getName().endsWith(block
            .getDeleteDeltaEndTimestamp() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
      } else {
        block.getDeltaFileStamps().stream().forEach(fileName -> totalDeltaFiles
            .removeIf(deltaFile -> deltaFile.getName().endsWith(fileName +
                CarbonCommonConstants.DELETE_DELTA_FILE_EXT)));
      }
    });
    for (CarbonFile invalidFile : totalDeltaFiles) {
      totalSizeDeleted += invalidFile.getSize();
      filesToBeDeleted.add(invalidFile);
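
To make the new rule easy to see in isolation, here is a minimal,
self-contained sketch of the selection logic above. The Block record, the
timestamps and the file names are hypothetical stand-ins for
SegmentUpdateDetails and real delete delta file names; only the
".deletedelta" suffix (the assumed value of
CarbonCommonConstants.DELETE_DELTA_FILE_EXT) mirrors the actual code.

import java.util.*;

public class DeltaFileSelectionSketch {
  // Hypothetical stand-in for SegmentUpdateDetails (illustration only)
  record Block(String start, String end, List<String> deltaFileStamps) {}

  public static void main(String[] args) {
    List<Block> blocks = List.of(
        new Block("100", "100", null),                          // Case 1
        new Block("200", "400", List.of("200", "300", "400"))); // Case 2
    // Hypothetical delta file names; only the suffix mirrors the real code
    List<String> totalDeltaFiles = new ArrayList<>(List.of(
        "part-0_100.deletedelta", "part-0_300.deletedelta",
        "part-0_999.deletedelta"));
    for (Block block : blocks) {
      if (block.start().equals(block.end())) {
        // Case 1: a single delta file, identified by the end timestamp
        totalDeltaFiles.removeIf(f -> f.endsWith(block.end() + ".deletedelta"));
      } else {
        // Case 2: every timestamp in deltaFileStamps names a valid file
        block.deltaFileStamps().forEach(ts ->
            totalDeltaFiles.removeIf(f -> f.endsWith(ts + ".deletedelta")));
      }
    }
    // Whatever is left was not referenced by any block and is safe to delete
    System.out.println(totalDeltaFiles); // prints [part-0_999.deletedelta]
  }
}

The point of the fix is that stale files are found by elimination:
everything still referenced in the update metadata is removed from the
candidate list first, and only the remainder is deleted. (The sketch uses
records, so it needs Java 16+.)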
@@ -359,6 +359,89 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
sql("drop table if exists partition_hc")
}

test("test clean files after IUD Horizontal Compaction when" +
" CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION > 1") {

CarbonProperties.getInstance().
addProperty(CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION, "3")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
sql("drop table if exists origintable")

sql(
"""
| CREATE TABLE origintable
| (id Int,
| vin String,
| logdate Date,
| phonenumber Long,
| area String,
| salary Int) PARTITIONED BY(country String)
| STORED AS carbondata
""".stripMargin)

val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
val testData = s"$rootPath/integration/spark/src/test/resources/" +
s"partition_data_example.csv"

sql(
s"""
LOAD DATA LOCAL INPATH '$testData' into table origintable
""")

sql("delete from origintable where salary = 10000").show()
sql("delete from origintable where salary = 10001").show()
sql("delete from origintable where salary = 10003").show()
var preCleanFiles = sql("select * from origintable").count()
sql(s"CLEAN FILES FOR TABLE origintable OPTIONS('force'='true')").collect()
var postCleanFiles = sql("select * from origintable").count()
assert(preCleanFiles == postCleanFiles)
sql("delete from origintable where salary = 10005").show()

// verify if the horizontal compaction happened or not
val carbonTable = CarbonEnv.getCarbonTable(None, "origintable")(sqlContext
.sparkSession)
val partitionPath = carbonTable.getTablePath + "/country=China"
val deltaFilesPre = FileFactory.getCarbonFile(partitionPath).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
}
})
assert(deltaFilesPre.size == 5)
val updateStatusFilesPre = FileFactory.getCarbonFile(CarbonTablePath.getMetadataPath(carbonTable
.getTablePath)).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
}
})
assert(updateStatusFilesPre.size == 3)

preCleanFiles = sql("select * from origintable").count()
sql(s"CLEAN FILES FOR TABLE origintable OPTIONS('force'='true')").collect()
postCleanFiles = sql("select * from origintable").count()
assert(preCleanFiles == postCleanFiles)

val deltaFilesPost = FileFactory.getCarbonFile(partitionPath).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
}
})
assert(deltaFilesPost.size == 1)
val updateStatusFilesPost = FileFactory.getCarbonFile(CarbonTablePath
.getMetadataPath(carbonTable.getTablePath)).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
}
})
assert(updateStatusFilesPost.size == 1)

sql("drop table if exists origintable")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants
.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION,
CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)
}

  def editTableStatusFile(carbonTablePath: String) : Unit = {
    // Original Table status file
    val f1 = new File(CarbonTablePath.getTableStatusFilePath(carbonTablePath))
