Skip to content

Commit

Permalink
Merge pull request #731 from zsxwing/flaky-unit-tests
Browse files Browse the repository at this point in the history
Replaced 'Thread.sleep' with 'CountDownLatch' to fix the flaky test failures
  • Loading branch information
benjchristensen committed Jan 9, 2014
2 parents ab689bf + 5ef9075 commit fe438ca
Showing 1 changed file with 17 additions and 20 deletions.
37 changes: 17 additions & 20 deletions rxjava-core/src/test/java/rx/operators/OperationConcatTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public void testNestedAsyncConcat() throws Throwable {
final CountDownLatch allowThird = new CountDownLatch(1);

final AtomicReference<Thread> parent = new AtomicReference<Thread>();
final CountDownLatch parentHasStarted = new CountDownLatch(1);
Observable<Observable<String>> observableOfObservables = Observable.create(new Observable.OnSubscribeFunc<Observable<String>>() {

@Override
Expand Down Expand Up @@ -197,29 +198,22 @@ public void run() {
}
}));
parent.get().start();
parentHasStarted.countDown();
return s;
}
});

Observable.create(concat(observableOfObservables)).subscribe(observer);

// wait for parent to start
while (parent.get() == null) {
Thread.sleep(1);
}
parentHasStarted.await();

try {
// wait for first 2 async observables to complete
while (o1.t == null) {
Thread.sleep(1);
}
System.out.println("Thread1 started ... waiting for it to complete ...");
o1.t.join();
while (o2.t == null) {
Thread.sleep(1);
}
System.out.println("Thread2 started ... waiting for it to complete ...");
o2.t.join();
System.out.println("Thread1 is starting ... waiting for it to complete ...");
o1.waitForThreadDone();
System.out.println("Thread2 is starting ... waiting for it to complete ...");
o2.waitForThreadDone();
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads", e);
}
Expand All @@ -243,11 +237,8 @@ public void run() {
allowThird.countDown();

try {
while (o3.t == null) {
Thread.sleep(1);
}
// wait for 3rd to complete
o3.t.join();
o3.waitForThreadDone();
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads", e);
}
Expand Down Expand Up @@ -320,9 +311,8 @@ public void testConcatConcurrentWithInfinity() {

//Wait for the thread to start up.
try {
Thread.sleep(25);
w1.t.join();
w2.t.join();
w1.waitForThreadDone();
w2.waitForThreadDone();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
Expand Down Expand Up @@ -500,6 +490,7 @@ public void unsubscribe() {
private boolean subscribed = true;
private final CountDownLatch once;
private final CountDownLatch okToContinue;
private final CountDownLatch threadHasStarted = new CountDownLatch(1);
private final T seed;
private final int size;

Expand Down Expand Up @@ -553,8 +544,14 @@ public void run() {

});
t.start();
threadHasStarted.countDown();
return s;
}

void waitForThreadDone() throws InterruptedException {
threadHasStarted.await();
t.join();
}
}
@Test
public void testMultipleObservers() {
Expand Down

0 comments on commit fe438ca

Please sign in to comment.