From ac6de2cb4128bdf9c1567839b8b1ef8272738510 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=ED=98=95=EC=A4=80?= Date: Fri, 25 Jul 2014 19:07:50 +0900 Subject: [PATCH] TAJO-976: HashPartitioner doesn't make desired number of partitions infrequently. --- .../org/apache/tajo/datum/Float4Datum.java | 3 +- .../org/apache/tajo/datum/Float8Datum.java | 3 +- .../java/org/apache/tajo/datum/Int2Datum.java | 3 +- .../java/org/apache/tajo/datum/Int4Datum.java | 3 +- .../java/org/apache/tajo/datum/Int8Datum.java | 3 +- .../java/org/apache/tajo/datum/TextDatum.java | 4 +- .../java/org/apache/tajo/util/MurmurHash.java | 213 ++++++++++++++++++ .../planner/physical/HashPartitioner.java | 3 +- .../planner/physical/TestHashPartitioner.java | 39 +++- .../testGroupByWithSameExprs1.sql | 2 + .../testGroupByWithSameExprs2.sql | 4 +- .../TestGroupByQuery/testGroupBy2.result | 4 +- .../testGroupByWithSameConstantKeys1.result | 4 +- .../testGroupbyWithJson.result | 4 +- .../testHavingWithNamedTarget.result | 4 +- ...tBroadcastMultiColumnPartitionTable.result | 4 +- 16 files changed, 277 insertions(+), 23 deletions(-) create mode 100644 tajo-common/src/main/java/org/apache/tajo/util/MurmurHash.java diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java index e24bce4f8b..610ea9571b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float4Datum.java @@ -22,6 +22,7 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.InvalidCastException; import org.apache.tajo.exception.InvalidOperationException; +import org.apache.tajo.util.MurmurHash; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.DateTimeUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -106,7 +107,7 @@ public int size() { @Override public int hashCode() { - return (int) val; + return MurmurHash.hash(val); } @Override diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java index 0542148e26..90adcc724a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java @@ -22,6 +22,7 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.MurmurHash; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.DateTimeUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -96,7 +97,7 @@ public int size() { @Override public int hashCode() { - return (int) val; + return MurmurHash.hash(val); } public boolean equals(Object obj) { diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java index 38cf019882..ab17bdc421 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int2Datum.java @@ -21,6 +21,7 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.InvalidOperationException; +import org.apache.tajo.util.MurmurHash; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.DateTimeUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -97,7 +98,7 @@ public int size() { @Override public int hashCode() { - return val; + return MurmurHash.hash(val); } @Override diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java index d26b6b2358..9a60863598 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int4Datum.java @@ -21,6 +21,7 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.exception.InvalidOperationException; +import org.apache.tajo.util.MurmurHash; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.DateTimeUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -102,7 +103,7 @@ public int size() { @Override public int hashCode() { - return val; + return MurmurHash.hash(val); } public boolean equals(Object obj) { diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java index 46a1353cd6..db8a12bc06 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Int8Datum.java @@ -22,6 +22,7 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.InvalidCastException; import org.apache.tajo.exception.InvalidOperationException; +import org.apache.tajo.util.MurmurHash; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.datetime.DateTimeUtil; import org.apache.tajo.util.datetime.TimeMeta; @@ -108,7 +109,7 @@ public int size() { @Override public int hashCode() { - return (int) val; + return MurmurHash.hash(val); } @Override diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java index 49f09f6842..e8424b3147 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java @@ -23,8 +23,8 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.InvalidCastException; import org.apache.tajo.exception.InvalidOperationException; +import org.apache.tajo.util.MurmurHash; -import java.util.Arrays; import java.util.Comparator; public class TextDatum extends Datum { @@ -140,7 +140,7 @@ public Datum equalsTo(Datum datum) { @Override public int hashCode() { - return Arrays.hashCode(bytes); + return MurmurHash.hash(bytes); } @Override diff --git a/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash.java b/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash.java new file mode 100644 index 0000000000..b60df9c47a --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/util/MurmurHash.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +/** + * This class is borrowed from the following source code + * https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/hash/MurmurHash.java + */ +public class MurmurHash { + public static int hash(Object o) { + if (o == null) { + return 0; + } + if (o instanceof Long) { + return hashLong((Long) o); + } + if (o instanceof Integer) { + return hashLong((Integer) o); + } + if (o instanceof Double) { + return hashLong(Double.doubleToRawLongBits((Double) o)); + } + if (o instanceof Float) { + return hashLong(Float.floatToRawIntBits((Float) o)); + } + if (o instanceof String) { + return hash(((String) o).getBytes()); + } + if (o instanceof byte[]) { + return hash((byte[]) o); + } + return hash(o.toString()); + } + + public static int hash(byte[] data) { + return hash(data, data.length, -1); + } + + public static int hash(byte[] data, int seed) { + return hash(data, data.length, seed); + } + + public static int hash(byte[] data, int length, int seed) { + int m = 0x5bd1e995; + int r = 24; + + int h = seed ^ length; + + int len_4 = length >> 2; + + for (int i = 0; i < len_4; i++) { + int i_4 = i << 2; + int k = data[i_4 + 3]; + k = k << 8; + k = k | (data[i_4 + 2] & 0xff); + k = k << 8; + k = k | (data[i_4 + 1] & 0xff); + k = k << 8; + k = k | (data[i_4 + 0] & 0xff); + k *= m; + k ^= k >>> r; + k *= m; + h *= m; + h ^= k; + } + + // avoid calculating modulo + int len_m = len_4 << 2; + int left = length - len_m; + + if (left != 0) { + if (left >= 3) { + h ^= (int) data[length - 3] << 16; + } + if (left >= 2) { + h ^= (int) data[length - 2] << 8; + } + if (left >= 1) { + h ^= (int) data[length - 1]; + } + + h *= m; + } + + h ^= h >>> 13; + h *= m; + h ^= h >>> 15; + + return h; + } + + public static int hashLong(long data) { + int m = 0x5bd1e995; + int r = 24; + + int h = 0; + + int k = (int) data * m; + k ^= k >>> r; + h ^= k * m; + + k = (int) (data >> 32) * m; + k ^= k >>> r; + h *= m; + h ^= k * m; + + h ^= h >>> 13; + h *= m; + h ^= h >>> 15; + + return h; + } + + public static long hash64(Object o) { + if (o == null) { + return 0l; + } else if (o instanceof String) { + final byte[] bytes = ((String) o).getBytes(); + return hash64(bytes, bytes.length); + } else if (o instanceof byte[]) { + final byte[] bytes = (byte[]) o; + return hash64(bytes, bytes.length); + } + return hash64(o.toString()); + } + + // 64 bit implementation copied from here: https://github.com/tnm/murmurhash-java + + /** + * Generates 64 bit hash from byte array with default seed value. + * + * @param data byte array to hash + * @param length length of the array to hash + * @return 64 bit hash of the given string + */ + public static long hash64(final byte[] data, int length) { + return hash64(data, length, 0xe17a1465); + } + + + /** + * Generates 64 bit hash from byte array of the given length and seed. + * + * @param data byte array to hash + * @param length length of the array to hash + * @param seed initial seed value + * @return 64 bit hash of the given array + */ + public static long hash64(final byte[] data, int length, int seed) { + final long m = 0xc6a4a7935bd1e995L; + final int r = 47; + + long h = (seed & 0xffffffffl) ^ (length * m); + + int length8 = length / 8; + + for (int i = 0; i < length8; i++) { + final int i8 = i * 8; + long k = ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) + + (((long) data[i8 + 2] & 0xff) << 16) + (((long) data[i8 + 3] & 0xff) << 24) + + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) << 40) + + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56); + + k *= m; + k ^= k >>> r; + k *= m; + + h ^= k; + h *= m; + } + + switch (length % 8) { + case 7: + h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48; + case 6: + h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40; + case 5: + h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32; + case 4: + h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24; + case 3: + h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16; + case 2: + h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8; + case 1: + h ^= (long) (data[length & ~7] & 0xff); + h *= m; + } + ; + + h ^= h >>> r; + h *= m; + h ^= h >>> r; + + return h; + } +} \ No newline at end of file diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java index 3ae53d9e97..233d6ece7f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java @@ -43,7 +43,6 @@ public int getPartition(Tuple tuple) { for (int i = 0; i < partitionKeyIds.length; i++) { keyTuple.put(i, tuple.get(partitionKeyIds[i])); } - return (keyTuple.hashCode() & Integer.MAX_VALUE) % - (numPartitions == 32 ? numPartitions-1 : numPartitions); + return (keyTuple.hashCode() & Integer.MAX_VALUE) % numPartitions; } } diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java index f0d846c1c2..2241870d1d 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java @@ -18,15 +18,21 @@ package org.apache.tajo.engine.planner.physical; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.TextDatum; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestHashPartitioner { @@ -81,4 +87,31 @@ public final void testGetPartition() { int part2 = p.getPartition(tuple4); assertEquals(part2, p.getPartition(tuple5)); } + + @Test + public final void testGetPartition2() { + // https://issues.apache.org/jira/browse/TAJO-976 + Random rand = new Random(); + String[][] data = new String[1000][]; + + for (int i = 0; i < 1000; i++) { + data[i] = new String[]{ String.valueOf(rand.nextInt(1000)), String.valueOf(rand.nextInt(1000)), String.valueOf(rand.nextInt(1000))}; + } + + int[] testNumPartitions = new int[]{31, 62, 124, 32, 63, 125}; + for (int index = 0; index < testNumPartitions.length; index++) { + Partitioner p = new HashPartitioner(new int[]{0, 1, 2}, testNumPartitions[index]); + + Set ids = new TreeSet(); + for (int i = 0; i < data.length; i++) { + Tuple tuple = new VTuple( + new Datum[]{new TextDatum(data[i][0]), new TextDatum(data[i][1]), new TextDatum(data[i][2])}); + + ids.add(p.getPartition(tuple)); + } + + // The number of partitions isn't exactly matched. + assertTrue(ids.size() + 5 >= testNumPartitions[index]); + } + } } diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupByWithSameExprs1.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupByWithSameExprs1.sql index 17c88c5248..a04745e1e1 100644 --- a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupByWithSameExprs1.sql +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupByWithSameExprs1.sql @@ -3,4 +3,6 @@ select from lineitem group by + l_orderkey + l_partkey +order by l_orderkey + l_partkey; \ No newline at end of file diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupByWithSameExprs2.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupByWithSameExprs2.sql index a0a1c116be..074e25205a 100644 --- a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupByWithSameExprs2.sql +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupByWithSameExprs2.sql @@ -3,4 +3,6 @@ select from lineitem group by - l_orderkey + l_partkey; \ No newline at end of file + l_orderkey + l_partkey +order by + total1, total2; \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupBy2.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupBy2.result index 23efdb72d8..6afdd23d57 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupBy2.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupBy2.result @@ -1,4 +1,4 @@ unique_key ------------------------------- -3 -2 \ No newline at end of file +2 +3 \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupByWithSameConstantKeys1.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupByWithSameConstantKeys1.result index b08b1bc407..554b66418d 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupByWithSameConstantKeys1.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupByWithSameConstantKeys1.result @@ -1,5 +1,5 @@ a,b,c,d ------------------------------- -1,##,##,2 +2,##,##,2 3,##,##,1 -2,##,##,2 \ No newline at end of file +1,##,##,2 \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result index 627db72ddf..366b76e8c7 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result @@ -1,5 +1,5 @@ l_orderkey,total,num ------------------------------- 3,2.5,3 -1,1.0,3 -2,2.0,1 \ No newline at end of file +2,2.0,1 +1,1.0,3 \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithNamedTarget.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithNamedTarget.result index 627db72ddf..366b76e8c7 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithNamedTarget.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithNamedTarget.result @@ -1,5 +1,5 @@ l_orderkey,total,num ------------------------------- 3,2.5,3 -1,1.0,3 -2,2.0,1 \ No newline at end of file +2,2.0,1 +1,1.0,3 \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result index df3c7bc079..9ef26b46d7 100644 --- a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result +++ b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result @@ -1,5 +1,5 @@ col3 ------------------------------- 01 -10 -12 \ No newline at end of file +12 +10 \ No newline at end of file