Skip to content

Commit

Permalink
[FLINK-12133] [table-runtime-blink] Support unbounded aggregate in streaming table runtime
Browse files Browse the repository at this point in the history

This closes #8202
  • Loading branch information
wuchong committed Apr 20, 2019
1 parent 58e69a0 commit fefdd08
Show file tree
Hide file tree
Showing 133 changed files with 7,339 additions and 1,865 deletions.
Expand Up @@ -37,6 +37,7 @@
import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.NOT;
import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.OR;
import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.PLUS;
import static org.apache.flink.table.expressions.InternalFunctionDefinitions.THROW_EXCEPTION;

/**
* Builder for {@link Expression}s.
Expand Down Expand Up @@ -118,4 +119,8 @@ public static TypeLiteralExpression typeLiteral(TypeInformation<?> type) {
/**
 * Creates a call expression that applies the {@code CONCAT} function definition
 * to the two given operands, in the order they are passed.
 *
 * @param input1 first operand of the concatenation
 * @param input2 second operand of the concatenation
 * @return the expression produced by {@code call(CONCAT, input1, input2)}
 */
public static Expression concat(Expression input1, Expression input2) {
    final Expression concatenation = call(CONCAT, input1, input2);
    return concatenation;
}

/**
 * Creates a call expression for the internal {@code THROW_EXCEPTION} function
 * definition whose only argument is a type literal built from {@code type}.
 *
 * <p>NOTE(review): {@code msg} is accepted but is not forwarded to the call built
 * here, so the message never reaches the resulting expression. Confirm whether it
 * should be passed as an additional literal argument.
 *
 * @param msg message describing the failure; currently unused by this method
 * @param type type information wrapped into the type literal argument
 * @return the expression produced by {@code call(THROW_EXCEPTION, typeLiteral(type))}
 */
public static Expression throwException(String msg, TypeInformation<?> type) {
    final Expression resultTypeLiteral = typeLiteral(type);
    return call(THROW_EXCEPTION, resultTypeLiteral);
}
}
Expand Up @@ -16,22 +16,16 @@
* limitations under the License.
*/

package org.apache.flink.table.api.dataview
package org.apache.flink.table.expressions;

import org.apache.flink.table.functions.AggregateFunction
import static org.apache.flink.table.expressions.FunctionDefinition.Type.SCALAR_FUNCTION;

/**
* A [[DataView]] is a collection type that can be used in the accumulator of an
* [[AggregateFunction]].
*
* Depending on the context in which the [[AggregateFunction]] is
* used, a [[DataView]] can be backed by a Java heap collection or a state backend.
*/
trait DataView extends Serializable {
* Dictionary of function definitions for all internal used functions.
*/
public class InternalFunctionDefinitions {

/**
* Clears the [[DataView]] and removes all data.
*/
def clear(): Unit
// Function definition registered under the name "throwException" with the
// SCALAR_FUNCTION definition type.
public static final FunctionDefinition THROW_EXCEPTION =
new FunctionDefinition("throwException", SCALAR_FUNCTION);

}
Expand Up @@ -133,6 +133,12 @@ && isTemporal(toInternalType(child.get(1).getType()))) {
return relBuilder.call(FlinkSqlOperatorTable.LESS_THAN, child);
} else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(def)) {
return relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN, child);
} else if (BuiltInFunctionDefinitions.OR.equals(def)) {
return relBuilder.call(FlinkSqlOperatorTable.OR, child);
} else if (BuiltInFunctionDefinitions.CONCAT.equals(def)) {
return relBuilder.call(FlinkSqlOperatorTable.CONCAT, child);
} else if (InternalFunctionDefinitions.THROW_EXCEPTION.equals(def)) {
return relBuilder.call(FlinkSqlOperatorTable.THROW_EXCEPTION, child);
} else if (BuiltInFunctionDefinitions.AND.equals(def)) {
return relBuilder.call(FlinkSqlOperatorTable.AND, child);
} else if (BuiltInFunctionDefinitions.NOT.equals(def)) {
Expand Down
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions.aggfunctions;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Aggregate function for COLLECT.
 *
 * <p>The accumulator keeps, for every non-null input element, a counter of how many
 * times it has been accumulated minus how many times it has been retracted. The
 * result is a plain {@link Map} from element to that counter.
 *
 * @param <T> type of collect element.
 */
public class CollectAggFunction<T>
        extends AggregateFunction<Map<T, Integer>, CollectAggFunction.CollectAccumulator<T>> {

    private static final long serialVersionUID = -5860934997657147836L;

    /** Type information of the collected elements; used as the key type of the map view. */
    private final TypeInformation<T> elementType;

    /**
     * @param elementType type information of the elements to collect
     */
    public CollectAggFunction(TypeInformation<T> elementType) {
        this.elementType = elementType;
    }

    /** The initial accumulator for Collect aggregate function. */
    public static class CollectAccumulator<T> {
        /** Element-to-count mapping; assigned in {@code createAccumulator()}. */
        public MapView<T, Integer> map = null;

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            CollectAccumulator<?> that = (CollectAccumulator<?>) o;
            return Objects.equals(map, that.map);
        }

        /**
         * Hash code derived from the same field that {@link #equals(Object)} compares,
         * so that equal accumulators always report equal hash codes.
         */
        @Override
        public int hashCode() {
            return Objects.hashCode(map);
        }
    }

    /**
     * Creates a fresh accumulator backed by a new map view keyed by the element type
     * with integer counters.
     */
    public CollectAccumulator<T> createAccumulator() {
        CollectAccumulator<T> acc = new CollectAccumulator<>();
        acc.map = new MapView<>(elementType, Types.INT);
        return acc;
    }

    /** Clears all counters held by the given accumulator. */
    public void resetAccumulator(CollectAccumulator<T> accumulator) {
        accumulator.map.clear();
    }

    /**
     * Increments the counter of {@code value}, starting at 1 when the value has no
     * counter yet. Null values are ignored.
     */
    public void accumulate(CollectAccumulator<T> accumulator, T value) throws Exception {
        if (value != null) {
            Integer count = accumulator.map.get(value);
            if (count != null) {
                accumulator.map.put(value, count + 1);
            } else {
                accumulator.map.put(value, 1);
            }
        }
    }

    /**
     * Decrements the counter of {@code value}. Null values are ignored.
     *
     * <p>A counter of exactly 1 is removed instead of being stored as 0. A value
     * without a counter is recorded with -1, so a retraction seen before its matching
     * accumulation is not lost.
     */
    public void retract(CollectAccumulator<T> accumulator, T value) throws Exception {
        if (value != null) {
            Integer count = accumulator.map.get(value);
            if (count != null) {
                if (count == 1) {
                    accumulator.map.remove(value);
                } else {
                    accumulator.map.put(value, count - 1);
                }
            } else {
                accumulator.map.put(value, -1);
            }
        }
    }

    /**
     * Adds the counters of every accumulator in {@code others} into {@code accumulator}.
     * Keys missing from the target are copied over with their counter unchanged.
     */
    public void merge(CollectAccumulator<T> accumulator, Iterable<CollectAccumulator<T>> others) throws Exception {
        for (CollectAccumulator<T> other : others) {
            for (Map.Entry<T, Integer> entry : other.map.entries()) {
                T key = entry.getKey();
                Integer newCount = entry.getValue();
                Integer oldCount = accumulator.map.get(key);
                if (oldCount == null) {
                    accumulator.map.put(key, newCount);
                } else {
                    accumulator.map.put(key, oldCount + newCount);
                }
            }
        }
    }

    /**
     * Copies every entry of the accumulator's map view into a new {@link HashMap}.
     *
     * @throws RuntimeException wrapping any exception raised while reading the entries
     */
    @Override
    public Map<T, Integer> getValue(CollectAccumulator<T> accumulator) {
        Map<T, Integer> result = new HashMap<>();
        try {
            for (Map.Entry<T, Integer> entry : accumulator.map.entries()) {
                result.put(entry.getKey(), entry.getValue());
            }
            return result;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Result type: a map from the element type to {@code Types.INT}. */
    @Override
    public TypeInformation<Map<T, Integer>> getResultType() {
        return new MapTypeInfo<>(elementType, Types.INT);
    }
}
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
import org.apache.flink.table.type.InternalType;
import org.apache.flink.table.type.InternalTypes;
import org.apache.flink.table.typeutils.BinaryStringTypeInfo;

import static org.apache.flink.table.expressions.ExpressionBuilder.concat;
import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
Expand Down Expand Up @@ -65,12 +66,12 @@ public UnresolvedReferenceExpression[] aggBufferAttributes() {

@Override
public InternalType[] getAggBufferTypes() {
return new InternalType[] { InternalTypes.STRING };
return new InternalType[] { InternalTypes.STRING, InternalTypes.STRING };
}

@Override
public TypeInformation getResultType() {
return BasicTypeInfo.STRING_TYPE_INFO;
return BinaryStringTypeInfo.INSTANCE;
}

@Override
Expand Down

0 comments on commit fefdd08

Please sign in to comment.