Overwritten row data is read from Iceberg table with where clause on value-changed field #5852

@ldwnt

Description

Apache Iceberg version

0.13.0

Query engine

write: Flink
read: Flink/Spark

Please describe the bug 🐞

I use the Iceberg Flink sink in upsert mode, with equality field columns configured in the Flink connector. When a row is modified on the source side, the new value is expected to be read back from the Iceberg target table. However, when a WHERE clause is applied to the modified column, the old (overwritten) value is read instead.
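
The symptom can be expressed as two Spark reads. The sketch below is illustrative and not from the original report; it assumes a Spark session whose default catalog is the same Iceberg (Hive) catalog the Flink job writes to, and the class name is made up:

    import org.apache.spark.sql.SparkSession;

    public class UpsertFilterCheck {
        public static void main(String[] args) {
            // assumes the session's default catalog is the Iceberg catalog written by Flink
            SparkSession spark = SparkSession.builder()
                .appName("upsert-filter-check")
                .getOrCreate();

            // unfiltered read: expected to return the upserted value (1, 2)
            spark.sql("SELECT * FROM test.timequery_ice1").show();

            // filtered read on the updated column: unexpectedly still returns the old row (1, 1)
            spark.sql("SELECT * FROM test.timequery_ice1 WHERE name < 2").show();
        }
    }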

Steps to reproduce the bug:

  1. Create the Iceberg table:
CREATE TABLE IF NOT EXISTS ice_catalog.test.timequery_ice1(
  `id` int NOT NULL,
  `name` int,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('format-version'='2','engine.hive.enabled'='true','write.metadata.delete-after-commit.enabled'='true','write.metadata.previous-versions-max'='1');
  2. Produce a record (id,name) to the Kafka topic:
1,1
  3. Read from Kafka and write to Iceberg using the Flink sink:
        DataStream<RowData> ds = env
            .fromSource(kafka, WatermarkStrategy.forMonotonousTimestamps(), "src")
            // parse each Kafka record ("id,name") into a two-field RowData
            .map((MapFunction<Tuple3<String, String, String>, RowData>) kafkaMessage -> {
                String[] sts = kafkaMessage.f2.split(",");
                GenericRowData rowData = new GenericRowData(2);
                rowData.setField(0, Integer.parseInt(sts[0]));
                rowData.setField(1, Integer.parseInt(sts[1]));
                return rowData;
            })
            .setParallelism(1).uid("@dsm").name("@dsm");

        TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.parse("test.timequery_ice1"));

        // upsert sink keyed on "id": each update writes an equality delete plus the new row
        FlinkSink.forRowData(ds)
            .tableLoader(tableLoader)
            .equalityFieldColumns(Arrays.asList("id"))
            .upsert(true)
            .uidPrefix("@s-test.timequery_ice2")
            .append()
            .setParallelism(1).uid("@s-test.timequery_ice2").name("@s-test.timequery_ice2");
  4. Wait for checkpoint completion and read the Iceberg table with Spark SQL (OK):
select * from test.timequery_ice1 where name < 2;
1       1
  5. Produce an updated record for the same key to the Kafka topic:
1,2
  6. Wait for checkpoint completion and read the Iceberg table with Spark SQL again (NG; an empty result set is expected because name is now 2):
select * from test.timequery_ice1 where name < 2;
1       1
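
As an additional check (not part of the original report), the table's snapshot history can be inspected to confirm that the second checkpoint committed an equality delete for id = 1. A minimal Spark (Java) sketch, again assuming the session's default catalog is the Iceberg catalog used above:

    import org.apache.spark.sql.SparkSession;

    public class InspectSnapshots {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                .appName("inspect-iceberg-snapshots")
                .getOrCreate();

            // each Flink checkpoint produces one Iceberg commit; for the second commit the
            // summary map should report added delete files / equality deletes, since the
            // upsert writes an equality delete for id = 1 before inserting the new row
            spark.sql("SELECT committed_at, snapshot_id, operation, summary "
                    + "FROM test.timequery_ice1.snapshots")
                 .show(false);
        }
    }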
