
[FLINK-5750] Incorrect translation of n-ary Union #6287

Closed

Conversation

AlexanderKoltsov (Contributor)

What is the purpose of the change

This pull request adds support for multiple inputs in DataSetUnionRule.

Brief change log

  • DataSetUnionRule should consider all inputs instead of only the first and second (see the sketch after this list).
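
A minimal sketch of the idea, assuming the ConverterRule shape that the flink-table planner rules follow. FlinkLogicalUnion, DataSetUnion, and FlinkConventions exist in the codebase, but the import paths and the DataSetUnion constructor signature below are assumptions for illustration, not the actual patch:

import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetUnion
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion

import scala.collection.JavaConverters._

class DataSetUnionRule extends ConverterRule(
    classOf[FlinkLogicalUnion],
    FlinkConventions.LOGICAL,
    FlinkConventions.DATASET,
    "DataSetUnionRule") {

  override def convert(rel: RelNode): RelNode = {
    val union = rel.asInstanceOf[FlinkLogicalUnion]
    val traitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)

    // Convert every input, not just inputs 0 and 1, and fold the result
    // into a left-deep chain of binary DataSetUnion nodes.
    union.getInputs.asScala
      .map(input => RelOptRule.convert(input, FlinkConventions.DATASET))
      .reduceLeft { (left, right) =>
        // Constructor signature assumed for this sketch.
        new DataSetUnion(rel.getCluster, traitSet, left, right, rel.getRowType)
      }
  }
}

Folding left keeps each physical union node binary, so any downstream translation that expects exactly two inputs continues to work unchanged.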

Verifying this change

This change added the following test:
- Added a unit test, testValuesWithCast, that validates the VALUES operator with values that have to be cast. The plan optimizer transforms such a query into a UNION of VALUES because the value expressions are not literals (see the sketch below).
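
For illustration, a hedged sketch of what such a batch-side test could look like; it follows the common flink-table ITCase style (toDataSet, collect), but it is a sketch under those assumptions, not the exact test added by this PR:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit.Test

@Test
def testValuesWithCast(): Unit = {
  // Batch environment: the PR fixes the DataSet (batch) translation rule.
  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)

  // The CAST calls make the row expressions non-literal, so the optimizer
  // expands VALUES into a UNION over single-row VALUES nodes.
  val sqlQuery = "VALUES (1, cast(1 as BIGINT)), (2, cast(2 as BIGINT)), (3, cast(3 as BIGINT))"

  val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
  val expected = Seq("1,1", "2,2", "3,3")
  assertEquals(expected.sorted, results.map(_.toString).sorted)
}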

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@AlexanderKoltsov (Contributor, Author)

Hi, this PR is a continuation of #6267. It seems I mishandled the commits during the push. Sorry!

@AlexanderKoltsov (Contributor, Author)

The only thing I didn't do is add a unit test for the stream environment, because running such a test produced an exception.

Test in the org.apache.flink.table.runtime.stream.sql package:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.StreamITCase
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
import org.junit.Test

@Test
def testValuesWithCast(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  StreamITCase.clear

  val sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
    "(2, cast(2 as BIGINT))," +
    "(3, cast(3 as BIGINT))"

  val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
  results.addSink(new StreamITCase.RetractingSink)
  env.execute()

  val expected = Seq("1,1", "2,2", "3,3")
  assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}

Output:

org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalUnion(all=[true])
  FlinkLogicalCalc(expr#0=[{inputs}], expr#1=[1], expr#2=[1], EXPR$0=[$t1], EXPR$1=[$t2])
    FlinkLogicalValues(tuples=[[{ 0 }]])
  FlinkLogicalCalc(expr#0=[{inputs}], expr#1=[2], expr#2=[2], EXPR$0=[$t1], EXPR$1=[$t2])
    FlinkLogicalValues(tuples=[[{ 0 }]])
  FlinkLogicalCalc(expr#0=[{inputs}], expr#1=[3], expr#2=[3], EXPR$0=[$t1], EXPR$1=[$t2])
    FlinkLogicalValues(tuples=[[{ 0 }]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.

at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:731)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:778)
at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:254)
at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:234)
at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:189)
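
The failure presumably means no rule converts a FlinkLogicalUnion with three inputs into the DATASTREAM convention; if the streaming rule has the same two-input limitation as the batch rule, the same fold could apply there. A speculative sketch only, with the import paths, the DataStreamUnion constructor, and the RowSchema wrapper assumed rather than taken from the source:

import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
import org.apache.flink.table.plan.schema.RowSchema

import scala.collection.JavaConverters._

class DataStreamUnionRule extends ConverterRule(
    classOf[FlinkLogicalUnion],
    FlinkConventions.LOGICAL,
    FlinkConventions.DATASTREAM,
    "DataStreamUnionRule") {

  override def convert(rel: RelNode): RelNode = {
    val union = rel.asInstanceOf[FlinkLogicalUnion]
    val traitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)

    // Mirror the batch fix: convert all inputs and fold them into a
    // left-deep chain of binary DataStreamUnion nodes.
    union.getInputs.asScala
      .map(input => RelOptRule.convert(input, FlinkConventions.DATASTREAM))
      .reduceLeft { (left, right) =>
        // Constructor signature assumed for this sketch.
        new DataStreamUnion(rel.getCluster, traitSet, left, right,
          new RowSchema(rel.getRowType))
      }
  }
}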
