Spark 3.5: Increase default advisory partition size for writes #8660

Merged
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -304,6 +304,9 @@ private TableProperties() {}
public static final String SPARK_WRITE_ACCEPT_ANY_SCHEMA = "write.spark.accept-any-schema";
public static final boolean SPARK_WRITE_ACCEPT_ANY_SCHEMA_DEFAULT = false;

public static final String SPARK_WRITE_ADVISORY_PARTITION_SIZE_BYTES =
"write.spark.advisory-partition-size-bytes";

public static final String SNAPSHOT_ID_INHERITANCE_ENABLED =
"compatibility.snapshot-id-inheritance.enabled";
public static final boolean SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = false;
@@ -177,12 +177,19 @@ public void testCoalesceDelete() throws Exception {

// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
// set the advisory partition size for shuffles small enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
Contributor Author:

This isn't me; it is Spotless. I only changed the last line.

SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"100",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
String.valueOf(256 * 1024 * 1024)),
() -> {
SparkPlan plan =
executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
@@ -238,12 +245,19 @@ public void testSkewDelete() throws Exception {

// enable AQE and set the advisory partition size small enough to trigger a split
// set the number of shuffle partitions to 2 to only have 2 reducers
// set the advisory partition size for shuffles big enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "2",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"2",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"256MB",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
"100"),
() -> {
SparkPlan plan =
executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
@@ -282,13 +282,21 @@ public void testCoalesceMerge() {
// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
// disable broadcast joins to make sure the join triggers a shuffle
// set the advisory partition size for shuffles small enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), "-1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"200",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(),
"-1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"100",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
String.valueOf(256 * 1024 * 1024)),
() -> {
sql(
"MERGE INTO %s t USING source "
@@ -352,13 +360,21 @@ public void testSkewMerge() {
// enable AQE and set the advisory partition size small enough to trigger a split
// set the number of shuffle partitions to 2 to only have 2 reducers
// set the min coalesce partition size small enough to avoid coalescing
// set the advisory partition size for shuffles big enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "4",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(), "100",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"4",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(),
"100",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"256MB",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
"100"),
() -> {
SparkPlan plan =
executeAndKeepPlan(
@@ -162,12 +162,19 @@ public void testCoalesceUpdate() {

// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
// set the advisory partition size for shuffles small enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"100",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
String.valueOf(256 * 1024 * 1024)),
() -> {
SparkPlan plan =
executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
@@ -226,12 +233,19 @@ public void testSkewUpdate() {

// enable AQE and set the advisory partition size small enough to trigger a split
// set the number of shuffle partitions to 2 to only have 2 reducers
// set the advisory partition size for shuffles big enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "2",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"2",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"256MB",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
"100"),
() -> {
SparkPlan plan =
executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Pair;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

class SparkCompressionUtil {

private static final String SHUFFLE_COMPRESSION_ENABLED = "spark.shuffle.compress";
Contributor Author:

These properties are internal in Spark. It requires some ugly code to get them:

org.apache.spark.internal.config.package$.MODULE$.SHUFFLE_COMPRESS().defaultValueString()

Contributor Author:

It is doable, but I am not sure it is worth the effort.

Contributor:

I'm fine with duplication. Let's just make sure that we have a comment for the block of settings that states that they come from Spark.

Contributor Author:

I'll add a comment. There is also a test that verifies the Spark default values are as we expect.
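For reference, the reflection route from Java looks roughly like this. This is only a sketch based on the comment above: the accessor names follow Scala's object encoding rather than anything in this PR, and exception handling is omitted.

// Sketch only: reading Spark's internal default via reflection. The Scala package
// object cannot be named directly in Java source because "package" is a keyword.
Class<?> configPackage = Class.forName("org.apache.spark.internal.config.package$");
Object module = configPackage.getField("MODULE$").get(null);
Object shuffleCompress = configPackage.getMethod("SHUFFLE_COMPRESS").invoke(module);
String sparkDefault =
    (String) shuffleCompress.getClass().getMethod("defaultValueString").invoke(shuffleCompress);
// sparkDefault is expected to be "true", matching SHUFFLE_COMPRESSION_ENABLED_DEFAULT below

Duplicating the two constants, with a comment pointing back at Spark plus the guard test mentioned above, avoids all of this.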

private static final boolean SHUFFLE_COMPRESSION_ENABLED_DEFAULT = true;

private static final String SPARK_COMPRESSION_CODEC = "spark.io.compression.codec";
private static final String SPARK_COMPRESSION_CODEC_DEFAULT = "lz4";

private static final double DEFAULT_COLUMNAR_COMPRESSION = 2;
private static final Map<Pair<String, String>, Double> COLUMNAR_COMPRESSIONS =
initColumnarCompressions();

private static final double DEFAULT_ROW_BASED_COMPRESSION = 1;
private static final Map<Pair<String, String>, Double> ROW_BASED_COMPRESSIONS =
initRowBasedCompressions();

private SparkCompressionUtil() {}

/**
* Estimates how much the data in shuffle map files will compress once it is written to disk using
* a particular file format and codec.
*/
public static double shuffleCompressionRatio(
SparkSession spark, FileFormat outputFileFormat, String outputCodec) {
if (outputFileFormat == FileFormat.ORC || outputFileFormat == FileFormat.PARQUET) {
return columnarCompression(shuffleCodec(spark), outputCodec);
} else if (outputFileFormat == FileFormat.AVRO) {
return rowBasedCompression(shuffleCodec(spark), outputCodec);
} else {
return 1.0;
}
}

private static String shuffleCodec(SparkSession spark) {
SparkConf sparkConf = spark.sparkContext().conf();
return shuffleCompressionEnabled(sparkConf) ? sparkCodec(sparkConf) : "none";
}

private static boolean shuffleCompressionEnabled(SparkConf sparkConf) {
return sparkConf.getBoolean(SHUFFLE_COMPRESSION_ENABLED, SHUFFLE_COMPRESSION_ENABLED_DEFAULT);
}

private static String sparkCodec(SparkConf sparkConf) {
return sparkConf.get(SPARK_COMPRESSION_CODEC, SPARK_COMPRESSION_CODEC_DEFAULT);
}

private static double columnarCompression(String shuffleCodec, String outputCodec) {
Pair<String, String> key = Pair.of(normalize(shuffleCodec), normalize(outputCodec));
return COLUMNAR_COMPRESSIONS.getOrDefault(key, DEFAULT_COLUMNAR_COMPRESSION);
}

private static double rowBasedCompression(String shuffleCodec, String outputCodec) {
Pair<String, String> key = Pair.of(normalize(shuffleCodec), normalize(outputCodec));
return ROW_BASED_COMPRESSIONS.getOrDefault(key, DEFAULT_ROW_BASED_COMPRESSION);
}

private static String normalize(String value) {
return value != null ? value.toLowerCase(Locale.ROOT) : null;
}

private static Map<Pair<String, String>, Double> initColumnarCompressions() {
Contributor Author:

We should not expect these values to be precise but they should be reasonable. I tested some of them on the cluster, some of them locally. It boils down to what kind of encoding we can apply to the incoming data. We can't predict that, unfortunately. We should be able to adaptively learn that in future PRs.
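As a rough illustration of how such a ratio is meant to be used (the numbers and surrounding code are illustrative, not the exact wiring in this PR): with an lz4-compressed shuffle and zstd Parquet output, the estimated ratio below is 3.0, so the advisory shuffle partition size should be about three times the target file size.

// Illustrative sizing sketch only
double ratio =
    SparkCompressionUtil.shuffleCompressionRatio(spark, FileFormat.PARQUET, "zstd"); // 3.0 for an lz4 shuffle
long targetFileSize = 128L * 1024 * 1024; // desired on-disk Parquet file size
long advisoryShuffleSize = (long) (targetFileSize * ratio); // ~384 MB of shuffle data per task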

Map<Pair<String, String>, Double> compressions = Maps.newHashMap();

compressions.put(Pair.of("none", "zstd"), 4.0);
Contributor Author:

These values (none, zstd, lz4, etc) should probably become constants.

Contributor Author:

Another way to implement this is to define some mappings for codecs + some ratio for the format.
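One way to read that alternative (purely hypothetical, not part of this PR): keep a per-codec strength factor and a per-format output factor, and derive each pair's ratio from those instead of enumerating every combination. A sketch that roughly reproduces the columnar table in this file, reusing the relocated Guava ImmutableMap:

// Hypothetical alternative: derive pair ratios from two small maps.
private static final Map<String, Double> SHUFFLE_CODEC_FACTORS =
    ImmutableMap.of("none", 1.0, "lz4", 1.5, "snappy", 1.5, "zstd", 2.0);
private static final Map<String, Double> COLUMNAR_OUTPUT_FACTORS =
    ImmutableMap.of("lz4", 3.0, "snappy", 3.0, "gzip", 4.0, "zlib", 4.0, "zstd", 4.0);

private static double columnarCompression(String shuffleCodec, String outputCodec) {
  // e.g. none -> zstd: 4.0 / 1.0 = 4.0; zstd -> snappy: 3.0 / 2.0 = 1.5
  return COLUMNAR_OUTPUT_FACTORS.getOrDefault(normalize(outputCodec), 3.0)
      / SHUFFLE_CODEC_FACTORS.getOrDefault(normalize(shuffleCodec), 1.5);
}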

compressions.put(Pair.of("none", "gzip"), 4.0);
compressions.put(Pair.of("none", "zlib"), 4.0);
compressions.put(Pair.of("none", "snappy"), 3.0);
compressions.put(Pair.of("none", "lz4"), 3.0);

compressions.put(Pair.of("zstd", "zstd"), 2.0);
compressions.put(Pair.of("zstd", "gzip"), 2.0);
compressions.put(Pair.of("zstd", "zlib"), 2.0);
compressions.put(Pair.of("zstd", "snappy"), 1.5);
compressions.put(Pair.of("zstd", "lz4"), 1.5);

compressions.put(Pair.of("snappy", "zstd"), 3.0);
compressions.put(Pair.of("snappy", "gzip"), 3.0);
compressions.put(Pair.of("snappy", "zlib"), 3.0);
compressions.put(Pair.of("snappy", "snappy"), 2.0);
compressions.put(Pair.of("snappy", "lz4"), 2.);

compressions.put(Pair.of("lz4", "zstd"), 3.0);
compressions.put(Pair.of("lz4", "gzip"), 3.0);
compressions.put(Pair.of("lz4", "zlib"), 3.0);
compressions.put(Pair.of("lz4", "snappy"), 2.0);
compressions.put(Pair.of("lz4", "lz4"), 2.0);

return compressions;
}

private static Map<Pair<String, String>, Double> initRowBasedCompressions() {
rdblue marked this conversation as resolved.
Map<Pair<String, String>, Double> compressions = Maps.newHashMap();

compressions.put(Pair.of("none", "zstd"), 2.0);
compressions.put(Pair.of("none", "gzip"), 2.0);

compressions.put(Pair.of("lz4", "zstd"), 1.5);
compressions.put(Pair.of("lz4", "gzip"), 1.5);
Contributor:

Why no zstd or snappy shuffle compression options? Just assume that this will use the default?

Contributor Author:

I should probably add these too; I missed them when I added values for ORC and Parquet.


return compressions;
}
}
@@ -106,7 +106,7 @@ public int parse() {
}

public Integer parseOptional() {
return parse(Integer::parseInt, null);
return parse(Integer::parseInt, defaultValue);
Contributor Author:

If a default value was provided explicitly, it should be used.

Contributor:

Why not throw an exception? Seems like these cases are mutually exclusive.

Contributor:

Oh, I see. You can have a null default for objects like String. That's fine for String, but when the defaultValue is set as a primitive long or int, I think we don't need to support parseOptional and should require parse.

Contributor Author:

Yeah, we can probably do this for String only. There are table properties with explicit NULL default values.

Contributor Author:

Well, parseOptional does produce Integer, so if we have a default value that is null, it may still apply.

public static final String PARQUET_COMPRESSION_LEVEL_DEFAULT = null;

We can hit such cases for ints as well, I assume.

}
}

@@ -129,7 +129,7 @@ public long parse() {
}

public Long parseOptional() {
return parse(Long::parseLong, null);
return parse(Long::parseLong, defaultValue);
}
}

@@ -152,7 +152,7 @@ public String parse() {
}

public String parseOptional() {
return parse(Function.identity(), null);
return parse(Function.identity(), defaultValue);
}
}
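To make the behavior change concrete, here is a usage sketch in the style of Iceberg's conf parser chains; only the property names come from this PR, and the default value and surrounding chain are illustrative.

Long advisorySize =
    confParser
        .longConf()
        .sessionConf(SparkSQLProperties.ADVISORY_PARTITION_SIZE)
        .tableProperty(TableProperties.SPARK_WRITE_ADVISORY_PARTITION_SIZE_BYTES)
        .defaultValue(64L * 1024 * 1024) // illustrative default
        .parseOptional(); // previously fell back to null; now returns the explicit default when nothing is set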

@@ -64,4 +64,7 @@ private SparkSQLProperties() {}

// Overrides the delete planning mode
public static final String DELETE_PLANNING_MODE = "spark.sql.iceberg.delete-planning-mode";

// Overrides the advisory partition size
public static final String ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size";
Contributor Author:

Question: do we need the write prefix, or should we make it part of the name? The config only affects the final write.

Contributor:

Shouldn't this be a write.spark.advisory-partition-size table property? I wouldn't want to set this in the Spark context.

Contributor:

Nevermind, I see this is a Spark default that is overridden by the table property.

Contributor Author:

Correct, there is a table property, a SQL config and a write option.

}
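For context, a sketch of how the two names added in this PR can be set; the write option mentioned above is omitted because its key is not shown in this diff. In Iceberg's usual resolution order, a write option overrides the SQL config, which overrides the table property.

// Table-level default
spark.sql("ALTER TABLE db.tbl SET TBLPROPERTIES ("
    + "'write.spark.advisory-partition-size-bytes'='134217728')");
// Session-level override
spark.conf().set("spark.sql.iceberg.advisory-partition-size", "134217728");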