diff --git a/pmd.xml b/pmd.xml index e6fbccdd95..339af7d5ac 100644 --- a/pmd.xml +++ b/pmd.xml @@ -71,7 +71,6 @@ - @@ -134,9 +133,6 @@ - - - @@ -160,7 +156,6 @@ - @@ -183,8 +178,6 @@ - - diff --git a/src/main/java/rx/BackpressureOverflow.java b/src/main/java/rx/BackpressureOverflow.java index 761a2c4a80..ebd98e1b82 100644 --- a/src/main/java/rx/BackpressureOverflow.java +++ b/src/main/java/rx/BackpressureOverflow.java @@ -24,6 +24,30 @@ @Experimental public final class BackpressureOverflow { + /** + * Signal a MissingBackressureException due to lack of requests. + */ + public static final BackpressureOverflow.Strategy ON_OVERFLOW_ERROR = Error.INSTANCE; + + /** + * By default, signal a MissingBackressureException due to lack of requests. + */ + public static final BackpressureOverflow.Strategy ON_OVERFLOW_DEFAULT = ON_OVERFLOW_ERROR; + + /** + * Drop the oldest value in the buffer. + */ + public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_OLDEST = DropOldest.INSTANCE; + + /** + * Drop the latest value. + */ + public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_LATEST = DropLatest.INSTANCE; + + /** + * Represents a callback called when a value is about to be dropped + * due to lack of downstream requests. + */ public interface Strategy { /** @@ -36,14 +60,6 @@ public interface Strategy { boolean mayAttemptDrop() throws MissingBackpressureException; } - public static final BackpressureOverflow.Strategy ON_OVERFLOW_DEFAULT = Error.INSTANCE; - - public static final BackpressureOverflow.Strategy ON_OVERFLOW_ERROR = Error.INSTANCE; - - public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_OLDEST = DropOldest.INSTANCE; - - public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_LATEST = DropLatest.INSTANCE; - /** * Drop oldest items from the buffer making room for newer ones. */ diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index e5f7c4a068..c4560cbede 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -38,6 +38,8 @@ */ @Experimental public class Completable { + /** The actual subscription action. */ + private final CompletableOnSubscribe onSubscribe; /** * Callback used for building deferred computations that takes a CompletableSubscriber. */ @@ -192,6 +194,22 @@ public void call(final CompletableSubscriber s) { final CompositeSubscription set = new CompositeSubscription(); s.onSubscribe(set); + Iterator extends Completable> it; + + try { + it = sources.iterator(); + } catch (Throwable e) { + s.onError(e); + return; + } + + if (it == null) { + s.onError(new NullPointerException("The iterator returned is null")); + return; + } + + boolean empty = true; + final AtomicBoolean once = new AtomicBoolean(); CompletableSubscriber inner = new CompletableSubscriber() { @@ -220,22 +238,6 @@ public void onSubscribe(Subscription d) { }; - Iterator extends Completable> it; - - try { - it = sources.iterator(); - } catch (Throwable e) { - s.onError(e); - return; - } - - if (it == null) { - s.onError(new NullPointerException("The iterator returned is null")); - return; - } - - boolean empty = true; - for (;;) { if (once.get() || set.isUnsubscribed()) { return; @@ -377,7 +379,7 @@ public static Completable create(CompletableOnSubscribe onSubscribe) { requireNonNull(onSubscribe); try { return new Completable(onSubscribe); - } catch (NullPointerException ex) { + } catch (NullPointerException ex) { // NOPMD throw ex; } catch (Throwable ex) { RxJavaHooks.onError(ex); @@ -849,7 +851,7 @@ public static Completable using(final Func0 resourceFunc0, return create(new CompletableOnSubscribe() { @Override public void call(final CompletableSubscriber s) { - final R resource; + final R resource; // NOPMD try { resource = resourceFunc0.call(); @@ -965,9 +967,6 @@ public void call() { }); } - /** The actual subscription action. */ - private final CompletableOnSubscribe onSubscribe; - /** * Constructs a Completable instance with the given onSubscribe callback. * @param onSubscribe the callback that will receive CompletableSubscribers when they subscribe, @@ -1550,7 +1549,7 @@ public void call(CompletableSubscriber s) { CompletableSubscriber sw = onLiftDecorated.call(s); unsafeSubscribe(sw); - } catch (NullPointerException ex) { + } catch (NullPointerException ex) { // NOPMD throw ex; } catch (Throwable ex) { throw toNpe(ex); @@ -2008,7 +2007,7 @@ public final void unsafeSubscribe(CompletableSubscriber s) { CompletableOnSubscribe onSubscribeDecorated = RxJavaHooks.onCompletableStart(this, this.onSubscribe); onSubscribeDecorated.call(s); - } catch (NullPointerException ex) { + } catch (NullPointerException ex) { // NOPMD throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); @@ -2072,7 +2071,7 @@ public void onSubscribe(Subscription d) { } }); RxJavaHooks.onObservableReturn(s); - } catch (NullPointerException ex) { + } catch (NullPointerException ex) { // NOPMD throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); diff --git a/src/main/java/rx/Notification.java b/src/main/java/rx/Notification.java index 71fb95c745..f1cfdab4ef 100644 --- a/src/main/java/rx/Notification.java +++ b/src/main/java/rx/Notification.java @@ -182,22 +182,27 @@ public enum Kind { @Override public String toString() { - StringBuilder str = new StringBuilder("[").append(super.toString()).append(" ").append(getKind()); - if (hasValue()) - str.append(" ").append(getValue()); - if (hasThrowable()) - str.append(" ").append(getThrowable().getMessage()); - str.append("]"); + StringBuilder str = new StringBuilder(64).append('[').append(super.toString()) + .append(' ').append(getKind()); + if (hasValue()) { + str.append(' ').append(getValue()); + } + if (hasThrowable()) { + str.append(' ').append(getThrowable().getMessage()); + } + str.append(']'); return str.toString(); } @Override public int hashCode() { int hash = getKind().hashCode(); - if (hasValue()) + if (hasValue()) { hash = hash * 31 + getValue().hashCode(); - if (hasThrowable()) + } + if (hasThrowable()) { hash = hash * 31 + getThrowable().hashCode(); + } return hash; } @@ -224,10 +229,6 @@ public boolean equals(Object obj) { return false; } - if (!(throwable == notification.throwable || (throwable != null && throwable.equals(notification.throwable)))) { - return false; - } - - return true; + return (throwable == notification.throwable || (throwable != null && throwable.equals(notification.throwable))); } } diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 4507c4480f..4599edb5f4 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -9015,7 +9015,7 @@ public final Subscription unsafeSubscribe(Subscriber super T> subscriber) { // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onObservableError(r); // TODO why aren't we throwing the hook's return value. - throw r; + throw r; // NOPMD } return Subscriptions.unsubscribed(); } @@ -9112,7 +9112,7 @@ static Subscription subscribe(Subscriber super T> subscriber, Observable takeFirst(Func1 super T, Boolean> predicate) { * @see ReactiveX operators documentation: TakeLast */ public final Observable takeLast(final int count) { - if (count == 0) + if (count == 0) { return ignoreElements(); - else if (count == 1 ) + } else if (count == 1) { return lift(OperatorTakeLastOne.instance()); - else + } else { return lift(new OperatorTakeLast(count)); + } } /** diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 183f869fd0..c4c4e71881 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -619,7 +619,7 @@ public static Single fromCallable(final Callable extends T> func) { return create(new OnSubscribe() { @Override public void call(SingleSubscriber super T> singleSubscriber) { - final T value; + T value; try { value = func.call(); @@ -1719,7 +1719,7 @@ public final Subscription unsafeSubscribe(Subscriber super T> subscriber) { // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onSingleError(r); // TODO why aren't we throwing the hook's return value. - throw r; + throw r; // NOPMD } return Subscriptions.unsubscribed(); } @@ -1829,7 +1829,7 @@ public final Subscription subscribe(Subscriber super T> subscriber) { // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onSingleError(r); // TODO why aren't we throwing the hook's return value. - throw r; + throw r; // NOPMD } return Subscriptions.empty(); } @@ -1875,7 +1875,7 @@ public final Subscription subscribe(final SingleSubscriber super T> te) { @Override public void onCompleted() { - + // deliberately ignored } @Override @@ -2376,6 +2376,7 @@ public final Single doOnError(final Action1 onError) { Observer observer = new Observer() { @Override public void onCompleted() { + // deliberately ignored } @Override @@ -2385,6 +2386,7 @@ public void onError(Throwable e) { @Override public void onNext(T t) { + // deliberately ignored } }; @@ -2410,10 +2412,12 @@ public final Single doOnSuccess(final Action1 super T> onSuccess) { Observer observer = new Observer() { @Override public void onCompleted() { + // deliberately ignored } @Override public void onError(Throwable e) { + // deliberately ignored } @Override @@ -2589,7 +2593,7 @@ public final Single doAfterTerminate(Action0 action) { */ @SuppressWarnings("unchecked") static Single extends T>[] iterableToArray(final Iterable extends Single extends T>> singlesIterable) { - final Single extends T>[] singlesArray; + Single extends T>[] singlesArray; int count; if (singlesIterable instanceof Collection) { diff --git a/src/main/java/rx/exceptions/AssemblyStackTraceException.java b/src/main/java/rx/exceptions/AssemblyStackTraceException.java index ee6a8be6a9..752e8aad87 100644 --- a/src/main/java/rx/exceptions/AssemblyStackTraceException.java +++ b/src/main/java/rx/exceptions/AssemblyStackTraceException.java @@ -46,7 +46,7 @@ public AssemblyStackTraceException(String message) { } @Override - public synchronized Throwable fillInStackTrace() { + public synchronized Throwable fillInStackTrace() { // NOPMD return this; } } diff --git a/src/main/java/rx/exceptions/CompositeException.java b/src/main/java/rx/exceptions/CompositeException.java index 310cfab8ae..d4af4c5350 100644 --- a/src/main/java/rx/exceptions/CompositeException.java +++ b/src/main/java/rx/exceptions/CompositeException.java @@ -41,15 +41,17 @@ public final class CompositeException extends RuntimeException { private final List exceptions; private final String message; + private Throwable cause; + /** * Constructs a CompositeException with the given prefix and error collection. * @param messagePrefix the prefix to use (actually unused) * @param errors the collection of errors * @deprecated please use {@link #CompositeException(Collection)} */ @Deprecated - public CompositeException(String messagePrefix, Collection extends Throwable> errors) { + public CompositeException(String messagePrefix, Collection extends Throwable> errors) { // NOPMD Set deDupedExceptions = new LinkedHashSet(); - List _exceptions = new ArrayList(); + List localExceptions = new ArrayList(); if (errors != null) { for (Throwable ex : errors) { if (ex instanceof CompositeException) { @@ -65,8 +67,8 @@ public CompositeException(String messagePrefix, Collection extends Throwable> deDupedExceptions.add(new NullPointerException()); } - _exceptions.addAll(deDupedExceptions); - this.exceptions = Collections.unmodifiableList(_exceptions); + localExceptions.addAll(deDupedExceptions); + this.exceptions = Collections.unmodifiableList(localExceptions); this.message = exceptions.size() + " exceptions occurred. "; } @@ -81,7 +83,7 @@ public CompositeException(Collection extends Throwable> errors) { @Experimental public CompositeException(Throwable... errors) { Set deDupedExceptions = new LinkedHashSet(); - List _exceptions = new ArrayList(); + List localExceptions = new ArrayList(); if (errors != null) { for (Throwable ex : errors) { if (ex instanceof CompositeException) { @@ -97,8 +99,8 @@ public CompositeException(Throwable... errors) { deDupedExceptions.add(new NullPointerException()); } - _exceptions.addAll(deDupedExceptions); - this.exceptions = Collections.unmodifiableList(_exceptions); + localExceptions.addAll(deDupedExceptions); + this.exceptions = Collections.unmodifiableList(localExceptions); this.message = exceptions.size() + " exceptions occurred. "; } @@ -116,16 +118,14 @@ public String getMessage() { return message; } - private Throwable cause = null; - @Override - public synchronized Throwable getCause() { + public synchronized Throwable getCause() { // NOPMD if (cause == null) { // we lazily generate this causal chain if this is called - CompositeExceptionCausalChain _cause = new CompositeExceptionCausalChain(); + CompositeExceptionCausalChain localCause = new CompositeExceptionCausalChain(); Set seenCauses = new HashSet(); - Throwable chain = _cause; + Throwable chain = localCause; for (Throwable e : exceptions) { if (seenCauses.contains(e)) { // already seen this outer Throwable so skip @@ -147,14 +147,14 @@ public synchronized Throwable getCause() { // we now have 'e' as the last in the chain try { chain.initCause(e); - } catch (Throwable t) { + } catch (Throwable t) { // NOPMD // ignore // the javadocs say that some Throwables (depending on how they're made) will never // let me call initCause without blowing up even if it returns null } chain = getRootCause(chain); } - cause = _cause; + cause = localCause; } return cause; } @@ -192,14 +192,14 @@ public void printStackTrace(PrintWriter s) { * stream to print to */ private void printStackTrace(PrintStreamOrWriter s) { - StringBuilder bldr = new StringBuilder(); - bldr.append(this).append("\n"); + StringBuilder bldr = new StringBuilder(128); + bldr.append(this).append('\n'); for (StackTraceElement myStackElement : getStackTrace()) { - bldr.append("\tat ").append(myStackElement).append("\n"); + bldr.append("\tat ").append(myStackElement).append('\n'); } int i = 1; for (Throwable ex : exceptions) { - bldr.append(" ComposedException ").append(i).append(" :").append("\n"); + bldr.append(" ComposedException ").append(i).append(" :\n"); appendStackTrace(bldr, ex, "\t"); i++; } @@ -209,9 +209,9 @@ private void printStackTrace(PrintStreamOrWriter s) { } private void appendStackTrace(StringBuilder bldr, Throwable ex, String prefix) { - bldr.append(prefix).append(ex).append("\n"); + bldr.append(prefix).append(ex).append('\n'); for (StackTraceElement stackElement : ex.getStackTrace()) { - bldr.append("\t\tat ").append(stackElement).append("\n"); + bldr.append("\t\tat ").append(stackElement).append('\n'); } if (ex.getCause() != null) { bldr.append("\tCaused by: "); @@ -219,7 +219,7 @@ private void appendStackTrace(StringBuilder bldr, Throwable ex, String prefix) { } } - private abstract static class PrintStreamOrWriter { + abstract static class PrintStreamOrWriter { /** Returns the object to be locked when using this StreamOrWriter */ abstract Object lock(); @@ -230,7 +230,7 @@ private abstract static class PrintStreamOrWriter { /** * Same abstraction and implementation as in JDK to allow PrintStream and PrintWriter to share implementation */ - private static class WrappedPrintStream extends PrintStreamOrWriter { + static final class WrappedPrintStream extends PrintStreamOrWriter { private final PrintStream printStream; WrappedPrintStream(PrintStream printStream) { @@ -248,7 +248,7 @@ void println(Object o) { } } - private static class WrappedPrintWriter extends PrintStreamOrWriter { + static final class WrappedPrintWriter extends PrintStreamOrWriter { private final PrintWriter printWriter; WrappedPrintWriter(PrintWriter printWriter) { @@ -268,7 +268,7 @@ void println(Object o) { /* package-private */final static class CompositeExceptionCausalChain extends RuntimeException { private static final long serialVersionUID = 3875212506787802066L; - /* package-private */static String MESSAGE = "Chain of Causes for CompositeException In Order Received =>"; + /* package-private */static final String MESSAGE = "Chain of Causes for CompositeException In Order Received =>"; @Override public String getMessage() { diff --git a/src/main/java/rx/exceptions/Exceptions.java b/src/main/java/rx/exceptions/Exceptions.java index f427018f53..1486012d2d 100644 --- a/src/main/java/rx/exceptions/Exceptions.java +++ b/src/main/java/rx/exceptions/Exceptions.java @@ -26,6 +26,9 @@ * manage fatal and regular exception delivery. */ public final class Exceptions { + + private static final int MAX_DEPTH = 25; + /** Utility class, no instances. */ private Exceptions() { throw new IllegalStateException("No instances!"); @@ -52,7 +55,7 @@ public static RuntimeException propagate(Throwable t) { } else if (t instanceof Error) { throw (Error) t; } else { - throw new RuntimeException(t); + throw new RuntimeException(t); // NOPMD } } /** @@ -96,8 +99,6 @@ else if (t instanceof StackOverflowError) { } } - private static final int MAX_DEPTH = 25; - /** * Adds a {@code Throwable} to a causality-chain of Throwables, as an additional cause (if it does not * already appear in the chain among the causes). @@ -126,7 +127,7 @@ public static void addCause(Throwable e, Throwable cause) { // we now have 'e' as the last in the chain try { e.initCause(cause); - } catch (Throwable t) { + } catch (Throwable t) { // NOPMD // ignore // the javadocs say that some Throwables (depending on how they're made) will never // let me call initCause without blowing up even if it returns null @@ -171,7 +172,7 @@ public static void throwIfAny(List extends Throwable> exceptions) { } else if (t instanceof Error) { throw (Error) t; } else { - throw new RuntimeException(t); + throw new RuntimeException(t); // NOPMD } } throw new CompositeException(exceptions); diff --git a/src/main/java/rx/exceptions/MissingBackpressureException.java b/src/main/java/rx/exceptions/MissingBackpressureException.java index b113d6536c..3c11bc42d4 100644 --- a/src/main/java/rx/exceptions/MissingBackpressureException.java +++ b/src/main/java/rx/exceptions/MissingBackpressureException.java @@ -52,7 +52,7 @@ public class MissingBackpressureException extends Exception { * Constructs the exception without any custom message. */ public MissingBackpressureException() { - + super(); } /** diff --git a/src/main/java/rx/exceptions/OnErrorThrowable.java b/src/main/java/rx/exceptions/OnErrorThrowable.java index 2ef465141b..5f4ab73e8a 100644 --- a/src/main/java/rx/exceptions/OnErrorThrowable.java +++ b/src/main/java/rx/exceptions/OnErrorThrowable.java @@ -99,7 +99,7 @@ public static Throwable addValueAsLastCause(Throwable e, Object value) { e = new NullPointerException(); } Throwable lastCause = Exceptions.getFinalCause(e); - if (lastCause != null && lastCause instanceof OnNextValue) { + if (lastCause instanceof OnNextValue) { // purposefully using == for object reference check if (((OnNextValue) lastCause).getValue() == value) { // don't add another @@ -117,9 +117,11 @@ public static Throwable addValueAsLastCause(Throwable e, Object value) { public static class OnNextValue extends RuntimeException { private static final long serialVersionUID = -3454462756050397899L; + + private final Object value; // Lazy loaded singleton - private static final class Primitives { + static final class Primitives { static final Set> INSTANCE = create(); @@ -139,8 +141,6 @@ private static Set> create() { } } - private final Object value; - /** * Create an {@code OnNextValue} exception and include in its error message a string representation of * the item that was intended to be emitted at the time the exception was handled. diff --git a/src/main/java/rx/functions/Actions.java b/src/main/java/rx/functions/Actions.java index 877c1adbda..d204d16ebc 100644 --- a/src/main/java/rx/functions/Actions.java +++ b/src/main/java/rx/functions/Actions.java @@ -19,6 +19,9 @@ * Utility class for the Action interfaces. */ public final class Actions { + @SuppressWarnings("rawtypes") + private static final EmptyAction EMPTY_ACTION = new EmptyAction(); + private Actions() { throw new IllegalStateException("No instances!"); } @@ -28,10 +31,7 @@ public static EmptyAction implements + static final class EmptyAction implements Action0, Action1, Action2, @@ -43,51 +43,60 @@ private static final class EmptyAction imple Action8, Action9, ActionN { - EmptyAction() { - } @Override public void call() { + // deliberately no op } @Override public void call(T0 t1) { + // deliberately no op } @Override public void call(T0 t1, T1 t2) { + // deliberately no op } @Override public void call(T0 t1, T1 t2, T2 t3) { + // deliberately no op } @Override public void call(T0 t1, T1 t2, T2 t3, T3 t4) { + // deliberately no op } @Override public void call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5) { + // deliberately no op } @Override public void call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6) { + // deliberately no op } @Override public void call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6, T6 t7) { + // deliberately no op } @Override public void call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6, T6 t7, T7 t8) { + // deliberately no op } @Override public void call(T0 t1, T1 t2, T2 t3, T3 t4, T4 t5, T5 t6, T6 t7, T7 t8, T8 t9) { + // deliberately no op } @Override public void call(Object... args) { + // deliberately no op } } diff --git a/src/main/java/rx/functions/Functions.java b/src/main/java/rx/functions/Functions.java index b22df29204..24036ec498 100644 --- a/src/main/java/rx/functions/Functions.java +++ b/src/main/java/rx/functions/Functions.java @@ -34,7 +34,7 @@ public static FuncN fromFunc(final Func0 extends R> f) { @Override public R call(Object... args) { if (args.length != 0) { - throw new RuntimeException("Func0 expecting 0 arguments."); + throw new IllegalArgumentException("Func0 expecting 0 arguments."); } return f.call(); } @@ -58,7 +58,7 @@ public static FuncN fromFunc(final Func1 super T0, ? extends R> f) @Override public R call(Object... args) { if (args.length != 1) { - throw new RuntimeException("Func1 expecting 1 argument."); + throw new IllegalArgumentException("Func1 expecting 1 argument."); } return f.call((T0) args[0]); } @@ -83,7 +83,7 @@ public static FuncN fromFunc(final Func2 super T0, ? super T1, @Override public R call(Object... args) { if (args.length != 2) { - throw new RuntimeException("Func2 expecting 2 arguments."); + throw new IllegalArgumentException("Func2 expecting 2 arguments."); } return f.call((T0) args[0], (T1) args[1]); } @@ -109,7 +109,7 @@ public static FuncN fromFunc(final Func3 super T0, ? super @Override public R call(Object... args) { if (args.length != 3) { - throw new RuntimeException("Func3 expecting 3 arguments."); + throw new IllegalArgumentException("Func3 expecting 3 arguments."); } return f.call((T0) args[0], (T1) args[1], (T2) args[2]); } @@ -136,7 +136,7 @@ public static FuncN fromFunc(final Func4 super T0, ? su @Override public R call(Object... args) { if (args.length != 4) { - throw new RuntimeException("Func4 expecting 4 arguments."); + throw new IllegalArgumentException("Func4 expecting 4 arguments."); } return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3]); } @@ -164,7 +164,7 @@ public static FuncN fromFunc(final Func5 super T0, @Override public R call(Object... args) { if (args.length != 5) { - throw new RuntimeException("Func5 expecting 5 arguments."); + throw new IllegalArgumentException("Func5 expecting 5 arguments."); } return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3], (T4) args[4]); } @@ -193,7 +193,7 @@ public static FuncN fromFunc(final Func6 super @Override public R call(Object... args) { if (args.length != 6) { - throw new RuntimeException("Func6 expecting 6 arguments."); + throw new IllegalArgumentException("Func6 expecting 6 arguments."); } return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3], (T4) args[4], (T5) args[5]); } @@ -223,7 +223,7 @@ public static FuncN fromFunc(final Func7 su @Override public R call(Object... args) { if (args.length != 7) { - throw new RuntimeException("Func7 expecting 7 arguments."); + throw new IllegalArgumentException("Func7 expecting 7 arguments."); } return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3], (T4) args[4], (T5) args[5], (T6) args[6]); } @@ -254,7 +254,7 @@ public static FuncN fromFunc(final Func8< @Override public R call(Object... args) { if (args.length != 8) { - throw new RuntimeException("Func8 expecting 8 arguments."); + throw new IllegalArgumentException("Func8 expecting 8 arguments."); } return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3], (T4) args[4], (T5) args[5], (T6) args[6], (T7) args[7]); } @@ -286,7 +286,7 @@ public static FuncN fromFunc(final Fu @Override public R call(Object... args) { if (args.length != 9) { - throw new RuntimeException("Func9 expecting 9 arguments."); + throw new IllegalArgumentException("Func9 expecting 9 arguments."); } return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3], (T4) args[4], (T5) args[5], (T6) args[6], (T7) args[7], (T8) args[8]); } @@ -307,7 +307,7 @@ public static FuncN fromAction(final Action0 f) { @Override public Void call(Object... args) { if (args.length != 0) { - throw new RuntimeException("Action0 expecting 0 arguments."); + throw new IllegalArgumentException("Action0 expecting 0 arguments."); } f.call(); return null; @@ -331,7 +331,7 @@ public static FuncN fromAction(final Action1 super T0> f) { @Override public Void call(Object... args) { if (args.length != 1) { - throw new RuntimeException("Action1 expecting 1 argument."); + throw new IllegalArgumentException("Action1 expecting 1 argument."); } f.call((T0) args[0]); return null; @@ -356,7 +356,7 @@ public static FuncN fromAction(final Action2 super T0, ? super @Override public Void call(Object... args) { if (args.length != 2) { - throw new RuntimeException("Action3 expecting 2 arguments."); + throw new IllegalArgumentException("Action3 expecting 2 arguments."); } f.call((T0) args[0], (T1) args[1]); return null; @@ -382,7 +382,7 @@ public static FuncN fromAction(final Action3 super T0, ? su @Override public Void call(Object... args) { if (args.length != 3) { - throw new RuntimeException("Action3 expecting 3 arguments."); + throw new IllegalArgumentException("Action3 expecting 3 arguments."); } f.call((T0) args[0], (T1) args[1], (T2) args[2]); return null; diff --git a/src/main/java/rx/internal/operators/BackpressureUtils.java b/src/main/java/rx/internal/operators/BackpressureUtils.java index 6a13709649..5d63ab7c27 100644 --- a/src/main/java/rx/internal/operators/BackpressureUtils.java +++ b/src/main/java/rx/internal/operators/BackpressureUtils.java @@ -27,6 +27,15 @@ * */ public final class BackpressureUtils { + /** + * Masks the most significant bit, i.e., 0x8000_0000_0000_0000L. + */ + static final long COMPLETED_MASK = Long.MIN_VALUE; + /** + * Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFF. + */ + static final long REQUESTED_MASK = Long.MAX_VALUE; + /** Utility class, no instances. */ private BackpressureUtils() { throw new IllegalStateException("No instances!"); @@ -111,15 +120,6 @@ public static long addCap(long a, long b) { return u; } - /** - * Masks the most significant bit, i.e., 0x8000_0000_0000_0000L. - */ - static final long COMPLETED_MASK = Long.MIN_VALUE; - /** - * Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFF. - */ - static final long REQUESTED_MASK = Long.MAX_VALUE; - /** * Signals the completion of the main sequence and switches to post-completion replay mode. * diff --git a/src/main/java/rx/internal/operators/BlockingOperatorLatest.java b/src/main/java/rx/internal/operators/BlockingOperatorLatest.java index 2a75eceb43..a07588007b 100644 --- a/src/main/java/rx/internal/operators/BlockingOperatorLatest.java +++ b/src/main/java/rx/internal/operators/BlockingOperatorLatest.java @@ -62,6 +62,8 @@ static final class LatestObserverIterator extends Subscriber> value = new AtomicReference>(); + // iterator's notification + Notification extends T> iNotif; @Override public void onNext(Notification extends T> args) { @@ -81,9 +83,6 @@ public void onCompleted() { // not expected } - // iterator's notification - Notification extends T> iNotif; - @Override public boolean hasNext() { if (iNotif != null && iNotif.isOnError()) { diff --git a/src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java b/src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java index 3a55cf0cb7..507f1ee708 100644 --- a/src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java +++ b/src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java @@ -62,7 +62,7 @@ public Iterator iterator() { }; } - private static final class MostRecentObserver extends Subscriber { + static final class MostRecentObserver extends Subscriber { final NotificationLite nl = NotificationLite.instance(); volatile Object value; @@ -95,7 +95,7 @@ public Iterator getIterable() { /** * buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next(). */ - private Object buf = null; + private Object buf; @Override public boolean hasNext() { @@ -107,10 +107,12 @@ public boolean hasNext() { public T next() { try { // if hasNext wasn't called before calling next. - if (buf == null) + if (buf == null) { buf = value; - if (nl.isCompleted(buf)) + } + if (nl.isCompleted(buf)) { throw new NoSuchElementException(); + } if (nl.isError(buf)) { throw Exceptions.propagate(nl.getError(buf)); } diff --git a/src/main/java/rx/internal/operators/BlockingOperatorNext.java b/src/main/java/rx/internal/operators/BlockingOperatorNext.java index 94820b4acf..236744ac31 100644 --- a/src/main/java/rx/internal/operators/BlockingOperatorNext.java +++ b/src/main/java/rx/internal/operators/BlockingOperatorNext.java @@ -64,8 +64,8 @@ public Iterator iterator() { private T next; private boolean hasNext = true; private boolean isNextConsumed = true; - private Throwable error = null; - private boolean started = false; + private Throwable error; + private boolean started; NextIterator(Observable extends T> items, NextObserver observer) { this.items = items; @@ -122,7 +122,7 @@ private boolean moveToNext() { observer.unsubscribe(); Thread.currentThread().interrupt(); error = e; - throw Exceptions.propagate(error); + throw Exceptions.propagate(e); } } @@ -147,13 +147,10 @@ public void remove() { } } - private static class NextObserver extends Subscriber> { + static final class NextObserver extends Subscriber> { private final BlockingQueue> buf = new ArrayBlockingQueue>(1); final AtomicInteger waiting = new AtomicInteger(); - NextObserver() { - } - @Override public void onCompleted() { // ignore diff --git a/src/main/java/rx/internal/operators/BlockingOperatorToFuture.java b/src/main/java/rx/internal/operators/BlockingOperatorToFuture.java index 20a2377472..ee43643fce 100644 --- a/src/main/java/rx/internal/operators/BlockingOperatorToFuture.java +++ b/src/main/java/rx/internal/operators/BlockingOperatorToFuture.java @@ -77,7 +77,7 @@ public void onNext(T v) { return new Future() { - private volatile boolean cancelled = false; + private volatile boolean cancelled; @Override public boolean cancel(boolean mayInterruptIfRunning) { diff --git a/src/main/java/rx/internal/operators/BufferUntilSubscriber.java b/src/main/java/rx/internal/operators/BufferUntilSubscriber.java index 2fcf60cf45..f97198ee28 100644 --- a/src/main/java/rx/internal/operators/BufferUntilSubscriber.java +++ b/src/main/java/rx/internal/operators/BufferUntilSubscriber.java @@ -48,6 +48,9 @@ * the type of the items to be buffered */ public final class BufferUntilSubscriber extends Subject { + final State state; + + private boolean forward; /** * Creates a default, unbounded buffering Subject instance. @@ -63,16 +66,17 @@ public static BufferUntilSubscriber create() { static final class State extends AtomicReference> { /** */ private static final long serialVersionUID = 8026705089538090368L; - boolean casObserverRef(Observer super T> expected, Observer super T> next) { - return compareAndSet(expected, next); - } final Object guard = new Object(); /* protected by guard */ - boolean emitting = false; + boolean emitting; final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue(); final NotificationLite nl = NotificationLite.instance(); + + boolean casObserverRef(Observer super T> expected, Observer super T> next) { + return compareAndSet(expected, next); + } } static final class OnSubscribeAction implements OnSubscribe { @@ -123,9 +127,6 @@ public void call() { } } - final State state; - - private boolean forward = false; private BufferUntilSubscriber(State state) { super(new OnSubscribeAction(state)); @@ -194,17 +195,17 @@ public boolean hasObservers() { @Override public void onCompleted() { - + // deliberately no op } @Override public void onError(Throwable e) { - + // deliberately no op } @Override public void onNext(Object t) { - + // deliberately no op } }; diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorIterable.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorIterable.java index 9d2567135e..55583c85ad 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorIterable.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorIterable.java @@ -34,9 +34,6 @@ public CompletableOnSubscribeMergeDelayErrorIterable(Iterable extends Completa @Override public void call(final CompletableSubscriber s) { final CompositeSubscription set = new CompositeSubscription(); - final AtomicInteger wip = new AtomicInteger(1); - - final Queue queue = new MpscLinkedQueue(); s.onSubscribe(set); @@ -53,7 +50,11 @@ public void call(final CompletableSubscriber s) { s.onError(new NullPointerException("The source iterator returned is null")); return; } + + final AtomicInteger wip = new AtomicInteger(1); + final Queue queue = new MpscLinkedQueue(); + for (;;) { if (set.isUnsubscribed()) { return; diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java index fed111a6c7..9d09393bbe 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java @@ -34,9 +34,7 @@ public CompletableOnSubscribeMergeIterable(Iterable extends Completable> sourc @Override public void call(final CompletableSubscriber s) { final CompositeSubscription set = new CompositeSubscription(); - final AtomicInteger wip = new AtomicInteger(1); - final AtomicBoolean once = new AtomicBoolean(); - + s.onSubscribe(set); Iterator extends Completable> iterator; @@ -53,6 +51,9 @@ public void call(final CompletableSubscriber s) { return; } + final AtomicInteger wip = new AtomicInteger(1); + final AtomicBoolean once = new AtomicBoolean(); + for (;;) { if (set.isUnsubscribed()) { return; diff --git a/src/main/java/rx/internal/operators/EmptyObservableHolder.java b/src/main/java/rx/internal/operators/EmptyObservableHolder.java index 00f4e16f68..c9678db92d 100644 --- a/src/main/java/rx/internal/operators/EmptyObservableHolder.java +++ b/src/main/java/rx/internal/operators/EmptyObservableHolder.java @@ -26,7 +26,11 @@ public enum EmptyObservableHolder implements OnSubscribe { INSTANCE ; + + /** The singleton instance. */ + static final Observable EMPTY = Observable.create(INSTANCE); + /** * Returns a type-corrected singleton instance of the empty Observable. * @param the value type @@ -37,9 +41,6 @@ public static Observable instance() { return (Observable)EMPTY; } - /** The singleton instance. */ - static final Observable EMPTY = Observable.create(INSTANCE); - @Override public void call(Subscriber super Object> child) { child.onCompleted(); diff --git a/src/main/java/rx/internal/operators/NeverObservableHolder.java b/src/main/java/rx/internal/operators/NeverObservableHolder.java index 5114772872..4ffdc630d8 100644 --- a/src/main/java/rx/internal/operators/NeverObservableHolder.java +++ b/src/main/java/rx/internal/operators/NeverObservableHolder.java @@ -26,7 +26,10 @@ public enum NeverObservableHolder implements OnSubscribe { INSTANCE ; - + + /** The singleton instance. */ + static final Observable NEVER = Observable.create(INSTANCE); + /** * Returns a type-corrected singleton instance of the never Observable. * @param the value type @@ -37,10 +40,8 @@ public static Observable instance() { return (Observable)NEVER; } - /** The singleton instance. */ - static final Observable NEVER = Observable.create(INSTANCE); - @Override public void call(Subscriber super Object> child) { + // deliberately no op } } diff --git a/src/main/java/rx/internal/operators/NotificationLite.java b/src/main/java/rx/internal/operators/NotificationLite.java index 223d5299cd..7047553dc9 100644 --- a/src/main/java/rx/internal/operators/NotificationLite.java +++ b/src/main/java/rx/internal/operators/NotificationLite.java @@ -31,16 +31,16 @@ * * It's implemented as a singleton to maintain some semblance of type safety that is completely non-existent. * - * @param - * @warn type param undescribed + * @param the element type */ public final class NotificationLite { - private NotificationLite() { - } - @SuppressWarnings("rawtypes") private static final NotificationLite INSTANCE = new NotificationLite(); + private NotificationLite() { + // singleton + } + /** * Gets the {@code NotificationLite} singleton. * @@ -70,7 +70,7 @@ public String toString() { } }; - private static class OnErrorSentinel implements Serializable { + static final class OnErrorSentinel implements Serializable { private static final long serialVersionUID = 3; final Throwable e; @@ -93,10 +93,11 @@ public String toString() { * @return the item, or a null token representing the item if the item is {@code null} */ public Object next(T t) { - if (t == null) + if (t == null) { return ON_NEXT_NULL_SENTINEL; - else + } else { return t; + } } /** @@ -207,15 +208,16 @@ public boolean isNext(Object n) { * {@code Kind.OnError}, or {@code Kind.OnNext} */ public Kind kind(Object n) { - if (n == null) + if (n == null) { throw new IllegalArgumentException("The lite notification can not be null"); - else if (n == ON_COMPLETED_SENTINEL) + } else if (n == ON_COMPLETED_SENTINEL) { return Kind.OnCompleted; - else if (n instanceof OnErrorSentinel) + } else if (n instanceof OnErrorSentinel) { return Kind.OnError; - else + } else { // value or ON_NEXT_NULL_SENTINEL but either way it's an OnNext return Kind.OnNext; + } } /** diff --git a/src/main/java/rx/internal/operators/OnSubscribeAmb.java b/src/main/java/rx/internal/operators/OnSubscribeAmb.java index 3b39357b07..f10e73396c 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeAmb.java +++ b/src/main/java/rx/internal/operators/OnSubscribeAmb.java @@ -33,6 +33,9 @@ * @param the value type */ public final class OnSubscribeAmb implements OnSubscribe{ + //give default access instead of private as a micro-optimization + //for access from anonymous classes below + final Iterable extends Observable extends T>> sources; /** * Given two {@link Observable}s, propagates the one that first emits an item. @@ -275,7 +278,7 @@ public static OnSubscribe amb(final Iterable extends Observable exten return new OnSubscribeAmb(sources); } - private static final class AmbSubscriber extends Subscriber { + static final class AmbSubscriber extends Subscriber { private final Subscriber super T> subscriber; private final Selection selection; @@ -338,7 +341,7 @@ private boolean isSelected() { } } - private static class Selection { + static final class Selection { final AtomicReference> choice = new AtomicReference>(); final Collection> ambSubscribers = new ConcurrentLinkedQueue>(); @@ -360,10 +363,6 @@ public void unsubscribeOthers(AmbSubscriber notThis) { } - //give default access instead of private as a micro-optimization - //for access from anonymous classes below - final Iterable extends Observable extends T>> sources; - private OnSubscribeAmb(Iterable extends Observable extends T>> sources) { this.sources = sources; } @@ -419,7 +418,7 @@ public void call() { @Override public void request(long n) { - final AmbSubscriber c; + AmbSubscriber c; if ((c = choice.get()) != null) { // propagate the request to that single Subscriber that won c.requestMore(n); diff --git a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java index e9dac2d026..e7cb3a1251 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java +++ b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java @@ -245,10 +245,9 @@ void drain() { } long requestAmount = localRequested.get(); - boolean unbounded = requestAmount == Long.MAX_VALUE; long emitted = 0L; - while (requestAmount != 0L) { + while (emitted != requestAmount) { boolean d = done; @SuppressWarnings("unchecked") @@ -287,14 +286,11 @@ void drain() { cs.requestMore(1); - requestAmount--; - emitted--; + emitted++; } - if (emitted != 0L) { - if (!unbounded) { - localRequested.addAndGet(emitted); - } + if (emitted != 0L && requestAmount != Long.MAX_VALUE) { + BackpressureUtils.produced(localRequested, emitted); } missed = addAndGet(-missed); diff --git a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java index 9ee3104f95..889cce2872 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java +++ b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java @@ -44,7 +44,7 @@ public OnSubscribeFromIterable(Iterable extends T> iterable) { @Override public void call(final Subscriber super T> o) { - final Iterator extends T> it; + Iterator extends T> it; boolean b; try { diff --git a/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java b/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java index a8f13fb5e7..e677fb9bc3 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java +++ b/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java @@ -38,11 +38,11 @@ * @param the result value type */ public final class OnSubscribeGroupJoin implements OnSubscribe { - protected final Observable left; - protected final Observable right; - protected final Func1 super T1, ? extends Observable> leftDuration; - protected final Func1 super T2, ? extends Observable> rightDuration; - protected final Func2 super T1, ? super Observable, ? extends R> resultSelector; + final Observable left; + final Observable right; + final Func1 super T1, ? extends Observable> leftDuration; + final Func1 super T2, ? extends Observable> rightDuration; + final Func2 super T1, ? super Observable, ? extends R> resultSelector; public OnSubscribeGroupJoin( Observable left, @@ -75,9 +75,9 @@ final class ResultManager implements Subscription { /** Guarded by guard. */ int rightIds; /** Guarded by guard. */ - final Map> leftMap = new HashMap>(); + final Map> leftMap = new HashMap>(); // NOPMD /** Guarded by guard. */ - final Map rightMap = new HashMap(); + final Map rightMap = new HashMap(); // NOPMD /** Guarded by guard. */ boolean leftDone; /** Guarded by guard. */ diff --git a/src/main/java/rx/internal/operators/OnSubscribeRange.java b/src/main/java/rx/internal/operators/OnSubscribeRange.java index 8f17303a2d..1a96c152c8 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRange.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRange.java @@ -38,7 +38,7 @@ public void call(final Subscriber super Integer> childSubscriber) { childSubscriber.setProducer(new RangeProducer(childSubscriber, startIndex, endIndex)); } - private static final class RangeProducer extends AtomicLong implements Producer { + static final class RangeProducer extends AtomicLong implements Producer { /** */ private static final long serialVersionUID = 4114392207069098388L; diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index d30ddc1343..c41b211558 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -53,6 +53,11 @@ import rx.subscriptions.SerialSubscription; public final class OnSubscribeRedo implements OnSubscribe { + final Observable source; + private final Func1 super Observable extends Notification>>, ? extends Observable>> controlHandlerFunction; + final boolean stopOnComplete; + final boolean stopOnError; + private final Scheduler scheduler; static final Func1>, Observable>> REDO_INFINITE = new Func1>, Observable>>() { @Override @@ -77,7 +82,7 @@ public RedoFinite(long count) { public Observable> call(Observable extends Notification>> ts) { return ts.map(new Func1, Notification>>() { - int num=0; + int num; @Override public Notification> call(Notification> terminalNotification) { @@ -111,10 +116,11 @@ public Observable extends Notification>> call(Observable extends Notificat @Override public Notification call(Notification n, Notification> term) { final int value = n.getValue(); - if (predicate.call(value, term.getThrowable())) + if (predicate.call(value, term.getThrowable())) { return Notification.createOnNext(value + 1); - else + } else { return (Notification) term; + } } }); } @@ -125,10 +131,12 @@ public static Observable retry(Observable source) { } public static Observable retry(Observable source, final long count) { - if (count < 0) + if (count < 0) { throw new IllegalArgumentException("count >= 0 expected"); - if (count == 0) + } + if (count == 0) { return source; + } return retry(source, new RedoFinite(count)); } @@ -156,8 +164,9 @@ public static Observable repeat(Observable source, final long count, S if(count == 0) { return Observable.empty(); } - if (count < 0) + if (count < 0) { throw new IllegalArgumentException("count >= 0 expected"); + } return repeat(source, new RedoFinite(count - 1), scheduler); } @@ -173,12 +182,6 @@ public static Observable redo(Observable source, Func1 super Observa return create(new OnSubscribeRedo(source, notificationHandler, false, false, scheduler)); } - final Observable source; - private final Func1 super Observable extends Notification>>, ? extends Observable>> controlHandlerFunction; - final boolean stopOnComplete; - final boolean stopOnError; - private final Scheduler scheduler; - private OnSubscribeRedo(Observable source, Func1 super Observable extends Notification>>, ? extends Observable>> f, boolean stopOnComplete, boolean stopOnError, Scheduler scheduler) { this.source = source; @@ -362,8 +365,9 @@ public void request(final long n) { if (n > 0) { BackpressureUtils.getAndAddRequest(consumerCapacity, n); arbiter.request(n); - if (resumeBoundary.compareAndSet(true, false)) + if (resumeBoundary.compareAndSet(true, false)) { worker.schedule(subscribeToSource); + } } } }); diff --git a/src/main/java/rx/internal/operators/OnSubscribeSingle.java b/src/main/java/rx/internal/operators/OnSubscribeSingle.java index 7bfdcca511..90dc8d288e 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeSingle.java +++ b/src/main/java/rx/internal/operators/OnSubscribeSingle.java @@ -38,9 +38,9 @@ public OnSubscribeSingle(Observable observable) { @Override public void call(final SingleSubscriber super T> child) { Subscriber parent = new Subscriber() { - private boolean emittedTooMany = false; - private boolean itemEmitted = false; - private T emission = null; + private boolean emittedTooMany; + private boolean itemEmitted; + private T emission; @Override public void onStart() { diff --git a/src/main/java/rx/internal/operators/OnSubscribeUsing.java b/src/main/java/rx/internal/operators/OnSubscribeUsing.java index 352c699056..5b5a9259fb 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeUsing.java +++ b/src/main/java/rx/internal/operators/OnSubscribeUsing.java @@ -58,7 +58,7 @@ public void call(final Subscriber super T> subscriber) { // dispose on unsubscription subscriber.add(disposeOnceOnly); // create the observable - final Observable extends T> source; + Observable extends T> source; try { source = observableFactory @@ -77,7 +77,7 @@ public void call(final Subscriber super T> subscriber) { return; } - final Observable extends T> observable; + Observable extends T> observable; // supplement with on termination disposal if requested if (disposeEagerly) { observable = source @@ -96,11 +96,12 @@ public void call(final Subscriber super T> subscriber) { Throwable disposeError = dispose(disposeOnceOnly); Exceptions.throwIfFatal(e); Exceptions.throwIfFatal(disposeError); - if (disposeError != null) + if (disposeError != null) { subscriber.onError(new CompositeException(e, disposeError)); - else + } else { // propagate error subscriber.onError(e); + } } } catch (Throwable e) { // then propagate error @@ -117,7 +118,7 @@ private Throwable dispose(final Action0 disposeOnceOnly) { } } - private static final class DisposeAction extends AtomicBoolean implements Action0, + static final class DisposeAction extends AtomicBoolean implements Action0, Subscription { private static final long serialVersionUID = 4262875056400218316L; diff --git a/src/main/java/rx/internal/operators/OperatorAsObservable.java b/src/main/java/rx/internal/operators/OperatorAsObservable.java index 9cb9d8ac56..33671e8f2e 100644 --- a/src/main/java/rx/internal/operators/OperatorAsObservable.java +++ b/src/main/java/rx/internal/operators/OperatorAsObservable.java @@ -26,7 +26,7 @@ */ public final class OperatorAsObservable implements Operator { /** Lazy initialization via inner-class holder. */ - private static final class Holder { + static final class Holder { /** A singleton instance. */ static final OperatorAsObservable INSTANCE = new OperatorAsObservable(); } @@ -38,7 +38,10 @@ private static final class Holder { public static OperatorAsObservable instance() { return (OperatorAsObservable)Holder.INSTANCE; } - OperatorAsObservable() { } + OperatorAsObservable() { + // singleton + } + @Override public Subscriber super T> call(Subscriber super T> s) { return s; diff --git a/src/main/java/rx/internal/operators/OperatorDebounceWithTime.java b/src/main/java/rx/internal/operators/OperatorDebounceWithTime.java index f98639aff3..50c2418228 100644 --- a/src/main/java/rx/internal/operators/OperatorDebounceWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorDebounceWithTime.java @@ -110,7 +110,7 @@ static final class DebounceState { /** Guarded by this. */ boolean emitting; - public synchronized int next(T value) { + public synchronized int next(T value) { // NOPMD this.value = value; this.hasValue = true; return ++index; @@ -173,7 +173,7 @@ public void emitAndComplete(Subscriber onNextAndComplete, Subscriber> onErr } onNextAndComplete.onCompleted(); } - public synchronized void clear() { + public synchronized void clear() { // NOPMD ++index; value = null; hasValue = false; diff --git a/src/main/java/rx/internal/operators/OperatorDematerialize.java b/src/main/java/rx/internal/operators/OperatorDematerialize.java index 5fd8d7fdfa..23eea9f1e8 100644 --- a/src/main/java/rx/internal/operators/OperatorDematerialize.java +++ b/src/main/java/rx/internal/operators/OperatorDematerialize.java @@ -31,7 +31,7 @@ */ public final class OperatorDematerialize implements Operator> { /** Lazy initialization via inner-class holder. */ - private static final class Holder { + static final class Holder { /** A singleton instance. */ static final OperatorDematerialize INSTANCE = new OperatorDematerialize(); } @@ -42,7 +42,10 @@ private static final class Holder { public static OperatorDematerialize instance() { return Holder.INSTANCE; // using raw types because the type inference is not good enough } - OperatorDematerialize() { } + OperatorDematerialize() { + // singleton + } + @Override public Subscriber super Notification> call(final Subscriber super T> child) { return new Subscriber>(child) { @@ -62,6 +65,9 @@ public void onNext(Notification t) { case OnCompleted: onCompleted(); break; + default: + onError(new IllegalArgumentException("Unsupported notification type: " + t)); + break; } } diff --git a/src/main/java/rx/internal/operators/OperatorDistinct.java b/src/main/java/rx/internal/operators/OperatorDistinct.java index a581b1b1e0..716353e272 100644 --- a/src/main/java/rx/internal/operators/OperatorDistinct.java +++ b/src/main/java/rx/internal/operators/OperatorDistinct.java @@ -32,7 +32,7 @@ public final class OperatorDistinct implements Operator { final Func1 super T, ? extends U> keySelector; - private static class Holder { + static final class Holder { static final OperatorDistinct,?> INSTANCE = new OperatorDistinct(UtilityFunctions.identity()); } diff --git a/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java b/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java index 43529df76f..9d75db915d 100644 --- a/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java +++ b/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java @@ -31,7 +31,7 @@ public final class OperatorDistinctUntilChanged implements Operator, final Func2 super U, ? super U, Boolean> comparator; - private static class Holder { + static final class Holder { static final OperatorDistinctUntilChanged,?> INSTANCE = new OperatorDistinctUntilChanged(UtilityFunctions.identity()); } @@ -72,14 +72,14 @@ public Subscriber super T> call(final Subscriber super T> child) { boolean hasPrevious; @Override public void onNext(T t) { - U currentKey = previousKey; - final U key; + U key; try { key = keySelector.call(t); } catch (Throwable e) { Exceptions.throwOrReport(e, child, t); return; } + U currentKey = previousKey; previousKey = key; if (hasPrevious) { diff --git a/src/main/java/rx/internal/operators/OperatorDoOnEach.java b/src/main/java/rx/internal/operators/OperatorDoOnEach.java index e0e71a2188..0e3034a425 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnEach.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnEach.java @@ -36,7 +36,7 @@ public OperatorDoOnEach(Observer super T> doOnEachObserver) { public Subscriber super T> call(final Subscriber super T> observer) { return new Subscriber(observer) { - private boolean done = false; + private boolean done; @Override public void onCompleted() { diff --git a/src/main/java/rx/internal/operators/OperatorDoOnRequest.java b/src/main/java/rx/internal/operators/OperatorDoOnRequest.java index d45e895b40..e59d1f04d5 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnRequest.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnRequest.java @@ -53,7 +53,7 @@ public void request(long n) { return parent; } - private static final class ParentSubscriber extends Subscriber { + static final class ParentSubscriber extends Subscriber { private final Subscriber super T> child; ParentSubscriber(Subscriber super T> child) { diff --git a/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java b/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java index cfa46837b5..a779720396 100644 --- a/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java +++ b/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java @@ -73,7 +73,7 @@ static final class EagerOuterSubscriber extends Subscriber { final int bufferSize; final Subscriber super R> actual; - final LinkedList> subscribers; + final Queue> subscribers; volatile boolean done; Throwable error; @@ -131,10 +131,12 @@ public void onNext(T t) { return; } - EagerInnerSubscriber inner = new EagerInnerSubscriber(this, bufferSize); if (cancelled) { return; } + + EagerInnerSubscriber inner = new EagerInnerSubscriber(this, bufferSize); + synchronized (subscribers) { if (cancelled) { return; @@ -202,7 +204,6 @@ void drain() { if (!empty) { long requestedAmount = requested.get(); long emittedAmount = 0L; - boolean unbounded = requestedAmount == Long.MAX_VALUE; Queue innerQueue = innerSubscriber.queue; boolean innerDone = false; @@ -235,7 +236,7 @@ void drain() { break; } - if (requestedAmount == 0L) { + if (requestedAmount == emittedAmount) { break; } @@ -248,16 +249,15 @@ void drain() { return; } - requestedAmount--; - emittedAmount--; + emittedAmount++; } if (emittedAmount != 0L) { - if (!unbounded) { - requested.addAndGet(emittedAmount); + if (requestedAmount != Long.MAX_VALUE) { + BackpressureUtils.produced(requested, emittedAmount); } if (!innerDone) { - innerSubscriber.requestMore(-emittedAmount); + innerSubscriber.requestMore(emittedAmount); } } diff --git a/src/main/java/rx/internal/operators/OperatorElementAt.java b/src/main/java/rx/internal/operators/OperatorElementAt.java index 6f79d6a285..3fa581b8d2 100644 --- a/src/main/java/rx/internal/operators/OperatorElementAt.java +++ b/src/main/java/rx/internal/operators/OperatorElementAt.java @@ -52,7 +52,7 @@ private OperatorElementAt(int index, T defaultValue, boolean hasDefault) { public Subscriber super T> call(final Subscriber super T> child) { Subscriber parent = new Subscriber() { - private int currentIndex = 0; + private int currentIndex; @Override public void onNext(T value) { diff --git a/src/main/java/rx/internal/operators/OperatorGroupBy.java b/src/main/java/rx/internal/operators/OperatorGroupBy.java index 4ed1a504f3..954b1ac6c5 100644 --- a/src/main/java/rx/internal/operators/OperatorGroupBy.java +++ b/src/main/java/rx/internal/operators/OperatorGroupBy.java @@ -266,10 +266,9 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; - while (r != 0) { + while (e != r) { boolean d = done; GroupedObservable t = q.poll(); @@ -286,15 +285,14 @@ void drain() { a.onNext(t); - r--; - e--; + e++; } if (e != 0L) { - if (!unbounded) { - requested.addAndGet(e); + if (r != Long.MAX_VALUE) { + BackpressureUtils.produced(requested, e); } - s.request(-e); + s.request(e); } missed = wip.addAndGet(-missed); @@ -334,14 +332,14 @@ boolean checkTerminated(boolean d, boolean empty, } static final class GroupedUnicast extends GroupedObservable { + final State state; + public static GroupedUnicast createWith(K key, int bufferSize, GroupBySubscriber, K, T> parent, boolean delayError) { State state = new State(bufferSize, parent, key, delayError); return new GroupedUnicast(key, state); } - final State state; - protected GroupedUnicast(K key, State state) { super(key, state); this.state = state; @@ -381,7 +379,7 @@ static final class State extends AtomicInteger implements Producer, Subscr final AtomicBoolean once; - public State(int bufferSize, GroupBySubscriber, K, T> parent, K key, boolean delayError) { + public State(int bufferSize, GroupBySubscriber, K, T> parent, K key, boolean delayError) { // NOPMD this.queue = new ConcurrentLinkedQueue(); this.parent = parent; this.key = key; @@ -467,10 +465,9 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0; - while (r != 0L) { + while (e != r) { boolean d = done; Object v = q.poll(); boolean empty = v == null; @@ -485,15 +482,14 @@ void drain() { a.onNext(nl.getValue(v)); - r--; - e--; + e++; } if (e != 0L) { - if (!unbounded) { - requested.addAndGet(e); + if (r != Long.MAX_VALUE) { + BackpressureUtils.produced(requested, e); } - parent.s.request(-e); + parent.s.request(e); } } diff --git a/src/main/java/rx/internal/operators/OperatorIgnoreElements.java b/src/main/java/rx/internal/operators/OperatorIgnoreElements.java index 00098f85a2..b85fb54359 100644 --- a/src/main/java/rx/internal/operators/OperatorIgnoreElements.java +++ b/src/main/java/rx/internal/operators/OperatorIgnoreElements.java @@ -20,7 +20,7 @@ public class OperatorIgnoreElements implements Operator { - private static class Holder { + static final class Holder { static final OperatorIgnoreElements> INSTANCE = new OperatorIgnoreElements(); } @@ -30,7 +30,7 @@ public static OperatorIgnoreElements
* It's implemented as a singleton to maintain some semblance of type safety that is completely non-existent. * - * @param - * @warn type param undescribed + * @param the element type */ public final class NotificationLite { - private NotificationLite() { - } - @SuppressWarnings("rawtypes") private static final NotificationLite INSTANCE = new NotificationLite(); + private NotificationLite() { + // singleton + } + /** * Gets the {@code NotificationLite} singleton. * @@ -70,7 +70,7 @@ public String toString() { } }; - private static class OnErrorSentinel implements Serializable { + static final class OnErrorSentinel implements Serializable { private static final long serialVersionUID = 3; final Throwable e; @@ -93,10 +93,11 @@ public String toString() { * @return the item, or a null token representing the item if the item is {@code null} */ public Object next(T t) { - if (t == null) + if (t == null) { return ON_NEXT_NULL_SENTINEL; - else + } else { return t; + } } /** @@ -207,15 +208,16 @@ public boolean isNext(Object n) { * {@code Kind.OnError}, or {@code Kind.OnNext} */ public Kind kind(Object n) { - if (n == null) + if (n == null) { throw new IllegalArgumentException("The lite notification can not be null"); - else if (n == ON_COMPLETED_SENTINEL) + } else if (n == ON_COMPLETED_SENTINEL) { return Kind.OnCompleted; - else if (n instanceof OnErrorSentinel) + } else if (n instanceof OnErrorSentinel) { return Kind.OnError; - else + } else { // value or ON_NEXT_NULL_SENTINEL but either way it's an OnNext return Kind.OnNext; + } } /** diff --git a/src/main/java/rx/internal/operators/OnSubscribeAmb.java b/src/main/java/rx/internal/operators/OnSubscribeAmb.java index 3b39357b07..f10e73396c 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeAmb.java +++ b/src/main/java/rx/internal/operators/OnSubscribeAmb.java @@ -33,6 +33,9 @@ * @param the value type */ public final class OnSubscribeAmb implements OnSubscribe{ + //give default access instead of private as a micro-optimization + //for access from anonymous classes below + final Iterable extends Observable extends T>> sources; /** * Given two {@link Observable}s, propagates the one that first emits an item. @@ -275,7 +278,7 @@ public static OnSubscribe amb(final Iterable extends Observable exten return new OnSubscribeAmb(sources); } - private static final class AmbSubscriber extends Subscriber { + static final class AmbSubscriber extends Subscriber { private final Subscriber super T> subscriber; private final Selection selection; @@ -338,7 +341,7 @@ private boolean isSelected() { } } - private static class Selection { + static final class Selection { final AtomicReference> choice = new AtomicReference>(); final Collection> ambSubscribers = new ConcurrentLinkedQueue>(); @@ -360,10 +363,6 @@ public void unsubscribeOthers(AmbSubscriber notThis) { } - //give default access instead of private as a micro-optimization - //for access from anonymous classes below - final Iterable extends Observable extends T>> sources; - private OnSubscribeAmb(Iterable extends Observable extends T>> sources) { this.sources = sources; } @@ -419,7 +418,7 @@ public void call() { @Override public void request(long n) { - final AmbSubscriber c; + AmbSubscriber c; if ((c = choice.get()) != null) { // propagate the request to that single Subscriber that won c.requestMore(n); diff --git a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java index e9dac2d026..e7cb3a1251 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java +++ b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java @@ -245,10 +245,9 @@ void drain() { } long requestAmount = localRequested.get(); - boolean unbounded = requestAmount == Long.MAX_VALUE; long emitted = 0L; - while (requestAmount != 0L) { + while (emitted != requestAmount) { boolean d = done; @SuppressWarnings("unchecked") @@ -287,14 +286,11 @@ void drain() { cs.requestMore(1); - requestAmount--; - emitted--; + emitted++; } - if (emitted != 0L) { - if (!unbounded) { - localRequested.addAndGet(emitted); - } + if (emitted != 0L && requestAmount != Long.MAX_VALUE) { + BackpressureUtils.produced(localRequested, emitted); } missed = addAndGet(-missed); diff --git a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java index 9ee3104f95..889cce2872 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java +++ b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java @@ -44,7 +44,7 @@ public OnSubscribeFromIterable(Iterable extends T> iterable) { @Override public void call(final Subscriber super T> o) { - final Iterator extends T> it; + Iterator extends T> it; boolean b; try { diff --git a/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java b/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java index a8f13fb5e7..e677fb9bc3 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java +++ b/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java @@ -38,11 +38,11 @@ * @param the result value type */ public final class OnSubscribeGroupJoin implements OnSubscribe { - protected final Observable left; - protected final Observable right; - protected final Func1 super T1, ? extends Observable> leftDuration; - protected final Func1 super T2, ? extends Observable> rightDuration; - protected final Func2 super T1, ? super Observable, ? extends R> resultSelector; + final Observable left; + final Observable right; + final Func1 super T1, ? extends Observable> leftDuration; + final Func1 super T2, ? extends Observable> rightDuration; + final Func2 super T1, ? super Observable, ? extends R> resultSelector; public OnSubscribeGroupJoin( Observable left, @@ -75,9 +75,9 @@ final class ResultManager implements Subscription { /** Guarded by guard. */ int rightIds; /** Guarded by guard. */ - final Map> leftMap = new HashMap>(); + final Map> leftMap = new HashMap>(); // NOPMD /** Guarded by guard. */ - final Map rightMap = new HashMap(); + final Map rightMap = new HashMap(); // NOPMD /** Guarded by guard. */ boolean leftDone; /** Guarded by guard. */ diff --git a/src/main/java/rx/internal/operators/OnSubscribeRange.java b/src/main/java/rx/internal/operators/OnSubscribeRange.java index 8f17303a2d..1a96c152c8 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRange.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRange.java @@ -38,7 +38,7 @@ public void call(final Subscriber super Integer> childSubscriber) { childSubscriber.setProducer(new RangeProducer(childSubscriber, startIndex, endIndex)); } - private static final class RangeProducer extends AtomicLong implements Producer { + static final class RangeProducer extends AtomicLong implements Producer { /** */ private static final long serialVersionUID = 4114392207069098388L; diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index d30ddc1343..c41b211558 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -53,6 +53,11 @@ import rx.subscriptions.SerialSubscription; public final class OnSubscribeRedo implements OnSubscribe { + final Observable source; + private final Func1 super Observable extends Notification>>, ? extends Observable>> controlHandlerFunction; + final boolean stopOnComplete; + final boolean stopOnError; + private final Scheduler scheduler; static final Func1>, Observable>> REDO_INFINITE = new Func1>, Observable>>() { @Override @@ -77,7 +82,7 @@ public RedoFinite(long count) { public Observable> call(Observable extends Notification>> ts) { return ts.map(new Func1, Notification>>() { - int num=0; + int num; @Override public Notification> call(Notification> terminalNotification) { @@ -111,10 +116,11 @@ public Observable extends Notification>> call(Observable extends Notificat @Override public Notification call(Notification n, Notification> term) { final int value = n.getValue(); - if (predicate.call(value, term.getThrowable())) + if (predicate.call(value, term.getThrowable())) { return Notification.createOnNext(value + 1); - else + } else { return (Notification) term; + } } }); } @@ -125,10 +131,12 @@ public static Observable retry(Observable source) { } public static Observable retry(Observable source, final long count) { - if (count < 0) + if (count < 0) { throw new IllegalArgumentException("count >= 0 expected"); - if (count == 0) + } + if (count == 0) { return source; + } return retry(source, new RedoFinite(count)); } @@ -156,8 +164,9 @@ public static Observable repeat(Observable source, final long count, S if(count == 0) { return Observable.empty(); } - if (count < 0) + if (count < 0) { throw new IllegalArgumentException("count >= 0 expected"); + } return repeat(source, new RedoFinite(count - 1), scheduler); } @@ -173,12 +182,6 @@ public static Observable redo(Observable source, Func1 super Observa return create(new OnSubscribeRedo(source, notificationHandler, false, false, scheduler)); } - final Observable source; - private final Func1 super Observable extends Notification>>, ? extends Observable>> controlHandlerFunction; - final boolean stopOnComplete; - final boolean stopOnError; - private final Scheduler scheduler; - private OnSubscribeRedo(Observable source, Func1 super Observable extends Notification>>, ? extends Observable>> f, boolean stopOnComplete, boolean stopOnError, Scheduler scheduler) { this.source = source; @@ -362,8 +365,9 @@ public void request(final long n) { if (n > 0) { BackpressureUtils.getAndAddRequest(consumerCapacity, n); arbiter.request(n); - if (resumeBoundary.compareAndSet(true, false)) + if (resumeBoundary.compareAndSet(true, false)) { worker.schedule(subscribeToSource); + } } } }); diff --git a/src/main/java/rx/internal/operators/OnSubscribeSingle.java b/src/main/java/rx/internal/operators/OnSubscribeSingle.java index 7bfdcca511..90dc8d288e 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeSingle.java +++ b/src/main/java/rx/internal/operators/OnSubscribeSingle.java @@ -38,9 +38,9 @@ public OnSubscribeSingle(Observable observable) { @Override public void call(final SingleSubscriber super T> child) { Subscriber parent = new Subscriber() { - private boolean emittedTooMany = false; - private boolean itemEmitted = false; - private T emission = null; + private boolean emittedTooMany; + private boolean itemEmitted; + private T emission; @Override public void onStart() { diff --git a/src/main/java/rx/internal/operators/OnSubscribeUsing.java b/src/main/java/rx/internal/operators/OnSubscribeUsing.java index 352c699056..5b5a9259fb 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeUsing.java +++ b/src/main/java/rx/internal/operators/OnSubscribeUsing.java @@ -58,7 +58,7 @@ public void call(final Subscriber super T> subscriber) { // dispose on unsubscription subscriber.add(disposeOnceOnly); // create the observable - final Observable extends T> source; + Observable extends T> source; try { source = observableFactory @@ -77,7 +77,7 @@ public void call(final Subscriber super T> subscriber) { return; } - final Observable extends T> observable; + Observable extends T> observable; // supplement with on termination disposal if requested if (disposeEagerly) { observable = source @@ -96,11 +96,12 @@ public void call(final Subscriber super T> subscriber) { Throwable disposeError = dispose(disposeOnceOnly); Exceptions.throwIfFatal(e); Exceptions.throwIfFatal(disposeError); - if (disposeError != null) + if (disposeError != null) { subscriber.onError(new CompositeException(e, disposeError)); - else + } else { // propagate error subscriber.onError(e); + } } } catch (Throwable e) { // then propagate error @@ -117,7 +118,7 @@ private Throwable dispose(final Action0 disposeOnceOnly) { } } - private static final class DisposeAction extends AtomicBoolean implements Action0, + static final class DisposeAction extends AtomicBoolean implements Action0, Subscription { private static final long serialVersionUID = 4262875056400218316L; diff --git a/src/main/java/rx/internal/operators/OperatorAsObservable.java b/src/main/java/rx/internal/operators/OperatorAsObservable.java index 9cb9d8ac56..33671e8f2e 100644 --- a/src/main/java/rx/internal/operators/OperatorAsObservable.java +++ b/src/main/java/rx/internal/operators/OperatorAsObservable.java @@ -26,7 +26,7 @@ */ public final class OperatorAsObservable implements Operator { /** Lazy initialization via inner-class holder. */ - private static final class Holder { + static final class Holder { /** A singleton instance. */ static final OperatorAsObservable INSTANCE = new OperatorAsObservable(); } @@ -38,7 +38,10 @@ private static final class Holder { public static OperatorAsObservable instance() { return (OperatorAsObservable)Holder.INSTANCE; } - OperatorAsObservable() { } + OperatorAsObservable() { + // singleton + } + @Override public Subscriber super T> call(Subscriber super T> s) { return s; diff --git a/src/main/java/rx/internal/operators/OperatorDebounceWithTime.java b/src/main/java/rx/internal/operators/OperatorDebounceWithTime.java index f98639aff3..50c2418228 100644 --- a/src/main/java/rx/internal/operators/OperatorDebounceWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorDebounceWithTime.java @@ -110,7 +110,7 @@ static final class DebounceState { /** Guarded by this. */ boolean emitting; - public synchronized int next(T value) { + public synchronized int next(T value) { // NOPMD this.value = value; this.hasValue = true; return ++index; @@ -173,7 +173,7 @@ public void emitAndComplete(Subscriber onNextAndComplete, Subscriber> onErr } onNextAndComplete.onCompleted(); } - public synchronized void clear() { + public synchronized void clear() { // NOPMD ++index; value = null; hasValue = false; diff --git a/src/main/java/rx/internal/operators/OperatorDematerialize.java b/src/main/java/rx/internal/operators/OperatorDematerialize.java index 5fd8d7fdfa..23eea9f1e8 100644 --- a/src/main/java/rx/internal/operators/OperatorDematerialize.java +++ b/src/main/java/rx/internal/operators/OperatorDematerialize.java @@ -31,7 +31,7 @@ */ public final class OperatorDematerialize implements Operator> { /** Lazy initialization via inner-class holder. */ - private static final class Holder { + static final class Holder { /** A singleton instance. */ static final OperatorDematerialize INSTANCE = new OperatorDematerialize(); } @@ -42,7 +42,10 @@ private static final class Holder { public static OperatorDematerialize instance() { return Holder.INSTANCE; // using raw types because the type inference is not good enough } - OperatorDematerialize() { } + OperatorDematerialize() { + // singleton + } + @Override public Subscriber super Notification> call(final Subscriber super T> child) { return new Subscriber>(child) { @@ -62,6 +65,9 @@ public void onNext(Notification t) { case OnCompleted: onCompleted(); break; + default: + onError(new IllegalArgumentException("Unsupported notification type: " + t)); + break; } } diff --git a/src/main/java/rx/internal/operators/OperatorDistinct.java b/src/main/java/rx/internal/operators/OperatorDistinct.java index a581b1b1e0..716353e272 100644 --- a/src/main/java/rx/internal/operators/OperatorDistinct.java +++ b/src/main/java/rx/internal/operators/OperatorDistinct.java @@ -32,7 +32,7 @@ public final class OperatorDistinct implements Operator { final Func1 super T, ? extends U> keySelector; - private static class Holder { + static final class Holder { static final OperatorDistinct,?> INSTANCE = new OperatorDistinct(UtilityFunctions.identity()); } diff --git a/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java b/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java index 43529df76f..9d75db915d 100644 --- a/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java +++ b/src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java @@ -31,7 +31,7 @@ public final class OperatorDistinctUntilChanged implements Operator, final Func2 super U, ? super U, Boolean> comparator; - private static class Holder { + static final class Holder { static final OperatorDistinctUntilChanged,?> INSTANCE = new OperatorDistinctUntilChanged(UtilityFunctions.identity()); } @@ -72,14 +72,14 @@ public Subscriber super T> call(final Subscriber super T> child) { boolean hasPrevious; @Override public void onNext(T t) { - U currentKey = previousKey; - final U key; + U key; try { key = keySelector.call(t); } catch (Throwable e) { Exceptions.throwOrReport(e, child, t); return; } + U currentKey = previousKey; previousKey = key; if (hasPrevious) { diff --git a/src/main/java/rx/internal/operators/OperatorDoOnEach.java b/src/main/java/rx/internal/operators/OperatorDoOnEach.java index e0e71a2188..0e3034a425 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnEach.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnEach.java @@ -36,7 +36,7 @@ public OperatorDoOnEach(Observer super T> doOnEachObserver) { public Subscriber super T> call(final Subscriber super T> observer) { return new Subscriber(observer) { - private boolean done = false; + private boolean done; @Override public void onCompleted() { diff --git a/src/main/java/rx/internal/operators/OperatorDoOnRequest.java b/src/main/java/rx/internal/operators/OperatorDoOnRequest.java index d45e895b40..e59d1f04d5 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnRequest.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnRequest.java @@ -53,7 +53,7 @@ public void request(long n) { return parent; } - private static final class ParentSubscriber extends Subscriber { + static final class ParentSubscriber extends Subscriber { private final Subscriber super T> child; ParentSubscriber(Subscriber super T> child) { diff --git a/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java b/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java index cfa46837b5..a779720396 100644 --- a/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java +++ b/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java @@ -73,7 +73,7 @@ static final class EagerOuterSubscriber extends Subscriber { final int bufferSize; final Subscriber super R> actual; - final LinkedList> subscribers; + final Queue> subscribers; volatile boolean done; Throwable error; @@ -131,10 +131,12 @@ public void onNext(T t) { return; } - EagerInnerSubscriber inner = new EagerInnerSubscriber(this, bufferSize); if (cancelled) { return; } + + EagerInnerSubscriber inner = new EagerInnerSubscriber(this, bufferSize); + synchronized (subscribers) { if (cancelled) { return; @@ -202,7 +204,6 @@ void drain() { if (!empty) { long requestedAmount = requested.get(); long emittedAmount = 0L; - boolean unbounded = requestedAmount == Long.MAX_VALUE; Queue innerQueue = innerSubscriber.queue; boolean innerDone = false; @@ -235,7 +236,7 @@ void drain() { break; } - if (requestedAmount == 0L) { + if (requestedAmount == emittedAmount) { break; } @@ -248,16 +249,15 @@ void drain() { return; } - requestedAmount--; - emittedAmount--; + emittedAmount++; } if (emittedAmount != 0L) { - if (!unbounded) { - requested.addAndGet(emittedAmount); + if (requestedAmount != Long.MAX_VALUE) { + BackpressureUtils.produced(requested, emittedAmount); } if (!innerDone) { - innerSubscriber.requestMore(-emittedAmount); + innerSubscriber.requestMore(emittedAmount); } } diff --git a/src/main/java/rx/internal/operators/OperatorElementAt.java b/src/main/java/rx/internal/operators/OperatorElementAt.java index 6f79d6a285..3fa581b8d2 100644 --- a/src/main/java/rx/internal/operators/OperatorElementAt.java +++ b/src/main/java/rx/internal/operators/OperatorElementAt.java @@ -52,7 +52,7 @@ private OperatorElementAt(int index, T defaultValue, boolean hasDefault) { public Subscriber super T> call(final Subscriber super T> child) { Subscriber parent = new Subscriber() { - private int currentIndex = 0; + private int currentIndex; @Override public void onNext(T value) { diff --git a/src/main/java/rx/internal/operators/OperatorGroupBy.java b/src/main/java/rx/internal/operators/OperatorGroupBy.java index 4ed1a504f3..954b1ac6c5 100644 --- a/src/main/java/rx/internal/operators/OperatorGroupBy.java +++ b/src/main/java/rx/internal/operators/OperatorGroupBy.java @@ -266,10 +266,9 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0L; - while (r != 0) { + while (e != r) { boolean d = done; GroupedObservable t = q.poll(); @@ -286,15 +285,14 @@ void drain() { a.onNext(t); - r--; - e--; + e++; } if (e != 0L) { - if (!unbounded) { - requested.addAndGet(e); + if (r != Long.MAX_VALUE) { + BackpressureUtils.produced(requested, e); } - s.request(-e); + s.request(e); } missed = wip.addAndGet(-missed); @@ -334,14 +332,14 @@ boolean checkTerminated(boolean d, boolean empty, } static final class GroupedUnicast extends GroupedObservable { + final State state; + public static GroupedUnicast createWith(K key, int bufferSize, GroupBySubscriber, K, T> parent, boolean delayError) { State state = new State(bufferSize, parent, key, delayError); return new GroupedUnicast(key, state); } - final State state; - protected GroupedUnicast(K key, State state) { super(key, state); this.state = state; @@ -381,7 +379,7 @@ static final class State extends AtomicInteger implements Producer, Subscr final AtomicBoolean once; - public State(int bufferSize, GroupBySubscriber, K, T> parent, K key, boolean delayError) { + public State(int bufferSize, GroupBySubscriber, K, T> parent, K key, boolean delayError) { // NOPMD this.queue = new ConcurrentLinkedQueue(); this.parent = parent; this.key = key; @@ -467,10 +465,9 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; long e = 0; - while (r != 0L) { + while (e != r) { boolean d = done; Object v = q.poll(); boolean empty = v == null; @@ -485,15 +482,14 @@ void drain() { a.onNext(nl.getValue(v)); - r--; - e--; + e++; } if (e != 0L) { - if (!unbounded) { - requested.addAndGet(e); + if (r != Long.MAX_VALUE) { + BackpressureUtils.produced(requested, e); } - parent.s.request(-e); + parent.s.request(e); } } diff --git a/src/main/java/rx/internal/operators/OperatorIgnoreElements.java b/src/main/java/rx/internal/operators/OperatorIgnoreElements.java index 00098f85a2..b85fb54359 100644 --- a/src/main/java/rx/internal/operators/OperatorIgnoreElements.java +++ b/src/main/java/rx/internal/operators/OperatorIgnoreElements.java @@ -20,7 +20,7 @@ public class OperatorIgnoreElements implements Operator { - private static class Holder { + static final class Holder { static final OperatorIgnoreElements> INSTANCE = new OperatorIgnoreElements(); } @@ -30,7 +30,7 @@ public static OperatorIgnoreElements