[FLINK-8897] [table] Fix rowtime materialization issues for filters and joins #6987

Closed
wants to merge 3 commits into from

twalthr
Contributor

@twalthr twalthr commented Nov 1, 2018

What is the purpose of the change

This PR fixes the "mismatched type" AssertionError caused by invalid time indicator materialization. FLINK-6232 removed the materialization of filters for windowed joins. However, this led to mismatches between the actual input types and the input types expected by RexNodes. This PR modifies the window join rule to match against the newly materialized expressions (CAST(rowtime) and PROCTIME(proctime)) instead.

Additionally, this PR simplifies the general time attribute logic for joins and gives better error messages for the current limitations (namely, no rowtime attributes in projections or conditions). We should fix these limitations with a major refactoring as part of FLINK-10211.

Brief change log

  • Materialize conditions of filters and joins again and match against newly materialized expressions
  • Simplify code and improve error message

Verifying this change

  • Change is covered by existing tests.
  • New tests in JoinValidationTest
  • New test in TimeAttributesITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@twalthr
Contributor Author

twalthr commented Nov 1, 2018

@xccui @hequn8128 would be great if you can take a look

Contributor

@pnowojski pnowojski left a comment

It looks mostly good to me, but I'm relying more on the passing tests than on my own knowledge here. I left a couple of smaller comments.

@@ -1003,12 +980,43 @@ class JoinTest extends TableTestBase {
util.verifyTable(result, expected)
}

private def verifyTimeBoundary(
Contributor

Please either revert this move or extract it into a separate commit. As it is now, I cannot easily tell whether something has changed or whether this is a simple refactoring without any functional change.

@@ -30,18 +30,6 @@ class JoinValidationTest extends TableTestBase {
streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)

/** There should exist exactly two time conditions **/
Contributor

What happened to this test?

Contributor

Hi, DataStreamJoinRule would match this case after we materialize time indicators in the join condition.

@@ -136,4 +124,66 @@ class JoinValidationTest extends TableTestBase {

streamUtil.verifySql(sql, "n/a")
}

/** Rowtime attributes cannot be accessed in filter conditions yet. */
@Test(expected = classOf[TableException])
Contributor

Please also add a simple expected-message matcher; otherwise this test can pass for the wrong reason, i.e., when a TableException is thrown for some unrelated cause.
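
To illustrate the concern, here is a minimal, self-contained sketch of why a type-only expectation is weaker than one that also checks the message. It does not use JUnit or Flink: the `TableException` class, the `validate` method, the `failsWith` helper, and both error messages are hypothetical stand-ins invented for this example. (In JUnit 4 itself, a message check is commonly written with the `ExpectedException` rule and its `expectMessage` method.)

```scala
import scala.reflect.ClassTag
import scala.util.{Failure, Try}

// Hypothetical stand-in for Flink's TableException, so the sketch is self-contained.
class TableException(msg: String) extends RuntimeException(msg)

object ExpectedMessageSketch {

  // Returns true only if `body` throws an exception of type E whose message
  // contains `fragment`. A type-only check corresponds to fragment = "".
  def failsWith[E <: Throwable : ClassTag](fragment: String)(body: => Any): Boolean =
    Try(body) match {
      case Failure(e) if implicitly[ClassTag[E]].runtimeClass.isInstance(e) =>
        Option(e.getMessage).exists(_.contains(fragment))
      case _ => false
    }

  // Hypothetical validation step that can fail for two unrelated reasons.
  // Both messages are made up for illustration.
  def validate(tableRegistered: Boolean): Unit = {
    if (!tableRegistered) throw new TableException("Table 'MyTable' was not found.")
    throw new TableException("Rowtime attributes are not supported in filter conditions.")
  }

  def main(args: Array[String]): Unit = {
    // A type-only check passes even though the failure is the wrong one.
    println(failsWith[TableException]("")(validate(tableRegistered = false))) // true
    // Matching on the message rejects the unrelated failure ...
    println(failsWith[TableException]("Rowtime attributes")(validate(tableRegistered = false))) // false
    // ... and accepts the intended one.
    println(failsWith[TableException]("Rowtime attributes")(validate(tableRegistered = true))) // true
  }
}
```

The first call shows the problem: with only the exception type checked, a broken test setup is indistinguishable from the limitation the test is supposed to document.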

}

/** Rowtime attributes cannot be accessed in filter conditions yet. */
@Test(expected = classOf[TableException])
Contributor

ditto

@hequn8128
Contributor

Hi @twalthr, a neat fix! LGTM.

And thanks for adding the logic that throws an exception in DataStreamJoinRule when the rule doesn't match, which I think makes the exceptions friendlier to users. How about adding similar logic to DataStreamWindowJoinRule, i.e., throwing an exception when window bounds have been defined but the output contains a rowtime field?

Best, Hequn

@fhueske
Contributor

fhueske commented Nov 5, 2018

Hi @hequn8128, the problem with throwing exceptions in the rules is that they result in canceling the optimizer completely, i.e., as soon as a rule is applied on a plan that does not meet the condition, the query fails even if the query could be transformed into a valid execution plan. I know that Calcite's "Cannot optimize" exceptions are hard to digest, but they are only thrown when the optimizer cannot generate a valid plan.

Aborting the optimization is especially tricky for specialized operators like window join that require very specific conditions. If these conditions are not met, the join can often be executed with a regular (fully materializing) join. I don't think we should speculate whether this is something that the user intended to do or not.
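
The difference between a rule that declines to match and a rule that throws can be sketched with a toy planner. This is not Calcite's API and not how its cost-based optimizer selects plans; the `Join`, `Rule`, and `plan` names and the first-match strategy are assumptions made purely for illustration.

```scala
import scala.util.Try

object RuleMatchingSketch {

  // Toy query description: a join that may or may not have window bounds.
  final case class Join(hasWindowBounds: Boolean)

  // Toy rule: `matches` decides applicability, `convert` names the physical operator.
  final case class Rule(name: String, matches: Join => Boolean, convert: Join => String)

  // Toy planner: tries each rule in order and returns the first plan produced.
  // An exception thrown by any rule propagates and aborts planning entirely.
  def plan(rules: Seq[Rule], join: Join): Option[String] =
    rules.collectFirst { case r if r.matches(join) => r.convert(join) }

  // Fallback that accepts every join (stands in for a fully materializing join).
  val regularJoin: Rule = Rule("regular", _ => true, _ => "RegularJoin")

  // Specialized rule that declines quietly when the join has no window bounds.
  val windowJoinDeclining: Rule = Rule("window", _.hasWindowBounds, _ => "WindowJoin")

  // Same rule, but it throws instead of declining.
  val windowJoinThrowing: Rule = Rule(
    "window",
    j => if (j.hasWindowBounds) true else throw new IllegalStateException("no window bounds"),
    _ => "WindowJoin")

  def main(args: Array[String]): Unit = {
    val unbounded = Join(hasWindowBounds = false)
    // Declining lets the planner fall back to the regular join.
    println(plan(Seq(windowJoinDeclining, regularJoin), unbounded)) // Some(RegularJoin)
    // Throwing aborts planning although the regular join would have worked.
    println(Try(plan(Seq(windowJoinThrowing, regularJoin), unbounded)).isFailure) // true
  }
}
```

In the second call the fallback rule is never consulted, which mirrors the point above: the query fails even though a valid plan exists.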

@twalthr
Contributor Author

twalthr commented Nov 5, 2018

@fhueske This is also why FLINK-10211 should be fixed quite soon as it would properly materialize the time attributes and would reduce the need for exceptions that a non-advanced user can hardly understand.

@twalthr
Contributor Author

twalthr commented Nov 5, 2018

I will merge these changes now to have them in Flink 1.7. But we should aim to fix these issues entirely with FLINK-10211.
