[FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction #7253
Force-pushed from 7c7eb80 to 60e5d6f.
Thanks for putting the PR together so quickly, @dianfu. I haven't taken a close look yet; I will check it out and play around with it over the weekend. I just left a few suggestions based on a quick look. Please take a look and see if they make sense :-)
@Test
def testCollectAggregate(): Unit = {
  val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
This is deprecated, right?
Correct. That's because GroupAggProcessFunction extends ProcessFunction, not KeyedProcessFunction, so we have to use LegacyKeyedProcessOperator at present. We can definitely improve GroupAggProcessFunction to extend KeyedProcessFunction. What about improving it in another ticket?
def testCollectAggregate(): Unit = {
  val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
    new GroupAggProcessFunction(
      genCollectAggFunction,
Since this is the only place related to the actual agg function, we can probably make this more generic (see how the window operator test harness was implemented).
I have reimplemented this PR. Please take a look and check whether this issue still exists.
expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1))
testHarness.processElement(new StreamRecord(CRow(2L: JLong, 1: JInt, "bbb"), 1))
expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1))
Can we add something like:
// do a snapshot & close
State snapshot = testHarness.snapshot(0L, 0L);
testHarness.close();
// reopen and restore
testHarness = createTestHarness(operator);
testHarness.setup();
testHarness.initializeState(snapshot);
testHarness.open();
This will also catch some of the weird serialization/deserialization problems.
Good idea. What about adding this kind of test to the operator tests, such as GroupAggregateHarnessTest, JoinHarnessTest, etc.? I think it's more useful for operator tests.
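The snapshot-and-restore round trip suggested in this thread can be illustrated without any Flink dependency. The sketch below is only an analogy: `CountingOperator`, `snapshot`, and `restore` are hypothetical stand-ins built on plain Java serialization, not Flink's test harness API. It shows why the round trip is useful: state that fails to serialize or deserialize correctly would surface as an exception or a wrong count after the restore.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Toy illustration only; none of these classes belong to Flink.
class SnapshotRestoreSketch {

    /** Hypothetical operator that counts how often each key was seen. */
    static final class CountingOperator implements Serializable {
        private static final long serialVersionUID = 1L;
        private final HashMap<String, Integer> counts = new HashMap<>();

        /** Processes one element and returns the updated count for its key. */
        int processElement(String key) {
            return counts.merge(key, 1, Integer::sum);
        }
    }

    /** Serializes the operator (including its state) to bytes, like a snapshot. */
    static byte[] snapshot(CountingOperator op) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(op);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("snapshot failed", e);
        }
    }

    /** Rebuilds an operator from snapshot bytes, like restoring after a restart. */
    static CountingOperator restore(byte[] snapshot) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            return (CountingOperator) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("restore failed", e);
        }
    }

    public static void main(String[] args) {
        CountingOperator op = new CountingOperator();
        op.processElement("aaa");
        op.processElement("bbb");

        byte[] state = snapshot(op);                 // do a snapshot and "close"
        CountingOperator restored = restore(state);  // reopen and restore

        // Processing continues from the restored state, not from scratch.
        System.out.println(restored.processElement("aaa")); // prints 2
        System.out.println(restored.processElement("ccc")); // prints 1
    }
}
```

In a real harness test the same shape applies: process some elements, snapshot, rebuild the harness from the snapshot, and then assert on the output of further elements.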
Force-pushed from cecc443 to 0dfe579.
@twalthr @sunjincheng121 @walterddr I have added an
Currently I have not changed the other harness tests. We can improve them in the same way in follow-up PRs once we are sure this direction is what we want. Could you help take a look at this PR? Thanks a lot.
Thanks for the quick update, @dianfu. I like how the operator code is now constructed through SQL instead of hard-coded codegen. My only concern is that this might turn the test into more of an "ITCase" than a unit test, because test exceptions can now also come from compilation. Maybe @twalthr can provide more insight on this.
Overall the change looks very good to me. I only added minor comments. If we all agree that the harness test codegen can be replaced by actual compilation, we can definitely work together and change the others as well. 👍
@@ -87,7 +85,7 @@ class HarnessTestBase {
    new RowTypeInfo(distinctCountAggregates.map(getAccumulatorTypeOfAggregateFunction(_)): _*)

  protected val distinctCountDescriptor: String = EncodingUtils.encodeObjectToString(
    new MapStateDescriptor("distinctAgg0", distinctCountAggregationStateType, Types.LONG))
This change doesn't seem necessary
Done.
    } else {
      extractExpectedTransformation(one.getInput, prefixOperatorName)
    }
  case two: TwoInputTransformation[_, _, _] =>
Let's throw an unsupported operation exception for now, since no code path executes a two-input transformation yet. We can always add this logic later when necessary.
Done
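The lookup discussed in this thread (walk back through the inputs until a transformation whose name starts with the given prefix is found, and reject two-input transformations for now) could look roughly like the sketch below. It is a simplified model, not the PR's code: `Node`, `OneInput`, `TwoInput`, and `findByPrefix` are hypothetical names, and the operator names in `main` are made up for illustration.

```java
// Toy model of a transformation chain; these are not Flink's classes.
class TransformationLookupSketch {

    /** Base node, e.g. a source; it has a name and no input. */
    static class Node {
        final String name;

        Node(String name) {
            this.name = name;
        }
    }

    /** A transformation with exactly one upstream input. */
    static final class OneInput extends Node {
        final Node input;

        OneInput(String name, Node input) {
            super(name);
            this.input = input;
        }
    }

    /** A transformation with two upstream inputs, e.g. a join. */
    static final class TwoInput extends Node {
        final Node left;
        final Node right;

        TwoInput(String name, Node left, Node right) {
            super(name);
            this.left = left;
            this.right = right;
        }
    }

    /** Walks upstream and returns the first one-input node whose name starts with prefix. */
    static OneInput findByPrefix(Node node, String prefix) {
        if (node instanceof OneInput) {
            OneInput one = (OneInput) node;
            return one.name.startsWith(prefix) ? one : findByPrefix(one.input, prefix);
        }
        if (node instanceof TwoInput) {
            // Fail loudly instead of guessing which branch to follow.
            throw new UnsupportedOperationException(
                "two-input transformations are not supported yet: " + node.name);
        }
        throw new IllegalArgumentException("no transformation starts with: " + prefix);
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        OneInput agg = new OneInput("groupBy: (b)", source);
        OneInput sink = new OneInput("sink", agg);
        System.out.println(findByPrefix(sink, "groupBy").name); // prints groupBy: (b)
    }
}
```

Throwing on the unsupported case keeps the helper honest: a future join test fails with a clear message rather than silently picking the wrong operator.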
    testHarness.close()
  }

  private def getState(
This can probably be put into HarnessTestBase as well. As of now I can only imagine the Operator needing to be accessible when we have to manipulate the internal State to mock state backend operations on top.
Makes sense. Done.
@walterddr Thanks a lot for your review and comments. I have updated the PR accordingly.
@twalthr Sorry for the interruption. It would be great if you could share some thoughts on the changes in this PR. Thanks in advance.
@twalthr @sunjincheng121 Could you help take a look at this PR? You can see from the harness test of #7286 that this change makes it very easy to write a new harness test.
Hi @dianfu, thanks for the great work. I like your approach to the HarnessTest, which overrides the codegen logic and also covers the accumulator's use of dataView. Very good!
I only left two minor suggestions.
Thanks,
Jincheng
expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 2).asJava), 1))
expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 1).asJava), 1))

// remove some state: state may be cleaned up by the state backend if not accessed more than ttl
"more than ttl" -> "beyond ttl time"?
@@ -491,13 +491,83 @@ class HarnessTestBase {
      distinctCountFuncName,
      distinctCountAggCode)

  def createHarnessTester[KEY, IN, OUT](
      dataStream: DataStream[_],
      prefixOperatorName: String)
How about adding aggFieldAlias: String = "" to resolve scenarios where GroupBy appears in multiple UNION branches? For example:
(SELECT b, max(a) as maxA FROM T GROUP BY b)
UNION (
  SELECT b, min(a) as minA FROM (
    SELECT a, b FROM T GROUP BY a, b
  ) GROUP BY b
)
We would then use this method as follows: createHarnessTester(xx, "groupBy", "minA").
I didn't find a case where I had to test it this way; it is just a suggested enhancement.
What do you think?
createHarnessTester will be used not only in agg-related tests but also in other harness tests, such as stream join tests, temporal join tests, sort tests, etc. So a field named aggFieldAlias seems a little weird from my point of view. What about adding it when we really need it? By then we may have a better idea of what such a field should look like. Thoughts?
Hmm, if this tester will be used in join tests, we should add a TwoInputTransformation check to the extractExpectedTransformation logic. We would then also need to add an xxName parameter (maybe not named aggFieldAlias), e.g. for scenarios with multiple joins?
Yes, you're right. Will do that when updating the join related harness tests.
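To make the disambiguation idea from this thread concrete, here is a small sketch of selecting one operator when several share the same name prefix. Everything in it is an assumption for illustration: `selectOperator` is a hypothetical helper, the extra `alias` argument mirrors the proposed (and not adopted) `aggFieldAlias`, and the operator names are invented rather than taken from a real plan.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; the PR itself only matches on the name prefix.
class OperatorNameFilterSketch {

    /**
     * Returns the single operator name that starts with prefix and contains alias.
     * An empty alias matches every name, so only the prefix is checked in that case.
     */
    static String selectOperator(List<String> names, String prefix, String alias) {
        List<String> matches = new ArrayList<>();
        for (String name : names) {
            if (name.startsWith(prefix) && name.contains(alias)) {
                matches.add(name);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalArgumentException(
                "expected exactly one match but found " + matches.size());
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        // Invented operator names for the UNION query discussed above.
        List<String> names = Arrays.asList(
            "groupBy: (b), select: (b, MAX(a) AS maxA)",
            "groupBy: (a, b), select: (a, b)",
            "groupBy: (b), select: (b, MIN(a) AS minA)");

        // The prefix alone is ambiguous here; the alias narrows it to one operator.
        System.out.println(selectOperator(names, "groupBy", "minA"));
    }
}
```

Requiring exactly one match makes an ambiguous prefix an explicit test failure, which is the situation the reviewers agreed to handle once the join-related harness tests are updated.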
…ckend and add harness tests for CollectAggFunction
Force-pushed from 712c88a to 56aa9e6.
@sunjincheng121 Thanks a lot for your review, much appreciated! I have updated the PR accordingly.
Thanks for the update and feedback @dianfu Bests,
Merging...
@sunjincheng121 @walterddr Thanks a lot for the review and merge. Much appreciated!
…ckend and add harness tests for CollectAggFunction This closes apache#7253
What is the purpose of the change
This pull request enables the harness tests to run with RocksdbStateBackend and adds a harness test for CollectAggFunction.
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation