
[table] Add support for temporal join on rolling aggregates #24506

Open

schevalley2 wants to merge 1 commit into master
Conversation

schevalley2

What is the purpose of the change

This is more of a proposal to demonstrate a possible fix. I am looking for feedback from people who are more knowledgeable.

Following this thread on the mailing list: https://lists.apache.org/thread/9q7sjyqptcnw1371wc190496nwpdv1tz

Given an orders table:

CREATE TABLE orders (
    order_id INT,
    price DECIMAL(6, 2),
    currency_id INT,
    order_time AS NOW(),
    WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
) WITH (…)

and a currency rate table:

CREATE TABLE currency_rates (
    currency_id INT,
    conversion_rate DECIMAL(4, 3),
    created_at AS NOW(),
    WATERMARK FOR created_at AS created_at - INTERVAL '2' SECOND,
    PRIMARY KEY (currency_id) NOT ENFORCED
) WITH (…)

that we would aggregate in an unbounded way like this:

CREATE TEMPORARY VIEW max_rates AS (
    SELECT
        currency_id,
        MAX(conversion_rate) AS max_rate
    FROM currency_rates
    GROUP BY currency_id
);
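
A temporal join between orders and max_rates would look roughly like this (a sketch based on the tables above; the exact query from the mailing-list thread may differ and the output aliases are illustrative):

SELECT
    o.order_id,
    o.price,
    o.price * r.max_rate AS converted_price
FROM orders AS o
LEFT JOIN max_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency_id = r.currency_id;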

It's not possible to do a temporal join like this between orders and max_rates; it fails with the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException:
Event-Time Temporal Table Join  requires both primary key and row time 
attribute in versioned table, but no row time attribute can be found.

After some investigation, we realised that the temporal join checks for event/processing time by looking at whether the row type contains a time attribute, so we added another column to max_rates like this:

CREATE TEMPORARY VIEW max_rates AS (
    SELECT
        currency_id,
        MAX(conversion_rate) AS max_rate,
        LAST_VALUE(created_at) AS updated_at
    FROM currency_rates
    GROUP BY currency_id
);

However, LAST_VALUE does not support the timestamp type (FLINK-15867). After adding that support, we ended up with a planner assertion error:

java.lang.AssertionError: Sql optimization: Assertion error: type mismatch:
ref:
TIMESTAMP_LTZ(3) *ROWTIME* NOT NULL
input:
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL

	at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)

We understood that the issue lies in the way RelTimeIndicatorConverter rewrites the FlinkLogicalJoin. The left and right input expressions get their TimeIndicatorRelDataType types replaced by plain timestamps, but in the temporal join case the join condition is not rewritten (whereas in the else branch it actually is).

However, I thought this might not be the right solution to the problem, so I also compared what happens if I replace the max_rates definition with a simple SELECT, as in:

CREATE TEMPORARY VIEW max_rates AS (
    SELECT
        currency_id,
        conversion_rate AS max_rate,
        created_at AS updated_at
    FROM currency_rates
);

What I've noticed is that the timestamp on the right side of the join is not replaced and stays a TimeIndicatorRelDataType. This is because the RelNode graph on the right side is:

FlinkLogicalTableSourceScan -> FlinkLogicalCalc -> FlinkLogicalWatermarkAssigner -> FlinkLogicalSnapshot -> FlinkLogicalJoin

and WatermarkAssigner overrides deriveRowType, which actually forces the TimeIndicatorRelDataType to be there, whereas with FlinkLogicalAggregate it simply gets converted into a normal timestamp.
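
One way to compare the two cases is to run EXPLAIN on the temporal join with each definition of max_rates (a sketch reusing the join from above; the exact plan output depends on the Flink version):

EXPLAIN
SELECT o.order_id, o.price * r.max_rate AS converted_price
FROM orders AS o
LEFT JOIN max_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency_id = r.currency_id;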

So the use of LAST_VALUE(…) here is a bit of a hack to keep the time information in the query, and it would not even work for every aggregation one might want to write.

However, it seems that supporting temporal joins on rolling aggregates would be a good idea.

Looking forward to discussing this further with you.

Brief change log

Verifying this change

  • Added a test that executes the type of queries we want to support
  • Added tests that check that, with the new feature disabled, the join is not supported

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): don't know / no (it's part of the planner)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented

flinkbot (Collaborator) commented Mar 15, 2024

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

schevalley2 force-pushed the temporal-join-on-rolling-aggregates branch 2 times, most recently from 555cf79 to f78ce8e on March 18, 2024 at 12:07
schevalley2 force-pushed the temporal-join-on-rolling-aggregates branch from f78ce8e to b3ffe10 on March 25, 2024 at 11:50