Spark 3.4: Add write options to override the compression properties of the table #8313

Merged (18 commits) on Aug 29, 2023
3 changes: 3 additions & 0 deletions docs/spark-configuration.md
@@ -194,6 +194,9 @@ df.write
| check-ordering | true | Checks if input schema and table schema are same |
| isolation-level | null | Desired isolation level for Dataframe overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
| validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
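
For illustration, these options can be passed per write. The sketch below is not taken from this PR: the table identifiers and the codec/level values are placeholders, and `spark` is assumed to be an existing SparkSession.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Hypothetical Java usage of the write options documented above.
    Dataset<Row> df = spark.table("source_table");
    df.writeTo("prod.db.sample")
        .option("compression-codec", "zstd")   // overrides write.<format>.compression-codec for this write only
        .option("compression-level", "3")      // honored for Parquet and Avro tables
        .append();                             // append() declares NoSuchTableException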

CommitMetadata provides an interface to add custom metadata to a snapshot summary during a SQL execution, which can be beneficial for purposes such as auditing or change tracking. If properties start with `snapshot-property.`, then that prefix will be removed from each property. Here is an example:
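
Because that example is collapsed in this diff view, the snippet below is a reconstruction sketch. It assumes the `org.apache.iceberg.spark.CommitMetadata#withCommitProperties` API and an existing SparkSession named `spark`; the property key and SQL statement are placeholders.

    import java.util.Map;
    import org.apache.iceberg.spark.CommitMetadata;

    // Attach a custom property to the snapshot produced by this SQL statement.
    // The "snapshot-property." prefix is stripped before the value is stored in the snapshot summary.
    CommitMetadata.withCommitProperties(
        Map.of("snapshot-property.app.id", "audit-job-42"),
        () -> {
          spark.sql("DELETE FROM prod.db.sample WHERE id = 1");
          return 0;
        },
        RuntimeException.class);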

@@ -63,4 +63,9 @@ private SparkSQLProperties() {}
// Controls the WAP branch used for write-audit-publish workflow.
// When set, new snapshots will be committed to this branch.
public static final String WAP_BRANCH = "spark.wap.branch";

// Controls write compression options
public static final String COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";
public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.compression-level";
public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.compression-strategy";
}
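
As a sketch of how these session-level properties might be set from application code (not part of this PR; `spark` is an assumed SparkSession and the values are examples only). A per-write option still takes precedence over the session property, which in turn takes precedence over the table property.

    // Session-level overrides: apply to every Iceberg write in this Spark session
    // unless a per-write option is supplied.
    spark.conf().set("spark.sql.iceberg.compression-codec", "zstd");
    spark.conf().set("spark.sql.iceberg.compression-level", "3");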
@@ -21,6 +21,12 @@
import static org.apache.iceberg.DistributionMode.HASH;
import static org.apache.iceberg.DistributionMode.NONE;
import static org.apache.iceberg.DistributionMode.RANGE;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;

import java.util.Locale;
import java.util.Map;
@@ -418,4 +424,95 @@ public String branch() {

return branch;
}

public String parquetCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.PARQUET_COMPRESSION)
.defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
.parse();
}

public String parquetCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
.defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
.parseOptional();
}

public String avroCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.AVRO_COMPRESSION)
.defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
.parse();
}

public String avroCompressionLevel() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_LEVEL)
.sessionConf(SparkSQLProperties.COMPRESSION_LEVEL)
.tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
.defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
.parseOptional();
}

public String orcCompressionCodec() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_CODEC)
.sessionConf(SparkSQLProperties.COMPRESSION_CODEC)
.tableProperty(TableProperties.ORC_COMPRESSION)
.defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
.parse();
}

public String orcCompressionStrategy() {
return confParser
.stringConf()
.option(SparkWriteOptions.COMPRESSION_STRATEGY)
.sessionConf(SparkSQLProperties.COMPRESSION_STRATEGY)
.tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
.defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
.parse();
}
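
All of the accessors above resolve their value in the same order: the per-write option first, then the Spark session property, then the table property, then the format default. Below is a simplified sketch of that precedence (illustrative only; the real resolution is done by the confParser chains above).

    // Hypothetical helper mirroring the resolution order encoded by the confParser chains.
    static String resolveCompressionConf(
        Map<String, String> writeOptions, // e.g. "compression-codec"
        Map<String, String> sessionConf,  // e.g. "spark.sql.iceberg.compression-codec"
        Map<String, String> tableProps,   // e.g. "write.parquet.compression-codec"
        String optionKey,
        String sessionKey,
        String tableKey,
        String defaultValue) {
      if (writeOptions.containsKey(optionKey)) {
        return writeOptions.get(optionKey);
      } else if (sessionConf.containsKey(sessionKey)) {
        return sessionConf.get(sessionKey);
      } else if (tableProps.containsKey(tableKey)) {
        return tableProps.get(tableKey);
      }
      return defaultValue; // may be null, matching the parseOptional() cases such as compression level
    }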

public Map<String, String> writeProperties(FileFormat format) {
Map<String, String> writeProperties = Maps.newHashMap();

switch (format) {
case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
String parquetCompressionLevel = parquetCompressionLevel();
if (parquetCompressionLevel != null) {
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
Review thread on the line above:

szehon-ho (Collaborator), Aug 31, 2023: Hi guys, while working on #8299 I noticed the test failing and that this is missing the delete file compression override. I think the TestCompression delete file compression check passes by chance; I have a fix for it over there: 43e3cab

jerqi (Contributor, author), Aug 31, 2023: Sorry for this mistake. I will check how to validate the correctness of the case. [attached screenshot omitted] If we don't set the delete compression codec, we reuse the data compression codec, so the test case passed. I have raised PR #8438 to fix this issue.
}

break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
String avroCompressionLevel = avroCompressionLevel();
if (avroCompressionLevel != null) {
writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
}

break;
case ORC:
writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
break;
default:
throw new IllegalArgumentException(String.format("Unknown file format %s", format));
Review thread on the line above:

A reviewer (Contributor) commented: I wonder whether it is worth failing the query. Maybe just do nothing here?

jerqi (Contributor, author), Aug 29, 2023: I referred to Flink's implementation (#6049); Flink chooses to fail the query. Failing also has a benefit: a wrong config option is easier to spot.
}

return writeProperties;
}
}
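
As a hypothetical illustration (`writeConf` stands for a configured instance of the class above): a Parquet write with `compression-codec=zstd` and `compression-level=3` supplied as write options yields a map containing `write.parquet.compression-codec=zstd` and `write.parquet.compression-level=3`, which the writer factories below apply to each file writer via `setAll`.

    // Hypothetical usage: the returned map is handed to the file writer builders.
    Map<String, String> props = writeConf.writeProperties(FileFormat.PARQUET);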
@@ -80,4 +80,9 @@ private SparkWriteOptions() {}

// Isolation Level for DataFrame calls. Currently supported by overwritePartitions
public static final String ISOLATION_LEVEL = "isolation-level";

// Controls write compression options
public static final String COMPRESSION_CODEC = "compression-codec";
public static final String COMPRESSION_LEVEL = "compression-level";
public static final String COMPRESSION_STRATEGY = "compression-strategy";
}
@@ -34,6 +34,7 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroWriter;
import org.apache.iceberg.spark.data.SparkOrcWriter;
@@ -47,6 +48,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
private StructType dataSparkType;
private StructType equalityDeleteSparkType;
private StructType positionDeleteSparkType;
private Map<String, String> writeProperties;

SparkFileWriterFactory(
Table table,
@@ -60,7 +62,8 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
StructType equalityDeleteSparkType,
SortOrder equalityDeleteSortOrder,
Schema positionDeleteRowSchema,
StructType positionDeleteSparkType) {
StructType positionDeleteSparkType,
Map<String, String> writeProperties) {

super(
table,
@@ -76,6 +79,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
this.dataSparkType = dataSparkType;
this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = positionDeleteSparkType;
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
}

static Builder builderFor(Table table) {
@@ -85,11 +89,13 @@ static Builder builderFor(Table table) {
@Override
protected void configureDataWrite(Avro.DataWriteBuilder builder) {
builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
builder.setAll(writeProperties);
}

@Override
@@ -102,40 +108,48 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
}

builder.setAll(writeProperties);
}

@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(
msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
builder.setAll(writeProperties);
}

@Override
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(
msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
builder.transformPaths(path -> UTF8String.fromString(path.toString()));
builder.setAll(writeProperties);
}

@Override
protected void configureDataWrite(ORC.DataWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.setAll(writeProperties);
}

@Override
protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.setAll(writeProperties);
}

@Override
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(SparkOrcWriter::new);
builder.transformPaths(path -> UTF8String.fromString(path.toString()));
builder.setAll(writeProperties);
}

private StructType dataSparkType() {
@@ -180,6 +194,7 @@ static class Builder {
private SortOrder equalityDeleteSortOrder;
private Schema positionDeleteRowSchema;
private StructType positionDeleteSparkType;
private Map<String, String> writeProperties;

Builder(Table table) {
this.table = table;
@@ -250,6 +265,11 @@ Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
return this;
}

Builder writeProperties(Map<String, String> properties) {
this.writeProperties = properties;
return this;
}

SparkFileWriterFactory build() {
boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null;
boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null;
@@ -269,7 +289,8 @@ SparkFileWriterFactory build() {
equalityDeleteSparkType,
equalityDeleteSortOrder,
positionDeleteRowSchema,
positionDeleteSparkType);
positionDeleteSparkType,
writeProperties);
}
}
}
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
@@ -74,6 +75,7 @@ public class SparkPositionDeletesRewrite implements Write {
private final String fileSetId;
private final int specId;
private final StructLike partition;
private final Map<String, String> writeProperties;

/**
* Constructs a {@link SparkPositionDeletesRewrite}.
@@ -106,6 +108,7 @@ public class SparkPositionDeletesRewrite implements Write {
this.fileSetId = writeConf.rewrittenFileSetId();
this.specId = specId;
this.partition = partition;
this.writeProperties = writeConf.writeProperties(format);
}

@Override
@@ -129,7 +132,8 @@ public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
writeSchema,
dsSchema,
specId,
partition);
partition,
writeProperties);
}

@Override
@@ -174,6 +178,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
private final StructType dsSchema;
private final int specId;
private final StructLike partition;
private final Map<String, String> writeProperties;

PositionDeletesWriterFactory(
Broadcast<Table> tableBroadcast,
@@ -183,7 +188,8 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
Schema writeSchema,
StructType dsSchema,
int specId,
StructLike partition) {
StructLike partition,
Map<String, String> writeProperties) {
this.tableBroadcast = tableBroadcast;
this.queryId = queryId;
this.format = format;
@@ -192,6 +198,7 @@ static class PositionDeletesWriterFactory implements DataWriterFactory {
this.dsSchema = dsSchema;
this.specId = specId;
this.partition = partition;
this.writeProperties = writeProperties;
}

@Override
@@ -219,6 +226,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
SparkFileWriterFactory.builderFor(table)
.deleteFileFormat(format)
.positionDeleteSparkType(deleteSparkTypeWithoutRow)
.writeProperties(writeProperties)
.build();

return new DeleteWriter(
@@ -104,6 +104,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final Context context;

private boolean cleanupOnAbort = true;
private final Map<String, String> writeProperties;
Review thread on the line above:

A reviewer (Contributor) commented: Minor: this variable should be co-located with the other final variables above. It is super minor, but let's do this in a follow-up.

jerqi (Contributor, author), Aug 30, 2023: OK, I have raised follow-up PR #8421, which has been merged.


SparkPositionDeltaWrite(
SparkSession spark,
@@ -126,6 +127,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
this.writeRequirements = writeConf.positionDeltaRequirements(command);
this.context = new Context(dataSchema, writeConf, info, writeRequirements);
this.writeProperties = writeConf.writeProperties(context.dataFileFormat);
}

@Override
@@ -155,7 +157,7 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
// broadcast the table metadata as the writer factory will be sent to executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
return new PositionDeltaWriteFactory(tableBroadcast, command, context);
return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
}

@Override
@@ -326,11 +328,17 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
private final Broadcast<Table> tableBroadcast;
private final Command command;
private final Context context;
private final Map<String, String> writeProperties;

PositionDeltaWriteFactory(Broadcast<Table> tableBroadcast, Command command, Context context) {
PositionDeltaWriteFactory(
Broadcast<Table> tableBroadcast,
Command command,
Context context,
Map<String, String> writeProperties) {
this.tableBroadcast = tableBroadcast;
this.command = command;
this.context = context;
this.writeProperties = writeProperties;
}

@Override
@@ -356,6 +364,7 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
.dataSparkType(context.dataSparkType())
.deleteFileFormat(context.deleteFileFormat())
.positionDeleteSparkType(context.deleteSparkType())
.writeProperties(writeProperties)
.build();

if (command == DELETE) {