Skip to content

Commit

Permalink
[FLINK-5630] [streaming api] Followups to the AggregateFunction
Browse files Browse the repository at this point in the history
  - Add a RichAggregateFunction
  - Document generic type parameters
  - Allowing different input/output types for the cases where an additional window apply function is specified
  - Adding the aggregate() methods to the Scala API
  - Adding the window translation tests
  • Loading branch information
StephanEwen committed Jan 25, 2017
1 parent 6f5c7d8 commit 1542260
Show file tree
Hide file tree
Showing 12 changed files with 1,500 additions and 46 deletions.
Expand Up @@ -18,14 +18,37 @@

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
* The {@code AggregateFunction} is a flexible aggregation function, characterized by the
* following features:
*
* <ul>
* <li>The aggregates may use different types for input values, intermediate aggregates,
* and result type, to support a wide range of aggregation types.</li>
*
* <li>Support for distributive aggregations: Different intermediate aggregates can be
* merged together, to allow for pre-aggregation/final-aggregation optimizations.</li>
* </ul>
*
* <p>The {@code AggregateFunction}'s intermediate aggregate (in-progress aggregation state)
* is called the <i>accumulator</i>. Values are added to the accumulator, and final aggregates are
* obtained by finalizing the accumulator state. This supports aggregation functions where the
* intermediate state needs to be different than the aggregated values and the final result type,
* such as for example <i>average</i> (which typically keeps a count and sum).
* Merging intermediate aggregates (partial aggregates) means merging the accumulators.
*
* <p>The {@code AggregateFunction} itself is stateless. To allow a single {@code AggregateFunction}
* instance to maintain multiple aggregates (such as one aggregate per key), the
* {@code AggregateFunction} creates a new accumulator whenever a new aggregation is started.
*
* <p>Aggregation functions must be {@link Serializable} because they are sent around
* between distributed processes during distributed execution.
*
* <p>An example of how to use this interface is shown below:
* <h1>Example: Average and Weighted Average</h1>
*
* <pre>{@code
* // the accumulator, which holds the state of the in-flight aggregate
Expand Down Expand Up @@ -81,14 +104,55 @@
* }
* }
* }</pre>
*
* @param <IN> The type of the values that are aggregated (input values)
* @param <ACC> The type of the accumulator (intermediate aggregate state).
* @param <OUT> The type of the aggregated result
*/
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

/**
* Creates a new accumulator, starting a new aggregate.
*
* <p>The new accumulator is typically meaningless unless a value is added
* via {@link #add(Object, Object)}.
*
* <p>The accumulator is the state of a running aggregation. When a program has multiple
* aggregates in progress (such as per key and window), each of them has its own accumulator,
* so the state held per key and window is the size of one accumulator.
*
* @return A new accumulator, corresponding to an empty aggregate.
*/
ACC createAccumulator();

/**
* Adds the given value to the given accumulator.
*
* <p>This method returns nothing, so the effect of adding the value has to be reflected
* in the given accumulator itself.
*
* @param value The value to add
* @param accumulator The accumulator to add the value to
*/
void add(IN value, ACC accumulator);

/**
* Gets the result of the aggregation from the accumulator.
*
* <p>The result type {@code OUT} may differ from the accumulator type {@code ACC}; this
* method performs the conversion from the intermediate state to the final result.
*
* @param accumulator The accumulator of the aggregation
* @return The final aggregation result.
*/
OUT getResult(ACC accumulator);

/**
* Merges two accumulators, returning an accumulator with the merged state.
*
* <p>This function may reuse any of the given accumulators as the target for the merge
* and return that. The assumption is that the given accumulators will not be used any
* more after having been passed to this function.
*
* @param a An accumulator to merge
* @param b Another accumulator to merge
*
* @return The accumulator with the merged state; may be {@code a}, {@code b},
*         or a different accumulator instance
*/
ACC merge(ACC a, ACC b);
}
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.PublicEvolving;

/**
* Rich variant of the {@link AggregateFunction}. As a {@link RichFunction}, it gives access to the
* {@link RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
*
* <p>The rich-function behavior comes from extending {@link AbstractRichFunction}; this class
* itself adds no logic. It only re-declares the four {@link AggregateFunction} methods as
* abstract, so concrete subclasses have to implement all of them.
*
* @see AggregateFunction
*
* @param <IN> The type of the values that are aggregated (input values)
* @param <ACC> The type of the accumulator (intermediate aggregate state).
* @param <OUT> The type of the aggregated result
*/
@PublicEvolving
public abstract class RichAggregateFunction<IN, ACC, OUT>
extends AbstractRichFunction
implements AggregateFunction<IN, ACC, OUT> {

private static final long serialVersionUID = 1L;

/**
* Creates a new accumulator, starting a new aggregate.
*
* @return A new accumulator, corresponding to an empty aggregate.
*/
@Override
public abstract ACC createAccumulator();

/**
* Adds the given value to the given accumulator.
*
* @param value The value to add
* @param accumulator The accumulator to add the value to
*/
@Override
public abstract void add(IN value, ACC accumulator);

/**
* Gets the result of the aggregation from the accumulator.
*
* @param accumulator The accumulator of the aggregation
* @return The final aggregation result.
*/
@Override
public abstract OUT getResult(ACC accumulator);

/**
* Merges two accumulators, returning an accumulator with the merged state.
*
* @param a An accumulator to merge
* @param b Another accumulator to merge
* @return The accumulator with the merged state
*/
@Override
public abstract ACC merge(ACC a, ACC b);
}

0 comments on commit 1542260

Please sign in to comment.