Spark 3.4: Multiple shuffle partitions per file in compaction #7897

Merged: 4 commits, merged Jun 27, 2023. This view shows changes from 2 commits.
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DropTag
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.MergeRows
import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
@@ -47,6 +48,7 @@ import org.apache.spark.sql.catalyst.plans.logical.UpdateRows
import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.execution.OrderAwareCoalesceExec
import org.apache.spark.sql.execution.SparkPlan
import scala.jdk.CollectionConverters._

@@ -111,6 +113,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case NoStatsUnaryNode(child) =>
planLater(child) :: Nil

case OrderAwareCoalesce(numPartitions, coalescer, child) =>
OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil

case _ => Nil
}

@@ -184,6 +184,30 @@ public void testRewriteDataFilesWithSortStrategy() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() {
createTable();
insertData(10 /* file count */);
List<Object[]> expectedRecords = currentData();

List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files("
+ " table => '%s', "
+ " strategy => 'sort', "
+ " sort_order => 'c1', "
+ " options => map('shuffle-partitions-per-file', '2'))",
catalogName, tableIdent);

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
row(10, 1),
Arrays.copyOf(output.get(0), 2));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithZOrder() {
createTable();
@@ -225,6 +249,43 @@ public void testRewriteDataFilesWithZOrder() {
assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}

@Test
szehon-ho (Collaborator) commented on Jun 26, 2023:
This is nice, but did we also add a test that asserts the sort order is preserved within a partition? (e.g., a small partition, just asserting that the file is in order)

Contributor Author replied:
There is a check below for the order of records. I just added a similar one for the regular sort, so we verify the order of records is correct both in regular sorts and in z-ordering.

public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
createTable();
insertData(10 /* file count */);

List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files("
Contributor commented:
Should we also assert that OrderAwareCoalesceExec is inserted by inspecting the plan?

Contributor Author replied:
It is a bit tricky in this case, as the resulting plan would be CallExec; we don't have an easy way to inspect the plan triggered from inside the procedure. I did check manually, though. (A hedged sketch of one possible check appears after this test method.)

+ " table => '%s', "
+ "strategy => 'sort', "
+ " sort_order => 'zorder(c1, c2)', "
+ " options => map('shuffle-partitions-per-file', '2'))",
catalogName, tableIdent);

assertEquals(
"Action should rewrite 10 data files and add 1 data files",
row(10, 1),
Arrays.copyOf(output.get(0), 2));

// Due to Z-ordering, the data written will be in the order below.
// As there is only one small output file, we can validate the query ordering (as it will not
// change).
ImmutableList<Object[]> expectedRows =
ImmutableList.of(
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(2, "bar", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null),
row(1, "foo", null));
assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}
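As a hedged illustration of the manual check mentioned in the thread above (not part of this PR), one way to observe the physical node is a QueryExecutionListener that scans executed plans. The CoalesceProbe name is hypothetical, and whether the listener fires for the procedure's internal query is an assumption to verify:

import org.apache.spark.sql.execution.{OrderAwareCoalesceExec, QueryExecution}
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical probe: remembers whether any successfully executed
// physical plan contained an OrderAwareCoalesceExec node.
class CoalesceProbe extends QueryExecutionListener {
  @volatile var seen = false

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    if (qe.executedPlan.collect { case e: OrderAwareCoalesceExec => e }.nonEmpty) {
      seen = true
    }
  }

  override def onFailure(funcName: String, qe: QueryExecution, error: Exception): Unit = ()
}

// usage sketch:
//   val probe = new CoalesceProbe
//   spark.listenerManager.register(probe)
//   ... run rewrite_data_files with shuffle-partitions-per-file > 1 ...
//   assert(probe.seen)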

@Test
public void testRewriteDataFilesWithFilter() {
createTable();
@@ -36,6 +36,8 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce;
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalescer;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.distributions.OrderedDistribution;
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {

public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;

/**
Contributor Author commented:
I tested the current implementation on a table with 1 TB of data and a cluster of 16 GB executors with 7 cores each. The target file size is 1 GB (zstd Parquet data). Sort-based optimization without this option spilled and failed; I lost all executors one by one. With 8 shuffle partitions per file, the operation succeeded without any failures and produced properly sized files.
* The number of shuffle partitions to use for each output file. By default, this file rewriter
* assumes each shuffle partition would become a separate output file. Attempting to generate
* large output files of 512 MB and more may strain the memory resources of the cluster as such
szehon-ho (Collaborator) commented:
"and more" => "or higher"

Contributor Author replied:
Fixed.

* rewrites would require lots of Spark memory. This parameter can be used to further divide up
* the data which will end up in a single file. For example, if the target file size is 2 GB, but
* the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
* use a custom coalesce operation to stitch these sorted partitions back together into a single
* sorted file.
*
* <p>Note that using this parameter requires enabling Iceberg Spark session extensions.
*/
public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";
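A usage sketch of this option through the action API, following the javadoc's 2 GB / 512 MB scenario. This is a minimal sketch, assuming a SparkSession `spark` with Iceberg extensions enabled and an Iceberg `table` handle; it is not code from this PR:

import org.apache.iceberg.spark.actions.SparkActions

// Request ~2 GB output files, but shuffle in ~512 MB chunks
// by splitting each output file across 4 shuffle partitions.
SparkActions
  .get(spark)
  .rewriteDataFiles(table)
  .sort()
  .option("target-file-size-bytes", (2L * 1024 * 1024 * 1024).toString)
  .option("shuffle-partitions-per-file", "4")
  .execute()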
Collaborator commented:
Not to block this change, but did we consider having a shuffle-threshold? I.e., if some partitions have 2 GB but others are well below 512 MB, there is no need to shuffle the smaller ones.

Contributor Author replied:
You mean like switching to a local sort if the size of the data to compact is small?

szehon-ho (Collaborator) replied on Jun 26, 2023:
I was just wondering about the use case where we set shuffle-partitions-per-file to 4 because we want 2 GB files but can only shuffle 512 MB. However, consider an Iceberg partition (rewrite group) that has only 512 MB of files during this rewrite. Will we still shuffle to four partitions in this case and coalesce at the end, unnecessarily? I may be missing something.

Contributor Author replied:
It should still be fine to apply this optimization, as there is no extra cost. I achieved the best results with 128 MB shuffle blocks, so it should be fairly safe to assume the operation would complete fine.

Collaborator replied:
I see, but would there be issues in contending for pods? Also, wouldn't it make more sense to have 128 MB as a conf (shuffle-threshold)? Otherwise it's always a bit dynamic, depending on the max partition size. Not sure if there are other issues with this approach.

Contributor Author replied:
To be honest, I have never seen issues with this approach in any of our prod jobs in the last few years. Not applying this split when the size of the job is less than 128 MB could be a valid step, but it would require quite a few changes to pass more info around. I'd probably skip it for now until we experience any issues.

Collaborator replied:
Sure, we can do it later then if there's a need.


public static final int SHUFFLE_PARTITIONS_PER_FILE_DEFAULT = 1;

private double compressionFactor;
private int numShufflePartitionsPerFile;

protected SparkShufflingDataRewriter(SparkSession spark, Table table) {
super(spark, table);
@@ -75,13 +94,15 @@ public Set<String> validOptions() {
return ImmutableSet.<String>builder()
.addAll(super.validOptions())
.add(COMPRESSION_FACTOR)
.add(SHUFFLE_PARTITIONS_PER_FILE)
.build();
}

@Override
public void init(Map<String, String> options) {
super.init(options);
this.compressionFactor = compressionFactor(options);
this.numShufflePartitionsPerFile = numShufflePartitionsPerFile(options);
}

@Override
@@ -114,7 +135,16 @@ private Function<Dataset<Row>, Dataset<Row>> sortFunction(List<FileScanTask> gro
private LogicalPlan sortPlan(LogicalPlan plan, SortOrder[] ordering, int numShufflePartitions) {
SparkFunctionCatalog catalog = SparkFunctionCatalog.get();
OrderedWrite write = new OrderedWrite(ordering, numShufflePartitions);
return DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
LogicalPlan sortPlan =
DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));

if (numShufflePartitionsPerFile == 1) {
return sortPlan;
} else {
OrderAwareCoalescer coalescer = new OrderAwareCoalescer(numShufflePartitionsPerFile);
int numOutputPartitions = numShufflePartitions / numShufflePartitionsPerFile;
return new OrderAwareCoalesce(numOutputPartitions, coalescer, sortPlan);
}
}

private Dataset<Row> transformPlan(Dataset<Row> df, Function<LogicalPlan, LogicalPlan> func) {
@@ -134,7 +164,7 @@ private org.apache.iceberg.SortOrder outputSortOrder(List<FileScanTask> group) {

private int numShufflePartitions(List<FileScanTask> group) {
int numOutputFiles = (int) numOutputFiles((long) (inputSize(group) * compressionFactor));
return Math.max(1, numOutputFiles);
return Math.max(1, numOutputFiles * numShufflePartitionsPerFile);
}

private double compressionFactor(Map<String, String> options) {
@@ -145,6 +175,19 @@ private double compressionFactor(Map<String, String> options) {
return value;
}

private int numShufflePartitionsPerFile(Map<String, String> options) {
int value =
PropertyUtil.propertyAsInt(
options, SHUFFLE_PARTITIONS_PER_FILE, SHUFFLE_PARTITIONS_PER_FILE_DEFAULT);
Preconditions.checkArgument(
value > 0, "'%s' is set to %s but must be > 0", SHUFFLE_PARTITIONS_PER_FILE, value);
Preconditions.checkArgument(
value == 1 || Spark3Util.extensionsEnabled(spark()),
"Using '%s' requires enabling Iceberg Spark session extensions",
SHUFFLE_PARTITIONS_PER_FILE);
return value;
}

private static class OrderedWrite implements RequiresDistributionAndOrdering {
private final OrderedDistribution distribution;
private final SortOrder[] ordering;
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.rdd.PartitionCoalescer
import org.apache.spark.rdd.PartitionGroup
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute

// this node doesn't extend RepartitionOperation on purpose, to keep this logic isolated
// and to have it ignored by optimizer rules such as CollapseRepartition
case class OrderAwareCoalesce(
numPartitions: Int,
coalescer: PartitionCoalescer,
child: LogicalPlan) extends OrderPreservingUnaryNode {

override def output: Seq[Attribute] = child.output

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
copy(child = newChild)
}
}

class OrderAwareCoalescer(val groupSize: Int) extends PartitionCoalescer with Serializable {

override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
val partitionBins = parent.partitions.grouped(groupSize)
partitionBins.map { partitions =>
val group = new PartitionGroup()
group.partitions ++= partitions
group
}.toArray
}
}
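A quick sketch of the binning behavior above, assuming a local SparkSession named `spark`. Note that maxPartitions is intentionally ignored by this coalescer; the number of groups follows from groupSize:

// 8 partitions binned pairwise into 4 ordered groups
val rdd = spark.sparkContext.parallelize(1 to 80, numSlices = 8)
val coalescer = new OrderAwareCoalescer(groupSize = 2)
val groups = coalescer.coalesce(maxPartitions = 4, rdd)

assert(groups.length == 4)
// adjacent partitions stay adjacent, preserving the sort order across bins
assert(groups.head.partitions.map(_.index) == Seq(0, 1))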
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.rdd.PartitionCoalescer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning

case class OrderAwareCoalesceExec(
Contributor Author commented:
Inspired by CoalesceExec in Spark.

numPartitions: Int,
coalescer: PartitionCoalescer,
child: SparkPlan) extends UnaryExecNode {

override def output: Seq[Attribute] = child.output

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def outputPartitioning: Partitioning = {
if (numPartitions == 1) SinglePartition else UnknownPartitioning(numPartitions)
}

protected override def doExecute(): RDD[InternalRow] = {
val result = child.execute()
if (numPartitions == 1 && result.getNumPartitions < 1) {
// make sure we don't output an RDD with 0 partitions,
// when claiming that we have a `SinglePartition`
// see CoalesceExec in Spark
new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
} else {
result.coalesce(numPartitions, shuffle = false, Some(coalescer))
}
}

override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
copy(child = newChild)
}
}
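The property the exec relies on, sketched with plain RDDs under the same local-session assumption: coalesce with shuffle = false concatenates the grouped partitions in order, so per-partition sort order composes into a global order:

// a globally sorted RDD stays globally sorted after the order-aware coalesce;
// the coalescer derives 4 groups from groupSize = 2 over 8 partitions
val sorted = spark.sparkContext.parallelize((1 to 100).reverse, numSlices = 8).sortBy(identity)
val coalesced = sorted.coalesce(4, shuffle = false, Some(new OrderAwareCoalescer(2)))
assert(coalesced.collect().toSeq == (1 to 100))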
@@ -894,6 +894,15 @@ public void testInvalidOptions() {
() -> basicRewrite(table).option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo").execute())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid rewrite job order name: foo");

Assertions.assertThatThrownBy(
() ->
basicRewrite(table)
.sort(SortOrder.builderFor(table.schema()).asc("c2").build())
.option(SparkShufflingDataRewriter.SHUFFLE_PARTITIONS_PER_FILE, "5")
.execute())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("requires enabling Iceberg Spark session extensions");
}

@Test
@@ -261,6 +261,7 @@ public void testSortDataValidOptions() {
Assert.assertEquals(
"Rewriter must report all supported options",
ImmutableSet.of(
SparkSortDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
SparkSortDataRewriter.TARGET_FILE_SIZE_BYTES,
SparkSortDataRewriter.MIN_FILE_SIZE_BYTES,
SparkSortDataRewriter.MAX_FILE_SIZE_BYTES,
@@ -281,6 +282,7 @@ public void testZOrderDataValidOptions() {
Assert.assertEquals(
"Rewriter must report all supported options",
ImmutableSet.of(
SparkZOrderDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
SparkZOrderDataRewriter.TARGET_FILE_SIZE_BYTES,
SparkZOrderDataRewriter.MIN_FILE_SIZE_BYTES,
SparkZOrderDataRewriter.MAX_FILE_SIZE_BYTES,