diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
index bbe2847a9216..0ee17d264451 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
@@ -113,6 +113,8 @@ Result doExecute() {
    * Dangling delete files can be identified with following steps
    *
    * <ol>
+   *   <li>Load the ENTRIES metadata table once and cache all live entries to avoid scanning
+   *       manifests twice.
    *   <li>Group data files by partition keys and find the minimum data sequence number in each
    *       group.
    *   <li>Left outer join delete files with partition-grouped data files on partition keys.
@@ -123,56 +125,64 @@ Result doExecute() {
    * </ol>
    */
   private List<DeleteFile> findDanglingDeletes() {
-    Dataset<Row> minSequenceNumberByPartition =
-        loadMetadataTable(table, MetadataTableType.ENTRIES)
-            // find live data files
-            .filter("data_file.content == 0 AND status < 2")
-            .selectExpr(
-                "data_file.partition as partition",
-                "data_file.spec_id as spec_id",
-                "sequence_number")
-            .groupBy("partition", "spec_id")
-            .agg(min("sequence_number"))
-            .toDF("grouped_partition", "grouped_spec_id", "min_data_sequence_number");
-
-    Dataset<Row> deleteEntries =
-        loadMetadataTable(table, MetadataTableType.ENTRIES)
-            // find live delete files
-            .filter("data_file.content != 0 AND status < 2");
-
-    Column joinOnPartition =
-        deleteEntries
-            .col("data_file.spec_id")
-            .equalTo(minSequenceNumberByPartition.col("grouped_spec_id"))
-            .and(
-                deleteEntries
-                    .col("data_file.partition")
-                    .equalTo(minSequenceNumberByPartition.col("grouped_partition")));
-
-    Column filterOnDanglingDeletes =
-        col("min_data_sequence_number")
-            // delete files without any data files in partition
-            .isNull()
-            // position delete files without any applicable data files in partition
-            .or(
-                col("data_file.content")
-                    .equalTo("1")
-                    .and(col("sequence_number").$less(col("min_data_sequence_number"))))
-            // equality delete files without any applicable data files in the partition
-            .or(
-                col("data_file.content")
-                    .equalTo("2")
-                    .and(col("sequence_number").$less$eq(col("min_data_sequence_number"))));
-
-    Dataset<Row> danglingDeletes =
-        deleteEntries
-            .join(minSequenceNumberByPartition, joinOnPartition, "left")
-            .filter(filterOnDanglingDeletes)
-            .select("data_file.*");
-    return danglingDeletes.collectAsList().stream()
-        // map on driver because SparkDeleteFile is not serializable
-        .map(row -> deleteFileWrapper(danglingDeletes.schema(), row))
-        .collect(Collectors.toList());
+    // Load the ENTRIES metadata table once and cache to avoid reading all manifests twice
+    Dataset<Row> liveEntries =
+        loadMetadataTable(table, MetadataTableType.ENTRIES).filter("status < 2").cache();
+
+    try {
+      Dataset<Row> minSequenceNumberByPartition =
+          liveEntries
+              // find live data files
+              .filter("data_file.content == 0")
+              .selectExpr(
+                  "data_file.partition as partition",
+                  "data_file.spec_id as spec_id",
+                  "sequence_number")
+              .groupBy("partition", "spec_id")
+              .agg(min("sequence_number"))
+              .toDF("grouped_partition", "grouped_spec_id", "min_data_sequence_number");
+
+      Dataset<Row> deleteEntries =
+          liveEntries
+              // find live delete files
+              .filter("data_file.content != 0");
+
+      Column joinOnPartition =
+          deleteEntries
+              .col("data_file.spec_id")
+              .equalTo(minSequenceNumberByPartition.col("grouped_spec_id"))
+              .and(
+                  deleteEntries
+                      .col("data_file.partition")
+                      .equalTo(minSequenceNumberByPartition.col("grouped_partition")));
+
+      Column filterOnDanglingDeletes =
+          col("min_data_sequence_number")
+              // delete files without any data files in partition
+              .isNull()
+              // position delete files without any applicable data files in partition
+              .or(
+                  col("data_file.content")
+                      .equalTo("1")
+                      .and(col("sequence_number").$less(col("min_data_sequence_number"))))
+              // equality delete files without any applicable data files in the partition
+              .or(
+                  col("data_file.content")
+                      .equalTo("2")
+                      .and(col("sequence_number").$less$eq(col("min_data_sequence_number"))));
+
+      Dataset<Row> danglingDeletes =
+          deleteEntries
+              .join(minSequenceNumberByPartition, joinOnPartition, "left")
+              .filter(filterOnDanglingDeletes)
+              .select("data_file.*");
+      return danglingDeletes.collectAsList().stream()
+          // map on driver because SparkDeleteFile is not serializable
+          .map(row -> deleteFileWrapper(danglingDeletes.schema(), row))
+          .collect(Collectors.toList());
+    } finally {
+      liveEntries.unpersist(false);
+    }
   }
 
   private List<DeleteFile> findDanglingDvs() {