From d884f8a3dda9f9d81064280e61eb9e56d4542e87 Mon Sep 17 00:00:00 2001 From: Kevin Peek Date: Mon, 30 Oct 2017 11:22:20 -0400 Subject: [PATCH] STORM-2782 - refactor partial key grouping --- .../storm/grouping/PartialKeyGrouping.java | 199 +++++++++++++----- .../BalancedTargetSelectorTest.java | 64 ++++++ .../PartialKeyGroupingTest.java | 45 +++- .../RandomTwoTaskAssignmentCreatorTest.java | 62 ++++++ 4 files changed, 311 insertions(+), 59 deletions(-) create mode 100644 storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java rename storm-client/test/jvm/org/apache/storm/grouping/{ => partialKeyGrouping}/PartialKeyGroupingTest.java (69%) create mode 100644 storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java diff --git a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java index e1af16de158..70dfeaa62c8 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java @@ -17,40 +17,64 @@ */ package org.apache.storm.grouping; +import com.google.common.collect.Maps; + import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Random; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.task.WorkerTopologyContext; import org.apache.storm.tuple.Fields; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; +/** + * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming + * tuples (like a FieldGrouping), but it can send Tuples from a given partition to + * multiple downstream tasks. + * + * Given a total pool of target tasks, this grouping will always send Tuples with a given + * key to one member of a subset of those tasks. Each key is assigned a subset of tasks. + * Each tuple is then sent to one task from that subset. + * + * Notes: + * - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible + * - the default AssignmentCreator hashes the key and produces an assignment of two tasks + */ public class PartialKeyGrouping implements CustomStreamGrouping, Serializable { - private static final long serialVersionUID = -447379837314000353L; + private static final long serialVersionUID = -1672360572274911808L; private List targetTasks; - private long[] targetTaskStats; - private HashFunction h1 = Hashing.murmur3_128(13); - private HashFunction h2 = Hashing.murmur3_128(17); private Fields fields = null; private Fields outFields = null; + private AssignmentCreator assignmentCreator; + private TargetSelector targetSelector; + public PartialKeyGrouping() { - //Empty + this(null); } public PartialKeyGrouping(Fields fields) { + this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector()); + } + + public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) { + this(fields, assignmentCreator, new BalancedTargetSelector()); + } + + public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) { this.fields = fields; + this.assignmentCreator = assignmentCreator; + this.targetSelector = targetSelector; } @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) { this.targetTasks = targetTasks; - targetTaskStats = new long[this.targetTasks.size()]; if (this.fields != null) { this.outFields = context.getComponentOutputFields(stream); } @@ -60,47 +84,126 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List chooseTasks(int taskId, List values) { List boltIds = new ArrayList<>(1); if (values.size() > 0) { - byte[] raw; - if (fields != null) { - List selectedFields = outFields.select(fields, values); - ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4); - for (Object o: selectedFields) { - if (o instanceof List) { - out.putInt(Arrays.deepHashCode(((List)o).toArray())); - } else if (o instanceof Object[]) { - out.putInt(Arrays.deepHashCode((Object[])o)); - } else if (o instanceof byte[]) { - out.putInt(Arrays.hashCode((byte[]) o)); - } else if (o instanceof short[]) { - out.putInt(Arrays.hashCode((short[]) o)); - } else if (o instanceof int[]) { - out.putInt(Arrays.hashCode((int[]) o)); - } else if (o instanceof long[]) { - out.putInt(Arrays.hashCode((long[]) o)); - } else if (o instanceof char[]) { - out.putInt(Arrays.hashCode((char[]) o)); - } else if (o instanceof float[]) { - out.putInt(Arrays.hashCode((float[]) o)); - } else if (o instanceof double[]) { - out.putInt(Arrays.hashCode((double[]) o)); - } else if (o instanceof boolean[]) { - out.putInt(Arrays.hashCode((boolean[]) o)); - } else if (o != null) { - out.putInt(o.hashCode()); - } else { - out.putInt(0); - } + final byte[] rawKeyBytes = getKeyBytes(values); + + final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes); + final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey); + + boltIds.add(selectedTask); + } + return boltIds; + } + + + /** + * Extract the key from the input Tuple. + */ + private byte[] getKeyBytes(List values) { + byte[] raw; + if (fields != null) { + List selectedFields = outFields.select(fields, values); + ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4); + for (Object o: selectedFields) { + if (o instanceof List) { + out.putInt(Arrays.deepHashCode(((List)o).toArray())); + } else if (o instanceof Object[]) { + out.putInt(Arrays.deepHashCode((Object[])o)); + } else if (o instanceof byte[]) { + out.putInt(Arrays.hashCode((byte[]) o)); + } else if (o instanceof short[]) { + out.putInt(Arrays.hashCode((short[]) o)); + } else if (o instanceof int[]) { + out.putInt(Arrays.hashCode((int[]) o)); + } else if (o instanceof long[]) { + out.putInt(Arrays.hashCode((long[]) o)); + } else if (o instanceof char[]) { + out.putInt(Arrays.hashCode((char[]) o)); + } else if (o instanceof float[]) { + out.putInt(Arrays.hashCode((float[]) o)); + } else if (o instanceof double[]) { + out.putInt(Arrays.hashCode((double[]) o)); + } else if (o instanceof boolean[]) { + out.putInt(Arrays.hashCode((boolean[]) o)); + } else if (o != null) { + out.putInt(o.hashCode()); + } else { + out.putInt(0); } - raw = out.array(); - } else { - raw = values.get(0).toString().getBytes(); // assume key is the first field } - int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size()); - int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size()); - int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice; - boltIds.add(targetTasks.get(selected)); - targetTaskStats[selected]++; + raw = out.array(); + } else { + raw = values.get(0).toString().getBytes(); // assume key is the first field + } + return raw; + } + + /*================================================== + * Helper Classes + *==================================================*/ + + /** + * This interface is responsible for choosing a subset of the target tasks to use for a given key. + * + * NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple + * Storm Workers, thus each of them needs to come up with the same assignment for a given key. + */ + public interface AssignmentCreator extends Serializable { + int[] createAssignment(List targetTasks, byte[] key); + } + + /** + * This interface chooses one element from a task assignment to send a specific Tuple to. + */ + public interface TargetSelector extends Serializable { + Integer chooseTask(int[] assignedTasks); + } + + /*========== Implementations ==========*/ + + /** + * This implementation of AssignmentCreator chooses two arbitrary tasks. + */ + public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator { + /** + * Creates a two task assignment by selecting random tasks. + */ + public int[] createAssignment(List tasks, byte[] key) { + // It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key + final long seedForRandom = Arrays.hashCode(key); + final Random random = new Random(seedForRandom); + final int choice1 = random.nextInt(tasks.size()); + int choice2 = random.nextInt(tasks.size()); + // ensure that choice1 and choice2 are not the same task + choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2; + return new int[] {tasks.get(choice1), tasks.get(choice2)}; + } + } + + /** + * A basic implementation of target selection. This strategy chooses the task within the assignment that has + * received the fewest Tuples overall from this instance of the grouping. + */ + public static class BalancedTargetSelector implements TargetSelector { + private Map targetTaskStats = Maps.newHashMap(); + + /** + * Chooses one of the incoming tasks and selects the one that has been selected + * the fewest times so far. + */ + public Integer chooseTask(int[] assignedTasks) { + Integer taskIdWithMinLoad = null; + Long minTaskLoad = Long.MAX_VALUE; + + for (Integer currentTaskId : assignedTasks) { + final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L); + if (currentTaskLoad < minTaskLoad) { + minTaskLoad = currentTaskLoad; + taskIdWithMinLoad = currentTaskId; + } + } + + targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1); + return taskIdWithMinLoad; } - return boltIds; } -} +} \ No newline at end of file diff --git a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java new file mode 100644 index 00000000000..2b880d50acd --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/BalancedTargetSelectorTest.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.grouping.partialKeyGrouping; + +import org.apache.storm.grouping.PartialKeyGrouping; +import org.apache.storm.utils.Utils; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.util.Arrays; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class BalancedTargetSelectorTest { + + + private static final int[] TASK_LIST = {9, 8, 7, 6}; + + private final PartialKeyGrouping.TargetSelector targetSelector = new PartialKeyGrouping.BalancedTargetSelector(); + + @Test + public void classIsSerializable() throws Exception { + Utils.javaSerialize(targetSelector); + } + + @Test + public void selectorReturnsTasksInAssignment() { + // select tasks once more than the number of tasks available + for (int i = 0; i < TASK_LIST.length + 1; i++) { + int selectedTask = targetSelector.chooseTask(TASK_LIST); + assertThat(selectedTask, Matchers.in(Arrays.stream(TASK_LIST).boxed().collect(Collectors.toList()))); + } + } + + @Test + public void selectsTaskThatHasBeenUsedTheLeast() { + // ensure that the first three tasks have been selected before + targetSelector.chooseTask(new int[] {TASK_LIST[0]}); + targetSelector.chooseTask(new int[] {TASK_LIST[1]}); + targetSelector.chooseTask(new int[] {TASK_LIST[2]}); + + // now, selecting from the full set should cause the fourth task to be chosen. + int selectedTask = targetSelector.chooseTask(TASK_LIST); + assertThat(selectedTask, equalTo(TASK_LIST[3])); + } + +} diff --git a/storm-client/test/jvm/org/apache/storm/grouping/PartialKeyGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java similarity index 69% rename from storm-client/test/jvm/org/apache/storm/grouping/PartialKeyGroupingTest.java rename to storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java index 09450629a0b..405b0a492c5 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/PartialKeyGroupingTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/PartialKeyGroupingTest.java @@ -15,25 +15,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.grouping; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -import java.util.List; - -import org.junit.Test; +package org.apache.storm.grouping.partialKeyGrouping; +import com.google.common.collect.Lists; import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.PartialKeyGrouping; import org.apache.storm.task.WorkerTopologyContext; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.junit.Test; -import com.google.common.collect.Lists; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class PartialKeyGroupingTest { + + @Test + public void testGroupingIsSerializable() throws Exception { + PartialKeyGrouping grouping = new PartialKeyGrouping(new Fields("some_field")); + Utils.javaSerialize(grouping); + } + @Test public void testChooseTasks() { PartialKeyGrouping pkg = new PartialKeyGrouping(); @@ -48,6 +57,20 @@ public void testChooseTasks() { assertThat(choice3, is(choice1)); } + @Test + public void testChooseTasksWithoutConsecutiveTaskIds() { + PartialKeyGrouping pkg = new PartialKeyGrouping(); + pkg.prepare(null, null, Lists.newArrayList(9, 8, 7, 1, 2, 3)); + Values message = new Values("key1"); + List choice1 = pkg.chooseTasks(0, message); + assertThat(choice1.size(), is(1)); + List choice2 = pkg.chooseTasks(0, message); + assertThat(choice2, is(not(choice1))); + List choice3 = pkg.chooseTasks(0, message); + assertThat(choice3, is(not(choice2))); + assertThat(choice3, is(choice1)); + } + @Test public void testChooseTasksFields() { PartialKeyGrouping pkg = new PartialKeyGrouping(new Fields("test")); diff --git a/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java new file mode 100644 index 00000000000..332fed55256 --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/grouping/partialKeyGrouping/RandomTwoTaskAssignmentCreatorTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.grouping.partialKeyGrouping; + +import com.google.common.collect.Lists; +import org.apache.storm.grouping.PartialKeyGrouping; +import org.apache.storm.utils.Utils; +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +public class RandomTwoTaskAssignmentCreatorTest { + + private static final byte[] GROUPING_KEY_ONE = "some_key_one".getBytes(); + private static final byte[] GROUPING_KEY_TWO = "some_key_two".getBytes(); + + @Test + public void classIsSerializable() throws Exception { + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.RandomTwoTaskAssignmentCreator(); + Utils.javaSerialize(assignmentCreator); + } + + @Test + public void returnsAssignmentOfExpectedSize() { + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.RandomTwoTaskAssignmentCreator(); + int[] assignedTasks = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + assertThat(assignedTasks.length, equalTo(2)); + } + + @Test + public void returnsDifferentAssignmentForDifferentKeys() { + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.RandomTwoTaskAssignmentCreator(); + int[] assignmentOne = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + int[] assignmentTwo = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_TWO); + assertThat(assignmentOne, not(equalTo(assignmentTwo))); + } + + @Test + public void returnsSameAssignmentForSameKey() { + PartialKeyGrouping.AssignmentCreator assignmentCreator = new PartialKeyGrouping.RandomTwoTaskAssignmentCreator(); + int[] assignmentOne = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + int[] assignmentOneAgain = assignmentCreator.createAssignment(Lists.newArrayList(9, 8, 7, 6), GROUPING_KEY_ONE); + assertThat(assignmentOne, equalTo(assignmentOneAgain)); + } +}