Skip to content

Commit

Permalink
Merge pull request #989 from mattrjacobs/collapser-concurrency-test
Browse files Browse the repository at this point in the history
Adding a test of concurrent HystrixObservableCollapsers running
  • Loading branch information
mattrjacobs committed Nov 20, 2015
2 parents 7c159e0 + c8bf170 commit 3a66c1f
Showing 1 changed file with 77 additions and 3 deletions.
Expand Up @@ -24,14 +24,20 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.hystrix.collapser.CollapserTimer;
import com.netflix.hystrix.collapser.RealCollapserTimer;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable;
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
Expand Down Expand Up @@ -563,6 +569,74 @@ public void testTwoRequestsWithValuesForWrongArgs() {
testSubscriber2.assertValues("2:2", "2:4", "2:6");
}

@Test
public void testCollapserUnderConcurrency() throws InterruptedException {
final CollapserTimer timer = new RealCollapserTimer();
final int NUM_THREADS_SUBMITTING_WORK = 4;
final int NUM_REQUESTS_PER_THREAD = 2;

final CountDownLatch latch = new CountDownLatch(NUM_THREADS_SUBMITTING_WORK);

List<Runnable> runnables = new ArrayList<Runnable>();
final ConcurrentLinkedQueue<TestSubscriber<String>> subscribers = new ConcurrentLinkedQueue<TestSubscriber<String>>();

HystrixRequestContext context = HystrixRequestContext.initializeContext();

final AtomicInteger uniqueInt = new AtomicInteger(0);

for (int i = 0; i < NUM_THREADS_SUBMITTING_WORK; i++) {
runnables.add(new Runnable() {
@Override
public void run() {
//System.out.println("Runnable starting on thread : " + Thread.currentThread().getName());

for (int j = 0; j < NUM_REQUESTS_PER_THREAD; j++) {
HystrixObservableCollapser<String, String, String, String> collapser =
new TestCollapserWithMultipleResponses(timer, uniqueInt.getAndIncrement(), 3, false);
Observable<String> o = collapser.observe();
TestSubscriber<String> subscriber = new TestSubscriber<String>();
o.subscribe(subscriber);
subscribers.offer(subscriber);
}
//System.out.println("Runnable done on thread : " + Thread.currentThread().getName());
latch.countDown();
}
});
}

ExecutorService threadPool = Executors.newFixedThreadPool(NUM_THREADS_SUBMITTING_WORK);
for (Runnable r: runnables) {
threadPool.submit(new HystrixContextRunnable(r));
}

latch.await();

for (TestSubscriber<String> subscriber: subscribers) {
subscriber.awaitTerminalEvent();
if (subscriber.getOnErrorEvents().size() > 0) {
System.out.println("ERROR : " + subscriber.getOnErrorEvents());
for (Throwable ex: subscriber.getOnErrorEvents()) {
ex.printStackTrace();
}
}
subscriber.assertCompleted();
subscriber.assertNoErrors();
//System.out.println("Received : " + subscriber.getOnNextEvents());
subscriber.assertValueCount(3);
}

context.shutdown();
threadPool.shutdown();
}

@Test
public void testConcurrencyInTightLoop() throws InterruptedException {
for (int i = 0; i < 1000; i++) {
System.out.println("TRIAL : " + i);
testCollapserUnderConcurrency();
}
}

private static class TestRequestCollapser extends HystrixObservableCollapser<String, String, String, String> {

private final String value;
Expand Down Expand Up @@ -719,13 +793,13 @@ public void call(Subscriber<? super String> s) {
private static class TestCollapserWithMultipleResponses extends HystrixObservableCollapser<String, String, String, String> {

private final String arg;
private final static Map<String, Integer> emitsPerArg;
private final static ConcurrentMap<String, Integer> emitsPerArg;
private final boolean commandConstructionFails;
private final Func1<String, String> keyMapper;
private final Action1<CollapsedRequest<String, String>> onMissingResponseHandler;

static {
emitsPerArg = new HashMap<String, Integer>();
emitsPerArg = new ConcurrentHashMap<String, Integer>();
}

public TestCollapserWithMultipleResponses(CollapserTimer timer, int arg, int numEmits, boolean commandConstructionFails) {
Expand Down Expand Up @@ -835,7 +909,7 @@ protected Observable<String> construct() {
public void call(Subscriber<? super String> subscriber) {
try {
assertNotNull("Executing the Batch command should have a HystrixRequestContext", HystrixRequestContext.getContextForCurrentThread());
Thread.sleep(100);
Thread.sleep(30);
for (Integer arg: args) {
int numEmits = emitsPerArg.get(arg.toString());
for (int j = 1; j < numEmits + 1; j++) {
Expand Down

0 comments on commit 3a66c1f

Please sign in to comment.