Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

The MpConainer now handles the passivating of Mps when the node's key…

…space responsibility changes.
  • Loading branch information...
commit 9069cb562b417e60eeee6c5da0d67f50d69dd029 1 parent cf5ea16
Jim Carroll authored
View
352 lib-dempsyimpl/src/main/java/com/nokia/dempsy/container/MpContainer.java
@@ -314,121 +314,214 @@ public int getProcessorCount()
return instances.size();
}
- AtomicBoolean isRunningMpInst = new AtomicBoolean(false);
- AtomicBoolean stopRunningMpInst = new AtomicBoolean(false);
- Object keyspaceResponsibilityChangedLock = new Object(); // we need to synchronize keyspaceResponsibilityChanged alone
-
- @Override
- public void keyspaceResponsibilityChanged(final RoutingStrategy.Inbound strategyInbound, boolean less, boolean more)
+ /**
+ * This is a helper class with unique synchronizataion behavior. It can be used to start
+ * a worker in another thread, allow the initiating thread to verify that's it's been
+ * started, and allow the early termination of that worker thread.
+ */
+ private class RunningEventSwitch
{
- synchronized(keyspaceResponsibilityChangedLock)
+ final AtomicBoolean isRunning = new AtomicBoolean(false);
+ final AtomicBoolean stopRunning = new AtomicBoolean(false);
+ // this flag is used to hold up the calling thread's exit of this method
+ // until the worker is underway.
+ final AtomicBoolean runningGate = new AtomicBoolean(false);
+
+ /**
+ * This is called from the worker thread to notify the fact that
+ * it's been started.
+ */
+ public void workerInitiateRun()
{
- // need to handle less by passivating ... but we'll ignore for now.
-
- // If more were added and we have a keySource we need to do
- // preinstantiation
- if (more && keySource != null)
+ // this is synchronized because it's used as a condition variable
+ // along with the condition.
+ synchronized(isRunning) { isRunning.set(true); }
+ stopRunning.set(false);
+
+ synchronized(runningGate)
+ {
+ runningGate.set(true);
+ runningGate.notify();
+ }
+ }
+
+ /**
+ * The worker thread can use this method to check if it's been explicitly preempted.
+ * @return
+ */
+ public boolean wasPreempted() { return stopRunning.get(); }
+
+ /**
+ * The worker thread should indicate that it's done in a finally clause on it's way
+ * out.
+ */
+ public void workerStopping()
+ {
+ // This kicks the preemptWorkerAndWait out.
+ synchronized(isRunning)
+ {
+ isRunning.set(false);
+ isRunning.notify();
+ }
+ }
+
+ /**
+ * The main thread uses this method when it needs to preempt the worker and
+ * wait for the worker to finish before continuing.
+ */
+ public void preemptWorkerAndWait()
+ {
+ // We need to see if we're already executing
+ stopRunning.set(true); // kick out any running instantiation thread
+ // wait for it to exit, it it's even running - also consider the overall
+ // Mp isRunning flag.
+ synchronized(isRunning)
{
- // We need to see if we're already executing
- stopRunningMpInst.set(true); // kick out any running instantiation thread
- // wait for it to exit, it it's even running
- synchronized(isRunningMpInst)
+ while (isRunning.get() && MpContainer.this.isRunning)
{
- while (isRunningMpInst.get() && isRunning)
- {
- try { isRunningMpInst.wait(); } catch (InterruptedException e) {}
- }
+ try { isRunning.wait(); } catch (InterruptedException e) {}
+ }
+ }
+ }
+
+ /**
+ * This allows the main thread to wait until the worker is started in order
+ * to continue. This method only works once. It resets the flag so a second
+ * call will block until another thread calls to workerInitiateRun().
+ */
+ public void waitForWorkerToStart()
+ {
+ // make sure the thread is running before we head out from the synchronized block
+ synchronized(runningGate)
+ {
+ while (runningGate.get() == false && MpContainer.this.isRunning)
+ {
+ try { runningGate.wait(); } catch (InterruptedException ie) {}
}
- // this flag is used to hold up the calling thread's exit of this method
- // until the Runnable is underway.
- final AtomicBoolean running = new AtomicBoolean(false);
-
- Thread t = new Thread(new Runnable()
+ runningGate.set(false); // reset this flag
+ }
+ }
+ }
+
+ final RunningEventSwitch expand = new RunningEventSwitch();
+ final RunningEventSwitch contract = new RunningEventSwitch();
+ final Object keyspaceResponsibilityChangedLock = new Object(); // we need to synchronize keyspaceResponsibilityChanged alone
+
+ private void runExpandKeyspace(final RoutingStrategy.Inbound strategyInbound)
+ {
+ List<Future<Object>> futures = new ArrayList<Future<Object>>();
+ expand.workerInitiateRun();
+
+ StatsCollector.TimerContext tcontext = null;
+ try{
+ tcontext = statCollector.preInstantiationStarted();
+ Iterable<?> iterable = keySource.getAllPossibleKeys();
+ for(final Object key: iterable)
+ {
+ if (expand.wasPreempted() || !isRunning)
+ break;
+
+ if(strategyInbound.doesMessageKeyBelongToNode(key))
{
- @Override
- public void run()
+ Callable<Object> callable = new Callable<Object>()
{
- synchronized(isRunningMpInst) { isRunningMpInst.set(true); }
- List<Future<Object>> futures = new ArrayList<Future<Object>>();
-
- stopRunningMpInst.set(false); // reset this flag in case it's been set
-
- synchronized(running)
+ Object k = key;
+
+ @Override
+ public Object call()
{
- running.set(true);
- running.notify();
- }
-
- StatsCollector.TimerContext tcontext = null;
- try{
- tcontext = statCollector.preInstantiationStarted();
- Iterable<?> iterable = keySource.getAllPossibleKeys();
- for(final Object key: iterable)
+ try { getInstanceForKey(k); }
+ catch(ContainerException e)
{
- if (stopRunningMpInst.get() || !isRunning)
- break;
-
- if(strategyInbound.doesMessageKeyBelongToNode(key))
- {
- Callable<Object> callable = new Callable<Object>()
- {
- Object k = key;
-
- @Override
- public Object call()
- {
- try { getInstanceForKey(k); }
- catch(ContainerException e)
- {
- logger.error("Failed to instantiate MP for Key "+key +
- " of type "+key.getClass().getSimpleName(), e);
- }
- return null;
- }
- };
-
- if (executor != null)
- futures.add(executor.submit(callable));
- else
- callable.call();
- }
+ logger.error("Failed to instantiate MP for Key "+key +
+ " of type "+key.getClass().getSimpleName(), e);
}
+ return null;
}
- catch(Throwable e)
- {
- logger.error("Exception occured while processing keys during pre-instantiation using KeyStore method"+
- keySource.getClass().getSimpleName()+":getAllPossibleKeys()", e);
- }
- finally
+ };
+
+ if (executor != null)
+ futures.add(executor.submit(callable));
+ else
+ callable.call();
+ }
+ }
+ }
+ catch(Throwable e)
+ {
+ logger.error("Exception occured while processing keys during pre-instantiation using KeyStore method"+
+ keySource.getClass().getSimpleName()+":getAllPossibleKeys()", e);
+ }
+ finally
+ {
+ if (tcontext != null) tcontext.stop();
+ if (expand.wasPreempted()) // this run is being preempted
+ {
+ for (Future<Object> f : futures)
+ try { f.cancel(false); } catch (Throwable th) { logger.warn("Error trying to cancel an attempt to pre-instantiate a Mp",th); }
+ }
+ expand.workerStopping();
+ }
+ }
+
+ @Override
+ public void keyspaceResponsibilityChanged(final RoutingStrategy.Inbound strategyInbound, boolean less, boolean more)
+ {
+ synchronized(keyspaceResponsibilityChangedLock)
+ {
+ // need to handle less by passivating
+ if (less)
+ {
+ contract.preemptWorkerAndWait();
+
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
{
- if (tcontext != null) tcontext.stop();
- if (stopRunningMpInst.get()) // this run is being preempted
- {
- for (Future<Object> f : futures)
- try { f.cancel(false); } catch (Throwable th) { logger.warn("Error trying to cancel an attempt to pre-instantiate a Mp",th); }
- }
- synchronized(isRunningMpInst)
+ contract.workerInitiateRun();
+ doEvict(new EvictCheck()
{
- isRunningMpInst.set(false);
- isRunningMpInst.notify();
- }
+ // we shouldEvict if the message key no longer belongs as
+ // part of this container.
+ @Override
+ public boolean shouldEvict(Object key, InstanceWrapper wrapper) { return !strategyInbound.doesMessageKeyBelongToNode(key); }
+ // In this case, it's evictable.
+ @Override
+ public boolean isGenerallyEvitable() { return true; }
+ @Override
+ public boolean shouldStopEvicting() { return contract.wasPreempted(); }
+ });
}
+ finally { contract.workerStopping(); }
}
- }, "Pre-Instantation Thread");
+ }, "Keyspace Contraction Thread");
t.setDaemon(true);
t.start();
+ contract.waitForWorkerToStart();
+ }
- // make sure the thread is running before we head out from the synchronized block
- synchronized(running)
+ // If more were added and we have a keySource we need to do
+ // preinstantiation
+ if (more && keySource != null)
+ {
+ expand.preemptWorkerAndWait();
+
+ Thread t = new Thread(new Runnable()
{
- while (running.get() == false && isRunning)
- {
- try { running.wait(); } catch (InterruptedException ie) {}
- }
- }
+ @Override
+ public void run() { runExpandKeyspace(strategyInbound); }
+ }, "Pre-Instantation Thread");
+ t.setDaemon(true);
+ t.start();
+
+ expand.waitForWorkerToStart();
}
+
}
}
@@ -537,8 +630,51 @@ protected InstanceWrapper getInstanceForDispatch(Object message) throws Containe
return wrapper;
}
- public void evict() {
- if (!prototype.isEvictableSupported() || !isRunning)
+ public void evict()
+ {
+ doEvict(new EvictCheck()
+ {
+ @Override
+ public boolean shouldEvict(Object key, InstanceWrapper wrapper)
+ {
+ Object instance = wrapper.getInstance();
+ try
+ {
+ return prototype.invokeEvictable(instance);
+ }
+ catch (InvocationTargetException e)
+ {
+ logger.warn("Checking the eviction status/passivating of the Mp " + SafeString.objectDescription(instance) +
+ " resulted in an exception.",e.getCause());
+ }
+ catch (IllegalAccessException e)
+ {
+ logger.warn("It appears that the method for checking the eviction or passivating the Mp " + SafeString.objectDescription(instance) +
+ " is not defined correctly. Is it visible?",e);
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean isGenerallyEvitable() { return prototype.isEvictableSupported(); }
+ @Override
+ public boolean shouldStopEvicting() { return false; }
+ });
+ }
+
+ private interface EvictCheck
+ {
+ boolean isGenerallyEvitable();
+
+ boolean shouldEvict(Object key, InstanceWrapper wrapper);
+
+ boolean shouldStopEvicting();
+ }
+
+ private void doEvict(EvictCheck check)
+ {
+ if (!check.isGenerallyEvitable() || !isRunning)
return;
StatsCollector.TimerContext tctx = null;
@@ -551,7 +687,7 @@ public void evict() {
Map<Object,InstanceWrapper> instancesToEvict = new HashMap<Object,InstanceWrapper>(instances.size() + 10);
instancesToEvict.putAll(instances);
- while (instancesToEvict.size() > 0 && instances.size() > 0 && isRunning)
+ while (instancesToEvict.size() > 0 && instances.size() > 0 && isRunning && !check.shouldStopEvicting())
{
// store off anything that passes for later removal. This is to avoid a
// ConcurrentModificationException.
@@ -559,6 +695,9 @@ public void evict() {
for (Map.Entry<Object, InstanceWrapper> entry : instancesToEvict.entrySet())
{
+ if (check.shouldStopEvicting())
+ break;
+
Object key = entry.getKey();
InstanceWrapper wrapper = entry.getValue();
boolean gotLock = false;
@@ -573,28 +712,21 @@ public void evict() {
boolean removeInstance = false;
- Object instance = wrapper.getInstance();
- try {
- if (prototype.invokeEvictable(instance)) {
+ try
+ {
+ if (check.shouldEvict(key,wrapper))
+ {
removeInstance = true;
wrapper.markEvicted();
prototype.passivate(wrapper.getInstance());
// wrapper.markPassivated();
}
}
- catch (InvocationTargetException e)
- {
- logger.warn("Checking the eviction status/passivating of the Mp " + SafeString.objectDescription(instance) +
- " resulted in an exception.",e.getCause());
- }
- catch (IllegalAccessException e)
- {
- logger.warn("It appears that the method for checking the eviction or passivating the Mp " + SafeString.objectDescription(instance) +
- " is not defined correctly. Is it visible?",e);
- }
catch (Throwable e)
{
- logger.warn("Checking the eviction status/passivating of the Mp " + SafeString.objectDescription(instance) +
+ Object instance = null;
+ try { instance = wrapper.getInstance(); } catch(Throwable th) {} // not sure why this would ever happen
+ logger.warn("Checking the eviction status/passivating of the Mp " + SafeString.objectDescription(instance == null ? wrapper : instance) +
" resulted in an exception.",e);
}
View
214 lib-dempsyimpl/src/test/java/com/nokia/dempsy/container/TestMpContainer.java
@@ -17,18 +17,23 @@
package com.nokia.dempsy.container;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
@@ -36,6 +41,7 @@
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.nokia.dempsy.Dispatcher;
+import com.nokia.dempsy.KeySource;
import com.nokia.dempsy.TestUtils;
import com.nokia.dempsy.annotations.Activation;
import com.nokia.dempsy.annotations.Evictable;
@@ -48,6 +54,7 @@
import com.nokia.dempsy.messagetransport.Sender;
import com.nokia.dempsy.messagetransport.blockingqueue.BlockingQueueAdaptor;
import com.nokia.dempsy.monitoring.coda.MetricGetters;
+import com.nokia.dempsy.router.RoutingStrategy;
import com.nokia.dempsy.serialization.Serializer;
import com.nokia.dempsy.serialization.java.JavaSerializer;
@@ -143,11 +150,12 @@ public void tearDown() throws Exception
public volatile int invocationCount;
public volatile int outputCount;
public volatile AtomicBoolean evict = new AtomicBoolean(false);
- public static AtomicInteger cloneCount = new AtomicInteger(0);
+ public AtomicInteger cloneCount = new AtomicInteger(0);
public volatile CountDownLatch latch = new CountDownLatch(0);
public static AtomicLong numOutputExecutions = new AtomicLong(0);
public static CountDownLatch blockAllOutput = new CountDownLatch(0);
+ public AtomicLong passivateCount = new AtomicLong(0);
public CountDownLatch blockPassivate = new CountDownLatch(0);
public AtomicBoolean throwPassivateException = new AtomicBoolean(false);
public AtomicLong passivateExceptionCount = new AtomicLong(0);
@@ -169,6 +177,8 @@ public void activate(byte[] data)
@Passivation
public void passivate() throws InterruptedException
{
+ passivateCount.incrementAndGet();
+
blockPassivate.await();
if (throwPassivateException.get())
@@ -324,14 +334,15 @@ public void testEvictable() throws Exception {
assertEquals("activation count, 2nd message", 1, mp.activationCount);
assertEquals("invocation count, 2nd message", 2, mp.invocationCount);
- int tmpCloneCount = TestProcessor.cloneCount.intValue();
+ TestProcessor prototype = context.getBean(TestProcessor.class);
+ int tmpCloneCount = prototype.cloneCount.intValue();
mp.evict.set(true);
container.evict();
inputQueue.add(serializer.serialize(new ContainerTestMessage("foo")));
assertNotNull(outputQueue.poll(baseTimeoutMillis, TimeUnit.MILLISECONDS));
- assertEquals("Clone count, 2nd message", tmpCloneCount+1, TestProcessor.cloneCount.intValue());
+ assertEquals("Clone count, 2nd message", tmpCloneCount+1, prototype.cloneCount.intValue());
}
@Test
@@ -353,7 +364,8 @@ public void testEvictableWithPassivateException() throws Exception
assertEquals("activation count, 2nd message", 1, mp.activationCount);
assertEquals("invocation count, 2nd message", 2, mp.invocationCount);
- int tmpCloneCount = TestProcessor.cloneCount.intValue();
+ TestProcessor prototype = context.getBean(TestProcessor.class);
+ int tmpCloneCount = prototype.cloneCount.intValue();
mp.evict.set(true);
container.evict();
@@ -362,7 +374,7 @@ public void testEvictableWithPassivateException() throws Exception
inputQueue.add(serializer.serialize(new ContainerTestMessage("foo")));
assertNotNull(outputQueue.poll(baseTimeoutMillis, TimeUnit.MILLISECONDS));
- assertEquals("Clone count, 2nd message", tmpCloneCount+1, TestProcessor.cloneCount.intValue());
+ assertEquals("Clone count, 2nd message", tmpCloneCount+1, prototype.cloneCount.intValue());
}
@Test
@@ -388,9 +400,11 @@ public void testEvictableWithBusyMp() throws Throwable
// sending it a message will now cause it to hang up while processing
inputQueue.add(serializer.serialize(new ContainerTestMessage("foo")));
+
+ TestProcessor prototype = context.getBean(TestProcessor.class);
// keep track of the cloneCount for later checking
- int tmpCloneCount = TestProcessor.cloneCount.intValue();
+ int tmpCloneCount = prototype.cloneCount.intValue();
// invocation count should go to 2
assertTrue(TestUtils.poll(baseTimeoutMillis, mp, new TestUtils.Condition<TestProcessor>()
@@ -423,7 +437,7 @@ public void testEvictableWithBusyMp() throws Throwable
inputQueue.add(serializer.serialize(new ContainerTestMessage("foo")));
assertNotNull(outputQueue.poll(baseTimeoutMillis, TimeUnit.MILLISECONDS));
- assertEquals("Clone count, 2nd message", tmpCloneCount+1, TestProcessor.cloneCount.intValue());
+ assertEquals("Clone count, 2nd message", tmpCloneCount+1, prototype.cloneCount.intValue());
}
@Test
@@ -572,4 +586,188 @@ public void testEvictCollisionWithBlocking() throws Throwable
assertEquals(1,mp.invocationCount);
assertEquals(2,mp2.invocationCount);
}
+
+ static class FixedInbound implements RoutingStrategy.Inbound
+ {
+ static AtomicReference<Set<Object>> validKeys = new AtomicReference<Set<Object>>(new HashSet<Object>());
+
+ @Override
+ public void stop(){ }
+
+ @Override
+ public boolean doesMessageKeyBelongToNode(Object messageKey) { return validKeys.get().contains(messageKey); }
+
+ public FixedInbound set(Object... keys) { validKeys.set(new HashSet<Object>(Arrays.asList(keys))); return this; }
+
+ public FixedInbound clear() { validKeys.set(new HashSet<Object>(Arrays.asList())); return this; }
+ }
+
+ static class FixedKeySource implements KeySource<String>
+ {
+ AtomicReference<List<String>> keys = new AtomicReference<List<String>>(new ArrayList<String>());
+
+ @Override
+ public Iterable<String> getAllPossibleKeys() { return keys.get(); }
+
+ public FixedKeySource set(String[]...keys)
+ {
+ List<String> newKeys = new ArrayList<String>();
+ for (String[] keyset : keys)
+ newKeys.addAll(Arrays.asList(keyset));
+ this.keys.set(newKeys);
+ return this;
+ }
+ }
+
+ @Test
+ public void testKeyspaceExpandThenContraction() throws Throwable
+ {
+ final String[] keys = { "foo1", "foo2" };
+
+ // set an inbound strategy that provides a known value
+ FixedInbound inbound = new FixedInbound().set((Object[])keys);
+ FixedKeySource keysource = new FixedKeySource().set(keys);
+
+ // set a keysource with 2 keys
+ container.setKeySource(keysource);
+
+ container.keyspaceResponsibilityChanged(inbound, false, true);
+
+ TestProcessor prototype = context.getBean(TestProcessor.class);
+
+ assertTrue(TestUtils.poll(baseTimeoutMillis, prototype.cloneCount, new TestUtils.Condition<AtomicInteger>()
+ { @Override public boolean conditionMet(AtomicInteger o) { return o.intValue() == 2; } }));
+
+ Thread.sleep(10);
+ assertEquals(2,prototype.cloneCount.intValue());
+
+ inbound.clear(); // force the Inbound to deny that any Mp should run here.
+ container.keyspaceResponsibilityChanged(inbound, true, false);
+
+ assertTrue(TestUtils.poll(baseTimeoutMillis, container, new TestUtils.Condition<MpContainer>()
+ { @Override public boolean conditionMet(MpContainer o) { return o.getInstances().size() == 0; } }));
+
+ Thread.sleep(10);
+ assertEquals(0,container.getInstances().size());
+ }
+
+ @Test
+ public void testKeyspaceExpandAndContraction() throws Throwable
+ {
+ final String[] keys = { "foo1", "foo2" };
+ final String[] keysShard2 = { "foo3", "foo4" };
+
+ // set an inbound strategy that provides a known value
+ FixedInbound inbound = new FixedInbound().set((Object[])keys);
+ FixedKeySource keysource = new FixedKeySource().set(keys, keysShard2);
+
+ // set a keysource with 2 keys
+ container.setKeySource(keysource);
+
+ container.keyspaceResponsibilityChanged(inbound, false, true);
+
+ TestProcessor prototype = context.getBean(TestProcessor.class);
+ assertTrue(TestUtils.poll(baseTimeoutMillis, prototype.cloneCount, new TestUtils.Condition<AtomicInteger>()
+ { @Override public boolean conditionMet(AtomicInteger o) { return o.intValue() == 2; } }));
+
+ Thread.sleep(10);
+ assertEquals(2,prototype.cloneCount.intValue());
+
+ inbound.set((Object[])keysShard2); // force the Inbound to deny that any Mp should run here.
+ container.keyspaceResponsibilityChanged(inbound, true, true);
+
+ // check to make sure the total clone count is now 4 since two new Mps should be there
+ assertTrue(TestUtils.poll(baseTimeoutMillis, prototype.cloneCount, new TestUtils.Condition<AtomicInteger>()
+ { @Override public boolean conditionMet(AtomicInteger o) { return o.intValue() == 4; } }));
+
+ assertTrue(TestUtils.poll(baseTimeoutMillis, container, new TestUtils.Condition<MpContainer>()
+ { @Override public boolean conditionMet(MpContainer o) { return o.getInstances().size() == 2; } }));
+
+ Thread.sleep(10);
+ assertEquals(2,container.getInstances().size());
+ }
+
+ @Test
+ public void testOverlappingKeyspaceContraction() throws Throwable
+ {
+ // need to get hold of the block passivate latch and set it to 1
+ TestProcessor prototype = context.getBean(TestProcessor.class);
+ CountDownLatch blockPassivate = new CountDownLatch(1);
+ prototype.blockPassivate = blockPassivate;
+
+ final String[] keys = { "foo1", "foo2" };
+
+ // set an inbound strategy that provides a known value
+ final FixedInbound inbound = new FixedInbound().set((Object[])keys);
+ FixedKeySource keysource = new FixedKeySource().set(keys);
+
+ // set a keysource with 2 keys
+ container.setKeySource(keysource);
+
+ container.keyspaceResponsibilityChanged(inbound, false, true);
+
+ assertTrue(TestUtils.poll(baseTimeoutMillis, prototype.cloneCount, new TestUtils.Condition<AtomicInteger>()
+ { @Override public boolean conditionMet(AtomicInteger o) { return o.intValue() == 2; } }));
+
+ Thread.sleep(10);
+ assertEquals(2,prototype.cloneCount.intValue());
+
+ // ok ... we have a container with 2 mps.
+ // now we're going to passivate but the passivate will block
+
+ inbound.clear(); // force the Inbound to deny that any Mp should run here.
+ container.keyspaceResponsibilityChanged(inbound, true, false);
+
+ // the Mp deletion should be hung in passivate on the frist one ... this test will break if eviction
+ // becomes concurrent since both Mps will be blocked and released at the same time. We want the
+ // second.
+
+ assertTrue(TestUtils.poll(baseTimeoutMillis, prototype.passivateCount, new TestUtils.Condition<AtomicLong>()
+ { @Override public boolean conditionMet(AtomicLong o) { return o.intValue() == 1; } }));
+ Thread.sleep(10);
+ assertEquals(1,prototype.passivateCount.get());
+
+ // kick force a redo of the keyspaceResponsibilityChange. This will hang without the latch so
+ // we kick it off in the background. Here we are going to reset the inbound so that it rescues the
+ // other Mp
+ inbound.set((Object[])keys);
+ final AtomicBoolean isRunningKSChange = new AtomicBoolean(false);
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ isRunningKSChange.set(true);
+ container.keyspaceResponsibilityChanged(inbound, true, false);
+ }
+ finally
+ {
+ isRunningKSChange.set(false);
+ }
+ }
+ });
+ t.setDaemon(true);
+ t.start();
+
+ // let's make sure the keyspaceChanged call is hanging
+ assertTrue(TestUtils.poll(baseTimeoutMillis, isRunningKSChange, new TestUtils.Condition<AtomicBoolean>()
+ { @Override public boolean conditionMet(AtomicBoolean o) { return o.get(); } }));
+ Thread.sleep(10);
+ assertTrue(isRunningKSChange.get());
+
+ // the above will block unless the latch is released
+ blockPassivate.countDown(); // this lets them all go
+
+ // this should result in preservation of the other Mps since we preempted the keyspace deletion
+ assertTrue(TestUtils.poll(baseTimeoutMillis, container, new TestUtils.Condition<MpContainer>()
+ { @Override public boolean conditionMet(MpContainer o) { return o.getInstances().size() == 1; } }));
+ Thread.sleep(10);
+ assertEquals(1,container.getInstances().size());
+
+ assertTrue(TestUtils.poll(baseTimeoutMillis, isRunningKSChange, new TestUtils.Condition<AtomicBoolean>()
+ { @Override public boolean conditionMet(AtomicBoolean o) { return !o.get(); } }));
+ }
+
}
Please sign in to comment.
Something went wrong with that request. Please try again.