@@ -113,6 +113,8 @@ Result doExecute() {
 * Dangling delete files can be identified with the following steps
*
* <ol>
* <li>Load the ENTRIES metadata table once and cache all live entries to avoid scanning
* manifests twice.
* <li>Group data files by partition keys and find the minimum data sequence number in each
* group.
* <li>Left outer join delete files with partition-grouped data files on partition keys.
@@ -123,56 +125,64 @@ Result doExecute() {
* </ol>
*/
private List<DeleteFile> findDanglingDeletes() {
Dataset<Row> minSequenceNumberByPartition =
loadMetadataTable(table, MetadataTableType.ENTRIES)
// find live data files
.filter("data_file.content == 0 AND status < 2")
.selectExpr(
"data_file.partition as partition",
"data_file.spec_id as spec_id",
"sequence_number")
.groupBy("partition", "spec_id")
.agg(min("sequence_number"))
.toDF("grouped_partition", "grouped_spec_id", "min_data_sequence_number");

Dataset<Row> deleteEntries =
loadMetadataTable(table, MetadataTableType.ENTRIES)
// find live delete files
.filter("data_file.content != 0 AND status < 2");

Column joinOnPartition =
deleteEntries
.col("data_file.spec_id")
.equalTo(minSequenceNumberByPartition.col("grouped_spec_id"))
.and(
deleteEntries
.col("data_file.partition")
.equalTo(minSequenceNumberByPartition.col("grouped_partition")));

Column filterOnDanglingDeletes =
col("min_data_sequence_number")
// delete files without any data files in partition
.isNull()
// position delete files without any applicable data files in partition
.or(
col("data_file.content")
.equalTo("1")
.and(col("sequence_number").$less(col("min_data_sequence_number"))))
// equality delete files without any applicable data files in the partition
.or(
col("data_file.content")
.equalTo("2")
.and(col("sequence_number").$less$eq(col("min_data_sequence_number"))));

Dataset<Row> danglingDeletes =
deleteEntries
.join(minSequenceNumberByPartition, joinOnPartition, "left")
.filter(filterOnDanglingDeletes)
.select("data_file.*");
return danglingDeletes.collectAsList().stream()
// map on driver because SparkDeleteFile is not serializable
.map(row -> deleteFileWrapper(danglingDeletes.schema(), row))
.collect(Collectors.toList());
// Load the ENTRIES metadata table once and cache to avoid reading all manifests twice
Dataset<Row> liveEntries =
loadMetadataTable(table, MetadataTableType.ENTRIES).filter("status < 2").cache();

try {
Dataset<Row> minSequenceNumberByPartition =
liveEntries
// find live data files
.filter("data_file.content == 0")
.selectExpr(
"data_file.partition as partition",
"data_file.spec_id as spec_id",
"sequence_number")
.groupBy("partition", "spec_id")
.agg(min("sequence_number"))
.toDF("grouped_partition", "grouped_spec_id", "min_data_sequence_number");

Dataset<Row> deleteEntries =
liveEntries
// find live delete files
.filter("data_file.content != 0");

Column joinOnPartition =
deleteEntries
.col("data_file.spec_id")
.equalTo(minSequenceNumberByPartition.col("grouped_spec_id"))
.and(
deleteEntries
.col("data_file.partition")
.equalTo(minSequenceNumberByPartition.col("grouped_partition")));

Column filterOnDanglingDeletes =
col("min_data_sequence_number")
// delete files without any data files in partition
.isNull()
// position delete files without any applicable data files in partition
.or(
col("data_file.content")
.equalTo("1")
.and(col("sequence_number").$less(col("min_data_sequence_number"))))
// equality delete files without any applicable data files in the partition
.or(
col("data_file.content")
.equalTo("2")
.and(col("sequence_number").$less$eq(col("min_data_sequence_number"))));

Dataset<Row> danglingDeletes =
deleteEntries
.join(minSequenceNumberByPartition, joinOnPartition, "left")
.filter(filterOnDanglingDeletes)
.select("data_file.*");
return danglingDeletes.collectAsList().stream()
// map on driver because SparkDeleteFile is not serializable
.map(row -> deleteFileWrapper(danglingDeletes.schema(), row))
.collect(Collectors.toList());
} finally {
liveEntries.unpersist(false);
}
}
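
The grouping, left join, and dangling-delete filter above can be exercised in isolation. Below is a minimal, self-contained sketch, not part of this change, that reproduces the same pattern on a hand-built DataFrame. The class name, the toy rows, and the column names (part, spec_id, content, sequence_number) are illustrative stand-ins for the ENTRIES metadata columns, and content follows the Iceberg convention of 0 = data, 1 = position deletes, 2 = equality deletes.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.min;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DanglingDeleteFilterSketch {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();

    // Toy stand-in for live ENTRIES rows: (partition, spec id, content, sequence number)
    Dataset<Row> entries =
        spark.sql(
            "SELECT * FROM VALUES "
                + "('p-1', 0, 0, 5L), "  // data file in p-1 with data sequence number 5
                + "('p-1', 0, 1, 3L), "  // position delete older than every data file -> dangling
                + "('p-1', 0, 2, 5L), "  // equality delete at the min data sequence -> dangling
                + "('p-1', 0, 1, 6L), "  // position delete newer than a data file -> kept
                + "('p-2', 0, 2, 7L) "   // delete file in a partition with no data files -> dangling
                + "AS t(part, spec_id, content, sequence_number)");

    // Minimum data sequence number per (partition, spec id), as in the action
    Dataset<Row> minSeqByPartition =
        entries
            .filter(col("content").equalTo(0))
            .groupBy("part", "spec_id")
            .agg(min("sequence_number"))
            .toDF("grouped_part", "grouped_spec_id", "min_data_sequence_number");

    Dataset<Row> deleteEntries = entries.filter(col("content").notEqual(0));

    Column joinOnPartition =
        deleteEntries
            .col("spec_id")
            .equalTo(minSeqByPartition.col("grouped_spec_id"))
            .and(deleteEntries.col("part").equalTo(minSeqByPartition.col("grouped_part")));

    Column danglingFilter =
        col("min_data_sequence_number")
            // delete files in partitions without any data files
            .isNull()
            // position deletes strictly older than the oldest data file
            .or(
                col("content")
                    .equalTo(1)
                    .and(col("sequence_number").$less(col("min_data_sequence_number"))))
            // equality deletes at or below the oldest data sequence number
            .or(
                col("content")
                    .equalTo(2)
                    .and(col("sequence_number").$less$eq(col("min_data_sequence_number"))));

    // Prints the three rows marked "dangling" in the comments above
    deleteEntries.join(minSeqByPartition, joinOnPartition, "left").filter(danglingFilter).show();

    spark.stop();
  }
}
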

private List<DeleteFile> findDanglingDvs() {