@StephanEwen
Contributor

The DataStream API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:

  • ReduceFunction only supports one type as the type that is added and aggregated/returned.
  • FoldFunction supports different types to add and return, but is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into pre- and final-aggregation.
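
To make the two limitations concrete, here is a minimal sketch that computes the sum and count needed for an average with both existing shapes. The two interfaces are local stand-ins with the same method shapes as ReduceFunction and FoldFunction; the class name, the long[2] encoding, and the helper methods are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.List;

final class ReduceVsFoldSketch {

    /** Local stand-in: input, intermediate, and result all share one type. */
    interface ReduceFunction<T> {
        T reduce(T value1, T value2);
    }

    /** Local stand-in: the accumulator type may differ from the input type. */
    interface FoldFunction<O, T> {
        T fold(T accumulator, O value);
    }

    /** Sum and count packed into a long[2]; a plain Integer cannot carry both. */
    static final ReduceFunction<long[]> SUM_COUNT =
            (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]};

    static final FoldFunction<Integer, long[]> FOLD_SUM_COUNT =
            (acc, value) -> new long[] {acc[0] + value, acc[1] + 1};

    /** With reduce, the caller has to lift every element into the pair type first. */
    static long[] reduceAll(List<Integer> values) {
        long[] acc = null;
        for (int v : values) {
            long[] lifted = {v, 1};
            acc = (acc == null) ? lifted : SUM_COUNT.reduce(acc, lifted);
        }
        return acc;
    }

    /** With fold, no lifting is needed, but nothing can combine two fold results. */
    static long[] foldAll(List<Integer> values) {
        long[] acc = {0, 0};
        for (int v : values) {
            acc = FOLD_SUM_COUNT.fold(acc, v);
        }
        return acc;
    }

    public static void main(String[] args) {
        long[] reduced = reduceAll(Arrays.asList(1, 2, 3));
        long[] folded = foldAll(Arrays.asList(1, 2, 3));
        System.out.println(reduced[0] / (double) reduced[1]); // 2.0
        System.out.println(folded[0] / (double) folded[1]);   // 2.0
    }
}
```

Note that fold only accepts (accumulator, element): two partial long[] results produced by separate fold passes cannot be combined through the interface, which is the missing distributive step.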

This pull request adds a generic and powerful aggregation function that supports:

  • Different types to add, accumulate, and return
  • The ability to merge partial aggregates by merging their accumulators.
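
As a rough illustration of the second point, the sketch below pre-aggregates two partitions independently and then merges the partial accumulators before finalizing once. The interface is a local copy of the shape proposed in this pull request (without the Flink Function supertype); SumCount, partial(), and the class name are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

final class PreAggregationSketch {

    /** Local stand-in mirroring the shape of the proposed interface. */
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Hypothetical accumulator: running sum and element count. */
    static final class SumCount {
        long sum;
        long count;
    }

    /** Average over ints: IN, ACC, and OUT are three different types. */
    static final class Average implements AggregateFunction<Integer, SumCount, Double> {
        public SumCount createAccumulator() { return new SumCount(); }
        public void add(Integer value, SumCount acc) { acc.sum += value; acc.count++; }
        public Double getResult(SumCount acc) { return acc.sum / (double) acc.count; }
        public SumCount merge(SumCount a, SumCount b) {
            a.sum += b.sum;
            a.count += b.count;
            return a;
        }
    }

    /** Pre-aggregation: fold one partition into a partial accumulator. */
    static <IN, ACC> ACC partial(AggregateFunction<IN, ACC, ?> fn, List<IN> partition) {
        ACC acc = fn.createAccumulator();
        for (IN value : partition) {
            fn.add(value, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        Average avg = new Average();
        SumCount left = partial(avg, Arrays.asList(1, 2, 3));
        SumCount right = partial(avg, Arrays.asList(4, 5));
        // Final aggregation: merge the partials, then finalize once.
        System.out.println(avg.getResult(avg.merge(left, right))); // 3.0
    }
}
```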

The requirement for this addition came from both the Table API and from plans to improve the efficiency of internal state handling.

Part 1: Cleaning up the internal state abstraction

The main change is in commit ca5ce84

State interfaces (like ListState, ValueState, ReducingState) are very sparse and contain only the methods exposed to users. That makes sense, since it keeps the stable public API minimal.
At the same time, the runtime needs more methods for its internal interaction with state, such as:

  • setting namespaces
  • accessing raw values
  • merging namespaces

These are currently realized by re-creating or re-obtaining the state objects from the KeyedStateBackend. That approach causes considerable overhead for each access to the state. The KeyedStateBackend tries to do some tricks to reduce that overhead, but does so only partially and introduces other overhead in the process.

The root cause of all these issues is a design problem: there is no proper "internal state abstraction" comparable to the external state abstraction (the public state API).
This pull request adds a similar hierarchy of state interfaces for the internal methods:

             State
               |
               +-------------------InternalKvState
               |                         |
          MergingState                   |
               |                         |
               +-----------------InternalMergingState
               |                         |
      +--------+------+                  |
      |               |                  |
 ReducingState    ListState        +-----+-----------------+
      |               |            |                       |
      +-----------+   +-----------   -----------------InternalListState
                  |                |
                  +---------InternalReducingState
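
The list-state branch of this hierarchy could look roughly like the sketch below. These are simplified local interfaces, not the declarations from the commit: the method names setCurrentNamespace and mergeNamespaces, the HeapListState toy implementation, and the omission of raw-value access are all assumptions made to keep the example small.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class InternalStateSketch {

    // Public side: only what users are meant to see.
    interface State { void clear(); }
    interface MergingState<IN, OUT> extends State {
        OUT get();
        void add(IN value);
    }
    interface ListState<T> extends MergingState<T, Iterable<T>> {}

    // Internal side: the same states plus runtime-only methods.
    interface InternalKvState<N> extends State {
        void setCurrentNamespace(N namespace);
    }
    interface InternalMergingState<N, IN, OUT>
            extends InternalKvState<N>, MergingState<IN, OUT> {
        void mergeNamespaces(N target, Collection<N> sources);
    }
    interface InternalListState<N, T>
            extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {}

    /** Toy in-memory implementation: one object serves all namespaces. */
    static final class HeapListState<N, T> implements InternalListState<N, T> {
        private final Map<N, List<T>> byNamespace = new HashMap<>();
        private N current;

        public void setCurrentNamespace(N namespace) { current = namespace; }

        public void add(T value) {
            byNamespace.computeIfAbsent(current, k -> new ArrayList<>()).add(value);
        }

        public Iterable<T> get() {
            return byNamespace.getOrDefault(current, Collections.emptyList());
        }

        public void clear() { byNamespace.remove(current); }

        public void mergeNamespaces(N target, Collection<N> sources) {
            List<T> merged = byNamespace.computeIfAbsent(target, k -> new ArrayList<>());
            for (N source : sources) {
                if (source.equals(target)) {
                    continue; // nothing to move
                }
                List<T> moved = byNamespace.remove(source);
                if (moved != null) {
                    merged.addAll(moved);
                }
            }
        }
    }

    public static void main(String[] args) {
        HeapListState<String, Integer> state = new HeapListState<>();
        state.setCurrentNamespace("window-a");
        state.add(1);
        state.setCurrentNamespace("window-b");
        state.add(2);
        state.mergeNamespaces("window-a", Collections.singletonList("window-b"));
        state.setCurrentNamespace("window-a");
        System.out.println(state.get()); // [1, 2]
    }
}
```

The point of the design is that the runtime switches namespaces on a single long-lived state object instead of re-obtaining a state object per access, while user code that only holds a ListState reference never sees the extra methods.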

Part 2: The AggregateFunction

The main change is in commit 4a9fe96

The proposed interface is below. This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:

  • The accumulator is the state of the running aggregate
  • Accumulators can be merged
  • Values are added to the accumulator
  • Getting the result from the accumulator performs an optional finalizing operation

public interface AggregateFunction<IN, ACC, OUT> extends Function {

	/** Creates a holder for the intermediate state */
	ACC createAccumulator();

	/** Adds a value into the accumulator */
	void add(IN value, ACC accumulator);

	/** Gets the aggregate from the accumulator, possibly finalizing it */
	OUT getResult(ACC accumulator);

	/** Merges two accumulators */
	ACC merge(ACC a, ACC b);
}

Example use:

public class AverageAccumulator {
    long count;
    long sum;
}

// implementation of a simple average
public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {

    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    public void add(Integer value, AverageAccumulator acc) {
        acc.sum += value;
        acc.count++;
    }

    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}

// implementation of a weighted average
// this reuses the same accumulator type as the aggregate function for 'average'
public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {

    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    public void add(Datum value, AverageAccumulator acc) {
        // weighted average = sum(weight * value) / sum(weight)
        acc.count += value.getWeight();
        acc.sum += value.getValue() * value.getWeight();
    }

    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}

Contributor

@shaoxuan-wang shaoxuan-wang left a comment

Hi, Stephan. Thanks for implementing this sweet new API for aggregates. I was trying to conduct an integration test with your new API, but it seems KVStateRequestSerializerRocksDBTest is broken. Can you please take a look?

Error:(62, 22) java: <N,T,ACC>createFoldingState(org.apache.flink.api.common.typeutils.TypeSerializer,org.apache.flink.api.common.state.FoldingStateDescriptor<T,ACC>) in org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend cannot override <N,T,ACC>createFoldingState(org.apache.flink.api.common.typeutils.TypeSerializer,org.apache.flink.api.common.state.FoldingStateDescriptor<T,ACC>) in org.apache.flink.runtime.state.AbstractKeyedStateBackend
return type org.apache.flink.api.common.state.FoldingState<T,ACC> is not compatible with org.apache.flink.runtime.state.internal.InternalFoldingState<N,T,ACC>

Error:(87, 53) java: incompatible types: org.apache.flink.api.common.state.ListState cannot be converted to org.apache.flink.runtime.state.internal.InternalListState<N,T>

Contributor

@shaoxuan-wang shaoxuan-wang left a comment

I skipped (disabled) KVStateRequestSerializerRocksDBTest. Then, with your new aggregate API on WindowedStream, I tested some aggregates with tumbling windows as well as session windows. It works pretty well (both tests returned the expected results). Good job!


ACC createAccumulator();

void add(IN value, ACC accumulator);
Contributor

As proposed in https://goo.gl/00ea5j, this function will handle not only accumulation but also retraction. Instead of "add", could you please consider using "update"?

Contributor Author

Retractions are specific to the Table API. Do you expect this same interface to be used for user-defined aggregations there as well?

Contributor Author

My first feeling is to keep the name add() because it fits better together with the term Accumulator. One can view retractions as adding negative values. What do you think about that?

Contributor

Table API UDAGGs will eventually be translated to this WindowedStream API. Both accumulate and retract will be handled in this add function. I think it is OK if we "view retractions as adding negative values".
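
One way to read "retractions as adding negative values" is sketched below for a simple sum. The Change input type and its retract flag are hypothetical and not part of the Table API or of this pull request; the interface is a local copy of the proposed shape.

```java
final class RetractionSketch {

    /** Local stand-in mirroring the shape of the proposed interface. */
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Hypothetical input record: a value plus a flag marking it as a retraction. */
    static final class Change {
        final long value;
        final boolean retract;

        Change(long value, boolean retract) {
            this.value = value;
            this.retract = retract;
        }
    }

    static final class SumAcc {
        long sum;
    }

    /** A sum whose add() handles both accumulation and retraction. */
    static final class RetractableSum implements AggregateFunction<Change, SumAcc, Long> {
        public SumAcc createAccumulator() { return new SumAcc(); }

        public void add(Change change, SumAcc acc) {
            // A retraction is folded in as the negated value.
            acc.sum += change.retract ? -change.value : change.value;
        }

        public Long getResult(SumAcc acc) { return acc.sum; }

        public SumAcc merge(SumAcc a, SumAcc b) {
            a.sum += b.sum;
            return a;
        }
    }

    public static void main(String[] args) {
        RetractableSum sum = new RetractableSum();
        SumAcc acc = sum.createAccumulator();
        sum.add(new Change(5, false), acc);
        sum.add(new Change(7, false), acc);
        sum.add(new Change(5, true), acc); // retract the earlier 5
        System.out.println(sum.getResult(acc)); // 7
    }
}
```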


OUT getResult(ACC accumulator);

ACC merge(ACC a, ACC b);
Contributor

Do you think it would be useful to extend the merge function to accept a list of ACC: ACC merge(List<ACC> a)? There are cases where merging a whole list of instances at once is much more efficient than merging only two instances at a time.

Contributor Author

This could be done, true. It would currently mean a slight overhead (for list creation), but that is probably okay.

I would like to address that change in a separate pull request: We should probably adjust the merging state implementation as well, to exploit that new signature.
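
For reference, one possible shape of such a follow-up is sketched here with a default method, so that existing two-argument implementations would keep working. The name mergeAll and the pairwise fallback are assumptions; nothing of this is defined in the pull request.

```java
import java.util.Arrays;
import java.util.List;

final class ListMergeSketch {

    /** Local stand-in for the proposed interface plus a hypothetical bulk merge. */
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);

        /** Hypothetical: pairwise fallback; implementations may override it. */
        default ACC mergeAll(List<ACC> accumulators) {
            ACC result = createAccumulator();
            for (ACC acc : accumulators) {
                result = merge(result, acc);
            }
            return result;
        }
    }

    static final class MaxAcc {
        long max = Long.MIN_VALUE;
    }

    static final class Max implements AggregateFunction<Long, MaxAcc, Long> {
        public MaxAcc createAccumulator() { return new MaxAcc(); }
        public void add(Long value, MaxAcc acc) { acc.max = Math.max(acc.max, value); }
        public Long getResult(MaxAcc acc) { return acc.max; }

        public MaxAcc merge(MaxAcc a, MaxAcc b) {
            a.max = Math.max(a.max, b.max);
            return a;
        }

        /** Bulk variant: a single pass that leaves the input accumulators untouched. */
        @Override
        public MaxAcc mergeAll(List<MaxAcc> accumulators) {
            MaxAcc result = new MaxAcc();
            for (MaxAcc acc : accumulators) {
                result.max = Math.max(result.max, acc.max);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Max max = new Max();
        MaxAcc a = max.createAccumulator();
        MaxAcc b = max.createAccumulator();
        MaxAcc c = max.createAccumulator();
        max.add(3L, a);
        max.add(9L, b);
        max.add(4L, c);
        System.out.println(max.getResult(max.mergeAll(Arrays.asList(a, b, c)))); // 9
    }
}
```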

Contributor

@shaoxuan-wang shaoxuan-wang left a comment

Added a few comments/suggestions on the AggregateFunction interface.

@StephanEwen
Contributor Author

Update: The first version had an issue with binary compatibility in the Scala DataStream API:

In Flink 1.0, the Scala API accidentally exposed a public method (an internal utility method) with an internal type as a parameter. That should not have been the case, since a method's compatibility cannot be guaranteed when one of its parameters is a non-public type.

Unfortunately, the automatic tooling for binary compatibility checks is not flexible enough to work around that: Exclusions for methods do not work if parameter types were altered.

Due to that, the newer version rolls back the cleanup commit that renames the internal AggregateFunction.

@StephanEwen StephanEwen changed the title [FLINK-5582] [streaming] dd a general distributive aggregate function [FLINK-5582] [streaming] Add a general distributive aggregate function Jan 22, 2017
@StephanEwen
Contributor Author

StephanEwen commented Jan 22, 2017

@shaoxuan-wang I cannot reproduce the compile error you posted.
The latest commit also gets a green light from Travis CI: https://travis-ci.org/StephanEwen/incubator-flink/builds/194239397

EDIT: I think you have pulled in an "intermediate" commit from my private repository. That seems to have been broken there, but is not broken in this branch any more.

This introduces an internal state hierarchy that mirrors the external state hierarchy,
but gives the runtime access to methods that should not be part of the user facing API,
such as:
  - setting namespaces
  - accessing raw values
  - merging namespaces
The AggregateFunction implements a very flexible interface for distributive aggregations.
@StephanEwen
Contributor Author

Since this constantly requires extensive merge conflict resolution against master, I want to merge this soon.

@shaoxuan-wang has tested it live, and CI passes as well.
