[CARBONDATA-4319] Fixed clean files not deleting stale delete delta files after horizontal compaction

Why is this PR needed?
After horizontal compaction was performed on partition and non-partition tables, the clean files
operation was not deleting the stale delete delta files. The code was removed as part of the clean
files refactoring done previously.

What changes were proposed in this PR?
Clean files with the force option now handles removal of these stale delete delta files, as well as
the stale tableupdatestatus files, for both partition and non-partition tables.
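For reference, a minimal usage sketch of the commands that exercise this path is shown below. The `force` option is documented in clean-files.md further down; the `dryrun` option is assumed from the existing clean files documentation and only reports the size that would be freed without deleting anything.

```
-- dry run sketch: report the size that would be freed, including the stale
-- delete delta files left by horizontal compaction ('dryrun' is assumed here)
CLEAN FILES FOR TABLE table_name OPTIONS('force'='true','dryrun'='true')

-- actual clean up: requires carbon.clean.file.force.allowed=true
CLEAN FILES FOR TABLE table_name OPTIONS('force'='true')
```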

This closes #4245
vikramahuja1001 authored and kunal642 committed Dec 28, 2021
1 parent 0f1d2a4 commit a072e7a9e8af7491ee009c3f4005cd028801f02e
Showing 6 changed files with 266 additions and 6 deletions.
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -688,4 +689,125 @@ public static long getLatestDeleteDeltaTimestamp(String[] deleteDeltaFiles) {
}
return latestTimestamp;
}


/**
* Handles the clean up of old carbondata files, index files, delete delta files and
* update status files.
*
* @param table clean up will be handled on this table.
* @param isDryRun if the clean files dry run option is selected, only the size is
* returned and the files are not deleted.
*/
public static long cleanUpDeltaFiles(CarbonTable table, boolean isDryRun) throws IOException {

SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
LoadMetadataDetails[] details =
SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
long totalSizeDeleted = 0;
ArrayList<CarbonFile> filesToBeDeleted = new ArrayList<>();
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
SegmentUpdateDetails[] segmentUpdateDetails = updateStatusManager.getUpdateStatusDetails();
// hold all the updated segments so that we can check the delta files in them; no need to
// check the others.
Set<String> updatedSegments = new HashSet<>();
for (SegmentUpdateDetails updateDetails : segmentUpdateDetails) {
updatedSegments.add(updateDetails.getSegmentName());
}
boolean isInvalidFile = false;
// take the update status file name from the 0th segment.
String validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
// scan through each segment.

if (table.isHivePartitionTable()) {
List<CarbonFile> partitionList = Arrays.stream(FileFactory.getCarbonFile(table
.getTablePath()).listFiles()).filter(partitionName -> partitionName.getName()
.contains("=")).collect(Collectors.toList());

List<CarbonFile> totalDeltaFiles = new ArrayList<>();

for (CarbonFile carbonFile : partitionList) {
totalDeltaFiles.addAll(carbonFile.listFiles(true).stream().filter(fileName -> fileName
.getName().endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
.collect(Collectors.toList()));
}
SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
for (SegmentUpdateDetails block : updateDetails) {
totalDeltaFiles.stream().filter(fileName -> fileName.getName().endsWith(block
.getDeleteDeltaStartTimestamp() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
.collect(Collectors.toList()).forEach(fileName -> totalDeltaFiles.remove(fileName));
}
for (CarbonFile invalidFile: totalDeltaFiles) {
totalSizeDeleted += invalidFile.getSize();
filesToBeDeleted.add(invalidFile);
}
} else {
for (LoadMetadataDetails segment : details) {
// only if this segment is valid will we go for delta file deletion.
// if the segment is marked for delete or compacted, it will get deleted anyway.
if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
|| segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
// when no update operations have been done on the table, there is no need to go ahead. So
// just check the update delta start timestamp and proceed only if it is not empty
if (!segment.getUpdateDeltaStartTimestamp().isEmpty()
|| updatedSegments.contains(segment.getLoadName())) {
// take the list of files from this segment.
String segmentPath = CarbonTablePath.getSegmentPath(
table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName());
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath);
CarbonFile[] allSegmentFiles = segDir.listFiles();
SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
for (SegmentUpdateDetails block : updateDetails) {
CarbonFile[] invalidDeleteDeltaFiles;
if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
continue;
}
invalidDeleteDeltaFiles = updateStatusManager.getDeleteDeltaInvalidFilesList(block,
false, allSegmentFiles, isInvalidFile);
for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
totalSizeDeleted += invalidFile.getSize();
filesToBeDeleted.add(invalidFile);
}
}
}
}
}
}

// delete the update table status files which are old.
if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {

final String updateStatusTimestamp = validUpdateStatusFile
.substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);

String tablePath = table.getAbsoluteTableIdentifier().getTablePath();
CarbonFile metaFolder = FileFactory.getCarbonFile(
CarbonTablePath.getMetadataPath(tablePath));

CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) {
// check whether this file is valid or not.
// only the invalid ones are sent for deletion.
return !file.getName().endsWith(updateStatusTimestamp);
}
return false;
}
});

for (CarbonFile invalidFile : invalidUpdateStatusFiles) {
totalSizeDeleted += invalidFile.getSize();
filesToBeDeleted.add(invalidFile);
}
}
if (!isDryRun) {
for (CarbonFile invalidFile : filesToBeDeleted) {
invalidFile.deleteFile();
}
}
return totalSizeDeleted;
}
}

@@ -855,6 +855,26 @@ private static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadat
}
}

/**
* This API returns the update status file name, which is stored in the 0th segment.
* @param segmentList load metadata details of all the segments
* @return the update status file name, or an empty string if it is not present
*/
public String getUpdateStatusFileName(LoadMetadataDetails[] segmentList) {
if (segmentList.length == 0) {
return "";
} else {
for (LoadMetadataDetails eachSeg : segmentList) {
// file name stored in 0th segment.
if (eachSeg.getLoadName().equalsIgnoreCase("0")) {
return eachSeg.getUpdateStatusFileName();
}
}
}
return "";
}

public static class ValidAndInvalidSegmentsInfo {
private final List<Segment> listOfValidSegments;
private final List<Segment> listOfValidUpdatedSegments;
@@ -45,14 +45,16 @@ The above clean files command will clean Marked For Delete and Compacted segment
* In trash folder, the retention time is "carbon.trash.retention.days"
* Outside trash folder(Segment Directories in table path), the retention time is Max("carbon.trash.retention.days", "max.query.execution.time")
### FORCE OPTION
The force option with clean files command deletes all the files and folders from the trash folder and delete the Marked for Delete and Compacted segments immediately. Since Clean Files operation with force option will delete data that can never be recovered, the force option by default is disabled. Clean files with force option is only allowed when the carbon property ```carbon.clean.file.force.allowed``` is set to true. The default value of this property is false.
The force option with the clean files command deletes all the files and folders from the trash folder and deletes the Marked for Delete and Compacted segments immediately. This option will also delete all the stale delete delta files that are present in the segment folder or the partition folder after a successful horizontal compaction. Since the Clean Files operation with force option deletes data that can never be recovered, the force option is disabled by default. Clean files with force option is only allowed when the carbon property ```carbon.clean.file.force.allowed``` is set to true. The default value of this property is false.



```
CLEAN FILES FOR TABLE TABLE_NAME options('force'='true')
```

**NOTE**:
* Since clean files with force option also deletes the stale delete delta files immediately, do not run this operation concurrently with other delete/update operations, as it can lead to query failures.

### STALE_INPROGRESS OPTION
The stale_inprogress option deletes the stale Insert In Progress segments after the expiration of the property ```carbon.trash.retention.days```
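A usage sketch following the option syntax used for the other options in this document; the exact option string is an assumption here:

```
CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true')
```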

@@ -101,4 +103,4 @@ clean files operation, the user can disable that option by using ```statistics =
```
CLEAN FILES FOR TABLE TABLE_NAME options('statistics'='false')
```


@@ -30,6 +30,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, DeleteLoadFolders, TrashUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -82,6 +83,13 @@ object DataTrashManager {
isDryRun = false, showStatistics)
// step 2: move stale segments which are not exists in metadata into .Trash
moveStaleSegmentsToTrash(carbonTable)
// clean all the stale delete delta files, which are generated as part of
// horizontal compaction
val deltaFileSize = if (isForceDelete) {
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
} else {
0
}
// step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
// Since calculating the size before and after clean files can be a costly operation,
// an option is exposed so that the user can change this behaviour.
@@ -92,7 +100,7 @@ object DataTrashManager {
cleanStaleInProgress, partitionSpecs)
val sizeAfterCleaning = getPostOpSizeSnapshot(carbonTable, metadataDetails
.map(a => a.getLoadName).toSet)
(sizeBeforeCleaning - sizeAfterCleaning + trashFolderSizeStats._1).abs
(sizeBeforeCleaning - sizeAfterCleaning + trashFolderSizeStats._1 + deltaFileSize).abs
} else {
checkAndCleanExpiredSegments(carbonTable, isForceDelete,
cleanStaleInProgress, partitionSpecs)
@@ -158,11 +166,17 @@ object DataTrashManager {
// get size freed from the trash folder
val trashFolderSizeStats = checkAndCleanTrashFolder(carbonTable, isForceDelete,
isDryRun = true, showStats)
// get the size of the stale delete delta files that will be deleted, if any
val deleteDeltaFileSize = if (isForceDelete) {
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
} else {
0
}
// get the size that will be deleted (MFD, Compacted, In Progress segments)
val expiredSegmentsSizeStats = dryRunOnExpiredSegments(carbonTable, isForceDelete,
cleanStaleInProgress)
(trashFolderSizeStats._1 + expiredSegmentsSizeStats._1, trashFolderSizeStats._2 +
expiredSegmentsSizeStats._2)
(trashFolderSizeStats._1 + expiredSegmentsSizeStats._1 + deleteDeltaFileSize,
trashFolderSizeStats._2 + expiredSegmentsSizeStats._2)
}

private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean,
@@ -530,6 +530,40 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists cleantest")
}

test("Test clean files after horizontal compaction") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")

sql("drop table if exists cleantest")
sql(
"""
| CREATE TABLE cleantest (empname String, designation String, doj Timestamp,
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
| utilization int,salary int, empno int)
| STORED AS carbondata
""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE cleantest OPTIONS
|('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)

sql("delete from cleantest where deptno='10'").show()
sql("delete from cleantest where deptno='11'").show()
val table = CarbonEnv.getCarbonTable(None, "cleantest") (sqlContext.sparkSession)
val segment0Path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
val allSegmentFilesPreCleanFiles = FileFactory.getCarbonFile(segment0Path).listFiles()
.filter(a => a.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
assert(allSegmentFilesPreCleanFiles.length == 3)

sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").collect()
val allSegmentFilesPostCleanFiles = FileFactory.getCarbonFile(segment0Path).listFiles()
.filter(a => a.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
assert(allSegmentFilesPostCleanFiles.length == 1)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
sql("drop table if exists cleantest")
}

def editTableStatusFile(carbonTablePath: String) : Unit = {
// original table status file
@@ -26,6 +26,7 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -291,6 +292,73 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
sql("""DROP TABLE IF EXISTS CLEANTEST""")
}

test("Test clean files after horizontal compaction") {
sql("drop table if exists partition_hc")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
sql(
"create table partition_hc (c1 string,c2 int,c5 string) PARTITIONED BY(c3 string) " +
"STORED AS carbondata")
sql(
"insert into partition_hc values ('a',1,'aaa','aa'),('a',5,'aaa','aa'),('a',9,'aaa'," +
"'aa'),('a',4,'aaa','aa'),('a',2,'aaa','aa'),('a',3,'aaa'," +
"'aa')")
sql(
"insert into partition_hc values ('a',1,'aaa','bb'),('a',5,'aaa','bb'),('a',9,'aaa'," +
"'bb'),('a',4,'aaa','bb'),('a',2,'aaa','bb'),('a',3,'aaa'," +
"'bb')")

sql("delete from partition_hc where c2 = 1").show()
sql("delete from partition_hc where c2 = 5").show()

// verify if the horizontal compaction happened or not
val carbonTable = CarbonEnv.getCarbonTable(None, "partition_hc")(sqlContext
.sparkSession)
val partitionPath1 = carbonTable.getTablePath + "/c3=aa"
val partitionPath2 = carbonTable.getTablePath + "/c3=bb"
val deltaFilesPre1 = FileFactory.getCarbonFile(partitionPath1).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
}
})
val deltaFilesPre2 = FileFactory.getCarbonFile(partitionPath2).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
}
})
assert(deltaFilesPre1.size + deltaFilesPre2.size == 6)
val updateStatusFilesPre = FileFactory.getCarbonFile(CarbonTablePath.getMetadataPath(carbonTable
.getTablePath)).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
}
})
assert(updateStatusFilesPre.size == 3)

sql(s"CLEAN FILES FOR TABLE partition_hc OPTIONS('force'='true')").collect()

val deltaFilesPost1 = FileFactory.getCarbonFile(partitionPath1).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
}
})
val deltaFilesPost2 = FileFactory.getCarbonFile(partitionPath2).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
}
})
assert(deltaFilesPost1.size + deltaFilesPost2.size == 2)
val updateStatusFilesPost = FileFactory.getCarbonFile(CarbonTablePath
.getMetadataPath(carbonTable.getTablePath)).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
}
})
assert(updateStatusFilesPost.size == 1)

sql("drop table if exists partition_hc")
}

def editTableStatusFile(carbonTablePath: String) : Unit = {
// Original Table status file
val f1 = new File(CarbonTablePath.getTableStatusFilePath(carbonTablePath))
