
Flink: get duplicate rows when sync CDC data by FlinkSQL #2918

@Reo-LEI

Description


Duplicate Rows Problem

Recently I have been trying to use FlinkSQL to synchronize MySQL data into Iceberg. Syncing MySQL/PostgreSQL into Iceberg is a very common case when building a data warehouse. But I got duplicate rows with the same primary key in the Iceberg table that do not exist in the MySQL table. I will illustrate the reason for this problem as follows.

Assume we have a sample MySQL table which just has one record:

id | data
---|-----
 1 | aaa

And we define an ETL job with FlinkSQL as follows:

CREATE TABLE `mysql_table` (
    `id` INT, 
    `data` STRING
)
WITH (
    'connector' = 'mysql-cdc', 
    'database-name' = 'xxx', 
    ...
);

CREATE TABLE `iceberg_table` (
    `id` INT, 
    `data` STRING, 
    PRIMARY KEY(id) NOT ENFORCED
)
WITH (
    'connector' = 'iceberg', 
    'catalog-type' = 'hive', 
    'catalog-name' = 'iceberg', 
    'catalog-database' = 'xxx', 
    'format.version' = '2', 
    ...
);

INSERT INTO `iceberg_table`
SELECT `id`, `data` FROM `mysql_table`;

If we don't set the default parallelism of the job, we get a job graph like this:

FlinkCDCSource ---forward---> Filter ---forward---> IcebergStreamWriter ---forward---> IcebergFilesCommitter

That works fine with no duplicate rows, because every operator's parallelism is 1 and all CDC data is distributed to a single IcebergStreamWriter.

But once we set the default parallelism of the job with SET table.exec.resource.default-parallelism=3, we get another job graph, as follows:

                              +---> Filter ---forward---> IcebergStreamWriter-1 ---+
FlinkCDCSource ---rebalance---+---> Filter ---forward---> IcebergStreamWriter-2 ---+---rebalance---> IcebergFilesCommitter
                              +---> Filter ---forward---> IcebergStreamWriter-3 ---+

Notice that the Filter operator's parallelism follows the job default parallelism and increases to 3. For the Iceberg FlinkSink, since writeParallelism is not set, the IcebergStreamWriter parallelism follows the parallelism of rowDataInput (the Filter), which is 3.
The FlinkCDCSource parallelism will always be 1, because binlog data has to be read serially. Since Flink uses rebalance as its default shuffle strategy, the CDC data will be rebalanced across the three different Filters and then emitted to different IcebergStreamWriters.
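
For comparison, when the job is built with the DataStream API the writer parallelism can be pinned explicitly. Below is a minimal sketch, assuming the iceberg-flink FlinkSink builder; the input stream and table loader are passed in as placeholders:

import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class PinnedWriteParallelism {
    // rowDataInput: the CDC change-log stream as RowData; tableLoader: loader for the target table.
    static void buildSink(DataStream<RowData> rowDataInput, TableLoader tableLoader) {
        FlinkSink.forRowData(rowDataInput)
            .tableLoader(tableLoader)
            .equalityFieldColumns(Arrays.asList("id"))  // the table's primary key
            .writeParallelism(1)  // keep a single IcebergStreamWriter so all change logs hit one writer
            .append();
    }
}

With FlinkSQL there is no such hook, so the writer simply inherits the Filter's parallelism.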

At this point, if we insert one row and then update it as follows, we will get a duplicate row.

INSERT INTO `mysql_table`(id, data) VALUES(2, 'bbb');
UPDATE `mysql_table` SET data = 'xxx' WHERE id = 2;

MySQL:

id | data
---|-----
 1 | aaa
 2 | xxx

Iceberg:

id | data
---|-----
 1 | aaa
 2 | bbb
 2 | xxx

We get the duplicate row <2, 'bbb'> because the change-log records are rebalanced to different IcebergStreamWriters.
For the first INSERT statement we get a +I record, which is sent to IcebergStreamWriter-1.
For the UPDATE statement we get a -U record and a +U record. The -U record is sent to IcebergStreamWriter-2 and the +U record is rebalanced to IcebergStreamWriter-3.
Since IcebergStreamWriter-2 has not received any INSERT for that key before, its insertedRowMap is empty, so the pos-delete for the -U record is ignored and never written to a delete file.
Finally we end up with the duplicate row <2, 'bbb'>.
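
A simplified sketch of the delete path described above (illustrative only, not the actual Iceberg writer code; only the insertedRowMap name is taken from the description):

import java.util.HashMap;
import java.util.Map;

// Models one IcebergStreamWriter subtask: a position delete can only be emitted
// for a row that this same subtask inserted earlier.
class WriterSubtaskSketch {
    private final Map<Integer, Long> insertedRowMap = new HashMap<>();  // key -> row position
    private long nextPos = 0;

    void insert(int key) {            // +I / +U record
        insertedRowMap.put(key, nextPos++);
    }

    void delete(int key) {            // -D / -U record
        Long pos = insertedRowMap.remove(key);
        if (pos != null) {
            // emit a position delete for the row this subtask wrote earlier
        }
        // if pos == null, the matching insert lives in another subtask's data file,
        // so no pos-delete is produced and the old row (<2, 'bbb'>) survives.
    }
}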

Disorder Problem

The reason for the duplicate rows is as described above, but when I tried to fix it by adding a keyBy before the IcebergStreamWriter, I ran into another problem.
For FlinkSQL, Flink appends a Filter operator before calling IcebergTableSink.getSinkRuntimeProvider to prevent null data from being sent downstream. So when we get the rowDataInput from Flink in FlinkSink, rowDataInput always contains a Filter operator whose parallelism follows the job default parallelism, as above.

In our case, if we just keyBy the equality fields before adding the IcebergStreamWriter, we get the following job graph:

                              +---> Filter ---+      +---> IcebergStreamWriter-1 ---+
FlinkCDCSource ---rebalance---+---> Filter ---+-hash-+---> IcebergStreamWriter-2 ---+---rebalance---> IcebergFilesCommitter
                              +---> Filter ---+      +---> IcebergStreamWriter-3 ---+

Notice that the CDC data coming from the FlinkCDCSource is still rebalanced to the Filters first. Even though all CDC records with the same key are now emitted to the same IcebergStreamWriter, the rebalance can reorder the change logs (e.g. the same IcebergStreamWriter may receive the +U before the -U), and in the end we get a lot of strange results.
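
A rough sketch of that attempt, assuming a single INT equality field id at position 0 of the RowData (the filtered input stream is passed in as a placeholder):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.table.data.RowData;

public class KeyByEqualityField {
    // Hash the filtered stream by the equality field so every change log for a key
    // lands on the same IcebergStreamWriter subtask. This removes the duplicate-row
    // problem but not the ordering problem: the rebalance between the source and the
    // Filter has already happened, so -U/+U for the same key can arrive out of order.
    static KeyedStream<RowData, Integer> keyBy(DataStream<RowData> filteredInput) {
        return filteredInput.keyBy((KeySelector<RowData, Integer>) row -> row.getInt(0));
    }
}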

Goal

In summary, I think Iceberg should ensure that rows with the same primary key are updated in place, with no duplicate rows, when we sink CDC/UPSERT data into Iceberg.
I am trying to fix this by chaining the Source and Filter operators and adding a keyBy before the IcebergStreamWriter when FlinkSQL is used. In the end I want to get a correct job graph like this:

                                             +---> IcebergStreamWriter-1 ---+
FlinkCDCSource ---forward---> Filter ---hash-+---> IcebergStreamWriter-2 ---+---rebalance---> IcebergFilesCommitter
                                             +---> IcebergStreamWriter-3 ---+
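
In DataStream-API terms the intended wiring corresponds to hash-distributing rows by the equality fields right before the writer. A sketch of what that could look like on the FlinkSink builder; treat the distributionMode option (and a matching write.distribution-mode=hash table property) as assumptions about how the knob would be exposed:

import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class HashDistributedSink {
    // Shuffle by the equality fields (the primary key) before the IcebergStreamWriter,
    // so all change logs for one key reach the same writer subtask in order.
    static void buildSink(DataStream<RowData> rowDataInput, TableLoader tableLoader) {
        FlinkSink.forRowData(rowDataInput)
            .tableLoader(tableLoader)
            .equalityFieldColumns(Arrays.asList("id"))
            .distributionMode(DistributionMode.HASH)  // assumption: hash shuffle by equality fields
            .append();
    }
}

For the pure FlinkSQL path the same effect would have to come from the planner/sink itself, which is what the job graph above is meant to express.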
