Skip to content

Commit

Permalink
3.x: Add onDropped callback to throttleLatest operator (#7457)
Browse files Browse the repository at this point in the history
* 3.x: Add onDropped callback to throttleLatest operator

* Reorganize imports
  • Loading branch information
akarnokd committed Aug 9, 2022
1 parent 2edba23 commit 9d4631b
Show file tree
Hide file tree
Showing 8 changed files with 1,193 additions and 53 deletions.
56 changes: 55 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Expand Up @@ -17327,7 +17327,61 @@ public final Flowable<T> throttleLatest(long timeout, @NonNull TimeUnit unit, @N
public final Flowable<T> throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast));
return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, null));
}

/**
* Throttles items from the upstream {@code Flowable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them, invoking the consumer for any dropped item.
* <p>
* <img width="640" height="326" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.se.png" alt="">
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.
* If the downstream is not ready to receive items, a
* {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException MissingBackpressureException}
* will be signaled.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>
* If the upstream signals an {@code onError} or {@code onDropped} callback crashes,
* the error is delivered immediately to the downstream. If both happen, a {@link CompositeException}
* is created, containing both the upstream and the callback error.
* If the {@code onDropped} callback crashes during cancellation, the exception is forwarded
* to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* </dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @param scheduler the {@code Scheduler} where the timed wait and latest item
* emission will be performed
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
* immediately when the upstream completes, regardless if there is
* a timeout window active or not. If {@code false}, the very last
* upstream item is ignored and the flow terminates.
* @param onDropped called when an item is replaced by a newer item that doesn't get delivered
* to the downstream, including the very last item if {@code emitLast} is {@code false}
* and the current undelivered item when the sequence gets canceled.
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code unit}, {@code scheduler} or {@code onDropped} is {@code null}
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@Experimental
public final Flowable<T> throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, onDropped));
}

/**
Expand Down
50 changes: 49 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Observable.java
Expand Up @@ -14362,7 +14362,55 @@ public final Observable<T> throttleLatest(long timeout, @NonNull TimeUnit unit,
public final Observable<T> throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<>(this, timeout, unit, scheduler, emitLast));
return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, null));
}

/**
* Throttles items from the current {@code Observable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them, invoking the consumer for any dropped item.
* <p>
* <img width="640" height="326" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.se.png" alt="">
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>
* If the upstream signals an {@code onError} or {@code onDropped} callback crashes,
* the error is delivered immediately to the downstream. If both happen, a {@link CompositeException}
* is created, containing both the upstream and the callback error.
* If the {@code onDropped} callback crashes when the sequence gets disposed, the exception is forwarded
* to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* </dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @param scheduler the {@code Scheduler} where the timed wait and latest item
* emission will be performed
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
* immediately when the upstream completes, regardless if there is
* a timeout window active or not. If {@code false}, the very last
* upstream item is ignored and the flow terminates.
* @param onDropped called when an item is replaced by a newer item that doesn't get delivered
* to the downstream, including the very last item if {@code emitLast} is {@code false}
* and the current undelivered item when the sequence gets disposed.
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit}, {@code scheduler} or {@code onDropped} is {@code null}
* @since 3.1.6 - Experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
@Experimental
public final Observable<T> throttleLatest(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<>(this, timeout, unit, scheduler, emitLast, onDropped));
}

/**
Expand Down
Expand Up @@ -19,9 +19,11 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Emits the next or latest item when the given time elapses.
Expand All @@ -44,19 +46,24 @@ public final class FlowableThrottleLatest<T> extends AbstractFlowableWithUpstrea

final boolean emitLast;

final Consumer<? super T> onDropped;

public FlowableThrottleLatest(Flowable<T> source,
long timeout, TimeUnit unit, Scheduler scheduler,
boolean emitLast) {
long timeout, TimeUnit unit,
Scheduler scheduler,
boolean emitLast,
Consumer<? super T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
this.emitLast = emitLast;
this.onDropped = onDropped;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new ThrottleLatestSubscriber<>(s, timeout, unit, scheduler.createWorker(), emitLast));
source.subscribe(new ThrottleLatestSubscriber<>(s, timeout, unit, scheduler.createWorker(), emitLast, onDropped));
}

static final class ThrottleLatestSubscriber<T>
Expand All @@ -79,6 +86,8 @@ static final class ThrottleLatestSubscriber<T>

final AtomicLong requested;

final Consumer<? super T> onDropped;

Subscription upstream;

volatile boolean done;
Expand All @@ -93,15 +102,18 @@ static final class ThrottleLatestSubscriber<T>
boolean timerRunning;

ThrottleLatestSubscriber(Subscriber<? super T> downstream,
long timeout, TimeUnit unit, Scheduler.Worker worker,
boolean emitLast) {
long timeout, TimeUnit unit,
Scheduler.Worker worker,
boolean emitLast,
Consumer<? super T> onDropped) {
this.downstream = downstream;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
this.emitLast = emitLast;
this.latest = new AtomicReference<>();
this.requested = new AtomicLong();
this.onDropped = onDropped;
}

@Override
Expand All @@ -115,7 +127,17 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
latest.set(t);
T old = latest.getAndSet(t);
if (onDropped != null && old != null) {
try {
onDropped.accept(old);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
error = ex;
done = true;
}
}
drain();
}

Expand Down Expand Up @@ -145,6 +167,22 @@ public void cancel() {
upstream.cancel();
worker.dispose();
if (getAndIncrement() == 0) {
clear();
}
}

void clear() {
if (onDropped != null) {
T v = latest.getAndSet(null);
if (v != null) {
try {
onDropped.accept(v);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
} else {
latest.lazySet(null);
}
}
Expand All @@ -170,14 +208,27 @@ void drain() {

for (;;) {
if (cancelled) {
latest.lazySet(null);
clear();
return;
}

boolean d = done;
Throwable error = this.error;

if (d && error != null) {
latest.lazySet(null);
if (onDropped != null) {
T v = latest.getAndSet(null);
if (v != null) {
try {
onDropped.accept(v);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
error = new CompositeException(error, ex);
}
}
} else {
latest.lazySet(null);
}
downstream.onError(error);
worker.dispose();
return;
Expand All @@ -187,19 +238,31 @@ void drain() {
boolean empty = v == null;

if (d) {
if (!empty && emitLast) {
if (!empty) {
v = latest.getAndSet(null);
long e = emitted;
if (e != requested.get()) {
emitted = e + 1;
downstream.onNext(v);
downstream.onComplete();
if (emitLast) {
long e = emitted;
if (e != requested.get()) {
emitted = e + 1;
downstream.onNext(v);
downstream.onComplete();
} else {
tryDropAndSignalMBE(v);
}
} else {
downstream.onError(new MissingBackpressureException(
"Could not emit final value due to lack of requests"));
if (onDropped != null) {
try {
onDropped.accept(v);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
worker.dispose();
return;
}
}
downstream.onComplete();
}
} else {
latest.lazySet(null);
downstream.onComplete();
}
worker.dispose();
Expand All @@ -222,8 +285,7 @@ void drain() {
emitted = e + 1;
} else {
upstream.cancel();
downstream.onError(new MissingBackpressureException(
"Could not emit value due to lack of requests"));
tryDropAndSignalMBE(v);
worker.dispose();
return;
}
Expand All @@ -242,5 +304,19 @@ void drain() {
}
}
}

void tryDropAndSignalMBE(T valueToDrop) {
Throwable errorToSignal = new MissingBackpressureException(
"Could not emit value due to lack of requests");
if (onDropped != null) {
try {
onDropped.accept(valueToDrop);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
errorToSignal = new CompositeException(errorToSignal, ex);
}
}
downstream.onError(errorToSignal);
}
}
}

0 comments on commit 9d4631b

Please sign in to comment.