diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java new file mode 100644 index 000000000000..83cf27a5e85d --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.partition; + +import com.google.common.base.Preconditions; +import com.google.common.hash.Hashing; + +import static java.nio.charset.StandardCharsets.UTF_8; + + +/** + * Implementation of {@link PartitionFunction} which partitions based on 32 bit murmur3 hash + */ +public class Murmur3PartitionFunction implements PartitionFunction { + private static final String NAME = "Murmur3"; + private final int _numPartitions; + + /** + * Constructor for the class. + * @param numPartitions Number of partitions. + */ + public Murmur3PartitionFunction(int numPartitions) { + Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0"); + _numPartitions = numPartitions; + } + + @Override + public int getPartition(Object value) { + return (Hashing.murmur3_32_fixed(9001).hashBytes(value.toString().getBytes(UTF_8)).asInt() & Integer.MAX_VALUE) + % _numPartitions; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public int getNumPartitions() { + return _numPartitions; + } + + // Keep it for backward-compatibility, use getName() instead + @Override + public String toString() { + return NAME; + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java index 77bfc4c08d0b..0cb836afd96d 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java @@ -28,7 +28,7 @@ public class PartitionFunctionFactory { // Enum for various partition functions to be added. public enum PartitionFunctionType { - Modulo, Murmur, ByteArray, HashCode, BoundedColumnValue; + Modulo, Murmur, Murmur3, ByteArray, HashCode, BoundedColumnValue; // Add more functions here. private static final Map VALUE_MAP = new HashMap<>(); @@ -77,6 +77,9 @@ public static PartitionFunction getPartitionFunction(String functionName, int nu case Murmur: return new MurmurPartitionFunction(numPartitions); + case Murmur3: + return new Murmur3PartitionFunction(numPartitions); + case ByteArray: return new ByteArrayPartitionFunction(numPartitions); diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java index 924cc87ffc93..b75a78c596f5 100644 --- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java @@ -115,6 +115,37 @@ public void testMurmurPartitioner() { } } + /** + * Unit test for {@link Murmur3PartitionFunction}. + * + */ + @Test + public void testMurmur3Partitioner() { + long seed = System.currentTimeMillis(); + Random random = new Random(seed); + + for (int i = 0; i < NUM_ROUNDS; i++) { + int numPartitions = random.nextInt(MAX_NUM_PARTITIONS) + 1; + + String functionName = "Murmur3"; + PartitionFunction partitionFunction = + PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, null); + + testBasicProperties(partitionFunction, functionName, numPartitions); + + for (int j = 0; j < NUM_ROUNDS; j++) { + int value = j == 0 ? Integer.MIN_VALUE : random.nextInt(); + int partition1 = partitionFunction.getPartition(value); + int partition2 = partitionFunction.getPartition(Integer.toString(value)); + assertEquals(partition1, partition2); + assertTrue(partition1 >= 0 && partition1 < numPartitions); + } + } + } + /** * Unit test for {@link MurmurPartitionFunction}. *