diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index aa81e38f29d3..2f67508c1fb8 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DropTag
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows
 import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
+import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
@@ -47,6 +48,7 @@ import org.apache.spark.sql.catalyst.plans.logical.UpdateRows
 import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.execution.OrderAwareCoalesceExec
 import org.apache.spark.sql.execution.SparkPlan
 import scala.jdk.CollectionConverters._
 
@@ -111,6 +113,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case NoStatsUnaryNode(child) =>
       planLater(child) :: Nil
 
+    case OrderAwareCoalesce(numPartitions, coalescer, child) =>
+      OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil
+
     case _ => Nil
   }
 
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 3ed47d54d374..88fdf2920723 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -184,6 +184,41 @@ public void testRewriteDataFilesWithSortStrategy() {
     assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() {
+    createTable();
+    insertData(10 /* file count */);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files("
+                + " table => '%s', "
+                + " strategy => 'sort', "
+                + " sort_order => 'c1', "
+                + " options => map('shuffle-partitions-per-file', '2'))",
+            catalogName, tableIdent);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    // as there is only one small output file, validate the query ordering (it will not change)
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null));
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
+  }
+
   @Test
   public void testRewriteDataFilesWithZOrder() {
     createTable();
@@ -225,6 +260,42 @@ public void testRewriteDataFilesWithZOrder() {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
   }
 
+  @Test
+  public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
+    createTable();
+    insertData(10 /* file count */);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files("
+                + " table => '%s', "
+                + "strategy => 'sort', "
+                + " sort_order => 'zorder(c1, c2)', "
+                + " options => map('shuffle-partitions-per-file', '2'))",
+            catalogName, tableIdent);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    // due to z-ordering, the data will be written in the below order
+    // as there is only one small output file, validate the query ordering (it will not change)
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null));
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
+  }
+
   @Test
   public void testRewriteDataFilesWithFilter() {
     createTable();
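The two tests above drive the new option through the SQL procedure. The same option can also be passed through the actions API, as exercised by TestRewriteDataFilesAction further down. The sketch below is illustrative only: it assumes an active `spark` session and a loaded Iceberg `table` in scope, which are not part of this patch.

// Illustrative only: sort-based rewrite with each output file split across 2 shuffle partitions.
import org.apache.iceberg.spark.actions.SparkActions

val result = SparkActions
  .get(spark)
  .rewriteDataFiles(table)
  .sort() // or .zOrder("c1", "c2")
  .option("shuffle-partitions-per-file", "2")
  .execute()

As with the procedure, this only works when the Iceberg Spark session extensions are enabled; otherwise the precondition added in SparkShufflingDataRewriter below rejects the option.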
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
index 1ee469b09072..c9c962526eb3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
@@ -36,6 +36,8 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce;
+import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalescer;
 import org.apache.spark.sql.connector.distributions.Distribution;
 import org.apache.spark.sql.connector.distributions.Distributions;
 import org.apache.spark.sql.connector.distributions.OrderedDistribution;
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB or higher may strain the memory resources of the cluster as such
+   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+   * use a custom coalesce operation to stitch these sorted partitions back together into a single
+   * sorted file.
+   *
+   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+   */
+  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";
+
+  public static final int SHUFFLE_PARTITIONS_PER_FILE_DEFAULT = 1;
+
   private double compressionFactor;
+  private int numShufflePartitionsPerFile;
 
   protected SparkShufflingDataRewriter(SparkSession spark, Table table) {
     super(spark, table);
@@ -75,6 +94,7 @@ public Set<String> validOptions() {
     return ImmutableSet.<String>builder()
         .addAll(super.validOptions())
         .add(COMPRESSION_FACTOR)
+        .add(SHUFFLE_PARTITIONS_PER_FILE)
         .build();
   }
 
@@ -82,6 +102,7 @@ public Set<String> validOptions() {
   public void init(Map<String, String> options) {
     super.init(options);
     this.compressionFactor = compressionFactor(options);
+    this.numShufflePartitionsPerFile = numShufflePartitionsPerFile(options);
   }
 
   @Override
@@ -114,7 +135,16 @@ private Function<Dataset<Row>, Dataset<Row>> sortFunction(List<FileScanTask> gro
   private LogicalPlan sortPlan(LogicalPlan plan, SortOrder[] ordering, int numShufflePartitions) {
     SparkFunctionCatalog catalog = SparkFunctionCatalog.get();
     OrderedWrite write = new OrderedWrite(ordering, numShufflePartitions);
-    return DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
+    LogicalPlan sortPlan =
+        DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
+
+    if (numShufflePartitionsPerFile == 1) {
+      return sortPlan;
+    } else {
+      OrderAwareCoalescer coalescer = new OrderAwareCoalescer(numShufflePartitionsPerFile);
+      int numOutputPartitions = numShufflePartitions / numShufflePartitionsPerFile;
+      return new OrderAwareCoalesce(numOutputPartitions, coalescer, sortPlan);
+    }
   }
 
   private Dataset<Row> transformPlan(Dataset<Row> df, Function<LogicalPlan, LogicalPlan> func) {
@@ -134,7 +164,7 @@ private org.apache.iceberg.SortOrder outputSortOrder(List<FileScanTask> group) {
 
   private int numShufflePartitions(List<FileScanTask> group) {
     int numOutputFiles = (int) numOutputFiles((long) (inputSize(group) * compressionFactor));
-    return Math.max(1, numOutputFiles);
+    return Math.max(1, numOutputFiles * numShufflePartitionsPerFile);
   }
 
   private double compressionFactor(Map<String, String> options) {
@@ -145,6 +175,19 @@
     return value;
   }
 
+  private int numShufflePartitionsPerFile(Map<String, String> options) {
+    int value =
+        PropertyUtil.propertyAsInt(
+            options, SHUFFLE_PARTITIONS_PER_FILE, SHUFFLE_PARTITIONS_PER_FILE_DEFAULT);
+    Preconditions.checkArgument(
+        value > 0, "'%s' is set to %s but must be > 0", SHUFFLE_PARTITIONS_PER_FILE, value);
+    Preconditions.checkArgument(
+        value == 1 || Spark3Util.extensionsEnabled(spark()),
+        "Using '%s' requires enabling Iceberg Spark session extensions",
+        SHUFFLE_PARTITIONS_PER_FILE);
+    return value;
+  }
+
   private static class OrderedWrite implements RequiresDistributionAndOrdering {
     private final OrderedDistribution distribution;
     private final SortOrder[] ordering;
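To make the sizing logic above concrete, here is a rough back-of-the-envelope version of the arithmetic. The numbers are illustrative only; the real code also applies the compression factor and the min/max file size heuristics inherited from SparkSizeBasedDataRewriter.

// Suppose a file group holds ~8 GB of input, the target file size is 2 GB, and the
// cluster is only comfortable shuffling ~512 MB per task.
val inputSize = 8L << 30          // 8 GB of input data
val targetFileSize = 2L << 30     // 2 GB target output files
val shufflePartitionsPerFile = 4  // 2 GB / 512 MB

val numOutputFiles = (inputSize / targetFileSize).toInt                    // 4 output files
val numShufflePartitions = numOutputFiles * shufflePartitionsPerFile       // 16 shuffle partitions, ~512 MB each
val numOutputPartitions = numShufflePartitions / shufflePartitionsPerFile  // coalesced back to 4 partitions

Each of the 16 shuffle partitions is range-partitioned and sorted on the requested order, and the order-aware coalesce introduced below stitches every 4 adjacent partitions back into one output partition, so each written file remains sorted.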
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/OrderAwareCoalesce.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/OrderAwareCoalesce.scala
new file mode 100644
index 000000000000..5acaa6800e68
--- /dev/null
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/OrderAwareCoalesce.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.rdd.PartitionCoalescer
+import org.apache.spark.rdd.PartitionGroup
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+// this node doesn't extend RepartitionOperation on purpose to keep this logic isolated
+// and ignore it in optimizer rules such as CollapseRepartition
+case class OrderAwareCoalesce(
+    numPartitions: Int,
+    coalescer: PartitionCoalescer,
+    child: LogicalPlan) extends OrderPreservingUnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+    copy(child = newChild)
+  }
+}
+
+class OrderAwareCoalescer(val groupSize: Int) extends PartitionCoalescer with Serializable {
+
+  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
+    val partitionBins = parent.partitions.grouped(groupSize)
+    partitionBins.map { partitions =>
+      val group = new PartitionGroup()
+      group.partitions ++= partitions
+      group
+    }.toArray
+  }
+}
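The coalescer simply bins adjacent partitions, which is what preserves the range-partitioned sort order when the groups are concatenated. A quick spark-shell style illustration follows; it assumes an active `spark` session and arbitrary RDD contents, neither of which is part of this patch.

import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalescer

// 8 input partitions binned in groups of 2 -> 4 groups: (0,1), (2,3), (4,5), (6,7)
val rdd = spark.sparkContext.parallelize(1 to 80, numSlices = 8)
val groups = new OrderAwareCoalescer(groupSize = 2).coalesce(maxPartitions = 4, rdd)
groups.map(_.partitions.map(_.index).mkString(",")).foreach(println)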
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/OrderAwareCoalesceExec.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/OrderAwareCoalesceExec.scala
new file mode 100644
index 000000000000..2ef99550524a
--- /dev/null
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/OrderAwareCoalesceExec.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.PartitionCoalescer
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+
+case class OrderAwareCoalesceExec(
+    numPartitions: Int,
+    coalescer: PartitionCoalescer,
+    child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = {
+    if (numPartitions == 1) SinglePartition else UnknownPartitioning(numPartitions)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val result = child.execute()
+    if (numPartitions == 1 && result.getNumPartitions < 1) {
+      // make sure we don't output an RDD with 0 partitions,
+      // when claiming that we have a `SinglePartition`
+      // see CoalesceExec in Spark
+      new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
+    } else {
+      result.coalesce(numPartitions, shuffle = false, Some(coalescer))
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+    copy(child = newChild)
+  }
+}
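The logical OrderAwareCoalesce node is only planned into this physical operator by ExtendedDataSourceV2Strategy, which ships with the Iceberg extensions; that is why the rewriter rejects shuffle-partitions-per-file greater than 1 when the extensions are not installed. A minimal sketch of a session with the extensions enabled is shown below; everything except the extensions key is a placeholder.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("iceberg-rewrite") // placeholder app name
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()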
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index bf4bef74c3fe..0cb910364394 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -894,6 +894,15 @@ public void testInvalidOptions() {
             () -> basicRewrite(table).option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo").execute())
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid rewrite job order name: foo");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                basicRewrite(table)
+                    .sort(SortOrder.builderFor(table.schema()).asc("c2").build())
+                    .option(SparkShufflingDataRewriter.SHUFFLE_PARTITIONS_PER_FILE, "5")
+                    .execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("requires enabling Iceberg Spark session extensions");
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
index 6800ffd404ea..055e5be681c6 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
@@ -261,6 +261,7 @@ public void testSortDataValidOptions() {
     Assert.assertEquals(
         "Rewriter must report all supported options",
         ImmutableSet.of(
+            SparkSortDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
             SparkSortDataRewriter.TARGET_FILE_SIZE_BYTES,
             SparkSortDataRewriter.MIN_FILE_SIZE_BYTES,
             SparkSortDataRewriter.MAX_FILE_SIZE_BYTES,
@@ -281,6 +282,7 @@ public void testZOrderDataValidOptions() {
     Assert.assertEquals(
         "Rewriter must report all supported options",
         ImmutableSet.of(
+            SparkZOrderDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
             SparkZOrderDataRewriter.TARGET_FILE_SIZE_BYTES,
             SparkZOrderDataRewriter.MIN_FILE_SIZE_BYTES,
             SparkZOrderDataRewriter.MAX_FILE_SIZE_BYTES,