Skip to content

Commit

Permalink
Merge pull request #273 from billyy/concat
Browse files Browse the repository at this point in the history
Concat
  • Loading branch information
benjchristensen committed May 16, 2013
2 parents ce3ee1b + 0063b90 commit 58951bd
Showing 1 changed file with 154 additions and 80 deletions.
234 changes: 154 additions & 80 deletions rxjava-core/src/main/java/rx/operators/OperationConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.junit.Ignore;
import org.junit.Test;

import org.mockito.InOrder;
Expand All @@ -40,23 +44,10 @@ public final class OperationConcat {

/**
* Combine the observable sequences from the list of Observables into one
* observable sequence without any transformation. If either the outer
* observable sequence without any transformation. If either the outer
* observable or an inner observable calls onError, we will call onError.
*
* <p/>
*
* The outer observable might run on a separate thread from (one of) the
* inner observables; in this case care must be taken to avoid a deadlock.
* The Concat operation may block the outer thread while servicing an inner
* thread in order to ensure a well-defined ordering of elements; therefore
* none of the inner threads must be implemented in a way that might wait on
* the outer thread.
*
* <p/>
*
* Beware that concat(o1,o2).subscribe() is a blocking call from
* which it is impossible to unsubscribe if observables are running on same thread.
*
* @param sequences An observable sequence of elements to project.
* @return An observable sequence whose elements are the result of combining the output from the list of Observables.
*/
Expand All @@ -69,73 +60,101 @@ public static <T> Func1<Observer<T>, Subscription> concat(final List<Observable<
}

public static <T> Func1<Observer<T>, Subscription> concat(final Observable<Observable<T>> sequences) {
return new Func1<Observer<T>, Subscription>() {

@Override
public Subscription call(Observer<T> observer) {
return new ConcatSubscription<T>(sequences, observer);
}
};
return new Concat<T>(sequences);
}

private static class ConcatSubscription<T> extends BooleanSubscription {
// Might be updated by an inner thread's onError during the outer
// thread's onNext, then read in the outer thread's onComplete.
final AtomicBoolean innerError = new AtomicBoolean(false);
private static class Concat<T> implements Func1<Observer<T>, Subscription> {
private Observable<Observable<T>> sequences;
private AtomicObservableSubscription innerSubscription = null;

public ConcatSubscription(Observable<Observable<T>> sequences, final Observer<T> observer) {
public Concat(Observable<Observable<T>> sequences) {
this.sequences = sequences;
}

public Subscription call(final Observer<T> observer) {
final AtomicBoolean completedOrErred = new AtomicBoolean(false);
final AtomicBoolean allSequencesReceived = new AtomicBoolean(false);
final Queue<Observable<T>> nextSequences = new ConcurrentLinkedQueue<Observable<T>>();
final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription();
outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {

final Observer<T> reusableObserver = new Observer<T>() {
@Override
public void onNext(Observable<T> nextSequence) {
// We will not return from onNext until the inner observer completes.
// NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted.
final CountDownLatch latch = new CountDownLatch(1);
final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription();
innerSubscription.wrap(nextSequence.subscribe(new Observer<T>() {
@Override
public void onNext(T item) {
// Make our best-effort to release resources in the face of unsubscribe.
if (isUnsubscribed()) {
innerSubscription.unsubscribe();
outerSubscription.unsubscribe();
} else {
observer.onNext(item);
public void onNext(T item) {
observer.onNext(item);
}
@Override
public void onError(Exception e) {
if (completedOrErred.compareAndSet(false, true)) {
outerSubscription.unsubscribe();
observer.onError(e);
}
}
@Override
public void onCompleted() {
synchronized (nextSequences) {
if (nextSequences.isEmpty()) {
// No new sequences available at the moment
innerSubscription = null;
if (allSequencesReceived.get()) {
// No new sequences are coming, we are finished
if (completedOrErred.compareAndSet(false, true)) {
observer.onCompleted();
}
}
}
@Override
public void onError(Exception e) {
outerSubscription.unsubscribe();
innerError.set(true);
observer.onError(e);
latch.countDown();
}
@Override
public void onCompleted() {
} else {
// Continue on to the next sequence
latch.countDown();
innerSubscription = new AtomicObservableSubscription();
innerSubscription.wrap(nextSequences.poll().subscribe(this));
}
}
}
};

outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {
@Override
public void onNext(Observable<T> nextSequence) {
synchronized (nextSequences) {
if (innerSubscription == null) {
// We are currently not subscribed to any sequence
innerSubscription = new AtomicObservableSubscription();
innerSubscription.wrap(nextSequence.subscribe(reusableObserver));
} else {
// Put this sequence at the end of the queue
nextSequences.add(nextSequence);
}
}));
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Exceptions.propagate(e);
}
}
@Override
public void onError(Exception e) {
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
observer.onError(e);
if (completedOrErred.compareAndSet(false, true)) {
if (innerSubscription != null) {
innerSubscription.unsubscribe();
}
observer.onError(e);
}
}
@Override
public void onCompleted() {
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
if (!innerError.get()) {
observer.onCompleted();
allSequencesReceived.set(true);
if (innerSubscription == null) {
// We are not subscribed to any sequence, and none are coming anymore
if (completedOrErred.compareAndSet(false, true)) {
observer.onCompleted();
}
}
}
}));

return new Subscription() {
@Override
public void unsubscribe() {
synchronized (nextSequences) {
if (innerSubscription != null)
innerSubscription.unsubscribe();
outerSubscription.unsubscribe();
}
}
};
}
}

Expand Down Expand Up @@ -442,9 +461,72 @@ public void testConcatConcurrentWithInfinity() {

}



@Test
public void testConcatUnSubscribeNotBlockingObservables() {

final CountDownLatch okToContinueW1 = new CountDownLatch(1);
final CountDownLatch okToContinueW2 = new CountDownLatch(1);

final TestObservable<String> w1 = new TestObservable<String>(null, okToContinueW1, "one", "two", "three");
final TestObservable<String> w2 = new TestObservable<String>(null, okToContinueW2, "four", "five", "six");

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Observable<Observable<String>> observableOfObservables = Observable.create(new Func1<Observer<Observable<String>>, Subscription>() {

@Override
public Subscription call(Observer<Observable<String>> observer) {
// simulate what would happen in an observable
observer.onNext(w1);
observer.onNext(w2);
observer.onCompleted();

return new Subscription() {

@Override
public void unsubscribe() {
}

};
}

});
Observable<String> concat = Observable.create(concat(observableOfObservables));

concat.subscribe(aObserver);

verify(aObserver, times(0)).onCompleted();


//Wait for the thread to start up.
try {
Thread.sleep(25);
w1.t.join();
w2.t.join();
okToContinueW1.countDown();
okToContinueW2.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

InOrder inOrder = inOrder(aObserver);
inOrder.verify(aObserver, times(1)).onNext("one");
inOrder.verify(aObserver, times(1)).onNext("two");
inOrder.verify(aObserver, times(1)).onNext("three");
inOrder.verify(aObserver, times(1)).onNext("four");
inOrder.verify(aObserver, times(1)).onNext("five");
inOrder.verify(aObserver, times(1)).onNext("six");
verify(aObserver, times(1)).onCompleted();


}


/**
* The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete.
* Test unsubscribing the concatenated Observable in a single thread.
*/
@Test
public void testConcatUnsubscribe() {
Expand All @@ -458,20 +540,13 @@ public void testConcatUnsubscribe() {
@SuppressWarnings("unchecked")
final Observable<String> concat = Observable.create(concat(w1, w2));
final AtomicObservableSubscription s1 = new AtomicObservableSubscription();
Thread t = new Thread() {
@Override
public void run() {
// NB: this statement does not complete until after "six" has been delivered.
s1.wrap(concat.subscribe(aObserver));
}
};
t.start();

try {
// Subscribe
s1.wrap(concat.subscribe(aObserver));
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
callOnce.await();
// NB: This statement has no effect, since s1 cannot possibly
// wrap anything until "six" has been delivered, which cannot
// happen until we okToContinue.countDown()
// Unsubcribe
s1.unsubscribe();
//Unblock the observable to continue.
okToContinue.countDown();
Expand All @@ -487,10 +562,9 @@ public void run() {
inOrder.verify(aObserver, times(1)).onNext("two");
inOrder.verify(aObserver, times(1)).onNext("three");
inOrder.verify(aObserver, times(1)).onNext("four");
// NB: you might hope that five and six are not delivered, but see above.
inOrder.verify(aObserver, times(1)).onNext("five");
inOrder.verify(aObserver, times(1)).onNext("six");
inOrder.verify(aObserver, times(1)).onCompleted();
inOrder.verify(aObserver, never()).onNext("five");
inOrder.verify(aObserver, never()).onNext("six");
inOrder.verify(aObserver, never()).onCompleted();

}

Expand Down Expand Up @@ -598,7 +672,7 @@ public void run() {
once.countDown();
//Block until the main thread has called unsubscribe.
if (null != okToContinue)
okToContinue.await();
okToContinue.await(1, TimeUnit.SECONDS);
}
if (subscribed)
observer.onCompleted();
Expand Down

0 comments on commit 58951bd

Please sign in to comment.