From d85303842df9baca07dbaa28e6fed623a0189996 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 29 Apr 2014 15:22:07 +0200 Subject: [PATCH] Operator Switch --- rxjava-core/src/main/java/rx/Observable.java | 6 +- .../java/rx/operators/OperationSwitch.java | 175 ------------- .../java/rx/operators/OperatorSwitch.java | 234 ++++++++++++++++++ ...witchTest.java => OperatorSwitchTest.java} | 30 ++- 4 files changed, 252 insertions(+), 193 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationSwitch.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorSwitch.java rename rxjava-core/src/test/java/rx/operators/{OperationSwitchTest.java => OperatorSwitchTest.java} (94%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 746362ca78..58db739250 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -66,7 +66,6 @@ import rx.operators.OperationSequenceEqual; import rx.operators.OperationSkip; import rx.operators.OperationSkipUntil; -import rx.operators.OperationSwitch; import rx.operators.OperationTakeLast; import rx.operators.OperationTakeTimed; import rx.operators.OperationTakeUntil; @@ -120,6 +119,7 @@ import rx.operators.OperatorSkipLastTimed; import rx.operators.OperatorSkipWhile; import rx.operators.OperatorSubscribeOn; +import rx.operators.OperatorSwitch; import rx.operators.OperatorTake; import rx.operators.OperatorTimeout; import rx.operators.OperatorTimeoutWithSelector; @@ -2494,9 +2494,11 @@ public final static Observable sequenceEqual(ObservableRxJava Wiki: switchOnNext() + * + * @param the element type */ public final static Observable switchOnNext(Observable> sequenceOfSequences) { - return create(OperationSwitch.switchDo(sequenceOfSequences)); + return sequenceOfSequences.lift(new OperatorSwitch()); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java deleted file mode 100644 index 4af058f283..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Func1; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.SerialSubscription; - -/** - * Transforms an Observable that emits Observables into a single Observable that - * emits the items emitted by the most recently published of those Observables. - *

- * - */ -public final class OperationSwitch { - - /** - * This function transforms an {@link Observable} sequence of {@link Observable} sequences into a single {@link Observable} sequence - * which produces values from the most recently published {@link Observable} . - * - * @param sequences - * The {@link Observable} sequence consisting of {@link Observable} sequences. - * @return A {@link Func1} which does this transformation. - */ - public static OnSubscribeFunc switchDo(final Observable> sequences) { - return new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - return new Switch(sequences).onSubscribe(observer); - } - }; - } - - private static class Switch implements OnSubscribeFunc { - - private final Observable> sequences; - - public Switch(Observable> sequences) { - this.sequences = sequences; - } - - @Override - public Subscription onSubscribe(Observer observer) { - SafeObservableSubscription parent; - parent = new SafeObservableSubscription(); - - SerialSubscription child = new SerialSubscription(); - - parent.wrap(sequences.unsafeSubscribe(new SwitchObserver(observer, parent, child))); - - return new CompositeSubscription(parent, child); - } - } - - private static class SwitchObserver extends Subscriber> { - - private final Object gate; - private final Observer observer; - private final SafeObservableSubscription parent; - private final SerialSubscription child; - private long latest; - private boolean stopped; - private boolean hasLatest; - - public SwitchObserver(Observer observer, SafeObservableSubscription parent, - SerialSubscription child) { - this.observer = observer; - this.parent = parent; - this.child = child; - this.gate = new Object(); - } - - @Override - public void onNext(Observable args) { - final long id; - synchronized (gate) { - id = ++latest; - this.hasLatest = true; - } - - final SafeObservableSubscription sub = new SafeObservableSubscription(); - sub.wrap(args.unsafeSubscribe(new Subscriber() { - @Override - public void onNext(T args) { - synchronized (gate) { - if (latest == id) { - SwitchObserver.this.observer.onNext(args); - } - } - } - - @Override - public void onError(Throwable e) { - sub.unsubscribe(); - SafeObservableSubscription s = null; - synchronized (gate) { - if (latest == id) { - SwitchObserver.this.observer.onError(e); - s = SwitchObserver.this.parent; - } - } - if (s != null) { - s.unsubscribe(); - } - } - - @Override - public void onCompleted() { - sub.unsubscribe(); - SafeObservableSubscription s = null; - synchronized (gate) { - if (latest == id) { - SwitchObserver.this.hasLatest = false; - - if (stopped) { - SwitchObserver.this.observer.onCompleted(); - s = SwitchObserver.this.parent; - } - } - } - if (s != null) { - s.unsubscribe(); - } - } - - })); - - this.child.set(sub); - } - - @Override - public void onError(Throwable e) { - synchronized (gate) { - this.observer.onError(e); - } - - this.parent.unsubscribe(); - } - - @Override - public void onCompleted() { - SafeObservableSubscription s = null; - synchronized (gate) { - this.stopped = true; - if (!this.hasLatest) { - this.observer.onCompleted(); - s = this.parent; - } - } - if (s != null) { - s.unsubscribe(); - } - } - - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSwitch.java b/rxjava-core/src/main/java/rx/operators/OperatorSwitch.java new file mode 100644 index 0000000000..00d206da35 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorSwitch.java @@ -0,0 +1,234 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.List; +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.observers.SerializedSubscriber; +import rx.subscriptions.SerialSubscription; + +/** + * Transforms an Observable that emits Observables into a single Observable that + * emits the items emitted by the most recently published of those Observables. + *

+ * + * + * @param the value type + */ +public final class OperatorSwitch implements Operator> { + + @Override + public Subscriber> call(final Subscriber child) { + final SerializedSubscriber s = new SerializedSubscriber(child); + final SerialSubscription ssub = new SerialSubscription(); + child.add(ssub); + + return new Subscriber>(child) { + final Object guard = new Object(); + final Subscriber self = this; + final NotificationLite nl = NotificationLite.instance(); + /** Guarded by guard. */ + int index; + /** Guarded by guard. */ + boolean active; + /** Guarded by guard. */ + boolean mainDone; + /** Guarded by guard. */ + List queue; + /** Guarded by guard. */ + boolean emitting; + @Override + public void onNext(Observable t) { + final int id; + synchronized (guard) { + id = ++index; + active = true; + } + + Subscriber sub = new Subscriber() { + + @Override + public void onNext(T t) { + emit(t, id); + } + + @Override + public void onError(Throwable e) { + error(e, id); + } + + @Override + public void onCompleted() { + complete(id); + } + + }; + ssub.set(sub); + + t.unsafeSubscribe(sub); + } + + @Override + public void onError(Throwable e) { + s.onError(e); + unsubscribe(); + } + + @Override + public void onCompleted() { + List localQueue; + synchronized (guard) { + mainDone = true; + if (active) { + return; + } + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(nl.completed()); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + drain(localQueue); + s.onCompleted(); + unsubscribe(); + } + void emit(T value, int id) { + List localQueue; + synchronized (guard) { + if (id != index) { + return; + } + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(value); + return; + } + localQueue = queue; + queue = null; + emitting = true; + } + boolean once = true; + boolean skipFinal = false; + try { + do { + drain(localQueue); + if (once) { + once = false; + s.onNext(value); + } + synchronized (guard) { + localQueue = queue; + queue = null; + if (localQueue == null) { + emitting = false; + skipFinal = true; + break; + } + } + } while (!s.isUnsubscribed()); + } finally { + if (!skipFinal) { + synchronized (guard) { + emitting = false; + } + } + } + } + void drain(List localQueue) { + if (localQueue == null) { + return; + } + for (Object o : localQueue) { + if (nl.isCompleted(o)) { + s.onCompleted(); + break; + } else + if (nl.isError(o)) { + s.onError(nl.getError(o)); + break; + } else { + @SuppressWarnings("unchecked") + T t = (T)o; + s.onNext(t); + } + } + } + + void error(Throwable e, int id) { + List localQueue; + synchronized (guard) { + if (id != index) { + return; + } + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(nl.error(e)); + return; + } + + localQueue = queue; + queue = null; + emitting = true; + } + + drain(localQueue); + s.onError(e); + unsubscribe(); + } + void complete(int id) { + List localQueue; + synchronized (guard) { + if (id != index) { + return; + } + active = false; + if (!mainDone) { + return; + } + if (emitting) { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(nl.completed()); + return; + } + + localQueue = queue; + queue = null; + emitting = true; + } + + drain(localQueue); + s.onCompleted(); + unsubscribe(); + } + }; + } + +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSwitchTest.java similarity index 94% rename from rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorSwitchTest.java index a0d6fede6b..b90086e821 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSwitchTest.java @@ -28,16 +28,17 @@ import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; - import rx.Observable; import rx.Observer; + import rx.Scheduler; +import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; import rx.schedulers.TestScheduler; import rx.subscriptions.Subscriptions; -public class OperationSwitchTest { +public class OperatorSwitchTest { private TestScheduler scheduler; private Scheduler.Worker innerScheduler; @@ -53,25 +54,22 @@ public void before() { @Test public void testSwitchWhenOuterCompleteBeforeInner() { - Observable> source = Observable.create(new Observable.OnSubscribeFunc>() { + Observable> source = Observable.create(new Observable.OnSubscribe>() { @Override - public Subscription onSubscribe(Observer> observer) { - publishNext(observer, 50, Observable.create(new Observable.OnSubscribeFunc() { + public void call(Subscriber> observer) { + publishNext(observer, 50, Observable.create(new Observable.OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber observer) { publishNext(observer, 70, "one"); publishNext(observer, 100, "two"); publishCompleted(observer, 200); - return Subscriptions.empty(); } })); publishCompleted(observer, 60); - - return Subscriptions.empty(); } }); - Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + Observable sampled = Observable.switchOnNext(source); sampled.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -111,7 +109,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + Observable sampled = Observable.switchOnNext(source); sampled.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -157,7 +155,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + Observable sampled = Observable.switchOnNext(source); sampled.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -217,7 +215,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + Observable sampled = Observable.switchOnNext(source); sampled.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -282,7 +280,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + Observable sampled = Observable.switchOnNext(source); sampled.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -337,7 +335,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + Observable sampled = Observable.switchOnNext(source); sampled.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -421,7 +419,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + Observable sampled = Observable.switchOnNext(source); sampled.subscribe(observer); scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);