From ba7f9103f2706c162f0a27d4ca4be35acf3e78fa Mon Sep 17 00:00:00 2001 From: Aaron Tull Date: Tue, 28 Jul 2015 17:55:34 -0700 Subject: [PATCH] Implementing the SyncOnSubscribe --- .../java/rx/observables/SyncOnSubscribe.java | 465 +++++++++ .../rx/jmh/InputWithIncrementingInteger.java | 22 +- .../observables/BlockingObservablePerf.java | 32 +- src/perf/java/rx/observables/MultiInput.java | 36 + src/perf/java/rx/observables/SingleInput.java | 36 + .../rx/observables/SyncOnSubscribePerf.java | 118 +++ .../rx/observables/SyncOnSubscribeTest.java | 982 ++++++++++++++++++ 7 files changed, 1649 insertions(+), 42 deletions(-) create mode 100644 src/main/java/rx/observables/SyncOnSubscribe.java create mode 100644 src/perf/java/rx/observables/MultiInput.java create mode 100644 src/perf/java/rx/observables/SingleInput.java create mode 100644 src/perf/java/rx/observables/SyncOnSubscribePerf.java create mode 100644 src/test/java/rx/observables/SyncOnSubscribeTest.java diff --git a/src/main/java/rx/observables/SyncOnSubscribe.java b/src/main/java/rx/observables/SyncOnSubscribe.java new file mode 100644 index 0000000000..47a6c34024 --- /dev/null +++ b/src/main/java/rx/observables/SyncOnSubscribe.java @@ -0,0 +1,465 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.observables; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; + +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.Producer; +import rx.Subscriber; +import rx.Subscription; +import rx.annotations.Experimental; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action2; +import rx.functions.Func0; +import rx.functions.Func2; +import rx.internal.operators.BackpressureUtils; +import rx.plugins.RxJavaPlugins; + +/** + * A utility class to create {@code OnSubscribe} functions that respond correctly to back + * pressure requests from subscribers. This is an improvement over + * {@link rx.Observable#create(OnSubscribe) Observable.create(OnSubscribe)} which does not provide + * any means of managing back pressure requests out-of-the-box. + * + * @param + * the type of the user-define state used in {@link #generateState() generateState(S)} , + * {@link #next(Object, Subscriber) next(S, Subscriber)}, and + * {@link #onUnsubscribe(Object) onUnsubscribe(S)}. + * @param + * the type of {@code Subscribers} that will be compatible with {@code this}. + */ +@Experimental +public abstract class SyncOnSubscribe implements OnSubscribe { + + /* (non-Javadoc) + * @see rx.functions.Action1#call(java.lang.Object) + */ + @Override + public final void call(final Subscriber subscriber) { + S state = generateState(); + SubscriptionProducer p = new SubscriptionProducer(subscriber, this, state); + subscriber.add(p); + subscriber.setProducer(p); + } + + /** + * Executed once when subscribed to by a subscriber (via {@link OnSubscribe#call(Subscriber)}) + * to produce a state value. This value is passed into {@link #next(Object, Observer) next(S + * state, Observer observer)} on the first iteration. Subsequent iterations of {@code next} + * will receive the state returned by the previous invocation of {@code next}. + * + * @return the initial state value + */ + protected abstract S generateState(); + + /** + * Called to produce data to the downstream subscribers. To emit data to a downstream subscriber + * call {@code observer.onNext(t)}. To signal an error condition call + * {@code observer.onError(throwable)} or throw an Exception. To signal the end of a data stream + * call {@code + * observer.onCompleted()}. Implementations of this method must follow the following rules. + * + *
    + *
  • Must not call {@code observer.onNext(t)} more than 1 time per invocation.
  • + *
  • Must not call {@code observer.onNext(t)} concurrently.
  • + *
+ * + * The value returned from an invocation of this method will be passed in as the {@code state} + * argument of the next invocation of this method. + * + * @param state + * the state value (from {@link #generateState()} on the first invocation or the + * previous invocation of this method. + * @param observer + * the observer of data emitted by + * @return the next iteration's state value + */ + protected abstract S next(S state, Observer observer); + + /** + * Clean up behavior that is executed after the downstream subscriber's subscription is + * unsubscribed. This method will be invoked exactly once. + * + * @param state + * the last state value prior from {@link #generateState()} or + * {@link #next(Object, Observer) next(S, Observer<T>)} before unsubscribe. + */ + protected void onUnsubscribe(S state) { + + } + + /** + * Generates a synchronous {@link SyncOnSubscribe} that calls the provided {@code next} function + * to generate data to downstream subscribers. + * + * @param generator + * generates the initial state value (see {@link #generateState()}) + * @param next + * produces data to the downstream subscriber (see {@link #next(Object, Subscriber) + * next(S, Subscriber)}) + * @return an OnSubscribe that emits data in a protocol compatible with back-pressure. + */ + @Experimental + public static OnSubscribe createSingleState(Func0 generator, + final Action2> next) { + Func2, S> nextFunc = new Func2, S>() { + @Override + public S call(S state, Observer subscriber) { + next.call(state, subscriber); + return state; + } + }; + return new SyncOnSubscribeImpl(generator, nextFunc); + } + + /** + * Generates a synchronous {@link SyncOnSubscribe} that calls the provided {@code next} function + * to generate data to downstream subscribers. + * + * This overload creates a SyncOnSubscribe without an explicit clean up step. + * + * @param generator + * generates the initial state value (see {@link #generateState()}) + * @param next + * produces data to the downstream subscriber (see {@link #next(Object, Subscriber) + * next(S, Subscriber)}) + * @param onUnsubscribe + * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) + * @return an OnSubscribe that emits data downstream in a protocol compatible with + * back-pressure. + */ + @Experimental + public static OnSubscribe createSingleState(Func0 generator, + final Action2> next, + final Action1 onUnsubscribe) { + Func2, S> nextFunc = new Func2, S>() { + @Override + public S call(S state, Observer subscriber) { + next.call(state, subscriber); + return state; + } + }; + return new SyncOnSubscribeImpl(generator, nextFunc, onUnsubscribe); + } + + /** + * Generates a synchronous {@link SyncOnSubscribe} that calls the provided {@code next} function + * to generate data to downstream subscribers. + * + * @param generator + * generates the initial state value (see {@link #generateState()}) + * @param next + * produces data to the downstream subscriber (see {@link #next(Object, Subscriber) + * next(S, Subscriber)}) + * @param onUnsubscribe + * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) + * @return an OnSubscribe that emits data downstream in a protocol compatible with + * back-pressure. + */ + @Experimental + public static OnSubscribe createStateful(Func0 generator, + Func2, ? extends S> next, + Action1 onUnsubscribe) { + return new SyncOnSubscribeImpl(generator, next, onUnsubscribe); + } + + /** + * Generates a synchronous {@link SyncOnSubscribe} that calls the provided {@code next} function + * to generate data to downstream subscribers. + * + * @param generator + * generates the initial state value (see {@link #generateState()}) + * @param next + * produces data to the downstream subscriber (see {@link #next(Object, Subscriber) + * next(S, Subscriber)}) + * @return an OnSubscribe that emits data downstream in a protocol compatible with + * back-pressure. + */ + @Experimental + public static OnSubscribe createStateful(Func0 generator, + Func2, ? extends S> next) { + return new SyncOnSubscribeImpl(generator, next); + } + + /** + * Generates a synchronous {@link SyncOnSubscribe} that calls the provided {@code next} function + * to generate data to downstream subscribers. + * + * This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state + * value. This should be used when the {@code next} function closes over it's state. + * + * @param next + * produces data to the downstream subscriber (see {@link #next(Object, Subscriber) + * next(S, Subscriber)}) + * @return an OnSubscribe that emits data downstream in a protocol compatible with + * back-pressure. + */ + @Experimental + public static OnSubscribe createStateless(final Action1> next) { + Func2, Void> nextFunc = new Func2, Void>() { + @Override + public Void call(Void state, Observer subscriber) { + next.call(subscriber); + return state; + } + }; + return new SyncOnSubscribeImpl(nextFunc); + } + + /** + * Generates a synchronous {@link SyncOnSubscribe} that calls the provided {@code next} function + * to generate data to downstream subscribers. + * + * This overload creates a "state-less" SyncOnSubscribe which does not have an explicit state + * value. This should be used when the {@code next} function closes over it's state. + * + * @param next + * produces data to the downstream subscriber (see {@link #next(Object, Subscriber) + * next(S, Subscriber)}) + * @param onUnsubscribe + * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) + * @return an OnSubscribe that emits data downstream in a protocol compatible with + * back-pressure. + */ + @Experimental + public static OnSubscribe createStateless(final Action1> next, + final Action0 onUnsubscribe) { + Func2, Void> nextFunc = new Func2, Void>() { + @Override + public Void call(Void state, Observer subscriber) { + next.call(subscriber); + return null; + } + }; + Action1 wrappedOnUnsubscribe = new Action1(){ + @Override + public void call(Void t) { + onUnsubscribe.call(); + }}; + return new SyncOnSubscribeImpl(nextFunc, wrappedOnUnsubscribe); + } + + /** + * An implementation of SyncOnSubscribe that delegates + * {@link SyncOnSubscribe#next(Object, Subscriber)}, {@link SyncOnSubscribe#generateState()}, + * and {@link SyncOnSubscribe#onUnsubscribe(Object)} to provided functions/closures. + * + * @param + * the type of the user-defined state + * @param + * the type of compatible Subscribers + */ + private static final class SyncOnSubscribeImpl extends SyncOnSubscribe { + private final Func0 generator; + private final Func2, ? extends S> next; + private final Action1 onUnsubscribe; + + private SyncOnSubscribeImpl(Func0 generator, Func2, ? extends S> next, Action1 onUnsubscribe) { + this.generator = generator; + this.next = next; + this.onUnsubscribe = onUnsubscribe; + } + + public SyncOnSubscribeImpl(Func0 generator, Func2, ? extends S> next) { + this(generator, next, null); + } + + public SyncOnSubscribeImpl(Func2, S> next, Action1 onUnsubscribe) { + this(null, next, onUnsubscribe); + } + + public SyncOnSubscribeImpl(Func2, S> nextFunc) { + this(null, nextFunc, null); + } + + @Override + protected S generateState() { + return generator == null ? null : generator.call(); + } + + @Override + protected S next(S state, Observer observer) { + return next.call(state, observer); + } + + @Override + protected void onUnsubscribe(S state) { + if (onUnsubscribe != null) + onUnsubscribe.call(state); + } + } + + /** + * Contains the producer loop that reacts to downstream requests of work. + * + * @param + * the type of compatible Subscribers + */ + private static class SubscriptionProducer + extends AtomicLong implements Producer, Subscription, Observer { + /** */ + private static final long serialVersionUID = -3736864024352728072L; + private final Subscriber actualSubscriber; + private final SyncOnSubscribe parent; + private boolean onNextCalled; + private boolean hasTerminated; + + private S state; + + volatile int isUnsubscribed; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater IS_UNSUBSCRIBED = + AtomicIntegerFieldUpdater.newUpdater(SubscriptionProducer.class, "isUnsubscribed"); + + private SubscriptionProducer(final Subscriber subscriber, SyncOnSubscribe parent, S state) { + this.actualSubscriber = subscriber; + this.parent = parent; + this.state = state; + } + + @Override + public boolean isUnsubscribed() { + return isUnsubscribed != 0; + } + + @Override + public void unsubscribe() { + IS_UNSUBSCRIBED.compareAndSet(this, 0, 1); + if (get() == 0L) + parent.onUnsubscribe(state); + } + + @Override + public void request(long n) { + if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) { + if (n == Long.MAX_VALUE) { + fastpath(); + } else { + slowPath(n); + } + } + } + + void fastpath() { + final SyncOnSubscribe p = parent; + Subscriber a = actualSubscriber; + + if (isUnsubscribed()) { + p.onUnsubscribe(state); + return; + } + + for (;;) { + try { + onNextCalled = false; + nextIteration(p); + } catch (Throwable ex) { + handleThrownError(p, a, state, ex); + return; + } + if (hasTerminated || isUnsubscribed()) { + p.onUnsubscribe(state); + return; + } + } + } + + private void handleThrownError(final SyncOnSubscribe p, Subscriber a, S st, Throwable ex) { + if (hasTerminated) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(ex); + } else { + hasTerminated = true; + a.onError(ex); + p.onUnsubscribe(st); + } + } + + void slowPath(long n) { + final SyncOnSubscribe p = parent; + Subscriber a = actualSubscriber; + long numRequested = n; + for (;;) { + if (isUnsubscribed()) { + p.onUnsubscribe(state); + return; + } + long numRemaining = numRequested; + do { + try { + onNextCalled = false; + nextIteration(p); + } catch (Throwable ex) { + handleThrownError(p, a, state, ex); + return; + } + if (hasTerminated || isUnsubscribed()) { + p.onUnsubscribe(state); + return; + } + if (onNextCalled) + numRemaining--; + } while (numRemaining != 0L); + + numRequested = addAndGet(-numRequested); + if (numRequested == 0L) { + break; + } + } + } + + private void nextIteration(final SyncOnSubscribe parent) { + state = parent.next(state, this); + } + + @Override + public void onCompleted() { + if (hasTerminated) { + throw new IllegalStateException("Terminal event already emitted."); + } + hasTerminated = true; + if (!actualSubscriber.isUnsubscribed()) { + actualSubscriber.onCompleted(); + } + } + + @Override + public void onError(Throwable e) { + if (hasTerminated) { + throw new IllegalStateException("Terminal event already emitted."); + } + hasTerminated = true; + if (!actualSubscriber.isUnsubscribed()) { + actualSubscriber.onError(e); + } + } + + @Override + public void onNext(T value) { + if (onNextCalled) { + throw new IllegalStateException("onNext called multiple times!"); + } + onNextCalled = true; + actualSubscriber.onNext(value); + } + } + + +} \ No newline at end of file diff --git a/src/perf/java/rx/jmh/InputWithIncrementingInteger.java b/src/perf/java/rx/jmh/InputWithIncrementingInteger.java index f86bc28117..6760202024 100644 --- a/src/perf/java/rx/jmh/InputWithIncrementingInteger.java +++ b/src/perf/java/rx/jmh/InputWithIncrementingInteger.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package rx.jmh; import java.util.Iterator; @@ -40,46 +41,43 @@ public abstract class InputWithIncrementingInteger { @Setup public void setup(final Blackhole bh) { this.bh = bh; - observable = Observable.range(0, getSize()); + final int size = getSize(); + observable = Observable.range(0, size); firehose = Observable.create(new OnSubscribe() { @Override public void call(Subscriber s) { - for (int i = 0; i < getSize(); i++) { + for (int i = 0; i < size; i++) { s.onNext(i); } s.onCompleted(); } }); - iterable = new Iterable() { - @Override public Iterator iterator() { return new Iterator() { - int i = 0; - + @Override public boolean hasNext() { - return i < getSize(); + return i < size; } - + @Override public Integer next() { + Blackhole.consumeCPU(10); return i++; } - + @Override public void remove() { - + } - }; } - }; observer = new Observer() { diff --git a/src/perf/java/rx/observables/BlockingObservablePerf.java b/src/perf/java/rx/observables/BlockingObservablePerf.java index 4cb18d31c0..7c6b00029e 100644 --- a/src/perf/java/rx/observables/BlockingObservablePerf.java +++ b/src/perf/java/rx/observables/BlockingObservablePerf.java @@ -15,48 +15,20 @@ */ package rx.observables; +import java.util.concurrent.TimeUnit; + import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; -import rx.jmh.InputWithIncrementingInteger; - -import java.util.concurrent.TimeUnit; @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @State(Scope.Thread) public class BlockingObservablePerf { - @State(Scope.Thread) - public static class MultiInput extends InputWithIncrementingInteger { - - @Param({ "1", "1000", "1000000" }) - public int size; - - @Override - public int getSize() { - return size; - } - - } - - @State(Scope.Thread) - public static class SingleInput extends InputWithIncrementingInteger { - - @Param({ "1" }) - public int size; - - @Override - public int getSize() { - return size; - } - - } - @Benchmark public int benchSingle(final SingleInput input) { return input.observable.toBlocking().single(); diff --git a/src/perf/java/rx/observables/MultiInput.java b/src/perf/java/rx/observables/MultiInput.java new file mode 100644 index 0000000000..e607249d07 --- /dev/null +++ b/src/perf/java/rx/observables/MultiInput.java @@ -0,0 +1,36 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.observables; + +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; + +import rx.jmh.InputWithIncrementingInteger; + +@State(Scope.Thread) +public class MultiInput extends InputWithIncrementingInteger { + + @Param({ "1", "1000", "1000000" }) + public int size; + + @Override + public int getSize() { + return size; + } + +} diff --git a/src/perf/java/rx/observables/SingleInput.java b/src/perf/java/rx/observables/SingleInput.java new file mode 100644 index 0000000000..7949efcfa5 --- /dev/null +++ b/src/perf/java/rx/observables/SingleInput.java @@ -0,0 +1,36 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.observables; + +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; + +import rx.jmh.InputWithIncrementingInteger; + +@State(Scope.Thread) +public class SingleInput extends InputWithIncrementingInteger { + + @Param({ "1" }) + public int size; + + @Override + public int getSize() { + return size; + } + +} diff --git a/src/perf/java/rx/observables/SyncOnSubscribePerf.java b/src/perf/java/rx/observables/SyncOnSubscribePerf.java new file mode 100644 index 0000000000..8417bf3a8e --- /dev/null +++ b/src/perf/java/rx/observables/SyncOnSubscribePerf.java @@ -0,0 +1,118 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observables; + +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.internal.operators.OnSubscribeFromIterable; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +public class SyncOnSubscribePerf { + + public static void main(String[] args) { + SingleInput singleInput = new SingleInput(); + singleInput.size = 1; + singleInput.setup(generated._jmh_tryInit_()); + SyncOnSubscribePerf perf = new SyncOnSubscribePerf(); + perf.benchSyncOnSubscribe(singleInput); + } + private static class generated { + private static Blackhole _jmh_tryInit_() { + return new Blackhole(); + } + } + + private static OnSubscribe createSyncOnSubscribe(final Iterator iterator) { + return new SyncOnSubscribe(){ + + @Override + protected Void generateState() { + return null; + } + + @Override + protected Void next(Void state, Observer observer) { + if (iterator.hasNext()) { + observer.onNext(iterator.next()); + } + else + observer.onCompleted(); + return null; + } + }; + } + +// @Benchmark +// @Group("single") + public void benchSyncOnSubscribe(final SingleInput input) { + createSyncOnSubscribe(input.iterable.iterator()).call(input.newSubscriber()); + } + +// @Benchmark +// @Group("single") + public void benchFromIterable(final SingleInput input) { + new OnSubscribeFromIterable(input.iterable).call(input.newSubscriber()); + } + +// @Benchmark +// @Group("single") + public void benchAbstractOnSubscribe(final SingleInput input) { + final Iterator iterator = input.iterable.iterator(); + createAbstractOnSubscribe(iterator).call(input.newSubscriber()); + } + + private AbstractOnSubscribe createAbstractOnSubscribe(final Iterator iterator) { + return new AbstractOnSubscribe() { + @Override + protected void next(rx.observables.AbstractOnSubscribe.SubscriptionState state) { + if (iterator.hasNext()) + state.onNext(iterator.next()); + else + state.onCompleted(); + }}; + } + + @Benchmark +// @Group("multi") + public void benchSyncOnSubscribe2(final MultiInput input) { + createSyncOnSubscribe(input.iterable.iterator()).call(input.newSubscriber()); + } + +// @Benchmark +// @Group("multi") + public void benchAbstractOnSubscribe2(final MultiInput input) { + createAbstractOnSubscribe(input.iterable.iterator()).call(input.newSubscriber()); + } + + @Benchmark +// @Group("multi") + public void benchFromIterable2(final MultiInput input) { + new OnSubscribeFromIterable(input.iterable).call(input.newSubscriber()); + } +} diff --git a/src/test/java/rx/observables/SyncOnSubscribeTest.java b/src/test/java/rx/observables/SyncOnSubscribeTest.java new file mode 100644 index 0000000000..91421d502a --- /dev/null +++ b/src/test/java/rx/observables/SyncOnSubscribeTest.java @@ -0,0 +1,982 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.observables; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Matchers; +import org.mockito.Mockito; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Observable.Operator; +import rx.Observer; +import rx.Producer; +import rx.Subscriber; +import rx.exceptions.TestException; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action2; +import rx.functions.Func0; +import rx.functions.Func2; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; + +/** + * Test if SyncOnSubscribe adheres to the usual unsubscription and backpressure contracts. + */ +public class SyncOnSubscribeTest { + @Test + public void testObservableJustEquivalent() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + subscriber.onNext(1); + subscriber.onCompleted(); + }}); + + TestSubscriber ts = new TestSubscriber(); + + Observable.create(os).subscribe(ts); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1)); + } + + @Test + public void testStateAfterTerminal() { + final AtomicInteger finalStateValue = new AtomicInteger(-1); + OnSubscribe os = SyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func2, Integer>() { + @Override + public Integer call(Integer state, Observer subscriber) { + subscriber.onNext(state); + subscriber.onCompleted(); + return state + 1; + }}, new Action1() { + @Override + public void call(Integer t) { + finalStateValue.set(t); + }}); + + TestSubscriber ts = new TestSubscriber(); + + Observable.create(os).subscribe(ts); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + ts.assertValue(1); + assertEquals(2, finalStateValue.get()); + } + + @Test + public void testMultipleOnNextValuesCallsOnError() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + subscriber.onNext(1); + subscriber.onNext(2); + subscriber.onCompleted(); + }}); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.create(os).subscribe(o); + + verify(o, times(1)).onNext(1); + verify(o, never()).onNext(2); + verify(o, never()).onCompleted(); + verify(o, times(1)).onError(any(IllegalStateException.class)); + } + + @Test + public void testMultipleOnCompleted() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + subscriber.onNext(1); + subscriber.onCompleted(); + subscriber.onCompleted(); + }}); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.create(os).subscribe(o); + + verify(o, times(1)).onNext(1); + verify(o, times(1)).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testOnNextAfterOnComplete() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + subscriber.onNext(1); + subscriber.onCompleted(); + subscriber.onNext(1); + }}); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.create(os).subscribe(o); + + verify(o, times(1)).onNext(1); + verify(o, times(1)).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @SuppressWarnings("serial") + private static class FooException extends RuntimeException { + public FooException(String string) { + super(string); + } + } + + @Test + public void testMultipleOnErrors() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + subscriber.onNext(1); + subscriber.onError(new TestException("Forced failure 1")); + subscriber.onError(new FooException("Should not see this error.")); + }}); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.create(os).subscribe(o); + + verify(o, times(1)).onNext(1); + verify(o, never()).onCompleted(); + verify(o, times(1)).onError(isA(TestException.class)); + verify(o, never()).onError(isA(FooException.class)); + } + + @Test + public void testEmpty() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + subscriber.onCompleted(); + }}); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.create(os).subscribe(o); + + verify(o, never()).onNext(any(Integer.class)); + verify(o, never()).onError(any(Throwable.class)); + verify(o).onCompleted(); + } + + @Test + public void testNever() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + + }}); + + + Observable neverObservable = Observable.create(os).subscribeOn(Schedulers.newThread()); + Observable merged = Observable.amb(neverObservable, Observable.timer(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread())); + Iterator values = merged.toBlocking().toIterable().iterator(); + + assertTrue((values.hasNext())); + assertEquals(0l, values.next()); + } + + @Test + public void testThrows() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + throw new TestException("Forced failure"); + }}); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.create(os).subscribe(o); + + verify(o, never()).onNext(any(Integer.class)); + verify(o, never()).onCompleted(); + verify(o).onError(any(TestException.class)); + } + + @Test + public void testThrowAfterCompleteFastPath() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + subscriber.onCompleted(); + throw new TestException("Forced failure"); + }}); + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.create(os).subscribe(o); + + verify(o, never()).onNext(any(Integer.class)); + verify(o, times(1)).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testThrowsSlowPath() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + throw new TestException("Forced failure"); + }}); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + TestSubscriber ts = new TestSubscriber(o) { + @Override + public void onStart() { + requestMore(0); // don't start right away + } + }; + + Observable.create(os).subscribe(ts); + + ts.requestMore(1); + + verify(o, never()).onNext(any(Integer.class)); + verify(o, never()).onCompleted(); + verify(o, times(1)).onError(any(TestException.class)); + } + + @Test + public void testError() { + OnSubscribe os = SyncOnSubscribe.createStateless(new Action1>() { + @Override + public void call(Observer subscriber) { + subscriber.onError(new TestException("Forced failure")); + }}); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.create(os).subscribe(o); + + verify(o, never()).onNext(any(Integer.class)); + verify(o).onError(any(TestException.class)); + verify(o, never()).onCompleted(); + } + + @Test + public void testRange() { + final int start = 1; + final int count = 4000; + OnSubscribe os = SyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return start; + }}, + new Func2, Integer>() { + @Override + public Integer call(Integer state, Observer subscriber) { + subscriber.onNext(state); + if (state == count) { + subscriber.onCompleted(); + } + return state + 1; + } + }); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + Observable.create(os).subscribe(o); + + verify(o, never()).onError(any(TestException.class)); + inOrder.verify(o, times(count)).onNext(any(Integer.class)); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFromIterable() { + int n = 400; + final List source = new ArrayList(); + for (int i = 0; i < n; i++) { + source.add(i); + } + OnSubscribe os = SyncOnSubscribe.createStateful( + new Func0>() { + @Override + public Iterator call() { + return source.iterator(); + }}, + new Func2, Observer, Iterator>() { + @Override + public Iterator call(Iterator it, Observer observer) { + if (it.hasNext()) { + observer.onNext(it.next()); + } + if (!it.hasNext()) { + observer.onCompleted(); + } + return it; + }}); + + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + Observable.create(os).subscribe(o); + + verify(o, never()).onError(any(TestException.class)); + inOrder.verify(o, times(n)).onNext(any()); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testInfiniteTake() { + final int start = 0; + final int finalCount = 4000; + OnSubscribe os = SyncOnSubscribe.createStateful( + new Func0() { + @Override + public Integer call() { + return start; + }}, + new Func2, Integer>() { + @Override + public Integer call(Integer state, Observer observer) { + observer.onNext(state); + return state + 1; + }}); + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + Observable.create(os).take(finalCount).subscribe(o); + + verify(o, never()).onError(any(Throwable.class)); + inOrder.verify(o, times(finalCount)).onNext(any()); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testInfiniteRequestSome() { + final int finalCount = 4000; + final int start = 0; + + @SuppressWarnings("unchecked") + Action1 onUnSubscribe = mock(Action1.class); + + OnSubscribe os = SyncOnSubscribe.createStateful( + new Func0() { + @Override + public Integer call() { + return start; + }}, + new Func2, Integer>() { + @Override + public Integer call(Integer state, Observer observer) { + observer.onNext(state); + return state + 1; + }}, + onUnSubscribe); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + TestSubscriber ts = new TestSubscriber(o) { + @Override + public void onStart() { + requestMore(0); // don't start right away + } + }; + + Observable.create(os).subscribe(ts); + + ts.requestMore(finalCount); + + verify(o, never()).onError(any(Throwable.class)); + verify(o, never()).onCompleted(); + inOrder.verify(o, times(finalCount)).onNext(any()); + inOrder.verifyNoMoreInteractions(); + // unsubscribe does not take place because subscriber is still in process of requesting + verify(onUnSubscribe, never()).call(any(Integer.class)); + } + + @Test + public void testUnsubscribeDownstream() { + @SuppressWarnings("unchecked") + Action1 onUnSubscribe = mock(Action1.class); + + OnSubscribe os = SyncOnSubscribe.createStateful( + new Func0() { + @Override + public Integer call() { + return null; + }}, + new Func2, Integer>() { + @Override + public Integer call(Integer state, Observer observer) { + observer.onNext(state); + return state; + }}, + onUnSubscribe); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + TestSubscriber ts = new TestSubscriber(o); + + Observable.create(os).lift(new Operator(){ + @Override + public Subscriber call(final Subscriber subscriber) { + return new Subscriber(){ + @Override + public void setProducer(Producer p) { + p.request(Long.MAX_VALUE); + } + + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + @Override + public void onNext(Object t) { + subscriber.onNext(t); + unsubscribe(); + }}; + }}).take(1).subscribe(ts); + + verify(o, never()).onError(any(Throwable.class)); + verify(onUnSubscribe, times(1)).call(any(Integer.class)); + } + + @Test + public void testConcurrentRequests() throws InterruptedException { + final int count1 = 1000; + final int count2 = 1000; + final int finalCount = count1 + count2; + final int start = 1; + final CountDownLatch l1 = new CountDownLatch(1); + final CountDownLatch l2 = new CountDownLatch(1); + + @SuppressWarnings("unchecked") + Action1 onUnSubscribe = mock(Action1.class); + + OnSubscribe os = SyncOnSubscribe.createStateful( + new Func0() { + @Override + public Integer call() { + return start; + }}, + new Func2, Integer>() { + @Override + public Integer call(Integer state, Observer observer) { + // countdown so the other thread is certain to make a concurrent request + l2.countDown(); + // wait until the 2nd request returns then proceed + try { + if (!l1.await(1, TimeUnit.SECONDS)) + throw new IllegalStateException(); + } catch (InterruptedException e) {} + observer.onNext(state); + if (state == finalCount) + observer.onCompleted(); + return state + 1; + }}, + onUnSubscribe); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + final TestSubscriber ts = new TestSubscriber(o); + Observable.create(os).subscribeOn(Schedulers.newThread()).subscribe(ts); + + // wait until the first request has started processing + try { + if (!l2.await(1, TimeUnit.SECONDS)) + throw new IllegalStateException(); + } catch (InterruptedException e) {} + // make a concurrent request, this should return + ts.requestMore(count2); + // unblock the 1st thread to proceed fulfilling requests + l1.countDown(); + + ts.awaitTerminalEvent(10, TimeUnit.SECONDS); + ts.assertNoErrors(); + + inOrder.verify(o, times(finalCount)).onNext(any()); + inOrder.verify(o, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(onUnSubscribe, times(1)).call(any(Integer.class)); + } + + @Test + public void testUnsubscribeOutsideOfLoop() { + final AtomicInteger calledUnsubscribe = new AtomicInteger(0); + final AtomicBoolean currentlyEvaluating = new AtomicBoolean(false); + + OnSubscribe os = SyncOnSubscribe.createStateful( + new Func0() { + @Override + public Void call() { + return null; + }}, + new Func2, Void>() { + @Override + public Void call(Void state, Observer observer) { + currentlyEvaluating.set(true); + observer.onNext(null); + currentlyEvaluating.set(false); + return null; + }}, + new Action1(){ + @Override + public void call(Void t) { + calledUnsubscribe.incrementAndGet(); + assertFalse(currentlyEvaluating.get()); + }}); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + final TestSubscriber ts = new TestSubscriber(o) { + @Override + public void onStart() { + requestMore(1); + } + }; + Observable.create(os).lift(new Operator(){ + @Override + public Subscriber call(final Subscriber subscriber) { + return new Subscriber(){ + @Override + public void setProducer(Producer p) { + p.request(1); + } + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + @Override + public void onNext(final Void t) { + new Thread(new Runnable(){ + @Override + public void run() { + subscriber.onNext(t); + unsubscribe(); + subscriber.onCompleted(); + }}).start(); + }}; + }}).subscribe(ts); + ts.awaitTerminalEvent(1, TimeUnit.SECONDS); + ts.assertNoErrors(); + ts.assertUnsubscribed(); + assertEquals(1, calledUnsubscribe.get()); + } + + @Test + public void testIndependentStates() { + int count = 100; + final ConcurrentHashMap subscribers = new ConcurrentHashMap(); + + @SuppressWarnings("unchecked") + Action1> onUnSubscribe = mock(Action1.class); + + OnSubscribe os = SyncOnSubscribe.createStateful( + new Func0>() { + @Override + public Map call() { + return subscribers; + }}, + new Func2, Observer, Map>() { + @Override + public Map call(Map state, Observer observer) { + state.put(observer, observer); + observer.onCompleted(); + return state; + }}, + onUnSubscribe); + + Observable source = Observable.create(os); + for (int i = 0; i < count; i++) { + source.subscribe(); + } + + assertEquals(count, subscribers.size()); + verify(onUnSubscribe, times(count)).call(Matchers.>any()); + } + + @Test(timeout = 3000) + public void testSubscribeOn() { + final int start = 1; + final int count = 400; + final AtomicInteger countUnsubscribe = new AtomicInteger(0); + final int numSubscribers = 4; + + OnSubscribe os = SyncOnSubscribe.createStateful( + new Func0() { + @Override + public Integer call() { + return start; + }}, + new Func2, Integer>() { + @Override + public Integer call(Integer calls, Observer observer) { + if (calls > count) { + observer.onCompleted(); + } else { + observer.onNext(calls); + } + return calls + 1; + }}, + new Action1() { + @Override + public void call(Integer t) { + countUnsubscribe.incrementAndGet(); + }}); + + List> subs = new ArrayList>(numSubscribers); + List> mocks = new ArrayList>(numSubscribers); + for (int i = 0; i < numSubscribers; i++) { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + TestSubscriber ts = new TestSubscriber(o); + subs.add(ts); + mocks.add(o); + } + + Observable o2 = Observable.create(os).subscribeOn(Schedulers.newThread()); + for (Subscriber ts : subs) { + o2.subscribe(ts); + } + + for (TestSubscriber ts : subs) { + ts.awaitTerminalEventAndUnsubscribeOnTimeout(1, TimeUnit.SECONDS); + } + + for (Observer o : mocks) { + verify(o, never()).onError(any(Throwable.class)); + verify(o, times(count)).onNext(any()); + verify(o, times(1)).onCompleted(); + } + assertEquals(numSubscribers, countUnsubscribe.get()); + } + + @Test(timeout = 10000) + public void testObserveOn() { + final int start = 1; + final int count = 4000; + + @SuppressWarnings("unchecked") + Action1 onUnSubscribe = mock(Action1.class); + @SuppressWarnings("unchecked") + Func0 generator = mock(Func0.class); + Mockito.when(generator.call()).thenReturn(start); + + OnSubscribe os = SyncOnSubscribe.createStateful(generator, + new Func2, Integer>() { + @Override + public Integer call(Integer calls, Observer observer) { + observer.onNext(calls); + if (calls == count) + observer.onCompleted(); + return calls + 1; + }}, + onUnSubscribe); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + TestSubscriber ts = new TestSubscriber(o); + + TestScheduler scheduler = new TestScheduler(); + Observable.create(os).observeOn(scheduler).subscribe(ts); + + scheduler.triggerActions(); + ts.awaitTerminalEvent(); + + verify(o, never()).onError(any(Throwable.class)); + verify(o, times(count)).onNext(any(Integer.class)); + verify(o).onCompleted(); + verify(generator, times(1)).call(); + + List events = ts.getOnNextEvents(); + for (int i = 0; i < events.size(); i++) { + assertEquals(i + 1, events.get(i)); + } + verify(onUnSubscribe, times(1)).call(any(Integer.class)); + } + + @Test + public void testCanRequestInOnNext() { + Action0 onUnSubscribe = mock(Action0.class); + + OnSubscribe os = SyncOnSubscribe.createStateless( + new Action1>() { + @Override + public void call(Observer observer) { + observer.onNext(1); + observer.onCompleted(); + }}, + onUnSubscribe); + final AtomicReference exception = new AtomicReference(); + Observable.create(os).subscribe(new Subscriber() { + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + exception.set(e); + } + + @Override + public void onNext(Integer t) { + request(1); + } + }); + if (exception.get() != null) { + exception.get().printStackTrace(); + } + assertNull(exception.get()); + verify(onUnSubscribe, times(1)).call(); + } + + @Test + public void testExtendingBase() { + final AtomicReference lastState = new AtomicReference(); + final AtomicInteger countUnsubs = new AtomicInteger(0); + SyncOnSubscribe sos = new SyncOnSubscribe() { + @Override + protected Object generateState() { + Object o = new Object(); + lastState.set(o); + return o; + } + + @Override + protected Object next(Object state, Observer observer) { + observer.onNext(lastState.get()); + assertEquals(lastState.get(), state); + Object o = new Object(); + lastState.set(o); + return o; + } + + @Override + protected void onUnsubscribe(Object state) { + countUnsubs.incrementAndGet(); + assertEquals(lastState.get(), state); + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + TestSubscriber ts = new TestSubscriber(o); + + int count = 10; + Observable.create(sos).take(count).subscribe(ts); + + verify(o, never()).onError(any(Throwable.class)); + verify(o, times(count)).onNext(any(Object.class)); + verify(o).onCompleted(); + assertEquals(1, countUnsubs.get()); + } + + private interface FooQux {} + private static class Foo implements FooQux {} + private interface BarQux extends FooQux {} + private static class Bar extends Foo implements BarQux {} + + @Test + public void testGenericsCreateSingleState() { + Func0 generator = new Func0() { + @Override + public Bar call() { + return new Bar(); + }}; + Action2> next = new Action2>() { + @Override + public void call(BarQux state, Observer observer) { + observer.onNext(state); + observer.onCompleted(); + }}; + assertJustBehavior(SyncOnSubscribe.createSingleState(generator, next)); + } + + @Test + public void testGenericsCreateSingleStateWithUnsub() { + Func0 generator = new Func0() { + @Override + public Bar call() { + return new Bar(); + }}; + Action2> next = new Action2>() { + @Override + public void call(BarQux state, Observer observer) { + observer.onNext(state); + observer.onCompleted(); + }}; + Action1 unsub = new Action1() { + @Override + public void call(FooQux t) { + + }}; + assertJustBehavior(SyncOnSubscribe.createSingleState(generator, next, unsub)); + } + + @Test + public void testGenericsCreateStateful() { + Func0 generator = new Func0() { + @Override + public Bar call() { + return new Bar(); + }}; + Func2, ? extends BarQux> next = new Func2, BarQux>() { + @Override + public BarQux call(BarQux state, Observer observer) { + observer.onNext(state); + observer.onCompleted(); + return state; + }}; + assertJustBehavior(SyncOnSubscribe.createStateful(generator, next)); + } + + @Test + public void testGenericsCreateStatefulWithUnsub() { + Func0 generator = new Func0() { + @Override + public Bar call() { + return new Bar(); + }}; + Func2, ? extends BarQux> next = new Func2, BarQux>() { + @Override + public BarQux call(BarQux state, Observer observer) { + observer.onNext(state); + observer.onCompleted(); + return state; + }}; + Action1 unsub = new Action1() { + @Override + public void call(FooQux t) { + + }}; + OnSubscribe os = SyncOnSubscribe.createStateful(generator, next, unsub); + assertJustBehavior(os); + } + + @Test + public void testGenericsCreateStateless() { + Action1> next = new Action1>() { + @Override + public void call(Observer observer) { + observer.onNext(new Foo()); + observer.onCompleted(); + }}; + OnSubscribe os = SyncOnSubscribe.createStateless(next); + assertJustBehavior(os); + } + + @Test + public void testGenericsCreateStatelessWithUnsub() { + Action1> next = new Action1>() { + @Override + public void call(Observer observer) { + observer.onNext(new Foo()); + observer.onCompleted(); + }}; + Action0 unsub = new Action0() { + @Override + public void call() { + + }}; + OnSubscribe os = SyncOnSubscribe.createStateless(next, unsub); + assertJustBehavior(os); + } + + private void assertJustBehavior(OnSubscribe os) { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + TestSubscriber ts = new TestSubscriber(o); + + os.call(ts); + verify(o, times(1)).onNext(any()); + verify(o, times(1)).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } +}