Limit Scan will emit wrong data for partitioned table in partition filter scan #2353

@luoyuxia

Description

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

0.8.0 (latest release)

Please describe the bug 🐞

This can be reproduced with the following test; add it as a new test method in FlinkUnionReadPrimaryKeyTableITCase:

@Test
    void t1() throws Exception {
        boolean isPartitioned = true;
        JobClient jobClient = buildTieringJob(execEnv);

        String tableName = "pk_table_full" + (isPartitioned ? "_partitioned" : "_non_partitioned");
        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
        // create table & write initial data
        long tableId =
                preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);

//        // wait until records have been synced
//        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
//
//        // check the status of replica after synced
//        assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);

        // will read paimon snapshot, won't merge log since it's empty
        List<String> resultEmptyLog =
                toSortedRows(
                        batchTEnv.executeSql(
                                "select * from " + tableName + " where c16 = 'not_exists' limit 10"));
        System.out.println(resultEmptyLog);

        jobClient.cancel().get();
    }

The test then prints the following rows, although the query should return an empty result:

[+I[false, 1, 2, 3, 4, 5.1, 6.0, string, 0.09, 10, 2023-10-25T12:01:13.182Z, 2023-10-25T12:01:13.182005Z, 2023-10-25T12:01:13.183, 2023-10-25T12:01:13.183006, [1, 2, 3, 4], not_exists], +I[false, 1, 2, 3, 4, 5.1, 6.0, string, 0.09, 10, 2023-10-25T12:01:13.182Z, 2023-10-25T12:01:13.182005Z, 2023-10-25T12:01:13.183, 2023-10-25T12:01:13.183006, [1, 2, 3, 4], not_exists], +I[true, 10, 20, 30, 40, 50.1, 60.0, another_string, 0.90, 100, 2023-10-25T12:01:13.200Z, 2023-10-25T12:01:13.200005Z, 2023-10-25T12:01:13.201, 2023-10-25T12:01:13.201006, [1, 2, 3, 4], not_exists], +I[true, 10, 20, 30, 40, 50.1, 60.0, another_string, 0.90, 100, 2023-10-25T12:01:13.200Z, 2023-10-25T12:01:13.200005Z, 2023-10-25T12:01:13.201, 2023-10-25T12:01:13.201006, [1, 2, 3, 4], not_exists]]

Note that the main branch does not have this issue: after #1934, the filter is no longer accepted by the source, so the limit is not pushed down.

I am creating this issue to track the problem in case we accept the filter again some day. The cause is that the source accepts the partition filter and then performs the limit scan, but the limit scan does not respect the partition filter and returns rows anyway. Flink then appends the partition predicate value 'not_exists' to the scanned rows, which produces the wrong result.
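To illustrate the mechanism described above, here is a minimal, self-contained sketch. It is a simplified model and not Fluss or Flink code: the class LimitScanSketch, its methods, and the in-memory "table" are all hypothetical names made up for this example. It assumes the partition column value is filled in from the accepted equality predicate rather than read from storage, as described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the bug; none of these names exist in Fluss or Flink.
public class LimitScanSketch {

    // In-memory stand-in for a partitioned table:
    // partition value -> rows holding only the non-partition columns.
    static Map<String, List<String>> table() {
        Map<String, List<String>> t = new LinkedHashMap<>();
        t.put("2025", List.of("row-a"));
        t.put("2026", List.of("row-b"));
        return t;
    }

    // Buggy path: the partition filter was accepted, but the limit scan
    // ignores it and reads from every partition. The partition column is
    // then filled in with the constant taken from the predicate.
    static List<String> buggyLimitScan(
            Map<String, List<String>> table, String partitionEq, int limit) {
        List<String> out = new ArrayList<>();
        for (List<String> rows : table.values()) {
            for (String row : rows) {
                if (out.size() >= limit) {
                    return out;
                }
                out.add("(" + row + ", " + partitionEq + ")");
            }
        }
        return out;
    }

    // Expected path: only the partition matching the filter is scanned,
    // so a non-existent partition yields no rows.
    static List<String> correctLimitScan(
            Map<String, List<String>> table, String partitionEq, int limit) {
        List<String> out = new ArrayList<>();
        for (String row : table.getOrDefault(partitionEq, List.of())) {
            if (out.size() >= limit) {
                break;
            }
            out.add("(" + row + ", " + partitionEq + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [(row-a, not_exists), (row-b, not_exists)]
        System.out.println(buggyLimitScan(table(), "not_exists", 10));
        // prints []
        System.out.println(correctLimitScan(table(), "not_exists", 10));
    }
}
```

In this model the buggy path returns rows from other partitions relabeled as 'not_exists', which matches the shape of the output printed by the test. A fix would need either to apply the partition filter inside the limit scan or to reject limit pushdown whenever a partition filter has been accepted.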

Solution

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
