From 5a82c778c140659b82389a09b756126abac7266a Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 17 Nov 2023 14:03:12 +0800 Subject: [PATCH 1/3] fix orphan files deleting puffin files --- .../maintainer/IcebergTableMaintainer.java | 6 ++- .../maintainer/MixedTableMaintainer.java | 51 ++++++++++++------- .../arctic/server/utils/IcebergTableUtil.java | 8 +++ .../maintainer/TestOrphanFileClean.java | 38 +++++++++----- 4 files changed, 70 insertions(+), 33 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java index c000fd77ff..5a096dd28a 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -41,6 +41,7 @@ import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -200,7 +201,10 @@ protected Set expireSnapshotNeedToExcludeFiles() { } protected Set orphanFileCleanNeedToExcludeFiles() { - return IcebergTableUtil.getAllContentFilePath(table); + return ImmutableSet.builder() + .addAll(IcebergTableUtil.getAllContentFilePath(table)) + .addAll(IcebergTableUtil.getAllStatisticsFilePath(table)) + .build(); } protected ArcticFileIO arcticFileIO() { diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java index 18c3ad40b7..ef0b310fd0 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java @@ -26,6 +26,8 @@ import com.netease.arctic.server.utils.HiveLocationUtil; import com.netease.arctic.server.utils.IcebergTableUtil; import com.netease.arctic.table.ArcticTable; +import com.netease.arctic.table.BaseTable; +import com.netease.arctic.table.ChangeTable; import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.UnkeyedTable; @@ -39,6 +41,7 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.Snapshot; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.primitives.Longs; import org.apache.iceberg.util.StructLikeMap; import org.slf4j.Logger; @@ -73,15 +76,28 @@ public class MixedTableMaintainer implements TableMaintainer { public MixedTableMaintainer(ArcticTable arcticTable) { this.arcticTable = arcticTable; if (arcticTable.isKeyedTable()) { - changeMaintainer = new ChangeTableMaintainer(arcticTable.asKeyedTable().changeTable()); - baseMaintainer = new BaseTableMaintainer(arcticTable.asKeyedTable().baseTable()); + ChangeTable changeTable = arcticTable.asKeyedTable().changeTable(); + BaseTable baseTable = arcticTable.asKeyedTable().baseTable(); + changeMaintainer = new ChangeTableMaintainer(changeTable); + baseMaintainer = new BaseTableMaintainer(baseTable); changeFiles = - IcebergTableUtil.getAllContentFilePath(arcticTable.asKeyedTable().changeTable()); - baseFiles = IcebergTableUtil.getAllContentFilePath(arcticTable.asKeyedTable().baseTable()); + ImmutableSet.builder() + .addAll(IcebergTableUtil.getAllContentFilePath(changeTable)) + .addAll(IcebergTableUtil.getAllStatisticsFilePath(changeTable)) + .build(); + baseFiles = + ImmutableSet.builder() + .addAll(IcebergTableUtil.getAllContentFilePath(baseTable)) + .addAll(IcebergTableUtil.getAllStatisticsFilePath(baseTable)) + .build(); } else { baseMaintainer = new BaseTableMaintainer(arcticTable.asUnkeyedTable()); changeFiles = new HashSet<>(); - baseFiles = IcebergTableUtil.getAllContentFilePath(arcticTable.asUnkeyedTable()); + baseFiles = + ImmutableSet.builder() + .addAll(IcebergTableUtil.getAllContentFilePath(arcticTable.asUnkeyedTable())) + .addAll(IcebergTableUtil.getAllStatisticsFilePath(arcticTable.asUnkeyedTable())) + .build(); } if (TableTypeUtil.isHive(arcticTable)) { @@ -143,15 +159,6 @@ public BaseTableMaintainer getBaseMaintainer() { return baseMaintainer; } - @SafeVarargs - private final Set mergeSets(Set... sets) { - Set result = new HashSet<>(); - for (Set set : sets) { - result.addAll(set); - } - return result; - } - public class ChangeTableMaintainer extends IcebergTableMaintainer { private static final int DATA_FILE_LIST_SPLIT = 3000; @@ -165,7 +172,11 @@ public ChangeTableMaintainer(UnkeyedTable unkeyedTable) { @Override public Set orphanFileCleanNeedToExcludeFiles() { - return mergeSets(changeFiles, baseFiles, hiveFiles); + return ImmutableSet.builder() + .addAll(changeFiles) + .addAll(baseFiles) + .addAll(hiveFiles) + .build(); } @Override @@ -188,7 +199,7 @@ protected long olderThanSnapshotNeedToExpire(long mustOlderThan) { @Override protected Set expireSnapshotNeedToExcludeFiles() { - return mergeSets(baseFiles, hiveFiles); + return ImmutableSet.builder().addAll(baseFiles).addAll(hiveFiles).build(); } public void expireFiles(long ttlPoint) { @@ -324,12 +335,16 @@ public BaseTableMaintainer(UnkeyedTable unkeyedTable) { @Override public Set orphanFileCleanNeedToExcludeFiles() { - return mergeSets(changeFiles, baseFiles, hiveFiles); + return ImmutableSet.builder() + .addAll(changeFiles) + .addAll(baseFiles) + .addAll(hiveFiles) + .build(); } @Override protected Set expireSnapshotNeedToExcludeFiles() { - return mergeSets(changeFiles, hiveFiles); + return ImmutableSet.builder().addAll(changeFiles).addAll(hiveFiles).build(); } } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java b/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java index 353988767e..170221a040 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java +++ b/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java @@ -31,6 +31,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; @@ -43,6 +44,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; public class IcebergTableUtil { @@ -93,6 +95,12 @@ public static Set getAllContentFilePath(Table internalTable) { return validFilesPath; } + public static Set getAllStatisticsFilePath(Table table) { + return ReachableFileUtil.statisticsFilesLocations(table).stream() + .map(TableFileUtil::getUriPath) + .collect(Collectors.toSet()); + } + public static Set getDanglingDeleteFiles(Table internalTable) { if (internalTable.currentSnapshot() == null) { return Collections.emptySet(); diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileClean.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileClean.java index 25e08d4a2a..fc76dbbd08 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileClean.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileClean.java @@ -334,21 +334,27 @@ public void notDeleteStatisticsFile() { } else { unkeyedTable = getArcticTable().asUnkeyedTable(); } - unkeyedTable.newAppend().commit(); - Snapshot snapshot = unkeyedTable.currentSnapshot(); - StatisticsFile file = writeStatisticsFile(unkeyedTable, snapshot); - unkeyedTable.updateStatistics().setStatistics(snapshot.snapshotId(), file).commit(); - - Assert.assertTrue(unkeyedTable.io().exists(file.path())); + StatisticsFile file1 = + commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/metadata/test1.puffin"); + StatisticsFile file2 = + commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/data/test2.puffin"); + StatisticsFile file3 = + commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/data/puffin/test3.puffin"); + + Assert.assertTrue(unkeyedTable.io().exists(file1.path())); + Assert.assertTrue(unkeyedTable.io().exists(file2.path())); + Assert.assertTrue(unkeyedTable.io().exists(file3.path())); + new MixedTableMaintainer(getArcticTable()).cleanContentFiles(System.currentTimeMillis() + 1); new MixedTableMaintainer(getArcticTable()).cleanMetadata(System.currentTimeMillis() + 1); - Assert.assertTrue(unkeyedTable.io().exists(file.path())); + Assert.assertTrue(unkeyedTable.io().exists(file1.path())); + Assert.assertTrue(unkeyedTable.io().exists(file2.path())); + Assert.assertTrue(unkeyedTable.io().exists(file3.path())); } - private StatisticsFile writeStatisticsFile(UnkeyedTable table, Snapshot snapshot) { - OutputFile outputFile = - table - .io() - .newOutputFile(table.location() + "/metadata/" + snapshot.snapshotId() + ".puffin"); + private StatisticsFile commitStatisticsFile(UnkeyedTable table, String fileLocation) { + table.newAppend().commit(); + Snapshot snapshot = table.currentSnapshot(); + OutputFile outputFile = table.io().newOutputFile(fileLocation); List blobMetadata; long fileSize; long footerSize; @@ -362,7 +368,11 @@ private StatisticsFile writeStatisticsFile(UnkeyedTable table, Snapshot snapshot } List collect = blobMetadata.stream().map(GenericBlobMetadata::from).collect(Collectors.toList()); - return new GenericStatisticsFile( - snapshot.snapshotId(), outputFile.location(), fileSize, footerSize, collect); + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshot.snapshotId(), outputFile.location(), fileSize, footerSize, collect); + table.updateStatistics().setStatistics(snapshot.snapshotId(), statisticsFile).commit(); + + return statisticsFile; } } From 67a8b064e569f682aa67916a63c357f8569d2aea Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 17 Nov 2023 18:06:52 +0800 Subject: [PATCH 2/3] using Sets.union instead of ImmutableSet Builder --- .../maintainer/IcebergTableMaintainer.java | 9 +++--- .../maintainer/MixedTableMaintainer.java | 30 ++++++++----------- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java index 5a096dd28a..55ab77751d 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -41,8 +41,8 @@ import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -201,10 +201,9 @@ protected Set expireSnapshotNeedToExcludeFiles() { } protected Set orphanFileCleanNeedToExcludeFiles() { - return ImmutableSet.builder() - .addAll(IcebergTableUtil.getAllContentFilePath(table)) - .addAll(IcebergTableUtil.getAllStatisticsFilePath(table)) - .build(); + return Sets.union( + IcebergTableUtil.getAllContentFilePath(table), + IcebergTableUtil.getAllStatisticsFilePath(table)); } protected ArcticFileIO arcticFileIO() { diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java index ef0b310fd0..87747430e8 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java @@ -42,6 +42,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.primitives.Longs; import org.apache.iceberg.util.StructLikeMap; import org.slf4j.Logger; @@ -81,23 +82,20 @@ public MixedTableMaintainer(ArcticTable arcticTable) { changeMaintainer = new ChangeTableMaintainer(changeTable); baseMaintainer = new BaseTableMaintainer(baseTable); changeFiles = - ImmutableSet.builder() - .addAll(IcebergTableUtil.getAllContentFilePath(changeTable)) - .addAll(IcebergTableUtil.getAllStatisticsFilePath(changeTable)) - .build(); + Sets.union( + IcebergTableUtil.getAllContentFilePath(changeTable), + IcebergTableUtil.getAllStatisticsFilePath(changeTable)); baseFiles = - ImmutableSet.builder() - .addAll(IcebergTableUtil.getAllContentFilePath(baseTable)) - .addAll(IcebergTableUtil.getAllStatisticsFilePath(baseTable)) - .build(); + Sets.union( + IcebergTableUtil.getAllContentFilePath(baseTable), + IcebergTableUtil.getAllStatisticsFilePath(baseTable)); } else { baseMaintainer = new BaseTableMaintainer(arcticTable.asUnkeyedTable()); changeFiles = new HashSet<>(); baseFiles = - ImmutableSet.builder() - .addAll(IcebergTableUtil.getAllContentFilePath(arcticTable.asUnkeyedTable())) - .addAll(IcebergTableUtil.getAllStatisticsFilePath(arcticTable.asUnkeyedTable())) - .build(); + Sets.union( + IcebergTableUtil.getAllContentFilePath(arcticTable.asUnkeyedTable()), + IcebergTableUtil.getAllStatisticsFilePath(arcticTable.asUnkeyedTable())); } if (TableTypeUtil.isHive(arcticTable)) { @@ -335,16 +333,12 @@ public BaseTableMaintainer(UnkeyedTable unkeyedTable) { @Override public Set orphanFileCleanNeedToExcludeFiles() { - return ImmutableSet.builder() - .addAll(changeFiles) - .addAll(baseFiles) - .addAll(hiveFiles) - .build(); + return Sets.union(changeFiles, Sets.union(baseFiles, hiveFiles)); } @Override protected Set expireSnapshotNeedToExcludeFiles() { - return ImmutableSet.builder().addAll(changeFiles).addAll(hiveFiles).build(); + return Sets.union(changeFiles, hiveFiles); } } } From 8fdad6c49ee995cbbec15a9c84e5298df761d54c Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 17 Nov 2023 18:11:20 +0800 Subject: [PATCH 3/3] fix --- .../optimizing/maintainer/MixedTableMaintainer.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java index 87747430e8..1be30660bb 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java @@ -41,7 +41,6 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.Snapshot; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.primitives.Longs; import org.apache.iceberg.util.StructLikeMap; @@ -170,11 +169,7 @@ public ChangeTableMaintainer(UnkeyedTable unkeyedTable) { @Override public Set orphanFileCleanNeedToExcludeFiles() { - return ImmutableSet.builder() - .addAll(changeFiles) - .addAll(baseFiles) - .addAll(hiveFiles) - .build(); + return Sets.union(changeFiles, Sets.union(baseFiles, hiveFiles)); } @Override @@ -197,7 +192,7 @@ protected long olderThanSnapshotNeedToExpire(long mustOlderThan) { @Override protected Set expireSnapshotNeedToExcludeFiles() { - return ImmutableSet.builder().addAll(baseFiles).addAll(hiveFiles).build(); + return Sets.union(baseFiles, hiveFiles); } public void expireFiles(long ttlPoint) {