From 9d4631b93c425c077ab93e2b55662a5398d3efac Mon Sep 17 00:00:00 2001 From: David Karnok Date: Tue, 9 Aug 2022 08:31:36 +0200 Subject: [PATCH] 3.x: Add onDropped callback to throttleLatest operator (#7457) * 3.x: Add onDropped callback to throttleLatest operator * Reorganize imports --- .../io/reactivex/rxjava3/core/Flowable.java | 56 +- .../io/reactivex/rxjava3/core/Observable.java | 50 +- .../flowable/FlowableThrottleLatest.java | 116 ++++- .../observable/ObservableThrottleLatest.java | 87 +++- .../flowable/FlowableThrottleLatestTest.java | 479 +++++++++++++++++- .../ObservableThrottleLatestTest.java | 425 +++++++++++++++- .../rxjava3/testsupport/TestHelper.java | 31 +- .../ParamValidationCheckerTest.java | 2 + 8 files changed, 1193 insertions(+), 53 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index b06164d1ae..ddcb8126d9 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -17327,7 +17327,61 @@ public final Flowable throttleLatest(long timeout, @NonNull TimeUnit unit, @N public final Flowable throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast)); + return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, null)); + } + + /** + * Throttles items from the upstream {@code Flowable} by first emitting the next + * item from upstream, then periodically emitting the latest item (if any) when + * the specified timeout elapses between them, invoking the consumer for any dropped item. + *

+ * + *

+ * If no items were emitted from the upstream during this timeout phase, the next + * upstream item is emitted immediately and the timeout window starts from then. + *

+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow. + * If the downstream is not ready to receive items, a + * {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException} + * will be signaled.
+ *
Scheduler:
+ *
You specify which {@link Scheduler} this operator will use.
+ *
Error handling:
+ *
+ * If the upstream signals an {@code onError} or {@code onDropped} callback crashes, + * the error is delivered immediately to the downstream. If both happen, a {@link CompositeException} + * is created, containing both the upstream and the callback error. + * If the {@code onDropped} callback crashes during cancellation, the exception is forwarded + * to the global error handler via {@link RxJavaPlugins#onError(Throwable)}. + *
+ *
+ * @param timeout the time to wait after an item emission towards the downstream + * before trying to emit the latest item from upstream again + * @param unit the time unit + * @param scheduler the {@code Scheduler} where the timed wait and latest item + * emission will be performed + * @param emitLast If {@code true}, the very last item from the upstream will be emitted + * immediately when the upstream completes, regardless if there is + * a timeout window active or not. If {@code false}, the very last + * upstream item is ignored and the flow terminates. + * @param onDropped called when an item is replaced by a newer item that doesn't get delivered + * to the downstream, including the very last item if {@code emitLast} is {@code false} + * and the current undelivered item when the sequence gets canceled. + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code unit}, {@code scheduler} or {@code onDropped} is {@code null} + * @since 3.1.6 - Experimental + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.ERROR) + @SchedulerSupport(SchedulerSupport.CUSTOM) + @Experimental + public final Flowable throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer onDropped) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + Objects.requireNonNull(onDropped, "onDropped is null"); + return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, onDropped)); } /** diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index f355bedf13..32cedef3b8 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -14362,7 +14362,55 @@ public final Observable throttleLatest(long timeout, @NonNull TimeUnit unit, public final Observable throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<>(this, timeout, unit, scheduler, emitLast)); + return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, null)); + } + + /** + * Throttles items from the current {@code Observable} by first emitting the next + * item from upstream, then periodically emitting the latest item (if any) when + * the specified timeout elapses between them, invoking the consumer for any dropped item. + *

+ * + *

+ * If no items were emitted from the upstream during this timeout phase, the next + * upstream item is emitted immediately and the timeout window starts from then. + *

+ *
Scheduler:
+ *
You specify which {@link Scheduler} this operator will use.
+ *
Error handling:
+ *
+ * If the upstream signals an {@code onError} or {@code onDropped} callback crashes, + * the error is delivered immediately to the downstream. If both happen, a {@link CompositeException} + * is created, containing both the upstream and the callback error. + * If the {@code onDropped} callback crashes when the sequence gets disposed, the exception is forwarded + * to the global error handler via {@link RxJavaPlugins#onError(Throwable)}. + *
+ *
+ * @param timeout the time to wait after an item emission towards the downstream + * before trying to emit the latest item from upstream again + * @param unit the time unit + * @param scheduler the {@code Scheduler} where the timed wait and latest item + * emission will be performed + * @param emitLast If {@code true}, the very last item from the upstream will be emitted + * immediately when the upstream completes, regardless if there is + * a timeout window active or not. If {@code false}, the very last + * upstream item is ignored and the flow terminates. + * @param onDropped called when an item is replaced by a newer item that doesn't get delivered + * to the downstream, including the very last item if {@code emitLast} is {@code false} + * and the current undelivered item when the sequence gets disposed. + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code unit}, {@code scheduler} or {@code onDropped} is {@code null} + * @since 3.1.6 - Experimental + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + @NonNull + @Experimental + public final Observable throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer onDropped) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + Objects.requireNonNull(onDropped, "onDropped is null"); + return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, onDropped)); } /** diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleLatest.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleLatest.java index e6cb5ffdcc..b81fb3df03 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleLatest.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleLatest.java @@ -19,9 +19,11 @@ import org.reactivestreams.*; import io.reactivex.rxjava3.core.*; -import io.reactivex.rxjava3.exceptions.MissingBackpressureException; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; import io.reactivex.rxjava3.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; /** * Emits the next or latest item when the given time elapses. @@ -44,19 +46,24 @@ public final class FlowableThrottleLatest extends AbstractFlowableWithUpstrea final boolean emitLast; + final Consumer onDropped; + public FlowableThrottleLatest(Flowable source, - long timeout, TimeUnit unit, Scheduler scheduler, - boolean emitLast) { + long timeout, TimeUnit unit, + Scheduler scheduler, + boolean emitLast, + Consumer onDropped) { super(source); this.timeout = timeout; this.unit = unit; this.scheduler = scheduler; this.emitLast = emitLast; + this.onDropped = onDropped; } @Override protected void subscribeActual(Subscriber s) { - source.subscribe(new ThrottleLatestSubscriber<>(s, timeout, unit, scheduler.createWorker(), emitLast)); + source.subscribe(new ThrottleLatestSubscriber<>(s, timeout, unit, scheduler.createWorker(), emitLast, onDropped)); } static final class ThrottleLatestSubscriber @@ -79,6 +86,8 @@ static final class ThrottleLatestSubscriber final AtomicLong requested; + final Consumer onDropped; + Subscription upstream; volatile boolean done; @@ -93,8 +102,10 @@ static final class ThrottleLatestSubscriber boolean timerRunning; ThrottleLatestSubscriber(Subscriber downstream, - long timeout, TimeUnit unit, Scheduler.Worker worker, - boolean emitLast) { + long timeout, TimeUnit unit, + Scheduler.Worker worker, + boolean emitLast, + Consumer onDropped) { this.downstream = downstream; this.timeout = timeout; this.unit = unit; @@ -102,6 +113,7 @@ static final class ThrottleLatestSubscriber this.emitLast = emitLast; this.latest = new AtomicReference<>(); this.requested = new AtomicLong(); + this.onDropped = onDropped; } @Override @@ -115,7 +127,17 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - latest.set(t); + T old = latest.getAndSet(t); + if (onDropped != null && old != null) { + try { + onDropped.accept(old); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + error = ex; + done = true; + } + } drain(); } @@ -145,6 +167,22 @@ public void cancel() { upstream.cancel(); worker.dispose(); if (getAndIncrement() == 0) { + clear(); + } + } + + void clear() { + if (onDropped != null) { + T v = latest.getAndSet(null); + if (v != null) { + try { + onDropped.accept(v); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + } else { latest.lazySet(null); } } @@ -170,14 +208,27 @@ void drain() { for (;;) { if (cancelled) { - latest.lazySet(null); + clear(); return; } boolean d = done; + Throwable error = this.error; if (d && error != null) { - latest.lazySet(null); + if (onDropped != null) { + T v = latest.getAndSet(null); + if (v != null) { + try { + onDropped.accept(v); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + error = new CompositeException(error, ex); + } + } + } else { + latest.lazySet(null); + } downstream.onError(error); worker.dispose(); return; @@ -187,19 +238,31 @@ void drain() { boolean empty = v == null; if (d) { - if (!empty && emitLast) { + if (!empty) { v = latest.getAndSet(null); - long e = emitted; - if (e != requested.get()) { - emitted = e + 1; - downstream.onNext(v); - downstream.onComplete(); + if (emitLast) { + long e = emitted; + if (e != requested.get()) { + emitted = e + 1; + downstream.onNext(v); + downstream.onComplete(); + } else { + tryDropAndSignalMBE(v); + } } else { - downstream.onError(new MissingBackpressureException( - "Could not emit final value due to lack of requests")); + if (onDropped != null) { + try { + onDropped.accept(v); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + worker.dispose(); + return; + } + } + downstream.onComplete(); } } else { - latest.lazySet(null); downstream.onComplete(); } worker.dispose(); @@ -222,8 +285,7 @@ void drain() { emitted = e + 1; } else { upstream.cancel(); - downstream.onError(new MissingBackpressureException( - "Could not emit value due to lack of requests")); + tryDropAndSignalMBE(v); worker.dispose(); return; } @@ -242,5 +304,19 @@ void drain() { } } } + + void tryDropAndSignalMBE(T valueToDrop) { + Throwable errorToSignal = new MissingBackpressureException( + "Could not emit value due to lack of requests"); + if (onDropped != null) { + try { + onDropped.accept(valueToDrop); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + errorToSignal = new CompositeException(errorToSignal, ex); + } + } + downstream.onError(errorToSignal); + } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleLatest.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleLatest.java index 8f0b2cc250..caf14d3a5e 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleLatest.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleLatest.java @@ -18,7 +18,10 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; /** * Emits the next or latest item when the given time elapses. @@ -41,19 +44,24 @@ public final class ObservableThrottleLatest extends AbstractObservableWithUps final boolean emitLast; + final Consumer onDropped; + public ObservableThrottleLatest(Observable source, - long timeout, TimeUnit unit, Scheduler scheduler, - boolean emitLast) { + long timeout, TimeUnit unit, + Scheduler scheduler, + boolean emitLast, + Consumer onDropped) { super(source); this.timeout = timeout; this.unit = unit; this.scheduler = scheduler; this.emitLast = emitLast; + this.onDropped = onDropped; } @Override protected void subscribeActual(Observer observer) { - source.subscribe(new ThrottleLatestObserver<>(observer, timeout, unit, scheduler.createWorker(), emitLast)); + source.subscribe(new ThrottleLatestObserver<>(observer, timeout, unit, scheduler.createWorker(), emitLast, onDropped)); } static final class ThrottleLatestObserver @@ -74,6 +82,8 @@ static final class ThrottleLatestObserver final AtomicReference latest; + final Consumer onDropped; + Disposable upstream; volatile boolean done; @@ -86,14 +96,17 @@ static final class ThrottleLatestObserver boolean timerRunning; ThrottleLatestObserver(Observer downstream, - long timeout, TimeUnit unit, Scheduler.Worker worker, - boolean emitLast) { + long timeout, TimeUnit unit, + Scheduler.Worker worker, + boolean emitLast, + Consumer onDropped) { this.downstream = downstream; this.timeout = timeout; this.unit = unit; this.worker = worker; this.emitLast = emitLast; this.latest = new AtomicReference<>(); + this.onDropped = onDropped; } @Override @@ -106,7 +119,17 @@ public void onSubscribe(Disposable d) { @Override public void onNext(T t) { - latest.set(t); + T old = latest.getAndSet(t); + if (onDropped != null && old != null) { + try { + onDropped.accept(old); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.dispose(); + error = ex; + done = true; + } + } drain(); } @@ -129,6 +152,22 @@ public void dispose() { upstream.dispose(); worker.dispose(); if (getAndIncrement() == 0) { + clear(); + } + } + + void clear() { + if (onDropped != null) { + T v = latest.getAndSet(null); + if (v != null) { + try { + onDropped.accept(v); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + } else { latest.lazySet(null); } } @@ -158,14 +197,27 @@ void drain() { for (;;) { if (cancelled) { - latest.lazySet(null); + clear(); return; } boolean d = done; + Throwable error = this.error; if (d && error != null) { - latest.lazySet(null); + if (onDropped != null) { + T v = latest.getAndSet(null); + if (v != null) { + try { + onDropped.accept(v); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + error = new CompositeException(error, ex); + } + } + } else { + latest.lazySet(null); + } downstream.onError(error); worker.dispose(); return; @@ -175,9 +227,22 @@ void drain() { boolean empty = v == null; if (d) { - v = latest.getAndSet(null); - if (!empty && emitLast) { - downstream.onNext(v); + if (!empty) { + v = latest.getAndSet(null); + if (emitLast) { + downstream.onNext(v); + } else { + if (onDropped != null) { + try { + onDropped.accept(v); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + worker.dispose(); + return; + } + } + } } downstream.onComplete(); worker.dispose(); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleLatestTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleLatestTest.java index fe19765b20..2b26ec98d4 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleLatestTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleLatestTest.java @@ -23,10 +23,11 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.subscriptions.EmptySubscription; import io.reactivex.rxjava3.processors.PublishProcessor; import io.reactivex.rxjava3.schedulers.TestScheduler; import io.reactivex.rxjava3.subscribers.TestSubscriber; -import io.reactivex.rxjava3.testsupport.TestHelper; +import io.reactivex.rxjava3.testsupport.*; public class FlowableThrottleLatestTest extends RxJavaTest { @@ -278,4 +279,480 @@ public void onNext(Integer t) { ts.assertResult(1, 2); } + + /** Emit 1, 2, 3, then advance time by a second; 1 and 3 should end up in downstream, 2 should be dropped. */ + @Test + public void onDroppedBasicNoEmitLast() { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + TestSubscriber drops = new TestSubscriber<>(); + drops.onSubscribe(EmptySubscription.INSTANCE); + + TestSubscriber ts = pp.throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .test(); + + ts.assertEmpty(); + drops.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + drops.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(1); + drops.assertEmpty(); + + pp.onNext(3); + + ts.assertValuesOnly(1); + drops.assertValuesOnly(2); + + sch.advanceTimeBy(1, TimeUnit.SECONDS); + + ts.assertValuesOnly(1, 3); + drops.assertValuesOnly(2); + + pp.onComplete(); + + ts.assertResult(1, 3); + + drops.assertValuesOnly(2); + } + + /** Emit 1, 2, 3; 1 should end up in downstream, 2, 3 should be dropped. */ + @Test + public void onDroppedBasicNoEmitLastDropLast() { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + TestSubscriber drops = new TestSubscriber<>(); + drops.onSubscribe(EmptySubscription.INSTANCE); + + TestSubscriber ts = pp.throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .test(); + + ts.assertEmpty(); + drops.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + drops.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(1); + drops.assertEmpty(); + + pp.onNext(3); + + ts.assertValuesOnly(1); + drops.assertValuesOnly(2); + + pp.onComplete(); + + ts.assertResult(1); + + drops.assertValuesOnly(2, 3); + } + + /** Emit 1, 2, 3; 1 and 3 should end up in downstream, 2 should be dropped. */ + @Test + public void onDroppedBasicEmitLast() { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + TestSubscriber drops = new TestSubscriber<>(); + drops.onSubscribe(EmptySubscription.INSTANCE); + + TestSubscriber ts = pp.throttleLatest(1, TimeUnit.SECONDS, sch, true, drops::onNext) + .test(); + + ts.assertEmpty(); + drops.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + drops.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(1); + drops.assertEmpty(); + + pp.onNext(3); + + ts.assertValuesOnly(1); + drops.assertValuesOnly(2); + + pp.onComplete(); + + ts.assertResult(1, 3); + + drops.assertValuesOnly(2); + } + + /** Emit 1, 2, 3; 3 should trigger an error to the downstream because 2 is dropped and the callback crashes. */ + @Test + public void onDroppedBasicNoEmitLastFirstDropCrash() throws Throwable { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestSubscriber ts = pp + .doOnCancel(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, d -> { + if (d == 2) { + throw new TestException("forced"); + } + }) + .test(); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + + pp.onNext(2); + + ts.assertValuesOnly(1); + + pp.onNext(3); + + ts.assertFailure(TestException.class, 1); + + verify(whenDisposed).run(); + } + + /** + * Emit 1, 2, Error; the error should trigger the drop callback and crash it too, + * downstream gets 1, composite(source, drop-crash). + */ + @Test + public void onDroppedBasicNoEmitLastOnErrorDropCrash() throws Throwable { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestSubscriberEx ts = pp + .doOnCancel(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, d -> { throw new TestException("forced " + d); }) + .subscribeWith(new TestSubscriberEx<>()); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + + pp.onNext(2); + + ts.assertValuesOnly(1); + + pp.onError(new TestException("source")); + + ts.assertFailure(CompositeException.class, 1); + + TestHelper.assertCompositeExceptions(ts, TestException.class, "source", TestException.class, "forced 2"); + + verify(whenDisposed, never()).run(); + } + + /** + * Emit 1, 2, 3; 3 should trigger a drop-crash for 2, which then would trigger the error path and drop-crash for 3, + * the last item not delivered, downstream gets 1, composite(drop-crash 2, drop-crash 3). + */ + @Test + public void onDroppedBasicEmitLastOnErrorDropCrash() throws Throwable { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestSubscriberEx ts = pp + .doOnCancel(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, true, d -> { throw new TestException("forced " + d); }) + .subscribeWith(new TestSubscriberEx<>()); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + + pp.onNext(2); + + ts.assertValuesOnly(1); + + pp.onNext(3); + + ts.assertFailure(CompositeException.class, 1); + + TestHelper.assertCompositeExceptions(ts, TestException.class, "forced 2", TestException.class, "forced 3"); + + verify(whenDisposed).run(); + } + + /** Emit 1, complete; Downstream gets 1, complete, no drops. */ + @Test + public void onDroppedBasicNoEmitLastNoLastToDrop() { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + TestSubscriber drops = new TestSubscriber<>(); + drops.onSubscribe(EmptySubscription.INSTANCE); + + TestSubscriber ts = pp.throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .test(); + + ts.assertEmpty(); + drops.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + drops.assertEmpty(); + + pp.onComplete(); + + ts.assertResult(1); + drops.assertEmpty(); + } + + /** Emit 1, error; Downstream gets 1, error, no drops. */ + @Test + public void onDroppedErrorNoEmitLastNoLastToDrop() { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + TestSubscriber drops = new TestSubscriber<>(); + drops.onSubscribe(EmptySubscription.INSTANCE); + + TestSubscriber ts = pp.throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .test(); + + ts.assertEmpty(); + drops.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + drops.assertEmpty(); + + pp.onError(new TestException()); + + ts.assertFailure(TestException.class, 1); + drops.assertEmpty(); + } + + /** + * Emit 1, 2, complete; complete should crash drop, downstream gets 1, drop-crash 2. + */ + @Test + public void onDroppedHasLastNoEmitLastDropCrash() throws Throwable { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestSubscriberEx ts = pp + .doOnCancel(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, d -> { throw new TestException("forced " + d); }) + .subscribeWith(new TestSubscriberEx<>()); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + + pp.onNext(2); + + ts.assertValuesOnly(1); + + pp.onComplete(); + + ts.assertFailureAndMessage(TestException.class, "forced 2", 1); + + verify(whenDisposed, never()).run(); + } + + /** + * Emit 1, 2 then dispose the sequence; downstream gets 1, drop should get for 2. + */ + @Test + public void onDroppedDisposeDrops() throws Throwable { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestSubscriber drops = new TestSubscriber<>(); + drops.onSubscribe(EmptySubscription.INSTANCE); + + TestSubscriberEx ts = pp + .doOnCancel(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .subscribeWith(new TestSubscriberEx<>()); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + + pp.onNext(2); + + ts.assertValuesOnly(1); + + ts.cancel(); + + ts.assertValuesOnly(1); + drops.assertValuesOnly(2); + + verify(whenDisposed).run(); + } + + /** + * Emit 1 then dispose the sequence; downstream gets 1, drop should not get called. + */ + @Test + public void onDroppedDisposeNoDrops() throws Throwable { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestSubscriber drops = new TestSubscriber<>(); + drops.onSubscribe(EmptySubscription.INSTANCE); + + TestSubscriberEx ts = pp + .doOnCancel(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .subscribeWith(new TestSubscriberEx<>()); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + + ts.cancel(); + + ts.assertValuesOnly(1); + drops.assertEmpty(); + + verify(whenDisposed).run(); + } + + /** + * Emit 1, 2 then dispose the sequence; downstream gets 1, global error handler should get drop-crash 2. + */ + @Test + public void onDroppedDisposeCrashesDrop() throws Throwable { + TestHelper.withErrorTracking(errors -> { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestSubscriberEx ts = pp + .doOnCancel(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, d -> { throw new TestException("forced " + d); }) + .subscribeWith(new TestSubscriberEx<>()); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertValuesOnly(1); + + pp.onNext(2); + + ts.assertValuesOnly(1); + + ts.cancel(); + + ts.assertValuesOnly(1); + + verify(whenDisposed).run(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "forced 2"); + }); + } + + /** Emit 1 but downstream is backpressured; downstream gets MBE, drops gets 1. */ + @Test + public void onDroppedBackpressured() throws Throwable { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + TestSubscriber drops = new TestSubscriber<>(); + drops.onSubscribe(EmptySubscription.INSTANCE); + + Action whenDisposed = mock(Action.class); + + TestSubscriber ts = pp + .doOnCancel(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .test(0L); + + ts.assertEmpty(); + drops.assertEmpty(); + + pp.onNext(1); + + ts.assertFailure(MissingBackpressureException.class); + + drops.assertValuesOnly(1); + + verify(whenDisposed).run(); + } + + /** Emit 1 but downstream is backpressured; drop crashes, downstream gets composite(MBE, drop-crash 1). */ + @Test + public void onDroppedBackpressuredDropCrash() throws Throwable { + PublishProcessor pp =PublishProcessor.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestSubscriberEx ts = pp + .doOnCancel(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, d -> { throw new TestException("forced " + d); }) + .subscribeWith(new TestSubscriberEx<>(0L)); + + ts.assertEmpty(); + + pp.onNext(1); + + ts.assertFailure(CompositeException.class); + + TestHelper.assertCompositeExceptions(ts, + MissingBackpressureException.class, "Could not emit value due to lack of requests", + TestException.class, "forced 1"); + + verify(whenDisposed).run(); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleLatestTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleLatestTest.java index c36d04babe..e1c15f3b1a 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleLatestTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleLatestTest.java @@ -20,12 +20,13 @@ import org.junit.Test; import io.reactivex.rxjava3.core.*; -import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.schedulers.TestScheduler; import io.reactivex.rxjava3.subjects.PublishSubject; -import io.reactivex.rxjava3.testsupport.TestHelper; +import io.reactivex.rxjava3.testsupport.*; public class ObservableThrottleLatestTest extends RxJavaTest { @@ -221,4 +222,424 @@ public void onNext(Integer t) { to.assertResult(1, 2); } + + /** Emit 1, 2, 3, then advance time by a second; 1 and 3 should end up in downstream, 2 should be dropped. */ + @Test + public void onDroppedBasicNoEmitLast() { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + TestObserver drops = new TestObserver<>(); + drops.onSubscribe(Disposable.empty()); + + TestObserver to = ps.throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .test(); + + to.assertEmpty(); + drops.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + drops.assertEmpty(); + + ps.onNext(2); + + to.assertValuesOnly(1); + drops.assertEmpty(); + + ps.onNext(3); + + to.assertValuesOnly(1); + drops.assertValuesOnly(2); + + sch.advanceTimeBy(1, TimeUnit.SECONDS); + + to.assertValuesOnly(1, 3); + drops.assertValuesOnly(2); + + ps.onComplete(); + + to.assertResult(1, 3); + + drops.assertValuesOnly(2); + } + + /** Emit 1, 2, 3; 1 should end up in downstream, 2, 3 should be dropped. */ + @Test + public void onDroppedBasicNoEmitLastDropLast() { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + TestObserver drops = new TestObserver<>(); + drops.onSubscribe(Disposable.empty()); + + TestObserver to = ps.throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .test(); + + to.assertEmpty(); + drops.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + drops.assertEmpty(); + + ps.onNext(2); + + to.assertValuesOnly(1); + drops.assertEmpty(); + + ps.onNext(3); + + to.assertValuesOnly(1); + drops.assertValuesOnly(2); + + ps.onComplete(); + + to.assertResult(1); + + drops.assertValuesOnly(2, 3); + } + + /** Emit 1, 2, 3; 1 and 3 should end up in downstream, 2 should be dropped. */ + @Test + public void onDroppedBasicEmitLast() { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + TestObserver drops = new TestObserver<>(); + drops.onSubscribe(Disposable.empty()); + + TestObserver to = ps.throttleLatest(1, TimeUnit.SECONDS, sch, true, drops::onNext) + .test(); + + to.assertEmpty(); + drops.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + drops.assertEmpty(); + + ps.onNext(2); + + to.assertValuesOnly(1); + drops.assertEmpty(); + + ps.onNext(3); + + to.assertValuesOnly(1); + drops.assertValuesOnly(2); + + ps.onComplete(); + + to.assertResult(1, 3); + + drops.assertValuesOnly(2); + } + + /** Emit 1, 2, 3; 3 should trigger an error to the downstream because 2 is dropped and the callback crashes. */ + @Test + public void onDroppedBasicNoEmitLastFirstDropCrash() throws Throwable { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestObserver to = ps + .doOnDispose(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, d -> { + if (d == 2) { + throw new TestException("forced"); + } + }) + .test(); + + to.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + + ps.onNext(2); + + to.assertValuesOnly(1); + + ps.onNext(3); + + to.assertFailure(TestException.class, 1); + + verify(whenDisposed).run(); + } + + /** + * Emit 1, 2, Error; the error should trigger the drop callback and crash it too, + * downstream gets 1, composite(source, drop-crash). + */ + @Test + public void onDroppedBasicNoEmitLastOnErrorDropCrash() throws Throwable { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestObserverEx to = ps + .doOnDispose(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, d -> { throw new TestException("forced " + d); }) + .subscribeWith(new TestObserverEx<>()); + + to.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + + ps.onNext(2); + + to.assertValuesOnly(1); + + ps.onError(new TestException("source")); + + to.assertFailure(CompositeException.class, 1); + + TestHelper.assertCompositeExceptions(to, TestException.class, "source", TestException.class, "forced 2"); + + verify(whenDisposed, never()).run(); + } + + /** + * Emit 1, 2, 3; 3 should trigger a drop-crash for 2, which then would trigger the error path and drop-crash for 3, + * the last item not delivered, downstream gets 1, composite(drop-crash 2, drop-crash 3). + */ + @Test + public void onDroppedBasicEmitLastOnErrorDropCrash() throws Throwable { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestObserverEx to = ps + .doOnDispose(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, true, d -> { throw new TestException("forced " + d); }) + .subscribeWith(new TestObserverEx<>()); + + to.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + + ps.onNext(2); + + to.assertValuesOnly(1); + + ps.onNext(3); + + to.assertFailure(CompositeException.class, 1); + + TestHelper.assertCompositeExceptions(to, TestException.class, "forced 2", TestException.class, "forced 3"); + + verify(whenDisposed).run(); + } + + /** Emit 1, complete; Downstream gets 1, complete, no drops. */ + @Test + public void onDroppedBasicNoEmitLastNoLastToDrop() { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + TestObserver drops = new TestObserver<>(); + drops.onSubscribe(Disposable.empty()); + + TestObserver to = ps.throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .test(); + + to.assertEmpty(); + drops.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + drops.assertEmpty(); + + ps.onComplete(); + + to.assertResult(1); + drops.assertEmpty(); + } + + /** Emit 1, error; Downstream gets 1, error, no drops. */ + @Test + public void onDroppedErrorNoEmitLastNoLastToDrop() { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + TestObserver drops = new TestObserver<>(); + drops.onSubscribe(Disposable.empty()); + + TestObserver to = ps.throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .test(); + + to.assertEmpty(); + drops.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + drops.assertEmpty(); + + ps.onError(new TestException()); + + to.assertFailure(TestException.class, 1); + drops.assertEmpty(); + } + + /** + * Emit 1, 2, complete; complete should crash drop, downstream gets 1, drop-crash 2. + */ + @Test + public void onDroppedHasLastNoEmitLastDropCrash() throws Throwable { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestObserverEx to = ps + .doOnDispose(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, d -> { throw new TestException("forced " + d); }) + .subscribeWith(new TestObserverEx<>()); + + to.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + + ps.onNext(2); + + to.assertValuesOnly(1); + + ps.onComplete(); + + to.assertFailureAndMessage(TestException.class, "forced 2", 1); + + verify(whenDisposed, never()).run(); + } + + /** + * Emit 1, 2 then dispose the sequence; downstream gets 1, drop should get for 2. + */ + @Test + public void onDroppedDisposeDrops() throws Throwable { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestObserver drops = new TestObserver<>(); + drops.onSubscribe(Disposable.empty()); + + TestObserverEx to = ps + .doOnDispose(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .subscribeWith(new TestObserverEx<>()); + + to.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + + ps.onNext(2); + + to.assertValuesOnly(1); + + to.dispose(); + + to.assertValuesOnly(1); + drops.assertValuesOnly(2); + + verify(whenDisposed).run(); + } + + /** + * Emit 1 then dispose the sequence; downstream gets 1, drop should not get called. + */ + @Test + public void onDroppedDisposeNoDrops() throws Throwable { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestObserver drops = new TestObserver<>(); + drops.onSubscribe(Disposable.empty()); + + TestObserverEx to = ps + .doOnDispose(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, drops::onNext) + .subscribeWith(new TestObserverEx<>()); + + to.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + + to.dispose(); + + to.assertValuesOnly(1); + drops.assertEmpty(); + + verify(whenDisposed).run(); + } + + /** + * Emit 1, 2 then dispose the sequence; downstream gets 1, global error handler should get drop-crash 2. + */ + @Test + public void onDroppedDisposeCrashesDrop() throws Throwable { + TestHelper.withErrorTracking(errors -> { + PublishSubject ps = PublishSubject.create(); + + TestScheduler sch = new TestScheduler(); + + Action whenDisposed = mock(Action.class); + + TestObserverEx to = ps + .doOnDispose(whenDisposed) + .throttleLatest(1, TimeUnit.SECONDS, sch, false, d -> { throw new TestException("forced " + d); }) + .subscribeWith(new TestObserverEx<>()); + + to.assertEmpty(); + + ps.onNext(1); + + to.assertValuesOnly(1); + + ps.onNext(2); + + to.assertValuesOnly(1); + + to.dispose(); + + to.assertValuesOnly(1); + + verify(whenDisposed).run(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "forced 2"); + }); + } } diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java index 362b1d165a..d38ad8f998 100644 --- a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java +++ b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java @@ -2440,23 +2440,16 @@ public static void assertCompositeExceptions(TestSubscriberEx ts, Class ts, Object... classes) { + public static void assertCompositeExceptions(TestSubscriberEx ts, Object... classesAndMessages) { ts .assertSubscribed() .assertError(CompositeException.class) .assertNotComplete(); - List list = compositeList(ts.errors().get(0)); - - assertEquals(classes.length, list.size()); - - for (int i = 0; i < classes.length; i += 2) { - assertError(list, i, (Class)classes[i], (String)classes[i + 1]); - } + assertCompositeExceptionListOf(ts.errors().get(0), classesAndMessages); } /** @@ -2485,22 +2478,26 @@ public static void assertCompositeExceptions(TestObserverEx to, Class to, Object... classes) { + public static void assertCompositeExceptions(TestObserverEx to, Object... classesAndMessages) { to .assertSubscribed() .assertError(CompositeException.class) .assertNotComplete(); - List list = compositeList(to.errors().get(0)); + assertCompositeExceptionListOf(to.errors().get(0), classesAndMessages); + } - assertEquals(classes.length, list.size()); + @SuppressWarnings("unchecked") + static void assertCompositeExceptionListOf(Throwable ex, Object... classesAndMessages) { + List list = compositeList(ex); + + assertEquals(classesAndMessages.length, 2 * list.size()); - for (int i = 0; i < classes.length; i += 2) { - assertError(list, i, (Class)classes[i], (String)classes[i + 1]); + for (int i = 0; i < list.size(); i++) { + assertError(list, i, (Class)classesAndMessages[2 * i], (String)classesAndMessages[2 * i + 1]); } } diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java index 8b5fb4380d..a0e75f34ff 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java @@ -230,6 +230,7 @@ public void checkParallelFlowable() { addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLatest", Long.TYPE, TimeUnit.class, Scheduler.class)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLatest", Long.TYPE, TimeUnit.class, Boolean.TYPE)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLatest", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLatest", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Consumer.class)); // negative buffer time is considered as zero buffer time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class)); @@ -474,6 +475,7 @@ public void checkParallelFlowable() { addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLatest", Long.TYPE, TimeUnit.class, Scheduler.class)); addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLatest", Long.TYPE, TimeUnit.class, Boolean.TYPE)); addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLatest", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLatest", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Consumer.class)); // negative buffer time is considered as zero buffer time addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class));