From 52f5ec0333208ba2181fcbc68a031751780136bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Hermann?= Date: Fri, 26 Jun 2015 17:23:36 +0200 Subject: [PATCH 1/3] [FLINK-2138] [streaming] Added custom partitioning to DataStream --- .../org/apache/flink/api/java/DataSet.java | 4 +- .../streaming/api/datastream/DataStream.java | 70 +++++++++++++++++-- .../partitioner/CustomPartitionerWrapper.java | 57 +++++++++++++++ .../partitioner/StreamPartitioner.java | 2 +- .../streaming/util/keys/KeySelectorUtil.java | 40 +++++++++++ .../flink/streaming/api/DataStreamTest.java | 28 +++++++- 6 files changed, 193 insertions(+), 8 deletions(-) create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index e217e5307bde1..d24a35028edc1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -1128,14 +1128,14 @@ public PartitionOperator partitionCustom(Partitioner partitioner, Stri /** * Partitions a DataSet on the key returned by the selector, using a custom partitioner. - * This method takes the key selector t get the key to partition on, and a partitioner that + * This method takes the key selector to get the key to partition on, and a partitioner that * accepts the key type. *

* Note: This method works only on single field keys, i.e. the selector cannot return tuples * of fields. * * @param partitioner The partitioner to assign partitions to keys. - * @param keyExtractor The KeyExtractor with which the DataSet is hash-partitioned. + * @param keyExtractor The KeyExtractor with which the DataSet is partitioned. * @return The partitioned DataSet. * * @see KeySelector diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index bf0ff2338b00a..8fb896e44e79c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -25,6 +25,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -64,9 +66,10 @@ import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; +import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; +import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; -import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.util.keys.KeySelectorUtil; @@ -81,7 +84,6 @@ *

 *
 * <li>{@link DataStream#map},
 *
 * <li>{@link DataStream#filter}, or
- *
 * <li>{@link DataStream#sum}.
 *
* * @param @@ -450,6 +452,66 @@ private DataStream partitionByHash(Keys keys) { clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())))); } + /** + * Partitions a tuple DataStream on the specified key fields using a custom partitioner. + * This method takes the key position to partition on, and a partitioner that accepts the key type. + *

+ * Note: This method works only on single field keys.
+ *
+ * @param partitioner The partitioner to assign partitions to keys.
+ * @param field The field index on which the DataStream is partitioned.
+ * @return The partitioned DataStream.
+ */
+public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, int field) {
+	Keys.ExpressionKeys<OUT> outExpressionKeys = new Keys.ExpressionKeys<OUT>(new int[]{field}, getType());
+	return partitionCustom(partitioner, outExpressionKeys);
+}
+
+/**
+ * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
+ * This method takes the key expression to partition on, and a partitioner that accepts the key type.
+ *

+ * Note: This method works only on single field keys.
+ *
+ * @param partitioner The partitioner to assign partitions to keys.
+ * @param field The field expression on which the DataStream is partitioned.
+ * @return The partitioned DataStream.
+ */
+public <K> DataStream<OUT> partitionCustom(Partitioner<K> partitioner, String field) {
+	Keys.ExpressionKeys<OUT> outExpressionKeys = new Keys.ExpressionKeys<OUT>(new String[]{field}, getType());
+	return partitionCustom(partitioner, outExpressionKeys);
+}
+
+/**
+ * Partitions a DataStream on the key returned by the selector, using a custom partitioner.
+ * This method takes the key selector to get the key to partition on, and a partitioner that
+ * accepts the key type.
+ *

+ * Note: This method works only on single field keys, i.e. the selector cannot return tuples + * of fields. + * + * @param partitioner + * The partitioner to assign partitions to keys. + * @param keySelector + * The KeySelector with which the DataStream is partitioned. + * @return The partitioned DataStream. + * @see KeySelector + */ + public > DataStream partitionCustom(Partitioner partitioner, KeySelector keySelector) { + return setConnectionType(new CustomPartitionerWrapper(clean(partitioner), clean(keySelector))); + } + + // private helper method for custom partitioning + private DataStream partitionCustom(Partitioner partitioner, Keys keys) { + KeySelector keySelector = KeySelectorUtil.getSelectorForOneKey(keys, partitioner, getType(), getExecutionConfig()); + + return setConnectionType( + new CustomPartitionerWrapper( + clean(partitioner), + clean(keySelector))); + } + /** * Sets the partitioning of the {@link DataStream} so that the output tuples * are broadcasted to every parallel instance of the next component. @@ -530,7 +592,7 @@ public DataStream global() { * iteration head. The user can also use different feedback type than the * input of the iteration and treat the input and feedback streams as a * {@link ConnectedDataStream} be calling - * {@link IterativeDataStream#withFeedbackType(TypeInfo)} + * {@link IterativeDataStream#withFeedbackType(TypeInformation)} *

* A common usage pattern for streaming iterations is to use output * splitting to send a part of the closing data stream to the head. Refer to @@ -561,7 +623,7 @@ public IterativeDataStream iterate() { * iteration head. The user can also use different feedback type than the * input of the iteration and treat the input and feedback streams as a * {@link ConnectedDataStream} be calling - * {@link IterativeDataStream#withFeedbackType(TypeInfo)} + * {@link IterativeDataStream#withFeedbackType(TypeInformation)} *

* A common usage pattern for streaming iterations is to use output * splitting to send a part of the closing data stream to the head. Refer to diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java new file mode 100644 index 0000000000000..75867cd439d55 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.partitioner; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * Partitioner that selects the channel with a user defined partitioner function on a key. 
+ * + * @param + * Type of the key + * @param + * Type of the data + */ +public class CustomPartitionerWrapper extends StreamPartitioner { + private static final long serialVersionUID = 1L; + + private int[] returnArray = new int[1]; + Partitioner partitioner; + KeySelector keySelector; + + public CustomPartitionerWrapper(Partitioner partitioner, KeySelector keySelector) { + super(PartitioningStrategy.CUSTOM); + this.partitioner = partitioner; + this.keySelector = keySelector; + } + + @Override + public int[] selectChannels(SerializationDelegate> record, + int numberOfOutputChannels) { + + K key = record.getInstance().getKey(keySelector); + + returnArray[0] = partitioner.partition(key, + numberOfOutputChannels); + + return returnArray; + } +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java index 3af7c7a6704af..ef598c679f49d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java @@ -27,7 +27,7 @@ public abstract class StreamPartitioner implements public enum PartitioningStrategy { - FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY + FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY, CUSTOM } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java index 77467b5a0703e..49f2fe0468519 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java @@ -20,6 +20,7 @@ import java.lang.reflect.Array; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -71,6 +72,45 @@ public class KeySelectorUtil { return new ComparableKeySelector(comparator, keyLength); } + public static KeySelector getSelectorForOneKey(Keys keys, Partitioner partitioner, TypeInformation typeInfo, + ExecutionConfig executionConfig) { + if (partitioner != null) { + keys.validateCustomPartitioner(partitioner, null); + } + + int[] logicalKeyPositions = keys.computeLogicalKeyPositions(); + + if (logicalKeyPositions.length != 1) { + throw new IllegalArgumentException("There must be exactly 1 key specified"); + } + + TypeComparator comparator = ((CompositeType) typeInfo).createComparator( + logicalKeyPositions, new boolean[1], 0, executionConfig); + return new OneKeySelector(comparator); + } + + public static class OneKeySelector implements KeySelector { + + private static final long serialVersionUID = 1L; + + private TypeComparator comparator; + private Object[] keyArray; + private K key; + + public OneKeySelector(TypeComparator comparator) { + this.comparator = comparator; + keyArray = new Object[1]; + } + + @Override + 
public K getKey(IN value) throws Exception { + comparator.extractKeys(value, keyArray, 0); + key = (K) keyArray[0]; + return key; + } + + } + public static class ComparableKeySelector implements KeySelector { private static final long serialVersionUID = 1L; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index f3b98b21a22d7..764c6f2d38f19 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -52,6 +53,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.windowing.helper.Count; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; +import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; @@ -64,7 +66,7 @@ public class DataStreamTest { private static final long MEMORYSIZE = 32; - private static int PARALLELISM = 1; + private static int PARALLELISM = 2; /** * Tests {@link SingleOutputStreamOperator#name(String)} functionality. 
@@ -167,6 +169,26 @@ public void testPartitioning() { assertFalse(isGrouped(partition2)); assertFalse(isGrouped(partition4)); + // Testing DataStream custom partitioning + Partitioner longPartitioner = new Partitioner() { + @Override + public int partition(Long key, int numPartitions) { + return 100; + } + }; + + DataStream customPartition1 = src1.partitionCustom(longPartitioner, 0); + DataStream customPartition3 = src1.partitionCustom(longPartitioner, "f0"); + DataStream customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector()); + + assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition1.getId(), createDownStreamId(customPartition1)))); + assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition3.getId(), createDownStreamId(customPartition3)))); + assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition4.getId(), createDownStreamId(customPartition4)))); + + assertFalse(isGrouped(customPartition1)); + assertFalse(isGrouped(customPartition3)); + assertFalse(isGrouped(customPartition4)); + //Testing ConnectedDataStream grouping ConnectedDataStream connectedGroup1 = connected.groupBy(0, 0); Integer downStreamId1 = createDownStreamId(connectedGroup1); @@ -524,6 +546,10 @@ private static boolean isPartitioned(StreamEdge edge) { return edge.getPartitioner() instanceof FieldsPartitioner; } + private static boolean isCustomPartitioned(StreamEdge edge) { + return edge.getPartitioner() instanceof CustomPartitionerWrapper; + } + private static class FirstSelector implements KeySelector, Long> { @Override public Long getKey(Tuple2 value) throws Exception { From 9531176d84dabe6e3e4989d06ee9db72ff5caa1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Hermann?= Date: Fri, 26 Jun 2015 17:56:16 +0200 Subject: [PATCH 2/3] [FLINK-2138] [streaming] Added custom partitioning to scala DataStream --- .../org/apache/flink/api/scala/DataSet.scala | 2 +- .../streaming/api/datastream/DataStream.java | 3 +- .../streaming/api/scala/DataStream.scala | 54 ++++++++++++++++--- .../streaming/api/scala/DataStreamTest.scala | 39 +++++++++++++- 4 files changed, 85 insertions(+), 13 deletions(-) diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index b14c9c20c3e0b..fd1492ae564bd 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -1197,7 +1197,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { /** * Partitions a DataSet on the key returned by the selector, using a custom partitioner. - * This method takes the key selector t get the key to partition on, and a partitioner that + * This method takes the key selector to get the key to partition on, and a partitioner that * accepts the key type. *

* Note: This method works only on single field keys, i.e. the selector cannot return tuples diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 8fb896e44e79c..c9c1f4978ba72 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -498,7 +497,7 @@ public DataStream partitionCustom(Partitioner partitioner, String fi * @return The partitioned DataStream. * @see KeySelector */ - public > DataStream partitionCustom(Partitioner partitioner, KeySelector keySelector) { + public DataStream partitionCustom(Partitioner partitioner, KeySelector keySelector) { return setConnectionType(new CustomPartitionerWrapper(clean(partitioner), clean(keySelector))); } diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index d0441a9c268ba..fbd6502d47710 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -18,22 +18,23 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.api.common.io.OutputFormat -import org.apache.flink.api.scala.ClosureCleaner -import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat -import org.apache.flink.core.fs.{FileSystem, Path} - import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, FoldFunction, MapFunction, ReduceFunction} +import org.apache.flink.api.common.functions.{ReduceFunction, FlatMapFunction, MapFunction, + Partitioner, FoldFunction, FilterFunction} +import org.apache.flink.api.common.io.OutputFormat import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat +import org.apache.flink.core.fs.{FileSystem, Path} import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator} +import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, + GroupedDataStream, SingleOutputStreamOperator} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType -import org.apache.flink.streaming.api.functions.sink.{FileSinkFunctionByMillis, SinkFunction} import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} +import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce} import org.apache.flink.streaming.api.windowing.helper.WindowingHelper import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy} @@ -288,6 +289,43 @@ class DataStream[T](javaStream: JavaStream[T]) { javaStream.partitionByHash(keyExtractor) } + /** + * Partitions a tuple DataStream on the specified key fields using a custom partitioner. + * This method takes the key position to partition on, and a partitioner that accepts the key + * type. + *

+ * Note: This method works only on single field keys. + */ + def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] = + javaStream.partitionCustom(partitioner, field) + + /** + * Partitions a POJO DataStream on the specified key fields using a custom partitioner. + * This method takes the key expression to partition on, and a partitioner that accepts the key + * type. + *

+ * Note: This method works only on single field keys. + */ + def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String) + : DataStream[T] = javaStream.partitionCustom(partitioner, field) + + /** + * Partitions a DataStream on the key returned by the selector, using a custom partitioner. + * This method takes the key selector to get the key to partition on, and a partitioner that + * accepts the key type. + *

+ * Note: This method works only on single field keys, i.e. the selector cannot return tuples + * of fields. + */ + def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K) + : DataStream[T] = { + val cleanFun = clean(fun) + val keyExtractor = new KeySelector[T, K] { + def getKey(in: T) = cleanFun(in) + } + javaStream.partitionCustom(partitioner, keyExtractor) + } + /** * Sets the partitioning of the DataStream so that the output tuples * are broad casted to every parallel instance of the next component. This @@ -296,7 +334,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def broadcast: DataStream[T] = javaStream.broadcast() - + /** * Sets the partitioning of the DataStream so that the output values all go to * the first instance of the next processing operator. Use this setting with care diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index aa1c219b71b93..5d44e6bfe3cb7 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -20,11 +20,12 @@ package org.apache.flink.streaming.api.scala import java.lang -import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, + Partitioner, FoldFunction, Function} import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.functions.co.CoMapFunction -import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph, StreamNode} +import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph} import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator} import org.apache.flink.streaming.api.windowing.helper.Count import org.apache.flink.streaming.runtime.partitioner._ @@ -105,6 +106,36 @@ class DataStreamTest { assert(isPartitioned(graph.getStreamEdge(group3.getId, createDownStreamId(group3)))) assert(isPartitioned(graph.getStreamEdge(group4.getId, createDownStreamId(group4)))) + //Testing DataStream partitioning + val partition1: DataStream[_] = src1.partitionByHash(0) + val partition2: DataStream[_] = src1.partitionByHash(1, 0) + val partition3: DataStream[_] = src1.partitionByHash("_1") + val partition4: DataStream[_] = src1.partitionByHash((x : (Long, Long)) => x._1); + + assert(isPartitioned(graph.getStreamEdge(partition1.getId, createDownStreamId(partition1)))) + assert(isPartitioned(graph.getStreamEdge(partition2.getId, createDownStreamId(partition2)))) + assert(isPartitioned(graph.getStreamEdge(partition3.getId, createDownStreamId(partition3)))) + assert(isPartitioned(graph.getStreamEdge(partition4.getId, createDownStreamId(partition4)))) + + // Testing DataStream custom partitioning + val longPartitioner: Partitioner[Long] = new Partitioner[Long] { + override def partition(key: Long, numPartitions: Int): Int = 0 + } + + val customPartition1: DataStream[_] = + src1.partitionCustom(longPartitioner, 0) + val customPartition3: DataStream[_] = + src1.partitionCustom(longPartitioner, "_1") + val customPartition4: DataStream[_] = + src1.partitionCustom(longPartitioner, (x 
: (Long, Long)) => x._1) + + assert(isCustomPartitioned( + graph.getStreamEdge(customPartition1.getId, createDownStreamId(customPartition1)))) + assert(isCustomPartitioned( + graph.getStreamEdge(customPartition3.getId, createDownStreamId(customPartition3)))) + assert(isCustomPartitioned( + graph.getStreamEdge(customPartition4.getId, createDownStreamId(customPartition4)))) + //Testing ConnectedDataStream grouping val connectedGroup1: ConnectedDataStream[_, _] = connected.groupBy(0, 0) val downStreamId1: Integer = createDownStreamId(connectedGroup1) @@ -465,6 +496,10 @@ class DataStreamTest { return edge.getPartitioner.isInstanceOf[FieldsPartitioner[_]] } + private def isCustomPartitioned(edge: StreamEdge): Boolean = { + return edge.getPartitioner.isInstanceOf[CustomPartitionerWrapper[_, _]] + } + private def createDownStreamId(dataStream: DataStream[_]): Integer = { return dataStream.print.getId } From a3ee7eb2cfc0a62685c1cd30bea80a6fe26a7410 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Hermann?= Date: Wed, 1 Jul 2015 14:26:33 +0200 Subject: [PATCH 3/3] [FLINK-2138] [streaming] Added docs and tests for partitioning --- docs/apis/programming_guide.md | 13 + docs/apis/streaming_guide.md | 19 ++ .../flink/streaming/api/PartitionerTest.java | 229 ++++++++++++++++++ 3 files changed, 261 insertions(+) create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md index 17903a9649176..edf20035f2c72 100644 --- a/docs/apis/programming_guide.md +++ b/docs/apis/programming_guide.md @@ -957,6 +957,19 @@ val result = in.partitionByHash(0).mapPartition { ... } + + Custom Partitioning + +

+        Manually specify a partitioning over the data.
+        Note: This method works only on single field keys.

+{% highlight scala %} +val in: DataSet[(Int, String)] = // [...] +val result = in + .partitionCustom(partitioner: Partitioner[K], key) +{% endhighlight %} + + Sort Partition diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md index c612b6959661a..fd0f8df3dc6c4 100644 --- a/docs/apis/streaming_guide.md +++ b/docs/apis/streaming_guide.md @@ -311,6 +311,25 @@ Usage: `dataStream.broadcast()` * *Global*: All data points are directed to the first instance of the operator. Usage: `dataStream.global()` +Custom partitioning can also be used by giving a Partitioner function and a single field key to partition on, similarly to the batch API. +
+
+{% highlight java %}
+DataStream<Tuple2<Integer, String>> in = // [...]
+DataStream<Tuple2<Integer, String>> result = in
+    .partitionCustom(Partitioner<K> partitioner, key)
+{% endhighlight %}
+
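+For example, a concrete call could look as follows; the `Tuple2<Integer, String>` element
+type and the modulo routing are purely illustrative, not part of the API:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, String>> result = in
+    .partitionCustom(new Partitioner<Integer>() {
+        @Override
+        public int partition(Integer key, int numPartitions) {
+            // example routing: spread keys over all channels by their value
+            return Math.abs(key) % numPartitions;
+        }
+    }, 0); // partition on tuple field 0
+{% endhighlight %}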
+
+
+{% highlight scala %}
+val in: DataStream[(Int, String)] = // [...]
+val result = in
+    .partitionCustom(partitioner: Partitioner[K], key)
+{% endhighlight %}
+
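+The key can also be supplied through a KeySelector instead of a field position or
+expression. A minimal Java sketch, assuming the `in` stream and a matching
+`Partitioner<Integer>` named `partitioner` from the Java example above:
+
+{% highlight java %}
+DataStream<Tuple2<Integer, String>> result = in
+    .partitionCustom(partitioner, new KeySelector<Tuple2<Integer, String>, Integer>() {
+        @Override
+        public Integer getKey(Tuple2<Integer, String> value) throws Exception {
+            return value.f0; // partition on the first tuple field
+        }
+    });
+{% endhighlight %}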
+
+ By default *Forward* partitioning is used. Partitioning does not remain in effect after a transformation, so it needs to be set again for subsequent operations. diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java new file mode 100644 index 0000000000000..c8588341d57d5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.TestListResultSink; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.junit.Test; + +/** + * IT case that tests the different stream partitioning schemes. 
+ */ +public class PartitionerTest { + + public static final int PARALLELISM = 3; + public static final int MEMORY_SIZE = 32; + + @Test + public void partitionerTest() { + + TestListResultSink> hashPartitionResultSink = + new TestListResultSink>(); + TestListResultSink> customPartitionResultSink = + new TestListResultSink>(); + TestListResultSink> broadcastPartitionResultSink = + new TestListResultSink>(); + TestListResultSink> forwardPartitionResultSink = + new TestListResultSink>(); + TestListResultSink> rebalancePartitionResultSink = + new TestListResultSink>(); + TestListResultSink> globalPartitionResultSink = + new TestListResultSink>(); + + + StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORY_SIZE); + DataStream> src = env.fromElements( + new Tuple1("a"), + new Tuple1("b"), + new Tuple1("b"), + new Tuple1("a"), + new Tuple1("a"), + new Tuple1("c"), + new Tuple1("a") + ); + + // partition by hash + src + .partitionByHash(0) + .map(new SubtaskIndexAssigner()) + .addSink(hashPartitionResultSink); + + // partition custom + DataStream> partitionCustom = src + .partitionCustom(new Partitioner() { + @Override + public int partition(String key, int numPartitions) { + if (key.equals("c")) { + return 2; + } else { + return 0; + } + } + }, 0) + .map(new SubtaskIndexAssigner()); + + partitionCustom.addSink(customPartitionResultSink); + + // partition broadcast + src.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink); + + // partition forward + src.map(new SubtaskIndexAssigner()).addSink(forwardPartitionResultSink); + + // partition rebalance + src.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink); + + // partition global + src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink); + + try { + env.execute(); + } catch (Exception e) { + fail(e.getMessage()); + } + + List> hashPartitionResult = hashPartitionResultSink.getResult(); + List> customPartitionResult = customPartitionResultSink.getResult(); + List> broadcastPartitionResult = broadcastPartitionResultSink.getResult(); + List> forwardPartitionResult = forwardPartitionResultSink.getResult(); + List> rebalancePartitionResult = rebalancePartitionResultSink.getResult(); + List> globalPartitionResult = globalPartitionResultSink.getResult(); + + verifyHashPartitioning(hashPartitionResult); + verifyCustomPartitioning(customPartitionResult); + verifyBroadcastPartitioning(broadcastPartitionResult); + verifyRebalancePartitioning(forwardPartitionResult); + verifyRebalancePartitioning(rebalancePartitionResult); + verifyGlobalPartitioning(globalPartitionResult); + } + + private static void verifyHashPartitioning(List> hashPartitionResult) { + HashMap verifier = new HashMap(); + for (Tuple2 elem : hashPartitionResult) { + Integer subtaskIndex = verifier.get(elem.f1); + if (subtaskIndex == null) { + verifier.put(elem.f1, elem.f0); + } else if (subtaskIndex != elem.f0) { + fail(); + } + } + } + + private static void verifyCustomPartitioning(List> customPartitionResult) { + for (Tuple2 stringWithSubtask : customPartitionResult) { + if (stringWithSubtask.f1.equals("c")) { + assertEquals(new Integer(2), stringWithSubtask.f0); + } else { + assertEquals(new Integer(0), stringWithSubtask.f0); + } + } + } + + private static void verifyBroadcastPartitioning(List> broadcastPartitionResult) { + List> expected = Arrays.asList( + new Tuple2(0, "a"), + new Tuple2(0, "b"), + new Tuple2(0, "b"), + new Tuple2(0, "a"), + new Tuple2(0, "a"), + new Tuple2(0, "c"), 
+ new Tuple2(0, "a"), + new Tuple2(1, "a"), + new Tuple2(1, "b"), + new Tuple2(1, "b"), + new Tuple2(1, "a"), + new Tuple2(1, "a"), + new Tuple2(1, "c"), + new Tuple2(1, "a"), + new Tuple2(2, "a"), + new Tuple2(2, "b"), + new Tuple2(2, "b"), + new Tuple2(2, "a"), + new Tuple2(2, "a"), + new Tuple2(2, "c"), + new Tuple2(2, "a")); + + assertEquals( + new HashSet>(expected), + new HashSet>(broadcastPartitionResult)); + } + + private static void verifyRebalancePartitioning(List> rebalancePartitionResult) { + List> expected = Arrays.asList( + new Tuple2(0, "a"), + new Tuple2(1, "b"), + new Tuple2(2, "b"), + new Tuple2(0, "a"), + new Tuple2(1, "a"), + new Tuple2(2, "c"), + new Tuple2(0, "a")); + + assertEquals( + new HashSet>(expected), + new HashSet>(rebalancePartitionResult)); + } + + private static void verifyGlobalPartitioning(List> globalPartitionResult) { + List> expected = Arrays.asList( + new Tuple2(0, "a"), + new Tuple2(0, "b"), + new Tuple2(0, "b"), + new Tuple2(0, "a"), + new Tuple2(0, "a"), + new Tuple2(0, "c"), + new Tuple2(0, "a")); + + assertEquals( + new HashSet>(expected), + new HashSet>(globalPartitionResult)); + } + + private static class SubtaskIndexAssigner + extends RichMapFunction, Tuple2> { + + private int indexOfSubtask; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + RuntimeContext runtimeContext = getRuntimeContext(); + indexOfSubtask = runtimeContext.getIndexOfThisSubtask(); + } + + @Override + public Tuple2 map(Tuple1 value) throws Exception { + return new Tuple2(indexOfSubtask, value.f0); + } + } +}
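
A minimal end-to-end sketch of the feature added by this series; the class and job names
are assumptions, and the routing simply mirrors what PartitionerTest verifies above:

{% highlight java %}
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Tuple1<String>> src = env.fromElements(
                new Tuple1<String>("a"), new Tuple1<String>("b"), new Tuple1<String>("c"));

        src.partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                // pin "c" to the last channel, everything else to the first
                return "c".equals(key) ? numPartitions - 1 : 0;
            }
        }, 0).print();

        env.execute("Custom partitioning example");
    }
}
{% endhighlight %}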