Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,33 @@
* dataset provided which are not found in table metadata will be deleted, using the same {@link
* Table#location()} and {@link #olderThan(long)} filtering as above.
*
* <p>Streaming mode can be enabled via the {@value #STREAM_RESULTS} option to avoid loading all
* orphan file paths into driver memory. When enabled, the result will contain only a sample of file
* paths (up to {@value #MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT}). The total count of deleted files is
* logged but not included in the result.
*
* <p><em>Note:</em> It is dangerous to call this action with a short retention interval as it might
* corrupt the state of the table if another operation is writing at the same time.
*/
public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFilesSparkAction>
implements DeleteOrphanFiles {

public static final String STREAM_RESULTS = "stream-results";
public static final boolean STREAM_RESULTS_DEFAULT = false;

// Test-only option to configure the max sample size for streaming mode
@VisibleForTesting
static final String MAX_ORPHAN_FILE_SAMPLE_SIZE = "max-orphan-file-sample-size";

private static final int MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT = 20000;

private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");
private static final int MAX_DRIVER_LISTING_DEPTH = 3;
private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10;
private static final int MAX_EXECUTOR_LISTING_DEPTH = 2000;
private static final int MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS = Integer.MAX_VALUE;
private static final int DELETE_GROUP_SIZE = 100000;

private final SerializableConfiguration hadoopConf;
private final int listingParallelism;
Expand Down Expand Up @@ -234,7 +249,71 @@ private String jobDesc() {
return String.format("Deleting orphan files (%s) from %s", optionsAsString, table.name());
}

private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
private boolean streamResults() {
return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
}

private DeleteOrphanFiles.Result doExecute() {
Dataset<FileURI> actualFileIdentDS = actualFileIdentDS();
Dataset<FileURI> validFileIdentDS = validFileIdentDS();

Dataset<String> orphanFileDS =
findOrphanFiles(actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
try {
return deleteFiles(orphanFileDS);
} finally {
orphanFileDS.unpersist();
}
}

/**
* Deletes orphan files from the cached dataset.
*
* @param orphanFileDS the cached dataset of orphan files
* @return result with orphan file paths
*/
private DeleteOrphanFiles.Result deleteFiles(Dataset<String> orphanFileDS) {
int maxSampleSize =
PropertyUtil.propertyAsInt(
options(), MAX_ORPHAN_FILE_SAMPLE_SIZE, MAX_ORPHAN_FILE_SAMPLE_SIZE_DEFAULT);
List<String> orphanFileList = Lists.newArrayListWithCapacity(maxSampleSize);
long filesCount = 0;

Iterator<String> orphanFiles =
streamResults() ? orphanFileDS.toLocalIterator() : orphanFileDS.collectAsList().iterator();

Iterator<List<String>> fileGroups = Iterators.partition(orphanFiles, DELETE_GROUP_SIZE);

while (fileGroups.hasNext()) {
List<String> fileGroup = fileGroups.next();

collectPathsForOutput(fileGroup, orphanFileList, maxSampleSize);

if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
deleteBulk((SupportsBulkOperations) table.io(), fileGroup);
} else {
deleteNonBulk(fileGroup);
}

filesCount += fileGroup.size();
}

LOG.info("Deleted {} orphan files", filesCount);

return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFileList).build();
}

private void collectPathsForOutput(
List<String> paths, List<String> orphanFileList, int maxSampleSize) {
if (streamResults()) {
int lengthToAdd = Math.min(maxSampleSize - orphanFileList.size(), paths.size());
orphanFileList.addAll(paths.subList(0, lengthToAdd));
} else {
orphanFileList.addAll(paths);
}
}

private void deleteBulk(SupportsBulkOperations io, List<String> paths) {
try {
io.deleteFiles(paths);
LOG.info("Deleted {} files using bulk deletes", paths.size());
Expand All @@ -245,36 +324,65 @@ private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
}
}

private DeleteOrphanFiles.Result doExecute() {
Dataset<FileURI> actualFileIdentDS = actualFileIdentDS();
Dataset<FileURI> validFileIdentDS = validFileIdentDS();
private void deleteNonBulk(List<String> paths) {
Tasks.Builder<String> deleteTasks =
Tasks.foreach(paths)
.noRetry()
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));

if (deleteFunc == null) {
LOG.info(
"Table IO {} does not support bulk operations. Using non-bulk deletes.",
table.io().getClass().getName());
deleteTasks.run(table.io()::deleteFile);
} else {
LOG.info("Custom delete function provided. Using non-bulk deletes");
deleteTasks.run(deleteFunc::accept);
}
}

List<String> orphanFiles =
findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
@VisibleForTesting
static Dataset<String> findOrphanFiles(
Dataset<FileURI> actualFileIdentDS,
Dataset<FileURI> validFileIdentDS,
PrefixMismatchMode prefixMismatchMode) {

if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
deleteFiles((SupportsBulkOperations) table.io(), orphanFiles);
} else {
SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
actualFileIdentDS.sparkSession().sparkContext().register(conflicts);

Tasks.Builder<String> deleteTasks =
Tasks.foreach(orphanFiles)
.noRetry()
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));

if (deleteFunc == null) {
LOG.info(
"Table IO {} does not support bulk operations. Using non-bulk deletes.",
table.io().getClass().getName());
deleteTasks.run(table.io()::deleteFile);
} else {
LOG.info("Custom delete function provided. Using non-bulk deletes");
deleteTasks.run(deleteFunc::accept);
Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));

Dataset<String> orphanFileDS =
actualFileIdentDS
.joinWith(validFileIdentDS, joinCond, "leftouter")
.mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING());

// Cache and force computation to populate conflicts accumulator
orphanFileDS = orphanFileDS.cache();

try {
orphanFileDS.count();

if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
throw new ValidationException(
"Unable to determine whether certain files are orphan. Metadata references files that"
+ " match listed/provided files except for authority/scheme. Please, inspect the"
+ " conflicting authorities/schemes and provide which of them are equal by further"
+ " configuring the action via equalSchemes() and equalAuthorities() methods. Set the"
+ " prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting"
+ " authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that"
+ " remaining conflicting authorities/schemes are different. It will be impossible to"
+ " recover deleted files. Conflicting authorities/schemes: %s.",
conflicts.value());
}
}

return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build();
return orphanFileDS;
} catch (Exception e) {
orphanFileDS.unpersist();
throw e;
}
}

private Dataset<FileURI> validFileIdentDS() {
Expand Down Expand Up @@ -355,40 +463,6 @@ private Dataset<String> listedFileDS() {
}
}

@VisibleForTesting
static List<String> findOrphanFiles(
SparkSession spark,
Dataset<FileURI> actualFileIdentDS,
Dataset<FileURI> validFileIdentDS,
PrefixMismatchMode prefixMismatchMode) {

SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
spark.sparkContext().register(conflicts);

Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));

List<String> orphanFiles =
actualFileIdentDS
.joinWith(validFileIdentDS, joinCond, "leftouter")
.mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
.collectAsList();

if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
throw new ValidationException(
"Unable to determine whether certain files are orphan. "
+ "Metadata references files that match listed/provided files except for authority/scheme. "
+ "Please, inspect the conflicting authorities/schemes and provide which of them are equal "
+ "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
+ "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
+ "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting "
+ "authorities/schemes are different. It will be impossible to recover deleted files. "
+ "Conflicting authorities/schemes: %s.",
conflicts.value());
}

return orphanFiles;
}

private static Map<String, String> flattenMap(Map<String, String> map) {
Map<String, String> flattenedMap = Maps.newHashMap();
if (map != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
// List files with prefix operations. Default is false.
private static final ProcedureParameter PREFIX_LISTING_PARAM =
optionalInParameter("prefix_listing", DataTypes.BooleanType);
// Stream results to avoid loading all orphan files in driver memory. Default is false.
private static final ProcedureParameter STREAM_RESULTS_PARAM =
optionalInParameter("stream_results", DataTypes.BooleanType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
Expand All @@ -83,7 +86,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
EQUAL_SCHEMES_PARAM,
EQUAL_AUTHORITIES_PARAM,
PREFIX_MISMATCH_MODE_PARAM,
PREFIX_LISTING_PARAM
PREFIX_LISTING_PARAM,
STREAM_RESULTS_PARAM
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -138,6 +142,7 @@ public InternalRow[] call(InternalRow args) {
PrefixMismatchMode prefixMismatchMode = asPrefixMismatchMode(input, PREFIX_MISMATCH_MODE_PARAM);

boolean prefixListing = input.asBoolean(PREFIX_LISTING_PARAM, false);
boolean streamResults = input.asBoolean(STREAM_RESULTS_PARAM, false);

return withIcebergTable(
tableIdent,
Expand All @@ -163,7 +168,7 @@ public InternalRow[] call(InternalRow args) {
if (maxConcurrentDeletes != null) {
if (table.io() instanceof SupportsBulkOperations) {
LOG.warn(
"max_concurrent_deletes only works with FileIOs that do not support bulk deletes. This"
"max_concurrent_deletes only works with FileIOs that do not support bulk deletes. This "
+ "table is currently using {} which supports bulk deletes so the parameter will be ignored. "
+ "See that IO's documentation to learn how to adjust parallelism for that particular "
+ "IO's bulk delete.",
Expand All @@ -187,6 +192,10 @@ public InternalRow[] call(InternalRow args) {

action.usePrefixListing(prefixListing);

if (streamResults) {
action.option("stream-results", "true");
}

DeleteOrphanFiles.Result result = action.execute();

return toOutputRows(result);
Expand Down
Loading