Skip to content

Commit

Permalink
Add unit test for sub/unsub/sub bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
Treora committed Jun 5, 2013
1 parent 2120103 commit 4f68c5a
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observer;
Expand Down Expand Up @@ -556,6 +558,78 @@ public String toString() {
}
}

/*
* Test subscribing to a group, unsubscribing from it again, and subscribing to a next group
*/
@Test
public void testSubscribeAndImmediatelyUnsubscribeFirstGroup() {
CounterSource source = new CounterSource();
@SuppressWarnings("unchecked")
final Observer<Integer> observer = mock(Observer.class);

Func1<Integer, Integer> modulo2 = new Func1<Integer, Integer>() {
@Override
public Integer call(Integer x) {
return x%2;
}
};

Subscription outerSubscription = source.groupBy(modulo2).subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
@Override
public void call(GroupedObservable<Integer, Integer> group) {
Subscription innerSubscription = group.subscribe(observer);
if (group.getKey() == 0) {
// We immediately unsubscribe again from the even numbers
innerSubscription.unsubscribe();
// We should still get the group of odd numbers
}
}
});
try {
source.thread.join();
} catch (InterruptedException ex) {
}

InOrder o = inOrder(observer);
// With a different implementation that subscribes to the group concurrently, we might actually receive 0.
o.verify(observer, never()).onNext(0);
o.verify(observer).onNext(1);
o.verify(observer, never()).onNext(2);
o.verify(observer).onNext(3);
o.verify(observer, never()).onNext(4);
o.verify(observer).onNext(5);
o.verify(observer, never()).onNext(6);
o.verify(observer).onNext(7);
o.verify(observer, never()).onNext(8);
o.verify(observer).onNext(9);
}

private class CounterSource extends Observable<Integer> {
public Thread thread = null;
@Override
public Subscription subscribe(final Observer<Integer> observer) {
thread = new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (i < 10) {
observer.onNext(i++);
if (Thread.interrupted()) {
return;
}
}
}
});
thread.start();
return new Subscription() {
@Override
public void unsubscribe() {
thread.interrupt();
}
};
}
}

}

}

0 comments on commit 4f68c5a

Please sign in to comment.