
Conversation


@wuchong wuchong commented Mar 17, 2020

What is the purpose of the change

If a computed column refers to a rowtime column (i.e. a column on which a watermark has been defined), a query on the table fails with the following exception:

[ERROR] Could not execute SQL statement. Reason:
 java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
 validated type:
 RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIMESTAMP(3) ts) NOT NULL
 converted type:
 RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIME ATTRIBUTE(ROWTIME) ts) NOT NULL
 rel:
 LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[$3], ts=[$4])
 LogicalWatermarkAssigner(rowtime=[order_time], watermark=[$1])
 LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[*($2, 1000)], ts=[+($1, 28800000:INTERVAL HOUR)])
 LogicalTableScan(table=[[default_catalog, default_database, orders, source: [Kafka010TableSource(order_id, order_time, amount)]]])
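
For reference, a minimal repro sketch of a DDL that hits this error: `ts` is a computed column referring to the rowtime column `order_time`, on which a watermark is defined. This is only a reconstruction from the plan above; the watermark strategy and the connector properties are placeholder assumptions, not taken from the PR.

```scala
// Minimal sketch, assuming a schema reconstructed from the plan above.
// The connector properties are placeholders for a Kafka 0.10 source; fill
// them in (or swap in any source with the same schema) for a real run.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

object ComputedColumnOnRowtimeRepro {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    // `ts` is a computed column that refers to the rowtime column `order_time`,
    // and `order_time` carries a watermark definition.
    tEnv.sqlUpdate(
      """
        |CREATE TABLE orders (
        |  order_id STRING,
        |  order_time TIMESTAMP(3),
        |  amount DOUBLE,
        |  amount_kg AS amount * 1000,
        |  ts AS order_time + INTERVAL '8' HOUR,
        |  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
        |) WITH (
        |  'connector.type' = 'kafka',
        |  'connector.version' = '0.10',
        |  'connector.topic' = 'orders',
        |  'connector.properties.bootstrap.servers' = 'localhost:9092',
        |  'format.type' = 'json'
        |)
      """.stripMargin)

    val result = tEnv.sqlQuery("SELECT * FROM orders")
    // Planning the query (e.g. explaining it) fails with the AssertionError
    // shown above before the fix in this PR.
    println(tEnv.explain(result))
  }
}
```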

Brief change log

  • Erase time indicators before translating the CatalogSourceTable into RelNodes in CatalogSourceTable#toRel.

Verifying this change

  • Added a plan test that reproduces the problem and verifies the change.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@wuchong wuchong requested a review from JingsongLi March 17, 2020 03:46

wuchong commented Mar 17, 2020

cc @danny0405

@flinkbot

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit a891d89 (Tue Mar 17 03:48:46 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier


flinkbot commented Mar 17, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build


  // 2. push computed column project
- val fieldNames = rowType.getFieldNames.asScala
+ val fieldNames = erasedRowType.getFieldNames.asScala
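
For context, a rough sketch of how such an `erasedRowType` could be derived before the computed-column projection is pushed. This is illustrative only, not the actual CatalogSourceTable code; it assumes FlinkTypeFactory.isTimeIndicatorType from the Blink planner and ignores nullability preservation for brevity.

```scala
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.flink.table.planner.calcite.FlinkTypeFactory

import scala.collection.JavaConverters._

// Illustrative only: rebuild the row type with every time-indicator field
// replaced by a plain TIMESTAMP(3), so that expressions defined on top of the
// scan (the computed columns) get types matching the validated schema.
def eraseTimeIndicators(rowType: RelDataType, typeFactory: RelDataTypeFactory): RelDataType = {
  val builder = typeFactory.builder()
  rowType.getFieldList.asScala.foreach { field =>
    val fieldType =
      if (FlinkTypeFactory.isTimeIndicatorType(field.getType)) {
        // Drop the ROWTIME/PROCTIME marker, keep the physical timestamp type.
        // Nullability handling is omitted here for brevity.
        typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 3)
      } else {
        field.getType
      }
    builder.add(field.getName, fieldType)
  }
  builder.build()
}
```

The rowtime attribute is not lost by this: as discussed below, it is attached again by the Watermark node after translation.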
Contributor

I think the right place to fix this is SqlToOperationConverter#createTableSchema: before we run the computed column type inference, the time attributes should be patched up based on the watermark definitions.

Member Author

After an offline discussion with @danny0405, we reached a consensus that erasing time indicators in CatalogSourceTable#toRel is the only way to fix this for now. So we will keep the current implementation in this PR.

@JingsongLi

I don't fully understand this. If we just throw away the time indicator information, how do we support processing-time fields? How would downstream operators know that a field is a processing-time field?


wuchong commented Mar 20, 2020

> I don't fully understand this. If we just throw away the time indicator information, how do we support processing-time fields? How would downstream operators know that a field is a processing-time field?

We didn't drop the time indicator information. The RelDataType from CatalogSourceTable through TableScan -> Project -> Watermark is identical, which means it keeps the time indicators. After translation, the time indicator is attached by the Watermark node.

@danny0405 danny0405 left a comment

+1, looks fine from my side.

@libenchao

LGTM, +1 for merging.


wuchong commented Mar 22, 2020

Thanks for the review @libenchao @danny0405, will merge this.

@wuchong wuchong merged commit 54395d9 into apache:master Mar 22, 2020
@wuchong wuchong deleted the fix-computed-column-ref-rowtime branch March 22, 2020 07:35