Skip to content

Commit

Permalink
Add onDropped callback for throttleWithTimeout - #7458 (#7510)
Browse files Browse the repository at this point in the history
  • Loading branch information
Desislav-Petrov authored Jan 9, 2023
1 parent 65d0739 commit bf8da15
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 38 deletions.
93 changes: 92 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8930,7 +8930,57 @@ public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit) {
public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler));
return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, null));
}

/**
* Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
* current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
* {@link Scheduler}. The timer resets on each emission.
* <p>
* <em>Note:</em> If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
* will be emitted by the resulting {@code Flowable}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.s.v3.png" alt="">
* <p>
* Delivery of the item after the grace period happens on the given {@code Scheduler}'s
* {@code Worker} which if takes too long, a newer item may arrive from the upstream, causing the
* {@code Worker}'s task to get disposed, which may also interrupt any downstream blocking operation
* (yielding an {@code InterruptedException}). It is recommended processing items
* that may take long time to be moved to another thread via {@link #observeOn} applied after
* {@code debounce} itself.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param timeout
* the time each item has to be "the most recent" of those emitted by the current {@code Flowable} to
* ensure that it's not dropped
* @param unit
* the unit of time for the specified {@code timeout}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
* item
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, onDropped));
}

/**
Expand Down Expand Up @@ -17587,6 +17637,47 @@ public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit uni
return debounce(timeout, unit, scheduler);
}

/**
* Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
* current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
* {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler)}).
* <p>
* <em>Note:</em> If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
* will be emitted by the resulting {@code Flowable}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.s.v3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param timeout
* the length of the window of time that must pass after the emission of an item from the current
* {@code Flowable} in which it emits no items in order for the item to be emitted by the
* resulting {@code Flowable}
* @param unit
* the unit of time for the specified {@code timeout}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
* item
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @see #debounce(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
return debounce(timeout, unit, scheduler, onDropped);
}

/**
* Returns a {@code Flowable} that emits records of the time interval between consecutive items emitted by the
* current {@code Flowable}.
Expand Down
85 changes: 84 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7896,7 +7896,53 @@ public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit) {
public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler));
return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler, null));
}

/**
* Returns an {@code Observable} that mirrors the current {@code Observable}, except that it drops items emitted by the
* current {@code Observable} that are followed by newer items before a timeout value expires on a specified
* {@link Scheduler}. The timer resets on each emission.
* <p>
* <em>Note:</em> If items keep being emitted by the current {@code Observable} faster than the timeout then no items
* will be emitted by the resulting {@code Observable}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.s.v3.png" alt="">
* <p>
* Delivery of the item after the grace period happens on the given {@code Scheduler}'s
* {@code Worker} which if takes too long, a newer item may arrive from the upstream, causing the
* {@code Worker}'s task to get disposed, which may also interrupt any downstream blocking operation
* (yielding an {@code InterruptedException}). It is recommended processing items
* that may take long time to be moved to another thread via {@link #observeOn} applied after
* {@code debounce} itself.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param timeout
* the time each item has to be "the most recent" of those emitted by the current {@code Observable} to
* ensure that it's not dropped
* @param unit
* the unit of time for the specified {@code timeout}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
* item
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} } or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler, onDropped));
}

/**
Expand Down Expand Up @@ -14597,6 +14643,43 @@ public final Observable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit u
return debounce(timeout, unit, scheduler);
}

/**
* Returns an {@code Observable} that mirrors the current {@code Observable}, except that it drops items emitted by the
* current {@code Observable} that are followed by newer items before a timeout value expires on a specified
* {@link Scheduler}. The timer resets on each emission (Alias to {@link #debounce(long, TimeUnit, Scheduler)}).
* <p>
* <em>Note:</em> If items keep being emitted by the current {@code Observable} faster than the timeout then no items
* will be emitted by the resulting {@code Observable}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.s.v3.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param timeout
* the length of the window of time that must pass after the emission of an item from the current
* {@code Observable}, in which the current {@code Observable} emits no items, in order for the item to be emitted by the
* resulting {@code Observable}
* @param unit
* the unit of time for the specified {@code timeout}
* @param scheduler
* the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
* item
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
* @see #debounce(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final Observable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
return debounce(timeout, unit, scheduler, onDropped);
}

/**
* Returns an {@code Observable} that emits records of the time interval between consecutive items emitted by the
* current {@code Observable}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;

import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
Expand All @@ -32,19 +34,20 @@ public final class FlowableDebounceTimed<T> extends AbstractFlowableWithUpstream
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<T> onDropped;

public FlowableDebounceTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
public FlowableDebounceTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
this.onDropped = onDropped;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new DebounceTimedSubscriber<>(
new SerializedSubscriber<>(s),
timeout, unit, scheduler.createWorker()));
new SerializedSubscriber<>(s), timeout, unit, scheduler.createWorker(), onDropped));
}

static final class DebounceTimedSubscriber<T> extends AtomicLong
Expand All @@ -55,20 +58,22 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
final Consumer<T> onDropped;

Subscription upstream;

Disposable timer;
DebounceEmitter<T> timer;

volatile long index;

boolean done;

DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<T> onDropped) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
this.onDropped = onDropped;
}

@Override
Expand All @@ -93,6 +98,18 @@ public void onNext(T t) {
d.dispose();
}

if (onDropped != null && timer != null) {
try {
onDropped.accept(timer.value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
done = true;
downstream.onError(ex);
worker.dispose();
}
}

DebounceEmitter<T> de = new DebounceEmitter<>(t, idx, this);
timer = de;
d = worker.schedule(de, timeout, unit);
Expand Down Expand Up @@ -121,15 +138,13 @@ public void onComplete() {
}
done = true;

Disposable d = timer;
DebounceEmitter<T> d = timer;
if (d != null) {
d.dispose();
}

@SuppressWarnings("unchecked")
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
if (de != null) {
de.emit();
if (d != null) {
d.emit();
}

downstream.onComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Expand All @@ -27,19 +29,20 @@ public final class ObservableDebounceTimed<T> extends AbstractObservableWithUpst
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final Consumer<T> onDropped;

public ObservableDebounceTimed(ObservableSource<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
public ObservableDebounceTimed(ObservableSource<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
this.onDropped = onDropped;
}

@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new DebounceTimedObserver<>(
new SerializedObserver<>(t),
timeout, unit, scheduler.createWorker()));
new SerializedObserver<>(t), timeout, unit, scheduler.createWorker(), onDropped));
}

static final class DebounceTimedObserver<T>
Expand All @@ -48,20 +51,22 @@ static final class DebounceTimedObserver<T>
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
final Consumer<T> onDropped;

Disposable upstream;

Disposable timer;
DebounceEmitter<T> timer;

volatile long index;

boolean done;

DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
DebounceTimedObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<T> onDropped) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
this.onDropped = onDropped;
}

@Override
Expand All @@ -85,6 +90,17 @@ public void onNext(T t) {
d.dispose();
}

if (onDropped != null && timer != null) {
try {
onDropped.accept(timer.value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.dispose();
downstream.onError(ex);
done = true;
}
}

DebounceEmitter<T> de = new DebounceEmitter<>(t, idx, this);
timer = de;
d = worker.schedule(de, timeout, unit);
Expand Down Expand Up @@ -113,15 +129,13 @@ public void onComplete() {
}
done = true;

Disposable d = timer;
DebounceEmitter<T> d = timer;
if (d != null) {
d.dispose();
}

@SuppressWarnings("unchecked")
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
if (de != null) {
de.run();
if (d != null) {
d.run();
}
downstream.onComplete();
worker.dispose();
Expand Down
Loading

0 comments on commit bf8da15

Please sign in to comment.