Permalink
Browse files

Merge pull request #99 from aledsage/MinorFixes

Fix ResizingPolicy, and thus the test
  • Loading branch information...
2 parents e1d14f6 + f65d489 commit 67bb1f6181ef48b336fb5deaa2178591190bdbc2 @ahgittin ahgittin committed May 16, 2012
@@ -51,10 +51,6 @@ public synchronized T getLatestValue() {
}
public synchronized List<TimestampedValue<T>> getValuesInWindow(long now, long subTimePeriod) {
- if (values.size() <= minVals) {
- return ImmutableList.copyOf(values.subList(values.size()-minVals, values.size()));
- }
-
List<TimestampedValue<T>> result = new LinkedList<TimestampedValue<T>>();
TimestampedValue<T> mostRecentExpired = null;
for (TimestampedValue<T> val : values) {
@@ -68,7 +64,13 @@ public synchronized T getLatestValue() {
if (minExpiredVals > 0 && mostRecentExpired != null) {
result.add(0, mostRecentExpired);
}
- return result;
+
+ if (result.size() < minVals) {
+ int minIndex = Math.max(0, values.size()-minVals);
+ return ImmutableList.copyOf(values.subList(minIndex, values.size()));
+ } else {
+ return result;
+ }
}
public void add(T val) {
@@ -11,6 +11,8 @@ class TimeWindowedListTest {
@Test
public void testKeepsMinVals() {
TimeWindowedList list = new TimeWindowedList<Object>([timePeriod:1L, minVals:2]);
+ assertEquals(list.getValues(2L), timestampedValues());
+
list.add("a", 0L);
assertEquals(list.getValues(2L), timestampedValues("a", 0L));
@@ -65,6 +67,27 @@ class TimeWindowedListTest {
}
@Test
+ public void testGetsWindowWithMinWhenEmpty() {
+ TimeWindowedList list = new TimeWindowedList<Object>([timePeriod:1L, minVals:1]);
+ assertEquals(list.getValuesInWindow(1000L, 10L), timestampedValues());
+ }
+
+ @Test
+ public void testGetsWindowWithMinExpiredWhenEmpty() {
+ TimeWindowedList list = new TimeWindowedList<Object>([timePeriod:1L, minExpiredVals:1]);
+ assertEquals(list.getValuesInWindow(1000L, 10L), timestampedValues());
+ }
+
+ @Test
+ public void testGetsWindowWithMinValsWhenExpired() {
+ TimeWindowedList list = new TimeWindowedList<Object>([timePeriod:1L, minVals:1]);
+ list.add("a", 0L);
+ list.add("b", 1L);
+
+ assertEquals(list.getValuesInWindow(1000L, 10L), timestampedValues("b", 1L));
+ }
+
+ @Test
public void testZeroSizeWindowWithOneExpiredContainsOnlyMostRecentValue() {
TimeWindowedList list = new TimeWindowedList<Object>([timePeriod:0L, minExpiredVals:1]);
@@ -77,6 +100,10 @@ class TimeWindowedListTest {
assertEquals(list.getValuesInWindow(102L, 1L), timestampedValues("b", 100L));
}
+ private <T> List<TimestampedValue<T>> timestampedValues() {
+ return [];
+ }
+
private <T> List<TimestampedValue<T>> timestampedValues(T v1, long t1) {
return [new TimestampedValue(v1, t1)];
}
@@ -231,8 +231,7 @@ public class ResizingPolicy extends AbstractPolicy {
executorQueued.set(false)
long currentPoolSize = currentSizeOperator.call(poolEntity)
- int desiredPoolSize = calculateDesiredPoolSize(currentPoolSize)
- boolean stable = isDesiredPoolSizeStable()
+ def (int desiredPoolSize, boolean stable) = calculateDesiredPoolSize(currentPoolSize)
if (!stable) {
// the desired size fluctuations are not stable; ensure we check again later (due to time-window)
// even if no additional events have been received
@@ -269,20 +268,15 @@ public class ResizingPolicy extends AbstractPolicy {
}
}
- private boolean isDesiredPoolSizeStable() {
- long now = System.currentTimeMillis()
- List<TimestampedValue<?>> downWindowVals = recentDesiredResizes.getValuesInWindow(now, resizeDownStabilizationDelay)
- List<TimestampedValue<?>> upWindowVals = recentDesiredResizes.getValuesInWindow(now, resizeUpStabilizationDelay)
- return (minInWindow(downWindowVals, resizeDownStabilizationDelay) == maxInWindow(downWindowVals, resizeDownStabilizationDelay)) &&
- (minInWindow(upWindowVals, resizeUpStabilizationDelay) == maxInWindow(upWindowVals, resizeUpStabilizationDelay))
- }
-
/**
* Complicated logic for stabilization-delay...
* Only grow if we have consistently been asked to grow for the resizeUpStabilizationDelay period;
* Only shrink if we have consistently been asked to shrink for the resizeDownStabilizationDelay period.
+ *
+ * @return tuple of desired pool size, and whether this is "stable" (i.e. if we receive no more events
+ * will this continue to be the desired pool size)
*/
- private int calculateDesiredPoolSize(long currentPoolSize) {
+ private List<?> calculateDesiredPoolSize(long currentPoolSize) {
long now = System.currentTimeMillis()
List<TimestampedValue<?>> downsizeWindowVals = recentDesiredResizes.getValuesInWindow(now, resizeDownStabilizationDelay)
List<TimestampedValue<?>> upsizeWindowVals = recentDesiredResizes.getValuesInWindow(now, resizeUpStabilizationDelay)
@@ -300,10 +294,14 @@ public class ResizingPolicy extends AbstractPolicy {
desiredPoolSize = currentPoolSize
}
- if (LOG.isTraceEnabled()) LOG.trace("{} calculated desired pool size: from {} to {}; minDesired {}, maxDesired {}; downsizeHistory {}; upsizeHistor {}",
- this, currentPoolSize, desiredPoolSize, minDesiredPoolSize, maxDesiredPoolSize, downsizeWindowVals, upsizeWindowVals)
+ boolean stable = (minInWindow(downsizeWindowVals, resizeDownStabilizationDelay) == maxInWindow(downsizeWindowVals, resizeDownStabilizationDelay)) &&
+ (minInWindow(upsizeWindowVals, resizeUpStabilizationDelay) == maxInWindow(upsizeWindowVals, resizeUpStabilizationDelay))
+
+ if (LOG.isTraceEnabled()) LOG.trace("{} calculated desired pool size: from {} to {}; minDesired {}, maxDesired {}; " +
+ "stable {}; now {}; downsizeHistory {}; upsizeHistory {}",
+ this, currentPoolSize, desiredPoolSize, minDesiredPoolSize, maxDesiredPoolSize, stable, now, downsizeWindowVals, upsizeWindowVals)
- return desiredPoolSize
+ return [desiredPoolSize, stable]
}
/**
@@ -317,7 +315,7 @@ public class ResizingPolicy extends AbstractPolicy {
if (result == null && val.getTimestamp() > epoch) result = Integer.MAX_VALUE
if (result == null || (val.getValue() != null && val.getValue() > result)) result = val.getValue()
}
- return result
+ return (result != null ? result : Integer.MAX_VALUE)
}
/**
@@ -331,7 +329,7 @@ public class ResizingPolicy extends AbstractPolicy {
if (result == null && val.getTimestamp() > epoch) result = Integer.MIN_VALUE
if (result == null || (val.getValue() != null && val.getValue() < result)) result = val.getValue()
}
- return result
+ return (result != null ? result : Integer.MIN_VALUE)
}
@Override
@@ -278,20 +278,20 @@ class ResizingPolicyTest {
resizable.emit(ResizingPolicy.POOL_HOT, message(1, 61L, 1*10L, 1*20L)) // would grow to 4
// Wait for it to reach size 2, and confirm take expected time
- executeUntilSucceeds(timeout:TIMEOUT_MS) { assertEquals(resizable.currentSize, 2) }
+ executeUntilSucceeds(period:1, timeout:TIMEOUT_MS) { assertEquals(resizable.currentSize, 2) }
long timeToResize = stopwatch.elapsedMillis()
assertTrue(timeToResize >= resizeUpStabilizationDelay-EARLY_RETURN_MS &&
timeToResize <= resizeUpStabilizationDelay+OVERHEAD_DURATION_MS,
- "Resizing to 2: expected=$timeToResize; resizeUpStabilizationDelay=$resizeUpStabilizationDelay")
+ "Resizing to 2: time=$timeToResize; resizeUpStabilizationDelay=$resizeUpStabilizationDelay")
// Will then grow to 4 $resizeUpStabilizationDelay milliseconds after that emission
- executeUntilSucceeds(timeout:TIMEOUT_MS) { assertEquals(resizable.currentSize, 4) }
+ executeUntilSucceeds(period:1, timeout:TIMEOUT_MS) { assertEquals(resizable.currentSize, 4) }
long timeToResizeTo4 = stopwatch2.elapsedMillis()
assertTrue(timeToResizeTo4 >= resizeUpStabilizationDelay-EARLY_RETURN_MS &&
timeToResizeTo4 <= resizeUpStabilizationDelay+OVERHEAD_DURATION_MS,
- "Resizing to 4: expected=$timeToResize; resizeUpStabilizationDelay=$resizeUpStabilizationDelay")
+ "Resizing to 4: time=$timeToResize; resizeUpStabilizationDelay=$resizeUpStabilizationDelay")
}
@Test
@@ -371,20 +371,20 @@ class ResizingPolicyTest {
resizable.emit(ResizingPolicy.POOL_COLD, message(3, 1L, 3*10L, 3*20L)) // would shrink to 1
// Wait for it to shrink to size 2, and confirm take expected time
- executeUntilSucceeds(timeout:TIMEOUT_MS) { assertEquals(resizable.currentSize, 2) }
+ executeUntilSucceeds(period:1, timeout:TIMEOUT_MS) { assertEquals(resizable.currentSize, 2) }
long timeToResize = stopwatch.elapsedMillis()
assertTrue(timeToResize >= resizeDownStabilizationDelay-EARLY_RETURN_MS &&
timeToResize <= resizeDownStabilizationDelay+OVERHEAD_DURATION_MS,
- "Resizing to 2: expected=$timeToResize; resizeDownStabilizationDelay=$resizeDownStabilizationDelay")
+ "Resizing to 2: time=$timeToResize; resizeDownStabilizationDelay=$resizeDownStabilizationDelay")
// Will then shrink to 1 $resizeUpStabilizationDelay milliseconds after that emission
- executeUntilSucceeds(timeout:TIMEOUT_MS) { assertEquals(resizable.currentSize, 1) }
+ executeUntilSucceeds(period:1, timeout:TIMEOUT_MS) { assertEquals(resizable.currentSize, 1) }
long timeToResizeTo1 = stopwatch2.elapsedMillis()
assertTrue(timeToResizeTo1 >= resizeDownStabilizationDelay-EARLY_RETURN_MS &&
timeToResizeTo1 <= resizeDownStabilizationDelay+OVERHEAD_DURATION_MS,
- "Resizing to 4: expected=$timeToResize; resizeDownStabilizationDelay=$resizeDownStabilizationDelay")
+ "Resizing to 1: time=$timeToResize; resizeDownStabilizationDelay=$resizeDownStabilizationDelay")
}
@@ -80,7 +80,7 @@ public class WhirrHadoopCluster extends WhirrCluster {
@Override
public void stop() {
- proxy.stop()
+ proxy?.stop()
super.stop()
}

0 comments on commit 67bb1f6

Please sign in to comment.