Flink: Make dynamic Iceberg sink agnostic to user types #13394
Conversation
DynamicRecordProcessor currently performs two unrelated operations:
1. Converts user records to dynamic sink records.
2. Converts dynamic sink records to internal records.

Let's decouple these steps and make users provide a stream of dynamic records directly. This design gives the following benefits:
1. More flexibility for users. Users can implement ProcessFunction or any other Flink interface, with more flexibility to define custom mappings and operations. For example, they may want to use a ProcessFunction with access to RuntimeContext instead of the current, limited DynamicRecordGenerator contract. Users can also manipulate the conversion topology directly, for example by running DynamicRecord converters in a separate thread from the internal record processor.
2. Clear separation of concerns. Users are responsible for mapping their types to DynamicRecord; DynamicIcebergSink should not be aware of user types.
3. Better visibility in Flink. User record conversion will be a separate operator in the Flink job graph and provide its own metrics.
4. Alignment with the existing IcebergSink API. The existing static IcebergSink is agnostic to user types and doesn't force any record conversion contract.
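A rough sketch of what a user pipeline could look like under this design (MyEvent and toDynamicRecord are hypothetical user code; only DynamicRecord and the forInput change in the diff below come from this PR):

```java
// Sketch only: MyEvent and toDynamicRecord(...) are hypothetical user code.
DataStream<MyEvent> events = ...;

DataStream<DynamicRecord> dynamicRecords =
    events.process(
        new ProcessFunction<MyEvent, DynamicRecord>() {
          @Override
          public void processElement(MyEvent event, Context ctx, Collector<DynamicRecord> out) {
            // Unlike the DynamicRecordGenerator contract, the full RuntimeContext
            // (metrics, state, task info) is available here via getRuntimeContext().
            out.collect(toDynamicRecord(event));
          }
        });

// The converted stream is then handed to the sink builder's
// forInput(DataStream<DynamicRecord>) shown in the diff below.
```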
@@ -191,16 +187,11 @@ public static class Builder<T> {

   Builder() {}

-  public Builder<T> forInput(DataStream<T> inputStream) {
+  public Builder forInput(DataStream<DynamicRecord> inputStream) {
At some point, we also had DynamicRecord as the input type. I agree that this gives greater flexibility to users. The downside is that there is an extra stage before the DynamicRecordProcessor, which introduces some overhead. For example, DynamicRecord will now have to be serialized, even if it is chained to the process operator. Maybe this isn't such a big problem if we either add a dedicated serializer or make DynamicRecord Pojo-compatible, according to https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos.

We should probably do some benchmarking to compare the main branch against this PR with:
- Kryo serializer (no code changes necessary)
- Pojo serializer (convert DynamicRecord to a Flink Pojo)
- Dedicated serializer (create a Flink TypeInformation and register it)
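For illustration, a rough sketch of how the Kryo and dedicated-serializer variants could be wired up on the user side (MyConverter and DynamicRecordTypeInformation are hypothetical names, not existing classes):

```java
// Kryo variant: no changes needed. DynamicRecord is not POJO-compatible, so
// Flink falls back to GenericTypeInfo, which serializes with Kryo.
DataStream<DynamicRecord> kryoSerialized = events.map(new MyConverter());

// Dedicated-serializer variant: override the inferred type information with a
// custom TypeInformation that wraps a purpose-built TypeSerializer.
DataStream<DynamicRecord> dedicatedSerialized =
    events.map(new MyConverter()).returns(new DynamicRecordTypeInformation());

// The POJO variant would require changing DynamicRecord itself (public no-arg
// constructor plus accessible fields of POJO-supported types).
```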
+1 for the performance tests.
We decided against it at the time; let's see if we can improve on this one.
I compared the TestDynamicIcebergSinkPerf.testDynamicSink performance benchmark for this PR and the old version, and there was no statistically significant difference. Here are the results:
- main branch:
TEST RESULT: For table default.t_26 snapshot 2868386798804424438 written 0 records in -35623 ms
TEST RESULT: For table default.t_26 snapshot 7069644643429169829 written 1666600 records in 11386 ms
TEST RESULT: For table default.t_26 snapshot 6031041964330951590 written 1666600 records in 2778 ms
TEST RESULT: For table default.t_26 snapshot 2156359093155143830 written 1666600 records in 2814 ms
TEST RESULT: For table default.t_26 snapshot 2580547273748232679 written 1666600 records in 2744 ms
TEST RESULT: For table default.t_26 snapshot 2564715127645259386 written 1666600 records in 2798 ms
TEST RESULT: For table default.t_26 snapshot 3771055749763299893 written 1666600 records in 2668 ms
TEST RESULT: For table default.t_26 snapshot 6016960118945170848 written 1666600 records in 2668 ms
TEST RESULT: For table default.t_26 snapshot 7504949381208081312 written 1666600 records in 2643 ms
TEST RESULT: For table default.t_26 snapshot 8432520790651649149 written 1666600 records in 2669 ms
TEST RESULT: For table default.t_26 snapshot 6263708423788691662 written 1666600 records in 2455 ms
- This PR:
TEST RESULT: For table default.t_26 snapshot 2361210778853343244 written 0 records in -35186 ms
TEST RESULT: For table default.t_26 snapshot 1109176579012990641 written 1666600 records in 11571 ms
TEST RESULT: For table default.t_26 snapshot 6028394606366492990 written 1666600 records in 2769 ms
TEST RESULT: For table default.t_26 snapshot 1889252285380896154 written 1666600 records in 2717 ms
TEST RESULT: For table default.t_26 snapshot 1614760403647339883 written 1666600 records in 2602 ms
TEST RESULT: For table default.t_26 snapshot 8354008012153290489 written 1666600 records in 2681 ms
TEST RESULT: For table default.t_26 snapshot 8163325083185166838 written 1666600 records in 2579 ms
TEST RESULT: For table default.t_26 snapshot 1881898039513018409 written 1666600 records in 2562 ms
TEST RESULT: For table default.t_26 snapshot 6999646099254321715 written 1666600 records in 2576 ms
TEST RESULT: For table default.t_26 snapshot 6457661440126048547 written 1666600 records in 2612 ms
TEST RESULT: For table default.t_26 snapshot 291506493357250761 written 1666600 records in 2509 ms
By default, Flink always chains operations, so there should be minimal overhead. There is no serialisation either because we reuse objects between stages.

It may also be fine to keep DynamicRecord as is for now. Most users who care about performance would use the Dynamic Sink with object reuse and operator chaining, so there will be no extra serialisation. And if not, the Kryo serialiser works out of the box. Also, users can still plug in a custom serialiser by using the returns() API of the DataStream.

@mxm, it doesn't seem like we can make DynamicRecord Pojo-compatible because it uses a RowData type. We would probably need to implement a custom serialiser, similar to DynamicRecordInternalSerializer.
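For reference, a short sketch of the object-reuse setting being discussed here (standard Flink configuration, not new API in this PR):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// With object reuse enabled and default operator chaining, records handed from
// the conversion operator to DynamicRecordProcessor are neither copied nor
// serialized.
env.getConfig().enableObjectReuse();

// Object reuse is off by default; in that case chained operators deep-copy each
// record via its serializer, which is the scenario questioned below.
// env.getConfig().disableObjectReuse();
```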
> By default, Flink always chains operations, so there should be minimal overhead. There is no serialisation either because we reuse objects between stages.

True, but chaining can be influenced by the user, either programmatically via the config or via user operations, e.g. reshuffle transformations. Chaining without object reuse will still run through a serialization cycle. Object reuse mode is enabled in the perf test because that gives optimal performance, but that's not how users typically run. Can we re-test with object reuse disabled, just to know how much slower it is?
> It may also be fine to keep DynamicRecord as is for now. Most users who care about performance would use the Dynamic Sink with object reuse and operator chaining, so there will be no extra serialisation. And if not, the Kryo serialiser works out of the box. Also, users can still plug in a custom serialiser by using the returns() API of the DataStream.

Ack, I agree. I would still double-check that the out-of-the-box performance is good enough that it does not require too much tuning.
With object reuse disabled, the dynamic sink performance test never completes with the changes in this PR and spams the log with:
WARN org.apache.iceberg.flink.sink.dynamic.TableMetadataCache - Performance degraded as records with different schema is generated for the same table. Likely the DynamicRecord.schema is not reused. Reuse the same instance if the record schema is the same to improve performance
...
I guess this is because schemas are copied across stages. We may indeed need a custom serialiser to fix this, or leave things as the status quo. Maybe the current API is not too bad, given that it forces object reuse.
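The warning asks producers to hand the same Schema instance to every DynamicRecord of a given table, e.g. by caching it in the converter as sketched below (MyEvent, deriveSchema and toDynamicRecord are hypothetical); as noted above, though, per-record copies between chained operators without object reuse can still break that instance reuse.

```java
// Sketch: cache one Schema instance per table in the converter so that
// TableMetadataCache sees identical schema objects across records.
public class MyConverter extends RichMapFunction<MyEvent, DynamicRecord> {

  private transient Map<String, Schema> schemaCache;

  @Override
  public void open(Configuration parameters) {
    schemaCache = new HashMap<>();
  }

  @Override
  public DynamicRecord map(MyEvent event) {
    Schema schema =
        schemaCache.computeIfAbsent(event.tableName(), name -> deriveSchema(event));
    return toDynamicRecord(event, schema);
  }
}
```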
Printing this message excessively may also cause performance issues.
> I guess this is because schemas are copied across stages.

Yes, chaining will always copy, unless object reuse is enabled.

> We may indeed need a custom serialiser to fix this, or leave things as the status quo.

A custom serializer per se wouldn't prevent this, unless it always returned the same schema object when the schema doesn't change. The problem is that we don't have a cheap way to compare two schema objects.
Thanks for the investigation @aiborodin.