[FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE #7189

Merged on Nov 30, 2018 (2 commits)

dawidwys
Contributor

What is the purpose of the change

This PR enables user-defined scalar functions in MATCH_RECOGNIZE.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@dawidwys changed the title from "[FLINK-10597] Enabled UDFs support in MATCH_RECOGNIZE" to "[FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE" on Nov 28, 2018
@twalthr self-assigned this on Nov 28, 2018
Contributor

@twalthr left a comment

Thank you @dawidwys. I added a couple minor comments.

if (clazz == classOf[IterativeCondition[_]]) {
val baseClass = classOf[IterativeCondition[_]]
val (functionClass, signature, inputStatements) =
if (clazz == classOf[IterativeCondition[_]] || clazz == classOf[RichIterativeCondition[_]]) {
Contributor

Is there a reason to support both interface and rich class? I would keep it simple here and only support the rich variant.

LOG.debug("Instantiating IterativeCondition.")
function = clazz.newInstance()
// TODO add logic for opening and closing the function once it can be a RichFunction
FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
Contributor

Is there a reason why a distributed cache is not supported in a CepRuntimeContext? We don't expose many context properties in SQL UDFs but the distributed cache apparently seems to be important.

Contributor Author

I don't think there is any particular reason. Just wanted to maximally limit the scope. If it is important though, I will change that.

init()
}
override def close(): Unit = {
super.close()
Contributor

Remove this line?

init()
}
override def close(): Unit = {
super.close()
Contributor

Remove this line?

}
override def close(): Unit = {
super.close()
function.close()
Contributor

Use org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction instead?
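
The suggestion above can be sketched without Flink on the classpath. The following is a minimal model, assuming the helper simply guards the `close()` call behind a type check; `UserFunction`, `RichUserFunction`, `LifecycleUtils`, `RecordingCondition`, and `PlainCondition` are hypothetical stand-ins, not Flink classes.

```scala
// Hypothetical stand-ins for a plain function interface and its "rich"
// variant with a lifecycle. They are NOT the Flink interfaces.
trait UserFunction

trait RichUserFunction extends UserFunction {
  def open(): Unit
  def close(): Unit
}

object LifecycleUtils {
  // Close the function only if it actually has a lifecycle.
  // Callers no longer need to know which variant was generated.
  def closeFunction(function: UserFunction): Unit = function match {
    case rich: RichUserFunction => rich.close()
    case _                      => () // plain functions hold nothing to release
  }
}

// A rich function that records whether close() was invoked.
final class RecordingCondition extends RichUserFunction {
  var closed: Boolean = false
  override def open(): Unit = ()
  override def close(): Unit = { closed = true }
}

// A plain function without open()/close().
final class PlainCondition extends UserFunction
```

Routing the call through one helper keeps the runner's `close()` identical whether the generated class is rich or not, which seems to be the point of the review comment.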

}
override def close(): Unit = {
super.close()
function.close()
Contributor

Use org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction instead?

@@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
// We do not assert the proctime in the result, cause it is currently
Contributor

I guess this problem has not been solved with the new rich function?

Contributor Author

Unfortunately not, there is a separate issue for this: https://issues.apache.org/jira/browse/FLINK-10596

Contributor

We just introduced a new rich class. I think it would also be a perfect time to think about this shortcoming.

@@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
// We do not assert the proctime in the result, cause it is currently
// accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime
}

@Test
def testRichUdfs(): Unit = {
Contributor

nit: There are no RichUdfs just UserDefinedFunctions ;-)

}

class ToMillis extends ScalarFunction {
def eval(t: Timestamp): Long = {
t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
}
}

private class RichScalarFunc extends ScalarFunction {
Contributor

Give it a more meaningful name.

}

override def close(): Unit = {
prefix = "ERROR_VALUE"
Contributor

This line has no meaning so far. Remove the entire method?

Contributor

@dianfu left a comment

@dawidwys Thanks a lot for the PR. Have left a few comments.

@@ -131,47 +132,39 @@ class MatchCodeGenerator(
: GeneratedFunction[F, T] = {
Contributor

We can already reuse FunctionCodeGenerator.generateFunction() as IterativeCondition and Pattern(Flat)SelectFunction already support rich interfaces.

Contributor Author

There is one additional obstacle right now: the reusePatternLists() call. I would remove the comment and keep the two generators separate, as there might be additional differences in the future. What do you think?

Contributor

We can add something like reusablePrepareInputStatements in CodeGenerator and place it before reuseInputUnboxingCode when generating the function. Then we can make use of reusablePrepareInputStatements to construct the reusePatternLists. But I'm fine with the current implementation as reusablePrepareInputStatements is currently only useful for CEP.
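
A rough sketch of the idea in the comment above, written as a self-contained toy rather than against the real `CodeGenerator`. The class `SketchCodeGenerator` and its methods are hypothetical; only the names `reusablePrepareInputStatements` and `reuseInputUnboxingCode` come from the discussion, and how the real generator stores or emits these statements is an assumption here.

```scala
import scala.collection.mutable

// Hypothetical miniature of a code generator that collects reusable
// statements and emits them in a fixed order. Not the Flink class.
final class SketchCodeGenerator {
  // LinkedHashSet keeps insertion order and drops duplicate statements.
  private val reusablePrepareInputStatements = mutable.LinkedHashSet.empty[String]
  private val reusableInputUnboxing = mutable.LinkedHashSet.empty[String]

  def addReusablePrepareInputStatement(stmt: String): Unit = {
    reusablePrepareInputStatements += stmt
    ()
  }

  def addReusableInputUnboxing(stmt: String): Unit = {
    reusableInputUnboxing += stmt
    ()
  }

  def reusePrepareInputStatements(): String =
    reusablePrepareInputStatements.mkString("\n")

  def reuseInputUnboxingCode(): String =
    reusableInputUnboxing.mkString("\n")

  // Preparation statements (e.g. building the pattern lists) are placed
  // before the unboxing code, which is placed before the function body.
  def generateBody(bodyCode: String): String =
    Seq(reusePrepareInputStatements(), reuseInputUnboxingCode(), bodyCode)
      .filter(_.nonEmpty)
      .mkString("\n")
}
```

With such a hook, a CEP-specific generator could register its pattern-list setup as preparation statements and otherwise share the generic function generation path.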

@@ -35,20 +36,26 @@ import org.apache.flink.util.Collector
class PatternSelectFunctionRunner(
name: String,
code: String)
-extends PatternFlatSelectFunction[Row, CRow]
-with Compiler[PatternSelectFunction[Row, Row]]
+extends RichPatternFlatSelectFunction[Row, CRow]
Contributor

Is there any reason we use a RichPatternFlatSelectFunction which actually wraps a RichPatternSelectFunction?

Contributor Author

We need access to the collector to be able to erase the timestamp.
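
To illustrate the reasoning, here is a simplified, self-contained sketch of a flat-select runner that wraps a plain select function. All traits and classes below (`SelectFunction`, `FlatSelectFunction`, `TimestampedCollector`, `SelectRunner`, `RecordingCollector`) are hypothetical stand-ins defined only for this example, and the order of erasing the timestamp and collecting is an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins for the interfaces under discussion.
trait SelectFunction[IN, OUT] {
  def select(pattern: Map[String, List[IN]]): OUT
}

trait TimestampedCollector[T] {
  def collect(record: T): Unit
  def eraseTimestamp(): Unit
}

trait FlatSelectFunction[IN, OUT] {
  def flatSelect(pattern: Map[String, List[IN]], out: TimestampedCollector[OUT]): Unit
}

// The runner implements the flat variant because only that signature
// receives the collector. It delegates the actual selection to the
// wrapped single-result function.
final class SelectRunner[IN, OUT](delegate: SelectFunction[IN, OUT])
    extends FlatSelectFunction[IN, OUT] {
  override def flatSelect(
      pattern: Map[String, List[IN]],
      out: TimestampedCollector[OUT]): Unit = {
    out.eraseTimestamp()                  // needs the collector
    out.collect(delegate.select(pattern)) // emit exactly one result
  }
}

// Test double that records what the runner did with the collector.
final class RecordingCollector[T] extends TimestampedCollector[T] {
  val records: ArrayBuffer[T] = ArrayBuffer.empty[T]
  var timestampErased: Boolean = false
  override def collect(record: T): Unit = { records += record; () }
  override def eraseTimestamp(): Unit = { timestampErased = true }
}
```

A plain select function returns its result and never sees the collector, so it has no place to perform the erase step itself.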

Contributor

OK. Makes sense.

Contributor

@dawidwys I have reconsidered this issue: what about always generating a PatternFlatSelectFunction, regardless of whether ONE ROW PER MATCH or ALL ROWS PER MATCH is specified?

Contributor Author

Agreed, I had that in mind. I would change that either when introducing ALL ROWS PER MATCH or as a separate issue, as it is not directly connected with this one and requires more changes in the code generator.

Contributor

@twalthr left a comment

Thanks for the update @dawidwys. Feel free to merge this.

5 participants