Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/java/io/reactivex/rxjava4/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.reactivex.rxjava4.internal.operators.maybe.*;
import io.reactivex.rxjava4.internal.operators.mixed.*;
import io.reactivex.rxjava4.internal.operators.single.SingleDelayWithCompletable;
import io.reactivex.rxjava4.internal.operators.streamable.StreamableFromCompletable;
import io.reactivex.rxjava4.observers.TestObserver;
import io.reactivex.rxjava4.plugins.RxJavaPlugins;
import io.reactivex.rxjava4.schedulers.Schedulers;
Expand Down Expand Up @@ -3221,6 +3222,26 @@ public final Future<Void> toFuture() {
return RxJavaPlugins.onAssembly(new CompletableToSingle<>(this, null, completionValue));
}

/**
* Returns an {@link Streamable} which when subscribed to subscribes to this {@code Completable} and
* relays the terminal events to the downstream {@link Streamer}.
* <p>
* <img width="640" height="293" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.toStreamable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toStreamable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @return the new {@code Streamable} instance
* @since 4.0.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
@NonNull
public final <@NonNull T> Streamable<T> toStreamable() {
return RxJavaPlugins.onAssembly(new StreamableFromCompletable<>(this));
}

/**
* Returns a {@code Completable} which makes sure when an observer disposes the subscription, the
* {@code dispose()} method is called on the specified {@link Scheduler}.
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/rxjava4/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.reactivex.rxjava4.internal.operators.maybe.*;
import io.reactivex.rxjava4.internal.operators.mixed.*;
import io.reactivex.rxjava4.internal.operators.observable.ObservableElementAtMaybe;
import io.reactivex.rxjava4.internal.operators.streamable.StreamableFromMaybe;
import io.reactivex.rxjava4.observers.TestObserver;
import io.reactivex.rxjava4.plugins.RxJavaPlugins;
import io.reactivex.rxjava4.schedulers.*;
Expand Down Expand Up @@ -3889,6 +3890,25 @@ public final Single<T> toSingle() {
return RxJavaPlugins.onAssembly(new MaybeToSingle<>(this, null));
}

/**
* Returns an {@link Streamable} which when subscribed to subscribes to this {@code Maybe} and
* relays the terminal events to the downstream {@link Streamer}.
* <p>
* <img width="640" height="346" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.toStreamable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toStreamable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new {@code Streamable} instance
* @since 4.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Streamable<T> toStreamable() {
return RxJavaPlugins.onAssembly(new StreamableFromMaybe<>(this));
}

/**
* Returns a {@code Maybe} instance that if this {@code Maybe} emits an error, it will emit an {@code onComplete}
* and swallow the throwable.
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/rxjava4/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.reactivex.rxjava4.internal.operators.mixed.*;
import io.reactivex.rxjava4.internal.operators.observable.ObservableSingleSingle;
import io.reactivex.rxjava4.internal.operators.single.*;
import io.reactivex.rxjava4.internal.operators.streamable.StreamableFromSingle;
import io.reactivex.rxjava4.observers.TestObserver;
import io.reactivex.rxjava4.plugins.RxJavaPlugins;
import io.reactivex.rxjava4.schedulers.*;
Expand Down Expand Up @@ -4887,6 +4888,25 @@ public final Observable<T> toObservable() {
return RxJavaPlugins.onAssembly(new SingleToObservable<>(this));
}

/**
* Returns an {@link Streamable} which when subscribed to subscribes to this {@code Single} and
* relays the terminal events to the downstream {@link Streamer}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toStreamable.v3.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toStreamable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new {@code Streamable} instance
* @since 4.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Streamable<T> toStreamable() {
return RxJavaPlugins.onAssembly(new StreamableFromSingle<>(this));
}

/**
* Returns a {@code Single} which makes sure when a {@link SingleObserver} disposes the {@link Disposable},
* that call is propagated up on the specified {@link Scheduler}.
Expand Down
Loading
Loading