From a8c1e3c8ee14b612bfc086ee9cad2c78151837af Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 23 Apr 2014 15:06:47 +0200 Subject: [PATCH 1/2] OperatorConcat --- rxjava-core/src/main/java/rx/Observable.java | 20 +- .../java/rx/operators/OperationConcat.java | 175 ------------------ .../java/rx/operators/OperatorConcat.java | 119 ++++++++++++ .../rx/operators/OperationConcatTest.java | 95 ++++++++-- 4 files changed, 206 insertions(+), 203 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationConcat.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorConcat.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 7fea0a1fb1..36617e6abe 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -50,7 +50,6 @@ import rx.operators.OnSubscribeRange; import rx.operators.OperationBuffer; import rx.operators.OperationCombineLatest; -import rx.operators.OperationConcat; import rx.operators.OperationDebounce; import rx.operators.OperationDefaultIfEmpty; import rx.operators.OperationDefer; @@ -95,6 +94,7 @@ import rx.operators.OperatorAsObservable; import rx.operators.OperatorCache; import rx.operators.OperatorCast; +import rx.operators.OperatorConcat; import rx.operators.OperatorDoOnEach; import rx.operators.OperatorElementAt; import rx.operators.OperatorFilter; @@ -740,7 +740,7 @@ public final static Observable combin * @see MSDN: Observable.Concat */ public final static Observable concat(Observable> observables) { - return create(OperationConcat.concat(observables)); + return observables.lift(new OperatorConcat()); } /** @@ -761,7 +761,7 @@ public final static Observable concat(Observable Observable concat(Observable t1, Observable t2) { - return create(OperationConcat.concat(t1, t2)); + return concat(from(t1, t2)); } /** @@ -784,7 +784,7 @@ public final static Observable concat(Observable t1, Observa @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3) { - return create(OperationConcat.concat(t1, t2, t3)); + return concat(from(t1, t2, t3)); } /** @@ -809,7 +809,7 @@ public final static Observable concat(Observable t1, Observa @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4) { - return create(OperationConcat.concat(t1, t2, t3, t4)); + return concat(from(t1, t2, t3, t4)); } /** @@ -836,7 +836,7 @@ public final static Observable concat(Observable t1, Observa @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5) { - return create(OperationConcat.concat(t1, t2, t3, t4, t5)); + return concat(from(t1, t2, t3, t4, t5)); } /** @@ -865,7 +865,7 @@ public final static Observable concat(Observable t1, Observa @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6) { - return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6)); + return concat(from(t1, t2, t3, t4, t5, t6)); } /** @@ -896,7 +896,7 @@ public final static Observable concat(Observable t1, Observa @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7) { - return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7)); + return concat(from(t1, t2, t3, t4, t5, t6, t7)); } /** @@ -929,7 +929,7 @@ public final static Observable concat(Observable t1, Observa @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8) { - return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7, t8)); + return concat(from(t1, t2, t3, t4, t5, t6, t7, t8)); } /** @@ -964,7 +964,7 @@ public final static Observable concat(Observable t1, Observa @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8, Observable t9) { - return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7, t8, t9)); + return concat(from(t1, t2, t3, t4, t5, t6, t7, t8, t9)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java deleted file mode 100644 index 8c39379a3e..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.observers.Subscribers; -import rx.subscriptions.Subscriptions; - -/** - * Returns an Observable that emits the items emitted by two or more Observables, one after the - * other. - *

- * - */ -public final class OperationConcat { - - /** - * Combine the observable sequences from the list of Observables into one - * observable sequence without any transformation. If either the outer - * observable or an inner observable calls onError, we will call onError. - *

- * - * @param sequences - * An observable sequence of elements to project. - * @return An observable sequence whose elements are the result of combining the output from the list of Observables. - */ - public static OnSubscribeFunc concat(final Observable... sequences) { - return concat(Observable.from(sequences)); - } - - public static OnSubscribeFunc concat(final Iterable> sequences) { - return concat(Observable.from(sequences)); - } - - public static OnSubscribeFunc concat(final Observable> sequences) { - return new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer t1) { - return new Concat(sequences).onSubscribe(t1); - } - }; - } - - private static class Concat implements OnSubscribeFunc { - private Observable> sequences; - private SafeObservableSubscription innerSubscription = null; - - public Concat(Observable> sequences) { - this.sequences = sequences; - } - - public Subscription onSubscribe(final Observer observer) { - final AtomicBoolean completedOrErred = new AtomicBoolean(false); - final AtomicBoolean allSequencesReceived = new AtomicBoolean(false); - final Queue> nextSequences = new ConcurrentLinkedQueue>(); - final SafeObservableSubscription outerSubscription = new SafeObservableSubscription(); - - final Observer reusableObserver = new Observer() { - @Override - public void onNext(T item) { - observer.onNext(item); - } - - @Override - public void onError(Throwable e) { - if (completedOrErred.compareAndSet(false, true)) { - outerSubscription.unsubscribe(); - observer.onError(e); - } - } - - @Override - public void onCompleted() { - synchronized (nextSequences) { - if (nextSequences.isEmpty()) { - // No new sequences available at the moment - innerSubscription = null; - if (allSequencesReceived.get()) { - // No new sequences are coming, we are finished - if (completedOrErred.compareAndSet(false, true)) { - observer.onCompleted(); - } - } - } else { - // Continue on to the next sequence - innerSubscription = new SafeObservableSubscription(); - innerSubscription.wrap(nextSequences.poll().unsafeSubscribe(Subscribers.from(this))); - } - } - } - }; - - outerSubscription.wrap(sequences.unsafeSubscribe(new Subscriber>() { - @Override - public void onNext(Observable nextSequence) { - synchronized (nextSequences) { - if (innerSubscription == null) { - // We are currently not subscribed to any sequence - innerSubscription = new SafeObservableSubscription(); - innerSubscription.wrap(nextSequence.unsafeSubscribe(Subscribers.from(reusableObserver))); - } else { - // Put this sequence at the end of the queue - nextSequences.add(nextSequence); - } - } - } - - @Override - public void onError(Throwable e) { - if (completedOrErred.compareAndSet(false, true)) { - Subscription q; - synchronized (nextSequences) { - q = innerSubscription; - } - if (q != null) { - q.unsubscribe(); - } - observer.onError(e); - } - } - - @Override - public void onCompleted() { - allSequencesReceived.set(true); - Subscription q; - synchronized (nextSequences) { - q = innerSubscription; - } - if (q == null) { - // We are not subscribed to any sequence, and none are coming anymore - if (completedOrErred.compareAndSet(false, true)) { - observer.onCompleted(); - } - } - } - })); - - return Subscriptions.create(new Action0() { - @Override - public void call() { - Subscription q; - synchronized (nextSequences) { - q = innerSubscription; - } - if (q != null) { - q.unsubscribe(); - } - outerSubscription.unsubscribe(); - } - }); - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorConcat.java b/rxjava-core/src/main/java/rx/operators/OperatorConcat.java new file mode 100644 index 0000000000..7b6a21eafe --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorConcat.java @@ -0,0 +1,119 @@ + /** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.functions.Action0; +import rx.observers.SerializedSubscriber; +import rx.subscriptions.SerialSubscription; +import rx.subscriptions.Subscriptions; + +/** + * Returns an Observable that emits the items emitted by two or more Observables, one after the + * other. + *

+ * + */ +public final class OperatorConcat implements Operator> { + final NotificationLite> nl = NotificationLite.instance(); + @Override + public Subscriber> call(final Subscriber child) { + final SerializedSubscriber s = new SerializedSubscriber(child); + final SerialSubscription current = new SerialSubscription(); + child.add(current); + return new ConcatSubscriber(s, current); + } + + final class ConcatSubscriber extends Subscriber> { + + private final Subscriber s; + private final SerialSubscription current; + final ConcurrentLinkedQueue queue; + final AtomicInteger wip; + + public ConcatSubscriber(Subscriber s, SerialSubscription current) { + super(s); + this.s = s; + this.current = current; + this.queue = new ConcurrentLinkedQueue(); + this.wip = new AtomicInteger(); + add(Subscriptions.create(new Action0() { + @Override + public void call() { + queue.clear(); + } + })); + } + + @Override + public void onNext(Observable t) { + queue.add(nl.next(t)); + if (wip.getAndIncrement() == 0) { + subscribeNext(); + } + } + + @Override + public void onError(Throwable e) { + s.onError(e); + unsubscribe(); + } + + @Override + public void onCompleted() { + queue.add(nl.completed()); + if (wip.getAndIncrement() == 0) { + subscribeNext(); + } + } + + void subscribeNext() { + Object o = queue.poll(); + if (nl.isCompleted(o)) { + s.onCompleted(); + } else + if (o != null) { + Observable obs = nl.getValue(o); + Subscriber sourceSub = new Subscriber() { + + @Override + public void onNext(T t) { + s.onNext(t); + } + + @Override + public void onError(Throwable e) { + ConcatSubscriber.this.onError(e); + } + + @Override + public void onCompleted() { + if (wip.decrementAndGet() > 0) { + subscribeNext(); + } + } + + }; + current.set(sourceSub); + obs.unsafeSubscribe(sourceSub); + } + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java index 557b0b5aff..2a56cea355 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static rx.operators.OperationConcat.concat; import java.util.ArrayList; import java.util.Arrays; @@ -36,7 +35,9 @@ import org.mockito.InOrder; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Observer; +import rx.Subscriber; import rx.Subscription; import rx.schedulers.TestScheduler; import rx.subscriptions.BooleanSubscription; @@ -56,7 +57,7 @@ public void testConcat() { final Observable even = Observable.from(e); @SuppressWarnings("unchecked") - Observable concat = Observable.create(concat(odds, even)); + Observable concat = Observable.concat(odds, even); concat.subscribe(observer); verify(observer, times(7)).onNext(anyString()); @@ -75,7 +76,7 @@ public void testConcatWithList() { final List> list = new ArrayList>(); list.add(odds); list.add(even); - Observable concat = Observable.create(concat(list)); + Observable concat = Observable.concat(Observable.from(list)); concat.subscribe(observer); verify(observer, times(7)).onNext(anyString()); @@ -105,7 +106,7 @@ public Subscription onSubscribe(Observer> observer) { } }); - Observable concat = Observable.create(concat(observableOfObservables)); + Observable concat = Observable.concat(observableOfObservables); concat.subscribe(observer); @@ -203,7 +204,7 @@ public void run() { } }); - Observable.create(concat(observableOfObservables)).subscribe(observer); + Observable.concat(observableOfObservables).subscribe(observer); // wait for parent to start parentHasStarted.await(); @@ -263,9 +264,8 @@ public void testBlockedObservableOfObservables() { final CountDownLatch callOnce = new CountDownLatch(1); final CountDownLatch okToContinue = new CountDownLatch(1); TestObservable> observableOfObservables = new TestObservable>(callOnce, okToContinue, odds, even); - Observable.OnSubscribeFunc concatF = concat(Observable.create(observableOfObservables)); - Observable concat = Observable.create(concatF); - concat.subscribe(observer); + Observable concatF = Observable.concat(Observable.create(observableOfObservables)); + concatF.subscribe(observer); try { //Block main thread to allow observables to serve up o1. callOnce.await(); @@ -303,11 +303,9 @@ public void testConcatConcurrentWithInfinity() { Observer observer = mock(Observer.class); @SuppressWarnings("unchecked") TestObservable> observableOfObservables = new TestObservable>(Observable.create(w1), Observable.create(w2)); - Observable.OnSubscribeFunc concatF = concat(Observable.create(observableOfObservables)); - - Observable concat = Observable.create(concatF); + Observable concatF = Observable.concat(Observable.create(observableOfObservables)); - concat.take(50).subscribe(observer); + concatF.take(50).subscribe(observer); //Wait for the thread to start up. try { @@ -351,7 +349,7 @@ public Subscription onSubscribe(Observer> observer) { } }); - Observable concat = Observable.create(concat(observableOfObservables)); + Observable concat = Observable.concat(observableOfObservables); concat.subscribe(observer); verify(observer, times(0)).onCompleted(); @@ -391,7 +389,7 @@ public void testConcatUnsubscribe() { @SuppressWarnings("unchecked") final Observer observer = mock(Observer.class); @SuppressWarnings("unchecked") - final Observable concat = Observable.create(concat(Observable.create(w1), Observable.create(w2))); + final Observable concat = Observable.concat(Observable.create(w1), Observable.create(w2)); try { // Subscribe @@ -434,11 +432,9 @@ public void testConcatUnsubscribeConcurrent() { Observer observer = mock(Observer.class); @SuppressWarnings("unchecked") TestObservable> observableOfObservables = new TestObservable>(Observable.create(w1), Observable.create(w2)); - Observable.OnSubscribeFunc concatF = concat(Observable.create(observableOfObservables)); - - Observable concat = Observable.create(concatF); + Observable concatF = Observable.concat(Observable.create(observableOfObservables)); - Subscription s1 = concat.subscribe(observer); + Subscription s1 = concatF.subscribe(observer); try { //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext exactly once. @@ -593,4 +589,67 @@ public void testMultipleObservers() { verify(o1, never()).onError(any(Throwable.class)); verify(o2, never()).onError(any(Throwable.class)); } + + @Test + public void concatVeryLongObservableOfObservables() { + final int n = 10000; + Observable> source = Observable.create(new OnSubscribe>() { + @Override + public void call(Subscriber> s) { + for (int i = 0; i < n; i++) { + if (s.isUnsubscribed()) { + return; + } + s.onNext(Observable.from(i)); + } + s.onCompleted(); + } + }); + + Observable> result = Observable.concat(source).toList(); + + Observer> o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + List list = new ArrayList(n); + for (int i = 0; i < n; i++) { + list.add(i); + } + inOrder.verify(o).onNext(list); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void concatVeryLongObservableOfObservablesTakeHalf() { + final int n = 10000; + Observable> source = Observable.create(new OnSubscribe>() { + @Override + public void call(Subscriber> s) { + for (int i = 0; i < n; i++) { + if (s.isUnsubscribed()) { + return; + } + s.onNext(Observable.from(i)); + } + s.onCompleted(); + } + }); + + Observable> result = Observable.concat(source).take(n / 2).toList(); + + Observer> o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + result.subscribe(o); + + List list = new ArrayList(n); + for (int i = 0; i < n / 2; i++) { + list.add(i); + } + inOrder.verify(o).onNext(list); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } } From b50a6647f923f9826dcabe2d3243098cc7207f34 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 23 Apr 2014 17:00:27 +0200 Subject: [PATCH 2/2] Rename concat test --- .../{OperationConcatTest.java => OperatorConcatTest.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename rxjava-core/src/test/java/rx/operators/{OperationConcatTest.java => OperatorConcatTest.java} (99%) diff --git a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java b/rxjava-core/src/test/java/rx/operators/OperatorConcatTest.java similarity index 99% rename from rxjava-core/src/test/java/rx/operators/OperationConcatTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorConcatTest.java index 2a56cea355..122610bbac 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorConcatTest.java @@ -43,7 +43,7 @@ import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; -public class OperationConcatTest { +public class OperatorConcatTest { @Test public void testConcat() {