Skip to content

Commit

Permalink
Blocking subscribe methods for convenience
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Aug 21, 2015
1 parent 189928c commit 2c915ad
Show file tree
Hide file tree
Showing 2 changed files with 376 additions and 33 deletions.
245 changes: 232 additions & 13 deletions src/main/java/rx/observables/BlockingObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,19 @@
*/
package rx.observables;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import rx.*;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.internal.operators.BlockingOperatorLatest;
import rx.internal.operators.BlockingOperatorMostRecent;
import rx.internal.operators.BlockingOperatorNext;
import rx.internal.operators.BlockingOperatorToFuture;
import rx.internal.operators.BlockingOperatorToIterator;
import rx.Observer;
import rx.annotations.Experimental;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.*;
import rx.internal.operators.*;
import rx.internal.util.UtilityFunctions;
import rx.subscriptions.Subscriptions;

/**
* {@code BlockingObservable} is a variety of {@link Observable} that provides blocking operators. It can be
Expand Down Expand Up @@ -477,4 +473,227 @@ private void awaitForComplete(CountDownLatch latch, Subscription subscription) {
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
}
}

/**
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
*/
@Experimental
public void run() {
final CountDownLatch cdl = new CountDownLatch(1);
final Throwable[] error = { null };
Subscription s = o.subscribe(new Subscriber<T>() {
@Override
public void onNext(T t) {

}
@Override
public void onError(Throwable e) {
error[0] = e;
cdl.countDown();
}

@Override
public void onCompleted() {
cdl.countDown();
}
});

awaitForComplete(cdl, s);
Throwable e = error[0];
if (e != null) {
if (e instanceof RuntimeException) {
throw (RuntimeException)e;
} else {
throw new RuntimeException(e);
}
}
}

/**
* Subscribes to the source and calls back the Observer methods on the current thread.
* @param observer the observer to call event methods on
*/
@Experimental
public void subscribe(Observer<? super T> observer) {
final NotificationLite<T> nl = NotificationLite.instance();
final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();

Subscription s = o.subscribe(new Subscriber<T>() {
@Override
public void onNext(T t) {
queue.offer(nl.next(t));
}
@Override
public void onError(Throwable e) {
queue.offer(nl.error(e));
}
@Override
public void onCompleted() {
queue.offer(nl.completed());
}
});

try {
for (;;) {
Object o = queue.poll();
if (o == null) {
o = queue.take();
}
if (nl.accept(observer, o)) {
return;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
observer.onError(e);
} finally {
s.unsubscribe();
}
}

/** Constant to indicate the onStart method should be called. */
private static final Object ON_START = new Object();

/** Constant indicating the setProducer method should be called. */
private static final Object SET_PRODUCER = new Object();

/** Indicates an unsubscripton happened */
private static final Object UNSUBSCRIBE = new Object();

/**
* Subscribes to the source and calls the Subscriber methods on the current thread.
* <p>
* The unsubscription and backpressure is composed through.
* @param subscriber the subscriber to forward events and calls to in the current thread
*/
@Experimental
public void subscribe(Subscriber<? super T> subscriber) {
final NotificationLite<T> nl = NotificationLite.instance();
final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
final Producer[] theProducer = { null };

Subscriber<T> s = new Subscriber<T>() {
@Override
public void onNext(T t) {
queue.offer(nl.next(t));
}
@Override
public void onError(Throwable e) {
queue.offer(nl.error(e));
}
@Override
public void onCompleted() {
queue.offer(nl.completed());
}

@Override
public void setProducer(Producer p) {
theProducer[0] = p;
queue.offer(SET_PRODUCER);
}

@Override
public void onStart() {
queue.offer(ON_START);
}
};

subscriber.add(s);
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
queue.offer(UNSUBSCRIBE);
}
}));

o.subscribe(s);

try {
for (;;) {
if (subscriber.isUnsubscribed()) {
break;
}
Object o = queue.poll();
if (o == null) {
o = queue.take();
}
if (subscriber.isUnsubscribed() || o == UNSUBSCRIBE) {
break;
}
if (o == ON_START) {
subscriber.onStart();
} else
if (o == SET_PRODUCER) {
subscriber.setProducer(theProducer[0]);
} else
if (nl.accept(subscriber, o)) {
return;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
subscriber.onError(e);
} finally {
s.unsubscribe();
}
}

/**
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
*/
@Experimental
public void subscribe() {
run();
}

/**
* Subscribes to the source and calls the given action on the current thread and rethrows any exception wrapped
* into OnErrorNotImplementedException.
* @param onNext the callback action for each source value
*/
@Experimental
public void subscribe(final Action1<? super T> onNext) {
subscribe(onNext, new Action1<Throwable>() {
@Override
public void call(Throwable t) {
throw new OnErrorNotImplementedException(t);
}
}, Actions.empty());
}

/**
* Subscribes to the source and calls the given actions on the current thread.
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
*/
@Experimental
public void subscribe(final Action1<? super T> onNext, final Action1<? super Throwable> onError) {
subscribe(onNext, onError, Actions.empty());
}

/**
* Subscribes to the source and calls the given actions on the current thread.
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @param onCompleted the callback action for the completion event.
*/
@Experimental
public void subscribe(final Action1<? super T> onNext, final Action1<? super Throwable> onError, final Action0 onCompleted) {
subscribe(new Observer<T>() {
@Override
public void onNext(T t) {
onNext.call(t);
}

@Override
public void onError(Throwable e) {
onError.call(e);
}

@Override
public void onCompleted() {
onCompleted.call();
}
});
}
}

0 comments on commit 2c915ad

Please sign in to comment.