diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java index f75c6b5662..f8ef22e9a5 100644 --- a/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java @@ -3,72 +3,63 @@ import rx.Observable.Operator; import rx.Observer; import rx.Subscriber; -import rx.functions.Action1; -import rx.functions.Action2; -import rx.functions.Func1; import rx.plugins.DebugNotification; +import rx.plugins.DebugNotificationListener; public final class DebugSubscriber extends Subscriber { - private final Func1 onNextHook; - private final Func1 start; - private final Action1 complete; - private final Action2 error; + private DebugNotificationListener listener; private final Observer o; private Operator from = null; private Operator to = null; public DebugSubscriber( - Func1 onNextHook, - Func1 start, - Action1 complete, - Action2 error, + DebugNotificationListener listener, Subscriber _o, Operator _out, Operator _in) { super(_o); - this.start = start; - this.complete = complete; - this.error = error; + this.listener = listener; this.o = _o; - this.onNextHook = onNextHook; this.from = _out; this.to = _in; - this.add(new DebugSubscription(this, start, complete, error)); + this.add(new DebugSubscription(this, listener)); } @Override public void onCompleted() { - final DebugNotification n = DebugNotification.createOnCompleted(o, from, to); - C context = start.call(n); + final DebugNotification n = DebugNotification.createOnCompleted(o, from, to); + C context = listener.start(n); try { o.onCompleted(); - complete.call(context); + listener.complete(context); } catch (Throwable e) { - error.call(context, e); + listener.error(context, e); } } @Override public void onError(Throwable e) { - final DebugNotification n = DebugNotification.createOnError(o, from, e, to); - C context = start.call(n); + final DebugNotification n = DebugNotification.createOnError(o, from, e, to); + C context = listener.start(n); try { o.onError(e); - complete.call(context); + listener.complete(context); } catch (Throwable e2) { - error.call(context, e2); + listener.error(context, e2); } } @Override public void onNext(T t) { - final DebugNotification n = DebugNotification.createOnNext(o, from, t, to); - C context = start.call(n); + final DebugNotification n = DebugNotification.createOnNext(o, from, t, to); + t = (T) listener.onNext(n); + + C context = listener.start(n); try { - o.onNext(onNextHook.call(t)); - complete.call(context); + o.onNext(t); + listener.complete(context); } catch (Throwable e) { - error.call(context, e); + listener.error(context, e); } } diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java index 256fd8798a..0a3ce9d5ef 100644 --- a/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java @@ -1,33 +1,27 @@ package rx.operators; import rx.Subscription; -import rx.functions.Action1; -import rx.functions.Action2; -import rx.functions.Func1; import rx.plugins.DebugNotification; +import rx.plugins.DebugNotificationListener; final class DebugSubscription implements Subscription { private final DebugSubscriber debugObserver; - private final Func1 start; - private final Action1 complete; - private final Action2 error; + private DebugNotificationListener listener; - DebugSubscription(DebugSubscriber debugObserver, Func1 start, Action1 complete, Action2 error) { + DebugSubscription(DebugSubscriber debugObserver, DebugNotificationListener listener) { this.debugObserver = debugObserver; - this.start = start; - this.complete = complete; - this.error = error; + this.listener = listener; } @Override public void unsubscribe() { - final DebugNotification n = DebugNotification. createUnsubscribe(debugObserver.getActual(), debugObserver.getFrom(), debugObserver.getTo()); - C context = start.call(n); + final DebugNotification n = DebugNotification. createUnsubscribe(debugObserver.getActual(), debugObserver.getFrom(), debugObserver.getTo()); + C context = listener.start(n); try { debugObserver.unsubscribe(); - complete.call(context); + listener.complete(context); } catch (Throwable e) { - error.call(context, e); + listener.error(context, e); } } diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java index 3e5d5eba9a..9e48267333 100644 --- a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java @@ -19,10 +19,7 @@ * @author gscampbell */ public class DebugHook extends RxJavaObservableExecutionHook { - private final Func1 onNextHook; - private final Func1 start; - private final Action1 complete; - private final Action2 error; + private DebugNotificationListener listener; /** * Creates a new instance of the DebugHook RxJava plug-in that can be passed into @@ -34,11 +31,10 @@ public class DebugHook extends RxJavaObservableExecutionHook { * @param events * This action is invoked as each notification is generated */ - public DebugHook(Func1 onNextDataHook, Func1 start, Action1 complete, Action2 error) { - this.complete = complete; - this.error = error; - this.onNextHook = onNextDataHook == null ? Functions.identity() : onNextDataHook; - this.start = (Func1) (start == null ? Actions.empty() : start); + public DebugHook(DebugNotificationListener listener) { + if (listener == null) + throw new IllegalArgumentException("The debug listener must not be null"); + this.listener = listener; } @Override @@ -46,27 +42,48 @@ public OnSubscribe onSubscribeStart(final Observable observa return new OnSubscribe() { @Override public void call(Subscriber o) { - C context = start.call(DebugNotification.createSubscribe(o, observableInstance, f)); + final DebugNotification n = DebugNotification.createSubscribe(o, observableInstance, f); + o = wrapOutbound(null, o); + + C context = listener.start(n); try { - f.call(wrapOutbound(null, o)); - complete.call(context); + f.call(o); + listener.complete(context); } catch(Throwable e) { - error.call(context, e); + listener.error(context, e); } } }; } @Override - public Subscription onSubscribeReturn(Observable observableInstance, Subscription subscription) { + public Subscription onSubscribeReturn(Subscription subscription) { return subscription; } @Override public OnSubscribe onCreate(final OnSubscribe f) { - return new OnCreateWrapper(f); + return new DebugOnSubscribe(f); } + + public final class DebugOnSubscribe implements OnSubscribe { + private final OnSubscribe f; + + private DebugOnSubscribe(OnSubscribe f) { + this.f = f; + } + + @Override + public void call(Subscriber o) { + f.call(wrapInbound(null, o)); + } + + public OnSubscribe getActual() { + return f; + } + } + @Override public Operator onLift(final Operator bind) { @@ -78,11 +95,6 @@ public Subscriber call(final Subscriber o) { }; } - @Override - public Subscription onAdd(Subscriber subscriber, Subscription s) { - return s; - } - @SuppressWarnings("unchecked") private Subscriber wrapOutbound(Operator bind, Subscriber o) { if (o instanceof DebugSubscriber) { @@ -90,7 +102,7 @@ private Subscriber wrapOutbound(Operator bind, Su ((DebugSubscriber) o).setFrom(bind); return o; } - return new DebugSubscriber(onNextHook, start, complete, error, o, bind, null); + return new DebugSubscriber(listener, o, bind, null); } @SuppressWarnings("unchecked") @@ -100,23 +112,6 @@ private Subscriber wrapInbound(Operator bind, Subsc ((DebugSubscriber) o).setTo(bind); return o; } - return new DebugSubscriber(onNextHook, start, complete, error, o, null, bind); - } - - public final class OnCreateWrapper implements OnSubscribe { - private final OnSubscribe f; - - private OnCreateWrapper(OnSubscribe f) { - this.f = f; - } - - @Override - public void call(Subscriber o) { - f.call(wrapInbound(null, o)); - } - - public OnSubscribe getActual() { - return f; - } + return new DebugSubscriber(listener, o, null, bind); } } diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java index f22d4e31ef..1cc8eab752 100644 --- a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java @@ -7,7 +7,7 @@ import rx.observers.SafeSubscriber; import rx.operators.DebugSubscriber; -public class DebugNotification { +public class DebugNotification { public static enum Kind { OnNext, OnError, OnCompleted, Subscribe, Unsubscribe } @@ -21,37 +21,39 @@ public static enum Kind { private final T value; private final Observer observer; - public static DebugNotification createSubscribe(Observer o, Observable source, OnSubscribe sourceFunc) { + @SuppressWarnings("unchecked") + public static DebugNotification createSubscribe(Observer o, Observable source, OnSubscribe sourceFunc) { Operator to = null; Operator from = null; if (o instanceof SafeSubscriber) { - o = ((SafeSubscriber) o).getActual(); + o = ((SafeSubscriber) o).getActual(); } if (o instanceof DebugSubscriber) { - to = ((DebugSubscriber) o).getTo(); - from = ((DebugSubscriber) o).getFrom(); - o = ((DebugSubscriber) o).getActual(); + final DebugSubscriber ds = (DebugSubscriber) o; + to = ds.getTo(); + from = ds.getFrom(); + o = ds.getActual(); } - if (sourceFunc instanceof DebugHook.OnCreateWrapper) { - sourceFunc = ((DebugHook.OnCreateWrapper) sourceFunc).getActual(); + if (sourceFunc instanceof DebugHook.DebugOnSubscribe) { + sourceFunc = ((DebugHook.DebugOnSubscribe) sourceFunc).getActual(); } - return new DebugNotification(o, from, Kind.Subscribe, null, null, to, source, sourceFunc); + return new DebugNotification(o, from, Kind.Subscribe, null, null, to, source, sourceFunc); } - public static DebugNotification createOnNext(Observer o, Operator from, T t, Operator to) { - return new DebugNotification(o, from, Kind.OnNext, t, null, to, null, null); + public static DebugNotification createOnNext(Observer o, Operator from, T t, Operator to) { + return new DebugNotification(o, from, Kind.OnNext, t, null, to, null, null); } - public static DebugNotification createOnError(Observer o, Operator from, Throwable e, Operator to) { - return new DebugNotification(o, from, Kind.OnError, null, e, to, null, null); + public static DebugNotification createOnError(Observer o, Operator from, Throwable e, Operator to) { + return new DebugNotification(o, from, Kind.OnError, null, e, to, null, null); } - public static DebugNotification createOnCompleted(Observer o, Operator from, Operator to) { - return new DebugNotification(o, from, Kind.OnCompleted, null, null, to, null, null); + public static DebugNotification createOnCompleted(Observer o, Operator from, Operator to) { + return new DebugNotification(o, from, Kind.OnCompleted, null, null, to, null, null); } - public static DebugNotification createUnsubscribe(Observer o, Operator from, Operator to) { - return new DebugNotification(o, from, Kind.Unsubscribe, null, null, to, null, null); + public static DebugNotification createUnsubscribe(Observer o, Operator from, Operator to) { + return new DebugNotification(o, from, Kind.Unsubscribe, null, null, to, null, null); } private DebugNotification(Observer o, Operator from, Kind kind, T value, Throwable throwable, Operator to, Observable source, OnSubscribe sourceFunc) { @@ -88,11 +90,11 @@ public Throwable getThrowable() { public Kind getKind() { return kind; } - + public Observable getSource() { return source; } - + public OnSubscribe getSourceFunc() { return sourceFunc; } @@ -103,11 +105,14 @@ public OnSubscribe getSourceFunc() { */ public String toString() { final StringBuilder s = new StringBuilder("{"); - s.append("\"observer\": \"").append(observer.getClass().getName()).append("@").append(Integer.toHexString(observer.hashCode())).append("\""); + s.append("\"observer\": "); + if (observer != null) + s.append("\"").append(observer.getClass().getName()).append("@").append(Integer.toHexString(observer.hashCode())).append("\""); + else + s.append("null"); s.append(", \"type\": \"").append(kind).append("\""); if (kind == Kind.OnNext) - // not json safe - s.append(", \"value\": \"").append(value).append("\""); + s.append(", \"value\": ").append(quote(value)).append(""); if (kind == Kind.OnError) s.append(", \"exception\": \"").append(throwable.getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\""); if (source != null) @@ -121,4 +126,66 @@ public String toString() { s.append("}"); return s.toString(); } + + public static String quote(Object obj) { + if (obj == null) { + return "null"; + } + + String string; + try { + string = obj.toString(); + } catch (Throwable e) { + return "\"\""; + } + if (string == null || string.length() == 0) { + return "\"\""; + } + + char c = 0; + int i; + int len = string.length(); + StringBuilder sb = new StringBuilder(len + 4); + String t; + + sb.append('"'); + for (i = 0; i < len; i += 1) { + c = string.charAt(i); + switch (c) { + case '\\': + case '"': + sb.append('\\'); + sb.append(c); + break; + case '/': + sb.append('\\'); + sb.append(c); + break; + case '\b': + sb.append("\\b"); + break; + case '\t': + sb.append("\\t"); + break; + case '\n': + sb.append("\\n"); + break; + case '\f': + sb.append("\\f"); + break; + case '\r': + sb.append("\\r"); + break; + default: + if (c < ' ') { + t = "000" + Integer.toHexString(c); + sb.append("\\u" + t.substring(t.length() - 4)); + } else { + sb.append(c); + } + } + } + sb.append('"'); + return sb.toString(); + } } diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotificationListener.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotificationListener.java new file mode 100644 index 0000000000..f34e25159e --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotificationListener.java @@ -0,0 +1,70 @@ +package rx.plugins; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Observer; + +/** + * Subclasses of this are passed into the constructor of {@link DebugHook} to receive notification + * about all activity in Rx. + * + * @author gscampbell + * + * @param + * Context type that is returned from start and passed to either complete or error. + * @see DebugHook + */ +@SuppressWarnings("unused") +public abstract class DebugNotificationListener { + /** + * Override this to change the default behavior of returning the encapsulated value. This will + * only be invoked when the {@link DebugNotification#getKind()} is + * {@link DebugNotification.Kind#OnNext} and the value (null or not) is just about to be sent to + * next {@link Observer}. This can end up being called multiple times for + * the same value as it passes from {@link Operator} to {@link Operator} in the + * {@link Observable} chain. + *

+ * This can be used to decorate or replace the values passed into any onNext function or just + * perform extra logging, metrics and other such things and pass-thru the function. + * + * @param n + * {@link DebugNotification} containing the data and context about what is happening. + * @return + */ + public T onNext(DebugNotification n) { + return n.getValue(); + } + + /** + * For each {@link DebugNotification.Kind} start is invoked before the actual method is invoked. + *

+ * This can be used to perform extra logging, metrics and other such things. + * + * @param n + * {@link DebugNotification} containing the data and context about what is happening. + * @return + * A contextual object that the listener can use in the {@link #complete(C)} or + * {@link #error(C, Throwable)} after the actual operation has ended. + */ + public C start(DebugNotification n) { + return null; + } + + /** + * After the actual operations has completed from {@link #start(DebugNotification)} this is + * invoked + * + * @param context + */ + public void complete(C context) { + } + + /** + * After the actual operations has thrown an exception from {@link #start(DebugNotification)} + * this is invoked + * + * @param context + */ + public void error(C context, Throwable e) { + } +} diff --git a/rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java b/rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java index ea69258259..c3f4f2d426 100644 --- a/rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java +++ b/rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java @@ -1,27 +1,28 @@ package rx.debug; +import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.Arrays; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; +import org.mockito.InOrder; import rx.Observable; -import rx.Subscriber; -import rx.functions.Action1; -import rx.functions.Action2; import rx.functions.Func1; +import rx.observers.Subscribers; import rx.plugins.DebugHook; import rx.plugins.DebugNotification; import rx.plugins.DebugNotification.Kind; +import rx.plugins.DebugNotificationListener; import rx.plugins.PlugReset; import rx.plugins.RxJavaPlugins; @@ -32,112 +33,212 @@ public void reset() { PlugReset.reset(); } + private static class TestDebugNotificationListener extends DebugNotificationListener { + ConcurrentHashMap allThreadDepths = new ConcurrentHashMap(1); + ThreadLocal currentThreadDepth = new ThreadLocal() { + protected AtomicInteger initialValue() { + AtomicInteger depth = new AtomicInteger(); + allThreadDepths.put(Thread.currentThread(), depth); + return depth; + }; + }; + + @Override + public T onNext(DebugNotification n) { + if (n == null) + return null; // because we are verifying on a spied object. + System.err.println("next: " + n.getValue()); + return super.onNext(n); + } + + @Override + public Object start(DebugNotification n) { + if (n == null) + return null; // because we are verifying on a spied object. + currentThreadDepth.get().incrementAndGet(); + Object context = new Object(); + System.err.println("start: " + Integer.toHexString(context.hashCode()) + " " + n); + return context; + } + + @Override + public void complete(Object context) { + if (context == null) + return; // because we are verifying on a spied object. + currentThreadDepth.get().decrementAndGet(); + System.err.println("complete: " + Integer.toHexString(context.hashCode())); + } + + @Override + public void error(Object context, Throwable e) { + if (context == null) + return; // because we are verifying on a spied object. + currentThreadDepth.get().decrementAndGet(); + System.err.println("error: " + Integer.toHexString(context.hashCode())); + } + + public void assertValidState() { + for (Entry threadDepth : allThreadDepths.entrySet()) { + assertEquals(0, threadDepth.getValue().get()); + } + } + } + + @SuppressWarnings("unchecked") @Test - @Ignore public void testSimple() { - Func1 start = mock(Func1.class); - Action1 complete = mock(Action1.class); - Action2 error = mock(Action2.class); - final DebugHook hook = new DebugHook(null, start, complete, error); + TestDebugNotificationListener listener = new TestDebugNotificationListener(); + listener = spy(listener); + final DebugHook hook = new DebugHook(listener); RxJavaPlugins.getInstance().registerObservableExecutionHook(hook); - Observable.empty().subscribe(); - verify(start, times(1)).call(subscribe()); - verify(start, times(1)).call(onCompleted()); - verify(complete, times(2)).call(any()); + Observable.from(1).subscribe(Subscribers.empty()); + + final InOrder inOrder = inOrder(listener); + inOrder.verify(listener).start(subscribe()); + inOrder.verify(listener).onNext(onNext(1)); + inOrder.verify(listener).start(onNext(1)); + inOrder.verify(listener).complete(any()); + inOrder.verify(listener).start(onCompleted()); + inOrder.verify(listener, times(2)).complete(any()); + inOrder.verifyNoMoreInteractions(); - verify(error, never()).call(any(), any()); + listener.assertValidState(); } + @SuppressWarnings("unchecked") @Test public void testOneOp() { - Func1, Object> start = mock(Func1.class); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) throws Throwable { - Object context = new Object(); - System.out.println("start: " + context.hashCode() + " " + invocation.getArguments()[0]); - return context; - } - }).when(start).call(any(DebugNotification.class)); - Action1 complete = mock(Action1.class); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) throws Throwable { - System.out.println("complete: " + invocation.getArguments()[0].hashCode()); - return null; - } - }).when(complete).call(any()); - Action2 error = mock(Action2.class); - doAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) throws Throwable { - System.out.println("error: " + invocation.getArguments()[1].hashCode()); - return null; - } - }).when(error).call(any(), any(Throwable.class)); - final DebugHook hook = new DebugHook(null, start, complete, error); + TestDebugNotificationListener listener = new TestDebugNotificationListener(); + listener = spy(listener); + + // create and register the hooks. + final DebugHook hook = new DebugHook(listener); RxJavaPlugins.getInstance().registerObservableExecutionHook(hook); - Observable.from(Arrays.asList(1, 3)).flatMap(new Func1>() { + + // do the operation + Observable + .from(Arrays.asList(1, 3)) + .flatMap(new Func1>() { + @Override + public Observable call(Integer it) { + return Observable.from(Arrays.asList(it * 10, (it + 1) * 10)); + } + }) + .take(3) + .subscribe(Subscribers. empty()); + + InOrder calls = inOrder(listener); + + calls.verify(listener).start(subscribe()); + calls.verify(listener).start(onNext(1)); // from to map + calls.verify(listener).start(onNext(Observable.class)); // map to merge + calls.verify(listener).start(subscribe()); // merge inner + calls.verify(listener).start(onNext(10)); // from to merge inner + calls.verify(listener).start(onNext(10)); // merge inner to take + calls.verify(listener).start(onNext(10)); // take to empty subscriber + calls.verify(listener, times(3)).complete(any()); + calls.verify(listener).start(onNext(20)); // next from to merge inner + calls.verify(listener).start(onNext(20)); // merge inner to take + calls.verify(listener).start(onNext(20)); // take to output + calls.verify(listener, times(3)).complete(any()); + calls.verify(listener).start(onCompleted()); // sub from completes + // calls.verify(listener).start(unsubscribe()); // merge's composite subscription + // unnecessarily calls unsubscribe during the removing the subscription from the array. + // + // i didn't include it because it could cause a test failure if the internals change. + calls.verify(listener, times(5)).complete(any()); // pop the call stack up to onNext(1) + calls.verify(listener).start(onNext(3)); // from to map + calls.verify(listener).start(onNext(Observable.class)); // map to merge + calls.verify(listener).start(subscribe()); + calls.verify(listener).start(onNext(30)); // next from to merge inner + calls.verify(listener).start(onNext(30)); // merge inner to take + calls.verify(listener).start(onNext(30)); // take to output + calls.verify(listener).complete(any()); + calls.verify(listener).start(onCompleted()); // take to output + calls.verify(listener).start(unsubscribe()); // take unsubscribes + calls.verify(listener).complete(any()); + calls.verify(listener).start(unsubscribe()); // merge inner unsubscribes + calls.verify(listener).complete(any()); + calls.verify(listener).start(unsubscribe()); // merge outer unsubscribes + calls.verify(listener).complete(any()); + calls.verify(listener).start(unsubscribe()); // map unsubscribe + calls.verify(listener, times(7)).complete(any()); + calls.verifyNoMoreInteractions(); + + listener.assertValidState(); + } + + private static DebugNotification onNext(final T value) { + return argThat(new BaseMatcher>() { @Override - public Observable call(Integer it) { - return Observable.from(Arrays.asList(it, it + 1)); + public boolean matches(Object item) { + if (item instanceof DebugNotification) { + @SuppressWarnings("unchecked") + DebugNotification dn = (DebugNotification) item; + return dn.getKind() == Kind.OnNext && dn.getValue().equals(value); + } + return false; } - }).take(3).subscribe(new Subscriber() { + @Override - public void onCompleted() { + public void describeTo(Description description) { + description.appendText("OnNext " + value); } + }); + } + private static DebugNotification onNext(final Class type) { + return argThat(new BaseMatcher>() { @Override - public void onError(Throwable e) { + public boolean matches(Object item) { + if (item instanceof DebugNotification) { + @SuppressWarnings("unchecked") + DebugNotification dn = (DebugNotification) item; + return dn.getKind() == Kind.OnNext && type.isAssignableFrom(dn.getValue().getClass()); + } + return false; } @Override - public void onNext(Integer t) { + public void describeTo(Description description) { + description.appendText("OnNext " + type); } }); - verify(start, atLeast(3)).call(subscribe()); - verify(start, times(4)).call(onNext(1)); - // one less because it originates from the inner observable sent to merge - verify(start, times(3)).call(onNext(2)); - verify(start, times(4)).call(onNext(3)); - // because the take unsubscribes - verify(start, never()).call(onNext(4)); - - verify(complete, atLeast(14)).call(any()); - - verify(error, never()).call(any(), any(Throwable.class)); } - private static DebugNotification onNext(final T value) { - return argThat(new BaseMatcher>() { + private static DebugNotification subscribe() { + return argThat(new BaseMatcher>() { @Override public boolean matches(Object item) { if (item instanceof DebugNotification) { - DebugNotification dn = (DebugNotification) item; - return dn.getKind() == Kind.OnNext && dn.getValue().equals(value); + DebugNotification dn = (DebugNotification) item; + return dn.getKind() == DebugNotification.Kind.Subscribe; } return false; } @Override public void describeTo(Description description) { - description.appendText("OnNext " + value); + description.appendText("Subscribe"); } }); } - private static DebugNotification subscribe() { - return argThat(new BaseMatcher() { + private static DebugNotification unsubscribe() { + return argThat(new BaseMatcher>() { @Override public boolean matches(Object item) { if (item instanceof DebugNotification) { DebugNotification dn = (DebugNotification) item; - return dn.getKind() == DebugNotification.Kind.Subscribe; + return dn.getKind() == DebugNotification.Kind.Unsubscribe; } return false; } @Override public void describeTo(Description description) { - description.appendText("Subscribe"); + description.appendText("Unsubscribe"); } }); } diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 0db0910764..659d30d44f 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -25,8 +25,8 @@ import java.util.concurrent.TimeUnit; import rx.exceptions.Exceptions; -import rx.exceptions.OnErrorThrowable; import rx.exceptions.OnErrorNotImplementedException; +import rx.exceptions.OnErrorThrowable; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Action2; @@ -49,7 +49,88 @@ import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; import rx.observers.SafeSubscriber; -import rx.operators.*; +import rx.operators.OnSubscribeFromIterable; +import rx.operators.OnSubscribeRange; +import rx.operators.OperationAll; +import rx.operators.OperationAmb; +import rx.operators.OperationAny; +import rx.operators.OperationAsObservable; +import rx.operators.OperationAverage; +import rx.operators.OperationBuffer; +import rx.operators.OperationCache; +import rx.operators.OperationCombineLatest; +import rx.operators.OperationConcat; +import rx.operators.OperationDebounce; +import rx.operators.OperationDefaultIfEmpty; +import rx.operators.OperationDefer; +import rx.operators.OperationDelay; +import rx.operators.OperationDematerialize; +import rx.operators.OperationDistinct; +import rx.operators.OperationDistinctUntilChanged; +import rx.operators.OperationElementAt; +import rx.operators.OperationFinally; +import rx.operators.OperationFlatMap; +import rx.operators.OperationGroupByUntil; +import rx.operators.OperationGroupJoin; +import rx.operators.OperationInterval; +import rx.operators.OperationJoin; +import rx.operators.OperationJoinPatterns; +import rx.operators.OperationMaterialize; +import rx.operators.OperationMergeDelayError; +import rx.operators.OperationMergeMaxConcurrent; +import rx.operators.OperationMinMax; +import rx.operators.OperationMulticast; +import rx.operators.OperationOnErrorResumeNextViaObservable; +import rx.operators.OperationOnErrorReturn; +import rx.operators.OperationOnExceptionResumeNextViaObservable; +import rx.operators.OperationParallelMerge; +import rx.operators.OperationReplay; +import rx.operators.OperationSample; +import rx.operators.OperationSequenceEqual; +import rx.operators.OperationSingle; +import rx.operators.OperationSkip; +import rx.operators.OperationSkipLast; +import rx.operators.OperationSkipUntil; +import rx.operators.OperationSkipWhile; +import rx.operators.OperationSum; +import rx.operators.OperationSwitch; +import rx.operators.OperationSynchronize; +import rx.operators.OperationTakeLast; +import rx.operators.OperationTakeTimed; +import rx.operators.OperationTakeUntil; +import rx.operators.OperationTakeWhile; +import rx.operators.OperationThrottleFirst; +import rx.operators.OperationTimeInterval; +import rx.operators.OperationTimer; +import rx.operators.OperationToMap; +import rx.operators.OperationToMultimap; +import rx.operators.OperationToObservableFuture; +import rx.operators.OperationUsing; +import rx.operators.OperationWindow; +import rx.operators.OperatorCast; +import rx.operators.OperatorDoOnEach; +import rx.operators.OperatorFilter; +import rx.operators.OperatorGroupBy; +import rx.operators.OperatorMap; +import rx.operators.OperatorMerge; +import rx.operators.OperatorObserveOn; +import rx.operators.OperatorOnErrorFlatMap; +import rx.operators.OperatorOnErrorResumeNextViaFunction; +import rx.operators.OperatorParallel; +import rx.operators.OperatorRepeat; +import rx.operators.OperatorRetry; +import rx.operators.OperatorScan; +import rx.operators.OperatorSkip; +import rx.operators.OperatorSubscribeOn; +import rx.operators.OperatorTake; +import rx.operators.OperatorTimeout; +import rx.operators.OperatorTimeoutWithSelector; +import rx.operators.OperatorTimestamp; +import rx.operators.OperatorToObservableList; +import rx.operators.OperatorToObservableSortedList; +import rx.operators.OperatorUnsubscribeOn; +import rx.operators.OperatorZip; +import rx.operators.OperatorZipIterable; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; @@ -79,7 +160,7 @@ */ public class Observable { - final OnSubscribe f; + final OnSubscribe onSubscribe; /** * Observable with Function to execute when subscribed to. @@ -91,10 +172,10 @@ public class Observable { * {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called */ protected Observable(OnSubscribe f) { - this.f = hook.onCreate(f); + this.onSubscribe = f; } - private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); + private final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); /** * Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to @@ -187,7 +268,7 @@ public Observable lift(final Operator lift) { @Override public void call(Subscriber o) { try { - f.call(hook.onLift(lift).call(o)); + onSubscribe.call(hook.onLift(lift).call(o)); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then @@ -6953,7 +7034,7 @@ public void onNext(T t) { */ public final Subscription subscribe(Subscriber observer) { // allow the hook to intercept and/or decorate - OnSubscribe onSubscribeFunction = hook.onSubscribeStart(this, f); + OnSubscribe onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe); // validate and proceed if (observer == null) { throw new IllegalArgumentException("observer can not be null"); @@ -6977,7 +7058,7 @@ public final Subscription subscribe(Subscriber observer) { observer = new SafeSubscriber(observer); onSubscribeFunction.call(observer); } - final Subscription returnSubscription = hook.onSubscribeReturn(this, observer); + final Subscription returnSubscription = hook.onSubscribeReturn(observer); // we return it inside a Subscription so it can't be cast back to Subscriber return Subscriptions.create(new Action0() { @@ -6992,7 +7073,7 @@ public void call() { Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { - observer.onError(hook.onSubscribeError(this, e)); + observer.onError(hook.onSubscribeError(e)); } catch (OnErrorNotImplementedException e2) { // special handling when onError is not implemented ... we just rethrow throw e2; @@ -7000,7 +7081,9 @@ public void call() { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); - hook.onSubscribeError(this, r); + // TODO could the hook be the cause of the error in the on error handling. + hook.onSubscribeError(r); + // TODO why aren't we throwing the hook's return value. throw r; } return Subscriptions.empty(); diff --git a/rxjava-core/src/main/java/rx/functions/Actions.java b/rxjava-core/src/main/java/rx/functions/Actions.java index 9954ce5119..0a9f79726e 100644 --- a/rxjava-core/src/main/java/rx/functions/Actions.java +++ b/rxjava-core/src/main/java/rx/functions/Actions.java @@ -25,51 +25,63 @@ private Actions() { throw new IllegalStateException("No instances!"); } - public static final EmptyAction empty() { - return EMPTY_ACTION; + @SuppressWarnings("unchecked") + public static final EmptyAction empty() { + return (EmptyAction) EMPTY_ACTION; } private static final EmptyAction EMPTY_ACTION = new EmptyAction(); - private static final class EmptyAction implements Action0, Action1, Action2, Action3, Action4, Action5, Action6, Action7, Action8, Action9, ActionN { + private static final class EmptyAction implements + Action0, + Action1, + Action2, + Action3, + Action4, + Action5, + Action6, + Action7, + Action8, + Action9, + ActionN { @Override public void call() { } @Override - public void call(Object t1) { + public void call(T0 t1) { } @Override - public void call(Object t1, Object t2) { + public void call(T0 t1, T1 t2) { } @Override - public void call(Object t1, Object t2, Object t3) { + public void call(T0 t1, T1 t2, T2 t3) { } @Override - public void call(Object t1, Object t2, Object t3, Object t4) { + public void call(T0 t1, T1 t2, T2 t3, T3 t4) { } @Override - public void call(Object t1, Object t2, Object t3, Object t4, Object t5) { + public void call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5) { } @Override - public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6) { + public void call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6) { } @Override - public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7) { + public void call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6, T6 t7) { } @Override - public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7, Object t8) { + public void call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6, T6 t7, T7 t8) { } @Override - public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7, Object t8, Object t9) { + public void call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6, T6 t7, T7 t8, T8 t9) { } @Override diff --git a/rxjava-core/src/main/java/rx/functions/Functions.java b/rxjava-core/src/main/java/rx/functions/Functions.java index b30ad4e4d2..becfe31c43 100644 --- a/rxjava-core/src/main/java/rx/functions/Functions.java +++ b/rxjava-core/src/main/java/rx/functions/Functions.java @@ -358,4 +358,79 @@ public Boolean call(Object o) { return false; } } + + @SuppressWarnings("unchecked") + public static NullFunction returnNull() { + return (NullFunction) NULL_FUNCTION; + } + + private static final NullFunction NULL_FUNCTION = new NullFunction(); + + private static final class NullFunction implements + Func0, + Func1, + Func2, + Func3, + Func4, + Func5, + Func6, + Func7, + Func8, + Func9, + FuncN { + @Override + public R call() { + return null; + } + + @Override + public R call(T0 t1) { + return null; + } + + @Override + public R call(T0 t1, T1 t2) { + return null; + } + + @Override + public R call(T0 t1, T1 t2, T2 t3) { + return null; + } + + @Override + public R call(T0 t1, T1 t2, T2 t3, T3 t4) { + return null; + } + + @Override + public R call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5) { + return null; + } + + @Override + public R call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6) { + return null; + } + + @Override + public R call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6, T6 t7) { + return null; + } + + @Override + public R call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6, T6 t7, T7 t8) { + return null; + } + + @Override + public R call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6, T6 t7, T7 t8, T8 t9) { + return null; + } + + @Override + public R call(Object... args) { + return null; + } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OnSubscribeFromIterable.java b/rxjava-core/src/main/java/rx/operators/OnSubscribeFromIterable.java index 45dbde9004..38766395f4 100644 --- a/rxjava-core/src/main/java/rx/operators/OnSubscribeFromIterable.java +++ b/rxjava-core/src/main/java/rx/operators/OnSubscribeFromIterable.java @@ -42,6 +42,9 @@ public void call(Subscriber o) { } o.onNext(i); } + if (o.isUnsubscribed()) { + return; + } o.onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java index 01e6c97507..64402fe0de 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java @@ -17,85 +17,108 @@ import rx.Observable; import rx.Observable.OnSubscribe; -import rx.Observable.OnSubscribeFunc; import rx.Observable.Operator; import rx.Subscriber; import rx.Subscription; import rx.functions.Func1; /** - * Abstract ExecutionHook with invocations at different lifecycle points of {@link Observable} execution with a default no-op implementation. + * Abstract ExecutionHook with invocations at different lifecycle points of {@link Observable} + * execution with a default no-op implementation. *

* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: https://github.com/Netflix/RxJava/wiki/Plugins. + * href ="https://github.com/Netflix/RxJava/wiki/Plugins">https://github.com/Netflix/RxJava/wiki/ + * Plugins . *

* Note on thread-safety and performance *

- * A single implementation of this class will be used globally so methods on this class will be invoked concurrently from multiple threads so all functionality must be thread-safe. + * A single implementation of this class will be used globally so methods on this class will be + * invoked concurrently from multiple threads so all functionality must be thread-safe. *

- * Methods are also invoked synchronously and will add to execution time of the observable so all behavior should be fast. If anything time-consuming is to be done it should be spawned asynchronously - * onto separate worker threads. + * Methods are also invoked synchronously and will add to execution time of the observable so all + * behavior should be fast. If anything time-consuming is to be done it should be spawned + * asynchronously onto separate worker threads. * - * */ + */ public abstract class RxJavaObservableExecutionHook { + /** + * Invoked during the construction by {@link Observable#create(OnSubscribe)} + *

+ * This can be used to decorate or replace the onSubscribe function or just perform + * extra logging, metrics and other such things and pass-thru the function. + * + * @param onSubscribe + * original {@link OnSubscribe}<{@code T}> to be executed + * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or + * just returned as a pass-thru. + */ + public OnSubscribe onCreate(OnSubscribe f) { + return f; + } + /** * Invoked before {@link Observable#subscribe(rx.Subscriber)} is about to be executed. *

- * This can be used to decorate or replace the onSubscribe function or just perform extra logging, metrics and other such things and pass-thru the function. + * This can be used to decorate or replace the onSubscribe function or just perform + * extra logging, metrics and other such things and pass-thru the function. * - * @param observableInstance - * The executing {@link Observable} instance. * @param onSubscribe - * original {@link Func1}<{@link Subscriber}{@code }, {@link Subscription}> to be executed - * @return {@link Func1}<{@link Subscriber}{@code }, {@link Subscription}> function that can be modified, decorated, replaced or just returned as a pass-thru. + * original {@link OnSubscribe}<{@code T}> to be executed + * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or + * just returned as a pass-thru. */ - public OnSubscribe onSubscribeStart(Observable observableInstance, final OnSubscribe onSubscribe) { + public OnSubscribe onSubscribeStart(Observable observableInsance, final OnSubscribe onSubscribe) { // pass-thru by default return onSubscribe; } /** - * Invoked after successful execution of {@link Observable#subscribe(rx.Subscriber)} with returned {@link Subscription}. + * Invoked after successful execution of {@link Observable#subscribe(rx.Subscriber)} with + * returned {@link Subscription}. *

- * This can be used to decorate or replace the {@link Subscription} instance or just perform extra logging, metrics and other such things and pass-thru the subscription. + * This can be used to decorate or replace the {@link Subscription} instance or just perform + * extra logging, metrics and other such things and pass-thru the subscription. * - * @param observableInstance - * The executing {@link Observable} instance. * @param subscription * original {@link Subscription} - * @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a pass-thru. + * @return {@link Subscription} subscription that can be modified, decorated, replaced or just + * returned as a pass-thru. */ - public Subscription onSubscribeReturn(Observable observableInstance, Subscription subscription) { + public Subscription onSubscribeReturn(Subscription subscription) { // pass-thru by default return subscription; } /** - * Invoked after failed execution of {@link Observable#subscribe(Subscriber)} with thrown Throwable. + * Invoked after failed execution of {@link Observable#subscribe(Subscriber)} with thrown + * Throwable. *

- * This is NOT errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when attempting - * to subscribe to a {@link Func1}<{@link Subscriber}{@code }, {@link Subscription}>. + * This is NOT errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown + * when attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code }, + * {@link Subscription}>. * - * @param observableInstance - * The executing {@link Observable} instance. * @param e * Throwable thrown by {@link Observable#subscribe(Subscriber)} * @return Throwable that can be decorated, replaced or just returned as a pass-thru. */ - public Throwable onSubscribeError(Observable observableInstance, Throwable e) { + public Throwable onSubscribeError(Throwable e) { // pass-thru by default return e; } - public OnSubscribe onCreate(OnSubscribe f) { - return f; - } - - public Operator onLift(final Operator bind) { - return bind; - } - - public Subscription onAdd(Subscriber subscriber, Subscription s) { - return s; + /** + * Invoked just as the operator functions is called to bind two operations together into a new + * {@link Observable} and the return value is used as the lifted function + *

+ * This can be used to decorate or replace the {@link Operator} instance or just perform extra + * logging, metrics and other such things and pass-thru the onSubscribe. + * + * @param lift + * original {@link Operator}{@code} ' * @return {@link Operator}{@code } + * function that can be modified, decorated, replaced or + * just returned as a pass-thru. + */ + public Operator onLift(final Operator lift) { + return lift; } }