Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-32188][Table SQL / Planner] Support to "where" query and simplify condition for an array-type filed of temporary table. #22719

Open
wants to merge 4 commits into
base: release-1.16
Choose a base branch
from

Conversation

YasuoStudyJava
Copy link
Contributor

@YasuoStudyJava YasuoStudyJava commented Jun 6, 2023

What is the purpose of the change

When submitting an SQL task with Flink to test a customized data source connector, I specified to query an array-type field of a temporary table with a fixed-value array. For example, "select * from image-source where URL=ARRAY ['/flink. jpg', '/flink_1. jpg']", but it couldn't obtain the corresponding predicate filters at all in the connector's DynamicTableSource.applyFilters method. This change can fix it.
By the way, linked to https://issues.apache.org/jira/browse/CALCITE-5733. Simplification seem to not take into account that the specified field is of array type. In other words,it can simplify "a = 1 AND a = 2" to "false",but can not simplify “a = [1,2] AND a = [2,3]” to "false". For example, "select * from image-source where URL=ARRAY ['/flink. jpg', '/flink_1. jpg'] AND URL=ARRAY ['/f. jpg', '/f_1. jpg']" can obtain two predicate conditions, this is illogical. Generally speaking, simplifying this SQL condition should not result in any predicates. Changes related to “RexSimplify.java” can fix it.

Brief change log

  • Changes related to “RexSimplify.java” can simplify for array type.
  • Changes related to "RexNodeExtractor.scala" can support where query like "where URL=ARRAY ['aaa', 'bbb']".

Verifying this change

This change involves customized data source connector, you can customize a connector by implement ScanTableSource, SupportsFilterPushDown. How to develop custom connectors is detailed in the Flink community. And then it can be verified as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS mytable (\n" +
                "        a ARRAY<STRING>,\n" + 
                "        b STRING,\n" +
                "        proc AS proctime()\n" +
                "        ) WITH (\n" +
                "        'connector' = 'your customized data source connector',\n" +
                "        'options_1'='xxx',\n" +
                "        'options_2'='xxx',\n" +
                "        )");
String s1 = "select * from mytable where a = ARRAY['111', '222']";
TableResult result = tableEnv.executeSql(s1);
result.print();

//to test simplify for array-type, follow sql should return no results.
String s1 = "select * from mytable  where a = ARRAY['111', '222'] and a = ARRAY['222', '333']";
TableResult result = tableEnv.executeSql(s1);
result.print();

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

…ify condition for an array-type filed of temporary table.

linked to https://issues.apache.org/jira/browse/CALCITE-5733. Simplification seem to not take into account that the specified field is of array type. In other words,it can simplify "a = 1 AND a = 2" to "false",but can not simplify “a = [1,2] AND a = [2,3]” to "false". This change can fix it.
…ray-type filed of temporary table.

When submitting an SQL task with Flink to test a customized data source connector, I specified to query an array-type field of a temporary table with a fixed-value array. For example, "select * from image-source where URL=ARRAY ['/flink. jpg', '/flink_1. jpg']", but it couldn't obtain the corresponding predicate filters at all in the connector's DynamicTableSource.applyFilters method. This change can fix it.
@flinkbot
Copy link
Collaborator

flinkbot commented Jun 6, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@YasuoStudyJava
Copy link
Contributor Author

There are some format violations, I'll fix it later.

Fix format violations.
Fix format violations.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants