Spark 3.4: Add write options to override the compression properties of the table #8313

Merged: 18 commits, Aug 29, 2023
docs/spark-configuration.md (3 additions, 0 deletions)
@@ -194,6 +194,9 @@ df.write
| check-ordering | true | Checks if the input schema and table schema are the same |
| isolation-level | null | Desired isolation level for Dataframe overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
| validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
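
For example, a single write can switch codecs without touching the table's `write.(fileformat).compression-*` properties. A minimal sketch, assuming hypothetical table names, codec, and level (none of these values come from this PR):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CompressionOverrideExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("compression-override")
        .getOrCreate();

    // Hypothetical source table.
    Dataset<Row> df = spark.table("source_db.events");

    // Override the target table's compression settings for this write only;
    // the table's own compression properties are left unchanged.
    df.write()
        .format("iceberg")
        .option("compression-codec", "zstd")
        .option("compression-level", "3") // Parquet and Avro tables only
        .mode("append")
        .save("db.target_table"); // hypothetical Iceberg table
  }
}
```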

CommitMetadata provides an interface to add custom metadata to a snapshot summary during a SQL execution, which can be beneficial for purposes such as auditing or change tracking. If properties start with `snapshot-property.`, then that prefix will be removed from each property. Here is an example:

SparkSQLProperties.java
@@ -63,4 +63,9 @@ private SparkSQLProperties() {}
// Controls the WAP branch used for write-audit-publish workflow.
// When set, new snapshots will be committed to this branch.
public static final String WAP_BRANCH = "spark.wap.branch";

// Controls write compression options
public static final String COMPRESSION_CODEC = "spark.sql.iceberg.write.compression-codec";
Contributor:
We rarely use extra `write` and `read` namespaces in SQL properties; the only exception is preserving grouping, as it was not clear otherwise. What about dropping `write` from all names?

spark.sql.iceberg.compression-codec
spark.sql.iceberg.compression-level
spark.sql.iceberg.compression-strategy

Contributor Author: Fixed.

public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.write.compression-level";
public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.write.compression-strategy";
}
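
For session-wide defaults, these properties can be set once on the Spark session. A sketch using the names as they appear in this diff; per the review thread above, the merged version may drop the `write` segment:

```java
import org.apache.spark.sql.SparkSession;

public class SessionCompressionDefaults {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("session-compression-defaults")
        .getOrCreate();

    // Property names as defined in this diff; the review above suggests the
    // final names become e.g. "spark.sql.iceberg.compression-codec".
    spark.conf().set("spark.sql.iceberg.write.compression-codec", "zstd");
    spark.conf().set("spark.sql.iceberg.write.compression-level", "3");
  }
}
```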
SparkWriteConf.java
@@ -418,4 +418,64 @@ public String branch() {

return branch;
}

public String parquetCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.PARQUET_COMPRESSION)
.defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
.parse();
}

public String parquetCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
.defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
.parseOptional();
}

public String avroCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.AVRO_COMPRESSION)
.defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
.parse();
}

public String avroCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
.defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
.parseOptional();
}

public String orcCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.ORC_COMPRESSION)
.defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
.parse();
}

public String orcCompressionStrategy() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_STRATEGY)
.sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
.tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
.defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
.parse();
}
}
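
Each accessor resolves its value through the same chain: the per-write option is checked first, then the session property, then the table property, and finally the default. A hedged end-to-end sketch of that precedence (all database, table, and codec names are illustrative):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CompressionPrecedenceSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("compression-precedence")
        .getOrCreate();

    // Table property: lowest of the three override layers.
    spark.sql("ALTER TABLE db.target_table "
        + "SET TBLPROPERTIES ('write.parquet.compression-codec' = 'gzip')");

    // Session property: wins over the table property when no write option is set.
    spark.conf().set("spark.sql.iceberg.write.compression-codec", "snappy");

    Dataset<Row> df = spark.table("source_db.events");

    // Per-write option: highest precedence, so this write uses zstd.
    df.write()
        .format("iceberg")
        .option("compression-codec", "zstd")
        .mode("append")
        .save("db.target_table");
  }
}
```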
SparkWriteOptions.java
@@ -80,4 +80,9 @@ private SparkWriteOptions() {}

// Isolation Level for DataFrame calls. Currently supported by overwritePartitions
public static final String ISOLATION_LEVEL = "isolation-level";

// Controls write compression options
public static final String COMPRESSION_CODEC = "compression-codec";
public static final String COMPRESSION_LEVEL = "compression-level";
public static final String COMPRESSION_STRATEGY = "compression-strategy";
}
SparkFileWriterFactory.java
@@ -34,6 +34,7 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroWriter;
import org.apache.iceberg.spark.data.SparkOrcWriter;
@@ -47,6 +48,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
private StructType dataSparkType;
private StructType equalityDeleteSparkType;
private StructType positionDeleteSparkType;
private Map<String, String> writeProperties;

SparkFileWriterFactory(
Table table,
@@ -60,7 +62,8 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
StructType equalityDeleteSparkType,
SortOrder equalityDeleteSortOrder,
Schema positionDeleteRowSchema,
StructType positionDeleteSparkType) {
StructType positionDeleteSparkType,
Map<String, String> writeProperties) {

super(
table,
@@ -76,6 +79,12 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
this.dataSparkType = dataSparkType;
this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = positionDeleteSparkType;

Contributor:

Optional: What about inlining if it fits on one line?

...
this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = positionDeleteSparkType;
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();

Contributor Author: Fixed.

if (writeProperties != null) {
this.writeProperties = writeProperties;
} else {
this.writeProperties = ImmutableMap.of();
}
}

static Builder builderFor(Table table) {
@@ -85,11 +94,13 @@ static Builder builderFor(Table table) {
@Override
protected void configureDataWrite(Avro.DataWriteBuilder builder) {
builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
builder.setAll(writeProperties);
}

@Override
@@ -102,40 +113,48 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
}

builder.setAll(writeProperties);
}

@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(
msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
builder.setAll(writeProperties);
}

@Override
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(
msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
builder.transformPaths(path -> UTF8String.fromString(path.toString()));
builder.setAll(writeProperties);
}

@Override
protected void configureDataWrite(ORC.DataWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.setAll(writeProperties);
}

@Override
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.transformPaths(path -> UTF8String.fromString(path.toString()));
builder.setAll(writeProperties);
}

private StructType dataSparkType() {
@@ -180,6 +199,7 @@ static class Builder {
private SortOrder equalityDeleteSortOrder;
private Schema positionDeleteRowSchema;
private StructType positionDeleteSparkType;
private Map<String, String> writeProperties;

Builder(Table table) {
this.table = table;
@@ -250,6 +270,11 @@ Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
return this;
}

Builder writeProperties(Map<String, String> properties) {
this.writeProperties = properties;
return this;
}

SparkFileWriterFactory build() {
boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null;
boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null;
@@ -269,7 +294,8 @@ SparkFileWriterFactory build() {
equalityDeleteSparkType,
equalityDeleteSortOrder,
positionDeleteRowSchema,
positionDeleteSparkType);
positionDeleteSparkType,
writeProperties);
}
}
}
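
The new `writeProperties(...)` hook is what lets callers pass the compression map through the builder. A hypothetical call, mirroring how `WriterFactory.createWriter` uses the builder later in this PR; the helper class, its arguments, and the chosen codec are illustrative:

```java
// Sketch only: assumes this lives in the same package as SparkFileWriterFactory,
// since the factory and its builder are package-private.
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

class WriterFactorySketch {
  static FileWriterFactory<InternalRow> newWriterFactory(
      Table table, FileFormat format, Schema writeSchema, StructType dsSchema) {
    // Hypothetical per-write compression override.
    Map<String, String> writeProperties =
        ImmutableMap.of(TableProperties.PARQUET_COMPRESSION, "zstd");

    return SparkFileWriterFactory.builderFor(table)
        .dataFileFormat(format)
        .dataSchema(writeSchema)
        .dataSparkType(dsSchema)
        .writeProperties(writeProperties)
        .build();
  }
}
```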
SparkWrite.java
@@ -20,6 +20,12 @@

import static org.apache.iceberg.IsolationLevel.SERIALIZABLE;
import static org.apache.iceberg.IsolationLevel.SNAPSHOT;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;

import java.io.IOException;
import java.util.Arrays;
@@ -54,6 +60,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
@@ -178,6 +185,32 @@ private WriterFactory createWriterFactory() {
// broadcast the table metadata as the writer factory will be sent to executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
Map<String, String> writeProperties = Maps.newHashMap();
Contributor:

What about adding a separate method for this? We could then call it directly in new WriterFactory(...).

private Map<String, String> writeProperties() {
  Map<String, String> properties = Maps.newHashMap();
  ...
  return properties;
}

Contributor Author: Fixed.

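Combining that suggestion with the switch statement below, the extracted helper might look roughly like this. A sketch of the reviewer's idea, not necessarily the shape of the final commit:

```java
// Hypothetical extraction of the switch below into a private helper, so that
// createWriterFactory() can pass writeProperties() directly to new WriterFactory(...).
private Map<String, String> writeProperties() {
  Map<String, String> writeProperties = Maps.newHashMap();

  switch (format) {
    case PARQUET:
      writeProperties.put(PARQUET_COMPRESSION, writeConf.parquetCompressionCodec());
      String parquetCompressionLevel = writeConf.parquetCompressionLevel();
      if (parquetCompressionLevel != null) {
        writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
      }
      break;

    case AVRO:
      writeProperties.put(AVRO_COMPRESSION, writeConf.avroCompressionCodec());
      String avroCompressionLevel = writeConf.avroCompressionLevel();
      if (avroCompressionLevel != null) {
        writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
      }
      break;

    case ORC:
      writeProperties.put(ORC_COMPRESSION, writeConf.orcCompressionCodec());
      writeProperties.put(ORC_COMPRESSION_STRATEGY, writeConf.orcCompressionStrategy());
      break;

    default:
      throw new IllegalArgumentException(String.format("Unknown file format %s", format));
  }

  return writeProperties;
}
```
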
switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, writeConf.parquetCompressionCodec());
String parquetCompressionLevel = writeConf.parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}

break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, writeConf.avroCompressionCodec());
String avroCompressionLevel = writeConf.avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
}

break;
case ORC:
writeProperties.put(ORC_COMPRESSION, writeConf.orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, writeConf.orcCompressionStrategy());
break;
default:
throw new IllegalArgumentException(String.format("Unknown file format %s", format));
}

return new WriterFactory(
tableBroadcast,
queryId,
Expand All @@ -186,7 +219,8 @@ private WriterFactory createWriterFactory() {
targetFileSize,
writeSchema,
dsSchema,
partitionedFanoutEnabled);
partitionedFanoutEnabled,
writeProperties);
}

private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -616,6 +650,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWriterFactory {
private final StructType dsSchema;
private final boolean partitionedFanoutEnabled;
private final String queryId;
private final Map<String, String> writeProperties;

protected WriterFactory(
Broadcast<Table> tableBroadcast,
@@ -625,7 +660,8 @@ protected WriterFactory(
long targetFileSize,
Schema writeSchema,
StructType dsSchema,
boolean partitionedFanoutEnabled) {
boolean partitionedFanoutEnabled,
Map<String, String> writeProperties) {
this.tableBroadcast = tableBroadcast;
this.format = format;
this.outputSpecId = outputSpecId;
Expand All @@ -634,6 +670,7 @@ protected WriterFactory(
this.dsSchema = dsSchema;
this.partitionedFanoutEnabled = partitionedFanoutEnabled;
this.queryId = queryId;
this.writeProperties = writeProperties;
}

@Override
@@ -657,6 +694,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
.dataFileFormat(format)
.dataSchema(writeSchema)
.dataSparkType(dsSchema)
.writeProperties(writeProperties)
.build();

if (spec.isUnpartitioned()) {