From 64d77b0910d2b620e3f22f93b3172904c3e345d5 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 20 Nov 2013 22:01:25 +0100 Subject: [PATCH] ToAsync: Issue #95 --- rxjava-core/src/main/java/rx/Observable.java | 646 ++++++++++++++++- .../main/java/rx/concurrency/Schedulers.java | 11 +- .../java/rx/operators/OperationToAsync.java | 404 +++++++++++ .../main/java/rx/util/functions/Action4.java | 24 + .../main/java/rx/util/functions/Action5.java | 24 + .../main/java/rx/util/functions/Action6.java | 24 + .../main/java/rx/util/functions/Action7.java | 23 + .../main/java/rx/util/functions/Action8.java | 23 + .../main/java/rx/util/functions/Action9.java | 23 + .../main/java/rx/util/functions/ActionN.java | 23 + .../main/java/rx/util/functions/Actions.java | 310 +++++++++ .../rx/operators/OperationToAsyncTest.java | 653 ++++++++++++++++++ 12 files changed, 2186 insertions(+), 2 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationToAsync.java create mode 100644 rxjava-core/src/main/java/rx/util/functions/Action4.java create mode 100644 rxjava-core/src/main/java/rx/util/functions/Action5.java create mode 100644 rxjava-core/src/main/java/rx/util/functions/Action6.java create mode 100644 rxjava-core/src/main/java/rx/util/functions/Action7.java create mode 100644 rxjava-core/src/main/java/rx/util/functions/Action8.java create mode 100644 rxjava-core/src/main/java/rx/util/functions/Action9.java create mode 100644 rxjava-core/src/main/java/rx/util/functions/ActionN.java create mode 100644 rxjava-core/src/main/java/rx/util/functions/Actions.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperationToAsyncTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index ce2b883764..18df894295 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -83,6 +83,7 @@ import rx.operators.OperationTimeInterval; import rx.operators.OperationTimeout; import rx.operators.OperationTimestamp; +import rx.operators.OperationToAsync; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; @@ -108,6 +109,15 @@ import rx.util.Timestamped; import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Action2; +import rx.util.functions.Action3; +import rx.util.functions.Action4; +import rx.util.functions.Action5; +import rx.util.functions.Action6; +import rx.util.functions.Action7; +import rx.util.functions.Action8; +import rx.util.functions.Action9; +import rx.util.functions.ActionN; import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -4606,7 +4616,641 @@ public Observable> toSortedList() { public Observable> toSortedList(Func2 sortFunction) { return create(OperationToObservableSortedList.toSortedList(this, sortFunction)); } - + + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func0> toAsync(Action0 action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func0> toAsync(Func0 func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func1> toAsync(Action1 action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func1> toAsync(Func1 func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func2> toAsync(Action2 action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func2> toAsync(Func2 func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func3> toAsync(Action3 action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func3> toAsync(Func3 func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func4> toAsync(Action4 action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func4> toAsync(Func4 func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func5> toAsync(Action5 action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func5> toAsync(Func5 func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func6> toAsync(Action6 action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func6> toAsync(Func6 func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func7> toAsync(Action7 action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func7> toAsync(Func7 func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func8> toAsync(Action8 action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func8> toAsync(Func8 func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func9> toAsync(Action9 action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func9> toAsync(Func9 func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + */ + public static FuncN> toAsync(ActionN action) { + return OperationToAsync.toAsync(action, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + */ + public static FuncN> toAsync(FuncN func) { + return OperationToAsync.toAsync(func, Schedulers.threadPoolForAsyncConversions()); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func0> toAsync(Action0 action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func0> toAsync(Func0 func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func1> toAsync(Action1 action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func1> toAsync(Func1 func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func2> toAsync(Action2 action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func2> toAsync(Func2 func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func3> toAsync(Action3 action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func3> toAsync(Func3 func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func4> toAsync(Action4 action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func4> toAsync(Func4 func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func5> toAsync(Action5 action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func5> toAsync(Func5 func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func6> toAsync(Action6 action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func6> toAsync(Func6 func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func7> toAsync(Action7 action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func7> toAsync(Func7 func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func8> toAsync(Action8 action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func8> toAsync(Func8 func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + * @see MSDN: Observable.ToAsync + */ + public static Func9> toAsync(Action9 action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + * @see MSDN: Observable.ToAsync + */ + public static Func9> toAsync(Func9 func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } + /** + * Convert a synchronous action call into an asynchronous function + * call through an Observable sequence. + * + * @param action the action to convert + * @param scheduler the scheduler used to execute the {@code action} + * + * @return a function which returns an observable sequence which + * executes the {@code action} and emits {@code null}. + * + */ + public static FuncN> toAsync(ActionN action, Scheduler scheduler) { + return OperationToAsync.toAsync(action, scheduler); + } + /** + * Convert a synchronous function call into an asynchronous function + * call through an Observable sequence. + * + * @param func the function to convert + * @param scheduler the scheduler used to call the {@code func} + * + * @return a function which returns an observable sequence which + * executes the {@code func} and emits its returned value. + * + */ + public static FuncN> toAsync(FuncN func, Scheduler scheduler) { + return OperationToAsync.toAsync(func, scheduler); + } /** * Emit a specified set of items before beginning to emit items from the * source Observable. diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index 1b27b9bf0a..a44f01406e 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -96,7 +96,16 @@ public static Scheduler executor(ScheduledExecutorService executor) { public static Scheduler threadPoolForComputation() { return executor(COMPUTATION_EXECUTOR); } - + /** + * {@link Scheduler} intended for asynchronous conversions. + *

+ * Defaults to {@link #threadPoolForComputation()}. + * + * @return {@link ExecutorScheduler} for asynchronous conversion work. + */ + public static Scheduler threadPoolForAsyncConversions() { + return threadPoolForComputation(); + } /** * {@link Scheduler} intended for IO-bound work. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationToAsync.java b/rxjava-core/src/main/java/rx/operators/OperationToAsync.java new file mode 100644 index 0000000000..e0987e4ca1 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationToAsync.java @@ -0,0 +1,404 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.operators; + +import rx.Observable; +import rx.Scheduler; +import rx.subjects.AsyncSubject; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Action2; +import rx.util.functions.Action3; +import rx.util.functions.Action4; +import rx.util.functions.Action5; +import rx.util.functions.Action6; +import rx.util.functions.Action7; +import rx.util.functions.Action8; +import rx.util.functions.Action9; +import rx.util.functions.ActionN; +import rx.util.functions.Actions; +import rx.util.functions.Func0; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.util.functions.Func3; +import rx.util.functions.Func4; +import rx.util.functions.Func5; +import rx.util.functions.Func6; +import rx.util.functions.Func7; +import rx.util.functions.Func8; +import rx.util.functions.Func9; +import rx.util.functions.FuncN; + +/** + * Convert an action or function call into an asynchronous operation + * through an Observable. + */ +public final class OperationToAsync { + private OperationToAsync() { throw new IllegalStateException("No instances!"); } + /** + * Action0 with Scheduler. + */ + public static Func0> toAsync(final Action0 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * Func0 with Scheduler. + */ + public static Func0> toAsync(final Func0 func, final Scheduler scheduler) { + return new Func0>() { + @Override + public Observable call() { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + /** + * Action1 with Scheduler. + */ + public static Func1> toAsync(final Action1 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * Func1 with Scheduler. + */ + public static Func1> toAsync(final Func1 func, final Scheduler scheduler) { + return new Func1>() { + @Override + public Observable call(final T1 t1) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + /** + * Action2 with Scheduler. + */ + public static Func2> toAsync(final Action2 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * Func2 with Scheduler. + */ + public static Func2> toAsync(final Func2 func, final Scheduler scheduler) { + return new Func2>() { + @Override + public Observable call(final T1 t1, final T2 t2) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } + /** + * Action3 with Scheduler. + */ + public static Func3> toAsync(final Action3 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * Func3 with Scheduler. + */ + public static Func3> toAsync(final Func3 func, final Scheduler scheduler) { + return new Func3>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); // Rx.NET: null value ? + subject.onCompleted(); + } + }); + return subject; + } + }; + } + /** + * Action4 with Scheduler. + */ + public static Func4> toAsync(final Action4 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * Func4 with Scheduler. + */ + public static Func4> toAsync(final Func4 func, final Scheduler scheduler) { + return new Func4>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); // Rx.NET: null value ? + subject.onCompleted(); + } + }); + return subject; + } + }; + } + /** + * Action5 with Scheduler. + */ + public static Func5> toAsync(final Action5 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * Func5 with Scheduler. + */ + public static Func5> toAsync(final Func5 func, final Scheduler scheduler) { + return new Func5>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4, t5); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); // Rx.NET: null value ? + subject.onCompleted(); + } + }); + return subject; + } + }; + } + /** + * Action6 with Scheduler. + */ + public static Func6> toAsync(final Action6 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * Func6 with Scheduler. + */ + public static Func6> toAsync(final Func6 func, final Scheduler scheduler) { + return new Func6>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4, t5, t6); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); // Rx.NET: null value ? + subject.onCompleted(); + } + }); + return subject; + } + }; + } + /** + * Action7 with Scheduler. + */ + public static Func7> toAsync(final Action7 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * Func7 with Scheduler. + */ + public static Func7> toAsync(final Func7 func, final Scheduler scheduler) { + return new Func7>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4, t5, t6, t7); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); // Rx.NET: null value ? + subject.onCompleted(); + } + }); + return subject; + } + }; + } + /** + * Action8 with Scheduler. + */ + public static Func8> toAsync(final Action8 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * Func8 with Scheduler. + */ + public static Func8> toAsync(final Func8 func, final Scheduler scheduler) { + return new Func8>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4, t5, t6, t7, t8); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); // Rx.NET: null value ? + subject.onCompleted(); + } + }); + return subject; + } + }; + } + /** + * Action9 with Scheduler. + */ + public static Func9> toAsync(final Action9 action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * Func9 with Scheduler. + */ + public static Func9> toAsync(final Func9 func, final Scheduler scheduler) { + return new Func9>() { + @Override + public Observable call(final T1 t1, final T2 t2, final T3 t3, final T4 t4, final T5 t5, final T6 t6, final T7 t7, final T8 t8, final T9 t9) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(t1, t2, t3, t4, t5, t6, t7, t8, t9); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); // Rx.NET: null value ? + subject.onCompleted(); + } + }); + return subject; + } + }; + } + /** + * ActionN with Scheduler. + */ + public static FuncN> toAsync(final ActionN action, final Scheduler scheduler) { + return toAsync(Actions.toFunc(action), scheduler); + } + /** + * FuncN with Scheduler. + */ + public static FuncN> toAsync(final FuncN func, final Scheduler scheduler) { + return new FuncN>() { + @Override + public Observable call(final Object... args) { + final AsyncSubject subject = AsyncSubject.create(); + scheduler.schedule(new Action0() { + @Override + public void call() { + R result; + try { + result = func.call(args); + } catch (Throwable t) { + subject.onError(t); + return; + } + subject.onNext(result); + subject.onCompleted(); + } + }); + return subject; + } + }; + } +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Action4.java b/rxjava-core/src/main/java/rx/util/functions/Action4.java new file mode 100644 index 0000000000..f4126fcb95 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Action4.java @@ -0,0 +1,24 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.util.functions; + +/** + * A four-argument action. + */ +public interface Action4 extends Action { + void call(T1 t1, T2 t2, T3 t3, T4 t4); +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Action5.java b/rxjava-core/src/main/java/rx/util/functions/Action5.java new file mode 100644 index 0000000000..14d4bfa666 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Action5.java @@ -0,0 +1,24 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.util.functions; + +/** + * A five-argument action. + */ +public interface Action5 extends Action { + void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5); +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Action6.java b/rxjava-core/src/main/java/rx/util/functions/Action6.java new file mode 100644 index 0000000000..69cda2fe69 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Action6.java @@ -0,0 +1,24 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.util.functions; + +/** + * A six-argument action. + */ +public interface Action6 extends Action { + void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6); +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Action7.java b/rxjava-core/src/main/java/rx/util/functions/Action7.java new file mode 100644 index 0000000000..1b7a5ade64 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Action7.java @@ -0,0 +1,23 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.functions; + +/** + * A seven-argument action. + */ +public interface Action7 extends Action { + void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7); +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Action8.java b/rxjava-core/src/main/java/rx/util/functions/Action8.java new file mode 100644 index 0000000000..4ad0c7915d --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Action8.java @@ -0,0 +1,23 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.functions; + +/** + * An eight-argument action. + */ +public interface Action8 extends Action { + void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8); +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Action9.java b/rxjava-core/src/main/java/rx/util/functions/Action9.java new file mode 100644 index 0000000000..09a82f4949 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Action9.java @@ -0,0 +1,23 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.functions; + +/** + * A nine-argument action. + */ +public interface Action9 extends Action { + void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9); +} diff --git a/rxjava-core/src/main/java/rx/util/functions/ActionN.java b/rxjava-core/src/main/java/rx/util/functions/ActionN.java new file mode 100644 index 0000000000..b7ef56252f --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/ActionN.java @@ -0,0 +1,23 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.functions; + +/** + * A vector-argument action. + */ +public interface ActionN extends Action { + void call(Object... args); +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Actions.java b/rxjava-core/src/main/java/rx/util/functions/Actions.java new file mode 100644 index 0000000000..e4d916198e --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Actions.java @@ -0,0 +1,310 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.functions; + +/** + * Utility class to support the Action0 .. Action9, ActionN interfaces. + */ +public final class Actions { + private Actions() { throw new IllegalStateException("No instances!"); } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static Func0 toFunc(final Action0 action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static Func1 toFunc(final Action1 action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static Func2 toFunc(final Action2 action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static Func3 toFunc(final Action3 action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static Func4 toFunc(final Action4 action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static Func5 toFunc( + final Action5 action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static Func6 toFunc( + final Action6 action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static Func7 toFunc( + final Action7 action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static Func8 toFunc( + final Action8 action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static Func9 toFunc( + final Action9 action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @return {@link Func0} + */ + public static FuncN toFunc( + final ActionN action) { + return toFunc(action, (Void)null); + } + /** + * Convert an action to a function which calls + * the action returns the given result. + * @param action + * @param result + * @return {@link Func0} + */ + public static Func0 toFunc(final Action0 action, final R result) { + return new Func0() { + @Override + public R call() { + action.call(); + return result; + } + }; + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @param result + * @return {@link Func0} + */ + public static Func1 toFunc(final Action1 action, final R result) { + return new Func1() { + @Override + public R call(T1 t1) { + action.call(t1); + return result; + } + }; + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @param result + * @return {@link Func0} + */ + public static Func2 toFunc(final Action2 action, final R result) { + return new Func2() { + @Override + public R call(T1 t1, T2 t2) { + action.call(t1, t2); + return result; + } + }; + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @param result + * @return {@link Func0} + */ + public static Func3 toFunc(final Action3 action, final R result) { + return new Func3() { + @Override + public R call(T1 t1, T2 t2, T3 t3) { + action.call(t1, t2, t3); + return result; + } + }; + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @param result + * @return {@link Func0} + */ + public static Func4 toFunc(final Action4 action, final R result) { + return new Func4() { + @Override + public R call(T1 t1, T2 t2, T3 t3, T4 t4) { + action.call(t1, t2, t3, t4); + return result; + } + }; + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @param result + * @return {@link Func0} + */ + public static Func5 toFunc( + final Action5 action, final R result) { + return new Func5() { + @Override + public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5) { + action.call(t1, t2, t3, t4, t5); + return result; + } + }; + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @param result + * @return {@link Func0} + */ + public static Func6 toFunc( + final Action6 action, final R result) { + return new Func6() { + @Override + public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6) { + action.call(t1, t2, t3, t4, t5, t6); + return result; + } + }; + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @param result + * @return {@link Func0} + */ + public static Func7 toFunc( + final Action7 action, final R result) { + return new Func7() { + @Override + public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7) { + action.call(t1, t2, t3, t4, t5, t6, t7); + return result; + } + }; + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @param result + * @return {@link Func0} + */ + public static Func8 toFunc( + final Action8 action, final R result) { + return new Func8() { + @Override + public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8) { + action.call(t1, t2, t3, t4, t5, t6, t7, t8); + return result; + } + }; + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @param result + * @return {@link Func0} + */ + public static Func9 toFunc( + final Action9 action, final R result) { + return new Func9() { + @Override + public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9) { + action.call(t1, t2, t3, t4, t5, t6, t7, t8, t9); + return result; + } + }; + } + /** + * Convert an action to a function which calls + * the action returns Void (null). + * @param action + * @param result + * @return {@link Func0} + */ + public static FuncN toFunc( + final ActionN action, final R result) { + return new FuncN() { + @Override + public R call(Object... args) { + action.call(args); + return result; + } + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationToAsyncTest.java b/rxjava-core/src/test/java/rx/operators/OperationToAsyncTest.java new file mode 100644 index 0000000000..434cac862e --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationToAsyncTest.java @@ -0,0 +1,653 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.operators; + +import java.util.concurrent.atomic.AtomicInteger; +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.any; +import org.mockito.Mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import org.mockito.MockitoAnnotations; +import rx.Observer; +import rx.concurrency.Schedulers; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Action2; +import rx.util.functions.Action3; +import rx.util.functions.Action4; +import rx.util.functions.Action5; +import rx.util.functions.Action6; +import rx.util.functions.Action7; +import rx.util.functions.Action8; +import rx.util.functions.Action9; +import rx.util.functions.ActionN; +import rx.util.functions.Func0; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.util.functions.Func3; +import rx.util.functions.Func4; +import rx.util.functions.Func5; +import rx.util.functions.Func6; +import rx.util.functions.Func7; +import rx.util.functions.Func8; +import rx.util.functions.Func9; +import rx.util.functions.FuncN; + +public class OperationToAsyncTest { + @Mock + Observer observer; + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + @Test + public void testAction0() { + final AtomicInteger value = new AtomicInteger(); + Action0 action = new Action0() { + @Override + public void call() { + value.incrementAndGet(); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call() + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(1, value.get()); + } + @Test + public void testAction0Error() { + Action0 action = new Action0() { + @Override + public void call() { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call() + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testAction1() { + final AtomicInteger value = new AtomicInteger(); + Action1 action = new Action1() { + @Override + public void call(Integer t1) { + value.set(t1); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(1, value.get()); + } + @Test + public void testAction1Error() { + Action1 action = new Action1() { + @Override + public void call(Integer t1) { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1) + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testAction2() { + final AtomicInteger value = new AtomicInteger(); + Action2 action = new Action2() { + @Override + public void call(Integer t1, Integer t2) { + value.set(t1 | t2); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(3, value.get()); + } + @Test + public void testAction2Error() { + Action2 action = new Action2() { + @Override + public void call(Integer t1, Integer t2) { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2) + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testAction3() { + final AtomicInteger value = new AtomicInteger(); + Action3 action = new Action3() { + @Override + public void call(Integer t1, Integer t2, Integer t3) { + value.set(t1 | t2 | t3); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(7, value.get()); + } + @Test + public void testAction3Error() { + Action3 action = new Action3() { + @Override + public void call(Integer t1, Integer t2, Integer t3) { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4) + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testAction4() { + final AtomicInteger value = new AtomicInteger(); + Action4 action = new Action4() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4) { + value.set(t1 | t2 | t3 | t4); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(15, value.get()); + } + @Test + public void testAction4Error() { + Action4 action = new Action4() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4) { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8) + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testAction5() { + final AtomicInteger value = new AtomicInteger(); + Action5 action = new Action5() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5) { + value.set(t1 | t2 | t3 | t4 | t5); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(31, value.get()); + } + @Test + public void testAction5Error() { + Action5 action = new Action5() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5) { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16) + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testAction6() { + final AtomicInteger value = new AtomicInteger(); + Action6 action = new Action6() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6) { + value.set(t1 | t2 | t3 | t4 | t5 | t6); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(63, value.get()); + } + @Test + public void testAction6Error() { + Action6 action = new Action6() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6) { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32) + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testAction7() { + final AtomicInteger value = new AtomicInteger(); + Action7 action = new Action7() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6, Integer t7) { + value.set(t1 | t2 | t3 | t4 | t5 | t6 | t7); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(127, value.get()); + } + @Test + public void testAction7Error() { + Action7 action = new Action7() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6, Integer t7) { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64) + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testAction8() { + final AtomicInteger value = new AtomicInteger(); + Action8 action = new Action8() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6, Integer t7, Integer t8) { + value.set(t1 | t2 | t3 | t4 | t5 | t6 | t7 | t8); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64, 128) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(255, value.get()); + } + @Test + public void testAction8Error() { + Action8 action = new Action8() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6, Integer t7, Integer t8) { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64, 128) + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testAction9() { + final AtomicInteger value = new AtomicInteger(); + Action9 action = new Action9() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6, Integer t7, Integer t8, Integer t9) { + value.set(t1 | t2 | t3 | t4 | t5 | t6 | t7 | t8 | t9); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64, 128, 256) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(511, value.get()); + } + @Test + public void testAction9Error() { + Action9 action = new Action9() { + @Override + public void call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6, Integer t7, Integer t8, Integer t9) { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64, 128, 256) + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testActionN() { + final AtomicInteger value = new AtomicInteger(); + ActionN action = new ActionN() { + @Override + public void call(Object... args) { + int i = 0; + for (Object o : args) { + i = i | (Integer)o; + } + value.set(i); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64, 128, 256, 512) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(null); + verify(observer, times(1)).onCompleted(); + + Assert.assertEquals(1023, value.get()); + } + @Test + public void testActionNError() { + ActionN action = new ActionN() { + @Override + public void call(Object... args) { + throw new RuntimeException("Forced failure"); + } + }; + + OperationToAsync.toAsync(action, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64, 128, 256, 512) + .subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(null); + verify(observer, never()).onCompleted(); + } + @Test + public void testFunc0() { + Func0 func = new Func0() { + @Override + public Integer call() { + return 0; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call() + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void testFunc1() { + Func1 func = new Func1() { + @Override + public Integer call(Integer t1) { + return t1; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call(1) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onCompleted(); + } + @Test + public void testFunc2() { + Func2 func = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1 | t2; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call(1, 2) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + @Test + public void testFunc3() { + Func3 func = new Func3() { + @Override + public Integer call(Integer t1, Integer t2, Integer t3) { + return t1 | t2 | t3; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call(1, 2, 4) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(7); + verify(observer, times(1)).onCompleted(); + } + @Test + public void testFunc4() { + Func4 func = new Func4() { + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4) { + return t1 | t2 | t3 | t4; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call(1, 2, 4, 8) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(15); + verify(observer, times(1)).onCompleted(); + } + @Test + public void testFunc5() { + Func5 func = new Func5() { + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5) { + return t1 | t2 | t3 | t4 | t5; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call(1, 2, 4, 8, 16) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(31); + verify(observer, times(1)).onCompleted(); + } + @Test + public void testFunc6() { + Func6 func = new Func6() { + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6) { + return t1 | t2 | t3 | t4 | t5 | t6; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(63); + verify(observer, times(1)).onCompleted(); + } + @Test + public void testFunc7() { + Func7 func = new Func7() { + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6, Integer t7) { + return t1 | t2 | t3 | t4 | t5 | t6 | t7; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(127); + verify(observer, times(1)).onCompleted(); + } + @Test + public void testFunc8() { + Func8 func = new Func8() { + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6, Integer t7, Integer t8) { + return t1 | t2 | t3 | t4 | t5 | t6 | t7 | t8; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64, 128) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(255); + verify(observer, times(1)).onCompleted(); + } + @Test + public void testFunc9() { + Func9 func = new Func9() { + @Override + public Integer call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6, Integer t7, Integer t8, Integer t9) { + return t1 | t2 | t3 | t4 | t5 | t6 | t7 | t8 | t9; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64, 128, 256) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(511); + verify(observer, times(1)).onCompleted(); + } + @Test + public void testFuncN() { + FuncN func = new FuncN() { + @Override + public Integer call(Object... args) { + int i = 0; + for (Object o : args) { + i = i | (Integer)o; + } + return i; + } + }; + OperationToAsync.toAsync(func, Schedulers.immediate()) + .call(1, 2, 4, 8, 16, 32, 64, 128, 256, 512) + .subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(1023); + verify(observer, times(1)).onCompleted(); + } +}