Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notifications for the allocation averse. #970

Merged
merged 1 commit into from
Mar 20, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 9 additions & 35 deletions rxjava-core/src/main/java/rx/observers/SerializedObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

import java.util.ArrayList;

import javax.management.NotificationListener;

import rx.Notification;
import rx.Observer;
import rx.operators.NotificationLite;

/**
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
Expand All @@ -22,21 +26,7 @@ public class SerializedObserver<T> implements Observer<T> {
private boolean emitting = false;
private boolean terminated = false;
private ArrayList<Object> queue = new ArrayList<Object>();

private static Sentinel NULL_SENTINEL = new Sentinel();
private static Sentinel COMPLETE_SENTINEL = new Sentinel();

private static class Sentinel {

}

private static class ErrorSentinel extends Sentinel {
final Throwable e;

ErrorSentinel(Throwable e) {
this.e = e;
}
}
private NotificationLite<T> on = NotificationLite.instance();

public SerializedObserver(Observer<? super T> s) {
this.actual = s;
Expand All @@ -61,7 +51,7 @@ public void onCompleted() {
}
} else {
// someone else is already emitting so just queue it
queue.add(COMPLETE_SENTINEL);
queue.add(on.completed());
}
}
if (canEmit) {
Expand Down Expand Up @@ -97,7 +87,7 @@ public void onError(final Throwable e) {
} else {
// someone else is already emitting so just queue it ... after eliminating the queue to shortcut
queue.clear();
queue.add(new ErrorSentinel(e));
queue.add(on.error(e));
}
}
if (canEmit) {
Expand Down Expand Up @@ -131,11 +121,7 @@ public void onNext(T t) {
}
} else {
// someone else is already emitting so just queue it
if (t == null) {
queue.add(NULL_SENTINEL);
} else {
queue.add(t);
}
queue.add(on.next(t));
}
}
if (canEmit) {
Expand Down Expand Up @@ -168,19 +154,7 @@ public void drainQueue(ArrayList<Object> list) {
return;
}
for (Object v : list) {
if (v != null) {
if (v instanceof Sentinel) {
if (v == NULL_SENTINEL) {
actual.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
actual.onCompleted();
} else if (v instanceof ErrorSentinel) {
actual.onError(((ErrorSentinel) v).e);
}
} else {
actual.onNext((T) v);
}
}
on.accept(actual, v);
}
}
}
39 changes: 6 additions & 33 deletions rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.LinkedList;
import java.util.Queue;

import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;

Expand All @@ -36,21 +37,8 @@ public class BufferUntilSubscriber<T> extends Subscriber<T> {
private final Queue<Object> queue = new LinkedList<Object>();
/** The queue capacity. */
private final int capacity;
/** Null sentinel (in case queue type is changed). */
private static final Object NULL_SENTINEL = new Object();
/** Complete sentinel. */
private static final Object COMPLETE_SENTINEL = new Object();
/**
* Container for an onError event.
*/
private static final class ErrorSentinel {
final Throwable t;
private final NotificationLite<T> on = NotificationLite.instance();

public ErrorSentinel(Throwable t) {
this.t = t;
}

}
/**
* Constructor that wraps the actual subscriber and shares its subscription.
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
Expand Down Expand Up @@ -85,22 +73,7 @@ public void enterPassthroughMode() {
while (!queue.isEmpty()) {
Object o = queue.poll();
if (!actual.isUnsubscribed()) {
if (o == NULL_SENTINEL) {
actual.onNext(null);
} else
if (o == COMPLETE_SENTINEL) {
actual.onCompleted();
} else
if (o instanceof ErrorSentinel) {
actual.onError(((ErrorSentinel)o).t);
} else
if (o != null) {
@SuppressWarnings("unchecked")
T v = (T)o;
actual.onNext(v);
} else {
throw new NullPointerException();
}
on.accept(actual, o);
}
}
passthroughMode = true;
Expand All @@ -115,7 +88,7 @@ public void onNext(T t) {
synchronized (gate) {
if (!passthroughMode) {
if (capacity < 0 || queue.size() < capacity) {
queue.offer(t != null ? t : NULL_SENTINEL);
queue.offer(on.next(t));
return;
}
try {
Expand All @@ -142,7 +115,7 @@ public void onError(Throwable e) {
synchronized (gate) {
if (!passthroughMode) {
if (capacity < 0 || queue.size() < capacity) {
queue.offer(new ErrorSentinel(e));
queue.offer(on.error(e));
return;
}
try {
Expand All @@ -169,7 +142,7 @@ public void onCompleted() {
synchronized (gate) {
if (!passthroughMode) {
if (capacity < 0 || queue.size() < capacity) {
queue.offer(COMPLETE_SENTINEL);
queue.offer(on.completed());
return;
}
try {
Expand Down
167 changes: 167 additions & 0 deletions rxjava-core/src/main/java/rx/operators/NotificationLite.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package rx.operators;

import java.io.ObjectStreamException;
import java.io.Serializable;

import rx.Notification;
import rx.Notification.Kind;
import rx.Observer;

/**
* For use in internal operators that need something like materialize and dematerialize wholly
* within the implementation of the operator but don't want to incur the allocation cost of actually
* creating {@link rx.Notification} objects for every onNext and onComplete.
*
* An object is allocated inside {@link #error(Throwable)} to wrap the {@link Throwable} but this
* shouldn't effect performance because exceptions should be exceptionally rare.
*
* It's implemented as a singleton to maintain some semblance of type safety that is completely
* non-existent.
*
* @author gscampbell
*
* @param <T>
*/
public final class NotificationLite<T> {
private NotificationLite() {
}

private static final NotificationLite INSTANCE = new NotificationLite();

@SuppressWarnings("unchecked")
public static <T> NotificationLite<T> instance() {
return INSTANCE;
}

private static final Object ON_COMPLETED_SENTINEL = new Serializable() {
private static final long serialVersionUID = 1;
};

private static final Object ON_NEXT_NULL_SENTINEL = new Serializable() {
private static final long serialVersionUID = 2;
};

private static class OnErrorSentinel implements Serializable {
private static final long serialVersionUID = 3;
private final Throwable e;

public OnErrorSentinel(Throwable e) {
this.e = e;
}
}

/**
* Creates a lite onNext notification for the value passed in without doing any allocation. Can
* be unwrapped and sent with the {@link #accept} method.
*
* @param t
* @return
*/
public Object next(T t) {
if (t == null)
return ON_NEXT_NULL_SENTINEL;
else
return t;
}

/**
* Creates a lite onComplete notification without doing any allocation. Can be unwrapped and
* sent with the {@link #accept} method.
*
* @return
*/
public Object completed() {
return ON_COMPLETED_SENTINEL;
}

/**
* Create a lite onError notification. This call does new up an object to wrap the
* {@link Throwable} but since there should only be one of these the performance impact should
* be small. Can be unwrapped and sent with the {@link #accept} method.
*
* @param e
* @return
*/
public Object error(Throwable e) {
return new OnErrorSentinel(e);
}

/**
* Unwraps the lite notification and calls the appropriate method on the {@link Observer}.
*
* @param o
* the {@link Observer} to call onNext, onCompleted or onError.
* @param n
* @throws IllegalArgumentException
* if the notification is null.
* @throws NullPointerException
* if the {@link Observer} is null.
*/
@SuppressWarnings("unchecked")
public void accept(Observer<? super T> o, Object n) {
switch (kind(n)) {
case OnNext:
o.onNext(getValue(n));
break;
case OnCompleted:
o.onCompleted();
break;
case OnError:
o.onError(getError(n));
break;
}
}

public boolean isCompleted(Object n) {
return n == ON_COMPLETED_SENTINEL;
}

public boolean isError(Object n) {
return n instanceof OnErrorSentinel;
}

/**
* If there is custom logic that isn't as simple as call the right method on an {@link Observer}
* then this method can be used to get the {@link Notification.Kind}
*
* @param n
* @return
*/
public Kind kind(Object n) {
if (n == null)
throw new IllegalArgumentException("The lite notification can not be null");
else if (n == ON_COMPLETED_SENTINEL)
return Kind.OnCompleted;
else if (n instanceof OnErrorSentinel)
return Kind.OnError;
else
// value or ON_NEXT_NULL_SENTINEL but either way it's an OnNext
return Kind.OnNext;
}

/**
* returns value passed in {@link #next(Object)} method call. Bad things happen if you call this
* the onComplete or onError notification type. For performance you are expected to use this
* when it is appropriate.
*
* @param n
* @return
*/
@SuppressWarnings("unchecked")
public T getValue(Object n) {
return n == ON_NEXT_NULL_SENTINEL ? null : (T) n;
}

/**
* returns {@link Throwable} passed in {@link #error(Throwable)} method call. Bad things happen
* if you
* call this the onComplete or onNext notification type. For performance you are expected to use
* this when it is appropriate.
*
* @param n
* @return The {@link Throwable} wrapped inside n
*/
public Throwable getError(Object n) {
return ((OnErrorSentinel) n).e;
}
}
Loading