Flink: Fix Flink sink failures caused by updating the partition spec #7171
Conversation
cc @chenjunjiedada @stevenzwu @rdblue Please take a look when you are free. Thanks a lot.
@szehon-ho @stevenzwu can we get this merged?
Thanks @hililiwei, the comment has been addressed. Please take another look.
@ConeyLiu is there a typo in the above descriptions? They seem to contradict each other.
@stevenzwu I'm sorry for the mistake. Just updated the descriptions.
Thanks @stevenzwu for the review, and I am sorry for the late response. I updated the fix implementation. Please take another look when you are free, thanks a lot.
```java
specId =
    getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
Assert.assertEquals(
```
Checking that the staging manifest files are written with the old partition spec.
@stevenzwu, we also have a requirement to migrate the table without restarting the Flink job, since users may have thousands of production streaming jobs online. Right now I don't have a full solution in mind; my early thinking is to notify the task manager to update the writer after a checkpoint. Do you have such a requirement as well? Any ideas?
@chenjunjiedada we can probably take this discussion to a separate issue. I remember a previous ask in this area about handling table schema evolution without manual intervention, but I couldn't find the PR or issue. There are two slightly different asks.

Case 1 can be implemented by resolving the write schema (or partition spec) not during job initialization but during task initialization. Writers periodically check (e.g. every checkpoint cycle) whether the table schema or partition spec changed; if changed, writers can fail the job, and the restart plus task initialization will load the latest schema and spec. However, this does bring a scalability concern, because every writer task (hundreds or more) needs to load an Iceberg table from the catalog to retrieve the schema and partition spec. Case 2 can be implemented similarly, but it is riskier: bad records (schemas) could cause unintended changes to the Iceberg table schema.
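The "case 1" approach above can be restated as a small, dependency-free sketch. Note that `SpecCheck` and `shouldFailJob` are illustrative names, not Iceberg or Flink APIs: each writer remembers the partition spec id it was opened with and, once per checkpoint cycle, compares it against the spec id freshly loaded from the catalog; a mismatch means the job should fail so the restart picks up the new schema and spec during task initialization.

```java
// Hypothetical sketch (not Iceberg/Flink API) of a writer-side spec check.
public class SpecCheck {
    private final int openedSpecId;

    public SpecCheck(int openedSpecId) {
        // Spec id resolved during task initialization, when the writer opened.
        this.openedSpecId = openedSpecId;
    }

    /** Called once per checkpoint cycle with the spec id freshly loaded from the catalog. */
    public boolean shouldFailJob(int currentSpecId) {
        return currentSpecId != openedSpecId;
    }

    public static void main(String[] args) {
        SpecCheck check = new SpecCheck(0);
        System.out.println(check.shouldFailJob(0)); // spec unchanged: keep running
        System.out.println(check.shouldFailJob(1)); // spec evolved: fail, restart reloads the spec
    }
}
```

The scalability concern raised above applies to the catalog load that produces `currentSpecId`, not to this comparison itself, which is trivial.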
This is very close now. The latest approach seems cleaner. Left a few nit comments.
thanks @ConeyLiu for the fix and @Reo-LEI and @hililiwei for the review |
Thanks all. Will submit backport PRs.
Co-authored-by: xianyangliu <xianyangliu@tencent.com>
We use a `SerializableTable` instance to create `IcebergStreamWriter`. The `PartitionSpec` of `SerializableTable` is fixed and will not change after the job starts, while the `PartitionSpec` for `IcebergFilesCommitter` is refreshed as the table snapshot changes. This could fail the Flink sink job when the partition spec is updated by another job, because we would use the wrong partition spec to write those DataFiles/DeleteFiles to the ManifestFile. For example, we got the following error when updating the partition spec:

In this patch, we use the correct partition spec to write the staging manifest file.
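The shape of the fix can be illustrated with a small, dependency-free sketch (class and method names here are illustrative, not the actual Iceberg code): when writing a staging manifest, look up the partition spec by the spec id the data files were actually written with, instead of using the table's current, possibly evolved, spec. The `specsById` map mimics Iceberg's `Table#specs()`, which maps every historical spec id to its `PartitionSpec` (represented here as a plain `String`).

```java
import java.util.Map;

// Hypothetical sketch of the fix, with specs represented as Strings.
public class StagingManifestSpec {
    /** Returns the spec the files were written with, not the table's current spec. */
    static String specForWrittenFiles(Map<Integer, String> specsById, int writerSpecId) {
        String spec = specsById.get(writerSpecId);
        if (spec == null) {
            throw new IllegalStateException("Unknown partition spec id: " + writerSpecId);
        }
        return spec;
    }

    public static void main(String[] args) {
        // The table evolved from spec 0 to spec 1 while the job was running,
        // but the in-flight writer still produces files under spec 0.
        Map<Integer, String> specsById = Map.of(0, "bucket(16, id)", 1, "identity(ts)");
        System.out.println(specForWrittenFiles(specsById, 0)); // old spec, matching the files
    }
}
```

Writing the staging manifest with the files' own spec keeps the manifest consistent regardless of concurrent spec updates; the committer can still commit against the current table state.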