Spark 3.4: Multiple shuffle partitions per file in compaction #7897
@@ -184,6 +184,30 @@ public void testRewriteDataFilesWithSortStrategy() {
    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
  }

  @Test
  public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() {
    createTable();
    insertData(10 /* file count */);
    List<Object[]> expectedRecords = currentData();

    List<Object[]> output =
        sql(
            "CALL %s.system.rewrite_data_files("
                + " table => '%s', "
                + " strategy => 'sort', "
                + " sort_order => 'c1', "
                + " options => map('shuffle-partitions-per-file', '2'))",
            catalogName, tableIdent);

    assertEquals(
        "Action should rewrite 10 data files and add 1 data files",
        row(10, 1),
        Arrays.copyOf(output.get(0), 2));

    List<Object[]> actualRecords = currentData();
    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
  }

  @Test
  public void testRewriteDataFilesWithZOrder() {
    createTable();
@@ -225,6 +249,43 @@ public void testRewriteDataFilesWithZOrder() {
    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
  }

  @Test
  public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
    createTable();
    insertData(10 /* file count */);

    List<Object[]> output =
        sql(
            "CALL %s.system.rewrite_data_files("
                + " table => '%s', "
                + " strategy => 'sort', "
                + " sort_order => 'zorder(c1, c2)', "
                + " options => map('shuffle-partitions-per-file', '2'))",
            catalogName, tableIdent);

Review comment: should we also assert that OrderAwareCoalesceExec is inserted by inspecting the plan?
Reply: It is a bit tricky in this case as the result plan would be

    assertEquals(
        "Action should rewrite 10 data files and add 1 data files",
        row(10, 1),
        Arrays.copyOf(output.get(0), 2));

    // Due to z-ordering, the data written will be in the order below.
    // As there is only one small output file, we can validate the query ordering
    // (as it will not change).
    ImmutableList<Object[]> expectedRows =
        ImmutableList.of(
            row(2, "bar", null),
            row(2, "bar", null),
            row(2, "bar", null),
            row(2, "bar", null),
            row(2, "bar", null),
            row(1, "foo", null),
            row(1, "foo", null),
            row(1, "foo", null),
            row(1, "foo", null),
            row(1, "foo", null));
    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
  }
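Regarding the review exchange above about asserting that OrderAwareCoalesceExec appears in the plan, the following is only a hypothetical sketch of that kind of check, not code from this PR: the session, data, and node name are assumptions, and, as the reply notes, the plan produced inside the rewrite action is not directly exposed to the test, which is what makes such an assertion tricky.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("plan-check").getOrCreate()
val df = spark.range(0, 1000).toDF("id").sort("id")
// Inspect the executed plan string for a node by name; a plain sort like
// this one will not contain OrderAwareCoalesceExec, which is only inserted
// by the rewriter when shuffle-partitions-per-file > 1.
val plan = df.queryExecution.executedPlan.toString
println(plan.contains("OrderAwareCoalesceExec"))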

  @Test
  public void testRewriteDataFilesWithFilter() {
    createTable();
@@ -36,6 +36,8 @@ | |
import org.apache.spark.sql.Row; | ||
import org.apache.spark.sql.SparkSession; | ||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; | ||
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce; | ||
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalescer; | ||
import org.apache.spark.sql.connector.distributions.Distribution; | ||
import org.apache.spark.sql.connector.distributions.Distributions; | ||
import org.apache.spark.sql.connector.distributions.OrderedDistribution; | ||
|
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {

  public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;

  /**
   * The number of shuffle partitions to use for each output file. By default, this file rewriter
   * assumes each shuffle partition would become a separate output file. Attempting to generate
   * large output files of 512 MB or higher may strain the memory resources of the cluster as such
   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
   * use a custom coalesce operation to stitch these sorted partitions back together into a single
   * sorted file.
   *
   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
   */
  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

Review comment: I tested the current implementation on a table with 1 TB of data and a cluster of 16 GB executors with 7 cores each. The target file size is 1 GB (zstd Parquet data). Sort-based optimization without this option was spilling and failing; I lost all executors one by one. With 8 shuffle partitions per file, the operation succeeded without any failures and produced properly sized files.

Review comment: "and more" => "or higher"
Reply: Fixed.

Review comment: Not to block this change, but did we consider having a shuffle-threshold? I.e., if we have some partition with 2 GB but others that are way less than 512 MB, there is no need to shuffle the smaller ones.
Reply: You mean like switching to a local sort if the size of the data to compact is small?
Review comment: I was just wondering about the use case where we set shuffle-partitions-per-file to 4 because we want 2 GB files but can only shuffle 512 MB. However, consider an Iceberg partition (rewrite group) that has only 512 MB of files during this rewrite. Will we still shuffle to four partitions in this case and coalesce at the end, unnecessarily? I may be missing something.
Reply: It should still be fine to apply this optimization as there is no extra cost. I achieved the best results with 128 MB shuffle blocks, so it should be fairly safe to assume the operation would complete fine.
Review comment: I see, but would there be issues in contending for pods? Also, wouldn't it make more sense to have 128 MB as a conf (shuffle-threshold)? Otherwise it's always a bit dynamic, depending on the max partition size. Not sure if there are other issues with this approach.
Reply: To be honest, I have never seen issues with this approach in any of our prod jobs in the last few years. Not applying this split when the size of the job is less than 128 MB could be a valid step, but it would require quite a few changes to pass more info around. I'd probably skip it for now until we experience any issues.
Review comment: Sure, we can do it later then if there's a need.

  public static final int SHUFFLE_PARTITIONS_PER_FILE_DEFAULT = 1;

  private double compressionFactor;
  private int numShufflePartitionsPerFile;

  protected SparkShufflingDataRewriter(SparkSession spark, Table table) {
    super(spark, table);
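To make the option concrete, here is a minimal usage sketch in the style of the tests in this PR. The catalog, table, and sort column are assumed, as is an active `spark` session with the Iceberg extensions enabled (required by the Javadoc above); it pairs the new knob with the existing `target-file-size-bytes` rewrite option to request 2 GB output files while keeping each shuffle partition near 512 MB.

// Hypothetical invocation: 2 GB target files, 4 shuffle partitions per file,
// so each shuffle partition carries roughly 512 MB of one future output file.
spark.sql(
  """CALL my_catalog.system.rewrite_data_files(
    |  table => 'db.events',
    |  strategy => 'sort',
    |  sort_order => 'event_time',
    |  options => map(
    |    'target-file-size-bytes', '2147483648',
    |    'shuffle-partitions-per-file', '4'))""".stripMargin)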
@@ -75,13 +94,15 @@ public Set<String> validOptions() { | |
return ImmutableSet.<String>builder() | ||
.addAll(super.validOptions()) | ||
.add(COMPRESSION_FACTOR) | ||
.add(SHUFFLE_PARTITIONS_PER_FILE) | ||
.build(); | ||
} | ||
|
||
@Override | ||
public void init(Map<String, String> options) { | ||
super.init(options); | ||
this.compressionFactor = compressionFactor(options); | ||
this.numShufflePartitionsPerFile = numShufflePartitionsPerFile(options); | ||
} | ||
|
||
@Override | ||
|
@@ -114,7 +135,16 @@ private Function<Dataset<Row>, Dataset<Row>> sortFunction(List<FileScanTask> group) {
  private LogicalPlan sortPlan(LogicalPlan plan, SortOrder[] ordering, int numShufflePartitions) {
    SparkFunctionCatalog catalog = SparkFunctionCatalog.get();
    OrderedWrite write = new OrderedWrite(ordering, numShufflePartitions);
-   return DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
+   LogicalPlan sortPlan =
+       DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
+
+   if (numShufflePartitionsPerFile == 1) {
+     return sortPlan;
+   } else {
+     OrderAwareCoalescer coalescer = new OrderAwareCoalescer(numShufflePartitionsPerFile);
+     int numOutputPartitions = numShufflePartitions / numShufflePartitionsPerFile;
+     return new OrderAwareCoalesce(numOutputPartitions, coalescer, sortPlan);
+   }
  }

  private Dataset<Row> transformPlan(Dataset<Row> df, Function<LogicalPlan, LogicalPlan> func) {
@@ -134,7 +164,7 @@ private org.apache.iceberg.SortOrder outputSortOrder(List<FileScanTask> group) {

  private int numShufflePartitions(List<FileScanTask> group) {
    int numOutputFiles = (int) numOutputFiles((long) (inputSize(group) * compressionFactor));
-   return Math.max(1, numOutputFiles);
+   return Math.max(1, numOutputFiles * numShufflePartitionsPerFile);
  }

  private double compressionFactor(Map<String, String> options) {
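To see how this multiplier and the coalesce in sortPlan fit together, here is a small worked sketch. All sizes are assumed, mirroring the Javadoc example rather than taken from this diff, and the rounding is illustrative.

// Illustrative partition math: 100 GB of input (after the compression
// factor), 2 GB target files, shuffle-partitions-per-file = 4.
val inputSize = 100L * 1024 * 1024 * 1024       // bytes to rewrite
val targetFileSize = 2L * 1024 * 1024 * 1024    // desired output file size
val partitionsPerFile = 4                       // option value

val numOutputFiles = math.ceil(inputSize.toDouble / targetFileSize).toInt  // 50
val numShufflePartitions = numOutputFiles * partitionsPerFile              // 200, ~512 MB each
val numOutputPartitions = numShufflePartitions / partitionsPerFile         // 50 files after coalesce
println((numShufflePartitions, numOutputPartitions))                       // (200,50)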
@@ -145,6 +175,19 @@ private double compressionFactor(Map<String, String> options) {
    return value;
  }

  private int numShufflePartitionsPerFile(Map<String, String> options) {
    int value =
        PropertyUtil.propertyAsInt(
            options, SHUFFLE_PARTITIONS_PER_FILE, SHUFFLE_PARTITIONS_PER_FILE_DEFAULT);
    Preconditions.checkArgument(
        value > 0, "'%s' is set to %s but must be > 0", SHUFFLE_PARTITIONS_PER_FILE, value);
    Preconditions.checkArgument(
        value == 1 || Spark3Util.extensionsEnabled(spark()),
        "Using '%s' requires enabling Iceberg Spark session extensions",
        SHUFFLE_PARTITIONS_PER_FILE);
    return value;
  }

  private static class OrderedWrite implements RequiresDistributionAndOrdering {
    private final OrderedDistribution distribution;
    private final SortOrder[] ordering;
@@ -0,0 +1,51 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.rdd.PartitionCoalescer
import org.apache.spark.rdd.PartitionGroup
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute

// this node doesn't extend RepartitionOperation on purpose to keep this logic isolated
// and ignore it in optimizer rules such as CollapseRepartition
case class OrderAwareCoalesce(
    numPartitions: Int,
    coalescer: PartitionCoalescer,
    child: LogicalPlan) extends OrderPreservingUnaryNode {

  override def output: Seq[Attribute] = child.output

  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
    copy(child = newChild)
  }
}

class OrderAwareCoalescer(val groupSize: Int) extends PartitionCoalescer with Serializable {

  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val partitionBins = parent.partitions.grouped(groupSize)
    partitionBins.map { partitions =>
      val group = new PartitionGroup()
      group.partitions ++= partitions
      group
    }.toArray
  }
}
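The coalescer above simply bins adjacent partition indexes, which is what preserves the sort order: each group of consecutive sorted shuffle partitions is concatenated into one output file. A standalone sketch of the same grouping (plain Scala, not PR code), assuming groupSize = 2 and 8 sorted shuffle partitions:

// Adjacent sorted partitions land in the same group, so concatenating each
// group yields one output file whose rows remain globally sorted.
val partitionIndexes = (0 until 8).toList
val bins = partitionIndexes.grouped(2).toList
println(bins) // List(List(0, 1), List(2, 3), List(4, 5), List(6, 7))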
@@ -0,0 +1,59 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.rdd.PartitionCoalescer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning

case class OrderAwareCoalesceExec(
    numPartitions: Int,
    coalescer: PartitionCoalescer,
    child: SparkPlan) extends UnaryExecNode {

  override def output: Seq[Attribute] = child.output

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

  override def outputPartitioning: Partitioning = {
    if (numPartitions == 1) SinglePartition else UnknownPartitioning(numPartitions)
  }

  protected override def doExecute(): RDD[InternalRow] = {
    val result = child.execute()
    if (numPartitions == 1 && result.getNumPartitions < 1) {
      // make sure we don't output an RDD with 0 partitions,
      // when claiming that we have a `SinglePartition`
      // see CoalesceExec in Spark
      new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
    } else {
      result.coalesce(numPartitions, shuffle = false, Some(coalescer))
    }
  }

  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
    copy(child = newChild)
  }
}
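For context on the doExecute branch above, here is a self-contained sketch of the underlying RDD mechanism; the session setup and data are assumptions, while RDD.coalesce with a custom PartitionCoalescer is the standard Spark API used here. With shuffle = false, rows stay in partition order within each group instead of being redistributed across the cluster.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalescer

val spark = SparkSession.builder().master("local[2]").appName("coalesce-sketch").getOrCreate()
// 8 input partitions coalesced into 4 ordered groups of 2, reusing the
// OrderAwareCoalescer defined earlier in this PR
val rdd = spark.sparkContext.parallelize(1 to 100, numSlices = 8)
val coalesced = rdd.coalesce(4, shuffle = false, Some(new OrderAwareCoalescer(2)))
println(coalesced.getNumPartitions) // 4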
Review comment: This is nice, but did we also add a test that asserts the sort order is preserved within a partition? (e.g., a small partition, and just assert that the file is in order)
Reply: There is a check below for the order of records. I just added a similar one for the regular sort, so we verify the order of records is correct both in regular sorts and in z-ordering.