Skip to content
Permalink
Browse files
[CARBONDATA-4154] Fix various concurrent issues with clean files
Why is this PR needed?
There are 2 issues in the clean files operation when it is run concurrently with multiple load operations:

The dry run can report a negative amount of space freed when clean files runs concurrently with a load.
Accidental deletion of Insert In Progress segments (ongoing loads) during the clean files operation.
What changes were proposed in this PR?
To solve the negative dry run result, the old load metadata details are saved before the clean files operation and compared with the load metadata details read after the operation; any new entry that has been added in between is ignored. In other words, the space freed is computed over the intersection of the old and new metadata details.
For the load failure issue: a load can be in progress (Insert In Progress state, with the segment lock held) while clean files is running. Once the table status lock is released during the clean files operation, the load may complete and release its segment lock, while the final list of load metadata details that clean files is about to delete still shows that load as Insert In Progress. Because the segment lock is now free, the clean files operation deletes such loads. To solve this, instead of passing a boolean that indicates whether a table status update is required, the operation passes the set of load numbers to delete and deletes only those loads.

Does this PR introduce any user interface change?
No

Is any new testcase added?
No

This closes #4109
  • Loading branch information
vikramahuja1001 authored and ajantha-bhat committed Mar 26, 2021
1 parent 865ec9b commit d535a1e22f1800afab9a148f42a9124efc6df192
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 60 deletions.
@@ -27,11 +27,7 @@
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;

import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -1039,21 +1035,21 @@ private static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(

private static class ReturnTuple {
LoadMetadataDetails[] details;
boolean isUpdateRequired;
ReturnTuple(LoadMetadataDetails[] details, boolean isUpdateRequired) {
Set<String> loadsToDelete;
ReturnTuple(LoadMetadataDetails[] details, Set<String> loadsToDelete) {
this.details = details;
this.isUpdateRequired = isUpdateRequired;
this.loadsToDelete = loadsToDelete;
}
}

private static ReturnTuple isUpdateRequired(boolean isForceDeletion, CarbonTable carbonTable,
AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] details,
boolean cleanStaleInProgress) {
// Delete marked loads
boolean isUpdateRequired = DeleteLoadFolders
Set<String> loadsToDelete = DeleteLoadFolders
.deleteLoadFoldersFromFileSystem(absoluteTableIdentifier, isForceDeletion, details,
carbonTable.getMetadataPath(), cleanStaleInProgress);
return new ReturnTuple(details, isUpdateRequired);
return new ReturnTuple(details, loadsToDelete);
}

public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean isForceDeletion,
@@ -1066,11 +1062,12 @@ public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean
if (isLoadDeletionRequired(metadataDetails)) {
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
boolean updateCompletionStatus = false;
Set<String> loadsToDelete = new HashSet<>();
LoadMetadataDetails[] newAddedLoadHistoryList = null;
ReturnTuple tuple =
isUpdateRequired(isForceDeletion, carbonTable, identifier, metadataDetails,
cleanStaleInprogress);
if (tuple.isUpdateRequired) {
if (!tuple.loadsToDelete.isEmpty()) {
ICarbonLock carbonTableStatusLock =
CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
boolean locked = false;
@@ -1091,7 +1088,7 @@ public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean
ReturnTuple tuple2 =
isUpdateRequired(isForceDeletion, carbonTable,
identifier, details, cleanStaleInprogress);
if (!tuple2.isUpdateRequired) {
if (tuple2.loadsToDelete.isEmpty()) {
return;
}
// read latest table status again.
@@ -1130,6 +1127,7 @@ public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean
latestStatus.toArray(new LoadMetadataDetails[0]));
}
updateCompletionStatus = true;
loadsToDelete = tuple2.loadsToDelete;
} else {
String dbName = identifier.getCarbonTableIdentifier().getDatabaseName();
String tableName = identifier.getCarbonTableIdentifier().getTableName();
@@ -1147,7 +1145,7 @@ public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean
if (updateCompletionStatus) {
DeleteLoadFolders
.physicalFactAndMeasureMetadataDeletion(carbonTable, newAddedLoadHistoryList,
isForceDeletion, partitionSpecs, cleanStaleInprogress);
isForceDeletion, partitionSpecs, cleanStaleInprogress, loadsToDelete);
}
}
}
@@ -19,7 +19,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -69,22 +71,25 @@ public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTabl
LoadMetadataDetails[] newAddedLoadHistoryList,
boolean isForceDelete,
List<PartitionSpec> specs,
boolean cleanStaleInProgress) {
boolean cleanStaleInProgress,
Set<String> loadsToDelete) {
LoadMetadataDetails[] currentDetails =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
physicalFactAndMeasureMetadataDeletion(carbonTable,
currentDetails,
isForceDelete,
specs,
currentDetails,
cleanStaleInProgress);
cleanStaleInProgress,
loadsToDelete);
if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
physicalFactAndMeasureMetadataDeletion(carbonTable,
newAddedLoadHistoryList,
isForceDelete,
specs,
currentDetails,
cleanStaleInProgress);
cleanStaleInProgress,
loadsToDelete);
}
}

@@ -98,7 +103,8 @@ public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTabl
*/
private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs,
LoadMetadataDetails[] currLoadDetails, boolean cleanStaleInProgress) {
LoadMetadataDetails[] currLoadDetails, boolean cleanStaleInProgress,
Set<String> loadsToDelete) {
List<TableIndex> indexes = new ArrayList<>();
try {
for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) {
@@ -115,7 +121,7 @@ private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTab
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
for (final LoadMetadataDetails oneLoad : loadDetails) {
if (canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress)) {
if (loadsToDelete.contains(oneLoad.getLoadName())) {
try {
if (oneLoad.getSegmentFile() != null) {
String tablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
@@ -180,15 +186,18 @@ public boolean accept(CarbonFile file) {
}

private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
boolean isForceDelete, boolean cleanStaleInProgress) {
boolean isForceDelete, boolean cleanStaleInProgress, AbsoluteTableIdentifier
absoluteTableIdentifier) {
if (oneLoad.getVisibility().equalsIgnoreCase("true")) {
return canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress);
return canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress,
absoluteTableIdentifier);
}
return false;
}

public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad,
boolean isForceDelete, boolean cleanStaleInProgress) {
public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad, boolean
isForceDelete, boolean cleanStaleInProgress, AbsoluteTableIdentifier
absoluteTableIdentifier) {
/*
* if cleanStaleInProgress == false and isForceDelete == false, clean MFD and Compacted
* segments will depend on query timeout(1 hr) and trashRetentionTimeout(7 days, default).
@@ -213,7 +222,8 @@ public static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad,
return canDelete;
case INSERT_IN_PROGRESS:
case INSERT_OVERWRITE_IN_PROGRESS:
return canDelete && cleanStaleInProgress;
return canDelete && cleanStaleInProgress && canSegmentLockBeAcquired(oneLoad,
absoluteTableIdentifier);
default:
return false;
}
@@ -230,45 +240,46 @@ private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentI
return null;
}

public static boolean deleteLoadFoldersFromFileSystem(
public static Set<String> deleteLoadFoldersFromFileSystem(
AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete, LoadMetadataDetails[]
details, String metadataPath, boolean cleanStaleInProgress) {
boolean isDeleted = false;
Set<String> loadsToDelete = new HashSet<>();
if (details != null && details.length != 0) {
for (LoadMetadataDetails oneLoad : details) {
if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete, cleanStaleInProgress)) {
ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
try {
if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
|| oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
if (segmentLock.lockWithRetries(1, 5)) {
LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName());
LoadMetadataDetails currentDetails =
getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
isForceDelete, cleanStaleInProgress)) {
oneLoad.setVisibility("false");
isDeleted = true;
LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
}
} else {
LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName());
return isDeleted;
}
} else {
if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete, cleanStaleInProgress,
absoluteTableIdentifier)) {
if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
|| oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
LoadMetadataDetails currentDetails =
getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
isForceDelete, cleanStaleInProgress, absoluteTableIdentifier)) {
oneLoad.setVisibility("false");
isDeleted = true;
LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
loadsToDelete.add(oneLoad.getLoadName());
LOGGER.info("Deleted the load " + oneLoad.getLoadName());
}
} finally {
segmentLock.unlock();
LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released");
} else {
oneLoad.setVisibility("false");
loadsToDelete.add(oneLoad.getLoadName());
LOGGER.info("Deleted the load " + oneLoad.getLoadName());
}
}
}
}
return isDeleted;
return loadsToDelete;
}

/**
 * Probes whether the segment lock of the given load can currently be acquired.
 * The lock is taken only as a check and is released again before returning.
 *
 * @param oneLoad load whose segment lock is probed
 * @param absoluteTableIdentifier identifier of the table the load belongs to
 * @return the result of releasing the lock when it was acquired; {@code false} when the
 *         lock could not be acquired (logged as a load still going on for that segment)
 */
private static boolean canSegmentLockBeAcquired(LoadMetadataDetails oneLoad,
    AbsoluteTableIdentifier absoluteTableIdentifier) {
  ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
      CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
  if (!segmentLock.lockWithRetries()) {
    LOGGER.info("Segment Lock on segment: " + oneLoad.getLoadName()
        + " can not be acquired. Load going on for that load");
    return false;
  }
  LOGGER.info("Segment Lock on segment: " + oneLoad.getLoadName() + " can be acquired.");
  // The lock was only needed as a probe; report success only if it is released cleanly.
  return segmentLock.unlock();
}
}
@@ -86,11 +86,13 @@ object DataTrashManager {
// Since calculating the the size before and after clean files can be a costly operation
// have exposed an option where user can change this behaviour.
if (showStatistics) {
val sizeBeforeCleaning = getSizeSnapshot(carbonTable)
val metadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
val sizeBeforeCleaning = getPreOpSizeSnapshot(carbonTable, metadataDetails)
checkAndCleanExpiredSegments(carbonTable, isForceDelete,
cleanStaleInProgress, partitionSpecs)
val sizeAfterCleaning = getSizeSnapshot(carbonTable)
sizeBeforeCleaning - sizeAfterCleaning + trashFolderSizeStats._1
val sizeAfterCleaning = getPostOpSizeSnapshot(carbonTable, metadataDetails
.map(a => a.getLoadName).toSet)
(sizeBeforeCleaning - sizeAfterCleaning + trashFolderSizeStats._1).abs
} else {
checkAndCleanExpiredSegments(carbonTable, isForceDelete,
cleanStaleInProgress, partitionSpecs)
@@ -107,11 +109,11 @@ object DataTrashManager {
}

/**
* Checks the size of the segment files as well as datafiles, this method is used before and after
* clean files operation to check how much space is actually freed, during the operation.
* Checks the size of the segment files as well as datafiles and index files, this method
* is used before clean files operation.
*/
def getSizeSnapshot(carbonTable: CarbonTable): Long = {
val metadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
def getPreOpSizeSnapshot(carbonTable: CarbonTable, metadataDetails:
Array[LoadMetadataDetails]): Long = {
var size: Long = 0
val segmentFileLocation = CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)
if (FileFactory.isFileExist(segmentFileLocation)) {
@@ -125,6 +127,26 @@ object DataTrashManager {
size
}

/**
 * Computes the size snapshot taken after the clean files operation: the size of the
 * segment files directory (when it exists) plus the size of every visible load from the
 * freshly read table status whose load name is present in the given set of load names.
 */
def getPostOpSizeSnapshot(carbonTable: CarbonTable, metadataDetails: Set[String]): Long = {
  val finalMetadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
  val segmentFileLocation = CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)
  // Start from the segment files directory size, or zero when the directory is absent.
  var size: Long = if (FileFactory.isFileExist(segmentFileLocation)) {
    FileFactory.getDirectorySize(segmentFileLocation)
  } else {
    0L
  }
  // Only loads named in the supplied set are counted; other entries in the table status
  // are left out of the total.
  for (oneLoad <- finalMetadataDetails
       if metadataDetails.contains(oneLoad.getLoadName) && oneLoad.getVisibility.toBoolean) {
    size += calculateSegmentSizeForOneLoad(carbonTable, oneLoad, finalMetadataDetails)
  }
  size
}


/**
* Method to handle the Clean files dry run operation
*/
@@ -198,7 +220,8 @@ object DataTrashManager {
if (!oneLoad.getVisibility.equalsIgnoreCase("false")) {
val segmentFilePath = CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath,
oneLoad.getSegmentFile)
if (DeleteLoadFolders.canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress)) {
if (DeleteLoadFolders.canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress,
carbonTable.getAbsoluteTableIdentifier)) {
// No need to consider physical data for external segments, only consider metadata.
if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
sizeFreed += calculateSegmentSizeForOneLoad(carbonTable, oneLoad, loadMetadataDetails)

0 comments on commit d535a1e

Please sign in to comment.