@@ -198,7 +198,7 @@ public void testRemoveOrphanFilesGCDisabled() {
sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'false')", tableName, GC_ENABLED);

AssertHelpers.assertThrows("Should reject call",
ValidationException.class, "Cannot remove orphan files: GC is disabled",
ValidationException.class, "Cannot delete orphan files: GC is disabled",
() -> sql("CALL %s.system.remove_orphan_files('%s')", catalogName, tableIdent));

// reset the property to enable the table purging in removeTable.
@@ -87,22 +87,20 @@ public class BaseDeleteOrphanFilesSparkAction
}
}, DataTypes.StringType);

private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
Contributor Author: I see very little value in this constant, to be honest.
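
A sketch for context (not part of the diff), mirroring the delete loop these actions already use: as the comment on the same constant in the other actions notes, a null executor service simply runs each task in the thread that invokes execute/submit, so the named constant adds nothing over a plain null. The orphanFiles list is an illustrative stand-in for the real input.

Tasks.foreach(orphanFiles)                       // orphanFiles: List<String> of paths to delete
    .retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
    .executeWith(null)                           // null executor -> deletes run on the calling thread
    .run(file -> table.io().deleteFile(file));   // the same default delete used by this action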


private final SerializableConfiguration hadoopConf;
private final int partitionDiscoveryParallelism;
private final Table table;

private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
private Consumer<String> deleteFunc = new Consumer<String>() {
private final Consumer<String> defaultDelete = new Consumer<String>() {
Contributor Author: In most other places, we have a separate defaultDelete var.
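
For context (not part of the diff), a usage sketch of how the default/override split plays out, assuming the SparkActions facade; the collecting consumer is an illustrative dry-run style override, while the default stays table.io().deleteFile(file).

List<String> wouldDelete = Lists.newArrayList();
DeleteOrphanFiles.Result result = SparkActions.get()
    .deleteOrphanFiles(table)
    .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))   // same default window as the field above
    .deleteWith(wouldDelete::add)   // custom consumer instead of defaultDelete
    .execute();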

@Override
public void accept(String file) {
table.io().deleteFile(file);
}
};

private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = null;

public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
super(spark);
@@ -114,7 +112,7 @@ public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {

ValidationException.check(
PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
"Cannot remove orphan files: GC is disabled (deleting files may corrupt other tables)");
"Cannot delete orphan files: GC is disabled (deleting files may corrupt other tables)");
Contributor Author: I renamed the action a while ago but missed some places.

}

@Override
@@ -148,7 +146,7 @@ public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFun

@Override
public DeleteOrphanFiles.Result execute() {
JobGroupInfo info = newJobGroupInfo("REMOVE-ORPHAN-FILES", jobDesc());
JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc());
return withJobGroupInfo(info, this::doExecute);
}

@@ -158,7 +156,7 @@ private String jobDesc() {
if (location != null) {
options.add("location=" + location);
}
return String.format("Removing orphan files (%s) from %s", Joiner.on(',').join(options), table.name());
return String.format("Deleting orphan files (%s) from %s", Joiner.on(',').join(options), table.name());
}

private DeleteOrphanFiles.Result doExecute() {
@@ -167,10 +165,10 @@ private DeleteOrphanFiles.Result doExecute() {
Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = buildActualFileDF();

Column actualFileName = filenameUDF.apply(actualFileDF.col("file_path"));
Column validFileName = filenameUDF.apply(validFileDF.col("file_path"));
Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
Column nameEqual = actualFileName.equalTo(validFileName);
Column actualContains = actualFileDF.col("file_path").contains(validFileDF.col("file_path"));
Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
Column joinCond = nameEqual.and(actualContains);
List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
.as(Encoders.STRING())
@@ -198,7 +196,7 @@ private Dataset<Row> buildActualFileDF() {
JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);

if (subDirs.isEmpty()) {
return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF("file_path");
return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
}

int parallelism = Math.min(subDirs.size(), partitionDiscoveryParallelism);
@@ -208,7 +206,7 @@ private Dataset<Row> buildActualFileDF() {
JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(listDirsRecursively(conf, olderThanTimestamp));

JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF("file_path");
return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
}

private static void listDirRecursively(
@@ -20,11 +20,9 @@
package org.apache.iceberg.spark.actions;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
@@ -35,15 +33,12 @@
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -57,37 +52,27 @@
@SuppressWarnings("UnnecessaryAnonymousClass")
public class BaseDeleteReachableFilesSparkAction
extends BaseSparkAction<DeleteReachableFiles, DeleteReachableFiles.Result> implements DeleteReachableFiles {
private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class);

private static final String CONTENT_FILE = "Content File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";
private static final String OTHERS = "Others";

private static final String STREAM_RESULTS = "stream-results";
public static final String STREAM_RESULTS = "stream-results";
public static final boolean STREAM_RESULTS_DEFAULT = false;

// Creates an executor service that runs each task in the thread that invokes execute/submit.
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;

private final TableMetadata tableMetadata;
private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class);

private final String metadataFileLocation;
private final Consumer<String> defaultDelete = new Consumer<String>() {
@Override
public void accept(String file) {
io.deleteFile(file);
}
};

private Consumer<String> removeFunc = defaultDelete;
private ExecutorService removeExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = null;
private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());

public BaseDeleteReachableFilesSparkAction(SparkSession spark, String metadataLocation) {
public BaseDeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
super(spark);
this.tableMetadata = TableMetadataParser.read(io, metadataLocation);
Contributor Author: We should not try to read the metadata using the default FileIO in the constructor. I moved this check to execution once we have the actual FileIO the user has configured.
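
For context (not part of the diff), a usage sketch of why the deferral matters: the metadata file may only be readable through the FileIO the caller supplies via io(), so parsing has to wait until execute(). SparkActions, the S3 location, and the S3FileIO choice are illustrative assumptions; STREAM_RESULTS is the constant made public above.

FileIO customIO = new S3FileIO();           // assumes iceberg-aws is on the classpath
customIO.initialize(ImmutableMap.of());     // real callers pass credentials/properties here
DeleteReachableFiles.Result result = SparkActions.get()
    .deleteReachableFiles("s3://bucket/db/tbl/metadata/v42.metadata.json")   // illustrative location
    .io(customIO)                           // without this, only the default HadoopFileIO is available
    .option(BaseDeleteReachableFilesSparkAction.STREAM_RESULTS, "true")
    .execute();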

ValidationException.check(
PropertyUtil.propertyAsBoolean(tableMetadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
"Cannot remove files: GC is disabled (deleting files may corrupt other tables)");
this.metadataFileLocation = metadataFileLocation;
}

@Override
@@ -102,54 +87,48 @@ public DeleteReachableFiles io(FileIO fileIO) {
}

@Override
public DeleteReachableFiles deleteWith(Consumer<String> deleteFunc) {
this.removeFunc = deleteFunc;
public DeleteReachableFiles deleteWith(Consumer<String> newDeleteFunc) {
this.deleteFunc = newDeleteFunc;
return this;

}

@Override
public DeleteReachableFiles executeDeleteWith(ExecutorService executorService) {
this.removeExecutorService = executorService;
this.deleteExecutorService = executorService;
return this;
}

@Override
public Result execute() {
Preconditions.checkArgument(io != null, "File IO cannot be null");
String msg = String.format("Removing files reachable from %s", tableMetadata.metadataFileLocation());
JobGroupInfo info = newJobGroupInfo("REMOVE-FILES", msg);
String jobDesc = String.format("Deleting files reachable from %s", metadataFileLocation);
JobGroupInfo info = newJobGroupInfo("DELETE-REACHABLE-FILES", jobDesc);
return withJobGroupInfo(info, this::doExecute);
}

private Result doExecute() {
boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, false);
Dataset<Row> validFileDF = buildValidFileDF(tableMetadata).distinct();
TableMetadata metadata = TableMetadataParser.read(io, metadataFileLocation);

ValidationException.check(
PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
"Cannot delete files: GC is disabled (deleting files may corrupt other tables)");

Dataset<Row> reachableFileDF = buildReachableFileDF(metadata).distinct();

boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
if (streamResults) {
return deleteFiles(validFileDF.toLocalIterator());
return deleteFiles(reachableFileDF.toLocalIterator());
} else {
return deleteFiles(validFileDF.collectAsList().iterator());
return deleteFiles(reachableFileDF.collectAsList().iterator());
}
}

private Dataset<Row> projectFilePathWithType(Dataset<Row> ds, String type) {
Contributor Author: There are multiple actions that define the exact same method.
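
A sketch of what the shared helper presumably looks like (not part of the diff); the body is inferred from the per-action methods removed here, and its exact placement and signature in the base action are assumptions.

protected Dataset<Row> withFileType(Dataset<Row> ds, String type) {
  // identical to the projectFilePathWithType/appendTypeString helpers this change removes
  return ds.select(functions.col("file_path"), functions.lit(type).as("file_type"));
}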

return ds.select(functions.col("file_path"), functions.lit(type).as("file_type"));
}

private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
private Dataset<Row> buildReachableFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, io);
return projectFilePathWithType(buildValidContentFileDF(staticTable), CONTENT_FILE)
.union(projectFilePathWithType(buildManifestFileDF(staticTable), MANIFEST))
.union(projectFilePathWithType(buildManifestListDF(staticTable), MANIFEST_LIST))
.union(projectFilePathWithType(buildOtherMetadataFileDF(staticTable), OTHERS));
}

@Override
protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
Contributor Author (@aokolnychyi, Apr 1, 2022): This used to overload a method in the base class. I defined buildAllReachableOtherMetadataFileDF instead.
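
A sketch of what buildAllReachableOtherMetadataFileDF presumably does in the base class (not part of the diff), inferred from the overload removed below; treat the placement and exact body as assumptions.

protected Dataset<Row> buildAllReachableOtherMetadataFileDF(Table table) {
  List<String> otherMetadataFiles = Lists.newArrayList();
  otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, true /* recursive */));
  otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
  return spark().createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
}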

List<String> otherMetadataFiles = Lists.newArrayList();
otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, true));
otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
return spark().createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
.union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
.union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST))
.union(withFileType(buildAllReachableOtherMetadataFileDF(staticTable), OTHERS));
}

/**
@@ -166,7 +145,7 @@ private BaseDeleteReachableFilesActionResult deleteFiles(Iterator<Row> deleted)

Tasks.foreach(deleted)
.retry(3).stopRetryOn(NotFoundException.class).suppressFailureWhenFinished()
.executeWith(removeExecutorService)
.executeWith(deleteExecutorService)
.onFailure((fileInfo, exc) -> {
String file = fileInfo.getString(0);
String type = fileInfo.getString(1);
Expand All @@ -175,7 +154,7 @@ private BaseDeleteReachableFilesActionResult deleteFiles(Iterator<Row> deleted)
.run(fileInfo -> {
String file = fileInfo.getString(0);
String type = fileInfo.getString(1);
removeFunc.accept(file);
deleteFunc.accept(file);
switch (type) {
case CONTENT_FILE:
dataFileCount.incrementAndGet();
@@ -40,11 +40,9 @@
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -68,16 +66,11 @@
@SuppressWarnings("UnnecessaryAnonymousClass")
public class BaseExpireSnapshotsSparkAction
extends BaseSparkAction<ExpireSnapshots, ExpireSnapshots.Result> implements ExpireSnapshots {
private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);

public static final String STREAM_RESULTS = "stream-results";
public static final boolean STREAM_RESULTS_DEFAULT = false;

private static final String CONTENT_FILE = "Content File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";

// Creates an executor service that runs each task in the thread that invokes execute/submit.
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);

private final Table table;
private final TableOperations ops;
@@ -92,7 +85,7 @@ public void accept(String file) {
private Long expireOlderThanValue = null;
private Integer retainLastValue = null;
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private ExecutorService deleteExecutorService = null;
private Dataset<Row> expiredFiles = null;

public BaseExpireSnapshotsSparkAction(SparkSession spark, Table table) {
@@ -212,23 +205,19 @@ private String jobDesc() {
}

private ExpireSnapshots.Result doExecute() {
boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, false);
boolean streamResults = PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
if (streamResults) {
return deleteFiles(expire().toLocalIterator());
} else {
return deleteFiles(expire().collectAsList().iterator());
}
}

private Dataset<Row> appendTypeString(Dataset<Row> ds, String type) {
return ds.select(new Column("file_path"), functions.lit(type).as("file_type"));
}

private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, this.table.io());
return appendTypeString(buildValidContentFileDF(staticTable), CONTENT_FILE)
.union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST))
.union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST));
Table staticTable = newStaticTable(metadata, table.io());
return withFileType(buildValidContentFileDF(staticTable), CONTENT_FILE)
.union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
.union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
}

/**