Skip to content

Commit

Permalink
Merge pull request #970 from abersnaze/notification-lite
Browse files Browse the repository at this point in the history
Notifications for the allocation adverse.
  • Loading branch information
benjchristensen committed Mar 20, 2014
2 parents d3ead3a + 00703cf commit 5ca609b
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 163 deletions.
44 changes: 9 additions & 35 deletions rxjava-core/src/main/java/rx/observers/SerializedObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

import java.util.ArrayList;

import javax.management.NotificationListener;

import rx.Notification;
import rx.Observer;
import rx.operators.NotificationLite;

/**
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
Expand All @@ -22,21 +26,7 @@ public class SerializedObserver<T> implements Observer<T> {
private boolean emitting = false;
private boolean terminated = false;
private ArrayList<Object> queue = new ArrayList<Object>();

private static Sentinel NULL_SENTINEL = new Sentinel();
private static Sentinel COMPLETE_SENTINEL = new Sentinel();

private static class Sentinel {

}

private static class ErrorSentinel extends Sentinel {
final Throwable e;

ErrorSentinel(Throwable e) {
this.e = e;
}
}
private NotificationLite<T> on = NotificationLite.instance();

public SerializedObserver(Observer<? super T> s) {
this.actual = s;
Expand All @@ -61,7 +51,7 @@ public void onCompleted() {
}
} else {
// someone else is already emitting so just queue it
queue.add(COMPLETE_SENTINEL);
queue.add(on.completed());
}
}
if (canEmit) {
Expand Down Expand Up @@ -97,7 +87,7 @@ public void onError(final Throwable e) {
} else {
// someone else is already emitting so just queue it ... after eliminating the queue to shortcut
queue.clear();
queue.add(new ErrorSentinel(e));
queue.add(on.error(e));
}
}
if (canEmit) {
Expand Down Expand Up @@ -131,11 +121,7 @@ public void onNext(T t) {
}
} else {
// someone else is already emitting so just queue it
if (t == null) {
queue.add(NULL_SENTINEL);
} else {
queue.add(t);
}
queue.add(on.next(t));
}
}
if (canEmit) {
Expand Down Expand Up @@ -168,19 +154,7 @@ public void drainQueue(ArrayList<Object> list) {
return;
}
for (Object v : list) {
if (v != null) {
if (v instanceof Sentinel) {
if (v == NULL_SENTINEL) {
actual.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
actual.onCompleted();
} else if (v instanceof ErrorSentinel) {
actual.onError(((ErrorSentinel) v).e);
}
} else {
actual.onNext((T) v);
}
}
on.accept(actual, v);
}
}
}
39 changes: 6 additions & 33 deletions rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.LinkedList;
import java.util.Queue;

import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;

Expand All @@ -36,21 +37,8 @@ public class BufferUntilSubscriber<T> extends Subscriber<T> {
private final Queue<Object> queue = new LinkedList<Object>();
/** The queue capacity. */
private final int capacity;
/** Null sentinel (in case queue type is changed). */
private static final Object NULL_SENTINEL = new Object();
/** Complete sentinel. */
private static final Object COMPLETE_SENTINEL = new Object();
/**
* Container for an onError event.
*/
private static final class ErrorSentinel {
final Throwable t;
private final NotificationLite<T> on = NotificationLite.instance();

public ErrorSentinel(Throwable t) {
this.t = t;
}

}
/**
* Constructor that wraps the actual subscriber and shares its subscription.
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
Expand Down Expand Up @@ -85,22 +73,7 @@ public void enterPassthroughMode() {
while (!queue.isEmpty()) {
Object o = queue.poll();
if (!actual.isUnsubscribed()) {
if (o == NULL_SENTINEL) {
actual.onNext(null);
} else
if (o == COMPLETE_SENTINEL) {
actual.onCompleted();
} else
if (o instanceof ErrorSentinel) {
actual.onError(((ErrorSentinel)o).t);
} else
if (o != null) {
@SuppressWarnings("unchecked")
T v = (T)o;
actual.onNext(v);
} else {
throw new NullPointerException();
}
on.accept(actual, o);
}
}
passthroughMode = true;
Expand All @@ -115,7 +88,7 @@ public void onNext(T t) {
synchronized (gate) {
if (!passthroughMode) {
if (capacity < 0 || queue.size() < capacity) {
queue.offer(t != null ? t : NULL_SENTINEL);
queue.offer(on.next(t));
return;
}
try {
Expand All @@ -142,7 +115,7 @@ public void onError(Throwable e) {
synchronized (gate) {
if (!passthroughMode) {
if (capacity < 0 || queue.size() < capacity) {
queue.offer(new ErrorSentinel(e));
queue.offer(on.error(e));
return;
}
try {
Expand All @@ -169,7 +142,7 @@ public void onCompleted() {
synchronized (gate) {
if (!passthroughMode) {
if (capacity < 0 || queue.size() < capacity) {
queue.offer(COMPLETE_SENTINEL);
queue.offer(on.completed());
return;
}
try {
Expand Down
167 changes: 167 additions & 0 deletions rxjava-core/src/main/java/rx/operators/NotificationLite.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package rx.operators;

import java.io.ObjectStreamException;
import java.io.Serializable;

import rx.Notification;
import rx.Notification.Kind;
import rx.Observer;

/**
* For use in internal operators that need something like materialize and dematerialize wholly
* within the implementation of the operator but don't want to incur the allocation cost of actually
* creating {@link rx.Notification} objects for every onNext and onComplete.
*
* An object is allocated inside {@link #error(Throwable)} to wrap the {@link Throwable} but this
* shouldn't effect performance because exceptions should be exceptionally rare.
*
* It's implemented as a singleton to maintain some semblance of type safety that is completely
* non-existent.
*
* @author gscampbell
*
* @param <T>
*/
public final class NotificationLite<T> {
private NotificationLite() {
}

private static final NotificationLite INSTANCE = new NotificationLite();

@SuppressWarnings("unchecked")
public static <T> NotificationLite<T> instance() {
return INSTANCE;
}

private static final Object ON_COMPLETED_SENTINEL = new Serializable() {
private static final long serialVersionUID = 1;
};

private static final Object ON_NEXT_NULL_SENTINEL = new Serializable() {
private static final long serialVersionUID = 2;
};

private static class OnErrorSentinel implements Serializable {
private static final long serialVersionUID = 3;
private final Throwable e;

public OnErrorSentinel(Throwable e) {
this.e = e;
}
}

/**
* Creates a lite onNext notification for the value passed in without doing any allocation. Can
* be unwrapped and sent with the {@link #accept} method.
*
* @param t
* @return
*/
public Object next(T t) {
if (t == null)
return ON_NEXT_NULL_SENTINEL;
else
return t;
}

/**
* Creates a lite onComplete notification without doing any allocation. Can be unwrapped and
* sent with the {@link #accept} method.
*
* @return
*/
public Object completed() {
return ON_COMPLETED_SENTINEL;
}

/**
* Create a lite onError notification. This call does new up an object to wrap the
* {@link Throwable} but since there should only be one of these the performance impact should
* be small. Can be unwrapped and sent with the {@link #accept} method.
*
* @param e
* @return
*/
public Object error(Throwable e) {
return new OnErrorSentinel(e);
}

/**
* Unwraps the lite notification and calls the appropriate method on the {@link Observer}.
*
* @param o
* the {@link Observer} to call onNext, onCompleted or onError.
* @param n
* @throws IllegalArgumentException
* if the notification is null.
* @throws NullPointerException
* if the {@link Observer} is null.
*/
@SuppressWarnings("unchecked")
public void accept(Observer<? super T> o, Object n) {
switch (kind(n)) {
case OnNext:
o.onNext(getValue(n));
break;
case OnCompleted:
o.onCompleted();
break;
case OnError:
o.onError(getError(n));
break;
}
}

public boolean isCompleted(Object n) {
return n == ON_COMPLETED_SENTINEL;
}

public boolean isError(Object n) {
return n instanceof OnErrorSentinel;
}

/**
* If there is custom logic that isn't as simple as call the right method on an {@link Observer}
* then this method can be used to get the {@link Notification.Kind}
*
* @param n
* @return
*/
public Kind kind(Object n) {
if (n == null)
throw new IllegalArgumentException("The lite notification can not be null");
else if (n == ON_COMPLETED_SENTINEL)
return Kind.OnCompleted;
else if (n instanceof OnErrorSentinel)
return Kind.OnError;
else
// value or ON_NEXT_NULL_SENTINEL but either way it's an OnNext
return Kind.OnNext;
}

/**
* returns value passed in {@link #next(Object)} method call. Bad things happen if you call this
* the onComplete or onError notification type. For performance you are expected to use this
* when it is appropriate.
*
* @param n
* @return
*/
@SuppressWarnings("unchecked")
public T getValue(Object n) {
return n == ON_NEXT_NULL_SENTINEL ? null : (T) n;
}

/**
* returns {@link Throwable} passed in {@link #error(Throwable)} method call. Bad things happen
* if you
* call this the onComplete or onNext notification type. For performance you are expected to use
* this when it is appropriate.
*
* @param n
* @return The {@link Throwable} wrapped inside n
*/
public Throwable getError(Object n) {
return ((OnErrorSentinel) n).e;
}
}
Loading

0 comments on commit 5ca609b

Please sign in to comment.