From 095ccaa3e3a7a7a60a4f191ba96df702642d6898 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Mon, 3 Feb 2025 16:51:46 +0100 Subject: [PATCH 1/2] Spark: Add separate action to rewrite DVs --- .../org/apache/iceberg/EmptyStructLike.java | 4 +- .../apache/iceberg/puffin/PuffinReader.java | 8 + .../spark/actions/RewriteDVsSparkAction.java | 394 ++++++++++++++++++ .../iceberg/spark/actions/SparkActions.java | 4 + .../spark/actions/SparkBinPackDVRewriter.java | 137 ++++++ .../SparkBinPackPositionDeletesRewriter.java | 4 + .../spark/actions/TestRewriteDVsAction.java | 384 +++++++++++++++++ 7 files changed, 933 insertions(+), 2 deletions(-) create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDVsSparkAction.java create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDVRewriter.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDVsAction.java diff --git a/api/src/main/java/org/apache/iceberg/EmptyStructLike.java b/api/src/main/java/org/apache/iceberg/EmptyStructLike.java index 2d57f4c01a66..8b046780aa7a 100644 --- a/api/src/main/java/org/apache/iceberg/EmptyStructLike.java +++ b/api/src/main/java/org/apache/iceberg/EmptyStructLike.java @@ -20,13 +20,13 @@ import java.io.Serializable; -class EmptyStructLike implements StructLike, Serializable { +public class EmptyStructLike implements StructLike, Serializable { private static final EmptyStructLike INSTANCE = new EmptyStructLike(); private EmptyStructLike() {} - static EmptyStructLike get() { + public static EmptyStructLike get() { return INSTANCE; } diff --git a/core/src/main/java/org/apache/iceberg/puffin/PuffinReader.java b/core/src/main/java/org/apache/iceberg/puffin/PuffinReader.java index e30b6e1ee6ef..63f7af917540 100644 --- a/core/src/main/java/org/apache/iceberg/puffin/PuffinReader.java +++ b/core/src/main/java/org/apache/iceberg/puffin/PuffinReader.java @@ -158,6 +158,14 @@ private 
static void checkMagic(byte[] data, int offset) { } } + public long dataSize() { + try { + return fileSize - footerSize() - 4; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private int footerSize() throws IOException { if (knownFooterSize == null) { Preconditions.checkState( diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDVsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDVsSparkAction.java new file mode 100644 index 000000000000..871a5277bd9a --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDVsSparkAction.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import java.math.RoundingMode; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.EmptyStructLike; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan; +import org.apache.iceberg.RewriteJobOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableUtil; +import org.apache.iceberg.actions.ImmutableRewritePositionDeleteFiles; +import org.apache.iceberg.actions.RewritePositionDeleteFiles; +import org.apache.iceberg.actions.RewritePositionDeletesCommitManager; +import org.apache.iceberg.actions.RewritePositionDeletesCommitManager.CommitService; +import org.apache.iceberg.actions.RewritePositionDeletesGroup; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Queues; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import 
org.apache.iceberg.relocated.com.google.common.math.IntMath; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Spark implementation of {@link RewritePositionDeleteFiles} for DVs. */ +public class RewriteDVsSparkAction extends BaseSnapshotUpdateSparkAction + implements RewritePositionDeleteFiles { + + private static final Logger LOG = LoggerFactory.getLogger(RewriteDVsSparkAction.class); + private static final Set VALID_OPTIONS = + ImmutableSet.of( + MAX_CONCURRENT_FILE_GROUP_REWRITES, + PARTIAL_PROGRESS_ENABLED, + PARTIAL_PROGRESS_MAX_COMMITS, + REWRITE_JOB_ORDER); + private static final Result EMPTY_RESULT = + ImmutableRewritePositionDeleteFiles.Result.builder().build(); + + private final Table table; + private final SparkBinPackDVRewriter rewriter; + private final boolean caseSensitive; + private Expression filter = Expressions.alwaysTrue(); + + private int maxConcurrentFileGroupRewrites; + private int maxCommits; + private boolean partialProgressEnabled; + private RewriteJobOrder rewriteJobOrder; + + RewriteDVsSparkAction(SparkSession spark, Table table) { + super(spark); + this.table = table; + this.rewriter = new SparkBinPackDVRewriter(spark(), table); + this.caseSensitive = SparkUtil.caseSensitive(spark); + } + + @Override + protected RewriteDVsSparkAction self() { + return this; + } + + @Override + public RewriteDVsSparkAction filter(Expression expression) { + filter = Expressions.and(filter, expression); + return this; + } + + @Override + public Result execute() { + if (table.currentSnapshot() == null) { + LOG.info("Nothing found to rewrite in empty table {}", table.name()); + return EMPTY_RESULT; 
+ } + + validateAndInitOptions(); + + // TODO: 1) if V3 table has V2 deletes -> rewrite those + // TODO: 2) if V3 table has V3 deletes -> check their liveness ratio + // TODO: 3) if V3 table has multiple (conflicting DVs) for the same data file -> merge those + + Table deletesTable = + MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES); + CloseableIterable tasks = planFiles(deletesTable); + + Map>> fileGroupsByPuffinPath = Maps.newHashMap(); + + Map> byPuffinPath = + StreamSupport.stream(tasks.spliterator(), false) + .collect(Collectors.groupingBy(task -> task.file().location())); + + for (Map.Entry> entry : byPuffinPath.entrySet()) { + fileGroupsByPuffinPath.put( + entry.getKey(), ImmutableList.copyOf(rewriter.planFileGroups(entry.getValue()))); + } + + RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPuffinPath); + + if (ctx.totalGroupCount() == 0) { + LOG.info("Nothing found to rewrite in {}", table.name()); + return EMPTY_RESULT; + } + + Stream groupStream = + fileGroupsByPuffinPath.entrySet().stream() + .filter(e -> !e.getValue().isEmpty()) + .flatMap(e -> e.getValue().stream().map(t -> newRewriteGroup(ctx, t))) + .sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder)); + + if (partialProgressEnabled) { + return doExecuteWithPartialProgress(ctx, groupStream, commitManager()); + } else { + return doExecute(ctx, groupStream, commitManager()); + } + } + + private CloseableIterable planFiles(Table deletesTable) { + PositionDeletesBatchScan scan = (PositionDeletesBatchScan) deletesTable.newBatchScan(); + return CloseableIterable.transform( + scan.baseTableFilter(filter).caseSensitive(caseSensitive).ignoreResiduals().planFiles(), + task -> (PositionDeletesScanTask) task); + } + + private RewritePositionDeletesGroup rewriteDeleteFiles( + RewriteExecutionContext ctx, RewritePositionDeletesGroup fileGroup) { + String desc = + String.format( + "Rewriting %d position files (%s, file group %d/%d) in %s", 
+ fileGroup.rewrittenDeleteFiles().size(), + rewriter.description(), + fileGroup.info().globalIndex(), + ctx.totalGroupCount(), + table.name()); + Set addedFiles = + withJobGroupInfo( + newJobGroupInfo("REWRITE-DELETION-VECTORS", desc), + () -> rewriter.rewrite(fileGroup.tasks())); + + fileGroup.setOutputFiles(addedFiles); + LOG.info("Rewrite DVs ready to be committed - {}", desc); + return fileGroup; + } + + private ExecutorService rewriteService() { + return MoreExecutors.getExitingExecutorService( + (ThreadPoolExecutor) + Executors.newFixedThreadPool( + maxConcurrentFileGroupRewrites, + new ThreadFactoryBuilder().setNameFormat("Rewrite-DV-Service-%d").build())); + } + + private RewritePositionDeletesCommitManager commitManager() { + return new RewritePositionDeletesCommitManager(table, commitSummary()); + } + + private Result doExecute( + RewriteExecutionContext ctx, + Stream groupStream, + RewritePositionDeletesCommitManager commitManager) { + ExecutorService rewriteService = rewriteService(); + + ConcurrentLinkedQueue rewrittenGroups = + Queues.newConcurrentLinkedQueue(); + + Tasks.Builder rewriteTaskBuilder = + Tasks.foreach(groupStream) + .executeWith(rewriteService) + .stopOnFailure() + .noRetry() + .onFailure( + (fileGroup, exception) -> + LOG.warn( + "Failure during rewrite process for group {}", + fileGroup.info(), + exception)); + + try { + rewriteTaskBuilder.run(fileGroup -> rewrittenGroups.add(rewriteDeleteFiles(ctx, fileGroup))); + } catch (Exception e) { + // At least one rewrite group failed, clean up all completed rewrites + LOG.error( + "Cannot complete rewrite, {} is not enabled and one of the file set groups failed to " + + "be rewritten. This error occurred during the writing of new files, not during the commit process. This " + + "indicates something is wrong that doesn't involve conflicts with other Iceberg operations. Enabling " + + "{} may help in this case but the root cause should be investigated. 
Cleaning up {} groups which finished " + + "being written.", + PARTIAL_PROGRESS_ENABLED, + PARTIAL_PROGRESS_ENABLED, + rewrittenGroups.size(), + e); + + Tasks.foreach(rewrittenGroups).suppressFailureWhenFinished().run(commitManager::abort); + throw e; + } finally { + rewriteService.shutdown(); + } + + try { + commitManager.commitOrClean(Sets.newHashSet(rewrittenGroups)); + } catch (ValidationException | CommitFailedException e) { + String errorMessage = + String.format( + "Cannot commit rewrite because of a ValidationException or CommitFailedException. This usually means that " + + "this rewrite has conflicted with another concurrent Iceberg operation. To reduce the likelihood of " + + "conflicts, set %s which will break up the rewrite into multiple smaller commits controlled by %s. " + + "Separate smaller rewrite commits can succeed independently while any commits that conflict with " + + "another Iceberg operation will be ignored. This mode will create additional snapshots in the table " + + "history, one for each commit.", + PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS); + throw new RuntimeException(errorMessage, e); + } + + List rewriteResults = + rewrittenGroups.stream() + .map(RewritePositionDeletesGroup::asResult) + .collect(Collectors.toList()); + + return ImmutableRewritePositionDeleteFiles.Result.builder() + .rewriteResults(rewriteResults) + .build(); + } + + private Result doExecuteWithPartialProgress( + RewriteExecutionContext ctx, + Stream groupStream, + RewritePositionDeletesCommitManager commitManager) { + ExecutorService rewriteService = rewriteService(); + + // start commit service + int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING); + CommitService commitService = commitManager.service(groupsPerCommit); + commitService.start(); + + // start rewrite tasks + Tasks.foreach(groupStream) + .suppressFailureWhenFinished() + .executeWith(rewriteService) + .noRetry() + .onFailure( + (fileGroup, 
exception) -> + LOG.error("Failure during rewrite group {}", fileGroup.info(), exception)) + .run(fileGroup -> commitService.offer(rewriteDeleteFiles(ctx, fileGroup))); + rewriteService.shutdown(); + + // stop commit service + commitService.close(); + List commitResults = commitService.results(); + if (commitResults.isEmpty()) { + LOG.error( + "{} is true but no rewrite commits succeeded. Check the logs to determine why the individual " + + "commits failed. If this is persistent it may help to increase {} which will break the rewrite operation " + + "into smaller commits.", + PARTIAL_PROGRESS_ENABLED, + PARTIAL_PROGRESS_MAX_COMMITS); + } + + return ImmutableRewritePositionDeleteFiles.Result.builder() + .rewriteResults( + commitResults.stream() + .map(RewritePositionDeletesGroup::asResult) + .collect(Collectors.toList())) + .build(); + } + + private RewritePositionDeletesGroup newRewriteGroup( + RewriteExecutionContext ctx, List tasks) { + return new RewritePositionDeletesGroup( + ImmutableRewritePositionDeleteFiles.FileGroupInfo.builder() + .globalIndex(ctx.currentGlobalIndex()) + .partitionIndex(1) + .partition(EmptyStructLike.get()) + .build(), + tasks); + } + + private void validateAndInitOptions() { + Set validOptions = Sets.newHashSet(rewriter.validOptions()); + validOptions.addAll(VALID_OPTIONS); + + Set invalidKeys = Sets.newHashSet(options().keySet()); + invalidKeys.removeAll(validOptions); + + Preconditions.checkArgument( + invalidKeys.isEmpty(), + "Cannot use options %s, they are not supported by the action or the rewriter %s", + invalidKeys, + rewriter.description()); + + rewriter.init(options()); + + this.maxConcurrentFileGroupRewrites = + PropertyUtil.propertyAsInt( + options(), + MAX_CONCURRENT_FILE_GROUP_REWRITES, + MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT); + + this.maxCommits = + PropertyUtil.propertyAsInt( + options(), PARTIAL_PROGRESS_MAX_COMMITS, PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT); + + this.partialProgressEnabled = + 
PropertyUtil.propertyAsBoolean( + options(), PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_ENABLED_DEFAULT); + + this.rewriteJobOrder = + RewriteJobOrder.fromName( + PropertyUtil.propertyAsString(options(), REWRITE_JOB_ORDER, REWRITE_JOB_ORDER_DEFAULT)); + + Preconditions.checkArgument( + maxConcurrentFileGroupRewrites >= 1, + "Cannot set %s to %s, the value must be positive.", + MAX_CONCURRENT_FILE_GROUP_REWRITES, + maxConcurrentFileGroupRewrites); + + Preconditions.checkArgument( + !partialProgressEnabled || maxCommits > 0, + "Cannot set %s to %s, the value must be positive when %s is true", + PARTIAL_PROGRESS_MAX_COMMITS, + maxCommits, + PARTIAL_PROGRESS_ENABLED); + + int formatVersion = TableUtil.formatVersion(table); + Preconditions.checkArgument( + formatVersion >= 3, "Cannot rewrite DVs for v%s table", formatVersion); + } + + static class RewriteExecutionContext { + private final int totalGroupCount; + private final AtomicInteger groupIndex; + private final Map numGroupsByPath = Maps.newHashMap(); + + RewriteExecutionContext( + Map>> fileTasksByPuffinPath) { + fileTasksByPuffinPath.forEach( + (key, value) -> + value.stream() + .mapToInt(List::size) + .forEach( + v -> numGroupsByPath.put(key, numGroupsByPath.getOrDefault(key, 0) + v))); + + this.totalGroupCount = numGroupsByPath.values().stream().reduce(Integer::sum).orElse(0); + this.groupIndex = new AtomicInteger(1); + } + + public int currentGlobalIndex() { + return groupIndex.getAndIncrement(); + } + + public int totalGroupCount() { + return totalGroupCount; + } + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java index aa4ef987e788..942998986dde 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java @@ -113,4 +113,8 @@ public RemoveDanglingDeleteFiles 
removeDanglingDeleteFiles(Table table) { public RewriteTablePathSparkAction rewriteTablePath(Table table) { return new RewriteTablePathSparkAction(spark, table); } + + public RewriteDVsSparkAction rewriteDVs(Table table) { + return new RewriteDVsSparkAction(spark, table); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDVRewriter.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDVRewriter.java new file mode 100644 index 000000000000..d5b63c9b4fbc --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDVRewriter.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinReader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +class SparkBinPackDVRewriter extends SparkBinPackPositionDeletesRewriter { + + /** + * The minimum deletion ratio that needs to be associated with a Puffin file for it to be + * considered for rewriting. + * + *

Defaults to 0.3, which means that if the deletion ratio of a Puffin file reaches or exceeds + * 30%, it may trigger the rewriting operation. + */ + public static final String DELETE_RATIO_THRESHOLD = "delete-ratio-threshold"; + + public static final double DELETE_RATIO_THRESHOLD_DEFAULT = 0.3; + + private double deleteRatio; + + SparkBinPackDVRewriter(SparkSession spark, Table table) { + super(spark, table); + } + + @Override + public String description() { + return "BIN-PACK-DVS"; + } + + @Override + public Set validOptions() { + return ImmutableSet.of(REWRITE_ALL, DELETE_RATIO_THRESHOLD); + } + + @Override + public void init(Map options) { + super.init(options); + this.deleteRatio = + PropertyUtil.propertyAsDouble( + options, DELETE_RATIO_THRESHOLD, DELETE_RATIO_THRESHOLD_DEFAULT); + } + + @Override + protected Iterable> filterFileGroups( + List> groups) { + return Iterables.filter(groups, this::shouldRewrite); + } + + private boolean shouldRewrite(List group) { + return tooHighDeleteRatio(group); + } + + private boolean tooHighDeleteRatio(List group) { + if (group.isEmpty()) { + return false; + } + + long liveDataSize = group.stream().mapToLong(task -> task.file().contentSizeInBytes()).sum(); + + String puffinLocation = group.get(0).file().location(); + long totalDataSize; + try (PuffinReader reader = Puffin.read(table().io().newInputFile(puffinLocation)).build()) { + totalDataSize = reader.dataSize(); + } catch (IOException e) { + return false; + } + + double liveRatio = liveDataSize / (double) totalDataSize; + return 1.0d - liveRatio >= deleteRatio; + } + + @Override + protected void doRewrite(String groupId, List group) { + Preconditions.checkArgument(!group.isEmpty(), "Empty group"); + + // read the deletes packing them into splits of the required size + Dataset posDeletes = + spark() + .read() + .format("iceberg") + .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId) + .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputSize(group))) + 
.option(SparkReadOptions.FILE_OPEN_COST, "0") + .load(groupId); + + // keep only valid position deletes + Dataset dataFiles = + SparkTableUtil.loadMetadataTable(spark(), table(), MetadataTableType.DATA_FILES); + Column joinCond = posDeletes.col("file_path").equalTo(dataFiles.col("file_path")); + Dataset validDeletes = posDeletes.join(dataFiles, joinCond, "leftsemi"); + + // write the packed deletes into new files where each split becomes a new file + validDeletes + .sortWithinPartitions("file_path", "pos") + .write() + .format("iceberg") + .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) + .mode("append") + .save(groupId); + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackPositionDeletesRewriter.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackPositionDeletesRewriter.java index 5afd724aad88..c1e78077c919 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackPositionDeletesRewriter.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackPositionDeletesRewriter.java @@ -89,6 +89,10 @@ public Set rewrite(List group) { } } + protected SparkSession spark() { + return spark; + } + protected void doRewrite(String groupId, List group) { // all position deletes are of the same partition, because they are in same file group Preconditions.checkArgument(!group.isEmpty(), "Empty group"); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDVsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDVsAction.java new file mode 100644 index 000000000000..d7cd8ce09e9d --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDVsAction.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.puffin.Puffin; +import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.data.TestHelpers; +import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.io.TempDir; + +public class TestRewriteDVsAction extends CatalogTestBase { + + private static final String TABLE_NAME = "test_table"; + private static final Schema SCHEMA = + new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get())); + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties() + } + }; + } + + @TempDir private Path temp; + + @AfterEach + public void cleanup() { + validationCatalog.dropTable(TableIdentifier.of("default", TABLE_NAME)); + } + + @TestTemplate + public void rewriteDVsOnEmptyTable() { + Table table = + validationCatalog.createTable( + TableIdentifier.of("default", TABLE_NAME), + SCHEMA, + PartitionSpec.unpartitioned(), + tableProperties()); + + Result result = SparkActions.get(spark).rewriteDVs(table).execute(); + assertThat(result.rewrittenDeleteFilesCount()).as("No rewritten delete files").isZero(); + assertThat(result.addedDeleteFilesCount()).as("No added delete files").isZero(); + } + + @TestTemplate + public void rewriteDVsOnV2Table() { + Table table = + validationCatalog.createTable( + TableIdentifier.of("default", TABLE_NAME), + SCHEMA, + 
PartitionSpec.unpartitioned(), + tableProperties(2)); + + writeRecords(table, 1, 10); + + assertThatThrownBy(SparkActions.get(spark).rewriteDVs(table)::execute) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot rewrite DVs for v2 table"); + } + + @TestTemplate + public void dvCompactionWithHighDeleteRatioOnUnpartitionedTable() throws IOException { + Table table = + validationCatalog.createTable( + TableIdentifier.of("default", TABLE_NAME), + SCHEMA, + PartitionSpec.unpartitioned(), + tableProperties(3)); + + // write 10 records per data file + writeRecords(table, 10, 100); + + dvCompactionWithHighDeleteRatio(table); + } + + @TestTemplate + public void dvCompactionWithHighDeleteRatioOnPartitionedTable() throws IOException { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); + Table table = + validationCatalog.createTable( + TableIdentifier.of("default", TABLE_NAME), SCHEMA, spec, tableProperties(3)); + + // write 10 records per data file + writeRecords(table, 10, 10, 10); + + dvCompactionWithHighDeleteRatio(table); + } + + private void dvCompactionWithHighDeleteRatio(Table table) throws IOException { + assertThat(records(table)).hasSize(100); + assertThat(deleteRecords(table)).hasSize(0); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + + List dataFiles = TestHelpers.dataFiles(table); + for (int i = 0; i < dataFiles.size(); i++) { + DataFile dataFile = dataFiles.get(i); + for (int j = 0; j < dataFile.recordCount(); j++) { + if (i + 1 < dataFiles.size()) { + writer.delete(dataFile.location(), j, table.spec(), dataFile.partition()); + } else if (j < 5) { + // only delete 5 records from the last data file + writer.delete(dataFile.location(), j, table.spec(), dataFile.partition()); + } + } + } + + writer.close(); + List deleteFiles = writer.result().deleteFiles(); + RowDelta rowDelta = 
table.newRowDelta(); + deleteFiles.forEach(rowDelta::addDeletes); + rowDelta.commit(); + + assertThat(records(table)).hasSize(5); + assertThat(deleteRecords(table)).hasSize(95); + + RowDelta delta = table.newRowDelta(); + // expire 9 out of 10 delete files + TestHelpers.deleteFiles(table).stream() + .filter(del -> del.recordCount() == 10) + .forEach(delta::removeDeletes); + delta.commit(); + + assertThat(records(table)).hasSize(95); + assertThat(deleteRecords(table)).hasSize(5); + + String puffinLocationBeforeRewrite = deleteFiles.get(0).location(); + assertThat( + Puffin.read(Files.localInput(puffinLocationBeforeRewrite)) + .build() + .fileMetadata() + .blobs()) + .hasSize(10); + + Result result = SparkActions.get(spark).rewriteDVs(table).execute(); + + assertThat(result.rewrittenDeleteFilesCount()).isEqualTo(1); + assertThat(result.addedDeleteFilesCount()).isEqualTo(1); + + Set files = TestHelpers.deleteFiles(table); + assertThat(files).hasSize(1); + + // live ratio was 10%, so a new Puffin file has been written with only a single DV + String puffinLocationAfterRewrite = Iterables.getOnlyElement(files).location(); + assertThat(puffinLocationAfterRewrite).isNotEqualTo(puffinLocationBeforeRewrite); + assertThat( + Puffin.read(Files.localInput(puffinLocationAfterRewrite)) + .build() + .fileMetadata() + .blobs()) + .hasSize(1); + } + + @TestTemplate + public void dvCompactionWithLowLiveRatio() throws IOException { + Table table = + validationCatalog.createTable( + TableIdentifier.of("default", TABLE_NAME), + SCHEMA, + PartitionSpec.unpartitioned(), + tableProperties(3)); + + // write 10 records per data file + writeRecords(table, 10, 100); + + assertThat(records(table)).hasSize(100); + assertThat(deleteRecords(table)).hasSize(0); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + + List dataFiles = TestHelpers.dataFiles(table); + for 
(DataFile dataFile : dataFiles) { + for (int j = 0; j < dataFile.recordCount(); j++) { + if (j < 5) { + // only delete 5 records from each data file + writer.delete(dataFile.location(), j, table.spec(), dataFile.partition()); + } + } + } + + writer.close(); + List deleteFiles = writer.result().deleteFiles(); + RowDelta rowDelta = table.newRowDelta(); + deleteFiles.forEach(rowDelta::addDeletes); + rowDelta.commit(); + + assertThat(records(table)).hasSize(50); + assertThat(deleteRecords(table)).hasSize(50); + + RowDelta delta = table.newRowDelta(); + // expire 5 out of 10 delete files + ImmutableList.copyOf(TestHelpers.deleteFiles(table)) + .subList(0, 5) + .forEach(delta::removeDeletes); + delta.commit(); + + assertThat(records(table)).hasSize(75); + assertThat(deleteRecords(table)).hasSize(25); + + Set currentDeleteFiles = TestHelpers.deleteFiles(table); + assertThat(currentDeleteFiles).hasSize(5); + String puffinLocationBeforeRewrite = currentDeleteFiles.stream().findFirst().get().location(); + assertThat( + Puffin.read(Files.localInput(puffinLocationBeforeRewrite)) + .build() + .fileMetadata() + .blobs()) + .hasSize(10); + + Result result = + SparkActions.get(spark) + .rewriteDVs(table) + .option(SparkBinPackDVRewriter.DELETE_RATIO_THRESHOLD, "0.7") + .execute(); + + assertThat(result.rewrittenDeleteFilesCount()).isEqualTo(0); + assertThat(result.addedDeleteFilesCount()).isEqualTo(0); + + Set filesAfterRewrite = TestHelpers.deleteFiles(table); + assertThat(filesAfterRewrite).hasSize(5); + + // live ratio was 50%, so no new Puffin file should have been written + String puffinLocationAfterRewrite = filesAfterRewrite.stream().findFirst().get().location(); + assertThat(puffinLocationAfterRewrite).isEqualTo(puffinLocationBeforeRewrite); + assertThat( + Puffin.read(Files.localInput(puffinLocationAfterRewrite)) + .build() + .fileMetadata() + .blobs()) + .hasSize(10); + } + + private Map tableProperties() { + return tableProperties(3); + } + + private Map 
tableProperties(int formatVersion) {
+    return ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+  }
+
+  private void writeRecords(Table table, int files, int numRecords) {
+    writeRecords(table, files, numRecords, 1);
+  }
+
+  private void writeRecords(Table table, int files, int numRecords, int numPartitions) {
+    writeRecordsWithPartitions(
+        table,
+        files,
+        numRecords,
+        IntStream.range(0, numPartitions).mapToObj(ImmutableList::of).collect(Collectors.toList()));
+  }
+
+  private void writeRecordsWithPartitions(
+      Table table, int files, int numRecords, List<List<Integer>> partitions) {
+    int partitionTypeSize = table.spec().partitionType().fields().size();
+    assertThat(partitionTypeSize)
+        .as("This method currently supports only two columns as partition columns")
+        .isLessThanOrEqualTo(2);
+
+    BiFunction<Integer, List<Integer>, ThreeColumnRecord> recordFunction =
+        (i, partValues) -> {
+          switch (partitionTypeSize) {
+            case (0):
+              return new ThreeColumnRecord(i, String.valueOf(i), String.valueOf(i));
+            case (1):
+              return new ThreeColumnRecord(partValues.get(0), String.valueOf(i), String.valueOf(i));
+            case (2):
+              return new ThreeColumnRecord(
+                  partValues.get(0), String.valueOf(partValues.get(1)), String.valueOf(i));
+            default:
+              throw new ValidationException(
+                  "This method currently supports only two columns as partition columns");
+          }
+        };
+    List<ThreeColumnRecord> records =
+        partitions.stream()
+            .flatMap(
+                partition ->
+                    IntStream.range(0, numRecords)
+                        .mapToObj(i -> recordFunction.apply(i, partition)))
+            .collect(Collectors.toList());
+    spark
+        .createDataFrame(records, ThreeColumnRecord.class)
+        .repartition(files)
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(name(table));
+    table.refresh();
+  }
+
+  private List<Object[]> records(Table table) {
+    return rowsToJava(
+        spark.read().format("iceberg").load(name(table)).sort("c1", "c2", "c3").collectAsList());
+  }
+
+  private List<Object[]> deleteRecords(Table table) {
+    String[] additionalFields;
+    // do not select delete_file_path for
comparison
+    // as delete files have been rewritten
+    if (table.spec().isUnpartitioned()) {
+      additionalFields = new String[] {"pos", "row"};
+    } else {
+      additionalFields = new String[] {"pos", "row", "partition", "spec_id"};
+    }
+    return rowsToJava(
+        spark
+            .read()
+            .format("iceberg")
+            .load(name(table) + ".position_deletes")
+            .select("file_path", additionalFields)
+            .sort("file_path", "pos")
+            .collectAsList());
+  }
+
+  private String name(Table table) {
+    String[] splits = table.name().split("\\.");
+
+    assertThat(splits).hasSize(3);
+    return String.format("%s.%s", splits[1], splits[2]);
+  }
+}

From 204c7dbaac6dbe435c5be103a98c0ce880bdaaae Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Wed, 26 Feb 2025 10:33:28 +0100
Subject: [PATCH 2/2] changes

---
 .../apache/iceberg/puffin/PuffinReader.java   |  2 +-
 .../spark/actions/RewriteDVsSparkAction.java  | 19 +++++++++----------
 .../spark/actions/SparkBinPackDVRewriter.java |  6 +++---
 .../spark/actions/TestRewriteDVsAction.java   |  4 ----
 4 files changed, 13 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/puffin/PuffinReader.java b/core/src/main/java/org/apache/iceberg/puffin/PuffinReader.java
index 63f7af917540..9183674db204 100644
--- a/core/src/main/java/org/apache/iceberg/puffin/PuffinReader.java
+++ b/core/src/main/java/org/apache/iceberg/puffin/PuffinReader.java
@@ -160,7 +160,7 @@ private static void checkMagic(byte[] data, int offset) {

   public long dataSize() {
     try {
-      return fileSize - footerSize() - 4;
+      return fileSize - footerSize() - 4L;
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDVsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDVsSparkAction.java
index 871a5277bd9a..35ab19df2ad2 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDVsSparkAction.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDVsSparkAction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.actions;

+import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
@@ -116,19 +117,17 @@ public Result execute() {

     validateAndInitOptions();

-    // TODO: 1) if V3 table has V2 deletes -> rewrite those
-    // TODO: 2) if V3 table has V3 deletes -> check their liveness ratio
-    // TODO: 3) if V3 table has multiple (conflicting DVs) for the same data file -> merge those
-
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
-    CloseableIterable<PositionDeletesScanTask> tasks = planFiles(deletesTable);
     Map<String, List<List<PositionDeletesScanTask>>> fileGroupsByPuffinPath = Maps.newHashMap();
-
-    Map<String, List<PositionDeletesScanTask>> byPuffinPath =
-        StreamSupport.stream(tasks.spliterator(), false)
-            .collect(Collectors.groupingBy(task -> task.file().location()));
+    Map<String, List<PositionDeletesScanTask>> byPuffinPath = Maps.newHashMap();
+    try (CloseableIterable<PositionDeletesScanTask> tasks = planFiles(deletesTable)) {
+      byPuffinPath =
+          StreamSupport.stream(tasks.spliterator(), false)
+              .collect(Collectors.groupingBy(task -> task.file().location()));
+    } catch (IOException e) {
+      LOG.error("Cannot properly close file iterable while planning for rewrite", e);
+    }

     for (Map.Entry<String, List<PositionDeletesScanTask>> entry : byPuffinPath.entrySet()) {
       fileGroupsByPuffinPath.put(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDVRewriter.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDVRewriter.java
index d5b63c9b4fbc..0a0cf983c338 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDVRewriter.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDVRewriter.java
@@ -52,7 +52,7 @@ class SparkBinPackDVRewriter extends SparkBinPackPositionDeletesRewriter {

   public static final double DELETE_RATIO_THRESHOLD_DEFAULT = 0.3;

-  private double deleteRatio;
+  private double
deleteRatioThreshold;

   SparkBinPackDVRewriter(SparkSession spark, Table table) {
     super(spark, table);
@@ -71,7 +71,7 @@ public Set<String> validOptions() {
   @Override
   public void init(Map<String, String> options) {
     super.init(options);
-    this.deleteRatio =
+    this.deleteRatioThreshold =
         PropertyUtil.propertyAsDouble(
             options, DELETE_RATIO_THRESHOLD, DELETE_RATIO_THRESHOLD_DEFAULT);
   }
@@ -102,7 +102,7 @@ private boolean tooHighDeleteRatio(List<PositionDeletesScanTask> group) {
     }

     double liveRatio = liveDataSize / (double) totalDataSize;
-    return 1.0d - liveRatio >= deleteRatio;
+    return 1.0d - liveRatio >= deleteRatioThreshold;
   }

   @Override
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDVsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDVsAction.java
index d7cd8ce09e9d..c99157be04fa 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDVsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDVsAction.java
@@ -23,7 +23,6 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;

 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -57,7 +56,6 @@
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.io.TempDir;

 public class TestRewriteDVsAction extends CatalogTestBase {

@@ -79,8 +77,6 @@ public static Object[][] parameters() {
     };
   }

-  @TempDir private Path temp;
-
   @AfterEach
   public void cleanup() {
     validationCatalog.dropTable(TableIdentifier.of("default", TABLE_NAME));