Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OperatorToObservableFuture #1088

Merged
merged 1 commit into from
Apr 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import rx.operators.OperationTimer;
import rx.operators.OperationToMap;
import rx.operators.OperationToMultimap;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.OperatorAll;
Expand Down Expand Up @@ -121,6 +120,7 @@
import rx.operators.OperatorTimeout;
import rx.operators.OperatorTimeoutWithSelector;
import rx.operators.OperatorTimestamp;
import rx.operators.OperatorToObservableFuture;
import rx.operators.OperatorToObservableList;
import rx.operators.OperatorToObservableSortedList;
import rx.operators.OperatorUnsubscribeOn;
Expand Down Expand Up @@ -1083,7 +1083,7 @@ public final static <T> Observable<T> error(Throwable exception, Scheduler sched
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
*/
public final static <T> Observable<T> from(Future<? extends T> future) {
return create(OperationToObservableFuture.toObservableFuture(future));
return create(OperatorToObservableFuture.toObservableFuture(future));
}

/**
Expand All @@ -1109,7 +1109,7 @@ public final static <T> Observable<T> from(Future<? extends T> future) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
*/
public final static <T> Observable<T> from(Future<? extends T> future, long timeout, TimeUnit unit) {
return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit));
return create(OperatorToObservableFuture.toObservableFuture(future, timeout, unit));
}

/**
Expand All @@ -1132,7 +1132,7 @@ public final static <T> Observable<T> from(Future<? extends T> future, long time
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
*/
public final static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler) {
return create(OperationToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);
return create(OperatorToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -34,15 +34,15 @@
* This is blocking so the Subscription returned when calling
* <code>Observable.unsafeSubscribe(Observer)</code> does nothing.
*/
public class OperationToObservableFuture {
/* package accessible for unit tests */static class ToObservableFuture<T> implements OnSubscribeFunc<T> {
public class OperatorToObservableFuture {
/* package accessible for unit tests */static class ToObservableFuture<T> implements OnSubscribe<T> {
private final Future<? extends T> that;
private final Long time;
private final long time;
private final TimeUnit unit;

public ToObservableFuture(Future<? extends T> that) {
this.that = that;
this.time = null;
this.time = 0;
this.unit = null;
}

Expand All @@ -53,29 +53,34 @@ public ToObservableFuture(Future<? extends T> that, long time, TimeUnit unit) {
}

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
try {
T value = (time == null) ? (T) that.get() : (T) that.get(time, unit);

if (!that.isCancelled()) {
observer.onNext(value);
public void call(Subscriber<? super T> subscriber) {
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
// If the Future is already completed, "cancel" does nothing.
that.cancel(true);
}
observer.onCompleted();
}));
try {
T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit);
subscriber.onNext(value);
subscriber.onCompleted();
} catch (Throwable e) {
observer.onError(e);
// If this Observable is unsubscribed, we will receive an CancellationException.
// However, CancellationException will not be passed to the final Subscriber
// since it's already subscribed.
// If the Future is canceled in other place, CancellationException will be still
// passed to the final Subscriber.
subscriber.onError(e);
}

// the get() has already completed so there is no point in
// giving the user a way to cancel.
return Subscriptions.empty();
}
}

public static <T> OnSubscribeFunc<T> toObservableFuture(final Future<? extends T> that) {
public static <T> OnSubscribe<T> toObservableFuture(final Future<? extends T> that) {
return new ToObservableFuture<T>(that);
}

public static <T> OnSubscribeFunc<T> toObservableFuture(final Future<? extends T> that, long time, TimeUnit unit) {
public static <T> OnSubscribe<T> toObservableFuture(final Future<? extends T> that, long time, TimeUnit unit) {
return new ToObservableFuture<T>(that, time, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.operators;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -25,44 +26,42 @@

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observers.TestObserver;
import rx.operators.OperationToObservableFuture.ToObservableFuture;

public class OperationToObservableFutureTest {
public class OperatorToObservableFutureTest {

@Test
public void testSuccess() throws Exception {
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
ToObservableFuture<Object> ob = new ToObservableFuture<Object>(future);
Observer<Object> o = mock(Observer.class);

Subscription sub = ob.onSubscribe(new TestObserver<Object>(o));
Subscription sub = Observable.from(future).subscribe(new TestObserver<Object>(o));
sub.unsubscribe();

verify(o, times(1)).onNext(value);
verify(o, times(1)).onCompleted();
verify(o, never()).onError(null);
verify(future, never()).cancel(true);
verify(o, never()).onError(any(Throwable.class));
verify(future, times(1)).cancel(true);
}

@Test
public void testFailure() throws Exception {
Future<Object> future = mock(Future.class);
RuntimeException e = new RuntimeException();
when(future.get()).thenThrow(e);
ToObservableFuture<Object> ob = new ToObservableFuture<Object>(future);
Observer<Object> o = mock(Observer.class);

Subscription sub = ob.onSubscribe(new TestObserver<Object>(o));
Subscription sub = Observable.from(future).subscribe(new TestObserver<Object>(o));
sub.unsubscribe();

verify(o, never()).onNext(null);
verify(o, never()).onCompleted();
verify(o, times(1)).onError(e);
verify(future, never()).cancel(true);
verify(future, times(1)).cancel(true);
}
}