-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spark 3.4: Add write options to override the compression properties of the table #8313
Changes from 5 commits
bdf359c
8b8affc
49063cb
d08226b
e2ac0c3
bb11538
86458e9
923e03a
b607d7a
083e84f
102c119
faf8f5d
371ac02
9321807
c6218f7
91a88a9
f0e295a
2c0308c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ | |
import org.apache.iceberg.orc.ORC; | ||
import org.apache.iceberg.parquet.Parquet; | ||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||
import org.apache.iceberg.spark.SparkSchemaUtil; | ||
import org.apache.iceberg.spark.data.SparkAvroWriter; | ||
import org.apache.iceberg.spark.data.SparkOrcWriter; | ||
|
@@ -47,6 +48,7 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> { | |
private StructType dataSparkType; | ||
private StructType equalityDeleteSparkType; | ||
private StructType positionDeleteSparkType; | ||
private Map<String, String> writeProperties; | ||
|
||
SparkFileWriterFactory( | ||
Table table, | ||
|
@@ -60,7 +62,8 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> { | |
StructType equalityDeleteSparkType, | ||
SortOrder equalityDeleteSortOrder, | ||
Schema positionDeleteRowSchema, | ||
StructType positionDeleteSparkType) { | ||
StructType positionDeleteSparkType, | ||
Map<String, String> writeProperties) { | ||
|
||
super( | ||
table, | ||
|
@@ -76,6 +79,12 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> { | |
this.dataSparkType = dataSparkType; | ||
this.equalityDeleteSparkType = equalityDeleteSparkType; | ||
this.positionDeleteSparkType = positionDeleteSparkType; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Optional: What about inlining it if it fits on one line?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
if (writeProperties != null) { | ||
this.writeProperties = writeProperties; | ||
} else { | ||
this.writeProperties = ImmutableMap.of(); | ||
} | ||
} | ||
|
||
static Builder builderFor(Table table) { | ||
|
@@ -85,11 +94,13 @@ static Builder builderFor(Table table) { | |
@Override | ||
protected void configureDataWrite(Avro.DataWriteBuilder builder) { | ||
builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType())); | ||
builder.setAll(writeProperties); | ||
} | ||
|
||
@Override | ||
protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { | ||
builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType())); | ||
builder.setAll(writeProperties); | ||
} | ||
|
||
@Override | ||
|
@@ -102,40 +113,48 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { | |
StructType positionDeleteRowSparkType = (StructType) rowField.dataType(); | ||
builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)); | ||
} | ||
|
||
builder.setAll(writeProperties); | ||
} | ||
|
||
@Override | ||
protected void configureDataWrite(Parquet.DataWriteBuilder builder) { | ||
builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType)); | ||
builder.setAll(writeProperties); | ||
} | ||
|
||
@Override | ||
protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { | ||
builder.createWriterFunc( | ||
msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType)); | ||
builder.setAll(writeProperties); | ||
} | ||
|
||
@Override | ||
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { | ||
builder.createWriterFunc( | ||
msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)); | ||
builder.transformPaths(path -> UTF8String.fromString(path.toString())); | ||
builder.setAll(writeProperties); | ||
} | ||
|
||
@Override | ||
protected void configureDataWrite(ORC.DataWriteBuilder builder) { | ||
builder.createWriterFunc(SparkOrcWriter::new); | ||
builder.setAll(writeProperties); | ||
} | ||
|
||
@Override | ||
protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { | ||
builder.createWriterFunc(SparkOrcWriter::new); | ||
builder.setAll(writeProperties); | ||
} | ||
|
||
@Override | ||
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { | ||
builder.createWriterFunc(SparkOrcWriter::new); | ||
builder.transformPaths(path -> UTF8String.fromString(path.toString())); | ||
builder.setAll(writeProperties); | ||
} | ||
|
||
private StructType dataSparkType() { | ||
|
@@ -180,6 +199,7 @@ static class Builder { | |
private SortOrder equalityDeleteSortOrder; | ||
private Schema positionDeleteRowSchema; | ||
private StructType positionDeleteSparkType; | ||
private Map<String, String> writeProperties; | ||
|
||
Builder(Table table) { | ||
this.table = table; | ||
|
@@ -250,6 +270,11 @@ Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) { | |
return this; | ||
} | ||
|
||
Builder writeProperties(Map<String, String> properties) { | ||
this.writeProperties = properties; | ||
return this; | ||
} | ||
|
||
SparkFileWriterFactory build() { | ||
boolean noEqualityDeleteConf = equalityFieldIds == null && equalityDeleteRowSchema == null; | ||
boolean fullEqualityDeleteConf = equalityFieldIds != null && equalityDeleteRowSchema != null; | ||
|
@@ -269,7 +294,8 @@ SparkFileWriterFactory build() { | |
equalityDeleteSparkType, | ||
equalityDeleteSortOrder, | ||
positionDeleteRowSchema, | ||
positionDeleteSparkType); | ||
positionDeleteSparkType, | ||
writeProperties); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,12 @@ | |
|
||
import static org.apache.iceberg.IsolationLevel.SERIALIZABLE; | ||
import static org.apache.iceberg.IsolationLevel.SNAPSHOT; | ||
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; | ||
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL; | ||
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; | ||
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY; | ||
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; | ||
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL; | ||
|
||
import java.io.IOException; | ||
import java.util.Arrays; | ||
|
@@ -54,6 +60,7 @@ | |
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; | ||
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
import org.apache.iceberg.spark.CommitMetadata; | ||
import org.apache.iceberg.spark.FileRewriteCoordinator; | ||
import org.apache.iceberg.spark.SparkWriteConf; | ||
|
@@ -178,6 +185,32 @@ private WriterFactory createWriterFactory() { | |
// broadcast the table metadata as the writer factory will be sent to executors | ||
Broadcast<Table> tableBroadcast = | ||
sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); | ||
Map<String, String> writeProperties = Maps.newHashMap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What about adding a separate method for this? We could then call it directly in
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
switch (format) { | ||
case PARQUET: | ||
writeProperties.put(PARQUET_COMPRESSION, writeConf.parquetCompressionCodec()); | ||
String parquetCompressionLevel = writeConf.parquetCompressionLevel(); | ||
if (parquetCompressionLevel != null) { | ||
writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); | ||
} | ||
|
||
break; | ||
case AVRO: | ||
writeProperties.put(AVRO_COMPRESSION, writeConf.avroCompressionCodec()); | ||
String avroCompressionLevel = writeConf.avroCompressionLevel(); | ||
if (avroCompressionLevel != null) { | ||
writeProperties.put(AVRO_COMPRESSION_LEVEL, writeConf.avroCompressionLevel()); | ||
} | ||
|
||
break; | ||
case ORC: | ||
writeProperties.put(ORC_COMPRESSION, writeConf.orcCompressionCodec()); | ||
writeProperties.put(ORC_COMPRESSION_STRATEGY, writeConf.orcCompressionStrategy()); | ||
break; | ||
default: | ||
throw new IllegalArgumentException(String.format("Unknown file format %s", format)); | ||
} | ||
|
||
jerqi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return new WriterFactory( | ||
tableBroadcast, | ||
queryId, | ||
|
@@ -186,7 +219,8 @@ private WriterFactory createWriterFactory() { | |
targetFileSize, | ||
writeSchema, | ||
dsSchema, | ||
partitionedFanoutEnabled); | ||
partitionedFanoutEnabled, | ||
writeProperties); | ||
} | ||
|
||
private void commitOperation(SnapshotUpdate<?> operation, String description) { | ||
|
@@ -616,6 +650,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr | |
private final StructType dsSchema; | ||
private final boolean partitionedFanoutEnabled; | ||
private final String queryId; | ||
private final Map<String, String> writeProperties; | ||
|
||
protected WriterFactory( | ||
Broadcast<Table> tableBroadcast, | ||
|
@@ -625,7 +660,8 @@ protected WriterFactory( | |
long targetFileSize, | ||
Schema writeSchema, | ||
StructType dsSchema, | ||
boolean partitionedFanoutEnabled) { | ||
boolean partitionedFanoutEnabled, | ||
Map<String, String> writeProperties) { | ||
this.tableBroadcast = tableBroadcast; | ||
this.format = format; | ||
this.outputSpecId = outputSpecId; | ||
|
@@ -634,6 +670,7 @@ protected WriterFactory( | |
this.dsSchema = dsSchema; | ||
this.partitionedFanoutEnabled = partitionedFanoutEnabled; | ||
this.queryId = queryId; | ||
this.writeProperties = writeProperties; | ||
} | ||
|
||
@Override | ||
|
@@ -657,6 +694,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e | |
.dataFileFormat(format) | ||
.dataSchema(writeSchema) | ||
.dataSparkType(dsSchema) | ||
.writeProperties(writeProperties) | ||
.build(); | ||
|
||
if (spec.isUnpartitioned()) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We rarely use extra `write` and `read` namespaces in SQL properties; the only exception is preserving grouping, as it was not clear otherwise. What about dropping `write` from all names?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Fixed.