diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java index c000fd77ff..55ab77751d 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -42,6 +42,7 @@ import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -200,7 +201,9 @@ protected Set expireSnapshotNeedToExcludeFiles() { } protected Set orphanFileCleanNeedToExcludeFiles() { - return IcebergTableUtil.getAllContentFilePath(table); + return Sets.union( + IcebergTableUtil.getAllContentFilePath(table), + IcebergTableUtil.getAllStatisticsFilePath(table)); } protected ArcticFileIO arcticFileIO() { diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java index 18c3ad40b7..1be30660bb 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/MixedTableMaintainer.java @@ -26,6 +26,8 @@ import com.netease.arctic.server.utils.HiveLocationUtil; import com.netease.arctic.server.utils.IcebergTableUtil; import com.netease.arctic.table.ArcticTable; +import com.netease.arctic.table.BaseTable; +import com.netease.arctic.table.ChangeTable; import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.UnkeyedTable; @@ -39,6 +41,7 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.Snapshot; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.primitives.Longs; import org.apache.iceberg.util.StructLikeMap; import org.slf4j.Logger; @@ -73,15 +76,25 @@ public class MixedTableMaintainer implements TableMaintainer { public MixedTableMaintainer(ArcticTable arcticTable) { this.arcticTable = arcticTable; if (arcticTable.isKeyedTable()) { - changeMaintainer = new ChangeTableMaintainer(arcticTable.asKeyedTable().changeTable()); - baseMaintainer = new BaseTableMaintainer(arcticTable.asKeyedTable().baseTable()); + ChangeTable changeTable = arcticTable.asKeyedTable().changeTable(); + BaseTable baseTable = arcticTable.asKeyedTable().baseTable(); + changeMaintainer = new ChangeTableMaintainer(changeTable); + baseMaintainer = new BaseTableMaintainer(baseTable); changeFiles = - IcebergTableUtil.getAllContentFilePath(arcticTable.asKeyedTable().changeTable()); - baseFiles = IcebergTableUtil.getAllContentFilePath(arcticTable.asKeyedTable().baseTable()); + Sets.union( + IcebergTableUtil.getAllContentFilePath(changeTable), + IcebergTableUtil.getAllStatisticsFilePath(changeTable)); + baseFiles = + Sets.union( + IcebergTableUtil.getAllContentFilePath(baseTable), + IcebergTableUtil.getAllStatisticsFilePath(baseTable)); } else { baseMaintainer = new BaseTableMaintainer(arcticTable.asUnkeyedTable()); changeFiles = new HashSet<>(); - baseFiles = IcebergTableUtil.getAllContentFilePath(arcticTable.asUnkeyedTable()); + baseFiles = + Sets.union( + IcebergTableUtil.getAllContentFilePath(arcticTable.asUnkeyedTable()), + IcebergTableUtil.getAllStatisticsFilePath(arcticTable.asUnkeyedTable())); } if (TableTypeUtil.isHive(arcticTable)) { @@ -143,15 +156,6 @@ public BaseTableMaintainer getBaseMaintainer() { return baseMaintainer; } - @SafeVarargs - private final Set mergeSets(Set... sets) { - Set result = new HashSet<>(); - for (Set set : sets) { - result.addAll(set); - } - return result; - } - public class ChangeTableMaintainer extends IcebergTableMaintainer { private static final int DATA_FILE_LIST_SPLIT = 3000; @@ -165,7 +169,7 @@ public ChangeTableMaintainer(UnkeyedTable unkeyedTable) { @Override public Set orphanFileCleanNeedToExcludeFiles() { - return mergeSets(changeFiles, baseFiles, hiveFiles); + return Sets.union(changeFiles, Sets.union(baseFiles, hiveFiles)); } @Override @@ -188,7 +192,7 @@ protected long olderThanSnapshotNeedToExpire(long mustOlderThan) { @Override protected Set expireSnapshotNeedToExcludeFiles() { - return mergeSets(baseFiles, hiveFiles); + return Sets.union(baseFiles, hiveFiles); } public void expireFiles(long ttlPoint) { @@ -324,12 +328,12 @@ public BaseTableMaintainer(UnkeyedTable unkeyedTable) { @Override public Set orphanFileCleanNeedToExcludeFiles() { - return mergeSets(changeFiles, baseFiles, hiveFiles); + return Sets.union(changeFiles, Sets.union(baseFiles, hiveFiles)); } @Override protected Set expireSnapshotNeedToExcludeFiles() { - return mergeSets(changeFiles, hiveFiles); + return Sets.union(changeFiles, hiveFiles); } } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java b/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java index 353988767e..170221a040 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java +++ b/ams/server/src/main/java/com/netease/arctic/server/utils/IcebergTableUtil.java @@ -31,6 +31,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; @@ -43,6 +44,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; public class IcebergTableUtil { @@ -93,6 +95,12 @@ public static Set getAllContentFilePath(Table internalTable) { return validFilesPath; } + public static Set getAllStatisticsFilePath(Table table) { + return ReachableFileUtil.statisticsFilesLocations(table).stream() + .map(TableFileUtil::getUriPath) + .collect(Collectors.toSet()); + } + public static Set getDanglingDeleteFiles(Table internalTable) { if (internalTable.currentSnapshot() == null) { return Collections.emptySet(); diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileClean.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileClean.java index 25e08d4a2a..fc76dbbd08 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileClean.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestOrphanFileClean.java @@ -334,21 +334,27 @@ public void notDeleteStatisticsFile() { } else { unkeyedTable = getArcticTable().asUnkeyedTable(); } - unkeyedTable.newAppend().commit(); - Snapshot snapshot = unkeyedTable.currentSnapshot(); - StatisticsFile file = writeStatisticsFile(unkeyedTable, snapshot); - unkeyedTable.updateStatistics().setStatistics(snapshot.snapshotId(), file).commit(); - - Assert.assertTrue(unkeyedTable.io().exists(file.path())); + StatisticsFile file1 = + commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/metadata/test1.puffin"); + StatisticsFile file2 = + commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/data/test2.puffin"); + StatisticsFile file3 = + commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/data/puffin/test3.puffin"); + + Assert.assertTrue(unkeyedTable.io().exists(file1.path())); + Assert.assertTrue(unkeyedTable.io().exists(file2.path())); + Assert.assertTrue(unkeyedTable.io().exists(file3.path())); + new MixedTableMaintainer(getArcticTable()).cleanContentFiles(System.currentTimeMillis() + 1); new MixedTableMaintainer(getArcticTable()).cleanMetadata(System.currentTimeMillis() + 1); - Assert.assertTrue(unkeyedTable.io().exists(file.path())); + Assert.assertTrue(unkeyedTable.io().exists(file1.path())); + Assert.assertTrue(unkeyedTable.io().exists(file2.path())); + Assert.assertTrue(unkeyedTable.io().exists(file3.path())); } - private StatisticsFile writeStatisticsFile(UnkeyedTable table, Snapshot snapshot) { - OutputFile outputFile = - table - .io() - .newOutputFile(table.location() + "/metadata/" + snapshot.snapshotId() + ".puffin"); + private StatisticsFile commitStatisticsFile(UnkeyedTable table, String fileLocation) { + table.newAppend().commit(); + Snapshot snapshot = table.currentSnapshot(); + OutputFile outputFile = table.io().newOutputFile(fileLocation); List blobMetadata; long fileSize; long footerSize; @@ -362,7 +368,11 @@ private StatisticsFile writeStatisticsFile(UnkeyedTable table, Snapshot snapshot } List collect = blobMetadata.stream().map(GenericBlobMetadata::from).collect(Collectors.toList()); - return new GenericStatisticsFile( - snapshot.snapshotId(), outputFile.location(), fileSize, footerSize, collect); + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshot.snapshotId(), outputFile.location(), fileSize, footerSize, collect); + table.updateStatistics().setStatistics(snapshot.snapshotId(), statisticsFile).commit(); + + return statisticsFile; } }