From c337073bfc458d071da0536172bd300e763ceac2 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Thu, 28 Jan 2016 01:34:10 +0800 Subject: [PATCH 1/7] Enable range partition with custom data distribution. --- .../operators/base/PartitionOperatorBase.java | 11 ++ .../api/java/operators/PartitionOperator.java | 28 +-- .../flink/api/java/utils/DataSetUtils.java | 33 ++++ .../flink/optimizer/dag/PartitionNode.java | 9 +- .../dataproperties/GlobalProperties.java | 9 + .../apache/flink/optimizer/plan/Channel.java | 1 + .../traversals/RangePartitionRewriter.java | 10 +- .../flink/api/scala/utils/package.scala | 60 +++++++ .../test/distribution/CustomDistribution.java | 164 ++++++++++++++++++ .../CustomDistributionITCase.java | 137 +++++++++++++++ 10 files changed, 445 insertions(+), 17 deletions(-) create mode 100644 flink-tests/src/test/java/org/apache/flink/test/distribution/CustomDistribution.java create mode 100644 flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java index fd71facb576f5..4b802aa1ed11e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.NoOpFunction; @@ -49,6 +50,8 @@ public static enum PartitionMethod { private Partitioner customPartitioner; + private DataDistribution distribution; + public PartitionOperatorBase(UnaryOperatorInformation operatorInfo, PartitionMethod pMethod, int[] keys, String name) { super(new UserCodeObjectWrapper(new NoOpFunction()), operatorInfo, keys, name); @@ -70,6 +73,14 @@ public Partitioner getCustomPartitioner() { return customPartitioner; } + public DataDistribution getDistribution() { + return this.distribution; + } + + public void setDistribution(DataDistribution distribution) { + this.distribution = distribution; + } + public void setCustomPartitioner(Partitioner customPartitioner) { if (customPartitioner != null) { int[] keys = getKeyColumns(0); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java index b2b9f6ed03a5e..db198ed669c11 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Operator; @@ -45,28 +46,33 @@ public class PartitionOperator extends SingleInputOperator customPartitioner; - - + private final DataDistribution distribution; + + public PartitionOperator(DataSet input, PartitionMethod pMethod, Keys pKeys, String 
partitionLocationName) {
-		this(input, pMethod, pKeys, null, null, partitionLocationName);
+		this(input, pMethod, pKeys, null, null, null, partitionLocationName);
 	}
-
+
+	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, DataDistribution distribution, String partitionLocationName) {
+		this(input, pMethod, pKeys, null, null, distribution, partitionLocationName);
+	}
+
 	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, String partitionLocationName) {
-		this(input, pMethod, null, null, null, partitionLocationName);
+		this(input, pMethod, null, null, null, null, partitionLocationName);
 	}
 
 	public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customPartitioner, String partitionLocationName) {
-		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, partitionLocationName);
+		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, null, partitionLocationName);
 	}
 
-	public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<P> customPartitioner,
+	public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<P> customPartitioner,
 			TypeInformation<P> partitionerTypeInfo, String partitionLocationName) {
-		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, partitionLocationName);
+		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, null, partitionLocationName);
 	}
 
-	private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
-			TypeInformation<P> partitionerTypeInfo, String partitionLocationName)
+	private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
+			TypeInformation<P> partitionerTypeInfo, DataDistribution distribution, String partitionLocationName)
 	{
 		super(input, input.getType());
@@ -82,6 +88,7 @@ private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T
 		this.pKeys = pKeys;
 		this.partitionLocationName = partitionLocationName;
 		this.customPartitioner = customPartitioner;
+		this.distribution = distribution;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -125,6 +132,7 @@ else if (pMethod == PartitionMethod.HASH || pMethod == PartitionMethod.CUSTOM ||
 			PartitionOperatorBase<T> partitionedInput = new PartitionOperatorBase<>(operatorInfo, pMethod, logicalKeyPositions, name);
 			partitionedInput.setInput(input);
 			partitionedInput.setParallelism(getParallelism());
+			partitionedInput.setDistribution(distribution);
 			partitionedInput.setCustomPartitioner(customPartitioner);
 
 			return partitionedInput;
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
index 78e52319820e4..61a71aa32a75b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
@@ -21,16 +21,23 @@
 import com.google.common.collect.Lists;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SampleInCoordinator;
 import org.apache.flink.api.java.functions.SampleInPartition;
 import org.apache.flink.api.java.functions.SampleWithFraction;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.operators.PartitionOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Collector;
@@ -250,6 +257,32 @@ public static <T> DataSet<T> sampleWithSize(
 		return new GroupReduceOperator<>(mapPartitionOperator, input.getType(), sampleInCoordinator, callLocation);
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Partition
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Range-partitions a DataSet on the specified tuple field positions.
+	 */
+	public static <T> PartitionOperator<T> partitionByRange(DataSet<T> input, DataDistribution distribution, int... fields) {
+		return new PartitionOperator<>(input, PartitionOperatorBase.PartitionMethod.RANGE, new Keys.ExpressionKeys<>(fields, input.getType(), false), distribution, Utils.getCallLocationName());
+	}
+
+	/**
+	 * Range-partitions a DataSet on the specified fields.
+	 */
+	public static <T> PartitionOperator<T> partitionByRange(DataSet<T> input, DataDistribution distribution, String...
fields) { + return new PartitionOperator<>(input, PartitionOperatorBase.PartitionMethod.RANGE, new Keys.ExpressionKeys<>(fields, input.getType()), distribution, Utils.getCallLocationName()); + } + + /** + * Range-partitions a DataSet using the specified key selector function. + */ + public static > PartitionOperator partitionByRange(DataSet input, DataDistribution distribution, KeySelector keyExtractor) { + final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input.getType()); + return new PartitionOperator<>(input, PartitionOperatorBase.PartitionMethod.RANGE, new Keys.SelectorFunctionKeys<>(input.clean(keyExtractor), input.getType(), keyType), distribution, Utils.getCallLocationName()); + } + // -------------------------------------------------------------------------------------------- // Checksum // -------------------------------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java index 65a6e0237868a..9ecea6b061716 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; +import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; @@ -51,7 +52,7 @@ public PartitionNode(PartitionOperatorBase operator) { super(operator); OperatorDescriptorSingle descr = new PartitionDescriptor( - this.getOperator().getPartitionMethod(), this.keys, operator.getCustomPartitioner()); + this.getOperator().getPartitionMethod(), this.keys, operator.getCustomPartitioner(), operator.getDistribution()); this.possibleProperties = Collections.singletonList(descr); } @@ -88,12 +89,14 @@ public static class PartitionDescriptor extends OperatorDescriptorSingle { private final PartitionMethod pMethod; private final Partitioner customPartitioner; + private final DataDistribution distribution; - public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Partitioner customPartitioner) { + public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Partitioner customPartitioner, DataDistribution distribution) { super(pKeys); this.pMethod = pMethod; this.customPartitioner = customPartitioner; + this.distribution = distribution; } @Override @@ -127,7 +130,7 @@ protected List createPossibleGlobalProperties() { for (int field : this.keys) { ordering.appendOrdering(field, null, Order.ASCENDING); } - rgps.setRangePartitioned(ordering); + rgps.setRangePartitioned(ordering, distribution); break; default: throw new IllegalArgumentException("Invalid partition method"); diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java index 57ba29d06b1f8..253d5e0df653a 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.flink.api.common.ExecutionMode; +import 
org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; @@ -55,6 +56,8 @@ public class GlobalProperties implements Cloneable { private Partitioner customPartitioner; + private DataDistribution distribution; + // -------------------------------------------------------------------------------------------- /** @@ -92,6 +95,10 @@ public void setRangePartitioned(Ordering ordering) { this.partitioningFields = ordering.getInvolvedIndexes(); } + public void setDataDistribution(DataDistribution distribution) { + this.distribution = distribution; + } + public void setAnyPartitioning(FieldList partitionedFields) { if (partitionedFields == null) { throw new NullPointerException(); @@ -167,6 +174,8 @@ public Partitioner getCustomPartitioner() { return this.customPartitioner; } + public DataDistribution getDataDistribution() {return this.distribution;} + // -------------------------------------------------------------------------------------------- public boolean isPartitionedOnFields(FieldSet fields) { diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java index 508cc9505acdc..cb396a5ef9ddf 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java @@ -428,6 +428,7 @@ public GlobalProperties getGlobalProperties() { break; case PARTITION_RANGE: this.globalProps.setRangePartitioned(Utils.createOrdering(this.shipKeys, this.shipSortOrder)); + this.globalProps.setDataDistribution(this.dataDistribution); break; case FORWARD: break; diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java index 7656dfd172901..782b99e650783 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java @@ -113,10 +113,12 @@ public void postVisit(PlanNode node) { throw new InvalidProgramException("Range Partitioning not supported within iterations."); } - PlanNode channelSource = channel.getSource(); - List newSourceOutputChannels = rewriteRangePartitionChannel(channel); - channelSource.getOutgoingChannels().remove(channel); - channelSource.getOutgoingChannels().addAll(newSourceOutputChannels); + if (channel.getDataDistribution() == null) { + PlanNode channelSource = channel.getSource(); + List newSourceOutputChannels = rewriteRangePartitionChannel(channel); + channelSource.getOutgoingChannels().remove(channel); + channelSource.getOutgoingChannels().addAll(newSourceOutputChannels); + } } } } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala index 6407093cb58d2..adad9ab4ade34 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala @@ -19,9 +19,16 @@ package org.apache.flink.api.scala import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.distributions.DataDistribution +import 
org.apache.flink.api.common.operators.Keys +import org.apache.flink.api.common.operators.base.PartitionOperatorBase +import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.Utils import org.apache.flink.api.java.Utils.ChecksumHashCode +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.operators.PartitionOperator +import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.java.utils.{DataSetUtils => jutils} import org.apache.flink.util.AbstractID @@ -109,6 +116,59 @@ package object utils { wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSamples, seed)) } + // -------------------------------------------------------------------------------------------- + // Partitioning + // -------------------------------------------------------------------------------------------- + + /** + * Range-partitions a DataSet on the specified tuple field positions. + */ + def partitionByRange(distribution: DataDistribution, fields: Int*): DataSet[T] = { + val op = new PartitionOperator[T]( + self.javaSet, + PartitionMethod.RANGE, + new Keys.ExpressionKeys[T](fields.toArray, self.javaSet.getType), + distribution, + getCallLocationName()) + wrap(op) + } + + /** + * Range-partitions a DataSet on the specified fields. + */ + def partitionByRange(distribution: DataDistribution, + firstField: String, + otherFields: String*): DataSet[T] = { + val op = new PartitionOperator[T]( + self.javaSet, + PartitionMethod.RANGE, + new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, self.javaSet.getType), + distribution, + getCallLocationName()) + wrap(op) + } + + /** + * Range-partitions a DataSet using the specified key selector function. + */ + def partitionByRange[K: TypeInformation](distribution: DataDistribution, + fun: T => K): DataSet[T] = { + val keyExtractor = new KeySelector[T, K] { + val cleanFun = self.javaSet.clean(fun) + def getKey(in: T) = cleanFun(in) + } + val op = new PartitionOperator[T]( + self.javaSet, + PartitionMethod.RANGE, + new Keys.SelectorFunctionKeys[T, K]( + keyExtractor, + self.javaSet.getType, + implicitly[TypeInformation[K]]), + distribution, + getCallLocationName()) + wrap(op) + } + // -------------------------------------------------------------------------------------------- // Checksum // -------------------------------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/distribution/CustomDistribution.java b/flink-tests/src/test/java/org/apache/flink/test/distribution/CustomDistribution.java new file mode 100644 index 0000000000000..c04e6d18cded9 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/distribution/CustomDistribution.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.distribution; + +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.types.Value; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; + +/** + * The class is used to do the tests of range partition with customed data distribution. + */ +public class CustomDistribution implements DataDistribution { + + protected Value[][] boundaries; + + protected int dim; + + public CustomDistribution() {} + + public CustomDistribution(Value[] bucketBoundaries) { + + if (bucketBoundaries == null) { + throw new IllegalArgumentException("source key cannot be null"); + } + + dim = 1; + + @SuppressWarnings("unchecked") + Class clazz = bucketBoundaries[0].getClass(); + + boundaries = new Value[bucketBoundaries.length][]; + for (int i = 0; i < bucketBoundaries.length; i++) { + if (bucketBoundaries[i].getClass() != clazz) { + throw new IllegalArgumentException("The bucket boundaries are of different class types."); + } + boundaries[i] = new Value[] { bucketBoundaries[i] }; + } + } + + public CustomDistribution(Value[][] bucketBoundaries) { + if (bucketBoundaries == null) { + throw new IllegalArgumentException("Bucket boundaries must not be null."); + } + if (bucketBoundaries.length == 0) { + throw new IllegalArgumentException("Bucket boundaries must not be empty."); + } + + // dimensionality is one in this case + dim = bucketBoundaries[0].length; + + Class[] types = new Class[dim]; + for (int i = 0; i < dim; i++) { + types[i] = bucketBoundaries[0][i].getClass(); + } + + // check the array + for (int i = 1; i < bucketBoundaries.length; i++) { + if (bucketBoundaries[i].length != dim) { + throw new IllegalArgumentException("All bucket boundaries must have the same dimensionality."); + } + for (int d = 0; d < dim; d++) { + if (types[d] != bucketBoundaries[i][d].getClass()) { + throw new IllegalArgumentException("The bucket boundaries are of different class types."); + } + } + } + + boundaries = bucketBoundaries; + } + + @Override + public int getNumberOfFields() { + return this.dim; + } + + @Override + public Value[] getBucketBoundary(int bucketNum, int totalNumBuckets) { + // check validity of arguments + if(bucketNum < 0) { + throw new IllegalArgumentException("Requested bucket must be greater than or equal to 0."); + } else if(bucketNum >= (totalNumBuckets)) { + throw new IllegalArgumentException("Request bucket must be smaller than the total number of buckets."); + } + if(totalNumBuckets < 1) { + throw new IllegalArgumentException("Total number of bucket must be larger than 0."); + } + + final int maxNumBuckets = boundaries.length + 1; + final int n = maxNumBuckets / totalNumBuckets; + final int bucketId = bucketNum * n + (n - 1); + + return boundaries[bucketId]; + + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(this.dim); + out.writeInt(boundaries.length); + + // write types + for (int i = 0; i < dim; i++) { + 
out.writeUTF(boundaries[0][i].getClass().getName()); + } + + for (int i = 0; i < boundaries.length; i++) { + for (int d = 0; d < dim; d++) { + boundaries[i][d].write(out); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public void read(DataInputView in) throws IOException { + this.dim = in.readInt(); + final int len = in.readInt(); + + boundaries = new Value[len][]; + + // read types + Class[] types = new Class[dim]; + for (int i = 0; i < dim; i++) { + String className = in.readUTF(); + try { + types[i] = Class.forName(className, true, getClass().getClassLoader()).asSubclass(Value.class); + } catch (ClassNotFoundException e) { + throw new IOException("Could not load type class '" + className + "'."); + } catch (Throwable t) { + throw new IOException("Error loading type class '" + className + "'.", t); + } + } + + for (int i = 0; i < len; i++) { + Value[] bucket = new Value[dim]; + for (int d = 0; d < dim; d++) { + Value val = InstantiationUtil.instantiate(types[d], Value.class); + val.read(in); + bucket[d] = val; + } + boundaries[i] = bucket; + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java new file mode 100644 index 0000000000000..81096ddb32ca4 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.javaApiOperators; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.test.distribution.CustomDistribution; +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.util.Collector; +import org.junit.Test; + + +import static org.junit.Assert.assertEquals; + + +public class CustomDistributionITCase { + + @Test + public void testRangeWithDistribution1() throws Exception{ + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + + DataSet> input1 = env.fromElements( + new Tuple3<>(1, 1, "Hi"), + new Tuple3<>(1, 2, "Hello"), + new Tuple3<>(1, 3, "Hello world"), + new Tuple3<>(2, 4, "Hello world, how are you?"), + new Tuple3<>(2, 5, "I am fine."), + new Tuple3<>(3, 6, "Luke Skywalker"), + new Tuple3<>(4, 7, "Comment#1"), + new Tuple3<>(4, 8, "Comment#2"), + new Tuple3<>(4, 9, "Comment#3"), + new Tuple3<>(5, 10, "Comment#4")); + + IntValue[] keys = new IntValue[4]; + + env.setParallelism(5); + + for (int i = 0; i < keys.length; i++) + { + keys[i] = new IntValue(i + 1); + } + + CustomDistribution cd = new CustomDistribution(keys); + + DataSet> out1 = DataSetUtils.partitionByRange(input1.mapPartition( + new MapPartitionFunction, Tuple1>() { + public void mapPartition(Iterable> values, Collector> out) { + IntValue key1; + for (Tuple3 s : values) { + key1 = new IntValue(s.f0); + out.collect(new Tuple1<>(key1)); + } + } + }), cd, 0).groupBy(0).sum(0); + + String expected = "[(3), (4), (3), (12), (5)]"; + assertEquals(expected, out1.collect().toString()); + } + + @Test + public void testRangeWithDistribution2() throws Exception{ + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + + DataSet, String>> input1 = env.fromElements( + new Tuple2<>(new Tuple2<>(1, 1), "Hi"), + new Tuple2<>(new Tuple2<>(1, 2), "Hello"), + new Tuple2<>(new Tuple2<>(1, 3), "Hello world"), + new Tuple2<>(new Tuple2<>(2, 4), "Hello world, how are you?"), + new Tuple2<>(new Tuple2<>(2, 5), "I am fine."), + new Tuple2<>(new Tuple2<>(3, 6), "Luke Skywalker"), + new Tuple2<>(new Tuple2<>(4, 7), "Comment#1"), + new Tuple2<>(new Tuple2<>(4, 8), "Comment#2"), + new Tuple2<>(new Tuple2<>(4, 9), "Comment#3"), + new Tuple2<>(new Tuple2<>(5, 10), "Comment#4")); + + IntValue[][] keys = new IntValue[2][2]; + + env.setParallelism(3); + + for (int i = 0; i < 2; i++) + { + for (int j = 0; j < 2; j++) + { + keys[i][j] = new IntValue(i + j); + } + } + + CustomDistribution cd = new CustomDistribution(keys); + + DataSet> out1= DataSetUtils.partitionByRange(input1.mapPartition( + new MapPartitionFunction, String>, Tuple1>>() { + public void mapPartition(Iterable, String>> values, Collector>> out) { + IntValue key1; + IntValue key2; + for (Tuple2, String> s : values) { + key1 = new IntValue(s.f0.f0); + key2 = new IntValue(s.f0.f1); + out.collect(new Tuple1<>(new Tuple2<>(key1, key2))); + } + } + }), cd, 0).mapPartition(new MapPartitionFunction>, Tuple1>() { + public void mapPartition(Iterable>> values, Collector> out) { + Tuple1 key; + for (Tuple1> s : values) { + key = new Tuple1<>(s.f0.f0); + out.collect(key); + } + } + }).groupBy(0).sum(0); + + String expected = "[(1), (4), (2), (3), (5), (12)]"; + assertEquals(expected, 
out1.collect().toString()); + } +} From 1970474af53756ef0c9a31f8b539d8419e887f6d Mon Sep 17 00:00:00 2001 From: gallenvara Date: Thu, 10 Mar 2016 17:15:35 +0800 Subject: [PATCH 2/7] Modify the first test of range partition with two key fields. --- .../CustomDistributionITCase.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java index 81096ddb32ca4..39fc25ace0c58 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java @@ -52,30 +52,35 @@ public void testRangeWithDistribution1() throws Exception{ new Tuple3<>(4, 8, "Comment#2"), new Tuple3<>(4, 9, "Comment#3"), new Tuple3<>(5, 10, "Comment#4")); - - IntValue[] keys = new IntValue[4]; - - env.setParallelism(5); - for (int i = 0; i < keys.length; i++) + IntValue[][] keys = new IntValue[2][2]; + + env.setParallelism(3); + + for (int i = 0; i < 2; i++) { - keys[i] = new IntValue(i + 1); + for (int j = 0; j < 2; j++) + { + keys[i][j] = new IntValue(i + j); + } } CustomDistribution cd = new CustomDistribution(keys); - DataSet> out1 = DataSetUtils.partitionByRange(input1.mapPartition( - new MapPartitionFunction, Tuple1>() { - public void mapPartition(Iterable> values, Collector> out) { + DataSet> out1 = DataSetUtils.partitionByRange(input1.mapPartition( + new MapPartitionFunction, Tuple2>() { + public void mapPartition(Iterable> values, Collector> out) { IntValue key1; + IntValue key2; for (Tuple3 s : values) { key1 = new IntValue(s.f0); - out.collect(new Tuple1<>(key1)); + key2 = new IntValue(s.f1); + out.collect(new Tuple2<>(key1, key2)); } } - }), cd, 0).groupBy(0).sum(0); + }), cd, 0, 1).groupBy(0).sum(0); - String expected = "[(3), (4), (3), (12), (5)]"; + String expected = "[(1,3), (4,5), (2,2), (3,6), (5,10), (12,9)]"; assertEquals(expected, out1.collect().toString()); } From 7e2783fd83f3e7e21fed07a600a85cdfdfd49abe Mon Sep 17 00:00:00 2001 From: gallenvara Date: Mon, 14 Mar 2016 18:12:52 +0800 Subject: [PATCH 3/7] Modify the test to validate the partitioning correctness. --- .../api/java/operators/PartitionOperator.java | 2 +- .../dataproperties/GlobalProperties.java | 17 ++- .../apache/flink/optimizer/plan/Channel.java | 3 +- .../traversals/RangePartitionRewriter.java | 10 +- .../GlobalPropertiesFilteringTest.java | 6 +- .../GlobalPropertiesMatchingTest.java | 26 ++-- .../CustomDistributionITCase.java | 127 ++++++++++-------- 7 files changed, 107 insertions(+), 84 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java index db198ed669c11..cb733d9db0f08 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java @@ -79,7 +79,7 @@ private
<P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T
 		Preconditions.checkNotNull(pMethod);
 		Preconditions.checkArgument(pKeys != null || pMethod == PartitionMethod.REBALANCE, "Partitioning requires keys");
 		Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM || customPartitioner != null, "Custom partitioning requires a partitioner.");
-
+		Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only necessary for range partition.");
 		if (customPartitioner != null) {
 			pKeys.validateCustomPartitioner(customPartitioner, partitionerTypeInfo);
 		}
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
index 253d5e0df653a..2a35784065d55 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
@@ -83,9 +83,15 @@ public void setHashPartitioned(FieldList partitionedFields) {
 		this.partitioningFields = partitionedFields;
 		this.ordering = null;
 	}
-
-	public void setRangePartitioned(Ordering ordering) {
+
+	/**
+	 * Set the parameters for range partition.
+	 *
+	 * @param ordering Order of the partitioned fields
+	 * @param distribution The data distribution for range partition. Users can supply a custom
+	 *                     data distribution; the distribution may also be null.
+	 */
+	public void setRangePartitioned(Ordering ordering, DataDistribution distribution) {
 		if (ordering == null) {
 			throw new NullPointerException();
 		}
@@ -93,9 +99,6 @@ public void setRangePartitioned(Ordering ordering) {
 		this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
 		this.ordering = ordering;
 		this.partitioningFields = ordering.getInvolvedIndexes();
-	}
-
-	public void setDataDistribution(DataDistribution distribution) {
 		this.distribution = distribution;
 	}
 
@@ -174,7 +177,9 @@ public Partitioner<?> getCustomPartitioner() {
 		return this.customPartitioner;
 	}
 
-	public DataDistribution getDataDistribution() {return this.distribution;}
+	public DataDistribution getDataDistribution() {
+		return this.distribution;
+	}
 
 	// --------------------------------------------------------------------------------------------
 
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
index cb396a5ef9ddf..bd2a5949fbb14 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -427,8 +427,7 @@ public GlobalProperties getGlobalProperties() {
 				this.globalProps.setHashPartitioned(this.shipKeys);
 				break;
 			case PARTITION_RANGE:
-				this.globalProps.setRangePartitioned(Utils.createOrdering(this.shipKeys, this.shipSortOrder));
-				this.globalProps.setDataDistribution(this.dataDistribution);
+				this.globalProps.setRangePartitioned(Utils.createOrdering(this.shipKeys, this.shipSortOrder), this.dataDistribution);
 				break;
 			case FORWARD:
 				break;
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java
index 782b99e650783..f2798014512af 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java @@ -109,11 +109,11 @@ public void postVisit(PlanNode node) { // Make sure we only optimize the DAG for range partition, and do not optimize multi times. if (shipStrategy == ShipStrategyType.PARTITION_RANGE) { - if(node.isOnDynamicPath()) { - throw new InvalidProgramException("Range Partitioning not supported within iterations."); - } + if(channel.getDataDistribution() == null) { + if (node.isOnDynamicPath()) { + throw new InvalidProgramException("Range Partitioning not supported within iterations if users do not supply the data distribution."); + } - if (channel.getDataDistribution() == null) { PlanNode channelSource = channel.getSource(); List newSourceOutputChannels = rewriteRangePartitionChannel(channel); channelSource.getOutgoingChannels().remove(channel); @@ -223,7 +223,7 @@ private List rewriteRangePartitionChannel(Channel channel) { prRemoverNode.setParallelism(targetParallelism); prPlanNode.setParallelism(targetParallelism); GlobalProperties globalProperties = new GlobalProperties(); - globalProperties.setRangePartitioned(new Ordering(0, null, Order.ASCENDING)); + globalProperties.setRangePartitioned(new Ordering(0, null, Order.ASCENDING), channel.getDataDistribution()); prPlanNode.initProperties(globalProperties, new LocalProperties()); prPlanNode.setCosts(defaultZeroCosts); this.plan.getAllNodes().add(prPlanNode); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java index 3e32905f45ad6..3b3dbf2f280e1 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java @@ -258,7 +258,7 @@ public void testRangePartitioningPreserved1() { o.appendOrdering(5, LongValue.class, Order.DESCENDING); o.appendOrdering(2, StringValue.class, Order.ASCENDING); GlobalProperties gprops = new GlobalProperties(); - gprops.setRangePartitioned(o); + gprops.setRangePartitioned(o, null); GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); @@ -292,7 +292,7 @@ public void testRangePartitioningPreserved2() { o.appendOrdering(5, LongValue.class, Order.DESCENDING); o.appendOrdering(2, StringValue.class, Order.ASCENDING); GlobalProperties gprops = new GlobalProperties(); - gprops.setRangePartitioned(o); + gprops.setRangePartitioned(o, null); GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); @@ -326,7 +326,7 @@ public void testRangePartitioningErased() { o.appendOrdering(5, LongValue.class, Order.DESCENDING); o.appendOrdering(2, StringValue.class, Order.ASCENDING); GlobalProperties gprops = new GlobalProperties(); - gprops.setRangePartitioned(o); + gprops.setRangePartitioned(o, null); GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java index 52826d6c83be2..865c4d27291dc 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java 
@@ -78,19 +78,19 @@ public void testMatchingAnyPartitioning() { // match range partitioning { GlobalProperties gp1 = new GlobalProperties(); - gp1.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + gp1.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING), null); assertTrue(req.isMetBy(gp1)); GlobalProperties gp2 = new GlobalProperties(); - gp2.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); + gp2.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING), null); assertTrue(req.isMetBy(gp2)); GlobalProperties gp3 = new GlobalProperties(); - gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING), null); assertFalse(req.isMetBy(gp3)); GlobalProperties gp4 = new GlobalProperties(); - gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING), null); assertTrue(req.isMetBy(gp4)); } @@ -149,7 +149,7 @@ public void testMatchingCustomPartitioning() { assertFalse(req.isMetBy(gp2)); GlobalProperties gp3 = new GlobalProperties(); - gp3.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + gp3.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING), null); assertFalse(req.isMetBy(gp3)); } } @@ -206,19 +206,19 @@ public void testStrictlyMatchingAnyPartitioning() { // match range partitioning { GlobalProperties gp1 = new GlobalProperties(); - gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); + gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING), null); assertTrue(req.isMetBy(gp1)); GlobalProperties gp2 = new GlobalProperties(); - gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING), null); assertFalse(req.isMetBy(gp2)); GlobalProperties gp3 = new GlobalProperties(); - gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING), null); assertFalse(req.isMetBy(gp3)); GlobalProperties gp4 = new GlobalProperties(); - gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING), null); assertFalse(req.isMetBy(gp4)); } @@ -271,19 +271,19 @@ public void testStrictlyMatchingHashPartitioning() { // match range partitioning { GlobalProperties gp1 = new GlobalProperties(); - gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); + gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING), null); assertFalse(req.isMetBy(gp1)); GlobalProperties gp2 = new GlobalProperties(); - gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); + gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING), null); 
assertFalse(req.isMetBy(gp2)); GlobalProperties gp3 = new GlobalProperties(); - gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING), null); assertFalse(req.isMetBy(gp3)); GlobalProperties gp4 = new GlobalProperties(); - gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING), null); assertFalse(req.isMetBy(gp4)); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java index 39fc25ace0c58..943d6a8aea1da 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java @@ -18,12 +18,12 @@ package org.apache.flink.test.javaApiOperators; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.test.distribution.CustomDistribution; -import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.types.IntValue; @@ -31,13 +31,18 @@ import org.junit.Test; -import static org.junit.Assert.assertEquals; +import java.util.List; + +import static org.junit.Assert.assertTrue; public class CustomDistributionITCase { @Test - public void testRangeWithDistribution1() throws Exception{ + public void testPartitionWithDistribution1() throws Exception{ + /* + * Test the record partitioned rightly with one field according to the customized data distribution + */ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); @@ -45,7 +50,7 @@ public void testRangeWithDistribution1() throws Exception{ new Tuple3<>(1, 1, "Hi"), new Tuple3<>(1, 2, "Hello"), new Tuple3<>(1, 3, "Hello world"), - new Tuple3<>(2, 4, "Hello world, how are you?"), + new Tuple3<>(2, 4, "how are you?"), new Tuple3<>(2, 5, "I am fine."), new Tuple3<>(3, 6, "Luke Skywalker"), new Tuple3<>(4, 7, "Comment#1"), @@ -53,39 +58,50 @@ public void testRangeWithDistribution1() throws Exception{ new Tuple3<>(4, 9, "Comment#3"), new Tuple3<>(5, 10, "Comment#4")); - IntValue[][] keys = new IntValue[2][2]; + final IntValue[] keys = new IntValue[3]; - env.setParallelism(3); - - for (int i = 0; i < 2; i++) - { - for (int j = 0; j < 2; j++) - { - keys[i][j] = new IntValue(i + j); - } + for (int i = 0; i < 3; i++) { + keys[i] = new IntValue((i + 1) * 2); } - CustomDistribution cd = new CustomDistribution(keys); + final CustomDistribution cd = new CustomDistribution(keys); - DataSet> out1 = DataSetUtils.partitionByRange(input1.mapPartition( - new MapPartitionFunction, Tuple2>() { - public void mapPartition(Iterable> values, Collector> out) { + env.setParallelism(3); + + DataSet out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction, Tuple2>() { + @Override + public Tuple2 map(Tuple3 value) throws Exception { IntValue key1; IntValue key2; - for (Tuple3 s : values) { - key1 = new IntValue(s.f0); - key2 = new 
IntValue(s.f1); - out.collect(new Tuple2<>(key1, key2)); + key1 = new IntValue(value.f0); + key2 = new IntValue(value.f1); + return new Tuple2<>(key1, key2); + } + }), cd, 0).mapPartition(new RichMapPartitionFunction, Boolean>() { + @Override + public void mapPartition(Iterable> values, Collector out) throws Exception { + boolean boo = true; + for (Tuple2 s : values) { + IntValue intValues= (IntValue)cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3)[0]; + if (s.f0.getValue() > intValues.getValue()) { + boo = false; + } } + out.collect(boo); } - }), cd, 0, 1).groupBy(0).sum(0); + }); - String expected = "[(1,3), (4,5), (2,2), (3,6), (5,10), (12,9)]"; - assertEquals(expected, out1.collect().toString()); + List result = out1.collect(); + for (int i = 0; i < result.size(); i++) { + assertTrue("The record is not emitted to the right partition", result.get(i)); + } } @Test public void testRangeWithDistribution2() throws Exception{ + /* + * Test the record partitioned rightly with two fields according to the customized data distribution + */ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); @@ -93,7 +109,7 @@ public void testRangeWithDistribution2() throws Exception{ new Tuple2<>(new Tuple2<>(1, 1), "Hi"), new Tuple2<>(new Tuple2<>(1, 2), "Hello"), new Tuple2<>(new Tuple2<>(1, 3), "Hello world"), - new Tuple2<>(new Tuple2<>(2, 4), "Hello world, how are you?"), + new Tuple2<>(new Tuple2<>(2, 4), "how are you?"), new Tuple2<>(new Tuple2<>(2, 5), "I am fine."), new Tuple2<>(new Tuple2<>(3, 6), "Luke Skywalker"), new Tuple2<>(new Tuple2<>(4, 7), "Comment#1"), @@ -102,41 +118,44 @@ public void testRangeWithDistribution2() throws Exception{ new Tuple2<>(new Tuple2<>(5, 10), "Comment#4")); IntValue[][] keys = new IntValue[2][2]; + env.setParallelism(2); - env.setParallelism(3); + for (int i = 0; i < 2; i++) { + keys[i][0] = new IntValue((i + 1) * 3); + } - for (int i = 0; i < 2; i++) - { - for (int j = 0; j < 2; j++) - { - keys[i][j] = new IntValue(i + j); - } + for (int i = 0; i < 2; i++) { + keys[i][1] = new IntValue((i + 1) * 5); } - CustomDistribution cd = new CustomDistribution(keys); - - DataSet> out1= DataSetUtils.partitionByRange(input1.mapPartition( - new MapPartitionFunction, String>, Tuple1>>() { - public void mapPartition(Iterable, String>> values, Collector>> out) { - IntValue key1; - IntValue key2; - for (Tuple2, String> s : values) { - key1 = new IntValue(s.f0.f0); - key2 = new IntValue(s.f0.f1); - out.collect(new Tuple1<>(new Tuple2<>(key1, key2))); - } + final CustomDistribution cd = new CustomDistribution(keys); + + DataSet out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction, String>, Tuple2>() { + @Override + public Tuple2 map(Tuple2, String> value) throws Exception { + IntValue key1; + IntValue key2; + key1 = new IntValue(value.f0.f0); + key2 = new IntValue(value.f0.f1); + return new Tuple2<>(key1, key2); + } + }), cd, 0, 1).mapPartition(new RichMapPartitionFunction, Boolean>() { + @Override + public void mapPartition(Iterable> values, Collector out) throws Exception { + boolean boo = true; + for (Tuple2 s : values) { + IntValue[] intValues= (IntValue[])cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3); + if (s.f0.getValue() > intValues[0].getValue() || s.f1.getValue() > intValues[1].getValue()) { + boo = false; } - }), cd, 0).mapPartition(new MapPartitionFunction>, Tuple1>() { - public void mapPartition(Iterable>> values, Collector> out) { - Tuple1 key; - for (Tuple1> s : values) { - key = new 
Tuple1<>(s.f0.f0); - out.collect(key); } + out.collect(boo); } - }).groupBy(0).sum(0); + }); - String expected = "[(1), (4), (2), (3), (5), (12)]"; - assertEquals(expected, out1.collect().toString()); + List result = out1.collect(); + for (int i = 0; i < result.size(); i++) { + assertTrue("The record is not emitted to the right partition", result.get(i)); + } } } From 0b8beaad5636295ed1d6e02fd5eca254d33ae6d7 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Thu, 17 Mar 2016 10:15:13 +0800 Subject: [PATCH 4/7] Modify the test with built-in data. --- .../distributions/DataDistribution.java | 7 + .../dataproperties/GlobalProperties.java | 15 ++ .../traversals/RangePartitionRewriter.java | 2 +- .../GlobalPropertiesFilteringTest.java | 6 +- .../GlobalPropertiesMatchingTest.java | 26 +-- .../dataproperties/MockDistribution.java | 7 + .../test/distribution/CustomDistribution.java | 164 ------------------ .../flink/test/distribution/TestDataDist.java | 77 ++++++++ .../CustomDistributionITCase.java | 94 +++------- 9 files changed, 147 insertions(+), 251 deletions(-) delete mode 100644 flink-tests/src/test/java/org/apache/flink/test/distribution/CustomDistribution.java create mode 100644 flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/distributions/DataDistribution.java b/flink-core/src/main/java/org/apache/flink/api/common/distributions/DataDistribution.java index 321948d8c7819..c0794d6c6b307 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/distributions/DataDistribution.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/distributions/DataDistribution.java @@ -22,6 +22,7 @@ import java.io.Serializable; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.io.IOReadableWritable; @PublicEvolving @@ -57,4 +58,10 @@ public interface DataDistribution extends IOReadableWritable, Serializable { * @return The number of fields in the (composite) key. */ int getNumberOfFields(); + + /** + * Gets the type of the key by which the dataSet is partitioned. + * @return The type of the key by which the dataSet is partitioned. + */ + TypeInformation[] getKeyTypes(); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java index 2a35784065d55..ca17c2ba19fc2 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java @@ -84,6 +84,21 @@ public void setHashPartitioned(FieldList partitionedFields) { this.ordering = null; } + /** + * Set the parameters for range partition. + * + * @param ordering Order of the partitioned fields + */ + public void setRangePartitioned(Ordering ordering) { + if (ordering == null) { + throw new NullPointerException(); + } + + this.partitioning = PartitioningProperty.RANGE_PARTITIONED; + this.ordering = ordering; + this.partitioningFields = ordering.getInvolvedIndexes(); + } + /** * Set the parameters for range partition. 
* diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java index f2798014512af..b1c5dae7c606b 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java @@ -223,7 +223,7 @@ private List rewriteRangePartitionChannel(Channel channel) { prRemoverNode.setParallelism(targetParallelism); prPlanNode.setParallelism(targetParallelism); GlobalProperties globalProperties = new GlobalProperties(); - globalProperties.setRangePartitioned(new Ordering(0, null, Order.ASCENDING), channel.getDataDistribution()); + globalProperties.setRangePartitioned(new Ordering(0, null, Order.ASCENDING)); prPlanNode.initProperties(globalProperties, new LocalProperties()); prPlanNode.setCosts(defaultZeroCosts); this.plan.getAllNodes().add(prPlanNode); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java index 3b3dbf2f280e1..3e32905f45ad6 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesFilteringTest.java @@ -258,7 +258,7 @@ public void testRangePartitioningPreserved1() { o.appendOrdering(5, LongValue.class, Order.DESCENDING); o.appendOrdering(2, StringValue.class, Order.ASCENDING); GlobalProperties gprops = new GlobalProperties(); - gprops.setRangePartitioned(o, null); + gprops.setRangePartitioned(o); GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); @@ -292,7 +292,7 @@ public void testRangePartitioningPreserved2() { o.appendOrdering(5, LongValue.class, Order.DESCENDING); o.appendOrdering(2, StringValue.class, Order.ASCENDING); GlobalProperties gprops = new GlobalProperties(); - gprops.setRangePartitioned(o, null); + gprops.setRangePartitioned(o); GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); @@ -326,7 +326,7 @@ public void testRangePartitioningErased() { o.appendOrdering(5, LongValue.class, Order.DESCENDING); o.appendOrdering(2, StringValue.class, Order.ASCENDING); GlobalProperties gprops = new GlobalProperties(); - gprops.setRangePartitioned(o, null); + gprops.setRangePartitioned(o); GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java index 865c4d27291dc..52826d6c83be2 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java @@ -78,19 +78,19 @@ public void testMatchingAnyPartitioning() { // match range partitioning { GlobalProperties gp1 = new GlobalProperties(); - gp1.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING), null); + gp1.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); assertTrue(req.isMetBy(gp1)); GlobalProperties 
gp2 = new GlobalProperties(); - gp2.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING), null); + gp2.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); assertTrue(req.isMetBy(gp2)); GlobalProperties gp3 = new GlobalProperties(); - gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING), null); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); assertFalse(req.isMetBy(gp3)); GlobalProperties gp4 = new GlobalProperties(); - gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING), null); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); assertTrue(req.isMetBy(gp4)); } @@ -149,7 +149,7 @@ public void testMatchingCustomPartitioning() { assertFalse(req.isMetBy(gp2)); GlobalProperties gp3 = new GlobalProperties(); - gp3.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING), null); + gp3.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); assertFalse(req.isMetBy(gp3)); } } @@ -206,19 +206,19 @@ public void testStrictlyMatchingAnyPartitioning() { // match range partitioning { GlobalProperties gp1 = new GlobalProperties(); - gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING), null); + gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); assertTrue(req.isMetBy(gp1)); GlobalProperties gp2 = new GlobalProperties(); - gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING), null); + gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); assertFalse(req.isMetBy(gp2)); GlobalProperties gp3 = new GlobalProperties(); - gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING), null); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); assertFalse(req.isMetBy(gp3)); GlobalProperties gp4 = new GlobalProperties(); - gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING), null); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); assertFalse(req.isMetBy(gp4)); } @@ -271,19 +271,19 @@ public void testStrictlyMatchingHashPartitioning() { // match range partitioning { GlobalProperties gp1 = new GlobalProperties(); - gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING), null); + gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING)); assertFalse(req.isMetBy(gp1)); GlobalProperties gp2 = new GlobalProperties(); - gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING), null); + gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING)); assertFalse(req.isMetBy(gp2)); GlobalProperties gp3 = new GlobalProperties(); - gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING), null); + gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING)); assertFalse(req.isMetBy(gp3)); GlobalProperties gp4 = new GlobalProperties(); - gp4.setRangePartitioned(new Ordering(6, null, 
Order.DESCENDING), null); + gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING)); assertFalse(req.isMetBy(gp4)); } diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java index 483bc514b8606..aab514a957a9e 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java @@ -19,6 +19,8 @@ package org.apache.flink.optimizer.dataproperties; import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -37,6 +39,11 @@ public int getNumberOfFields() { return 0; } + @Override + public TypeInformation[] getKeyTypes() { + return new TypeInformation[]{TypeExtractor.getForClass(Integer.class)}; + } + @Override public void write(DataOutputView out) throws IOException { diff --git a/flink-tests/src/test/java/org/apache/flink/test/distribution/CustomDistribution.java b/flink-tests/src/test/java/org/apache/flink/test/distribution/CustomDistribution.java deleted file mode 100644 index c04e6d18cded9..0000000000000 --- a/flink-tests/src/test/java/org/apache/flink/test/distribution/CustomDistribution.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.distribution; - -import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Value; -import org.apache.flink.util.InstantiationUtil; - -import java.io.IOException; - -/** - * The class is used to do the tests of range partition with customed data distribution. 
- */ -public class CustomDistribution implements DataDistribution { - - protected Value[][] boundaries; - - protected int dim; - - public CustomDistribution() {} - - public CustomDistribution(Value[] bucketBoundaries) { - - if (bucketBoundaries == null) { - throw new IllegalArgumentException("source key cannot be null"); - } - - dim = 1; - - @SuppressWarnings("unchecked") - Class clazz = bucketBoundaries[0].getClass(); - - boundaries = new Value[bucketBoundaries.length][]; - for (int i = 0; i < bucketBoundaries.length; i++) { - if (bucketBoundaries[i].getClass() != clazz) { - throw new IllegalArgumentException("The bucket boundaries are of different class types."); - } - boundaries[i] = new Value[] { bucketBoundaries[i] }; - } - } - - public CustomDistribution(Value[][] bucketBoundaries) { - if (bucketBoundaries == null) { - throw new IllegalArgumentException("Bucket boundaries must not be null."); - } - if (bucketBoundaries.length == 0) { - throw new IllegalArgumentException("Bucket boundaries must not be empty."); - } - - // dimensionality is one in this case - dim = bucketBoundaries[0].length; - - Class[] types = new Class[dim]; - for (int i = 0; i < dim; i++) { - types[i] = bucketBoundaries[0][i].getClass(); - } - - // check the array - for (int i = 1; i < bucketBoundaries.length; i++) { - if (bucketBoundaries[i].length != dim) { - throw new IllegalArgumentException("All bucket boundaries must have the same dimensionality."); - } - for (int d = 0; d < dim; d++) { - if (types[d] != bucketBoundaries[i][d].getClass()) { - throw new IllegalArgumentException("The bucket boundaries are of different class types."); - } - } - } - - boundaries = bucketBoundaries; - } - - @Override - public int getNumberOfFields() { - return this.dim; - } - - @Override - public Value[] getBucketBoundary(int bucketNum, int totalNumBuckets) { - // check validity of arguments - if(bucketNum < 0) { - throw new IllegalArgumentException("Requested bucket must be greater than or equal to 0."); - } else if(bucketNum >= (totalNumBuckets)) { - throw new IllegalArgumentException("Request bucket must be smaller than the total number of buckets."); - } - if(totalNumBuckets < 1) { - throw new IllegalArgumentException("Total number of bucket must be larger than 0."); - } - - final int maxNumBuckets = boundaries.length + 1; - final int n = maxNumBuckets / totalNumBuckets; - final int bucketId = bucketNum * n + (n - 1); - - return boundaries[bucketId]; - - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(this.dim); - out.writeInt(boundaries.length); - - // write types - for (int i = 0; i < dim; i++) { - out.writeUTF(boundaries[0][i].getClass().getName()); - } - - for (int i = 0; i < boundaries.length; i++) { - for (int d = 0; d < dim; d++) { - boundaries[i][d].write(out); - } - } - } - - @SuppressWarnings("unchecked") - @Override - public void read(DataInputView in) throws IOException { - this.dim = in.readInt(); - final int len = in.readInt(); - - boundaries = new Value[len][]; - - // read types - Class[] types = new Class[dim]; - for (int i = 0; i < dim; i++) { - String className = in.readUTF(); - try { - types[i] = Class.forName(className, true, getClass().getClassLoader()).asSubclass(Value.class); - } catch (ClassNotFoundException e) { - throw new IOException("Could not load type class '" + className + "'."); - } catch (Throwable t) { - throw new IOException("Error loading type class '" + className + "'.", t); - } - } - - for (int i = 0; i < len; i++) { - Value[] bucket = new 
Value[dim]; - for (int d = 0; d < dim; d++) { - Value val = InstantiationUtil.instantiate(types[d], Value.class); - val.read(in); - bucket[d] = val; - } - boundaries[i] = bucket; - } - } -} diff --git a/flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java b/flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java new file mode 100644 index 0000000000000..68594579a3527 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.distribution; + +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** + * The class is used to do the tests of range partition with customed data distribution. + */ +public class TestDataDist implements DataDistribution { + + private int dim; + + public TestDataDist() {} + + /** + * Constructor of the customized distribution for range partition. + * @param dim the number of the fields. 
+ */ + public TestDataDist(int dim) { + this.dim = dim; + } + + public int getParallelism() { + return 3; + } + + @Override + public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) { + if (dim == 1) { + return new Integer[]{(bucketNum + 1) * 7}; + } + return new Integer[]{(bucketNum + 1) * 7, (bucketNum) * 2 + 3}; + } + + @Override + public int getNumberOfFields() { + return this.dim; + } + + @Override + public TypeInformation[] getKeyTypes() { + return new TypeInformation[]{TypeExtractor.getForClass(Integer.class)}; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(this.dim); + } + + @Override + public void read(DataInputView in) throws IOException { + this.dim = in.readInt(); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java index 943d6a8aea1da..4f86cb6a043a8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java @@ -20,13 +20,12 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapPartitionFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.test.distribution.CustomDistribution; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.types.IntValue; +import org.apache.flink.test.distribution.TestDataDist; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.util.Collector; import org.junit.Test; @@ -46,44 +45,20 @@ public void testPartitionWithDistribution1() throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - DataSet> input1 = env.fromElements( - new Tuple3<>(1, 1, "Hi"), - new Tuple3<>(1, 2, "Hello"), - new Tuple3<>(1, 3, "Hello world"), - new Tuple3<>(2, 4, "how are you?"), - new Tuple3<>(2, 5, "I am fine."), - new Tuple3<>(3, 6, "Luke Skywalker"), - new Tuple3<>(4, 7, "Comment#1"), - new Tuple3<>(4, 8, "Comment#2"), - new Tuple3<>(4, 9, "Comment#3"), - new Tuple3<>(5, 10, "Comment#4")); - - final IntValue[] keys = new IntValue[3]; - - for (int i = 0; i < 3; i++) { - keys[i] = new IntValue((i + 1) * 2); - } + DataSet> input1 = CollectionDataSets.get3TupleDataSet(env); - final CustomDistribution cd = new CustomDistribution(keys); + final TestDataDist dist = new TestDataDist(1); - env.setParallelism(3); + env.setParallelism(dist.getParallelism()); - DataSet out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction, Tuple2>() { - @Override - public Tuple2 map(Tuple3 value) throws Exception { - IntValue key1; - IntValue key2; - key1 = new IntValue(value.f0); - key2 = new IntValue(value.f1); - return new Tuple2<>(key1, key2); - } - }), cd, 0).mapPartition(new RichMapPartitionFunction, Boolean>() { + DataSet out1 = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction, Boolean>() { @Override - public void mapPartition(Iterable> values, Collector out) throws Exception { + public void mapPartition(Iterable> values, Collector out) throws Exception { boolean boo = true; - for (Tuple2 s : values) { - IntValue intValues= 
(IntValue)cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3)[0]; - if (s.f0.getValue() > intValues.getValue()) { + int partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); + + for (Tuple3 s : values) { + if (s.f0 > (partitionIndex + 1) * 7 || s.f0 < (partitionIndex) * 7) { boo = false; } } @@ -105,47 +80,26 @@ public void testRangeWithDistribution2() throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - DataSet, String>> input1 = env.fromElements( - new Tuple2<>(new Tuple2<>(1, 1), "Hi"), - new Tuple2<>(new Tuple2<>(1, 2), "Hello"), - new Tuple2<>(new Tuple2<>(1, 3), "Hello world"), - new Tuple2<>(new Tuple2<>(2, 4), "how are you?"), - new Tuple2<>(new Tuple2<>(2, 5), "I am fine."), - new Tuple2<>(new Tuple2<>(3, 6), "Luke Skywalker"), - new Tuple2<>(new Tuple2<>(4, 7), "Comment#1"), - new Tuple2<>(new Tuple2<>(4, 8), "Comment#2"), - new Tuple2<>(new Tuple2<>(4, 9), "Comment#3"), - new Tuple2<>(new Tuple2<>(5, 10), "Comment#4")); - - IntValue[][] keys = new IntValue[2][2]; - env.setParallelism(2); - - for (int i = 0; i < 2; i++) { - keys[i][0] = new IntValue((i + 1) * 3); - } + DataSet> input1 = CollectionDataSets.get3TupleDataSet(env); - for (int i = 0; i < 2; i++) { - keys[i][1] = new IntValue((i + 1) * 5); - } + final TestDataDist dist = new TestDataDist(2); - final CustomDistribution cd = new CustomDistribution(keys); + env.setParallelism(dist.getParallelism()); - DataSet out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction, String>, Tuple2>() { + DataSet out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction, Tuple3>() { @Override - public Tuple2 map(Tuple2, String> value) throws Exception { - IntValue key1; - IntValue key2; - key1 = new IntValue(value.f0.f0); - key2 = new IntValue(value.f0.f1); - return new Tuple2<>(key1, key2); + public Tuple3 map(Tuple3 value) throws Exception { + return new Tuple3<>(value.f0, value.f1.intValue(), value.f2); } - }), cd, 0, 1).mapPartition(new RichMapPartitionFunction, Boolean>() { + }), dist, 0, 1).mapPartition(new RichMapPartitionFunction, Boolean>() { @Override - public void mapPartition(Iterable> values, Collector out) throws Exception { + public void mapPartition(Iterable> values, Collector out) throws Exception { boolean boo = true; - for (Tuple2 s : values) { - IntValue[] intValues= (IntValue[])cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3); - if (s.f0.getValue() > intValues[0].getValue() || s.f1.getValue() > intValues[1].getValue()) { + int partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); + + for (Tuple3 s : values) { + if (s.f0 > (partitionIndex + 1) * 7 || s.f0 < partitionIndex * 7 || + s.f1 > (partitionIndex * 2 + 3) || s.f1 < (partitionIndex * 2 + 1)) { boo = false; } } From d8d6c4b5a0729b65d014b3e9d3d47d1deaad7a9c Mon Sep 17 00:00:00 2001 From: gallenvara Date: Fri, 18 Mar 2016 00:22:04 +0800 Subject: [PATCH 5/7] Modify the test to avoid iterating. Modify the test to avoid iterating. 
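The rewrite below folds the per-partition check results into a single verdict with a reduce, so the client no longer collects a list of Booleans and loops over it. A minimal standalone sketch of that pattern against the DataSet API (the Boolean source here is an illustrative stand-in for the test's mapPartition output, not the test itself):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class AndReduceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        // Illustrative stand-in for the one-Boolean-per-partition check results.
        DataSet<Boolean> checks = env.fromElements(true, true, true);
        // Conjoin all partial verdicts on the cluster side.
        DataSet<Boolean> allOk = checks.reduce(new ReduceFunction<Boolean>() {
            @Override
            public Boolean reduce(Boolean left, Boolean right) {
                return left && right;
            }
        });
        // Exactly one element reaches the client, instead of a list to iterate.
        System.out.println(allOk.collect().get(0));
    }
}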
--- .../CustomDistributionITCase.java | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java index 4f86cb6a043a8..af0023112805a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.test.javaApiOperators; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -30,8 +31,6 @@ import org.junit.Test; -import java.util.List; - import static org.junit.Assert.assertTrue; @@ -46,30 +45,31 @@ public void testPartitionWithDistribution1() throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); DataSet> input1 = CollectionDataSets.get3TupleDataSet(env); - final TestDataDist dist = new TestDataDist(1); env.setParallelism(dist.getParallelism()); - DataSet out1 = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction, Boolean>() { + DataSet result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction, Boolean>() { @Override public void mapPartition(Iterable> values, Collector out) throws Exception { - boolean boo = true; int partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); + boolean checkPartition = true; for (Tuple3 s : values) { if (s.f0 > (partitionIndex + 1) * 7 || s.f0 < (partitionIndex) * 7) { - boo = false; + checkPartition = false; } } - out.collect(boo); + out.collect(checkPartition); + } + }).reduce(new ReduceFunction() { + @Override + public Boolean reduce(Boolean value1, Boolean value2) throws Exception { + return value1 && value2; } }); - List result = out1.collect(); - for (int i = 0; i < result.size(); i++) { - assertTrue("The record is not emitted to the right partition", result.get(i)); - } + assertTrue("The record is not emitted to the right partition", result.collect().get(0)); } @Test @@ -81,12 +81,11 @@ public void testRangeWithDistribution2() throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); DataSet> input1 = CollectionDataSets.get3TupleDataSet(env); - final TestDataDist dist = new TestDataDist(2); env.setParallelism(dist.getParallelism()); - DataSet out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction, Tuple3>() { + DataSet result = DataSetUtils.partitionByRange(input1.map(new MapFunction, Tuple3>() { @Override public Tuple3 map(Tuple3 value) throws Exception { return new Tuple3<>(value.f0, value.f1.intValue(), value.f2); @@ -94,22 +93,24 @@ public Tuple3 map(Tuple3 value) }), dist, 0, 1).mapPartition(new RichMapPartitionFunction, Boolean>() { @Override public void mapPartition(Iterable> values, Collector out) throws Exception { - boolean boo = true; int partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); + boolean checkPartition = true; for (Tuple3 s : values) { if (s.f0 > (partitionIndex + 1) * 7 || s.f0 < partitionIndex * 7 || s.f1 > (partitionIndex * 2 + 3) || s.f1 < (partitionIndex * 2 + 1)) { - boo = false; + checkPartition = false; } } - 
out.collect(boo); + out.collect(checkPartition); + } + }).reduce(new ReduceFunction() { + @Override + public Boolean reduce(Boolean value1, Boolean value2) throws Exception { + return value1 && value2; } }); - - List result = out1.collect(); - for (int i = 0; i < result.size(); i++) { - assertTrue("The record is not emitted to the right partition", result.get(i)); - } + + assertTrue("The record is not emitted to the right partition", result.collect().get(0)); } } From a2ec639319aeebda23cb7b02c779243ea25bd709 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Sat, 19 Mar 2016 17:35:43 +0800 Subject: [PATCH 6/7] Move TestDataDist class into the test and modify the range boundary in the second test. --- .../flink/api/common/operators/Keys.java | 6 +- .../api/java/operators/PartitionOperator.java | 8 ++ .../dataproperties/MockDistribution.java | 4 +- .../flink/test/distribution/TestDataDist.java | 77 ------------ .../CustomDistributionITCase.java | 118 ++++++++++++++---- 5 files changed, 106 insertions(+), 107 deletions(-) delete mode 100644 flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java index ad21c476cac11..abe41af29b279 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java @@ -42,7 +42,7 @@ public abstract class Keys { public abstract int[] computeLogicalKeyPositions(); - protected abstract TypeInformation[] getKeyFieldTypes(); + public abstract TypeInformation[] getKeyFieldTypes(); public abstract void validateCustomPartitioner(Partitioner partitioner, TypeInformation typeInfo); @@ -134,7 +134,7 @@ public int[] computeLogicalKeyPositions() { } @Override - protected TypeInformation[] getKeyFieldTypes() { + public TypeInformation[] getKeyFieldTypes() { TypeInformation[] fieldTypes = new TypeInformation[keyFields.size()]; for (int i = 0; i < keyFields.size(); i++) { fieldTypes[i] = keyFields.get(i).getType(); @@ -337,7 +337,7 @@ public int[] computeLogicalKeyPositions() { } @Override - protected TypeInformation[] getKeyFieldTypes() { + public TypeInformation[] getKeyFieldTypes() { TypeInformation[] fieldTypes = new TypeInformation[keyFields.size()]; for (int i = 0; i < keyFields.size(); i++) { fieldTypes[i] = keyFields.get(i).getType(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java index cb733d9db0f08..dc4a018cfe9f7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java @@ -34,6 +34,8 @@ import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.tuple.Tuple2; +import java.util.Arrays; + /** * This operator represents a partitioning. * @@ -80,6 +82,12 @@ private
PartitionOperator(DataSet input, PartitionMethod pMethod, Keys Preconditions.checkArgument(pKeys != null || pMethod == PartitionMethod.REBALANCE, "Partitioning requires keys"); Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM || customPartitioner != null, "Custom partioning requires a partitioner."); Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition."); + + if (distribution != null) { + Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same."); + Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal."); + } + if (customPartitioner != null) { pKeys.validateCustomPartitioner(customPartitioner, partitionerTypeInfo); } diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java index aab514a957a9e..a35f0d0d0354c 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java @@ -19,8 +19,8 @@ package org.apache.flink.optimizer.dataproperties; import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -41,7 +41,7 @@ public int getNumberOfFields() { @Override public TypeInformation[] getKeyTypes() { - return new TypeInformation[]{TypeExtractor.getForClass(Integer.class)}; + return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO}; } @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java b/flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java deleted file mode 100644 index 68594579a3527..0000000000000 --- a/flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.distribution; - -import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -import java.io.IOException; - -/** - * The class is used to do the tests of range partition with customed data distribution. - */ -public class TestDataDist implements DataDistribution { - - private int dim; - - public TestDataDist() {} - - /** - * Constructor of the customized distribution for range partition. - * @param dim the number of the fields. - */ - public TestDataDist(int dim) { - this.dim = dim; - } - - public int getParallelism() { - return 3; - } - - @Override - public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) { - if (dim == 1) { - return new Integer[]{(bucketNum + 1) * 7}; - } - return new Integer[]{(bucketNum + 1) * 7, (bucketNum) * 2 + 3}; - } - - @Override - public int getNumberOfFields() { - return this.dim; - } - - @Override - public TypeInformation[] getKeyTypes() { - return new TypeInformation[]{TypeExtractor.getForClass(Integer.class)}; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(this.dim); - } - - @Override - public void read(DataInputView in) throws IOException { - this.dim = in.readInt(); - } -} diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java index af0023112805a..56113710efc42 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java @@ -18,20 +18,26 @@ package org.apache.flink.test.javaApiOperators; +import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.test.distribution.TestDataDist; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.util.Collector; import org.junit.Test; -import static org.junit.Assert.assertTrue; +import java.io.IOException; + +import static org.junit.Assert.fail; public class CustomDistributionITCase { @@ -53,23 +59,17 @@ public void testPartitionWithDistribution1() throws Exception{ @Override public void mapPartition(Iterable> values, Collector out) throws Exception { int partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); - boolean checkPartition = true; for (Tuple3 s : values) { - if (s.f0 > (partitionIndex + 1) * 7 || s.f0 < (partitionIndex) * 7) { - checkPartition = false; + if ((s.f0 - 1) / 7 != partitionIndex) { + fail("Record was not correctly partitioned: " + 
s.toString()); } } - out.collect(checkPartition); - } - }).reduce(new ReduceFunction() { - @Override - public Boolean reduce(Boolean value1, Boolean value2) throws Exception { - return value1 && value2; } }); - assertTrue("The record is not emitted to the right partition", result.collect().get(0)); + result.output(new DiscardingOutputFormat()); + env.execute(); } @Test @@ -94,23 +94,91 @@ public Tuple3 map(Tuple3 value) @Override public void mapPartition(Iterable> values, Collector out) throws Exception { int partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); - boolean checkPartition = true; for (Tuple3 s : values) { - if (s.f0 > (partitionIndex + 1) * 7 || s.f0 < partitionIndex * 7 || - s.f1 > (partitionIndex * 2 + 3) || s.f1 < (partitionIndex * 2 + 1)) { - checkPartition = false; + if (s.f0 <= partitionIndex * (partitionIndex + 1) / 2 || + s.f0 > (partitionIndex + 1) * (partitionIndex + 2) / 2 || + s.f1 - 1 != partitionIndex) { + fail("Record was not correctly partitioned: " + s.toString()); } } - out.collect(checkPartition); - } - }).reduce(new ReduceFunction() { - @Override - public Boolean reduce(Boolean value1, Boolean value2) throws Exception { - return value1 && value2; } }); - - assertTrue("The record is not emitted to the right partition", result.collect().get(0)); + + result.output(new DiscardingOutputFormat()); + env.execute(); + } + + /** + * The class is used to do the tests of range partition with customed data distribution. + */ + public static class TestDataDist implements DataDistribution { + + private int dim; + + public TestDataDist() {} + + /** + * Constructor of the customized distribution for range partition. + * @param dim the number of the fields. + */ + public TestDataDist(int dim) { + this.dim = dim; + } + + public int getParallelism() { + if (dim == 1) { + return 3; + } + return 6; + } + + @Override + public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) { + if (dim == 1) { + /* + for the first test, the boundary is just like : + (0, 7] + (7, 14] + (14, 21] + */ + + return new Integer[]{(bucketNum + 1) * 7}; + } + /* + for the second test, the boundary is just like : + (0, 1], (0, 1] + (1, 3], (1, 2] + (3, 6], (2, 3] + (6, 10], (3, 4] + (10, 15], (4, 5] + (15, 21], (5, 6] + */ + + return new Integer[]{(bucketNum + 1) * (bucketNum + 2) / 2, bucketNum + 1}; + } + + @Override + public int getNumberOfFields() { + return this.dim; + } + + @Override + public TypeInformation[] getKeyTypes() { + if (dim == 1) { + return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO}; + } + return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(this.dim); + } + + @Override + public void read(DataInputView in) throws IOException { + this.dim = in.readInt(); + } } } From da3f857237576787402b2af7a9a1d969281d1c39 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Wed, 23 Mar 2016 16:55:35 +0800 Subject: [PATCH 7/7] Modify data distribution of the second test and check the partitions with lower and upper boundary. 
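This final revision hand-picks two-field boundaries, so each partition is bounded below by the previous boundary (exclusive) and above by its own boundary (inclusive), compared lexicographically on (f0, f1). A sketch of roughly the containment test the new assertions encode, under the boundaries used below (the helper names are illustrative; the patch inlines the comparisons instead):

public class CompositeBoundarySketch {

    // Lexicographic a <= b on two-field integer keys.
    static boolean lexLessOrEqual(int a0, int a1, int b0, int b1) {
        return a0 < b0 || (a0 == b0 && a1 <= b1);
    }

    // lower is exclusive, upper is inclusive; null stands for an open end.
    static boolean inBucket(int f0, int f1, int[] lower, int[] upper) {
        boolean aboveLower = lower == null || !lexLessOrEqual(f0, f1, lower[0], lower[1]);
        boolean belowUpper = upper == null || lexLessOrEqual(f0, f1, upper[0], upper[1]);
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // The boundaries TestDataDist2 hands out: (1,6], (2,4], (3,9], (4,1], (5,2]
        int[][] bounds = {{1, 6}, {2, 4}, {3, 9}, {4, 1}, {5, 2}};
        // Record (2, 13) exceeds (2, 4), so it belongs to bucket 2: ((2,4), (3,9)].
        System.out.println(inBucket(2, 13, bounds[1], bounds[2])); // true
        System.out.println(inBucket(2, 13, bounds[0], bounds[1])); // false
    }
}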
--- .../CustomDistributionITCase.java | 140 ++++++++++++------ 1 file changed, 96 insertions(+), 44 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java index 56113710efc42..062800fb360a3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java @@ -51,7 +51,7 @@ public void testPartitionWithDistribution1() throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); DataSet> input1 = CollectionDataSets.get3TupleDataSet(env); - final TestDataDist dist = new TestDataDist(1); + final TestDataDist1 dist = new TestDataDist1(); env.setParallelism(dist.getParallelism()); @@ -80,8 +80,18 @@ public void testRangeWithDistribution2() throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); - DataSet> input1 = CollectionDataSets.get3TupleDataSet(env); - final TestDataDist dist = new TestDataDist(2); + DataSet> input1 = env.fromElements( + new Tuple3<>(1, 5L, "Hi"), + new Tuple3<>(1, 11L, "Hello"), + new Tuple3<>(2, 3L, "World"), + new Tuple3<>(2, 13L, "Hello World"), + new Tuple3<>(3, 8L, "Say"), + new Tuple3<>(4, 0L, "Why"), + new Tuple3<>(4, 2L, "Java"), + new Tuple3<>(4, 11L, "Say Hello"), + new Tuple3<>(5, 2L, "Hi Java")); + + final TestDataDist2 dist = new TestDataDist2(); env.setParallelism(dist.getParallelism()); @@ -94,11 +104,28 @@ public Tuple3 map(Tuple3 value) @Override public void mapPartition(Iterable> values, Collector out) throws Exception { int partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); - + boolean checkPartition = true; + for (Tuple3 s : values) { + + if (partitionIndex == 0) { + if (s.f0 > partitionIndex + 1 || (s.f0 == partitionIndex + 1 && s.f1 > dist.rightBoundary[partitionIndex])) { + checkPartition = false; + } + } + else if (partitionIndex > 0 && partitionIndex < dist.getParallelism() - 1) { + if (s.f0 > partitionIndex + 1 || (s.f0 == partitionIndex + 1 && s.f1 > dist.rightBoundary[partitionIndex]) || + s.f0 < partitionIndex || (s.f0 == partitionIndex && s.f1 < dist.rightBoundary[partitionIndex - 1])) { + checkPartition = false; + } + } + else { + if (s.f0 < partitionIndex || (s.f0 == partitionIndex && s.f1 < dist.rightBoundary[partitionIndex - 1])) { + checkPartition = false; + } + } + + if (!checkPartition) { fail("Record was not correctly partitioned: " + s.toString()); } } @@ -110,75 +137,100 @@ public void mapPartition(Iterable> values, Coll } /** - * The class is used to do the tests of range partition with customed data distribution. + * The class is used to do the tests of range partition with one key. */ - public static class TestDataDist implements DataDistribution { + public static class TestDataDist1 implements DataDistribution { + + /** + * Constructor of the customized distribution for range partition.
+ */ + public TestDataDist1() {} + + public int getParallelism() { + return 3; + } + + @Override + public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) { - private int dim; + /* + for the first test, the boundary is just like : + (0, 7] + (7, 14] + (14, 21] + */ + return new Integer[]{(bucketNum + 1) * 7}; + } - public TestDataDist() {} + @Override + public int getNumberOfFields() { + return 1; + } + + @Override + public TypeInformation[] getKeyTypes() { + return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO}; + } + + @Override + public void write(DataOutputView out) throws IOException { + + } + + @Override + public void read(DataInputView in) throws IOException { + + } + } + + /** + * The class is used to do the tests of range partition with two keys. + */ + public static class TestDataDist2 implements DataDistribution { + + public int rightBoundary[] = new int[]{6, 4, 9, 1, 2}; /** * Constructor of the customized distribution for range partition. - * @param dim the number of the fields. */ - public TestDataDist(int dim) { - this.dim = dim; - } + public TestDataDist2() {} public int getParallelism() { - if (dim == 1) { - return 3; - } - return 6; + return 5; } - + @Override public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) { - if (dim == 1) { - /* - for the first test, the boundary is just like : - (0, 7] - (7, 14] - (14, 21] - */ - - return new Integer[]{(bucketNum + 1) * 7}; - } + /* for the second test, the boundary is just like : - (0, 1], (0, 1] - (1, 3], (1, 2] - (3, 6], (2, 3] - (6, 10], (3, 4] - (10, 15], (4, 5] - (15, 21], (5, 6] + ((0, 0), (1, 6)] + ((1, 6), (2, 4)] + ((2, 4), (3, 9)] + ((3, 9), (4, 1)] + ((4, 1), (5, 2)] */ - - return new Integer[]{(bucketNum + 1) * (bucketNum + 2) / 2, bucketNum + 1}; + return new Integer[]{bucketNum + 1, rightBoundary[bucketNum]}; } @Override public int getNumberOfFields() { - return this.dim; + return 2; } @Override public TypeInformation[] getKeyTypes() { - if (dim == 1) { - return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO}; - } return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}; } @Override public void write(DataOutputView out) throws IOException { - out.writeInt(this.dim); + } @Override public void read(DataInputView in) throws IOException { - this.dim = in.readInt(); + } } }
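Taken together, the series lets callers range-partition a DataSet along boundaries they supply, with the operator checking that the distribution's key types and arity match the partition keys. A sketch of end-to-end usage (TenBucketDist below is a hypothetical distribution for illustration; DataSetUtils.partitionByRange with a DataDistribution is the entry point the series adds):

import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

public class RangePartitionExample {

    // Buckets (-inf, 10], (10, 20], (20, 30], ... on a single Integer key.
    public static class TenBucketDist implements DataDistribution {

        @Override
        public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
            return new Integer[]{(bucketNum + 1) * 10};
        }

        @Override
        public int getNumberOfFields() {
            return 1;
        }

        @Override
        public TypeInformation[] getKeyTypes() {
            // Must match the type of key field 0 below, or the operator rejects it.
            return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
        }

        @Override
        public void write(DataOutputView out) throws IOException {}

        @Override
        public void read(DataInputView in) throws IOException {}
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(3);

        DataSet<Tuple2<Integer, String>> data = env.fromElements(
                new Tuple2<>(3, "a"), new Tuple2<>(12, "b"), new Tuple2<>(25, "c"));

        // Range-partition on field 0 along the supplied boundaries instead of
        // boundaries sampled from the data.
        DataSetUtils.partitionByRange(data, new TenBucketDist(), 0).print();
    }
}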