[iceberg-1746] Implement spark fanout writer #1774
Conversation
Wouldn't we need to also eliminate the pre-sorting from the Spark write to benefit from this? As is, all the rows that go into the partitioned writer are assumed to be pre-grouped already, so a fanout wouldn't really help. Maybe I'm missing something.
Seems like this provides the ability to write out non-grouped records; won't this create too many possibly small files? What use cases do you have in mind for this, @XuQianJin-Stars?
Hi @rdsr @RussellSpitzer, thanks for the reminder; I will take a closer look at this piece. I only started learning Iceberg not long ago. I hope you have more comments to discuss with me. Thanks again.
Well, I can add corresponding unit tests.
I think we could use the fanout writer for the streaming write case, where records may be hard to group due to latency requirements.
If this is not the default and there is a way to turn it on, I could see some value in this. And we may need to use it by default for streaming. This just uses higher memory consumption to avoid needing to cluster rows by partition in tasks.
Users can turn it on via the `write.partitioned.fanout.enabled` parameter.
OK, now I understand what "fanout" means; I didn't know about the Flink implementation. Thanks. I see the concern about the overall number of output files, but if I understand correctly, using the fanout writer would produce the same number of output files; it just eliminates the need for a "local sort" at the cost of keeping multiple files open for write at the same time. For the best result on the number of output files, we still need to repartition based on partition, regardless of using the fanout writer.

Another question: is it better to have the flag in the table properties, or as an option on the Spark Iceberg sink?

The actual concern would be predicting how many files need to be open together for write. This depends heavily on the cardinality of the output's partitions, which might depend on the characteristics of the output, but might also be "query dependent", as with batch vs. streaming. I'm not maintaining an Iceberg table at production scale, so I can't say. Probably @aokolnychyi would have some insight on this?
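To make the tradeoff concrete, here is a minimal, hypothetical sketch of the fanout idea; the names `FanoutWriterSketch`, `RowWriter`, and `partitionKeyOf` are illustrative, not Iceberg APIs. Instead of assuming rows arrive clustered by partition and rolling to a new file whenever the key changes, the writer keeps one open file per partition key it has seen, trading memory and open file handles for order-independence:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; RowWriter and partitionKeyOf are assumed helpers.
abstract class FanoutWriterSketch<T> {
  private final Map<String, RowWriter<T>> writers = new HashMap<>();

  // Derive the partition key for a row (e.g. from the table's partition spec).
  protected abstract String partitionKeyOf(T row);

  // Open a new file writer the first time a partition is seen.
  protected abstract RowWriter<T> newWriterFor(String partitionKey);

  public void write(T row) throws IOException {
    // Rows may arrive in any order: look up (or open) the writer for this
    // row's partition instead of requiring pre-grouped input.
    writers.computeIfAbsent(partitionKeyOf(row), this::newWriterFor).add(row);
  }

  public void close() throws IOException {
    // Every partition's file stays open until the task finishes, which is
    // where the extra memory / file-handle cost comes from.
    for (RowWriter<T> writer : writers.values()) {
      writer.close();
    }
    writers.clear();
  }

  interface RowWriter<T> {
    void add(T row) throws IOException;

    void close() throws IOException;
  }
}
```

The number of concurrently open writers equals the number of distinct partition values the task sees, which is exactly the cardinality concern raised above.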
I removed the
For the Spark fanout task writer, I think it's reasonable for Spark streaming scenarios, because in that case we don't necessarily need to shuffle the records based on partition keys. Moving the `PartitionedFanoutWriter` into the core `io` package sounds good. @XuQianJin-Stars, mind updating this PR to address the CI issue? Thanks.
site/docs/configuration.md
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
| check-nullability | true | Sets the nullable check on fields |
| snapshot-property._custom-key_ | null | Adds an entry with custom-key and corresponding value in the snapshot summary |
| partitioned.fanout.enabled | Table write.partitioned.fanout.enabled | Overrides this table's write.partitioned.fanout.enabled |
> Table write.partitioned.fanout.enabled

We won't have a table option for write.partitioned.fanout.enabled, right? Could just write false as the default here.
Right, we won't have the table option write.partitioned.fanout.enabled; false is OK.
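For illustration, enabling the option for a single write might look like the following sketch; the table path and data are made up, and only the option name `partitioned.fanout.enabled` comes from the docs table above:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FanoutWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("fanout-write-example")
        .getOrCreate();

    // Made-up data: ids 0..99 spread across 10 interleaved partition values.
    // Note there is no sort or repartition on the partition column.
    Dataset<Row> df = spark.range(100)
        .selectExpr("CAST(id AS INT) AS id", "CAST(id % 10 AS STRING) AS data");

    df.write()
        .format("iceberg")
        .option("partitioned.fanout.enabled", "true") // per-write override
        .mode("append")
        .save("hdfs://nn:8020/warehouse/db/partitioned_table"); // illustrative path

    spark.stop();
  }
}
```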
```java
    TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED,
    TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)) {
  writer = new SparkPartitionedFanoutWriter(spec, format, appenderFactory, fileFactory, io.value(),
      Long.MAX_VALUE,
```
nit: don't need to break into a new line here
```java
      schema, structType);
} else {
  writer = new SparkPartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(),
      Long.MAX_VALUE,
```
ditto.
```java
    table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);

boolean tablePartitionedFanoutEnabled = PropertyUtil.propertyAsBoolean(
```
Q: will we set this for a given table? In my opinion, it's per job?
We need to set this option for a given table, because some tables require fanout.
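The resolution in this PR appears to be both: the table property supplies a per-table default and the write option overrides it per job. A hedged sketch of that precedence, using the constant names from the diff and Spark 2.4's `DataSourceOptions` (not the PR's exact code):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.sources.v2.DataSourceOptions;

class FanoutFlagSketch {
  // The table property write.partitioned.fanout.enabled gives the per-table
  // default; the per-job write option partitioned.fanout.enabled overrides it.
  static boolean fanoutEnabled(Table table, DataSourceOptions options) {
    boolean tableDefault = PropertyUtil.propertyAsBoolean(
        table.properties(),
        TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED,
        TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT);
    return options.getBoolean("partitioned.fanout.enabled", tableDefault);
  }
}
```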
```java
Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

df.select("id", "data").sort("data").write()
```
For the partitioned fanout case, we don't have to sort on the data column, right? Otherwise, what's the difference compared to PartitionedWriter?
spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
```java
private final InternalRowWrapper internalRowWrapper;

public SparkPartitionedFanoutWriter(PartitionSpec spec, FileFormat format,
    FileAppenderFactory<InternalRow> appenderFactory,
```
nit: seems we could format the code to align with the previous line ?
well
```java
} else {
  writer = new SparkPartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE,
      schema, structType);
if (PropertyUtil.propertyAsBoolean(properties,
```
nit: I think it's clearer to just use:

```java
if (spec.fields().isEmpty()) {
  return UnpartitionedWriter;
} else if (xxx) {
  return SparkPartitionedFanoutWriter;
} else {
  return SparkPartitionedWriter;
}
```
well
```java
}

@Test
public void testPartitionedFanoutCreateWithTargetFileSizeViaOption() throws IOException {
  partitionedCreateWithTargetFileSizeViaOption(true);
```
Do we need a unit test to cover Spark's partitioned.fanout.enabled option? I saw there's a unit test which uses the table's write.partitioned.fanout.enabled property to define the fanout behavior.
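A self-contained sketch of what such an option-based test might look like; this is an assumed shape, not the PR's actual test, and the schema, data, and temp-table setup are made up:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.Assert;
import org.junit.Test;

public class TestFanoutOptionSketch {
  private static final Schema SCHEMA = new Schema(
      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
      Types.NestedField.optional(2, "data", Types.StringType.get()));

  @Test
  public void testPartitionedFanoutWriteViaSparkOption() throws IOException {
    File location = Files.createTempDirectory("fanout-option-test").toFile();

    // Create a table partitioned by the "data" column.
    new HadoopTables().create(
        SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("data").build(), location.toString());

    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

    // Deliberately interleaved partition values, with no sort before writing.
    Dataset<Row> df = spark.createDataFrame(
        Arrays.asList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"),
            new SimpleRecord(3, "a"), new SimpleRecord(4, "b")),
        SimpleRecord.class);

    df.select("id", "data")
        .write()
        .format("iceberg")
        .option("partitioned.fanout.enabled", "true")
        .mode("append")
        .save(location.toString());

    Assert.assertEquals(4, spark.read().format("iceberg").load(location.toString()).count());
    spark.stop();
  }

  // Simple bean mirroring the SimpleRecord helper used in the existing tests.
  public static class SimpleRecord implements java.io.Serializable {
    private Integer id;
    private String data;

    public SimpleRecord() {
    }

    public SimpleRecord(Integer id, String data) {
      this.id = id;
      this.data = data;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getData() { return data; }
    public void setData(String data) { this.data = data; }
  }
}
```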
```java
} else {
  return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(),
      targetFileSize, writeSchema, dsSchema);
if (partitionedFanoutEnabled) {
```
nit: could simplify it as:

```java
if (spec.fields().isEmpty()) {
} else if (partitionedFanoutEnabled) {
} else {
}
```

```java
PartitionedFanout24Writer(PartitionSpec spec, FileFormat format,
    SparkAppenderFactory appenderFactory,
    OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize,
    Schema schema, StructType sparkSchema) {
```
nit: format those lines as:

```java
PartitionedFanout24Writer(PartitionSpec spec, FileFormat format,
                          SparkAppenderFactory appenderFactory,
                          OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize,
                          Schema schema, StructType sparkSchema) {
  super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, schema, sparkSchema);
}
```

```java
} else {
  return new Partitioned3Writer(
      spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema, dsSchema);
if (partitionedFanoutEnabled) {
```
ditto
```java
private static class PartitionedFanout3Writer extends SparkPartitionedFanoutWriter
    implements DataWriter<InternalRow> {
  PartitionedFanout3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
```
nit: format
openinx left a comment
Patch looks good to me overall; left several comments. The key point for me is: I think we need to provide a unit test to address the Spark option partitioned.fanout.enabled, for both Spark 2 and Spark 3.

Thanks @XuQianJin-Stars for the contribution.
Changes involved in this PR:
- Moved the `PartitionedFanoutWriter` of Flink to the `core/src/main/java/org/apache/iceberg/io` package.
- Set `write.partitioned.fanout.enabled = true` in the table settings to enable fanout. The default is `false`, and the Spark option `partitioned.fanout.enabled` can also be set to override it.
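For completeness, a hedged sketch of enabling the table-level default through the Iceberg API; the table path is illustrative:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class EnableFanoutDefault {
  public static void main(String[] args) {
    // Path is illustrative; any Iceberg table load mechanism works.
    Table table = new HadoopTables().load("hdfs://nn:8020/warehouse/db/partitioned_table");

    // Make fanout the table-level default; the per-job Spark option
    // partitioned.fanout.enabled can still override it.
    table.updateProperties()
        .set("write.partitioned.fanout.enabled", "true")
        .commit();
  }
}
```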