
[FLINK-6250] Distinct procTime with Rows boundaries #3732

Closed
wants to merge 5 commits

Conversation

huawei-flink

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.

  • General

    • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    • The pull request addresses only one issue
    • Each commit in the PR has a meaningful commit message (including the JIRA id)
  • Documentation

    • Documentation has been added for new functionality
    • Old documentation affected by the pull request has been updated
    • JavaDoc for public methods has been added
  • Tests & Build

    • Functionality added by the pull request is covered by tests
    • mvn clean verify has been executed successfully locally or a Travis build has passed

smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)

val distinctValDescriptor : MapStateDescriptor[Any, Row] =
new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
Contributor

It seems we should use the concrete key type and Row type, otherwise it cannot be serialized.

Author

you are right

Author

OK, I have bypassed the problem by using a separate distinct MapState per aggregation [value, long]. I don't think it is necessary to preserve the type in the serialization, as aggregation works with numbers, and these do not have serialization problems. Or am I missing something?
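For illustration, the "one distinct map per aggregation" idea can be sketched with plain `HashMap`s standing in for Flink's `MapState` (the class and method names below are hypothetical, not code from this PR):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one value -> count map per aggregate, so each map
// only ever holds values of that aggregate's field instead of a single
// Map[Any, Row] shared across all aggregates.
class DistinctBuffers {
    private final Map<Object, Long>[] buffers;

    @SuppressWarnings("unchecked")
    DistinctBuffers(int numAggregates) {
        buffers = new Map[numAggregates];
        for (int i = 0; i < numAggregates; i++) {
            buffers[i] = new HashMap<>();
        }
    }

    // Returns true if this is the first occurrence of the value for
    // aggregate i, i.e. the value should actually be accumulated.
    boolean add(int i, Object value) {
        long count = buffers[i].getOrDefault(value, 0L);
        buffers[i].put(value, count + 1);
        return count == 0L;
    }
}
```

Keeping the maps separate also means each one can later be given its own typed state descriptor instead of `classOf[Any]`.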

i = 0
while (i < aggregates.length) {
val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
retractVal = retractList.get(0).getField(aggFields(i)(0))
Contributor

Why use a two-dimensional array? It seems enough to use a one-dimensional array to record the aggregate index.

Author

If I understand your question correctly, the list is necessary to include co-occurring events (i.e., rows processed in the same millisecond). However, this part was already included in the original code that was merged.

retractVal = retractList.get(0).getField(aggFields(i)(0))
if(distinctAggsFlag(i)){
distinctCounter += 1
val counterRow = distinctValueState.get(retractVal)
Contributor

Different aggregate params have different types; I think we cannot use one MapState to store, e.g., e1 and e2 in SUM(DIST e1), SUM(DIST e2).

Author

The MapState takes Any as the key, but I agree with you that I didn't test this aspect thoroughly.

Author

The test you suggested indeed failed, great point! The new version fixes the problem.


val sqlQuery = "SELECT a, " +
" SUM(DIST(e)) OVER (" +
" ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumE " +
Contributor

I think we should add some tests for multiple aggregations with DISTINCT.

Contributor

@fhueske left a comment

Hi @huawei-flink,

thanks for working on this. I think the approach with the MapState[X, Long] is very good. We should integrate this with the code generation. This could work as follows: GeneratedAggregations is extended by a method initialize(ctx: RuntimeContext). Then the function can register its own state objects. We would use this to generate MapStates for all distinct aggregates. The code-generating function CodeGenerator.generateAggregations() would need another Array[Boolean] parameter. If the flags are set, it generates the initialize() method to register the state objects. The accumulate() and retract() methods would be generated to check the map state first for distinct aggregates. It would also be good if the code-gen would try to reuse the same MapState if two functions aggregate the same distinct field.
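A rough sketch of the proposed shape, with all names illustrative (the real GeneratedAggregations and RuntimeContext live in Flink and are only hinted at here; plain maps stand in for MapState):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the runtime context that hands out keyed state.
interface RuntimeContextLike {
    <K> Map<K, Long> getMapState(String name);
}

// Illustrative shape of the proposed extension: the generated class gets an
// initialize() hook where it registers the state it needs.
abstract class GeneratedAggregationsSketch {
    // Generated only when a distinct flag is set for some aggregate.
    abstract void initialize(RuntimeContextLike ctx);
    abstract void accumulate(Object value);
}

class DistinctSumSketch extends GeneratedAggregationsSketch {
    private Map<Object, Long> seen;
    long sum = 0L;

    void initialize(RuntimeContextLike ctx) {
        // The generated initialize() registers one map per distinct aggregate.
        seen = ctx.getMapState("distinct-sum");
    }

    void accumulate(Object value) {
        long count = seen.getOrDefault(value, 0L);
        seen.put(value, count + 1);
        if (count == 0L) { // first occurrence: really accumulate
            sum += ((Number) value).longValue();
        }
    }
}
```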

Regarding the approach with the DIST() function, I have a few concerns. Although it would work, I think it has a few issues:

  • we would need to remove it later (and possibly break the queries of users)
  • using a function to indicate DISTINCT is not semantically correct
  • users could try to apply it anywhere else in a query

Does @rtudoran have an estimate of when DISTINCT will be available in Calcite master? The Calcite community usually releases quite often (from my experience, at least once per Flink release cycle).

What do you think?
Best, Fabian

import java.nio.charset.Charset
import java.util.List

import org.apache.calcite.rel.`type`._
Contributor

Many unused imports

*/
object DistinctAggregatorExtractor extends SqlFunction("DIST", SqlKind.OTHER_FUNCTION,
ReturnTypes.ARG0, InferTypes.RETURN_TYPE,
OperandTypes.NUMERIC, SqlFunctionCategory.NUMERIC) {
Contributor

An aggregation function can also return non-numeric types such as MIN(String). So SqlFunctionCategory.NUMERIC might not be the right category. OTOH, I don't know how Calcite uses this category.

Contributor

One never stops learning. :-)

@@ -91,6 +93,22 @@ class DataStreamOverAggregate(

val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)

val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
Contributor

I would do the extraction in the DataStreamOverAggregateRule. There we have proper access to the input Calc and the RexProgram. Extracting the function call as a String is quite fragile. The Calc could for instance contain an attribute called "DISTRIBUTION".

The rule would unnest the expression from the DIST() RexNode and remove DIST. The distinct information would need to be added to the DataStreamOverAggregate.

Contributor

This is a good point. The string trick is a temporary workaround anyway.
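The fragility is easy to demonstrate: a plain substring check matches any expression whose text happens to contain "DIST" (illustrative only, not the PR's actual parsing code):

```java
// A substring check cannot tell a DIST() call apart from any other
// identifier that merely contains the letters "DIST".
class DistMatchDemo {
    static boolean looksLikeDist(String expr) {
        return expr.contains("DIST");
    }

    public static void main(String[] args) {
        System.out.println(looksLikeDist("DIST($2)"));         // true, intended
        System.out.println(looksLikeDist("DISTRIBUTION($2)")); // true, false positive
    }
}
```

Matching on the RexNode's operator in the rule, as suggested above, avoids this entirely.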

val exp = iter.next
if(exp.contains("DIST")){
val varName = exp.substring(exp.indexOf("$"))
distinctVarMap.put(varName,true)
Contributor

+space

isRowTimeType: Boolean,
isRowsClause: Boolean): DataStream[Row] = {

val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates

val aggregateCalls = overWindow.getAggregateCalls(logicWindow)
val distinctAggFlags: Array[Boolean] = new Array[Boolean](aggregateCalls.size)
for (i <- 0 until aggregateCalls.size()){
Contributor

+space .size()) {

// decrease the counter and continue
else {
distinctValCounter -= 1
distinctValueStateList(i).put(retractVal,distinctValCounter)
Contributor

+space

distinctValCounter -= 1
distinctValueStateList(i).put(retractVal,distinctValCounter)
}
}else {
Contributor

+space


// get oldest element beyond buffer size
// and if oldest element exist, retract value
var removeCounter :Integer = 0
Contributor

not used

// get oldest element beyond buffer size
// and if oldest element exist, retract value
var removeCounter :Integer = 0
var distinctCounter : Integer = 0
Contributor

not used

var distinctValCounter: Long = distinctValueStateList(i).get(inputValue)
// if counter is 0L first time we aggregate
// for a seen value but never accumulated
if(distinctValCounter == 0L){
Contributor

doesn't MapState.get return null when the key is not contained?

Contributor

if it is a long, it returns 0L, don't ask me why. :-D

Contributor

OK, good to know :-)
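The accumulate/retract bookkeeping discussed above can be sketched with a plain map where an absent key behaves like 0L, mirroring the MapState behavior observed here (a hypothetical helper, not the PR's code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the distinct counter logic: accumulate into the aggregate only
// when the count goes 0 -> 1, retract from it only when it drops back to 0.
class DistinctCounter {
    private final Map<Object, Long> counts = new HashMap<>();

    // Returns true if the value must be added to the aggregate.
    boolean accumulate(Object value) {
        long c = counts.getOrDefault(value, 0L); // absent key acts like 0L
        counts.put(value, c + 1);
        return c == 0L;
    }

    // Returns true if the value must be retracted from the aggregate.
    boolean retract(Object value) {
        long c = counts.getOrDefault(value, 0L) - 1;
        if (c <= 0L) {
            counts.remove(value); // last occurrence gone, drop the entry
            return true;
        }
        counts.put(value, c);
        return false;
    }
}
```

Removing the entry on the last retract keeps the state from growing with values that are no longer in the window.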

@stefanobortoli
Contributor

@fhueske, I agree with you about the risk of the temporary DIST() ingestion. Perhaps we could meanwhile just work on the "ProcessFunction + code generation" approach, keeping the DIST function for test purposes. My concern is that the code may change again and all the work would be wasted. To be honest, the code generation is quite new to me, and I will have to learn it to work on that. Meanwhile, I have almost completed a version that relies on the current code generation, nesting the distinct logic. As it is almost done, I will share this one as well and then, if necessary, move to the code generation. What do you think?

@fhueske
Contributor

fhueske commented Apr 21, 2017

Sounds good to me. IMO, we can also add the runtime code to the code base even if there is no API support yet, provided we cover it with test cases. Then we could quickly enable it once DISTINCT in OVER becomes available.

@fhueske
Contributor

fhueske commented Apr 21, 2017

Btw, the code generation is not so fancy. The best way to learn it would be to debug a simple batch GROUP BY query (once batch aggregations are code-gen'd as well).

@stefanobortoli
Contributor

@fhueske I have just pushed a version working with code generation (without modifying the code generation itself). There will be the need for some refactoring in the AggregateUtil function, but if the overall concept is sound, I will fix things.

@hongyuhong , @shijinkui you could also have a look if you have time.

@rtudoran
Contributor

@fhueske @stefanobortoli I suggest we merge this temporary solution into Flink (using a special marker for distinct) until the Flink module is upgraded to the next Calcite release. I have fixed the issue in Calcite.
The advantages of pushing this already are:

  1. we can reuse the code
  2. when we have the distinct marker, we can simply modify the check for distinct for the aggregates in the DataStreamOver

@stefanobortoli
Contributor

@fhueske @sunjincheng121 @shijinkui @hongyuhong I have created a PR against the latest master with the code-generated distinct, #3771, please have a look. If it is fine, we can basically support distinct for all the window types.

@fhueske
Contributor

fhueske commented Apr 26, 2017

Thanks @stefanobortoli! I'll have a look at #3771

@fhueske
Contributor

fhueske commented Apr 26, 2017

Since PR #3771 is the follow up of this PR, could you close this one? Thanks
