Spark 3.5: Increase default advisory partition size for writes (#8660)
aokolnychyi committed Sep 27, 2023
1 parent 3e37106 commit 0b1b624
Showing 14 changed files with 543 additions and 89 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -304,6 +304,9 @@ private TableProperties() {}
public static final String SPARK_WRITE_ACCEPT_ANY_SCHEMA = "write.spark.accept-any-schema";
public static final boolean SPARK_WRITE_ACCEPT_ANY_SCHEMA_DEFAULT = false;

public static final String SPARK_WRITE_ADVISORY_PARTITION_SIZE_BYTES =
"write.spark.advisory-partition-size-bytes";

public static final String SNAPSHOT_ID_INHERITANCE_ENABLED =
"compatibility.snapshot-id-inheritance.enabled";
public static final boolean SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = false;
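For reference, a minimal sketch (not part of this diff) of how the new property could be set through the Iceberg Table API; the catalog lookup, table name, and 128 MB value are illustrative assumptions.

// Sketch: set the new Spark write advisory partition size on an existing table.
// The catalog, identifier, and 128 MB value below are illustrative only.
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class AdvisoryPartitionSizeSketch {
  static void configure(Catalog catalog) {
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
    table
        .updateProperties()
        .set(
            TableProperties.SPARK_WRITE_ADVISORY_PARTITION_SIZE_BYTES,
            String.valueOf(128L * 1024 * 1024)) // 128 MB target for Spark write tasks
        .commit();
  }
}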
@@ -177,12 +177,19 @@ public void testCoalesceDelete() throws Exception {

// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
// set the advisory partition size for shuffles small enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"100",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
String.valueOf(256 * 1024 * 1024)),
() -> {
SparkPlan plan =
executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
@@ -238,12 +245,19 @@ public void testSkewDelete() throws Exception {

// enable AQE and set the advisory partition size small enough to trigger a split
// set the number of shuffle partitions to 2 to only have 2 reducers
// set the advisory partition size for shuffles big enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "2",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"2",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"256MB",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
"100"),
() -> {
SparkPlan plan =
executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
@@ -282,13 +282,21 @@ public void testCoalesceMerge() {
// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
// disable broadcast joins to make sure the join triggers a shuffle
// set the advisory partition size for shuffles small enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), "-1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"200",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(),
"-1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"100",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
String.valueOf(256 * 1024 * 1024)),
() -> {
sql(
"MERGE INTO %s t USING source "
@@ -352,13 +360,21 @@ public void testSkewMerge() {
// enable AQE and set the advisory partition size small enough to trigger a split
// set the number of shuffle partitions to 2 to only have 2 reducers
// set the min coalesce partition size small enough to avoid coalescing
// set the advisory partition size for shuffles big enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "4",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(), "100",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"4",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(),
"100",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"256MB",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
"100"),
() -> {
SparkPlan plan =
executeAndKeepPlan(
@@ -162,12 +162,19 @@ public void testCoalesceUpdate() {

// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
// set the advisory partition size for shuffles small enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"100",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
String.valueOf(256 * 1024 * 1024)),
() -> {
SparkPlan plan =
executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
@@ -226,12 +233,19 @@ public void testSkewUpdate() {

// enable AQE and set the advisory partition size small enough to trigger a split
// set the number of shuffle partitions to 2 to only have 2 reducers
// set the advisory partition size for shuffles big enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "2",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"2",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"256MB",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
"100"),
() -> {
SparkPlan plan =
executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Pair;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

class SparkCompressionUtil {

private static final String LZ4 = "lz4";
private static final String ZSTD = "zstd";
private static final String GZIP = "gzip";
private static final String ZLIB = "zlib";
private static final String SNAPPY = "snappy";
private static final String NONE = "none";

// an internal Spark config that controls whether shuffle data is compressed
private static final String SHUFFLE_COMPRESSION_ENABLED = "spark.shuffle.compress";
private static final boolean SHUFFLE_COMPRESSION_ENABLED_DEFAULT = true;

// an internal Spark config that controls what compression codec is used
private static final String SPARK_COMPRESSION_CODEC = "spark.io.compression.codec";
private static final String SPARK_COMPRESSION_CODEC_DEFAULT = "lz4";

private static final double DEFAULT_COLUMNAR_COMPRESSION = 2;
private static final Map<Pair<String, String>, Double> COLUMNAR_COMPRESSIONS =
initColumnarCompressions();

private static final double DEFAULT_ROW_BASED_COMPRESSION = 1;
private static final Map<Pair<String, String>, Double> ROW_BASED_COMPRESSIONS =
initRowBasedCompressions();

private SparkCompressionUtil() {}

/**
* Estimates how much the data in shuffle map files will compress once it is written to disk using
* a particular file format and codec.
*/
public static double shuffleCompressionRatio(
SparkSession spark, FileFormat outputFileFormat, String outputCodec) {
if (outputFileFormat == FileFormat.ORC || outputFileFormat == FileFormat.PARQUET) {
return columnarCompression(shuffleCodec(spark), outputCodec);
} else if (outputFileFormat == FileFormat.AVRO) {
return rowBasedCompression(shuffleCodec(spark), outputCodec);
} else {
return 1.0;
}
}

private static String shuffleCodec(SparkSession spark) {
SparkConf sparkConf = spark.sparkContext().conf();
return shuffleCompressionEnabled(sparkConf) ? sparkCodec(sparkConf) : NONE;
}

private static boolean shuffleCompressionEnabled(SparkConf sparkConf) {
return sparkConf.getBoolean(SHUFFLE_COMPRESSION_ENABLED, SHUFFLE_COMPRESSION_ENABLED_DEFAULT);
}

private static String sparkCodec(SparkConf sparkConf) {
return sparkConf.get(SPARK_COMPRESSION_CODEC, SPARK_COMPRESSION_CODEC_DEFAULT);
}

private static double columnarCompression(String shuffleCodec, String outputCodec) {
Pair<String, String> key = Pair.of(normalize(shuffleCodec), normalize(outputCodec));
return COLUMNAR_COMPRESSIONS.getOrDefault(key, DEFAULT_COLUMNAR_COMPRESSION);
}

private static double rowBasedCompression(String shuffleCodec, String outputCodec) {
Pair<String, String> key = Pair.of(normalize(shuffleCodec), normalize(outputCodec));
return ROW_BASED_COMPRESSIONS.getOrDefault(key, DEFAULT_ROW_BASED_COMPRESSION);
}

private static String normalize(String value) {
return value != null ? value.toLowerCase(Locale.ROOT) : null;
}

private static Map<Pair<String, String>, Double> initColumnarCompressions() {
Map<Pair<String, String>, Double> compressions = Maps.newHashMap();

compressions.put(Pair.of(NONE, ZSTD), 4.0);
compressions.put(Pair.of(NONE, GZIP), 4.0);
compressions.put(Pair.of(NONE, ZLIB), 4.0);
compressions.put(Pair.of(NONE, SNAPPY), 3.0);
compressions.put(Pair.of(NONE, LZ4), 3.0);

compressions.put(Pair.of(ZSTD, ZSTD), 2.0);
compressions.put(Pair.of(ZSTD, GZIP), 2.0);
compressions.put(Pair.of(ZSTD, ZLIB), 2.0);
compressions.put(Pair.of(ZSTD, SNAPPY), 1.5);
compressions.put(Pair.of(ZSTD, LZ4), 1.5);

compressions.put(Pair.of(SNAPPY, ZSTD), 3.0);
compressions.put(Pair.of(SNAPPY, GZIP), 3.0);
compressions.put(Pair.of(SNAPPY, ZLIB), 3.0);
compressions.put(Pair.of(SNAPPY, SNAPPY), 2.0);
compressions.put(Pair.of(SNAPPY, LZ4), 2.0);

compressions.put(Pair.of(LZ4, ZSTD), 3.0);
compressions.put(Pair.of(LZ4, GZIP), 3.0);
compressions.put(Pair.of(LZ4, ZLIB), 3.0);
compressions.put(Pair.of(LZ4, SNAPPY), 2.0);
compressions.put(Pair.of(LZ4, LZ4), 2.0);

return compressions;
}

private static Map<Pair<String, String>, Double> initRowBasedCompressions() {
Map<Pair<String, String>, Double> compressions = Maps.newHashMap();

compressions.put(Pair.of(NONE, ZSTD), 2.0);
compressions.put(Pair.of(NONE, GZIP), 2.0);
compressions.put(Pair.of(NONE, ZLIB), 2.0);

compressions.put(Pair.of(ZSTD, SNAPPY), 0.5);
compressions.put(Pair.of(ZSTD, LZ4), 0.5);

compressions.put(Pair.of(SNAPPY, ZSTD), 1.5);
compressions.put(Pair.of(SNAPPY, GZIP), 1.5);
compressions.put(Pair.of(SNAPPY, ZLIB), 1.5);

compressions.put(Pair.of(LZ4, ZSTD), 1.5);
compressions.put(Pair.of(LZ4, GZIP), 1.5);
compressions.put(Pair.of(LZ4, ZLIB), 1.5);

return compressions;
}
}
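A minimal usage sketch for the new utility, assuming a caller in the same org.apache.iceberg.spark package (the class is package-private); the 128 MB target and the Parquet/zstd combination are illustrative, and scaling an on-disk target by this ratio is one way the estimate could be applied when sizing shuffle partitions for writes.

package org.apache.iceberg.spark;

// Sketch only: lives in the same package because SparkCompressionUtil is package-private.
// The 128 MB target and the Parquet/zstd codec choice are illustrative assumptions.
import org.apache.iceberg.FileFormat;
import org.apache.spark.sql.SparkSession;

class ShuffleSizingSketch {
  static long advisoryShuffleSize(SparkSession spark) {
    long targetFileSize = 128L * 1024 * 1024; // desired on-disk output size per file
    double ratio = SparkCompressionUtil.shuffleCompressionRatio(spark, FileFormat.PARQUET, "zstd");
    // shuffle data compresses worse than the final columnar files,
    // so the advisory shuffle partition size is inflated by the estimated ratio
    return (long) (targetFileSize * ratio);
  }
}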
@@ -106,7 +106,7 @@ public int parse() {
}

public Integer parseOptional() {
return parse(Integer::parseInt, null);
return parse(Integer::parseInt, defaultValue);
}
}

@@ -129,7 +129,7 @@ public long parse() {
}

public Long parseOptional() {
return parse(Long::parseLong, null);
return parse(Long::parseLong, defaultValue);
}
}

@@ -152,7 +152,7 @@ public String parse() {
}

public String parseOptional() {
return parse(Function.identity(), null);
return parse(Function.identity(), defaultValue);
}
}

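The behavioral effect of the three changes above: when a value is absent, parseOptional() now falls back to the configured default instead of returning null. Below is a standalone sketch of that pattern; the ConfValue class and its fields are hypothetical stand-ins, not the actual Iceberg parser.

// Hypothetical stand-in illustrating the parseOptional change: fall back to the
// configured default rather than null when the option is unset.
import java.util.function.Function;

class ConfValue<T> {
  private final String raw; // value read from SQL conf or write options, may be null
  private final T defaultValue; // caller-supplied default, may also be null

  ConfValue(String raw, T defaultValue) {
    this.raw = raw;
    this.defaultValue = defaultValue;
  }

  T parseOptional(Function<String, T> parseFunc) {
    // before this commit: return raw != null ? parseFunc.apply(raw) : null;
    return raw != null ? parseFunc.apply(raw) : defaultValue;
  }
}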
@@ -64,4 +64,7 @@ private SparkSQLProperties() {}

// Overrides the delete planning mode
public static final String DELETE_PLANNING_MODE = "spark.sql.iceberg.delete-planning-mode";

// Overrides the advisory partition size
public static final String ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size";
}
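A sketch of overriding the advisory partition size for Iceberg writes at the session level, mirroring what the tests above exercise; the 256 MB value is an illustrative assumption.

// Sketch: per the tests above, this Iceberg session property takes effect for writes
// even when spark.sql.adaptive.advisoryPartitionSizeInBytes is set differently.
// The 256 MB value is illustrative only.
import org.apache.spark.sql.SparkSession;

class SessionOverrideSketch {
  static void configure(SparkSession spark) {
    spark.conf().set("spark.sql.adaptive.enabled", "true");
    spark
        .conf()
        .set("spark.sql.iceberg.advisory-partition-size", String.valueOf(256L * 1024 * 1024));
  }
}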
