diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 1a68cb4f7580b0..7039e48bf8bc04 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -80,13 +80,6 @@ under the License.
 
-
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-collections4</artifactId>
-			<version>4.1</version>
-		</dependency>
-
 		<dependency>
 			<groupId>org.apache.avro</groupId>
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MultisetSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MultisetSerializer.java
deleted file mode 100644
index ef09d46568ef50..00000000000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MultisetSerializer.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils.base;
-
-import org.apache.commons.collections4.multiset.AbstractMultiSet;
-import org.apache.commons.collections4.multiset.HashMultiSet;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A serializer for Multisets. The serializer relies on an element serializer
- * for the serialization of the Multiset's elements.
- *
- * <p>The serialization format for the Multiset is as follows: four bytes for the size of the
- * multiset, followed by the serialized representation of each element.
- *
- * @param <T> The type of element in the Multiset.
- */
-@Internal
-public final class MultisetSerializer<T> extends TypeSerializer<AbstractMultiSet<T>> {
-
-	private static final long serialVersionUID = 12L;
-
-	/** The serializer for the elements of the Multiset. */
-	private final TypeSerializer<T> elementSerializer;
-
-	/**
-	 * Creates a Multiset serializer that uses the given serializer to serialize the
-	 * Multiset's elements.
-	 *
-	 * @param elementSerializer The serializer for the elements of the Multiset
-	 */
-	public MultisetSerializer(TypeSerializer<T> elementSerializer) {
-		this.elementSerializer = checkNotNull(elementSerializer);
-	}
-
-	// ------------------------------------------------------------------------
-	//  MultisetSerializer specific properties
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the serializer for the elements of the Multiset.
-	 * @return The serializer for the elements of the Multiset
-	 */
-	public TypeSerializer<T> getElementSerializer() {
-		return elementSerializer;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Type Serializer implementation
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public TypeSerializer<AbstractMultiSet<T>> duplicate() {
-		TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
-		return duplicateElement == elementSerializer ?
-				this : new MultisetSerializer<>(duplicateElement);
-	}
-
-	@Override
-	public AbstractMultiSet<T> createInstance() {
-		return new HashMultiSet<>();
-	}
-
-	@Override
-	public AbstractMultiSet<T> copy(AbstractMultiSet<T> from) {
-		return new HashMultiSet<>(from);
-	}
-
-	@Override
-	public AbstractMultiSet<T> copy(AbstractMultiSet<T> from, AbstractMultiSet<T> reuse) {
-		return copy(from);
-	}
-
-	@Override
-	public int getLength() {
-		return -1; // var length
-	}
-
-	@Override
-	public void serialize(AbstractMultiSet<T> multiSet, DataOutputView target) throws IOException {
-		final int size = multiSet.size();
-		target.writeInt(size);
-
-		for (T element : multiSet) {
-			elementSerializer.serialize(element, target);
-		}
-	}
-
-	@Override
-	public AbstractMultiSet<T> deserialize(DataInputView source) throws IOException {
-		final int size = source.readInt();
-		final AbstractMultiSet<T> multiSet = new HashMultiSet<>();
-		for (int i = 0; i < size; i++) {
-			multiSet.add(elementSerializer.deserialize(source));
-		}
-		return multiSet;
-	}
-
-	@Override
-	public AbstractMultiSet<T> deserialize(AbstractMultiSet<T> reuse, DataInputView source)
-			throws IOException {
-		return deserialize(source);
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		// copy number of elements
-		final int num = source.readInt();
-		target.writeInt(num);
-		for (int i = 0; i < num; i++) {
-			elementSerializer.copy(source, target);
-		}
-	}
-
-	// --------------------------------------------------------------------
-
-	@Override
-	public boolean equals(Object obj) {
-		return obj == this ||
-				(obj != null && obj.getClass() == getClass() &&
-						elementSerializer.equals(((MultisetSerializer<?>) obj).elementSerializer));
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return true;
-	}
-
-	@Override
-	public int hashCode() {
-		return elementSerializer.hashCode();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Serializer configuration snapshotting & compatibility
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public CollectionSerializerConfigSnapshot<T> snapshotConfiguration() {
-		return new CollectionSerializerConfigSnapshot<>(elementSerializer);
-	}
-
-	@Override
-	public CompatibilityResult<AbstractMultiSet<T>> ensureCompatibility(
-			TypeSerializerConfigSnapshot configSnapshot) {
-		if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
-			Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousElemSerializerAndConfig =
-				((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousElemSerializerAndConfig.f0,
-					UnloadableDummyTypeSerializer.class,
-					previousElemSerializerAndConfig.f1,
-					elementSerializer);
-
-			if (!compatResult.isRequiresMigration()) {
-				return CompatibilityResult.compatible();
-			} else if (compatResult.getConvertDeserializer() != null) {
-				return CompatibilityResult.requiresMigration(
-					new MultisetSerializer<>(
-						new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
-			}
-		}
-
-		return CompatibilityResult.requiresMigration();
-	}
-}
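For reference, the wire format implemented by the serializer deleted above was a four-byte element count followed by the serialized elements, duplicates included. A minimal standalone sketch of that framing, using plain java.io streams and a fixed long element type in place of Flink's DataOutputView and element TypeSerializer (class and method names are illustrative, not Flink API):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Standalone sketch (not Flink code) of the deleted serializer's format:
// a 4-byte element count, then each element in turn, duplicates included.
public class MultisetWireFormat {

	static byte[] write(Collection<Long> multiset) throws IOException {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(bytes);
		out.writeInt(multiset.size()); // 4-byte size prefix
		for (long element : multiset) {
			out.writeLong(element); // element payload, duplicates kept
		}
		return bytes.toByteArray();
	}

	static List<Long> read(byte[] data) throws IOException {
		DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
		int size = in.readInt();
		List<Long> result = new ArrayList<>(size);
		for (int i = 0; i < size; i++) {
			result.add(in.readLong());
		}
		return result;
	}

	public static void main(String[] args) throws IOException {
		System.out.println(read(write(Arrays.asList(1L, 1L, 2L)))); // [1, 1, 2]
	}
}

The count prefix is what makes the record self-delimiting, which is why getLength() above returns -1 (variable length).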
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
index f861372b520cda..5aadc006bca5f8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import org.apache.commons.collections4.multiset.AbstractMultiSet;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.MultisetSerializer;
+
+import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -33,19 +32,17 @@
  * @param <T> The type of the elements in the Multiset.
  */
 @PublicEvolving
-public final class MultisetTypeInfo<T> extends TypeInformation<AbstractMultiSet<T>> {
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
 
 	private static final long serialVersionUID = 1L;
 
-	private final TypeInformation<T> elementTypeInfo;
-
 	public MultisetTypeInfo(Class<T> elementTypeClass) {
-		this.elementTypeInfo = of(checkNotNull(elementTypeClass, "elementTypeClass"));
+		super(elementTypeClass, Integer.class);
 	}
 
 	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
-		this.elementTypeInfo = checkNotNull(elementTypeInfo, "elementTypeInfo");
+		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
 	}
 
 	// ------------------------------------------------------------------------
@@ -56,7 +53,7 @@ public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
 	 * Gets the type information for the elements contained in the Multiset
 	 */
 	public TypeInformation<T> getElementTypeInfo() {
-		return elementTypeInfo;
+		return getKeyTypeInfo();
 	}
 
 	// ------------------------------------------------------------------------
@@ -87,8 +84,8 @@ public int getTotalFields() {
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public Class<AbstractMultiSet<T>> getTypeClass() {
-		return (Class<AbstractMultiSet<T>>)(Class<?>)AbstractMultiSet.class;
+	public Class<Map<T, Integer>> getTypeClass() {
+		return (Class<Map<T, Integer>>)(Class<?>)Map.class;
 	}
 
 	@Override
@@ -96,17 +93,11 @@ public boolean isKeyType() {
 		return false;
 	}
 
-	@Override
-	public TypeSerializer<AbstractMultiSet<T>> createSerializer(ExecutionConfig config) {
-		TypeSerializer<T> elementTypeSerializer = elementTypeInfo.createSerializer(config);
-		return new MultisetSerializer<>(elementTypeSerializer);
-	}
-
 	// ------------------------------------------------------------------------
 
 	@Override
 	public String toString() {
-		return "Multiset<" + elementTypeInfo + '>';
+		return "Multiset<" + getKeyTypeInfo() + '>';
 	}
 
 	@Override
@@ -116,7 +107,7 @@ public boolean equals(Object obj) {
 		if (obj == this) {
 			return true;
 		}
 		else if (obj instanceof MultisetTypeInfo) {
 			final MultisetTypeInfo<?> other = (MultisetTypeInfo<?>) obj;
-			return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo);
+			return other.canEqual(this) && getKeyTypeInfo().equals(other.getKeyTypeInfo());
 		} else {
 			return false;
 		}
@@ -124,7 +115,7 @@ else if (obj instanceof MultisetTypeInfo) {
 
 	@Override
 	public int hashCode() {
-		return 31 * elementTypeInfo.hashCode() + 1;
+		return 31 * getKeyTypeInfo().hashCode() + 1;
 	}
 
 	@Override
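This is the core idea of the change: a multiset of T is just a map from each distinct element to its multiplicity, so MultisetTypeInfo can extend MapTypeInfo<T, Integer> and reuse the existing map type info and serializer stack instead of the commons-collections4-backed serializer deleted above. A minimal sketch of the representation in plain JDK types (the class name is illustrative):

import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of the new representation: a multiset of T is modeled as
// a Map<T, Integer> from element to multiplicity. LinkedHashMap is used
// only so the printed order below is deterministic.
public class MultisetAsMap {

	public static void main(String[] args) {
		Map<String, Integer> multiset = new LinkedHashMap<>();
		for (String s : new String[] {"a", "a", "b"}) {
			multiset.merge(s, 1, Integer::sum); // add one occurrence
		}
		System.out.println(multiset); // {a=2, b=1}

		// The size of the multiset is the sum of multiplicities,
		// not the number of distinct keys:
		int size = multiset.values().stream().mapToInt(Integer::intValue).sum();
		System.out.println(size + " vs. " + multiset.size()); // 3 vs. 2
	}
}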
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MultisetSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MultisetSerializerTest.java
deleted file mode 100644
index 6f2646a44573f8..00000000000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MultisetSerializerTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils.base;
-
-import org.apache.commons.collections4.multiset.AbstractMultiSet;
-import org.apache.commons.collections4.multiset.HashMultiSet;
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.Random;
-
-/**
- * Test for {@link MultisetSerializer}.
- */
-public class MultisetSerializerTest extends SerializerTestBase<AbstractMultiSet<Long>> {
-
-	@Override
-	protected TypeSerializer<AbstractMultiSet<Long>> createSerializer() {
-		return new MultisetSerializer<Long>(LongSerializer.INSTANCE);
-	}
-
-	@Override
-	protected int getLength() {
-		return -1;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected Class<AbstractMultiSet<Long>> getTypeClass() {
-		return (Class<AbstractMultiSet<Long>>) (Class<?>) AbstractMultiSet.class;
-	}
-
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	@Override
-	protected AbstractMultiSet<Long>[] getTestData() {
-		final Random rnd = new Random(123654789);
-
-		// empty Multisets
-		final AbstractMultiSet<Long> set1 = new HashMultiSet<>();
-
-		// single element Multisets
-		final AbstractMultiSet<Long> set2 = new HashMultiSet<>();
-		set2.add(12345L);
-
-		// larger Multisets
-		final AbstractMultiSet<Long> set3 = new HashMultiSet<>();
-		for (int i = 0; i < rnd.nextInt(200); i++) {
-			set3.add(rnd.nextLong());
-		}
-
-		return (AbstractMultiSet<Long>[]) new AbstractMultiSet[]{
-			set1, set2, set3
-		};
-	}
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 3e71c99ee4ba59..9696ced32c4b43 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -74,7 +74,8 @@ class ExpressionReducer(config: TableConfig)
       case (SqlTypeName.ANY, _) |
            (SqlTypeName.ROW, _) |
            (SqlTypeName.ARRAY, _) |
-           (SqlTypeName.MAP, _) => None
+           (SqlTypeName.MAP, _) |
+           (SqlTypeName.MULTISET, _) => None
 
       case (_, e) => Some(e)
     }
@@ -112,7 +113,11 @@ class ExpressionReducer(config: TableConfig)
         val unreduced = constExprs.get(i)
         unreduced.getType.getSqlTypeName match {
           // we insert the original expression for object literals
-          case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY | SqlTypeName.MAP =>
+          case SqlTypeName.ANY |
+               SqlTypeName.ROW |
+               SqlTypeName.ARRAY |
+               SqlTypeName.MAP |
+               SqlTypeName.MULTISET =>
             reducedValues.add(unreduced)
           case _ =>
             val reducedValue = reduced.getField(reducedIdx)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
index f106771e7138f6..5cbf48a9e9b816 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
@@ -19,36 +19,44 @@
 package org.apache.flink.table.functions.aggfunctions
 
 import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
 
-import org.apache.commons.collections4.multiset.{AbstractMultiSet, HashMultiSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
 import org.apache.flink.table.functions.AggregateFunction
 
+import scala.collection.JavaConverters._
+
 /** The initial accumulator for Collect aggregate function */
-class CollectAccumulator[E] extends JTuple1[AbstractMultiSet[E]]
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
 
 abstract class CollectAggFunction[E]
-  extends AggregateFunction[AbstractMultiSet[E], CollectAccumulator[E]] {
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
+    override def apply(t: Integer, u: Integer): Integer = t + u
+  }
 
   override def createAccumulator(): CollectAccumulator[E] = {
     val acc = new CollectAccumulator[E]()
-    acc.f0 = new HashMultiSet()
+    acc.f0 = new util.HashMap[E, Integer]()
     acc
   }
 
   def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
     if (value != null) {
-      accumulator.f0.add(value)
+      // merge inserts a count of 1 for a new element and increments
+      // the multiplicity of an element that is already present
+      accumulator.f0.merge(value, 1, addFunction)
     }
   }
 
-  override def getValue(accumulator: CollectAccumulator[E]): AbstractMultiSet[E] = {
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
     if (accumulator.f0.size() > 0) {
-      new HashMultiSet(accumulator.f0)
+      new util.HashMap(accumulator.f0)
     } else {
-      null.asInstanceOf[AbstractMultiSet[E]]
+      null.asInstanceOf[util.Map[E, Integer]]
     }
   }
 
@@ -59,19 +67,23 @@ abstract class CollectAggFunction[E]
   override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] = {
     new TupleTypeInfo(
       classOf[CollectAccumulator[E]],
-      new GenericTypeInfo[AbstractMultiSet[E]](classOf[AbstractMultiSet[E]]))
+      new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, Integer]]))
   }
 
   def merge(acc: CollectAccumulator[E], its: JIterable[CollectAccumulator[E]]): Unit = {
     val iter = its.iterator()
     while (iter.hasNext) {
-      acc.f0.addAll(iter.next().f0)
+      for ((k: E, v: Integer) <- iter.next().f0.asScala) {
+        acc.f0.merge(k, v, addFunction)
+      }
     }
   }
 
-  def retract(acc: CollectAccumulator[E], value: Any): Unit = {
+  def retract(acc: CollectAccumulator[E], value: E): Unit = {
     if (value != null) {
-      acc.f0.remove(value)
+      if (0 == acc.f0.merge(value, -1, addFunction)) {
+        acc.f0.remove(value)
+      }
     }
   }
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index 8509a8ec9de828..f3e1a6264822ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -94,7 +94,7 @@ trait FlinkRelNode extends RelNode {
     case SqlTypeName.ARRAY =>
       // 16 is an arbitrary estimate
       estimateDataTypeSize(t.getComponentType) * 16
-    case SqlTypeName.MAP =>
+    case SqlTypeName.MAP | SqlTypeName.MULTISET =>
       // 16 is an arbitrary estimate
       (estimateDataTypeSize(t.getKeyType) + estimateDataTypeSize(t.getValueType)) * 16
     case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate
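The accumulate/retract/merge methods above all reduce to keyed counter updates on the util.Map[E, Integer] accumulator via Map.merge. A standalone sketch of those semantics (class and method names are illustrative, not the Flink API; like the patch, it assumes retract is only called for previously accumulated values, otherwise a stray -1 entry remains):

import java.util.HashMap;
import java.util.Map;

// Sketch of the counting semantics behind accumulate/retract/merge on the
// Map-backed accumulator.
public class CountingMultiset<E> {

	private final Map<E, Integer> counts = new HashMap<>();

	void accumulate(E value) {
		// Absent key: insert 1. Present key: increment the multiplicity.
		counts.merge(value, 1, Integer::sum);
	}

	void retract(E value) {
		// merge returns the updated count; drop the entry once it reaches
		// zero so retracted elements do not linger in the map.
		if (counts.merge(value, -1, Integer::sum) == 0) {
			counts.remove(value);
		}
	}

	void mergeWith(Map<E, Integer> other) {
		// Combining two accumulators sums the multiplicities key by key.
		other.forEach((k, v) -> counts.merge(k, v, Integer::sum));
	}

	public static void main(String[] args) {
		CountingMultiset<String> acc = new CountingMultiset<>();
		acc.accumulate("a");
		acc.accumulate("a");
		acc.accumulate("b");
		acc.retract("b");
		System.out.println(acc.counts); // {a=2}
	}
}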
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/CollectAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/CollectAggFunctionTest.scala
index 89a90bbe2f29a0..9e6a54a1bbb52f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/CollectAggFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/CollectAggFunctionTest.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.runtime.aggfunctions
 
-import com.google.common.collect.ImmutableList
-import org.apache.commons.collections4.multiset.{AbstractMultiSet, HashMultiSet}
+import java.util
+
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions._
 
@@ -27,161 +27,194 @@
 /**
   * Test case for built-in collect aggregate functions
   */
 class StringCollectAggFunctionTest
-  extends AggFunctionTestBase[AbstractMultiSet[String], CollectAccumulator[String]] {
+  extends AggFunctionTestBase[util.Map[String, Integer], CollectAccumulator[String]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq("a", "a", "b", null, "c", null, "d", "e", null, "f"),
+    Seq("a", "a", "b", null, "c", null, "d", "e", null, "f", null),
     Seq(null, null, null, null, null, null)
   )
 
-  override def expectedResults: Seq[AbstractMultiSet[String]] = {
-    val set1 = new HashMultiSet[String]()
-    set1.addAll(ImmutableList.of("a", "a", "b", "c", "d", "e", "f"))
-    Seq(set1, null)
+  override def expectedResults: Seq[util.Map[String, Integer]] = {
+    val map = new util.HashMap[String, Integer]()
+    map.put("a", 2)
+    map.put("b", 1)
+    map.put("c", 1)
+    map.put("d", 1)
+    map.put("e", 1)
+    map.put("f", 1)
+    Seq(map, null)
   }
 
-  override def aggregator: AggregateFunction[AbstractMultiSet[String], CollectAccumulator[String]] =
+  override def aggregator: AggregateFunction[
+    util.Map[String, Integer], CollectAccumulator[String]] =
     new StringCollectAggFunction()
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
 }
 
 class IntCollectAggFunctionTest
-  extends AggFunctionTestBase[AbstractMultiSet[Int], CollectAccumulator[Int]] {
+  extends AggFunctionTestBase[util.Map[Int, Integer], CollectAccumulator[Int]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(1, 1, 2, null, 3, null, 4, 5, null, 6),
+    Seq(1, 1, 2, null, 3, null, 4, 5, null),
     Seq(null, null, null, null, null, null)
   )
 
-  override def expectedResults: Seq[AbstractMultiSet[Int]] = {
-    val set1 = new HashMultiSet[Int]()
-    set1.addAll(ImmutableList.of(1, 1, 2, 3, 4, 5, 6))
-    Seq(set1, null)
+  override def expectedResults: Seq[util.Map[Int, Integer]] = {
+    val map = new util.HashMap[Int, Integer]()
+    map.put(1, 2)
+    map.put(2, 1)
+    map.put(3, 1)
+    map.put(4, 1)
+    map.put(5, 1)
+    Seq(map, null)
  }
 
-  override def aggregator: AggregateFunction[AbstractMultiSet[Int], CollectAccumulator[Int]] =
+  override def aggregator: AggregateFunction[util.Map[Int, Integer], CollectAccumulator[Int]] =
     new IntCollectAggFunction()
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
 }
 
 class ByteCollectAggFunctionTest
-  extends AggFunctionTestBase[AbstractMultiSet[Byte], CollectAccumulator[Byte]] {
+  extends AggFunctionTestBase[util.Map[Byte, Integer], CollectAccumulator[Byte]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(1.toByte, 1.toByte, 2.toByte, null, 3.toByte, null, 4.toByte, 5.toByte, null, 6.toByte),
+    Seq(1.toByte, 1.toByte, 2.toByte, null, 3.toByte, null, 4.toByte, 5.toByte, null),
     Seq(null, null, null, null, null, null)
   )
 
-  override def expectedResults: Seq[AbstractMultiSet[Byte]] = {
-    val set1 = new HashMultiSet[Byte]()
-    set1.addAll(ImmutableList.of(1, 1, 2, 3, 4, 5, 6))
-    Seq(set1, null)
+  override def expectedResults: Seq[util.Map[Byte, Integer]] = {
+    val map = new util.HashMap[Byte, Integer]()
+    map.put(1, 2)
+    map.put(2, 1)
+    map.put(3, 1)
+    map.put(4, 1)
+    map.put(5, 1)
+    Seq(map, null)
   }
 
-  override def aggregator: AggregateFunction[AbstractMultiSet[Byte], CollectAccumulator[Byte]] =
+  override def aggregator: AggregateFunction[util.Map[Byte, Integer], CollectAccumulator[Byte]] =
     new ByteCollectAggFunction()
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
 }
 
 class ShortCollectAggFunctionTest
-  extends AggFunctionTestBase[AbstractMultiSet[Short], CollectAccumulator[Short]] {
+  extends AggFunctionTestBase[util.Map[Short, Integer], CollectAccumulator[Short]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(1.toShort, 1.toShort, 2.toShort, null,
-      3.toShort, null, 4.toShort, 5.toShort, null, 6.toShort),
+      3.toShort, null, 4.toShort, 5.toShort, null),
     Seq(null, null, null, null, null, null)
   )
 
-  override def expectedResults: Seq[AbstractMultiSet[Short]] = {
-    val set1 = new HashMultiSet[Short]()
-    set1.addAll(ImmutableList.of(1, 1, 2, 3, 4, 5, 6))
-    Seq(set1, null)
+  override def expectedResults: Seq[util.Map[Short, Integer]] = {
+    val map = new util.HashMap[Short, Integer]()
+    map.put(1, 2)
+    map.put(2, 1)
+    map.put(3, 1)
+    map.put(4, 1)
+    map.put(5, 1)
+    Seq(map, null)
   }
 
-  override def aggregator: AggregateFunction[AbstractMultiSet[Short], CollectAccumulator[Short]] =
+  override def aggregator: AggregateFunction[util.Map[Short, Integer], CollectAccumulator[Short]] =
     new ShortCollectAggFunction()
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
 }
 
 class LongCollectAggFunctionTest
-  extends AggFunctionTestBase[AbstractMultiSet[Long], CollectAccumulator[Long]] {
+  extends AggFunctionTestBase[util.Map[Long, Integer], CollectAccumulator[Long]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(1L, 1L, 2L, null, 3L, null, 4L, 5L, null, 6L),
+    Seq(1L, 1L, 2L, null, 3L, null, 4L, 5L, null),
     Seq(null, null, null, null, null, null)
   )
 
-  override def expectedResults: Seq[AbstractMultiSet[Long]] = {
-    val set1 = new HashMultiSet[Long]()
-    set1.addAll(ImmutableList.of(1, 1, 2, 3, 4, 5, 6))
-    Seq(set1, null)
+  override def expectedResults: Seq[util.Map[Long, Integer]] = {
+    val map = new util.HashMap[Long, Integer]()
+    map.put(1, 2)
+    map.put(2, 1)
+    map.put(3, 1)
+    map.put(4, 1)
+    map.put(5, 1)
+    Seq(map, null)
   }
 
-  override def aggregator: AggregateFunction[AbstractMultiSet[Long], CollectAccumulator[Long]] =
+  override def aggregator: AggregateFunction[util.Map[Long, Integer], CollectAccumulator[Long]] =
    new LongCollectAggFunction()
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
 }
 
 class FloatAggFunctionTest
-  extends AggFunctionTestBase[AbstractMultiSet[Float], CollectAccumulator[Float]] {
+  extends AggFunctionTestBase[util.Map[Float, Integer], CollectAccumulator[Float]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(1f, 1f, 2f, null, 3.2f, null, 4f, 5f, null, 6f),
+    Seq(1f, 1f, 2f, null, 3.2f, null, 4f, 5f, null),
     Seq(null, null, null, null, null, null)
   )
 
-  override def expectedResults: Seq[AbstractMultiSet[Float]] = {
-    val set1 = new HashMultiSet[Float]()
-    set1.addAll(ImmutableList.of(1f, 1f, 2f, 3.2f, 4f, 5f, 6f))
-    Seq(set1, null)
+  override def expectedResults: Seq[util.Map[Float, Integer]] = {
+    val map = new util.HashMap[Float, Integer]()
+    map.put(1, 2)
+    map.put(2, 1)
+    map.put(3.2f, 1)
+    map.put(4, 1)
+    map.put(5, 1)
+    Seq(map, null)
   }
 
-  override def aggregator: AggregateFunction[AbstractMultiSet[Float], CollectAccumulator[Float]] =
+  override def aggregator: AggregateFunction[util.Map[Float, Integer], CollectAccumulator[Float]] =
     new FloatCollectAggFunction()
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
 }
 
 class DoubleAggFunctionTest
-  extends AggFunctionTestBase[AbstractMultiSet[Double], CollectAccumulator[Double]] {
+  extends AggFunctionTestBase[util.Map[Double, Integer], CollectAccumulator[Double]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(1d, 1d, 2d, null, 3.2d, null, 4d, 5d, null, 6d),
+    Seq(1d, 1d, 2d, null, 3.2d, null, 4d, 5d, null),
     Seq(null, null, null, null, null, null)
   )
 
-  override def expectedResults: Seq[AbstractMultiSet[Double]] = {
-    val set1 = new HashMultiSet[Double]()
-    set1.addAll(ImmutableList.of(1, 1, 2, 3.2, 4, 5, 6))
-    Seq(set1, null)
+  override def expectedResults: Seq[util.Map[Double, Integer]] = {
+    val map = new util.HashMap[Double, Integer]()
+    map.put(1, 2)
+    map.put(2, 1)
+    map.put(3.2d, 1)
+    map.put(4, 1)
+    map.put(5, 1)
+    Seq(map, null)
   }
 
-  override def aggregator: AggregateFunction[AbstractMultiSet[Double], CollectAccumulator[Double]] =
+  override def aggregator: AggregateFunction[
+    util.Map[Double, Integer], CollectAccumulator[Double]] =
     new DoubleCollectAggFunction()
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
 }
 
 class ObjectAggFunctionTest
-  extends AggFunctionTestBase[AbstractMultiSet[Object], CollectAccumulator[Object]] {
+  extends AggFunctionTestBase[util.Map[Object, Integer], CollectAccumulator[Object]] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
     Seq(Tuple2(1, "a"), Tuple2(1, "a"), null, Tuple2(2, "b")),
     Seq(null, null, null, null, null, null)
   )
 
-  override def expectedResults: Seq[AbstractMultiSet[Object]] = {
-    val set1 = new HashMultiSet[Object]()
-    set1.addAll(ImmutableList.of(Tuple2(1, "a"), Tuple2(1, "a"), Tuple2(2, "b")))
-    Seq(set1, null)
+  override def expectedResults: Seq[util.Map[Object, Integer]] = {
+    val map = new util.HashMap[Object, Integer]()
+    map.put(Tuple2(1, "a"), 2)
+    map.put(Tuple2(2, "b"), 1)
+    Seq(map, null)
   }
 
-  override def aggregator: AggregateFunction[AbstractMultiSet[Object], CollectAccumulator[Object]] =
+  override def aggregator: AggregateFunction[
+    util.Map[Object, Integer], CollectAccumulator[Object]] =
     new ObjectCollectAggFunction()
 
   override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index 26dfb376191863..ce0875b0002331 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -346,12 +346,12 @@ class AggregateITCase(
     val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
 
     val expected = Seq(
-      "1,[1:1]",
-      "2,[2:1]", "2,[2:1]",
-      "3,[3:1]", "3,[3:2]",
-      "4,[4:2]", "4,[4:2]",
-      "5,[5:1]", "5,[5:1]", "5,[5:3]",
-      "6,[6:1]", "6,[6:2]", "6,[6:3]"
+      "1,{1=1}",
+      "2,{2=1}", "2,{2=1}",
+      "3,{3=1}", "3,{3=2}",
+      "4,{4=2}", "4,{4=2}",
"5,{5=3}", + "6,{6=1}", "6,{6=2}", "6,{6=3}" ).mkString("\n") TestBaseUtils.compareResultAsText(result.asJava, expected) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index 181012751b3338..e03bef1f96785c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -102,12 +102,12 @@ class SqlITCase extends StreamingWithStateTestBase { env.execute() val expected = List( - "1,[1:1]", - "2,[2:1, 3:1]", - "3,[4:1, 5:1, 6:1]", - "4,[7:1, 8:1, 9:1, 10:1]", - "5,[11:1, 12:1, 13:1, 14:1, 15:1]", - "6,[16:1, 17:1, 18:1, 19:1, 20:1, 21:1]") + "1,{1=1}", + "2,{2=1, 3=1}", + "3,{4=1, 5=1, 6=1}", + "4,{7=1, 8=1, 9=1, 10=1}", + "5,{11=1, 12=1, 13=1, 14=1, 15=1}", + "6,{16=1, 17=1, 18=1, 19=1, 20=1, 21=1}") assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } @@ -136,9 +136,9 @@ class SqlITCase extends StreamingWithStateTestBase { env.execute() val expected = List( - "1,[(12,45.6):1]", - "2,[(13,41.6):1, (12,45.612):1]", - "3,[(18,42.6):1, (14,45.2136):1]") + "1,{(12,45.6)=1}", + "2,{(13,41.6)=1, (12,45.612)=1}", + "3,{(18,42.6)=1, (14,45.2136)=1}") assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) }