From 5d2b30e1656942c3624c63fd5fefbd6b5729dac6 Mon Sep 17 00:00:00 2001 From: Kevin Peek Date: Thu, 31 Aug 2017 09:27:49 -0400 Subject: [PATCH 1/8] STORM-2782 - refactor partial key grouping to make it more flexible and reusable --- .../storm/grouping/PartialKeyGrouping.java | 192 +++++++++++++----- .../BalancedTargetSelectorTest.java | 47 +++++ .../HashingTwoTaskAssignmentCreatorTest.java | 47 +++++ .../PartialKeyGroupingTest.java | 35 ++-- 4 files changed, 255 insertions(+), 66 deletions(-) create mode 100644 storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java create mode 100644 storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java rename storm-client/test/jvm/org/apache/storm/grouping/{ => partialKeyGrouping}/PartialKeyGroupingTest.java (80%) diff --git a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java index e1af16de158..90618035a36 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java @@ -17,40 +17,64 @@ */ package org.apache.storm.grouping; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.Fields; + import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.List; +import java.util.Map; -import org.apache.storm.generated.GlobalStreamId; -import org.apache.storm.task.WorkerTopologyContext; -import org.apache.storm.tuple.Fields; - -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; - +/** + * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming + * tuples (like a FieldGrouping), but it can send Tuples from a given partition to multiple downstream tasks. + * + * Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of + * a subset of those tasks. Each key is assigned a subset of tasks. Each tuple is then sent to one task + * from that subset. + * + * Notes: + * - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible + * - the default AssignmentCreator hashes the key and produces an assignment of two tasks + */ public class PartialKeyGrouping implements CustomStreamGrouping, Serializable { - private static final long serialVersionUID = -447379837314000353L; + private static final long serialVersionUID = -1672360572274911808L; private List targetTasks; - private long[] targetTaskStats; - private HashFunction h1 = Hashing.murmur3_128(13); - private HashFunction h2 = Hashing.murmur3_128(17); private Fields fields = null; private Fields outFields = null; + private AssignmentCreator assignmentCreator; + private TargetSelector targetSelector; + public PartialKeyGrouping() { - //Empty + this(null); } public PartialKeyGrouping(Fields fields) { + this(fields, new HashingTwoTaskAssignmentCreator(), new BalancedTargetSelector()); + } + + public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) { + this(fields, assignmentCreator, new BalancedTargetSelector()); + } + + public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) { this.fields = fields; + this.assignmentCreator = assignmentCreator; + this.targetSelector = targetSelector; } @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) { this.targetTasks = targetTasks; - targetTaskStats = new long[this.targetTasks.size()]; if (this.fields != null) { this.outFields = context.getComponentOutputFields(stream); } @@ -60,47 +84,109 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List chooseTasks(int taskId, List values) { List boltIds = new ArrayList<>(1); if (values.size() > 0) { - byte[] raw; - if (fields != null) { - List selectedFields = outFields.select(fields, values); - ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4); - for (Object o: selectedFields) { - if (o instanceof List) { - out.putInt(Arrays.deepHashCode(((List)o).toArray())); - } else if (o instanceof Object[]) { - out.putInt(Arrays.deepHashCode((Object[])o)); - } else if (o instanceof byte[]) { - out.putInt(Arrays.hashCode((byte[]) o)); - } else if (o instanceof short[]) { - out.putInt(Arrays.hashCode((short[]) o)); - } else if (o instanceof int[]) { - out.putInt(Arrays.hashCode((int[]) o)); - } else if (o instanceof long[]) { - out.putInt(Arrays.hashCode((long[]) o)); - } else if (o instanceof char[]) { - out.putInt(Arrays.hashCode((char[]) o)); - } else if (o instanceof float[]) { - out.putInt(Arrays.hashCode((float[]) o)); - } else if (o instanceof double[]) { - out.putInt(Arrays.hashCode((double[]) o)); - } else if (o instanceof boolean[]) { - out.putInt(Arrays.hashCode((boolean[]) o)); - } else if (o != null) { - out.putInt(o.hashCode()); - } else { - out.putInt(0); - } + final byte[] rawKeyBytes = getKeyBytes(values); + + final List taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes); + final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey); + + boltIds.add(selectedTask); + } + return boltIds; + } + + + /** + * Extract the key from the input Tuple. + */ + private byte[] getKeyBytes(List values) { + byte[] raw; + if (fields != null) { + List selectedFields = outFields.select(fields, values); + ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4); + for (Object o: selectedFields) { + if (o instanceof List) { + out.putInt(Arrays.deepHashCode(((List)o).toArray())); + } else if (o instanceof Object[]) { + out.putInt(Arrays.deepHashCode((Object[])o)); + } else if (o instanceof byte[]) { + out.putInt(Arrays.hashCode((byte[]) o)); + } else if (o instanceof short[]) { + out.putInt(Arrays.hashCode((short[]) o)); + } else if (o instanceof int[]) { + out.putInt(Arrays.hashCode((int[]) o)); + } else if (o instanceof long[]) { + out.putInt(Arrays.hashCode((long[]) o)); + } else if (o instanceof char[]) { + out.putInt(Arrays.hashCode((char[]) o)); + } else if (o instanceof float[]) { + out.putInt(Arrays.hashCode((float[]) o)); + } else if (o instanceof double[]) { + out.putInt(Arrays.hashCode((double[]) o)); + } else if (o instanceof boolean[]) { + out.putInt(Arrays.hashCode((boolean[]) o)); + } else if (o != null) { + out.putInt(o.hashCode()); + } else { + out.putInt(0); } - raw = out.array(); - } else { - raw = values.get(0).toString().getBytes(); // assume key is the first field } - int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size()); - int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size()); - int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice; - boltIds.add(targetTasks.get(selected)); - targetTaskStats[selected]++; + raw = out.array(); + } else { + raw = values.get(0).toString().getBytes(); // assume key is the first field + } + return raw; + } + + /*================================================== + * Helper Classes + *==================================================*/ + + /** + * This interface is responsible for choosing a subset of the target tasks to use for a given key. + * + * NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple + * Storm Workers, thus each of them needs to come up with the same assignment for a given key. + */ + public interface AssignmentCreator extends Serializable { + List createAssignment(List targetTasks, byte[] key); + } + + /** + * This interface chooses one element from a task assignment to send a specific Tuple to. + */ + public interface TargetSelector extends Serializable { + Integer chooseTask(List assignedTasks); + } + + /*========== Implementations ==========*/ + + public static class HashingTwoTaskAssignmentCreator implements AssignmentCreator { + + private HashFunction h1 = Hashing.murmur3_128(13); + private HashFunction h2 = Hashing.murmur3_128(17); + + public List createAssignment(List tasks, byte[] key) { + int firstChoiceIndex = (int) (Math.abs(h1.hashBytes(key).asLong()) % tasks.size()); + int secondChoiceIndex = (int) (Math.abs(h2.hashBytes(key).asLong()) % tasks.size()); + return Lists.newArrayList(tasks.get(firstChoiceIndex), tasks.get(secondChoiceIndex)); + } + } + + + /** + * A basic implementation of target selection. This strategy chooses the task within the assignment that has + * received the fewest Tuples overall from this instance of the grouping. + */ + public static class BalancedTargetSelector implements TargetSelector { + private Map targetTaskStats = Maps.newConcurrentMap(); + + public Integer chooseTask(List assignedTasks) { + Integer selectedTask = assignedTasks.stream() + .min(Comparator.comparing(task -> targetTaskStats.getOrDefault(task, 0L))) + .orElse(null); + + targetTaskStats.put(selectedTask, targetTaskStats.getOrDefault(selectedTask, 0L) + 1); + return selectedTask; } - return boltIds; } -} +} \ No newline at end of file diff --git a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java new file mode 100644 index 00000000000..206681f60ec --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java @@ -0,0 +1,47 @@ +package org.apache.storm.grouping.partialKeyGrouping; + +import com.google.common.collect.Lists; +import org.apache.storm.grouping.PartialKeyGrouping; +import org.apache.storm.utils.Utils; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class BalancedTargetSelectorTest { + + + private static final List TASK_LIST = Lists.newArrayList(9, 8, 7, 6); + + private final PartialKeyGrouping.TargetSelector targetSelector = new PartialKeyGrouping.BalancedTargetSelector(); + + @Test + public void classIsSerializable() throws Exception { + Utils.javaSerialize(targetSelector); + } + + @Test + public void selectorReturnsTasksInAssignment() { + // select tasks once more than the number of tasks available + for (int i = 0; i < TASK_LIST.size() + 1; i++) { + int selectedTask = targetSelector.chooseTask(TASK_LIST); + assertThat(selectedTask, Matchers.isIn(TASK_LIST)); + } + } + + @Test + public void selectsTaskThatHasBeenUsedTheLeast() { + // ensure that the first three tasks have been selected before + targetSelector.chooseTask(Lists.newArrayList(TASK_LIST.get(0))); + targetSelector.chooseTask(Lists.newArrayList(TASK_LIST.get(1))); + targetSelector.chooseTask(Lists.newArrayList(TASK_LIST.get(2))); + + // now, selecting from the full set should cause the fourth task to be chosen. + int selectedTask = targetSelector.chooseTask(Lists.newArrayList(TASK_LIST)); + assertThat(selectedTask, equalTo(TASK_LIST.get(3))); + } + +} diff --git a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java new file mode 100644 index 00000000000..51dbf0471da --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java @@ -0,0 +1,47 @@ +package org.apache.storm.grouping.partialKeyGrouping; + +import com.google.common.collect.Lists; +import org.apache.storm.grouping.PartialKeyGrouping; +import org.apache.storm.utils.Utils; +import org.junit.Test; + +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +public class HashingTwoTaskAssignmentCreatorTest { + + private static final byte[] GROUPING_KEY_ONE = "some_key_one".getBytes(); + private static final byte[] GROUPING_KEY_TWO = "some_key_two".getBytes(); + + @Test + public void classIsSerializable() throws Exception { + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); + Utils.javaSerialize(assignmentCreator); + } + + @Test + public void returnsAssignmentOfExpectedSize() { + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); + List assignedTasks = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + assertThat(assignedTasks.size(), equalTo(2)); + } + + @Test + public void returnsDifferentAssignmentForDifferentKeys() { + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); + List assignmentOne = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + List assignmentTwo = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_TWO); + assertThat(assignmentOne, not(equalTo(assignmentTwo))); + } + + @Test + public void returnsSameAssignmentForSameKey() { + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); + List assignmentOne = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + List assignmentOneAgain = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + assertThat(assignmentOne, equalTo(assignmentOneAgain)); + } +} diff --git a/storm-client/test/jvm/org/apache/storm/grouping/PartialKeyGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java similarity index 80% rename from storm-client/test/jvm/org/apache/storm/grouping/PartialKeyGroupingTest.java rename to storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java index 09450629a0b..aaa628d1a23 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/PartialKeyGroupingTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java @@ -15,29 +15,38 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.grouping; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -import java.util.List; - -import org.junit.Test; +package org.apache.storm.grouping.partialKeyGrouping; +import com.google.common.collect.Lists; import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.PartialKeyGrouping; import org.apache.storm.task.WorkerTopologyContext; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.junit.Test; -import com.google.common.collect.Lists; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class PartialKeyGroupingTest { + + @Test + public void testGroupingIsSerializable() throws Exception { + PartialKeyGrouping grouping = new PartialKeyGrouping(new Fields("some_field")); + Utils.javaSerialize(grouping); + } + @Test public void testChooseTasks() { PartialKeyGrouping pkg = new PartialKeyGrouping(); - pkg.prepare(null, null, Lists.newArrayList(0, 1, 2, 3, 4, 5)); + pkg.prepare(null, null, Lists.newArrayList(9, 8, 7, 1, 2, 3)); Values message = new Values("key1"); List choice1 = pkg.chooseTasks(0, message); assertThat(choice1.size(), is(1)); @@ -53,7 +62,7 @@ public void testChooseTasksFields() { PartialKeyGrouping pkg = new PartialKeyGrouping(new Fields("test")); WorkerTopologyContext context = mock(WorkerTopologyContext.class); when(context.getComponentOutputFields(any(GlobalStreamId.class))).thenReturn(new Fields("test")); - pkg.prepare(context, mock(GlobalStreamId.class), Lists.newArrayList(0, 1, 2, 3, 4, 5)); + pkg.prepare(context, mock(GlobalStreamId.class), Lists.newArrayList(9, 8, 7, 1, 2, 3)); Values message = new Values("key1"); List choice1 = pkg.chooseTasks(0, message); assertThat(choice1.size(), is(1)); From bdd64fd68a14543a01823e5bd8c4a112d4ded8f8 Mon Sep 17 00:00:00 2001 From: Kevin Peek Date: Thu, 19 Oct 2017 12:37:05 -0400 Subject: [PATCH 2/8] STORM-2782 - add license to new test files --- .../BalancedTargetSelectorTest.java | 17 +++++++++++++++++ .../HashingTwoTaskAssignmentCreatorTest.java | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java index 206681f60ec..9f8d9ce6b6d 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.grouping.partialKeyGrouping; import com.google.common.collect.Lists; diff --git a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java index 51dbf0471da..50c8e07f4dd 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.grouping.partialKeyGrouping; import com.google.common.collect.Lists; From b1c526c1b14aa5c546ba342eee0fc8317d5b66c5 Mon Sep 17 00:00:00 2001 From: Kevin Peek Date: Fri, 20 Oct 2017 12:11:03 -0400 Subject: [PATCH 3/8] STORM-2782 - refactor BalancedTargetSelector for performance --- .../storm/grouping/PartialKeyGrouping.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java index 90618035a36..4deffa85b98 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java @@ -29,7 +29,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.List; import java.util.Map; @@ -181,12 +180,19 @@ public static class BalancedTargetSelector implements TargetSelector { private Map targetTaskStats = Maps.newConcurrentMap(); public Integer chooseTask(List assignedTasks) { - Integer selectedTask = assignedTasks.stream() - .min(Comparator.comparing(task -> targetTaskStats.getOrDefault(task, 0L))) - .orElse(null); + Integer taskIdWithMinLoad = null; + Long minTaskLoad = Long.MAX_VALUE; + + for (Integer currentTaskId : assignedTasks) { + final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L); + if (currentTaskLoad < minTaskLoad) { + minTaskLoad = currentTaskLoad; + taskIdWithMinLoad = currentTaskId; + } + } - targetTaskStats.put(selectedTask, targetTaskStats.getOrDefault(selectedTask, 0L) + 1); - return selectedTask; + targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1); + return taskIdWithMinLoad; } } } \ No newline at end of file From 32599c0e68a075cbd9399334cd2c6ff3db58c9a2 Mon Sep 17 00:00:00 2001 From: Kevin Peek Date: Fri, 27 Oct 2017 07:49:15 -0400 Subject: [PATCH 4/8] STORM-2782 - use int[] instead of List in partial key grouping --- .../storm/grouping/PartialKeyGrouping.java | 13 ++++++------ .../BalancedTargetSelectorTest.java | 20 +++++++++---------- .../HashingTwoTaskAssignmentCreatorTest.java | 14 ++++++------- .../PartialKeyGroupingTest.java | 16 ++++++++++++++- 4 files changed, 37 insertions(+), 26 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java index 4deffa85b98..1ca08489c46 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java @@ -17,7 +17,6 @@ */ package org.apache.storm.grouping; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; @@ -85,7 +84,7 @@ public List chooseTasks(int taskId, List values) { if (values.size() > 0) { final byte[] rawKeyBytes = getKeyBytes(values); - final List taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes); + final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes); final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey); boltIds.add(selectedTask); @@ -147,14 +146,14 @@ private byte[] getKeyBytes(List values) { * Storm Workers, thus each of them needs to come up with the same assignment for a given key. */ public interface AssignmentCreator extends Serializable { - List createAssignment(List targetTasks, byte[] key); + int[] createAssignment(List targetTasks, byte[] key); } /** * This interface chooses one element from a task assignment to send a specific Tuple to. */ public interface TargetSelector extends Serializable { - Integer chooseTask(List assignedTasks); + Integer chooseTask(int[] assignedTasks); } /*========== Implementations ==========*/ @@ -164,10 +163,10 @@ public static class HashingTwoTaskAssignmentCreator implements AssignmentCreator private HashFunction h1 = Hashing.murmur3_128(13); private HashFunction h2 = Hashing.murmur3_128(17); - public List createAssignment(List tasks, byte[] key) { + public int[] createAssignment(List tasks, byte[] key) { int firstChoiceIndex = (int) (Math.abs(h1.hashBytes(key).asLong()) % tasks.size()); int secondChoiceIndex = (int) (Math.abs(h2.hashBytes(key).asLong()) % tasks.size()); - return Lists.newArrayList(tasks.get(firstChoiceIndex), tasks.get(secondChoiceIndex)); + return new int[] {tasks.get(firstChoiceIndex), tasks.get(secondChoiceIndex)}; } } @@ -179,7 +178,7 @@ public List createAssignment(List tasks, byte[] key) { public static class BalancedTargetSelector implements TargetSelector { private Map targetTaskStats = Maps.newConcurrentMap(); - public Integer chooseTask(List assignedTasks) { + public Integer chooseTask(int[] assignedTasks) { Integer taskIdWithMinLoad = null; Long minTaskLoad = Long.MAX_VALUE; diff --git a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java index 9f8d9ce6b6d..2b880d50acd 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java @@ -17,13 +17,13 @@ */ package org.apache.storm.grouping.partialKeyGrouping; -import com.google.common.collect.Lists; import org.apache.storm.grouping.PartialKeyGrouping; import org.apache.storm.utils.Utils; import org.hamcrest.Matchers; import org.junit.Test; -import java.util.List; +import java.util.Arrays; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; @@ -31,7 +31,7 @@ public class BalancedTargetSelectorTest { - private static final List TASK_LIST = Lists.newArrayList(9, 8, 7, 6); + private static final int[] TASK_LIST = {9, 8, 7, 6}; private final PartialKeyGrouping.TargetSelector targetSelector = new PartialKeyGrouping.BalancedTargetSelector(); @@ -43,22 +43,22 @@ public void classIsSerializable() throws Exception { @Test public void selectorReturnsTasksInAssignment() { // select tasks once more than the number of tasks available - for (int i = 0; i < TASK_LIST.size() + 1; i++) { + for (int i = 0; i < TASK_LIST.length + 1; i++) { int selectedTask = targetSelector.chooseTask(TASK_LIST); - assertThat(selectedTask, Matchers.isIn(TASK_LIST)); + assertThat(selectedTask, Matchers.in(Arrays.stream(TASK_LIST).boxed().collect(Collectors.toList()))); } } @Test public void selectsTaskThatHasBeenUsedTheLeast() { // ensure that the first three tasks have been selected before - targetSelector.chooseTask(Lists.newArrayList(TASK_LIST.get(0))); - targetSelector.chooseTask(Lists.newArrayList(TASK_LIST.get(1))); - targetSelector.chooseTask(Lists.newArrayList(TASK_LIST.get(2))); + targetSelector.chooseTask(new int[] {TASK_LIST[0]}); + targetSelector.chooseTask(new int[] {TASK_LIST[1]}); + targetSelector.chooseTask(new int[] {TASK_LIST[2]}); // now, selecting from the full set should cause the fourth task to be chosen. - int selectedTask = targetSelector.chooseTask(Lists.newArrayList(TASK_LIST)); - assertThat(selectedTask, equalTo(TASK_LIST.get(3))); + int selectedTask = targetSelector.chooseTask(TASK_LIST); + assertThat(selectedTask, equalTo(TASK_LIST[3])); } } diff --git a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java index 50c8e07f4dd..712fbdbed27 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java @@ -22,8 +22,6 @@ import org.apache.storm.utils.Utils; import org.junit.Test; -import java.util.List; - import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; @@ -42,23 +40,23 @@ public void classIsSerializable() throws Exception { @Test public void returnsAssignmentOfExpectedSize() { PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); - List assignedTasks = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); - assertThat(assignedTasks.size(), equalTo(2)); + int[] assignedTasks = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + assertThat(assignedTasks.length, equalTo(2)); } @Test public void returnsDifferentAssignmentForDifferentKeys() { PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); - List assignmentOne = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); - List assignmentTwo = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_TWO); + int[] assignmentOne = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + int[] assignmentTwo = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_TWO); assertThat(assignmentOne, not(equalTo(assignmentTwo))); } @Test public void returnsSameAssignmentForSameKey() { PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); - List assignmentOne = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); - List assignmentOneAgain = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + int[] assignmentOne = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + int[] assignmentOneAgain = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); assertThat(assignmentOne, equalTo(assignmentOneAgain)); } } diff --git a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java index aaa628d1a23..405b0a492c5 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java @@ -45,6 +45,20 @@ public void testGroupingIsSerializable() throws Exception { @Test public void testChooseTasks() { + PartialKeyGrouping pkg = new PartialKeyGrouping(); + pkg.prepare(null, null, Lists.newArrayList(0, 1, 2, 3, 4, 5)); + Values message = new Values("key1"); + List choice1 = pkg.chooseTasks(0, message); + assertThat(choice1.size(), is(1)); + List choice2 = pkg.chooseTasks(0, message); + assertThat(choice2, is(not(choice1))); + List choice3 = pkg.chooseTasks(0, message); + assertThat(choice3, is(not(choice2))); + assertThat(choice3, is(choice1)); + } + + @Test + public void testChooseTasksWithoutConsecutiveTaskIds() { PartialKeyGrouping pkg = new PartialKeyGrouping(); pkg.prepare(null, null, Lists.newArrayList(9, 8, 7, 1, 2, 3)); Values message = new Values("key1"); @@ -62,7 +76,7 @@ public void testChooseTasksFields() { PartialKeyGrouping pkg = new PartialKeyGrouping(new Fields("test")); WorkerTopologyContext context = mock(WorkerTopologyContext.class); when(context.getComponentOutputFields(any(GlobalStreamId.class))).thenReturn(new Fields("test")); - pkg.prepare(context, mock(GlobalStreamId.class), Lists.newArrayList(9, 8, 7, 1, 2, 3)); + pkg.prepare(context, mock(GlobalStreamId.class), Lists.newArrayList(0, 1, 2, 3, 4, 5)); Values message = new Values("key1"); List choice1 = pkg.chooseTasks(0, message); assertThat(choice1.size(), is(1)); From d5ed22cc2742ff5eb3f79749006ea9647e07541d Mon Sep 17 00:00:00 2001 From: Kevin Peek Date: Fri, 27 Oct 2017 08:56:40 -0400 Subject: [PATCH 5/8] STORM-2782 - avoid ConcurrentMap when we do not need it --- .../src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java index 1ca08489c46..92e8d9dc2ad 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java @@ -176,7 +176,7 @@ public int[] createAssignment(List tasks, byte[] key) { * received the fewest Tuples overall from this instance of the grouping. */ public static class BalancedTargetSelector implements TargetSelector { - private Map targetTaskStats = Maps.newConcurrentMap(); + private Map targetTaskStats = Maps.newHashMap(); public Integer chooseTask(int[] assignedTasks) { Integer taskIdWithMinLoad = null; From 6af9ca660d83d7798ddba19754a2211b637f6e02 Mon Sep 17 00:00:00 2001 From: Kevin Peek Date: Fri, 27 Oct 2017 10:23:07 -0400 Subject: [PATCH 6/8] STORM-2782 - fix some style issues --- .../storm/grouping/PartialKeyGrouping.java | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java index 92e8d9dc2ad..7dbec7313a6 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java @@ -20,24 +20,25 @@ import com.google.common.collect.Maps; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; -import org.apache.storm.generated.GlobalStreamId; -import org.apache.storm.task.WorkerTopologyContext; -import org.apache.storm.tuple.Fields; - import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.Fields; + /** * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming - * tuples (like a FieldGrouping), but it can send Tuples from a given partition to multiple downstream tasks. + * tuples (like a FieldGrouping), but it can send Tuples from a given partition to + * multiple downstream tasks. * - * Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of - * a subset of those tasks. Each key is assigned a subset of tasks. Each tuple is then sent to one task - * from that subset. + * Given a total pool of target tasks, this grouping will always send Tuples with a given + * key to one member of a subset of those tasks. Each key is assigned a subset of tasks. + * Each tuple is then sent to one task from that subset. * * Notes: * - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible @@ -63,7 +64,7 @@ public PartialKeyGrouping(Fields fields) { public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) { this(fields, assignmentCreator, new BalancedTargetSelector()); } - + public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) { this.fields = fields; this.assignmentCreator = assignmentCreator; @@ -158,11 +159,18 @@ public interface TargetSelector extends Serializable { /*========== Implementations ==========*/ + /** + * This implementation of AssignmentCreator chooses two target tasks by hashing the input's key. + * This matches the original behavior of this grouping and is thus the default. + */ public static class HashingTwoTaskAssignmentCreator implements AssignmentCreator { private HashFunction h1 = Hashing.murmur3_128(13); private HashFunction h2 = Hashing.murmur3_128(17); + /** + * Creates a two task assignment by hashing the incoming key. + */ public int[] createAssignment(List tasks, byte[] key) { int firstChoiceIndex = (int) (Math.abs(h1.hashBytes(key).asLong()) % tasks.size()); int secondChoiceIndex = (int) (Math.abs(h2.hashBytes(key).asLong()) % tasks.size()); @@ -178,6 +186,10 @@ public int[] createAssignment(List tasks, byte[] key) { public static class BalancedTargetSelector implements TargetSelector { private Map targetTaskStats = Maps.newHashMap(); + /** + * Chooses one of the incoming tasks and selects the one that has been selected + * the fewest times so far. + */ public Integer chooseTask(int[] assignedTasks) { Integer taskIdWithMinLoad = null; Long minTaskLoad = Long.MAX_VALUE; From a3a6640d2b599fc372b60b7af45fa8783f4239f4 Mon Sep 17 00:00:00 2001 From: Kevin Peek Date: Mon, 30 Oct 2017 10:30:11 -0400 Subject: [PATCH 7/8] STORM-2782 - instead of hash functions, use Random to select tasks for assignment --- .../storm/grouping/PartialKeyGrouping.java | 27 ++++++++++--------- ...> RandomTwoTaskAssignmentCreatorTest.java} | 10 +++---- 2 files changed, 19 insertions(+), 18 deletions(-) rename storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/{HashingTwoTaskAssignmentCreatorTest.java => RandomTwoTaskAssignmentCreatorTest.java} (90%) diff --git a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java index 7dbec7313a6..64ac8020e68 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java @@ -18,14 +18,15 @@ package org.apache.storm.grouping; import com.google.common.collect.Maps; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; + import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Random; + import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.task.WorkerTopologyContext; import org.apache.storm.tuple.Fields; @@ -58,7 +59,7 @@ public PartialKeyGrouping() { } public PartialKeyGrouping(Fields fields) { - this(fields, new HashingTwoTaskAssignmentCreator(), new BalancedTargetSelector()); + this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector()); } public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) { @@ -163,22 +164,22 @@ public interface TargetSelector extends Serializable { * This implementation of AssignmentCreator chooses two target tasks by hashing the input's key. * This matches the original behavior of this grouping and is thus the default. */ - public static class HashingTwoTaskAssignmentCreator implements AssignmentCreator { - - private HashFunction h1 = Hashing.murmur3_128(13); - private HashFunction h2 = Hashing.murmur3_128(17); - + public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator { /** - * Creates a two task assignment by hashing the incoming key. + * Creates a two task assignment by selecting random tasks. */ public int[] createAssignment(List tasks, byte[] key) { - int firstChoiceIndex = (int) (Math.abs(h1.hashBytes(key).asLong()) % tasks.size()); - int secondChoiceIndex = (int) (Math.abs(h2.hashBytes(key).asLong()) % tasks.size()); - return new int[] {tasks.get(firstChoiceIndex), tasks.get(secondChoiceIndex)}; + // It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key + final long seedForRandom = Arrays.hashCode(key); + final Random random = new Random(seedForRandom); + final int choice1 = random.nextInt(tasks.size()); + int choice2 = random.nextInt(tasks.size()); + // ensure that choice1 and choice2 are not the same task + choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2; + return new int[] {tasks.get(choice1), tasks.get(choice2)}; } } - /** * A basic implementation of target selection. This strategy chooses the task within the assignment that has * received the fewest Tuples overall from this instance of the grouping. diff --git a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java similarity index 90% rename from storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java rename to storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java index 712fbdbed27..332fed55256 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/HashingTwoTaskAssignmentCreatorTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java @@ -26,27 +26,27 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; -public class HashingTwoTaskAssignmentCreatorTest { +public class RandomTwoTaskAssignmentCreatorTest { private static final byte[] GROUPING_KEY_ONE = "some_key_one".getBytes(); private static final byte[] GROUPING_KEY_TWO = "some_key_two".getBytes(); @Test public void classIsSerializable() throws Exception { - PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.RandomTwoTaskAssignmentCreator(); Utils.javaSerialize(assignmentCreator); } @Test public void returnsAssignmentOfExpectedSize() { - PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.RandomTwoTaskAssignmentCreator(); int[] assignedTasks = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); assertThat(assignedTasks.length, equalTo(2)); } @Test public void returnsDifferentAssignmentForDifferentKeys() { - PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.RandomTwoTaskAssignmentCreator(); int[] assignmentOne = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); int[] assignmentTwo = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_TWO); assertThat(assignmentOne, not(equalTo(assignmentTwo))); @@ -54,7 +54,7 @@ public void returnsDifferentAssignmentForDifferentKeys() { @Test public void returnsSameAssignmentForSameKey() { - PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.HashingTwoTaskAssignmentCreator(); + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.RandomTwoTaskAssignmentCreator(); int[] assignmentOne = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); int[] assignmentOneAgain = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); assertThat(assignmentOne, equalTo(assignmentOneAgain)); From 459b716385d03d6115ffbc874af4510c4df5e9ab Mon Sep 17 00:00:00 2001 From: Kevin Peek Date: Mon, 30 Oct 2017 10:32:43 -0400 Subject: [PATCH 8/8] STORM-2782 - update javadoc --- .../src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java index 64ac8020e68..70dfeaa62c8 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java @@ -161,8 +161,7 @@ public interface TargetSelector extends Serializable { /*========== Implementations ==========*/ /** - * This implementation of AssignmentCreator chooses two target tasks by hashing the input's key. - * This matches the original behavior of this grouping and is thus the default. + * This implementation of AssignmentCreator chooses two arbitrary tasks. */ public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator { /**