@@ -19,8 +19,8 @@
 
 package org.apache.iceberg.actions;
 
-import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -33,7 +33,6 @@
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.Tasks;
@@ -45,10 +44,13 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.expressions.UserDefinedFunction;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.util.SerializableConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,14 +72,26 @@
 public class RemoveOrphanFilesAction extends BaseAction<List<String>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
-  private static final UserDefinedFunction filename = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
+  private static final String URI_DETAIL = "URI_DETAIL";
+  private static final String FILE_NAME = "file_name";
+  private static final String FILE_PATH = "file_path";
+  private static final String FILE_PATH_ONLY = "file_path_only";
+  private static final StructType FILE_DETAIL_STRUCT = new StructType(new StructField[] {
+      DataTypes.createStructField(FILE_NAME, DataTypes.StringType, false),
+      DataTypes.createStructField(FILE_PATH_ONLY, DataTypes.StringType, false)
+  });
+
+  /**
+   * Transforms a file path into
+   * {@code Row<file_name, file_path_no_scheme_authority>}.
+   */
+  private static final UserDefinedFunction addFileDetailsUDF = functions.udf((String fileLocation) -> {
+    Path fullyQualifiedPath = new Path(fileLocation);
+    String fileName = fullyQualifiedPath.getName();
+    String filePathOnly = fullyQualifiedPath.toUri().getPath();
+    return RowFactory.create(fileName, filePathOnly); // (file_name, file_path_only)
+  }, FILE_DETAIL_STRUCT);
 
   private final SparkSession spark;
   private final JavaSparkContext sparkContext;
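
The new addFileDetailsUDF leans on Hadoop's Path and URI handling to split a location into a bare file name and a scheme- and authority-free path. A minimal standalone sketch of that behavior, with a made-up s3a location for illustration:

import org.apache.hadoop.fs.Path;

public class PathDetailDemo {
  public static void main(String[] args) {
    // Hypothetical fully qualified location, as a file listing would produce it
    Path path = new Path("s3a://my-bucket/warehouse/db/tbl/data/00000-0-abc.parquet");

    // Final path component -> "00000-0-abc.parquet"
    System.out.println(path.getName());

    // Scheme ("s3a") and authority ("my-bucket") stripped
    // -> "/warehouse/db/tbl/data/00000-0-abc.parquet"
    System.out.println(path.toUri().getPath());
  }
}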
@@ -147,16 +161,10 @@ public RemoveOrphanFilesAction deleteWith(Consumer<String> newDeleteFunc) {
   public List<String> execute() {
     Dataset<Row> validDataFileDF = buildValidDataFileDF(spark);
     Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(spark, table, ops);
-    Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
-    Dataset<Row> actualFileDF = buildActualFileDF();
+    Dataset<Row> validFileDF = addFilePathOnlyColumn(validDataFileDF.union(validMetadataFileDF));
+    Dataset<Row> actualFileDF = addFilePathOnlyColumn(buildActualFileDF());
 
-    Column nameEqual = filename.apply(actualFileDF.col("file_path"))
-        .equalTo(filename.apply(validFileDF.col("file_path")));
-    Column actualContains = actualFileDF.col("file_path").contains(validFileDF.col("file_path"));
-    Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
+    List<String> orphanFiles = findOrphanFiles(validFileDF, actualFileDF);
 
     Tasks.foreach(orphanFiles)
         .noRetry()
@@ -226,7 +234,7 @@ private static void listDirRecursively(
         listDirRecursively(subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, matchingFiles);
       }
     } catch (IOException e) {
-      throw new RuntimeIOException(e);
+      throw new UncheckedIOException(e);
     }
   }
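
Note on the exception swap above: java.io.UncheckedIOException is the JDK's standard unchecked wrapper for IOException (available since Java 8), so callers no longer depend on Iceberg's RuntimeIOException. The pattern in isolation, using a hypothetical helper for illustration:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class UncheckedIoDemo {
  // Hypothetical helper: rethrows a checked IOException as the JDK's unchecked wrapper
  static byte[] readOrThrow(Path file) {
    try {
      return Files.readAllBytes(file);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}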

@@ -254,4 +262,44 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
       return files.iterator();
     };
   }
+
+  protected static List<String> findOrphanFiles(Dataset<Row> validFileDF, Dataset<Row> actualFileDF) {
+    Column nameEqual = actualFileDF.col(FILE_NAME)
+        .equalTo(validFileDF.col(FILE_NAME));
+
+    Column pathContains = actualFileDF.col(FILE_PATH_ONLY)
+        .contains(validFileDF.col(FILE_PATH_ONLY));
+
+    Column joinCond = nameEqual.and(pathContains);

[Inline review thread on the joinCond line]

RussellSpitzer (Member): Isn't pathContains always true if nameEqual is? I feel like you just need nameEqual here.

Author (Contributor): @RussellSpitzer nameEqual only compares file names and does not include the path. Also, this condition was already there, so I didn't want to change it.

+
+    return actualFileDF.join(validFileDF, joinCond, "leftanti").select(FILE_PATH)
+        .as(Encoders.STRING())
+        .collectAsList();
+  }
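
To make the thread above concrete: nameEqual compares only the final path component, so two files with identical names under different directories still should not validate each other; pathContains is what ties the name match to the directory. A small sketch with invented values:

// Same file name under two different partition directories (invented values):
String actualPathOnly = "/warehouse/db/tbl/data/part=1/00000-0-abc.parquet";
String validPathOnly = "/warehouse/db/tbl/data/part=2/00000-0-abc.parquet";
String actualFileName = "00000-0-abc.parquet";
String validFileName = "00000-0-abc.parquet";

boolean nameEqual = actualFileName.equals(validFileName);      // true: names match
boolean pathContains = actualPathOnly.contains(validPathOnly); // false: directories differ
// joinCond = nameEqual && pathContains -> false, so the left-anti join still
// reports the actual file as an orphan candidate.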

+  /**
+   * Transforms
+   * <pre>{@code
+   * Dataset<Row<file_path_with_scheme_authority>>
+   * }</pre>
+   * into
+   * <pre>{@code
+   * Dataset<Row<file_name, file_path_no_scheme_authority, file_path_with_scheme_authority>>
+   * }</pre>
+   *
+   * <p>This is required to compare the valid files against all listed files when searching for
+   * orphans. In the resulting dataset, only the path component is compared: on Hadoop or S3 the
+   * same path can be reachable through different authority names, so comparing full URIs could
+   * misreport files that are referenced by table metadata as orphans.
+   *
+   * @param filePathWithSchemeAndAuthority dataset of complete file paths, each possibly including scheme and authority
+   * @return dataset with columns {@code file_name, file_path_only, file_path}
+   */
+  protected static Dataset<Row> addFilePathOnlyColumn(Dataset<Row> filePathWithSchemeAndAuthority) {
+    String selectExprFormat = "%s.%s as %s";
+    return filePathWithSchemeAndAuthority.withColumn(URI_DETAIL,
+        addFileDetailsUDF.apply(
+            filePathWithSchemeAndAuthority.apply(FILE_PATH)
+        )).selectExpr(
+        String.format(selectExprFormat, URI_DETAIL, FILE_NAME, FILE_NAME), // file name
+        String.format(selectExprFormat, URI_DETAIL, FILE_PATH_ONLY, FILE_PATH_ONLY), // path without scheme/authority
+        FILE_PATH); // fully qualified path
+  }
 }
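
For context, a hedged sketch of how this action would be invoked through the Spark Actions entry point of the same API generation; the table-loading step and the three-day cutoff are assumptions for illustration, not part of this diff:

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;

public class RemoveOrphansExample {
  // 'table' must be an already-loaded Iceberg table; loading it is out of scope here
  static List<String> removeOrphans(Table table) {
    return Actions.forTable(table)
        .removeOrphanFiles()
        // only consider files older than 3 days so in-flight writes are not deleted
        .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
        .execute();
  }
}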