From 5ef9075a491800fc7d9fa35b1f3f76d69d1b1169 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 9 Jan 2014 17:53:30 +0800 Subject: [PATCH] Replaced 'Thread.sleep' with 'CountDownLatch' to fix the flaky test failures --- .../rx/operators/OperationConcatTest.java | 37 +++++++++---------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java index 24146ec131..05117231dc 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java @@ -156,6 +156,7 @@ public void testNestedAsyncConcat() throws Throwable { final CountDownLatch allowThird = new CountDownLatch(1); final AtomicReference parent = new AtomicReference(); + final CountDownLatch parentHasStarted = new CountDownLatch(1); Observable> observableOfObservables = Observable.create(new Observable.OnSubscribeFunc>() { @Override @@ -197,6 +198,7 @@ public void run() { } })); parent.get().start(); + parentHasStarted.countDown(); return s; } }); @@ -204,22 +206,14 @@ public void run() { Observable.create(concat(observableOfObservables)).subscribe(observer); // wait for parent to start - while (parent.get() == null) { - Thread.sleep(1); - } + parentHasStarted.await(); try { // wait for first 2 async observables to complete - while (o1.t == null) { - Thread.sleep(1); - } - System.out.println("Thread1 started ... waiting for it to complete ..."); - o1.t.join(); - while (o2.t == null) { - Thread.sleep(1); - } - System.out.println("Thread2 started ... waiting for it to complete ..."); - o2.t.join(); + System.out.println("Thread1 is starting ... waiting for it to complete ..."); + o1.waitForThreadDone(); + System.out.println("Thread2 is starting ... waiting for it to complete ..."); + o2.waitForThreadDone(); } catch (Throwable e) { throw new RuntimeException("failed waiting on threads", e); } @@ -243,11 +237,8 @@ public void run() { allowThird.countDown(); try { - while (o3.t == null) { - Thread.sleep(1); - } // wait for 3rd to complete - o3.t.join(); + o3.waitForThreadDone(); } catch (Throwable e) { throw new RuntimeException("failed waiting on threads", e); } @@ -320,9 +311,8 @@ public void testConcatConcurrentWithInfinity() { //Wait for the thread to start up. try { - Thread.sleep(25); - w1.t.join(); - w2.t.join(); + w1.waitForThreadDone(); + w2.waitForThreadDone(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); @@ -500,6 +490,7 @@ public void unsubscribe() { private boolean subscribed = true; private final CountDownLatch once; private final CountDownLatch okToContinue; + private final CountDownLatch threadHasStarted = new CountDownLatch(1); private final T seed; private final int size; @@ -553,8 +544,14 @@ public void run() { }); t.start(); + threadHasStarted.countDown(); return s; } + + void waitForThreadDone() throws InterruptedException { + threadHasStarted.await(); + t.join(); + } } @Test public void testMultipleObservers() {