Spark 3.4: Add write options to override the compression properties of the table #8313

Merged: 18 commits, Aug 29, 2023
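
For context, a minimal usage sketch of the new per-write options added by this PR, assuming a Spark 3.4 session with the Iceberg runtime and an existing Iceberg table; the table name and codec values are illustrative, not taken from this PR:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CompressionOverrideExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("compression-override").getOrCreate();

    // The option keys map to SparkWriteOptions.COMPRESSION_CODEC / COMPRESSION_LEVEL below
    // and override the table's compression properties for this write only.
    Dataset<Row> df = spark.range(1000).toDF("id");
    df.writeTo("db.events")
        .option("compression-codec", "zstd")
        .option("compression-level", "3")
        .append();

    spark.stop();
  }
}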
@@ -63,4 +63,9 @@ private SparkSQLProperties() {}
// Controls the WAP branch used for write-audit-publish workflow.
// When set, new snapshots will be committed to this branch.
public static final String WAP_BRANCH = "spark.wap.branch";

// Controls write compression options
public static final String COMPRESSION_CODEC = "spark.sql.iceberg.write.compression-codec";
Contributor:

We rarely use extra write and read namespaces in SQL properties; the only exception is preserving grouping, which was not clear otherwise. What about dropping write from all names?

spark.sql.iceberg.compression-codec
spark.sql.iceberg.compression-level
spark.sql.iceberg.compression-strategy

Contributor Author:

Fixed.

public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.write.compression-level";
public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.write.compression-strategy";
}
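
A companion sketch for the session-level properties defined in this file. The property names below follow this hunk; per the review thread above they were later renamed to drop the write segment, so the merged names may differ. The table name is illustrative:

import org.apache.spark.sql.SparkSession;

public class SessionCompressionExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("session-compression").getOrCreate();

    // Applies to all Iceberg writes in this session; a per-write option still wins,
    // while these settings take precedence over the table's own compression properties.
    spark.conf().set("spark.sql.iceberg.write.compression-codec", "zstd");
    spark.conf().set("spark.sql.iceberg.write.compression-level", "3");

    spark.sql("INSERT INTO db.events SELECT id FROM range(1000)");

    spark.stop();
  }
}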
@@ -418,4 +418,64 @@ public String branch() {

return branch;
}

public String parquetCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.PARQUET_COMPRESSION)
.defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
.parse();
}

public String parquetCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
.defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
.parseOptional();
jerqi marked this conversation as resolved.
}

public String avroCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.AVRO_COMPRESSION)
.defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
.parse();
}

public String avroCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
.defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
.parseOptional();
jerqi marked this conversation as resolved.
}

public String orcCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.ORC_COMPRESSION)
.defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
.parse();
}

public String orcCompressionStrategy() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_STRATEGY)
.sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
.tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
.defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
.parse();
}
}
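
To make the confParser chains above concrete, here is a standalone sketch (a hypothetical helper, not Iceberg API) of the resolution order they encode for the Parquet codec: write option, then session conf, then table property, then default:

import java.util.Map;
import java.util.Optional;

public class CompressionPrecedenceSketch {

  // Keys mirror the constants referenced above; the helper itself is illustrative.
  static String resolveParquetCodec(
      Map<String, String> writeOptions,  // df.writeTo(...).option(...)
      Map<String, String> sessionConf,   // spark.conf().set(...)
      Map<String, String> tableProps,    // table properties
      String defaultValue) {
    return Optional.ofNullable(writeOptions.get("compression-codec"))
        .or(() -> Optional.ofNullable(sessionConf.get("spark.sql.iceberg.write.compression-codec")))
        .or(() -> Optional.ofNullable(tableProps.get("write.parquet.compression-codec")))
        .orElse(defaultValue);
  }

  public static void main(String[] args) {
    String codec =
        resolveParquetCodec(
            Map.of("compression-codec", "zstd"),                           // write option wins
            Map.of("spark.sql.iceberg.write.compression-codec", "snappy"), // used if no option
            Map.of("write.parquet.compression-codec", "gzip"),             // used if no session conf
            "gzip");                                                       // illustrative default
    System.out.println(codec); // prints "zstd"
  }
}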
@@ -80,4 +80,9 @@ private SparkWriteOptions() {}

// Isolation Level for DataFrame calls. Currently supported by overwritePartitions
public static final String ISOLATION_LEVEL = "isolation-level";

// Controls write compression options
public static final String COMPRESSION_CODEC = "compression-codec";
public static final String COMPRESSION_LEVEL = "compression-level";
public static final String COMPRESSION_STRATEGY = "compression-strategy";
}
@@ -34,6 +34,7 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroWriter;
import org.apache.iceberg.spark.data.SparkOrcWriter;
@@ -47,6 +48,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
private StructType dataSparkType;
private StructType equalityDeleteSparkType;
private StructType positionDeleteSparkType;
private Map<String, String> writeProperties;

SparkFileWriterFactory(
Table table,
@@ -60,7 +62,8 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
StructType equalityDeleteSparkType,
SortOrder equalityDeleteSortOrder,
Schema positionDeleteRowSchema,
StructType positionDeleteSparkType) {
StructType positionDeleteSparkType,
Map<String, String> writeProperties) {

super(
table,
@@ -76,6 +79,12 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
this.dataSparkType = dataSparkType;
this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = positionDeleteSparkType;

Contributor:

Optional: What about inlining if it fits on one line?

...
this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = positionDeleteSparkType;
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();

Contributor Author:

Fixed.

if (writeProperties != null) {
this.writeProperties = writeProperties;
} else {
this.writeProperties = ImmutableMap.of();
}
}

static Builder builderFor(Table table) {
@@ -85,11 +94,13 @@ static Builder builderFor(Table table) {
@Override
protected void configureDataWrite(Avro.DataWriteBuilder builder) {
builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
builder.setAll(writeProperties);
}

@Override
@@ -102,40 +113,47 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
}
builder.setAll(writeProperties);
}

@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(
msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
builder.setAll(writeProperties);
}

@Override
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(
msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
builder.transformPaths(path -> UTF8String.fromString(path.toString()));
builder.setAll(writeProperties);
}

@Override
protected void configureDataWrite(ORC.DataWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.setAll(writeProperties);
}

@Override
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.transformPaths(path -> UTF8String.fromString(path.toString()));
builder.setAll(writeProperties);
}

private StructType dataSparkType() {
@@ -180,6 +198,7 @@ static class Builder {
private SortOrder equalityDeleteSortOrder;
private Schema positionDeleteRowSchema;
private StructType positionDeleteSparkType;
private Map<String, String> writeProperties;

Builder(Table table) {
this.table = table;
@@ -250,6 +269,11 @@ Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
return this;
}

Builder writeProperties(Map<String, String> properties) {
this.writeProperties = properties;
return this;
}

SparkFileWriterFactory build() {
boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null;
boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null;
@@ -269,7 +293,8 @@ SparkFileWriterFactory build() {
equalityDeleteSparkType,
equalityDeleteSortOrder,
positionDeleteRowSchema,
positionDeleteSparkType);
positionDeleteSparkType,
writeProperties);
}
}
}
@@ -20,6 +20,12 @@

import static org.apache.iceberg.IsolationLevel.SERIALIZABLE;
import static org.apache.iceberg.IsolationLevel.SNAPSHOT;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;

import java.io.IOException;
import java.util.Arrays;
@@ -54,6 +60,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
@@ -178,6 +185,32 @@ private WriterFactory createWriterFactory() {
// broadcast the table metadata as the writer factory will be sent to executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
Map<String, String> writeProperties = Maps.newHashMap();
Contributor:

What about adding a separate method for this? We could then call it directly in new WriterFactory(...).

private Map<String, String> writeProperties() {
  Map<String, String> properties = Maps.newHashMap();
  ...
  return properties;
}

Contributor Author:

Fixed.

switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, writeConf.parquetCompressionCodec());
String parquetCompressionLevel = writeConf.parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
}

break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, writeConf.avroCompressionCodec());
String avroCompressionLevel = writeConf.avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
}

break;
case ORC:
writeProperties.put(ORC_COMPRESSION, writeConf.orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, writeConf.orcCompressionStrategy());
break;
default:
throw new IllegalArgumentException(String.format("Unknown file format %s", format));
}

jerqi marked this conversation as resolved.
return new WriterFactory(
tableBroadcast,
queryId,
@@ -186,7 +219,8 @@ private WriterFactory createWriterFactory() {
targetFileSize,
writeSchema,
dsSchema,
partitionedFanoutEnabled);
partitionedFanoutEnabled,
writeProperties);
}

private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -616,6 +650,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
private final StructType dsSchema;
private final boolean partitionedFanoutEnabled;
private final String queryId;
private final Map<String, String> writeProperties;

protected WriterFactory(
Broadcast<Table> tableBroadcast,
@@ -625,7 +660,8 @@ protected WriterFactory(
long targetFileSize,
Schema writeSchema,
StructType dsSchema,
boolean partitionedFanoutEnabled) {
boolean partitionedFanoutEnabled,
Map<String, String> writeProperties) {
this.tableBroadcast = tableBroadcast;
this.format = format;
this.outputSpecId = outputSpecId;
@@ -634,6 +670,7 @@ protected WriterFactory(
this.dsSchema = dsSchema;
this.partitionedFanoutEnabled = partitionedFanoutEnabled;
this.queryId = queryId;
this.writeProperties = writeProperties;
}

@Override
@@ -657,6 +694,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
.dataFileFormat(format)
.dataSchema(writeSchema)
.dataSparkType(dsSchema)
.writeProperties(writeProperties)
.build();

if (spec.isUnpartitioned()) {