Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,20 @@ class SparkConfParser {
private final Map<String, String> properties;
private final RuntimeConfig sessionConf;
private final Map<String, String> options;
private final String tableName;

SparkConfParser() {
this.properties = ImmutableMap.of();
this.sessionConf = new RuntimeConfig(SQLConf.get());
this.options = ImmutableMap.of();
this.tableName = null;
}

SparkConfParser(SparkSession spark, Table table, Map<String, String> options) {
this.properties = table.properties();
this.sessionConf = spark.conf();
this.options = options;
this.tableName = table.name();
}

public BooleanConfParser booleanConf() {
Expand Down Expand Up @@ -268,6 +271,14 @@ protected T parse(Function<String, T> conversion, T defaultValue) {
}

if (sessionConfName != null) {
if (tableName != null) {
String tableSessionConfName = sessionConfName + "." + tableName;
String tableSessionConfValue = sessionConf.get(tableSessionConfName, null);
if (tableSessionConfValue != null) {
return conversion.apply(tableSessionConfValue);
}
}

String sessionConfValue = sessionConf.get(sessionConfName, null);
if (sessionConfValue != null) {
return conversion.apply(sessionConfValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@
*
* <ol>
* <li>Read options
* <li>Session configuration
* <li>Table-scoped session configuration (e.g. {@code
* spark.sql.iceberg.split-size.<catalog>.<database>.<table>})
* <li>Global session configuration (e.g. {@code spark.sql.iceberg.split-size})
* <li>Table metadata
* </ol>
*
* The most specific value is set in read options and takes precedence over all other configs. If no
* read option is provided, this class checks the session configuration for any overrides. If no
* applicable value is found in the session configuration, this class uses the table metadata.
* read option is provided, this class checks the session configuration for a table-scoped override
* using the fully qualified table name from {@link Table#name()} as a suffix. If no table-scoped
* value is found, it falls back to the global session configuration key. If no applicable value is
* found in the session configuration, this class uses the table metadata.
*
* <p>Note this class is NOT meant to be serialized and sent to executors.
*/
Expand Down Expand Up @@ -195,13 +199,18 @@ public int orcBatchSize() {
}

public Long splitSizeOption() {
return confParser.longConf().option(SparkReadOptions.SPLIT_SIZE).parseOptional();
return confParser
.longConf()
.option(SparkReadOptions.SPLIT_SIZE)
.sessionConf(SparkSQLProperties.SPLIT_SIZE)
.parseOptional();
}

public long splitSize() {
return confParser
.longConf()
.option(SparkReadOptions.SPLIT_SIZE)
.sessionConf(SparkSQLProperties.SPLIT_SIZE)
.tableProperty(TableProperties.SPLIT_SIZE)
.defaultValue(TableProperties.SPLIT_SIZE_DEFAULT)
.parse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ private SparkSQLProperties() {}
// Overrides the delete planning mode
public static final String DELETE_PLANNING_MODE = "spark.sql.iceberg.delete-planning-mode";

// Overrides the split target size for scan planning
public static final String SPLIT_SIZE = "spark.sql.iceberg.split-size";

// Controls whether to report locality information to Spark while allocating input partitions
public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,20 @@ class SparkConfParser {
private final Map<String, String> properties;
private final RuntimeConfig sessionConf;
private final CaseInsensitiveStringMap options;
private final String tableName;

SparkConfParser() {
this.properties = ImmutableMap.of();
this.sessionConf = new RuntimeConfig(SQLConf.get());
this.options = CaseInsensitiveStringMap.empty();
this.tableName = null;
}

SparkConfParser(SparkSession spark, Table table, Map<String, String> options) {
this.properties = table.properties();
this.sessionConf = spark.conf();
this.options = asCaseInsensitiveStringMap(options);
this.tableName = table.name();
}

public BooleanConfParser booleanConf() {
Expand Down Expand Up @@ -278,6 +281,20 @@ protected T parse(Function<String, T> conversion, T defaultValue) {
}

if (sessionConfName != null) {
if (tableName != null) {
String tableSessionConfName = sessionConfName + "." + tableName;
String tableSessionConfValue = sessionConf.get(tableSessionConfName, null);
if (tableSessionConfValue != null) {
return conversion.apply(tableSessionConfValue);
}

String sparkTableSessionConfValue =
sessionConf.get(toCamelCase(tableSessionConfName), null);
if (sparkTableSessionConfValue != null) {
return conversion.apply(sparkTableSessionConfValue);
}
}

String sessionConfValue = sessionConf.get(sessionConfName, null);
if (sessionConfValue != null) {
return conversion.apply(sessionConfValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@
*
* <ol>
* <li>Read options
* <li>Session configuration
* <li>Table-scoped session configuration (e.g. {@code
* spark.sql.iceberg.split-size.<catalog>.<database>.<table>})
* <li>Global session configuration (e.g. {@code spark.sql.iceberg.split-size})
* <li>Table metadata
* </ol>
*
* The most specific value is set in read options and takes precedence over all other configs. If no
* read option is provided, this class checks the session configuration for any overrides. If no
* applicable value is found in the session configuration, this class uses the table metadata.
* read option is provided, this class checks the session configuration for a table-scoped override
* using the fully qualified table name from {@link Table#name()} as a suffix. If no table-scoped
* value is found, it falls back to the global session configuration key. If no applicable value is
* found in the session configuration, this class uses the table metadata.
*
* <p>Note this class is NOT meant to be serialized and sent to executors.
*/
Expand Down Expand Up @@ -191,13 +195,18 @@ public int orcBatchSize() {
}

public Long splitSizeOption() {
return confParser.longConf().option(SparkReadOptions.SPLIT_SIZE).parseOptional();
return confParser
.longConf()
.option(SparkReadOptions.SPLIT_SIZE)
.sessionConf(SparkSQLProperties.SPLIT_SIZE)
.parseOptional();
}

public long splitSize() {
return confParser
.longConf()
.option(SparkReadOptions.SPLIT_SIZE)
.sessionConf(SparkSQLProperties.SPLIT_SIZE)
.tableProperty(TableProperties.SPLIT_SIZE)
.defaultValue(TableProperties.SPLIT_SIZE_DEFAULT)
.parse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ private SparkSQLProperties() {}
// Overrides the advisory partition size
public static final String ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size";

// Overrides the split target size for scan planning
public static final String SPLIT_SIZE = "spark.sql.iceberg.split-size";

// Controls whether to report locality information to Spark while allocating input partitions
public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,75 @@ public void testDurationConf() {
});
}

@TestTemplate
public void testTableScopedSessionConfTakesPrecedenceOverGlobal() {
Table table = validationCatalog.loadTable(tableIdent);
String confName = "spark.sql.iceberg.some-int-conf";
String tableConfName = confName + "." + table.name();

withSQLConf(
ImmutableMap.of(confName, "1", tableConfName, "2"),
() -> {
SparkConfParser parser = new SparkConfParser(spark, table, ImmutableMap.of());
Integer value = parser.intConf().sessionConf(confName).parseOptional();
assertThat(value).isEqualTo(2);
});
}

@TestTemplate
public void testGlobalSessionConfUsedWhenNoTableScopedKey() {
Table table = validationCatalog.loadTable(tableIdent);
String confName = "spark.sql.iceberg.some-int-conf";

withSQLConf(
ImmutableMap.of(confName, "1"),
() -> {
SparkConfParser parser = new SparkConfParser(spark, table, ImmutableMap.of());
Integer value = parser.intConf().sessionConf(confName).parseOptional();
assertThat(value).isEqualTo(1);
});
}

@TestTemplate
public void testOptionTakesPrecedenceOverTableScopedSessionConf() {
Table table = validationCatalog.loadTable(tableIdent);
String confName = "spark.sql.iceberg.some-int-conf";
String tableConfName = confName + "." + table.name();
String optionName = "some-int-option";

withSQLConf(
ImmutableMap.of(confName, "1", tableConfName, "2"),
() -> {
Map<String, String> options = ImmutableMap.of(optionName, "3");
SparkConfParser parser = new SparkConfParser(spark, table, options);
Integer value = parser.intConf().option(optionName).sessionConf(confName).parseOptional();
assertThat(value).isEqualTo(3);
});
}

@TestTemplate
public void testTableScopedSessionConfTakesPrecedenceOverTableProperty() {
Table table = validationCatalog.loadTable(tableIdent);
String confName = "spark.sql.iceberg.some-conf";
String tableConfName = confName + "." + table.name();
String tablePropertyName = "some-property";

table.updateProperties().set(tablePropertyName, "from-table-property").commit();

withSQLConf(
ImmutableMap.of(tableConfName, "from-table-session"),
() -> {
SparkConfParser parser = new SparkConfParser(spark, table, ImmutableMap.of());
String value =
parser
.stringConf()
.sessionConf(confName)
.tableProperty(tablePropertyName)
.parseOptional();
assertThat(value).isEqualTo("from-table-session");
});
}

@TestTemplate
public void testDeleteGranularityDefault() {
Table table = validationCatalog.loadTable(tableIdent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,20 @@ class SparkConfParser {
private final Map<String, String> properties;
private final RuntimeConfig sessionConf;
private final CaseInsensitiveStringMap options;
private final String tableName;

SparkConfParser() {
this.properties = ImmutableMap.of();
this.sessionConf = new org.apache.spark.sql.classic.RuntimeConfig(SQLConf.get());
this.options = CaseInsensitiveStringMap.empty();
this.tableName = null;
}

SparkConfParser(SparkSession spark, Table table, Map<String, String> options) {
this.properties = table.properties();
this.sessionConf = spark.conf();
this.options = asCaseInsensitiveStringMap(options);
this.tableName = table.name();
}

public BooleanConfParser booleanConf() {
Expand Down Expand Up @@ -278,6 +281,20 @@ protected T parse(Function<String, T> conversion, T defaultValue) {
}

if (sessionConfName != null) {
if (tableName != null) {
String tableSessionConfName = sessionConfName + "." + tableName;
String tableSessionConfValue = sessionConf.get(tableSessionConfName, null);
if (tableSessionConfValue != null) {
return conversion.apply(tableSessionConfValue);
}

String sparkTableSessionConfValue =
sessionConf.get(toCamelCase(tableSessionConfName), null);
if (sparkTableSessionConfValue != null) {
return conversion.apply(sparkTableSessionConfValue);
}
}

String sessionConfValue = sessionConf.get(sessionConfName, null);
if (sessionConfValue != null) {
return conversion.apply(sessionConfValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@
*
* <ol>
* <li>Read options
* <li>Session configuration
* <li>Table-scoped session configuration (e.g. {@code
* spark.sql.iceberg.split-size.<catalog>.<database>.<table>})
* <li>Global session configuration (e.g. {@code spark.sql.iceberg.split-size})
* <li>Table metadata
* </ol>
*
* The most specific value is set in read options and takes precedence over all other configs. If no
* read option is provided, this class checks the session configuration for any overrides. If no
* applicable value is found in the session configuration, this class uses the table metadata.
* read option is provided, this class checks the session configuration for a table-scoped override
* using the fully qualified table name from {@link Table#name()} as a suffix. If no table-scoped
* value is found, it falls back to the global session configuration key. If no applicable value is
* found in the session configuration, this class uses the table metadata.
*
* <p>Note this class is NOT meant to be serialized and sent to executors.
*/
Expand Down Expand Up @@ -191,13 +195,18 @@ public int orcBatchSize() {
}

public Long splitSizeOption() {
return confParser.longConf().option(SparkReadOptions.SPLIT_SIZE).parseOptional();
return confParser
.longConf()
.option(SparkReadOptions.SPLIT_SIZE)
.sessionConf(SparkSQLProperties.SPLIT_SIZE)
.parseOptional();
}

public long splitSize() {
return confParser
.longConf()
.option(SparkReadOptions.SPLIT_SIZE)
.sessionConf(SparkSQLProperties.SPLIT_SIZE)
.tableProperty(TableProperties.SPLIT_SIZE)
.defaultValue(TableProperties.SPLIT_SIZE_DEFAULT)
.parse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ private SparkSQLProperties() {}
// Overrides the advisory partition size
public static final String ADVISORY_PARTITION_SIZE = "spark.sql.iceberg.advisory-partition-size";

// Overrides the split target size for scan planning
public static final String SPLIT_SIZE = "spark.sql.iceberg.split-size";

// Controls whether to report locality information to Spark while allocating input partitions
public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";

Expand Down
Loading