Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10259] [table] Fix key extraction for GroupWindows. #6641

Closed
wants to merge 2 commits into from

Conversation

fhueske
Copy link
Contributor

@fhueske fhueske commented Aug 31, 2018

What is the purpose of the change

Fixes the key validation for SQL queries with group window aggregates that are emitted to an UpsertStreamTableSink.

Brief change log

  • Handle window properties (start, end, rowtime) in one equivalence group because all property attributes identify the same window.
  • Add tests for SQL INSERT INTO of different result types to different types of table sinks.

Verifying this change

  • Run the added tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? n/a

Copy link
Contributor

@hequn8128 hequn8128 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fhueske Thanks for fixing the bug. I left some minor suggestions.


import scala.collection.JavaConverters._

class InsertIntoITCase extends AbstractTestBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extends StreamingWithStateTestBase?

class InsertIntoITCase extends AbstractTestBase {

@Test
def testInsertIntoAppendStreamToTableSink(): Unit = {
Copy link
Contributor

@hequn8128 hequn8128 Sep 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have this test case, should we remove the testInsertIntoMemoryTable in SqlITCase ? It seems the two test cases are very similar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

"Received retraction messages for append only table",
results.exists(!_.f0))

val retracted = RowCollector.upsertResults(results, Array(0, 1, 2)).sorted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Array(0, 1, 2) -> Array(0, 1)

@twalthr twalthr changed the title [FLINK-10259] Fix key extraction for GroupWindows. [FLINK-10259] [table] Fix key extraction for GroupWindows. Sep 6, 2018
@fhueske
Copy link
Contributor Author

fhueske commented Sep 16, 2018

Thanks for the review @hequn8128!
I've updated the PR.

@hequn8128
Copy link
Contributor

Thanks for the update @fhueske . Looks good to me. +1 to merge

@fhueske
Copy link
Contributor Author

fhueske commented Sep 18, 2018

Thanks for the review @hequn8128! Merging

@asfgit asfgit closed this in f28b829 Sep 18, 2018
asfgit pushed a commit that referenced this pull request Sep 18, 2018
@fhueske fhueske deleted the fixKeyValidator branch September 18, 2018 23:12
asfgit pushed a commit that referenced this pull request Sep 19, 2018
kl0u pushed a commit to kl0u/flink that referenced this pull request Sep 20, 2018
Clarkkkkk pushed a commit to Clarkkkkk/flink that referenced this pull request Mar 7, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants