Skip to content

Conversation

@wuchong
Copy link
Member

@wuchong wuchong commented Apr 28, 2019

What is the purpose of the change

Support translate "FOR SYSTEM_TIME AS OF" query into temporal table join for both Batch and Stream.

Brief change log

  • Introduce some API interface to support lookup table source
    • LookupableTableSource, AsyncTableFunction, DefinedPrimaryKey, DefinedTableIndex, TableIndex, LookupConfig
  • Support for generating optimized logical & physical plan for temporal table join for STREAM and BATCH.
  • Introduce temporal table join operators and async join operators for runtime.

Some differences between this PR and blink branch.

  • Support returns multiple lines for async temporal table join
  • Only allow FOR SYSTEM_TIME AS OF on left table's proctime field, not a PROCTIME() builtin function. This makes syntax clean.
  • Lookup function support external types (i.e. String, Timestamp...).

Verifying this change

  • Add several validation tests, plan tests , and integration tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@wuchong
Copy link
Member Author

wuchong commented Apr 29, 2019

Hi @godfreyhe , I add the JoinPushExpressionsRule.INSTANCE rule into predicate pushdown stage to make sure the calculation expression can be pushed down (to make sure ON mod(a, 4) = b is an equi join). That's why several results of plan test are changed. I have checked, the changes are reasonable.

It would be nice if you can have a look too.

/**
* The async join runner to lookup the dimension table.
*/
public class TemporalTableJoinAsyncRunner extends RichAsyncFunction<BaseRow, BaseRow> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add tests for these runners?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

new OneInputOperatorWrapper(genOperator)
}

private[flink] def generateFunction[T <: Function](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't adding this to FunctionCodeGenerator?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it actually generate code for a CalcProgram, it needs to access the private method generateProcessCode in this file. And FunctionCodeGenerator only accepts code body as parameter, not the `CalcProgram.

What do you think about renaming the method name to generateCalcFunction to align with generateCalcOperator ?

<Resource name="sql">
<![CDATA[
SELECT * FROM MyTable AS T
JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we want to attach the proctime to table T, as opposed to just use proctime()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I think using proctime() here is a little semantic ambiguous. Temporal table join is joining the snapshot of temporal table, the version of snapshot is determined by left table's time field. Say left.proctime, it means joining the current version of temporal table.

But temporalTest FOR SYSTEM_TIME AS OF PROCTIME() is not related to left table, it only returns one (latest) version of temporal table and then be joined with left table. It is the same with

SELECT * 
FROM left AS L, 
JOIN (
  SELECT * FROM temporalTest FOR SYSTEM_TIME AS OF PROCTIME()
) AS R ON L.id = R.id

That's why I want to restrict it a bit more in the first version. We can discuss it, and can support PROCTIME() if we think it is needed. It should be supported easily with several lines change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right. The example you gave will cause different behavior with what we expected. The protime() can be translated to the processing time we begin to scan right table, and in this case, right table will become a static table. This is not we want.

@wuchong
Copy link
Member Author

wuchong commented May 4, 2019

Hi @KurtYoung , I have addressed the review comments.

  1. add unit tests for all join runners
  2. fix code generation problem when temporal join with udf filter (a bug found recently in internal)

* @param <T> type of the result
*/
@PublicEvolving
public interface LookupableTableSource<T> extends TableSource<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the relationship between this LookupableTableSource and DefinedIndexes and DefinedPrimaryKey? Does this table source support getting lookup function for random keys, even if the columns have no index on them or not the primary key?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, the lookup key of LookupableTableSource must be either primary key or index. Therefore, a concrete LookupableTableSource should also implement DefinedIndexes or DefinedPrimaryKey.

Does this table source support getting lookup function for random keys, even if the columns have no index on them or not the primary key?

No. It will throw exceptions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any better solution for this? It's hard for user to realize that LookupableTableSource should be used with DefinedIndexes or DefinedPrimaryKey

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, we will keep DefinedIndexes and DefinedPrimaryKey in the first version. And integrate them into table source v2 interface in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface should also implement DefinedIndexes and DefinedPrimaryKey?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to uncouple LookupableTableSource and DefinedIndexes&DefinedPrimaryKey. Because in order to make a lookup table source work, it only needs to provide a primary key or index, not both.

override def matches(call: RelOptRuleCall): Boolean = {
val join = call.rel[FlinkLogicalJoin](0)
val tableScan = call.rel[TableScan](3)
matches(join, tableScan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also check if it's snapshotted by proctime?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, whether it is snapshotted by proctime or rowtime will both be translated into temporal table join. And will throw exception if it is rowtime when physical translation, because we don't support rowtime temporal join currently.

So I think we don't need to check proctime here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed current TemporalTableJoin is a SingleRel, which is not suitable for further extension after we support scanning data into state and provide event time join. So i think it's inappropriate to translate snapshotted with rowtime to TemporalTableJoin and then throw exception inside this physical operator.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense.

I'm also rethinking about the physical node name of TemporalTableJoin and TemporalTableFunctionJoin (i.e. temporal join in Flink). The node names are really confused to users. Actually, the TemporalTableJoin is joining a dimension table which is a SingleRel.

So how about renaming TemporalTableJoin to DimensionTableJoin, and renaming TemporalTableFunctionJoin to TemporalJoin. And we can change the translation rule to:

  • snapshot on proctime & table source supports LookupableTableSource ==> DimensionTableJoin
  • snapshot on rowtime & table source ONLY supports LookupableTableSource ==> exception
  • snapshot on proctime/rowtime & table source supports scanning ==> TemporalJoin

What do you think ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this idea. How about rename DimensionTableJoin to LookupJoin, and rename TemporalJoin to TemporalTableJoin.

Conceptually LookupJoin is very similar with NestedLoopJoin but is a SingleRel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to LookupJoin.

The only concern of TemporalTableJoin is that it is actually joining two streams, but there is a Table in the name. Do you think it matters?

Copy link
Contributor

@KurtYoung KurtYoung May 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because i think TemporalTable is more consistent with sql standard. TemporalJoin will confuse others about what is temporal? It's not intuitive that this is actually used for temporal table join. You can consider this name as TemporalTable ' Join

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let's go forward with LookupJoin and TemporalTableJoin.

@wuchong
Copy link
Member Author

wuchong commented May 7, 2019

Hi @KurtYoung , I renamed the node name to LookupJoin and move the snapshot on proctime verifying to rules in the second commit: rename to LookupJoin.

* @param <T> type of the result
*/
@PublicEvolving
public interface LookupableTableSource<T> extends TableSource<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface should also implement DefinedIndexes and DefinedPrimaryKey?


new OneInputTransformation(
inputTransformation,
"TemporalTableJoin",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change the name here


@Override
public void collect(T record) {
this.collected = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this method call collector.collect?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and i just found that the sub-class of this class RowToBaseRowCollector is actually calling getCollector.asInstanceOf[Collector[BaseRow]].collect(result),

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the collector.collect should be called by sub-classes. Because the getCollector should collect a final result of the Correlate, i.e. a JoinedRow combines left input and right row.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What i meant is this class already holds a private Collector<?> collector;, and also implements Collector<T> interface. In this collect(T record) method, why don't you collect the record with your own collector, but rely on the caller to first getCollector() and collect records?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because TableFunctionCollector abstract class does't know the what is the record type T. It might be a BaseRow, but can also be a Row (see RowToBaseRowCollector). As a result, TableFunctionCollector can't collect by himself, because he doesn't know what the result record is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then i think this class shouldn't be a collector, aka shouldn't implementing Collector<T> interface

Copy link
Member Author

@wuchong wuchong May 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to keep the interface, because it is the basic implementation of UDTF's collector, which is used to accept UDTF's result value.

What about provide a outputResult method used to collect final result. And remove getCollector() method and collect(T record) implementation.

	public void outputResult(Object result) {
		this.collected = true;
		collector.collect(result);
	}

All the sub-classes can use outputResult(result) to collect final result instead of getCollector().collect(result).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, let's try this.

/**
* Sets the current collector, which used to emit the final row.
*/
public void setCollector(ResultFuture<?> resultFuture) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setResultFuture?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

/**
* Gets the internal collector which used to emit the final row.
*/
public ResultFuture<?> getCollector() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getResultFuture?

/**
* The basic implementation of collector for {@link ResultFuture} in table joining.
*/
public abstract class TableFunctionResultFuture<T> extends AbstractRichFunction implements ResultFuture<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The responsibility of this class and TableFunctionCollector is not clean. It looks like they want to do some encapsulation but actually only contain some get and set methods. And the interface contract also seems not consistent, see the comment i left in TableFunctionCollector

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The responsibility of them are used to generate a collector which will do some filters or projections on both left input row and right table row. That's why we have setInput, getInput and a setCollector to set the real underlying collector.

The only difference between them is TableFunctionCollector has an additional collected flag used to indicate whether right table is empty. Because UDTF may call collect(T) zero or several times, we need a way to know whether it is called zero times, so that we can emit an null row for left join. However, ResultFuture.complete(Collection<OUT> result) will be called exactly once, so that we don't need a flag to indicate the "zero call", an empty or null result is the "zero call".

TableFunctionResultFuture<BaseRow> resultFuture = generatedResultFuture.newInstance(
getRuntimeContext().getUserCodeClassLoader());
FunctionUtils.setFunctionRuntimeContext(resultFuture, getRuntimeContext());
FunctionUtils.openFunction(resultFuture, new Configuration());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider saving parameters of open for use here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

TableFunctionResultFuture<BaseRow> resultFuture = generatedResultFuture.newInstance(
getRuntimeContext().getUserCodeClassLoader());
FunctionUtils.setFunctionRuntimeContext(resultFuture, getRuntimeContext());
FunctionUtils.openFunction(resultFuture, new Configuration());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider close resultFuture?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

wuchong added a commit to wuchong/flink that referenced this pull request May 8, 2019
wuchong added a commit to wuchong/flink that referenced this pull request May 8, 2019
@wuchong
Copy link
Member Author

wuchong commented May 8, 2019

Comments addressed

wuchong added a commit to wuchong/flink that referenced this pull request May 10, 2019
@wuchong
Copy link
Member Author

wuchong commented May 10, 2019

Rebased.

@wuchong
Copy link
Member Author

wuchong commented May 13, 2019

@KurtYoung TableFunctionCollector has been refactored.

@KurtYoung
Copy link
Contributor

+1

@asfgit asfgit closed this in 03ba663 May 13, 2019
@wuchong wuchong deleted the temporalJoin branch May 13, 2019 09:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants