diff --git a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java index 93f1632973..8c87a7b855 100644 --- a/rxjava-core/src/main/java/rx/observers/SerializedObserver.java +++ b/rxjava-core/src/main/java/rx/observers/SerializedObserver.java @@ -2,7 +2,11 @@ import java.util.ArrayList; +import javax.management.NotificationListener; + +import rx.Notification; import rx.Observer; +import rx.operators.NotificationLite; /** * Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError. @@ -22,21 +26,7 @@ public class SerializedObserver implements Observer { private boolean emitting = false; private boolean terminated = false; private ArrayList queue = new ArrayList(); - - private static Sentinel NULL_SENTINEL = new Sentinel(); - private static Sentinel COMPLETE_SENTINEL = new Sentinel(); - - private static class Sentinel { - - } - - private static class ErrorSentinel extends Sentinel { - final Throwable e; - - ErrorSentinel(Throwable e) { - this.e = e; - } - } + private NotificationLite on = NotificationLite.instance(); public SerializedObserver(Observer s) { this.actual = s; @@ -61,7 +51,7 @@ public void onCompleted() { } } else { // someone else is already emitting so just queue it - queue.add(COMPLETE_SENTINEL); + queue.add(on.completed()); } } if (canEmit) { @@ -97,7 +87,7 @@ public void onError(final Throwable e) { } else { // someone else is already emitting so just queue it ... after eliminating the queue to shortcut queue.clear(); - queue.add(new ErrorSentinel(e)); + queue.add(on.error(e)); } } if (canEmit) { @@ -131,11 +121,7 @@ public void onNext(T t) { } } else { // someone else is already emitting so just queue it - if (t == null) { - queue.add(NULL_SENTINEL); - } else { - queue.add(t); - } + queue.add(on.next(t)); } } if (canEmit) { @@ -168,19 +154,7 @@ public void drainQueue(ArrayList list) { return; } for (Object v : list) { - if (v != null) { - if (v instanceof Sentinel) { - if (v == NULL_SENTINEL) { - actual.onNext(null); - } else if (v == COMPLETE_SENTINEL) { - actual.onCompleted(); - } else if (v instanceof ErrorSentinel) { - actual.onError(((ErrorSentinel) v).e); - } - } else { - actual.onNext((T) v); - } - } + on.accept(actual, v); } } } diff --git a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java index 7526674636..c849421ee5 100644 --- a/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java +++ b/rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java @@ -17,6 +17,7 @@ import java.util.LinkedList; import java.util.Queue; + import rx.Subscriber; import rx.subscriptions.CompositeSubscription; @@ -36,21 +37,8 @@ public class BufferUntilSubscriber extends Subscriber { private final Queue queue = new LinkedList(); /** The queue capacity. */ private final int capacity; - /** Null sentinel (in case queue type is changed). */ - private static final Object NULL_SENTINEL = new Object(); - /** Complete sentinel. */ - private static final Object COMPLETE_SENTINEL = new Object(); - /** - * Container for an onError event. - */ - private static final class ErrorSentinel { - final Throwable t; + private final NotificationLite on = NotificationLite.instance(); - public ErrorSentinel(Throwable t) { - this.t = t; - } - - } /** * Constructor that wraps the actual subscriber and shares its subscription. * @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue @@ -85,22 +73,7 @@ public void enterPassthroughMode() { while (!queue.isEmpty()) { Object o = queue.poll(); if (!actual.isUnsubscribed()) { - if (o == NULL_SENTINEL) { - actual.onNext(null); - } else - if (o == COMPLETE_SENTINEL) { - actual.onCompleted(); - } else - if (o instanceof ErrorSentinel) { - actual.onError(((ErrorSentinel)o).t); - } else - if (o != null) { - @SuppressWarnings("unchecked") - T v = (T)o; - actual.onNext(v); - } else { - throw new NullPointerException(); - } + on.accept(actual, o); } } passthroughMode = true; @@ -115,7 +88,7 @@ public void onNext(T t) { synchronized (gate) { if (!passthroughMode) { if (capacity < 0 || queue.size() < capacity) { - queue.offer(t != null ? t : NULL_SENTINEL); + queue.offer(on.next(t)); return; } try { @@ -142,7 +115,7 @@ public void onError(Throwable e) { synchronized (gate) { if (!passthroughMode) { if (capacity < 0 || queue.size() < capacity) { - queue.offer(new ErrorSentinel(e)); + queue.offer(on.error(e)); return; } try { @@ -169,7 +142,7 @@ public void onCompleted() { synchronized (gate) { if (!passthroughMode) { if (capacity < 0 || queue.size() < capacity) { - queue.offer(COMPLETE_SENTINEL); + queue.offer(on.completed()); return; } try { diff --git a/rxjava-core/src/main/java/rx/operators/NotificationLite.java b/rxjava-core/src/main/java/rx/operators/NotificationLite.java new file mode 100644 index 0000000000..54e807855e --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/NotificationLite.java @@ -0,0 +1,167 @@ +package rx.operators; + +import java.io.ObjectStreamException; +import java.io.Serializable; + +import rx.Notification; +import rx.Notification.Kind; +import rx.Observer; + +/** + * For use in internal operators that need something like materialize and dematerialize wholly + * within the implementation of the operator but don't want to incur the allocation cost of actually + * creating {@link rx.Notification} objects for every onNext and onComplete. + * + * An object is allocated inside {@link #error(Throwable)} to wrap the {@link Throwable} but this + * shouldn't effect performance because exceptions should be exceptionally rare. + * + * It's implemented as a singleton to maintain some semblance of type safety that is completely + * non-existent. + * + * @author gscampbell + * + * @param + */ +public final class NotificationLite { + private NotificationLite() { + } + + private static final NotificationLite INSTANCE = new NotificationLite(); + + @SuppressWarnings("unchecked") + public static NotificationLite instance() { + return INSTANCE; + } + + private static final Object ON_COMPLETED_SENTINEL = new Serializable() { + private static final long serialVersionUID = 1; + }; + + private static final Object ON_NEXT_NULL_SENTINEL = new Serializable() { + private static final long serialVersionUID = 2; + }; + + private static class OnErrorSentinel implements Serializable { + private static final long serialVersionUID = 3; + private final Throwable e; + + public OnErrorSentinel(Throwable e) { + this.e = e; + } + } + + /** + * Creates a lite onNext notification for the value passed in without doing any allocation. Can + * be unwrapped and sent with the {@link #accept} method. + * + * @param t + * @return + */ + public Object next(T t) { + if (t == null) + return ON_NEXT_NULL_SENTINEL; + else + return t; + } + + /** + * Creates a lite onComplete notification without doing any allocation. Can be unwrapped and + * sent with the {@link #accept} method. + * + * @return + */ + public Object completed() { + return ON_COMPLETED_SENTINEL; + } + + /** + * Create a lite onError notification. This call does new up an object to wrap the + * {@link Throwable} but since there should only be one of these the performance impact should + * be small. Can be unwrapped and sent with the {@link #accept} method. + * + * @param e + * @return + */ + public Object error(Throwable e) { + return new OnErrorSentinel(e); + } + + /** + * Unwraps the lite notification and calls the appropriate method on the {@link Observer}. + * + * @param o + * the {@link Observer} to call onNext, onCompleted or onError. + * @param n + * @throws IllegalArgumentException + * if the notification is null. + * @throws NullPointerException + * if the {@link Observer} is null. + */ + @SuppressWarnings("unchecked") + public void accept(Observer o, Object n) { + switch (kind(n)) { + case OnNext: + o.onNext(getValue(n)); + break; + case OnCompleted: + o.onCompleted(); + break; + case OnError: + o.onError(getError(n)); + break; + } + } + + public boolean isCompleted(Object n) { + return n == ON_COMPLETED_SENTINEL; + } + + public boolean isError(Object n) { + return n instanceof OnErrorSentinel; + } + + /** + * If there is custom logic that isn't as simple as call the right method on an {@link Observer} + * then this method can be used to get the {@link Notification.Kind} + * + * @param n + * @return + */ + public Kind kind(Object n) { + if (n == null) + throw new IllegalArgumentException("The lite notification can not be null"); + else if (n == ON_COMPLETED_SENTINEL) + return Kind.OnCompleted; + else if (n instanceof OnErrorSentinel) + return Kind.OnError; + else + // value or ON_NEXT_NULL_SENTINEL but either way it's an OnNext + return Kind.OnNext; + } + + /** + * returns value passed in {@link #next(Object)} method call. Bad things happen if you call this + * the onComplete or onError notification type. For performance you are expected to use this + * when it is appropriate. + * + * @param n + * @return + */ + @SuppressWarnings("unchecked") + public T getValue(Object n) { + return n == ON_NEXT_NULL_SENTINEL ? null : (T) n; + } + + /** + * returns {@link Throwable} passed in {@link #error(Throwable)} method call. Bad things happen + * if you + * call this the onComplete or onNext notification type. For performance you are expected to use + * this when it is appropriate. + * + * @param n + * @return The {@link Throwable} wrapped inside n + */ + public Throwable getError(Object n) { + return ((OnErrorSentinel) n).e; + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java index c2457dd8d2..9a3b6937f5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -55,20 +55,7 @@ public Subscriber call(Subscriber child) { } } - private static class Sentinel { - - } - - private static Sentinel NULL_SENTINEL = new Sentinel(); - private static Sentinel COMPLETE_SENTINEL = new Sentinel(); - - private static class ErrorSentinel extends Sentinel { - final Throwable e; - - ErrorSentinel(Throwable e) { - this.e = e; - } - } + private final NotificationLite on = NotificationLite.instance(); /** Observe through individual queue per observer. */ private class ObserveOnSubscriber extends Subscriber { @@ -85,23 +72,19 @@ public ObserveOnSubscriber(Subscriber observer) { @Override public void onNext(final T t) { - if (t == null) { - queue.offer(NULL_SENTINEL); - } else { - queue.offer(t); - } + queue.offer(on.next(t)); schedule(); } @Override public void onCompleted() { - queue.offer(COMPLETE_SENTINEL); + queue.offer(on.completed()); schedule(); } @Override public void onError(final Throwable e) { - queue.offer(new ErrorSentinel(e)); + queue.offer(on.error(e)); schedule(); } @@ -134,19 +117,7 @@ public void call(Inner inner) { private void pollQueue() { do { Object v = queue.poll(); - if (v != null) { - if (v instanceof Sentinel) { - if (v == NULL_SENTINEL) { - observer.onNext(null); - } else if (v == COMPLETE_SENTINEL) { - observer.onCompleted(); - } else if (v instanceof ErrorSentinel) { - observer.onError(((ErrorSentinel) v).e); - } - } else { - observer.onNext((T) v); - } - } + on.accept(observer, v); } while (counter.decrementAndGet() > 0); } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java index 97c53ae431..7901a86f4f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java @@ -32,17 +32,18 @@ /** * Delivers events on the specified Scheduler. *

- * This provides backpressure by blocking the incoming onNext when there is already one in the queue. + * This provides backpressure by blocking the incoming onNext when there is already one in the + * queue. *

- * This means that at any given time the max number of "onNext" in flight is 3: - * -> 1 being delivered on the Scheduler - * -> 1 in the queue waiting for the Scheduler - * -> 1 blocking on the queue waiting to deliver it + * This means that at any given time the max number of "onNext" in flight is 3: -> 1 being delivered + * on the Scheduler -> 1 in the queue waiting for the Scheduler -> 1 blocking on the queue waiting + * to deliver it * - * I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the Scheduler - * can loop and have something to do each time around to optimize for avoiding rescheduling when it - * can instead just loop. I'm avoiding having the Scheduler thread ever block as it could be an event-loop - * thus if the queue is empty it exits and next time something is added it will reschedule. + * I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the + * Scheduler can loop and have something to do each time around to optimize for avoiding + * rescheduling when it can instead just loop. I'm avoiding having the Scheduler thread ever block + * as it could be an event-loop thus if the queue is empty it exits and next time something is added + * it will reschedule. * * */ @@ -97,16 +98,7 @@ public Subscriber call(Subscriber child) { } } - private static Object NULL_SENTINEL = new Object(); - private static Object COMPLETE_SENTINEL = new Object(); - - private static class ErrorSentinel { - final Throwable e; - - ErrorSentinel(Throwable e) { - this.e = e; - } - } + private final NotificationLite on = NotificationLite.instance(); /** Observe through individual queue per observer. */ private class ObserveOnSubscriber extends Subscriber { @@ -126,11 +118,7 @@ public void onNext(final T t) { try { // we want to block for natural back-pressure // so that the producer waits for each value to be consumed - if (t == null) { - queue.addBlocking(NULL_SENTINEL); - } else { - queue.addBlocking(t); - } + queue.addBlocking(on.next(t)); schedule(); } catch (InterruptedException e) { if (!isUnsubscribed()) { @@ -144,7 +132,7 @@ public void onCompleted() { try { // we want to block for natural back-pressure // so that the producer waits for each value to be consumed - queue.addBlocking(COMPLETE_SENTINEL); + queue.addBlocking(on.completed()); schedule(); } catch (InterruptedException e) { onError(e); @@ -156,7 +144,7 @@ public void onError(final Throwable e) { try { // we want to block for natural back-pressure // so that the producer waits for each value to be consumed - queue.addBlocking(new ErrorSentinel(e)); + queue.addBlocking(on.error(e)); schedule(); } catch (InterruptedException e2) { // call directly if we can't schedule @@ -205,26 +193,18 @@ public void call(Inner inner) { private void pollQueue() { do { Object v = queue.poll(); - if (v != null) { - if (v == NULL_SENTINEL) { - observer.onNext(null); - } else if (v == COMPLETE_SENTINEL) { - observer.onCompleted(); - } else if (v instanceof ErrorSentinel) { - observer.onError(((ErrorSentinel) v).e); - } else { - observer.onNext((T) v); - } - } + on.accept(observer, v); } while (counter.decrementAndGet() > 0); } } /** - * Single-producer-single-consumer queue (only thread-safe for 1 producer thread with 1 consumer thread). + * Single-producer-single-consumer queue (only thread-safe for 1 producer thread with 1 consumer + * thread). * - * This supports an interrupt() being called externally rather than needing to interrupt the thread. This allows + * This supports an interrupt() being called externally rather than needing to interrupt the + * thread. This allows * unsubscribe behavior when this queue is being used. * * @param diff --git a/rxjava-core/src/main/java/rx/operators/OperatorZip.java b/rxjava-core/src/main/java/rx/operators/OperatorZip.java index 21da75d417..9ecc99b376 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorZip.java @@ -134,6 +134,7 @@ public void onNext(Observable[] observables) { }; } + static final NotificationLite on = NotificationLite.instance(); private static class Zip { @SuppressWarnings("rawtypes") final Observable[] os; @@ -141,9 +142,7 @@ private static class Zip { final Observer observer; final FuncN zipFunction; final CompositeSubscription childSubscription = new CompositeSubscription(); - - static Object NULL_SENTINEL = new Object(); - static Object COMPLETE_SENTINEL = new Object(); + @SuppressWarnings("rawtypes") public Zip(Observable[] os, final Subscriber observer, FuncN zipFunction) { @@ -180,23 +179,26 @@ public void zip() { void tick() { if (counter.getAndIncrement() == 0) { do { - Object[] vs = new Object[observers.length]; + final Object[] vs = new Object[observers.length]; boolean allHaveValues = true; for (int i = 0; i < observers.length; i++) { - vs[i] = ((InnerObserver) observers[i]).items.peek(); - if (vs[i] == NULL_SENTINEL) { - // special handling for null - vs[i] = null; - } else if (vs[i] == COMPLETE_SENTINEL) { - // special handling for onComplete + Object n = ((InnerObserver) observers[i]).items.peek(); + + if (n == null) { + allHaveValues = false; + continue; + } + + switch (on.kind(n)) { + case OnNext: + vs[i] = on.getValue(n); + break; + case OnCompleted: observer.onCompleted(); - // we need to unsubscribe from all children since children are independently subscribed + // we need to unsubscribe from all children since children are + // independently subscribed childSubscription.unsubscribe(); return; - } else if (vs[i] == null) { - allHaveValues = false; - // we continue as there may be an onCompleted on one of the others - continue; } } if (allHaveValues) { @@ -211,7 +213,7 @@ void tick() { for (int i = 0; i < observers.length; i++) { ((InnerObserver) observers[i]).items.poll(); // eagerly check if the next item on this queue is an onComplete - if (((InnerObserver) observers[i]).items.peek() == COMPLETE_SENTINEL) { + if (on.isCompleted(((InnerObserver) observers[i]).items.peek())) { // it is an onComplete so shut down observer.onCompleted(); // we need to unsubscribe from all children since children are independently subscribed @@ -235,7 +237,7 @@ final class InnerObserver extends Subscriber { @SuppressWarnings("unchecked") @Override public void onCompleted() { - items.add(COMPLETE_SENTINEL); + items.add(on.completed()); tick(); } @@ -248,11 +250,7 @@ public void onError(Throwable e) { @SuppressWarnings("unchecked") @Override public void onNext(Object t) { - if (t == null) { - items.add(NULL_SENTINEL); - } else { - items.add(t); - } + items.add(on.next(t)); tick(); } };