From bf5373d879616e44dbde38d2720e80e52c76318a Mon Sep 17 00:00:00 2001 From: Raghav Kumar Gautam Date: Thu, 19 Jan 2017 16:41:00 -0800 Subject: [PATCH] STORM-2305: STORM-2279 calculates task index different from grouper code --- .../src/jvm/org/apache/storm/daemon/GrouperFactory.java | 3 ++- .../src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java | 3 +-- storm-core/src/jvm/org/apache/storm/utils/TupleUtils.java | 7 +++++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java index d06682fa31b..e64ec1db953 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java @@ -156,9 +156,10 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List chooseTasks(int taskId, List values) { - int targetTaskIndex = Math.abs(TupleUtils.listHashCode(outFields.select(groupFields, values))) % numTasks; + int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks); return Collections.singletonList(targetTasks.get(targetTaskIndex)); } + } public static class GlobalGrouper implements CustomStreamGrouping { diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java index 231d48acd26..a76a912858b 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java @@ -3600,8 +3600,7 @@ public ComponentPageInfo getComponentPageInfo(String topoId, String componentId, List tasks = compToTasks.get(StormCommon.EVENTLOGGER_COMPONENT_ID); tasks.sort(null); // Find the task the events from this component route to. - int taskIndex = (TupleUtils.listHashCode(Arrays.asList(componentId)) % tasks.size() + tasks.size()) % - tasks.size(); + int taskIndex = TupleUtils.chooseTaskIndex(Collections.singletonList(componentId), tasks.size()); int taskId = tasks.get(taskIndex); String host = null; Integer port = null; diff --git a/storm-core/src/jvm/org/apache/storm/utils/TupleUtils.java b/storm-core/src/jvm/org/apache/storm/utils/TupleUtils.java index fff88d484b3..6c5c6b34432 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/TupleUtils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/TupleUtils.java @@ -40,7 +40,11 @@ public static boolean isTick(Tuple tuple) { && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); } - public static int listHashCode(List alist) { + public static int chooseTaskIndex(List keys, int numTasks) { + return Math.abs(listHashCode(keys)) % numTasks; + } + + private static int listHashCode(List alist) { if (alist == null) { return 1; } else { @@ -60,5 +64,4 @@ public static Map putTickFrequencyIntoComponentConfig(Map