
Spark 3.5: Increase default advisory partition size for writes #8660

Merged

Conversation

@aokolnychyi aokolnychyi commented Sep 26, 2023

This PR increases the default advisory partition size for writes and allows users to control it explicitly.

By default, we will try to write 128 MB data files and 32 MB position delete files. In the future, we may make this configurable but it should be a reasonable start, especially given that users can set this manually now.
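For illustration, here is a minimal sketch of overriding that default from a Spark session, assuming the SQL config key introduced later in this PR (spark.sql.iceberg.advisory-partition-size); the app name and the 256 MB value are only examples:

import org.apache.spark.sql.SparkSession;

public class AdvisorySizeOverrideExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("advisory-size-example").master("local[*]").getOrCreate();

    // Ask Iceberg writes to target ~256 MB shuffle partitions instead of the 128 MB default.
    spark.conf().set("spark.sql.iceberg.advisory-partition-size", String.valueOf(256L * 1024 * 1024));

    spark.stop();
  }
}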

withSQLConf(
    ImmutableMap.of(
        SQLConf.SHUFFLE_PARTITIONS().key(), "200",
Contributor Author:

This isn't me; it's Spotless. I only changed the last line.

    return value != null ? value.toLowerCase(Locale.ROOT) : null;
  }

  private static Map<Pair<String, String>, Double> initColumnarCompressions() {
Contributor Author:

We should not expect these values to be precise, but they should be reasonable. I tested some of them on the cluster and some locally. It boils down to what kind of encoding we can apply to the incoming data, which we can't predict, unfortunately. We should be able to learn that adaptively in future PRs.

@@ -106,7 +106,7 @@ public int parse() {
  }

  public Integer parseOptional() {
-    return parse(Integer::parseInt, null);
+    return parse(Integer::parseInt, defaultValue);
Contributor Author:

If a default value was provided explicitly, it should be used.

Contributor:

Why not throw an exception? Seems like these cases are mutually exclusive.

Contributor:

Oh, I see. You can have a null default for objects like String. I think that's fine for String, but when the defaultValue is set as a primitive long or int, I don't think we need to support parseOptional and should require parse instead.

Contributor Author:

Yeah, we can probably do this for String only. There are table properties with explicit NULL default values.

Contributor Author:

Well, parseOptional does produce Integer, so if we have a default value that is null, it may still apply.

public static final String PARQUET_COMPRESSION_LEVEL_DEFAULT = null;

We can hit such cases for ints as well, I assume.
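A generic sketch of the behaviour under discussion (illustrative code, not the actual Iceberg ConfParser API): because the result is boxed, an explicitly configured null default can flow through parseOptional instead of being replaced by a hard-coded null.

import java.util.function.Function;

class OptionalParseSketch {

  // Mirrors the idea above: honour an explicitly provided default instead of always using null.
  static <T> T parseOptional(String value, Function<String, T> conversion, T defaultValue) {
    return value != null ? conversion.apply(value) : defaultValue;
  }

  public static void main(String[] args) {
    Integer explicitDefault = parseOptional(null, Integer::parseInt, 5);  // -> 5
    Integer nullDefault = parseOptional(null, Integer::parseInt, null);   // -> null, still valid
    System.out.println(explicitDefault + " " + nullDefault);
  }
}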

@@ -64,4 +64,7 @@ private SparkSQLProperties() {}

  // Overrides the delete planning mode
  public static final String DELETE_PLANNING_MODE = "spark.sql.iceberg.delete-planning-mode";

  // Overrides the advisory partition size
  public static final String ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size";
Contributor Author:

Question: do we need the write prefix, or should we make it part of the name? The config only affects the final write.

Contributor:

Shouldn't this be a write.spark.advisory-partition-size table property? I wouldn't want to set this in the Spark context.

Contributor:

Nevermind, I see this is a Spark default that is overridden by the table property.

Contributor Author:

Correct, there is a table property, a SQL config and a write option.
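To make the per-write path concrete, a hedged sketch using the DataFrameWriterV2 API; the option name "advisory-partition-size" and the table name are assumptions for illustration, since this thread does not spell out the exact write option or table property keys:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AdvisorySizeWriteOptionExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark =
        SparkSession.builder().appName("advisory-size-write-option").master("local[*]").getOrCreate();

    Dataset<Row> df = spark.range(1000).toDF("id");

    // Per-write override; the table-level and session-level settings would apply when this is absent.
    df.writeTo("db.table")
        .option("advisory-partition-size", String.valueOf(64L * 1024 * 1024))
        .append();

    spark.stop();
  }
}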

@@ -75,6 +76,10 @@ public class SparkWriteConf {

  private static final Logger LOG = LoggerFactory.getLogger(SparkWriteConf.class);

  private static final long DATA_FILE_SIZE = 128 * 1024 * 1024; // 128 MB
Contributor Author:

These are some default values we aim for. They should be safe without affecting the parallelism too much. We can make this configurable in the future.

@@ -484,42 +525,24 @@ private Map<String, String> deleteWriteProperties() {

    switch (deleteFormat) {
      case PARQUET:
        setWritePropertyWithFallback(
Contributor Author:

Instead of using setWritePropertyWithFallback, I added defaults to our delete configs. That made my life easier: I needed to know the delete output codec, and I no longer have to check nulls and overrides explicitly.

There are tests in TestCompressionSettings and TestSparkWriteConf to verify the new logic.

@rdblue (Contributor) commented Sep 27, 2023:

This seems fine to me, but was it related to the advisory partition size changes? It seems like it doesn't overlap. Maybe it's just because we're predicting the compression ratio based on the codec?

Contributor Author:

Correct, it was needed to compute the codec.

@@ -47,4 +51,8 @@ public SortOrder[] ordering() {
  public boolean hasOrdering() {
    return ordering.length != 0;
  }

  public long advisoryPartitionSize() {
    return distribution instanceof UnspecifiedDistribution ? 0 : advisoryPartitionSize;
Contributor Author:

Spark will complain if we request a value > 0 with UnspecifiedDistribution.

Contributor:

This would be a good thing to capture in a comment.

Contributor Author:

Will add.

@@ -132,7 +132,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde

  @Override
  public Distribution requiredDistribution() {
-    return writeRequirements.distribution();
+    Distribution distribution = writeRequirements.distribution();
Contributor Author:

Both the Distribution and SortOrder implementations in Spark provide reasonable toString() output.

23/09/26 17:00:55 INFO SparkWrite: Requesting 402653184 bytes advisory partition size for table testhive.default.table
23/09/26 17:00:55 INFO SparkWrite: Requesting ClusteredDistribution(bucket(8, c3)) as write distribution for table testhive.default.table
23/09/26 17:00:55 INFO SparkWrite: Requesting [bucket(8, c3) ASC NULLS FIRST, id ASC NULLS FIRST] as write ordering for table testhive.default.table


class SparkCompressionUtil {

  private static final String SHUFFLE_COMPRESSION_ENABLED = "spark.shuffle.compress";
Contributor Author:

These properties are internal to Spark. It requires some ugly code to get them:

org.apache.spark.internal.config.package$.MODULE$.SHUFFLE_COMPRESS().defaultValueString()

Contributor Author:

It is doable, but I am not sure it is worth the effort.

Contributor:

I'm fine with duplication. Let's just make sure that we have a comment for the block of settings that states that they come from Spark.

Contributor Author:

I'll add a comment. There is also a test that verifies the Spark default values are as we expect.
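Something along these lines, as a minimal sketch of the duplicated block with the requested comment (constant names are illustrative; the keys and defaults mirror Spark's spark.shuffle.compress and spark.io.compression.codec settings):

class SparkShuffleDefaults {
  // NOTE: these keys and defaults are duplicated from Spark's internal config definitions
  // (org.apache.spark.internal.config), which are not public API. A test should verify
  // that Spark's actual defaults still match these values.
  static final String SHUFFLE_COMPRESSION_ENABLED = "spark.shuffle.compress";
  static final boolean SHUFFLE_COMPRESSION_ENABLED_DEFAULT = true;

  static final String SHUFFLE_COMPRESSION_CODEC = "spark.io.compression.codec";
  static final String SHUFFLE_COMPRESSION_CODEC_DEFAULT = "lz4";
}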

  private static Map<Pair<String, String>, Double> initColumnarCompressions() {
    Map<Pair<String, String>, Double> compressions = Maps.newHashMap();

    compressions.put(Pair.of("none", "zstd"), 4.0);
Contributor Author:

These values (none, zstd, lz4, etc.) should probably become constants.

Contributor Author:

Another way to implement this is to define some mappings for codecs + some ratio for the format.
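For example, a rough sketch of that alternative (hypothetical names; the ratios are the ones quoted in this thread and the 1.0 fallback is an assumption): look up a ratio for the (shuffle codec, file codec) pair and scale the desired on-disk file size up to an expected shuffle size.

import java.util.HashMap;
import java.util.Map;

class CompressionRatioSketch {

  // Estimated shrinkage of shuffle data once written as a compressed columnar file,
  // keyed by "<shuffleCodec>:<fileCodec>".
  private static final Map<String, Double> COLUMNAR_RATIOS = new HashMap<>();

  static {
    COLUMNAR_RATIOS.put("none:zstd", 4.0);
    COLUMNAR_RATIOS.put("none:gzip", 2.0);
    COLUMNAR_RATIOS.put("lz4:zstd", 1.5);
    COLUMNAR_RATIOS.put("lz4:gzip", 1.5);
  }

  // Scale the desired on-disk file size by the estimated ratio so that a shuffle partition
  // of this advisory size compresses down to roughly one file of the requested size.
  static long advisoryPartitionSize(long expectedFileSize, String shuffleCodec, String fileCodec) {
    double ratio = COLUMNAR_RATIOS.getOrDefault(shuffleCodec + ":" + fileCodec, 1.0);
    return (long) (expectedFileSize * ratio);
  }

  public static void main(String[] args) {
    // Aiming for 128 MB zstd files with uncompressed shuffle data -> ~512 MB advisory size.
    System.out.println(advisoryPartitionSize(128L * 1024 * 1024, "none", "zstd"));
  }
}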

  }

  private long advisoryPartitionSize(long defaultValue) {
    return confParser
Contributor:

I think this is okay, although since the default differs between data and deletes, it seems strange not to allow setting this specifically for deletes. The write option makes sense, but the table and session properties would likely differ.

Contributor Author:

Yeah, I was thinking about this too. The problem there is that copy-on-write DELETE still uses the data config. If we were to offer such a property, would it mean it is supported for copy-on-write as well? Any thoughts on the potential name?

Contributor:

I think we typically use a variation that adds merge, update, or delete somewhere. I don't feel strongly so let's go without this for now. We can add more settings later.

  }

  private long advisoryPartitionSize(
      long targetFileSize, FileFormat outputFileFormat, String outputCodec) {
@rdblue (Contributor) commented Sep 27, 2023:

It's a little odd to call this targetFileSize when it isn't the table's target file size. I don't have a much better name though. Maybe a comment here to clarify?

Contributor Author:

We can call it expectedFileSize or similar?

    compressions.put(Pair.of("none", "gzip"), 2.0);

    compressions.put(Pair.of("lz4", "zstd"), 1.5);
    compressions.put(Pair.of("lz4", "gzip"), 1.5);
Contributor:

Why no zstd or snappy shuffle compression options? Just assume that this will use the default?

Contributor Author:

I should probably add these too, I missed them when I added values for ORC and Parquet.

import org.apache.spark.sql.connector.expressions.SortOrder;

/** A set of requirements such as distribution and ordering reported to Spark during writes. */
public class SparkWriteRequirements {

  public static final SparkWriteRequirements EMPTY =
-      new SparkWriteRequirements(Distributions.unspecified(), new SortOrder[0]);
+      new SparkWriteRequirements(Distributions.unspecified(), new SortOrder[0], 0);
Contributor:

Is 0 a special signal to Spark that this is not requesting an advisory size?

Contributor Author:

Yes, it matches the default value in RequiresDistributionAndOrdering and means no preference.

/**
 * Returns the advisory (not guaranteed) shuffle partition size in bytes for this write.
 * <p>
 * Implementations may override this to indicate the preferable partition size in shuffles
 * performed to satisfy the requested distribution. Note that Spark doesn't support setting
 * the advisory partition size for {@link UnspecifiedDistribution}, the query will fail if
 * the advisory partition size is set but the distribution is unspecified. Data sources may
 * either request a particular number of partitions via {@link #requiredNumPartitions()} or
 * a preferred partition size, not both.
 * <p>
 * Data sources should be careful with large advisory sizes as it will impact the writing
 * parallelism and may degrade the overall job performance.
 * <p>
 * Note this value only acts like a guidance and Spark does not guarantee the actual and advisory
 * shuffle partition sizes will match. Ignored if the adaptive execution is disabled.
 *
 * @return the advisory partition size, any value less than 1 means no preference.
 */
default long advisoryPartitionSizeInBytes() { return 0; }
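For reference, a minimal sketch (not Iceberg's SparkWrite) of a Write implementing this interface, consistent with the guard discussed above: zero means no preference, and a positive value may only be combined with a concrete distribution.

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;

class ExampleWrite implements RequiresDistributionAndOrdering {

  @Override
  public Distribution requiredDistribution() {
    // With an unspecified distribution, the advisory size below must stay 0,
    // otherwise Spark fails the query.
    return Distributions.unspecified();
  }

  @Override
  public SortOrder[] requiredOrdering() {
    return new SortOrder[0];
  }

  @Override
  public long advisoryPartitionSizeInBytes() {
    return 0; // any value less than 1 means no preference
  }
}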

@rdblue (Contributor) left a comment:

+1 overall, with a few minor comments.

@danielcweeks (Contributor) left a comment:

I'm +1 on this and spoke to Ryan about it. We need to make sure to note that you can disable this by setting the advisory size to zero, as a fallback for anyone who experiences undesirable sizes/performance.
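As a follow-up to that note, a short sketch of the fallback, assuming an active SparkSession and the SQL config key from this PR; zero maps to the "no preference" value described above:

import org.apache.spark.sql.SparkSession;

class DisableAdvisorySizeExample {
  static void disableForSession() {
    // Revert to the previous behaviour: no advisory partition size is requested for Iceberg writes.
    SparkSession.active().conf().set("spark.sql.iceberg.advisory-partition-size", "0");
  }
}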

@aokolnychyi aokolnychyi merged commit 0b1b624 into apache:master Sep 27, 2023
47 checks passed
@aokolnychyi
Thank you, @rdblue @danielcweeks!
