
[FLINK-1293] Add support for out-of-place aggregations #243

Closed
wants to merge 32 commits

Conversation

@he-sk commented Nov 28, 2014

This patch adds support for multiple aggregations on the same field and aggregations which change the output type, e.g., count and average.

It adds the following aggregation syntax to the Java API:

// A string key and a long value
DataSet<Tuple2<String, Long>> ds = ...

// count all the elements
DataSet<Tuple1<Long>> result = ds.aggregate(count()); 

//  count elements in each group
DataSet<Tuple2<String, Long>> result = ds.groupBy(0).aggregate(allKeys(), count()); 

// same as above but explicitly state the returned key(s)
DataSet<Tuple2<String, Long>> result = ds.groupBy(0).aggregate(key(0), count());

// only return counts, drop keys
DataSet<Tuple1<Long>> result = ds.groupBy(0).aggregate(count());

// average reuses count and sum
DataSet<Tuple4<String, Long, Long, Double>> result = 
    ds.groupBy(0).aggregate(allKeys(), count(), sum(1), average(1));

Five aggregation functions are supported: min, max, sum, count and average. Notice that count does not take a field reference in the example above.

Internally, the aggregation is implemented by the following operator chain:

Input -> Map1 -> Reduce -> Map2 -> Output
  • Map1 constructs an intermediate tuple on which the aggregation is performed. Its main task is to copy tuple fields that are used by multiple aggregation functions and to initialize fields required by the internal implementation of the aggregation functions. For example, average requires a field to count the tuples.
  • Reduce performs the actual aggregation.
  • Map2 computes the final result of each aggregation function, e.g., computing the average from the sum and the count, and drops any grouping fields that are not requested in the output.

Average is implemented to reuse an existing sum and/or count function. In the last example above, the intermediate tuple contains only 3 fields: the key, a field to compute the sum, and a field to compute the count. The field holding the average is added after the Reduce operator in the Map2 operator.
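The Map1 -> Reduce -> Map2 chain can be modeled in plain Java without any Flink dependencies. The sketch below is illustrative only: the method names map1/reduce/map2 and the array-based intermediate "tuple" are assumptions made for this example, not Flink API code. It shows how average falls out of the shared sum and count fields in the final map.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model of the operator chain for
// aggregate(count(), sum(1), average(1)) on a single group.
public class AggregationChainSketch {

    // Map1: build the intermediate tuple {sum contribution, count contribution}.
    // The average needs no field of its own here; it reuses sum and count.
    public static double[] map1(long value) {
        return new double[]{value, 1};
    }

    // Reduce: combine two intermediate tuples field by field.
    public static double[] reduce(double[] a, double[] b) {
        return new double[]{a[0] + b[0], a[1] + b[1]};
    }

    // Map2: compute the final result {count, sum, average};
    // the average field is only materialized after the reduce.
    public static double[] map2(double[] intermediate) {
        double sum = intermediate[0];
        double count = intermediate[1];
        return new double[]{count, sum, sum / count};
    }

    public static double[] aggregate(List<Long> values) {
        double[] acc = null;
        for (long v : values) {
            double[] t = map1(v);
            acc = (acc == null) ? t : reduce(acc, t);
        }
        return map2(acc);
    }

    public static void main(String[] args) {
        double[] r = aggregate(Arrays.asList(2L, 4L, 6L));
        System.out.printf("count=%.0f sum=%.0f avg=%.1f%n", r[0], r[1], r[2]);
    }
}
```

Note how the intermediate tuple carries only the sum and count fields, matching the three-field (key, sum, count) layout described above once a grouping key is added.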

Currently, only the Java API is implemented. My Scala knowledge is fairly limited, so it would be great if somebody else could pick that up.

Also, the result type of aggregate is simply <T extends Tuple>. To support type inference at compile time, it is necessary to integrate the work by Chen (#194).

import org.junit.Before;
import org.junit.Test;

public class AggregationApi1Test {
Contributor

rename to AggregationTest?

Author

AggregationTest is a better name because the alternative API is no longer in the branch.

However, I'm not sure about the location of the test. It looks more like a unit test because it checks various correct call patterns and error conditions. On the other hand, many of the tests construct a complete execution environment and consequently run slowly.

In addition, it cannot live in the flink-java package because the execution environment is in flink-runtime, which depends on flink-java, so that would create a circular dependency.

I think the best place would be flink-tests but I don't want to have it run as an integration test.

Contributor

Can you separate the pre-execution tests and the tests that check for result correctness into two classes?
The other operators have pre-flight tests as unit tests in flink-java and tests that execute programs as integration tests in flink-tests.

@fhueske
Contributor

fhueske commented Dec 1, 2014

Thanks for your pull request!
Great piece of work. Well commented, plenty of tests, nice!

Apart from Scala API support, there are several ways in which this PR could be extended:

  1. Extend aggregation support for Flink's mutable value types (IntValue, DoubleValue, ...), i.e., sum() and avg() on numeric *Value types
  2. Add support for POJO data types and allowing aggregations of their members (output would still be tuples). This would mean to extract the aggregation fields by name in the initial Map operator.
  3. Do the aggregations internally on *Value types, which would mean to transform Java Standard types into *Values in the first Map operator. This would reduce object allocations and GC pressure.

The question is how much do we want to squeeze into this PR.

I'd say let's add support for Scala and treat the other issues as improvements over this work.

@he-sk If you want to try the Scala API support, you could have a look at how the old Aggregation operator was ported.

@rmetzger
Contributor

rmetzger commented Dec 1, 2014

Thank you for reviewing the pull request!
I would suggest merging the change first and then improving it by changing the internal types.

Regarding the second point (POJO support): I once started working on that. I think the best approach would be to add a method to the serializers that allows access to individual fields.
That way, we can reuse the existing code for analyzing (nested) POJO fields. It would also generalize across Tuples and POJOs and avoid special-casing between them (implementing field access for Tuples in the Tuple serializers is trivial).
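The serializer-level field access described here could look roughly like the following plain-Java sketch. The FieldAccessor interface and every class name below are hypothetical, invented for this example; they are not actual Flink APIs. The point is that a tuple accessor and a reflection-based POJO accessor can share one interface, so aggregation code needs no special cases.

```java
import java.lang.reflect.Field;

public class FieldAccessSketch {

    // Hypothetical uniform field-access interface (not a real Flink API).
    public interface FieldAccessor<T> {
        Object getField(T record, int pos);
    }

    // For a tuple-like record, field access is trivial positional indexing.
    public static class ArrayTupleAccessor implements FieldAccessor<Object[]> {
        public Object getField(Object[] record, int pos) {
            return record[pos];
        }
    }

    // For a POJO, the accessor is built once via reflection and then reused,
    // mirroring the idea of analyzing (nested) POJO fields up front.
    public static class PojoAccessor<T> implements FieldAccessor<T> {
        private final Field[] fields;

        public PojoAccessor(Class<T> clazz, String... fieldNames) {
            fields = new Field[fieldNames.length];
            try {
                for (int i = 0; i < fieldNames.length; i++) {
                    fields[i] = clazz.getDeclaredField(fieldNames[i]);
                    fields[i].setAccessible(true);
                }
            } catch (NoSuchFieldException e) {
                throw new RuntimeException(e);
            }
        }

        public Object getField(T record, int pos) {
            try {
                return fields[pos].get(record);
            } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        }
    }

    // Example POJO (illustrative only).
    public static class WordCount {
        public String word = "flink";
        public long count = 7;
    }

    public static void main(String[] args) {
        FieldAccessor<Object[]> tupleAcc = new ArrayTupleAccessor();
        System.out.println(tupleAcc.getField(new Object[]{"flink", 7L}, 1));

        FieldAccessor<WordCount> pojoAcc =
                new PojoAccessor<>(WordCount.class, "word", "count");
        System.out.println(pojoAcc.getField(new WordCount(), 1)); // same code path
    }
}
```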

Once the serialization of POJOs has become more efficient, we can actually throw away the Tuple-specific code and handle Tuples like POJOs.

@he-sk force-pushed the aggregation branch 4 times, most recently from 6343785 to ee24d78 on December 15, 2014
@fhueske
Contributor

fhueske commented Dec 15, 2014

@he-sk are you working on this PR?

@he-sk
Author

he-sk commented Dec 15, 2014

@fhueske I'm updating it right now.

@fhueske
Contributor

fhueske commented Dec 15, 2014

Cool, thanks! :-)
Did you extend the PR in any of the directions mentioned above (POJOs, Scala, ...)?

Let me know when you want somebody to have a look again.

@he-sk
Author

he-sk commented Dec 15, 2014

@fhueske No, I just updated it with the suggestions that you provided in your comments on the code.

@he-sk
Author

he-sk commented Dec 15, 2014

@fhueske First, thanks for your review. I've included the suggestions that you had above. I've also separated the unit tests and the integration tests and moved them from flink-java-examples to flink-java and flink-test respectively.

I hope to work on the Scala API next.

@fhueske
Contributor

fhueske commented Dec 15, 2014

Sounds good!
One thing that came to my mind. It would be good if we could keep the user-facing parts of the old API in their original place and mark them there as deprecated to avoid breaking code. This might lead to classes that contain code for both APIs (such as .java.aggregation.Aggregations).

What do you think? Would that work?

@he-sk
Author

he-sk commented Dec 16, 2014

I think it would work. I just checked that a Java enum (i.e., the old Aggregations class) can contain static methods, and you can import them statically and call them just as if they were defined in a normal class. I think the Aggregations class and possibly the old AggregateOperator are the only user-facing classes. The various AggregationFunctions and the AggregationFunctionFactory are only used internally and could be moved.
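A minimal sketch of that claim, with illustrative names only (this is not the actual Flink Aggregations enum): an enum keeps its old constants while also hosting static factory methods, so with a static import callers could write count() and sum(1) directly.

```java
public class EnumStaticMethodSketch {

    public enum Aggregations {
        SUM, MIN, MAX;   // the old enum constants stay usable

        // New-style factory methods can live on the same enum.
        // Returning plain Strings here just keeps the demo self-contained.
        public static String count()      { return "count()"; }
        public static String sum(int pos) { return "sum(" + pos + ")"; }
    }

    public static void main(String[] args) {
        // With `import static ...Aggregations.*` users could call count()
        // and sum(1) as if they were defined in a normal class.
        System.out.println(Aggregations.SUM);      // old API
        System.out.println(Aggregations.count());  // new API
        System.out.println(Aggregations.sum(1));
    }
}
```

This supports keeping the deprecated user-facing API in its original location while the new methods share the same class.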

@fhueske
Contributor

fhueske commented Jan 15, 2015

Hi @he-sk
Are you planning to update this PR and move the deprecated public API back to its original location?
I can also take the PR from here, if that's OK with you.

@he-sk
Author

he-sk commented Jan 16, 2015

Hi @fhueske,
If you want to take over the PR I would appreciate that. My Scala knowledge is extremely limited so I would need help to implement the Scala API anyway.

@tonycox
Contributor

tonycox commented Jan 16, 2017

Is this PR still alive?

@fhueske
Contributor

fhueske commented Jan 16, 2017

Thanks for the ping @tonycox.
I think this PR can be closed. The functionality it adds is available in the Table API.

@tonycox
Contributor

tonycox commented Jan 16, 2017

Ok, cool. I can go through abandoned PRs and add them to https://issues.apache.org/jira/browse/FLINK-5384. Would that be OK with you, @fhueske?

@fhueske
Contributor

fhueske commented Jan 17, 2017

That would be nice @tonycox :-)
Thanks a lot!

@asfgit closed this in ece899a on Jan 17, 2017
joseprupi pushed a commit to joseprupi/flink that referenced this pull request Feb 12, 2017