From 745c0cd34168e8e79c106289d50962e1674a13c5 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Mon, 13 Jul 2015 21:59:10 +0200 Subject: [PATCH] [FLINK-2148] [contrib] Exact and approximate countDistinct on streams --- flink-contrib/flink-streaming-contrib/pom.xml | 5 + .../contrib/streaming/DataStreamUtils.java | 155 +++++++++++++++++- .../streaming/CountDistinctITCase.java | 56 +++++++ .../SingleOutputStreamOperator.java | 2 +- .../StreamExecutionEnvironment.java | 19 +++ .../StreamingScalaAPICompletenessTest.scala | 5 +- 6 files changed, 238 insertions(+), 4 deletions(-) create mode 100644 flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CountDistinctITCase.java diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml index 0979643020fcf..4c87589c8ed60 100644 --- a/flink-contrib/flink-streaming-contrib/pom.xml +++ b/flink-contrib/flink-streaming-contrib/pom.xml @@ -53,6 +53,11 @@ under the License. 
${project.version} test + + com.clearspring.analytics + stream + 2.9.0 + diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java index 276409d2dadcf..cccbde5e0d120 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.contrib.streaming; import java.io.IOException; +import java.io.Serializable; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -27,7 +28,11 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.runtime.net.NetUtils; +import org.apache.flink.streaming.util.FieldAccessor; +import com.clearspring.analytics.stream.cardinality.ICardinality; +import com.clearspring.analytics.stream.cardinality.HyperLogLog; public class DataStreamUtils { @@ -36,7 +41,7 @@ public class DataStreamUtils { * @return The iterator */ public static Iterator collect(DataStream stream) { - TypeSerializer serializer = stream.getType().createSerializer(stream.getExecutionEnvironment().getConfig()); + TypeSerializer serializer = stream.getType().createSerializer(stream.getExecutionEnvironment().getConfig()); DataStreamIterator it = new DataStreamIterator(serializer); //Find out what IP of us should be given to CollectSink, that it will be able to connect to @@ -61,7 +66,7 @@ public static Iterator collect(DataStream stream) { DataStreamSink sink = stream.addSink(new CollectSink(clientAddress, 
it.getPort(), serializer)); sink.setParallelism(1); // It would not work if multiple instances would connect to the same port - (new CallExecute(stream)).start(); + (new CallExecute(stream)).start(); return it; } @@ -83,4 +88,150 @@ public void run(){ } } } + + + public static class Statistics { + + /** + * Approximately counts the distinct values in a data stream. + * + * The precision can be set by the rsd parameter. + * + * @param rsd + * The relative standard deviation of the result from + * the exact result. Smaller values create counters + * that require more space. + * @param stream + * The {@link DataStream} to work with + * @return The transformed {@link DataStream}. + */ + public static DataStream countDistinctApprox(double rsd, DataStream stream) { + return stream.map(new CountDistinctMapFunction(new CountDistinctApprox(rsd))).setParallelism(1); + } + + /** + * Approximately counts the distinct values in a data stream at the + * specified field position, and writes the result to another + * (or back to the same) field. + * + * The precision can be set by the rsd parameter. + * + * The output field must be of type Long. + * + * @param inPos + * The input position in the tuple/array + * @param outPos + * The output position in the tuple/array + * @param rsd + * The relative standard deviation of the result from + * the exact result. Smaller values create counters + * that require more space. + * @param stream + * The {@link DataStream} to work with + * @return The transformed {@link DataStream}.
+ */ + public static DataStream countDistinctApprox(int inPos, int outPos, double rsd, + DataStream stream) { + return stream.map(new CountDistinctFieldMapFunction( + FieldAccessor.create(inPos, stream.getType(), stream.getExecutionConfig()), + FieldAccessor.create(outPos, stream.getType(), stream.getExecutionConfig()), + new CountDistinctApprox(rsd) + )).setParallelism(1); + } + + /** + * Approximately counts the distinct values in a data stream at the + * specified field, and writes the result to another + * (or back to the same) field. + * + * The precision can be set by the rsd parameter. + * + * The output field must be of type Long. + * + * The fields can be specified by a field expression that can be either + * the name of a public field or a getter method with parentheses of the + * stream's underlying type. A dot can be used to drill down into objects, + * as in {@code "field1.getInnerField2()" }. + * + * @param inField + * The field expression of the input field + * @param outField + * The field expression of the output field + * @param rsd + * The relative standard deviation of the result from + * the exact result. Smaller values create counters + * that require more space. + * @param stream + * The {@link DataStream} to work with + * @return The transformed {@link DataStream}.
+ */ + public static DataStream countDistinctApprox(String inField, String outField, double rsd, + DataStream stream) { + return stream.map(new CountDistinctFieldMapFunction( + FieldAccessor.create(inField, stream.getType(), stream.getExecutionConfig()), + FieldAccessor.create(outField, stream.getType(), stream.getExecutionConfig()), + new CountDistinctApprox(rsd) + )).setParallelism(1); + } + + protected interface CountDistinct { + public Long offer(T elem); + } + + /** + * Calculates count distinct using the specified implementation. + */ + protected static class CountDistinctMapFunction implements MapFunction { + CountDistinct countDistinctImpl; + + public CountDistinctMapFunction(CountDistinct countDistinctImpl) { + this.countDistinctImpl = countDistinctImpl; + } + + @Override + public Long map(R record) throws Exception { + return countDistinctImpl.offer(record); + } + } + + /** + * Calculates count distinct of one field using the specified implementation, + * and writes the result to another field.
+ */ + protected static class CountDistinctFieldMapFunction implements MapFunction { + FieldAccessor inAccessor; + FieldAccessor outAccessor; + + CountDistinct countDistinctImpl; + + public CountDistinctFieldMapFunction(FieldAccessor inAccessor, FieldAccessor outAccessor, + CountDistinct countDistinctImpl) { + this.inAccessor = inAccessor; + this.outAccessor = outAccessor; + this.countDistinctImpl = countDistinctImpl; + } + + @Override + public R map(R record) throws Exception { + F fieldVal = inAccessor.get(record); + return outAccessor.set(record, countDistinctImpl.offer(fieldVal)); + } + } + + protected static class CountDistinctApprox implements CountDistinct, Serializable { + + private static final long serialVersionUID = 1L; + + ICardinality card; + + public CountDistinctApprox(double rsd) { + card = new HyperLogLog(rsd); + } + + public Long offer(T elem) { + card.offer(elem); + return card.cardinality(); + } + } + } } diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CountDistinctITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CountDistinctITCase.java new file mode 100644 index 0000000000000..595d2069e353d --- /dev/null +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CountDistinctITCase.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Iterator; + +/** + * This test verifies the behavior of DataStreamUtils.Statistics.countDistinctApprox. + */ +public class CountDistinctITCase { + + final Integer[] inputData = new Integer[]{1,2,3,2,4,5,6,5,3,7,8,9,9,10,9,8,10,11}; + final int[] referenceOutput = new int[]{1,2,3,3,4,5,6,6,6,7,8,9,9,10,10,10,10,11}; + + @Test + public void testCountDistinctApprox() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + + DataStream input = env.fromArray(inputData); + + DataStream result = DataStreamUtils.Statistics.countDistinctApprox(0.01, input); + + int i = 0; + for (Iterator it = DataStreamUtils.collect(result); it.hasNext(); ) { + Long x = it.next(); + if (x != referenceOutput[i]) { + Assert.fail(String.format("Should have got %d, got %d instead.", referenceOutput[i], x)); + } + i++; + } + if (i != referenceOutput.length) { + Assert.fail(String.format("Should have collected %d numbers, got %d instead.", referenceOutput.length, i)); + } + } +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index b4a99c82c5ffb..6d2d0b90e06c6
100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -58,7 +58,7 @@ public String getName(){ * * @return The named operator. */ - public DataStream name(String name){ + public SingleOutputStreamOperator name(String name){ streamGraph.getStreamNode(id).setOperatorName(name); return this; } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 58348e38d8e95..049826352c29f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -525,6 +525,25 @@ public DataStreamSource fromCollection(Collection data) { return fromCollection(data, typeInfo); } + /** + * Creates a data stream from the given non-empty array. The type of the data stream is that of the + * elements in the array. + * + *

Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a + * parallelism of one.

+ * + * @param data + * The array to create the data stream from. + * @param <OUT> + * The generic type of the returned data stream. + * @return + * The data stream representing the given array + */ + public DataStreamSource fromArray(OUT[] data) { + Preconditions.checkNotNull(data, "Array must not be null"); + return fromCollection(Arrays.asList(data)); + } + /** * Creates a data stream from the given non-empty collection. * diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index 7ebc16147831f..13e1c59de702b 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -51,7 +51,10 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig", // TypeHints are only needed for Java API, Scala API doesn't need them - "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.returns" + "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.returns", + + // An Array is implicitly converted to WrappedArray, so it works with fromCollection + "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromArray" ) val excludedPatterns = Seq( // We don't have project on tuples in the Scala API