From f7b1f1f21751ad6f3508c0846f0eca0c55c03c47 Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Wed, 7 Feb 2018 13:40:45 -0600 Subject: [PATCH 1/4] [STORM-2940] dynamically set the CAPACITY value of LoadAwareShuffleGrouping --- .../grouping/LoadAwareShuffleGrouping.java | 21 +++++++++++++------ .../LoadAwareShuffleGroupingTest.java | 8 +++---- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java index 3fd75e53b4f..730f46d33bc 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable { - static final int CAPACITY = 1000; + private int CAPACITY; private static final int MAX_WEIGHT = 100; private static class IndexAndWeights { final int index; @@ -85,6 +85,7 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List(); @@ -163,7 +164,7 @@ private Scope transition(LoadMapping load) { if (targetInScope.isEmpty()) { Scope upScope = Scope.upgrade(currentScope); if (upScope == currentScope) { - throw new RuntimeException("This executor has no target tasks."); + throw new RuntimeException("The current scope " + currentScope + " has no target tasks."); } currentScope = upScope; return transition(load); @@ -229,11 +230,14 @@ private synchronized void updateRing(LoadMapping load) { } } - //in case we didn't fill in enough - for (; currentIdx < CAPACITY; currentIdx++) { - prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)]; + if (currentIdx > 0) { + //in case we didn't fill in enough + for (; currentIdx < CAPACITY; currentIdx++) { + prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)]; + } } - } else { + } + if (currentIdx == 0) { //This really should be impossible, because we go off of the min load, and inc anything within 5% of it. // But just to be sure it is never an issue, especially with float rounding etc. for (;currentIdx < CAPACITY; currentIdx++) { @@ -322,4 +326,9 @@ public static Scope upgrade(Scope current) { } } } + + //only for test + public int getCAPACITY() { + return CAPACITY; + } } \ No newline at end of file diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java index d704900b195..ed500e15be6 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java @@ -101,10 +101,10 @@ public void testUnevenLoadOverTime() throws Exception { double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight); double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight); assertEquals("i = " + i, - expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY, + expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCAPACITY(), 0.01); assertEquals("i = " + i, - expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY, + expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCAPACITY(), 0.01); } @@ -121,9 +121,9 @@ public void testUnevenLoadOverTime() throws Exception { LOG.info("contByType = {}", countByType); double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight); double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight); - assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY, + assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCAPACITY(), 0.01); - assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY, + assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCAPACITY(), 0.01); } } From 292e03a7dd8e13b2e245fa347966520a91d1ef53 Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Wed, 7 Feb 2018 15:08:37 -0600 Subject: [PATCH 2/4] address review comments --- .../grouping/LoadAwareShuffleGrouping.java | 24 +++++++++---------- .../LoadAwareShuffleGroupingTest.java | 8 +++---- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java index 730f46d33bc..26e8e9e8db4 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable { - private int CAPACITY; + private int capacity; private static final int MAX_WEIGHT = 100; private static class IndexAndWeights { final int index; @@ -85,7 +85,7 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List(); @@ -102,11 +102,11 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List chooseTasks(int taskId, List values) { int rightNow; while (true) { rightNow = current.incrementAndGet(); - if (rightNow < CAPACITY) { + if (rightNow < capacity) { return rets[choices[rightNow]]; - } else if (rightNow == CAPACITY) { + } else if (rightNow == capacity) { current.set(0); return rets[choices[0]]; } @@ -223,8 +223,8 @@ private synchronized void updateRing(LoadMapping load) { if (weightSum > 0) { for (int target: targetsInScope) { IndexAndWeights indexAndWeights = orig.get(target); - int count = (int) ((indexAndWeights.weight / (double) weightSum) * CAPACITY); - for (int i = 0; i < count && currentIdx < CAPACITY; i++) { + int count = (int) ((indexAndWeights.weight / (double) weightSum) * capacity); + for (int i = 0; i < count && currentIdx < capacity; i++) { prepareChoices[currentIdx] = indexAndWeights.index; currentIdx++; } @@ -232,7 +232,7 @@ private synchronized void updateRing(LoadMapping load) { if (currentIdx > 0) { //in case we didn't fill in enough - for (; currentIdx < CAPACITY; currentIdx++) { + for (; currentIdx < capacity; currentIdx++) { prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)]; } } @@ -240,7 +240,7 @@ private synchronized void updateRing(LoadMapping load) { if (currentIdx == 0) { //This really should be impossible, because we go off of the min load, and inc anything within 5% of it. // But just to be sure it is never an issue, especially with float rounding etc. - for (;currentIdx < CAPACITY; currentIdx++) { + for (;currentIdx < capacity; currentIdx++) { prepareChoices[currentIdx] = currentIdx % rets.length; } } @@ -328,7 +328,7 @@ public static Scope upgrade(Scope current) { } //only for test - public int getCAPACITY() { - return CAPACITY; + public int getCapacity() { + return capacity; } } \ No newline at end of file diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java index ed500e15be6..cd9b48cc691 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java @@ -101,10 +101,10 @@ public void testUnevenLoadOverTime() throws Exception { double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight); double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight); assertEquals("i = " + i, - expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCAPACITY(), + expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCapacity(), 0.01); assertEquals("i = " + i, - expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCAPACITY(), + expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCapacity(), 0.01); } @@ -121,9 +121,9 @@ public void testUnevenLoadOverTime() throws Exception { LOG.info("contByType = {}", countByType); double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight); double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight); - assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCAPACITY(), + assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCapacity(), 0.01); - assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCAPACITY(), + assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCapacity(), 0.01); } } From 62d9d357c5d69dfaca4dbd3c9b84f83e220fa9af Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Thu, 8 Feb 2018 10:27:50 -0600 Subject: [PATCH 3/4] lose granularity if targetTask size is larger than 1000 --- .../jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java index 26e8e9e8db4..832ba7f2899 100644 --- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java +++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java @@ -85,7 +85,7 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List(); From 6ddf6f8dd0fb927f6b4520ebb3bd71978f96afd3 Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Fri, 9 Feb 2018 08:50:57 -0600 Subject: [PATCH 4/4] add tests --- .../LoadAwareShuffleGroupingTest.java | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java index cd9b48cc691..12ab32aa67a 100644 --- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java +++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java @@ -138,9 +138,16 @@ private Map count(int[] choices, List[] rets) { } @Test - public void testLoadAwareShuffleGroupingWithEvenLoad() { - // just pick arbitrary number - final int numTasks = 7; + public void testLoadAwareShuffleGroupingWithEvenLoadWithManyTargets() { + testLoadAwareShuffleGroupingWithEvenLoad(1000); + } + + @Test + public void testLoadAwareShuffleGroupingWithEvenLoadWithLessTargets() { + testLoadAwareShuffleGroupingWithEvenLoad(7); + } + + private void testLoadAwareShuffleGroupingWithEvenLoad(int numTasks) { final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping(); // Define our taskIds and loads @@ -166,8 +173,16 @@ public void testLoadAwareShuffleGroupingWithEvenLoad() { } @Test - public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded() throws InterruptedException, ExecutionException { - final int numTasks = 7; + public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreadedWithManyTargets() throws ExecutionException, InterruptedException { + testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(1000); + } + + @Test + public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreadedWithLessTargets() throws ExecutionException, InterruptedException { + testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(7); + } + + private void testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(int numTasks) throws InterruptedException, ExecutionException { final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();