From b6d7d48ed907072783e6c0e0bb7ee667a9c8a718 Mon Sep 17 00:00:00 2001
From: Gabor Horvath
Date: Sun, 21 Feb 2016 14:54:44 +0100
Subject: [PATCH 1/2] [FLINK-3422][streaming][api-breaking] Scramble
 HashPartitioner hashes.

---
 .../operators/hash/CompactingHashTable.java   | 29 ++-------
 .../operators/shipping/OutputEmitter.java     | 31 +---------
 .../apache/flink/runtime/util/MathUtils.java  | 59 ++++++++++++++++++-
 .../runtime/partitioner/HashPartitioner.java  |  3 +-
 .../flink/streaming/api/IterateTest.java      |  5 +-
 .../api/scala/StreamingOperatorsITCase.scala  | 11 +++-
 6 files changed, 78 insertions(+), 60 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 5a0c6cc751576..b4d03e75fb734 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -332,7 +332,7 @@ public final void insert(T record) throws IOException {
             return;
         }
 
-        final int hashCode = hash(this.buildSideComparator.hash(record));
+        final int hashCode = MathUtils.jenkinsHash(this.buildSideComparator.hash(record));
         final int posHashCode = hashCode % this.numBuckets;
 
         // get the bucket for the given hash code
@@ -360,7 +360,7 @@ public void insertOrReplaceRecord(T record) throws IOException {
             return;
         }
 
-        final int searchHashCode = hash(this.buildSideComparator.hash(record));
+        final int searchHashCode = MathUtils.jenkinsHash(this.buildSideComparator.hash(record));
         final int posHashCode = searchHashCode % this.numBuckets;
 
         // get the bucket for the given hash code
@@ -1140,26 +1140,7 @@ private void compactPartition(final int partitionNumber) throws IOException {
         this.compactionMemory.resetRWViews();
         this.compactionMemory.pushDownPages();
     }
-
-    /**
-     * This function hashes an integer value. It is adapted from Bob Jenkins' website
-     * http://www.burtleburtle.net/bob/hash/integer.html.
-     * The hash function has the full avalanche property, meaning that every bit of the value to be hashed
-     * affects every bit of the hash value.
-     *
-     * @param code The integer to be hashed.
-     * @return The hash code for the integer.
-     */
-    private static int hash(int code) {
-        code = (code + 0x7ed55d16) + (code << 12);
-        code = (code ^ 0xc761c23c) ^ (code >>> 19);
-        code = (code + 0x165667b1) + (code << 5);
-        code = (code + 0xd3a2646c) ^ (code << 9);
-        code = (code + 0xfd7046c5) + (code << 3);
-        code = (code ^ 0xb55a4f09) ^ (code >>> 16);
-        return code >= 0 ? code : -(code + 1);
-    }
-
+
     /**
      * Iterator that traverses the whole hash table once
     *
@@ -1286,7 +1267,7 @@ public T getMatchFor(PT probeSideRecord, T reuse) {
         if (closed) {
             return null;
         }
-        final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
+        final int searchHashCode = MathUtils.jenkinsHash(this.probeTypeComparator.hash(probeSideRecord));
 
         final int posHashCode = searchHashCode % numBuckets;
 
@@ -1359,7 +1340,7 @@ public T getMatchFor(PT probeSideRecord) {
         if (closed) {
             return null;
         }
-        final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
+        final int searchHashCode = MathUtils.jenkinsHash(this.probeTypeComparator.hash(probeSideRecord));
 
         final int posHashCode = searchHashCode % numBuckets;
 
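[Editor's note] The hunks above only move the hash table's private Jenkins hash into
MathUtils (added later in this patch) so that other components can share it; the
table's bucket placement is unchanged. A minimal sketch of the property the removed
Javadoc describes, assuming the patched flink-runtime on the classpath; the class
name and bucket count below are illustrative, not part of the patch:

    import org.apache.flink.runtime.util.MathUtils;

    public class JenkinsHashSketch {
        public static void main(String[] args) {
            final int numBuckets = 64;
            for (int rawHash = 0; rawHash < 4; rawHash++) {
                // jenkinsHash returns a non-negative int, so the modulo below is
                // always a valid bucket index; due to the avalanche property,
                // consecutive inputs typically scatter over the whole [0, 64)
                // range instead of filling buckets 0..3 in order.
                System.out.println(rawHash + " -> bucket "
                        + (MathUtils.jenkinsHash(rawHash) % numBuckets));
            }
        }
    }
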
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
index 92e37878ce3c9..fdbcd9fb67c5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.util.MathUtils;
 
 /**
  * The output emitter decides to which of the possibly multiple output channels a record is sent.
@@ -187,39 +188,11 @@ private int[] broadcast(int numberOfChannels) {
 
     private int[] hashPartitionDefault(T record, int numberOfChannels) {
         int hash = this.comparator.hash(record);
-        hash = murmurHash(hash);
+        this.channels[0] = MathUtils.murmurHash(hash) % numberOfChannels;
 
-        if (hash >= 0) {
-            this.channels[0] = hash % numberOfChannels;
-        }
-        else if (hash != Integer.MIN_VALUE) {
-            this.channels[0] = -hash % numberOfChannels;
-        }
-        else {
-            this.channels[0] = 0;
-        }
-
         return this.channels;
     }
 
-    private int murmurHash(int k) {
-        k *= 0xcc9e2d51;
-        k = Integer.rotateLeft(k, 15);
-        k *= 0x1b873593;
-
-        k = Integer.rotateLeft(k, 13);
-        k *= 0xe6546b64;
-
-        k ^= 4;
-        k ^= k >>> 16;
-        k *= 0x85ebca6b;
-        k ^= k >>> 13;
-        k *= 0xc2b2ae35;
-        k ^= k >>> 16;
-
-        return k;
-    }
-
     private final int[] rangePartition(final T record, int numberOfChannels) {
         if (this.channels == null || this.channels.length != 1) {
             this.channels = new int[1];
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
index 939a2e1101845..2acc55cf137be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/MathUtils.java
@@ -109,7 +109,64 @@ public static int checkedDownCast(long value) {
     public static boolean isPowerOf2(long value) {
         return (value & (value - 1)) == 0;
     }
-
+
+    /**
+     * This function hashes an integer value. It is adapted from Bob Jenkins' website
+     * http://www.burtleburtle.net/bob/hash/integer.html.
+     * The hash function has the full avalanche property, meaning that every bit of the value to be hashed
+     * affects every bit of the hash value.
+     *
+     * It is crucial to use different hash functions for partitioning data across machines and for the internal
+     * partitioning of data structures. This hash function is intended for the internal partitioning of data structures.
+     *
+     * @param code The integer to be hashed.
+     * @return The non-negative hash code for the integer.
+     */
+    public static int jenkinsHash(int code) {
+        code = (code + 0x7ed55d16) + (code << 12);
+        code = (code ^ 0xc761c23c) ^ (code >>> 19);
+        code = (code + 0x165667b1) + (code << 5);
+        code = (code + 0xd3a2646c) ^ (code << 9);
+        code = (code + 0xfd7046c5) + (code << 3);
+        code = (code ^ 0xb55a4f09) ^ (code >>> 16);
+        return code >= 0 ? code : -(code + 1);
+    }
+
+    /**
+     * This function hashes an integer value.
+     *
+     * It is crucial to use different hash functions for partitioning data across machines and for the internal
+     * partitioning of data structures. This hash function is intended for partitioning data across machines.
+     *
+     * @param code The integer to be hashed.
+     * @return The non-negative hash code for the integer.
+     */
+    public static int murmurHash(int code) {
+        code *= 0xcc9e2d51;
+        code = Integer.rotateLeft(code, 15);
+        code *= 0x1b873593;
+
+        code = Integer.rotateLeft(code, 13);
+        code *= 0xe6546b64;
+
+        code ^= 4;
+        code ^= code >>> 16;
+        code *= 0x85ebca6b;
+        code ^= code >>> 13;
+        code *= 0xc2b2ae35;
+        code ^= code >>> 16;
+
+        if (code >= 0) {
+            return code;
+        }
+        else if (code != Integer.MIN_VALUE) {
+            return -code;
+        }
+        else {
+            return 0;
+        }
+    }
+
     // ============================================================================================
 
     /**
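[Editor's note] The sign handling at the end of murmurHash above is what lets callers
write murmurHash(hash) % numberOfChannels directly, as hashPartitionDefault now does.
The usual shortcut, Math.abs, is unsafe here because Math.abs(Integer.MIN_VALUE)
overflows and stays negative. A plain-JDK illustration (the class name is hypothetical):

    public class AbsPitfallSketch {
        public static void main(String[] args) {
            // Math.abs cannot represent +2147483648, so MIN_VALUE comes back unchanged:
            System.out.println(Math.abs(Integer.MIN_VALUE));     // -2147483648
            // which can turn "Math.abs(hash) % channels" into a negative channel index:
            System.out.println(Math.abs(Integer.MIN_VALUE) % 3); // -2
            // murmurHash instead negates negative results and pins MIN_VALUE to 0,
            // so its return value is always a valid non-negative dividend.
        }
    }
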
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
index ec0cf94363a41..82f0141ab8917 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
@@ -20,6 +20,7 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
@@ -48,7 +49,7 @@ public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
         } catch (Exception e) {
             throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
         }
-        returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels);
+        returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
 
         return returnArray;
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index cd73b4185b9ea..920185aa64b44 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -29,6 +29,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -488,9 +489,9 @@ public Integer getKey(Integer value) throws Exception {
         public void flatMap(Integer value, Collector<Integer> out) throws Exception {
             received++;
             if (key == -1) {
-                key = value % 3;
+                key = MathUtils.murmurHash(value % 3) % 3;
             } else {
-                assertEquals(key, value % 3);
+                assertEquals(key, MathUtils.murmurHash(value % 3) % 3);
             }
             if (value > 0) {
                 out.collect(value - 1);
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index 60a02e7085eed..fe49b1f90f533 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction}
 import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.runtime.util.MathUtils
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.test.util.TestBaseUtils
@@ -71,7 +72,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
 
         override def run(ctx: SourceContext[(Int, Int)]): Unit = {
           0 until numElements foreach {
-            i => ctx.collect((i % numKeys, i))
+            i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i))
           }
         }
 
@@ -86,8 +87,12 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
         }
       })
       .map(new RichMapFunction[Int, (Int, Int)] {
+        var key: Int = -1
         override def map(value: Int): (Int, Int) = {
-          (getRuntimeContext.getIndexOfThisSubtask, value)
+          if (key == -1) {
+            key = MathUtils.murmurHash(value) % numKeys
+          }
+          (key, value)
         }
       })
       .split{
@@ -106,7 +111,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
       .javaStream
       .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE)
 
-    val groupedSequence = 0 until numElements groupBy( _ % numKeys)
+    val groupedSequence = 0 until numElements groupBy( MathUtils.murmurHash(_) % numKeys )
 
     expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n")
    expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n")
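[Editor's note] With this first patch applied, a streaming record's channel is
MathUtils.murmurHash(key.hashCode()) % parallelism rather than
Math.abs(key.hashCode() % parallelism), so every test that hard-coded the old key
placement has to be updated; that is what the second patch below does (including the
"3>"/"4>" task assignments in the Storm test). A sketch of how an expected channel can
be recomputed, assuming the patched flink-runtime on the classpath; for Integer keys,
hashCode() is the value itself:

    import org.apache.flink.runtime.util.MathUtils;

    public class ExpectedChannelSketch {
        public static void main(String[] args) {
            final int parallelism = 3;
            for (int key = 0; key < 5; key++) {
                // Mirrors HashPartitioner.selectChannels after this patch.
                int channel = MathUtils.murmurHash(key) % parallelism;
                System.out.println("key " + key + " -> channel " + channel);
            }
        }
    }
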
From d55b020c56e46de6b3422b9302539169e7fe261a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?M=C3=A1rton=20Balassi?=
Date: Sun, 21 Feb 2016 23:01:00 +0100
Subject: [PATCH 2/2] [FLINK-3422][streaming] Update tests reliant on hashing

Closes #1685
---
 .../tests/StormFieldsGroupingITCase.java      | 12 +++++++---
 .../flink/streaming/api/IterateTest.java      |  9 ++++++++
 .../api/StreamingOperatorsITCase.java         | 22 +++++++++++--------
 .../api/scala/StreamingOperatorsITCase.scala  |  4 +++-
 4 files changed, 34 insertions(+), 13 deletions(-)

diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index dfadd77e05e81..b87334515642c 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -21,13 +21,19 @@
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 
+import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
 import org.apache.flink.storm.tests.operators.TaskIdBolt;
 import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 
+/**
+ * This test relies on the hash function used by {@link DataStream#keyBy}, which is
+ * assumed to be {@link MathUtils#murmurHash}.
+ */
 public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
 
     private final static String topologyId = "FieldsGrouping Test";
@@ -43,9 +49,9 @@ protected void preSubmit() throws Exception {
 
     @Override
     protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory("4> -1930858313\n" + "4> 1431162155\n" + "4> 1654374947\n"
-            + "4> -65105105\n" + "3> -1155484576\n" + "3> 1033096058\n" + "3> -1557280266\n"
-            + "3> -1728529858\n" + "3> -518907128\n" + "3> -252332814", this.resultPath);
+        compareResultsByLinesInMemory("3> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n"
+            + "3> 1431162155\n" + "4> -1557280266\n" + "4> -1728529858\n" + "4> 1654374947\n"
+            + "4> -65105105\n" + "4> -518907128\n" + "4> -252332814\n", this.resultPath);
     }
 
     @Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 920185aa64b44..27a1e3cb6c2c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -462,6 +462,15 @@ public String map2(String value) throws Exception {
         assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
     }
 
+    /**
+     * This test relies on the hash function used by {@link DataStream#keyBy}, which is
+     * assumed to be {@link MathUtils#murmurHash}.
+     *
+     * For the test to pass, all FlatMappers must see at least two records in the iteration,
+     * which can only be achieved if the hashed values of the input keys form a complete
+     * residue system modulo 3. Given that the test is designed for 3 parallel FlatMapper
+     * instances, keys chosen from the [1,3] range are a suitable choice.
+     */
     @Test
     public void testGroupByFeedback() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
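[Editor's note] The "complete residue system" premise in the Javadoc above can be
checked directly: the iteration only feeds every one of the 3 parallel FlatMappers if
the scrambled key hashes cover all residues modulo 3. A sketch of that check, assuming
the patched flink-runtime on the classpath; the class name is hypothetical:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.flink.runtime.util.MathUtils;

    public class ResidueSystemCheck {
        public static void main(String[] args) {
            Set<Integer> residues = new HashSet<>();
            for (int key = 0; key < 3; key++) {
                // The test keys are "value % 3", i.e. 0, 1 and 2.
                residues.add(MathUtils.murmurHash(key) % 3);
            }
            // The test's premise holds only if all three residues occur.
            System.out.println(residues.size() == 3
                    ? "keys reach all 3 subtasks"
                    : "collision: keys would need to be re-chosen");
        }
    }
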
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
index 42febeafd77ef..9530d09ed02ad 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SplitStream;
@@ -67,12 +68,13 @@ public void after() throws Exception {
      * of Tuple2<Integer, Integer> is created. The stream is grouped according to the first tuple
      * value. Each group is folded where the second tuple value is summed up.
      *
-     * @throws Exception
+     * This test relies on the hash function used by {@link DataStream#keyBy}, which is
+     * assumed to be {@link MathUtils#murmurHash}.
      */
     @Test
-    public void testFoldOperation() throws Exception {
+    public void testGroupedFoldOperation() throws Exception {
         int numElements = 10;
-        int numKeys = 2;
+        final int numKeys = 2;
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
@@ -85,9 +87,13 @@ public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws
                 return accumulator + value.f1;
             }
         }).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
+            int key = -1;
             @Override
             public Tuple2<Integer, Integer> map(Integer value) throws Exception {
-                return new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), value);
+                if (key == -1) {
+                    key = MathUtils.murmurHash(value) % numKeys;
+                }
+                return new Tuple2<>(key, value);
             }
         }).split(new OutputSelector<Tuple2<Integer, Integer>>() {
             @Override
@@ -95,7 +101,6 @@ public Iterable<String> select(Tuple2<Integer, Integer> value) {
                 List<String> output = new ArrayList<>();
                 output.add(value.f0 + "");
-
                 return output;
             }
         });
 
@@ -120,7 +125,7 @@ public Integer map(Tuple2<Integer, Integer> value) throws Exception {
         int counter2 = 0;
 
         for (int i = 0; i < numElements; i++) {
-            if (i % 2 == 0) {
+            if (MathUtils.murmurHash(i) % numKeys == 0) {
                 counter1 += i;
                 builder1.append(counter1 + "\n");
             } else {
@@ -196,7 +201,7 @@ public NonSerializableTupleSource(int numElements) {
         @Override
         public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
             for (int i = 0; i < numElements; i++) {
-                ctx.collect(new Tuple2<Integer, NonSerializable>(i, new NonSerializable(i)));
+                ctx.collect(new Tuple2<>(i, new NonSerializable(i)));
             }
         }
 
@@ -217,14 +222,13 @@ public TupleSource(int numElements, int numKeys) {
         @Override
         public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
             for (int i = 0; i < numElements; i++) {
-                Tuple2<Integer, Integer> result = new Tuple2<>(i % numKeys, i);
+                Tuple2<Integer, Integer> result = new Tuple2<>(MathUtils.murmurHash(i) % numKeys, i);
                 ctx.collect(result);
             }
         }
 
         @Override
         public void cancel() {
-
         }
     }
 }
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index fe49b1f90f533..e573fe0212b94 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -58,9 +58,11 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
    * The stream is grouped by the first field. For each group, the resulting stream is folded by
    * summing up the second tuple field.
    *
+   * This test relies on the hash function used by {@link DataStream#keyBy}, which is
+   * assumed to be {@link MathUtils#murmurHash}.
    */
   @Test
-  def testFoldOperator(): Unit = {
+  def testGroupedFoldOperator(): Unit = {
     val numElements = 10
     val numKeys = 2
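[Editor's note] With this second patch, the expected fold outputs are derived from the
hash instead of being hard-coded. A Java sketch of the computation the Scala test
expresses with groupBy followed by scanLeft(0)(_ + _).tail, i.e. per-key running sums,
assuming the patched flink-runtime on the classpath; the class name is hypothetical:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.runtime.util.MathUtils;

    public class ExpectedFoldSketch {
        public static void main(String[] args) {
            final int numElements = 10;
            final int numKeys = 2;
            for (int key = 0; key < numKeys; key++) {
                List<Integer> runningSums = new ArrayList<>();
                int sum = 0;
                for (int i = 0; i < numElements; i++) {
                    if (MathUtils.murmurHash(i) % numKeys == key) {
                        sum += i;             // fold: accumulator + value
                        runningSums.add(sum); // one emitted record per input
                    }
                }
                System.out.println("key " + key + " -> " + runningSums);
            }
        }
    }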