Skip to content

Commit

Permalink
3.x: [Java 8] Add Observable operators + cleanup (#6797)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Dec 26, 2019
1 parent 480889c commit af17c6e
Show file tree
Hide file tree
Showing 38 changed files with 4,854 additions and 218 deletions.
509 changes: 505 additions & 4 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static <T> void subscribeStream(Subscriber<? super T> s, Stream<T> stream
}

if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new StreamConditionalSubscription<T>((ConditionalSubscriber<? super T>)s, iterator, stream));
s.onSubscribe(new StreamConditionalSubscription<>((ConditionalSubscriber<? super T>)s, iterator, stream));
} else {
s.onSubscribe(new StreamSubscription<>(s, iterator, stream));
}
Expand Down Expand Up @@ -147,15 +147,23 @@ public T poll() {
once = true;
} else {
if (!iterator.hasNext()) {
clear();
return null;
}
}
return Objects.requireNonNull(iterator.next(), "Iterator.next() returned a null value");
return Objects.requireNonNull(iterator.next(), "The Stream's Iterator.next() returned a null value");
}

@Override
public boolean isEmpty() {
return iterator == null || !iterator.hasNext();
Iterator<T> it = iterator;
if (it != null) {
if (!once || it.hasNext()) {
return false;
}
clear();
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.jdk8;

import java.util.Objects;
import java.util.function.*;
import java.util.stream.Collector;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.internal.observers.DeferredScalarDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Collect items into a container defined by a Stream {@link Collector} callback set.
*
* @param <T> the upstream value type
* @param <A> the intermediate accumulator type
* @param <R> the result type
* @since 3.0.0
*/
public final class ObservableCollectWithCollector<T, A, R> extends Observable<R> {

final Observable<T> source;

final Collector<T, A, R> collector;

public ObservableCollectWithCollector(Observable<T> source, Collector<T, A, R> collector) {
this.source = source;
this.collector = collector;
}

@Override
protected void subscribeActual(@NonNull Observer<? super R> observer) {
A container;
BiConsumer<A, T> accumulator;
Function<A, R> finisher;

try {
container = collector.supplier().get();
accumulator = collector.accumulator();
finisher = collector.finisher();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}

source.subscribe(new CollectorObserver<>(observer, container, accumulator, finisher));
}

static final class CollectorObserver<T, A, R>
extends DeferredScalarDisposable<R>
implements Observer<T> {

private static final long serialVersionUID = -229544830565448758L;

final BiConsumer<A, T> accumulator;

final Function<A, R> finisher;

Disposable upstream;

boolean done;

A container;

CollectorObserver(Observer<? super R> downstream, A container, BiConsumer<A, T> accumulator, Function<A, R> finisher) {
super(downstream);
this.container = container;
this.accumulator = accumulator;
this.finisher = finisher;
}

@Override
public void onSubscribe(@NonNull Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;

downstream.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (done) {
return;
}
try {
accumulator.accept(container, t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.dispose();
onError(ex);
}
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
} else {
done = true;
upstream = DisposableHelper.DISPOSED;
this.container = null;
downstream.onError(t);
}
}

@Override
public void onComplete() {
if (done) {
return;
}

done = true;
upstream = DisposableHelper.DISPOSED;
A container = this.container;
this.container = null;
R result;
try {
result = Objects.requireNonNull(finisher.apply(container), "The finisher returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}

complete(result);
}

@Override
public void dispose() {
super.dispose();
upstream.dispose();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.jdk8;

import java.util.Objects;
import java.util.function.*;
import java.util.stream.Collector;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.internal.fuseable.FuseToObservable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Collect items into a container defined by a Stream {@link Collector} callback set.
*
* @param <T> the upstream value type
* @param <A> the intermediate accumulator type
* @param <R> the result type
* @since 3.0.0
*/
public final class ObservableCollectWithCollectorSingle<T, A, R> extends Single<R> implements FuseToObservable<R> {

final Observable<T> source;

final Collector<T, A, R> collector;

public ObservableCollectWithCollectorSingle(Observable<T> source, Collector<T, A, R> collector) {
this.source = source;
this.collector = collector;
}

@Override
public Observable<R> fuseToObservable() {
return new ObservableCollectWithCollector<>(source, collector);
}

@Override
protected void subscribeActual(@NonNull SingleObserver<? super R> observer) {
A container;
BiConsumer<A, T> accumulator;
Function<A, R> finisher;

try {
container = collector.supplier().get();
accumulator = collector.accumulator();
finisher = collector.finisher();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}

source.subscribe(new CollectorSingleObserver<>(observer, container, accumulator, finisher));
}

static final class CollectorSingleObserver<T, A, R> implements Observer<T>, Disposable {

final SingleObserver<? super R> downstream;

final BiConsumer<A, T> accumulator;

final Function<A, R> finisher;

Disposable upstream;

boolean done;

A container;

CollectorSingleObserver(SingleObserver<? super R> downstream, A container, BiConsumer<A, T> accumulator, Function<A, R> finisher) {
this.downstream = downstream;
this.container = container;
this.accumulator = accumulator;
this.finisher = finisher;
}

@Override
public void onSubscribe(@NonNull Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;

downstream.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (done) {
return;
}
try {
accumulator.accept(container, t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.dispose();
onError(ex);
}
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
} else {
done = true;
upstream = DisposableHelper.DISPOSED;
this.container = null;
downstream.onError(t);
}
}

@Override
public void onComplete() {
if (done) {
return;
}

done = true;
upstream = DisposableHelper.DISPOSED;
A container = this.container;
this.container = null;
R result;
try {
result = Objects.requireNonNull(finisher.apply(container), "The finisher returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}

downstream.onSuccess(result);
}

@Override
public void dispose() {
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
}

@Override
public boolean isDisposed() {
return upstream == DisposableHelper.DISPOSED;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.jdk8;

import java.util.NoSuchElementException;

/**
* Signals the first element of the source via the underlying CompletableFuture,
* signals the a default item if the upstream is empty or signals {@link NoSuchElementException}.
*
* @param <T> the element type
* @since 3.0.0
*/
public final class ObservableFirstStageObserver<T> extends ObservableStageObserver<T> {

final boolean hasDefault;

final T defaultItem;

public ObservableFirstStageObserver(boolean hasDefault, T defaultItem) {
this.hasDefault = hasDefault;
this.defaultItem = defaultItem;
}

@Override
public void onNext(T t) {
complete(t);
}

@Override
public void onComplete() {
if (!isDone()) {
clear();
if (hasDefault) {
complete(defaultItem);
} else {
completeExceptionally(new NoSuchElementException());
}
}
}
}

0 comments on commit af17c6e

Please sign in to comment.