Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: [Java 8] Add ParallelFlowable operators + cleanup #6798

Merged
merged 1 commit into from
Dec 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18944,9 +18944,9 @@ public final TestSubscriber<T> test(long initialRequest, boolean cancel) { // No
* @param <T> the element type of the optional value
* @param optional the optional value to convert into a {@code Flowable}
* @return the new Flowable instance
* @since 3.0.0
* @see #just(Object)
* @see #empty()
* @since 3.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
Expand Down Expand Up @@ -19409,6 +19409,7 @@ public final Stream<T> blockingStream(int prefetch) {
* @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements
* will be emitted to the downstream
* @return the new Flowable instance
* @since 3.0.0
* @see #concatMap(Function)
* @see #concatMapIterable(Function)
* @see #concatMapStream(Function, int)
Expand Down Expand Up @@ -19461,6 +19462,7 @@ public final Stream<T> blockingStream(int prefetch) {
* will be emitted to the downstream
* @param prefetch the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received
* @return the new Flowable instance
* @since 3.0.0
* @see #concatMap(Function, int)
* @see #concatMapIterable(Function, int)
* @see #flatMapStream(Function, int)
Expand Down Expand Up @@ -19515,6 +19517,7 @@ public final Stream<T> blockingStream(int prefetch) {
* @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements
* will be emitted to the downstream
* @return the new Flowable instance
* @since 3.0.0
* @see #flatMap(Function)
* @see #flatMapIterable(Function)
* @see #flatMapStream(Function, int)
Expand Down Expand Up @@ -19567,6 +19570,7 @@ public final Stream<T> blockingStream(int prefetch) {
* will be emitted to the downstream
* @param prefetch the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received
* @return the new Flowable instance
* @since 3.0.0
* @see #flatMap(Function, int)
* @see #flatMapIterable(Function, int)
* @see #concatMapStream(Function, int)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -4870,9 +4870,9 @@ public final TestObserver<T> test(boolean dispose) {
* @param <T> the element type of the optional value
* @param optional the optional value to convert into a {@code Maybe}
* @return the new Maybe instance
* @since 3.0.0
* @see #just(Object)
* @see #empty()
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15937,9 +15937,9 @@ public final TestObserver<T> test(boolean dispose) { // NoPMD
* @param <T> the element type of the optional value
* @param optional the optional value to convert into an {@code Observable}
* @return the new Observable instance
* @since 3.0.0
* @see #just(Object)
* @see #empty()
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down Expand Up @@ -16355,6 +16355,7 @@ public final Stream<T> blockingStream(int capacityHint) {
* @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements
* will be emitted to the downstream
* @return the new Observable instance
* @since 3.0.0
* @see #concatMap(Function)
* @see #concatMapIterable(Function)
* @see #flatMapStream(Function)
Expand Down Expand Up @@ -16401,6 +16402,7 @@ public final Stream<T> blockingStream(int capacityHint) {
* @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements
* will be emitted to the downstream
* @return the new Observable instance
* @since 3.0.0
* @see #flatMap(Function)
* @see #flatMapIterable(Function)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,23 @@ protected void subscribeActual(Subscriber<? super R> s) {
EmptySubscription.complete(s);
}
} else {
source.subscribe(new FlatMapStreamSubscriber<>(s, mapper, prefetch));
source.subscribe(subscribe(s, mapper, prefetch));
}
}

/**
* Create a {@link Subscriber} with the given parameters.
* @param <T> the upstream value type
* @param <R> the {@link Stream} and output value type
* @param downstream the downstream {@code Subscriber} to wrap
* @param mapper the mapper function
* @param prefetch the number of items to prefetch
* @return the new {@code Subscriber}
*/
public static <T, R> Subscriber<T> subscribe(Subscriber<? super R> downstream, Function<? super T, ? extends Stream<? extends R>> mapper, int prefetch) {
return new FlatMapStreamSubscriber<>(downstream, mapper, prefetch);
}

static final class FlatMapStreamSubscriber<T, R> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.jdk8;

import java.util.Objects;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.stream.Collector;

import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Reduces all 'rails' into a single via a Java 8 {@link Collector} callback set.
*
* @param <T> the value type
* @param <A> the accumulator type
* @param <R> the result type
* @since 3.0.0
*/
public final class ParallelCollector<T, A, R> extends Flowable<R> {

final ParallelFlowable<? extends T> source;

final Collector<T, A, R> collector;

public ParallelCollector(ParallelFlowable<? extends T> source, Collector<T, A, R> collector) {
this.source = source;
this.collector = collector;
}

@Override
protected void subscribeActual(Subscriber<? super R> s) {
ParallelCollectorSubscriber<T, A, R> parent;
try {
parent = new ParallelCollectorSubscriber<>(s, source.parallelism(), collector);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
s.onSubscribe(parent);

source.subscribe(parent.subscribers);
}

static final class ParallelCollectorSubscriber<T, A, R> extends DeferredScalarSubscription<R> {

private static final long serialVersionUID = -5370107872170712765L;

final ParallelCollectorInnerSubscriber<T, A, R>[] subscribers;

final AtomicReference<SlotPair<A>> current = new AtomicReference<>();

final AtomicInteger remaining = new AtomicInteger();

final AtomicThrowable error = new AtomicThrowable();

final Function<A, R> finisher;

ParallelCollectorSubscriber(Subscriber<? super R> subscriber, int n, Collector<T, A, R> collector) {
super(subscriber);
this.finisher = collector.finisher();
@SuppressWarnings("unchecked")
ParallelCollectorInnerSubscriber<T, A, R>[] a = new ParallelCollectorInnerSubscriber[n];
for (int i = 0; i < n; i++) {
a[i] = new ParallelCollectorInnerSubscriber<>(this, collector.supplier().get(), collector.accumulator(), collector.combiner());
}
this.subscribers = a;
remaining.lazySet(n);
}

SlotPair<A> addValue(A value) {
for (;;) {
SlotPair<A> curr = current.get();

if (curr == null) {
curr = new SlotPair<>();
if (!current.compareAndSet(null, curr)) {
continue;
}
}

int c = curr.tryAcquireSlot();
if (c < 0) {
current.compareAndSet(curr, null);
continue;
}
if (c == 0) {
curr.first = value;
} else {
curr.second = value;
}

if (curr.releaseSlot()) {
current.compareAndSet(curr, null);
return curr;
}
return null;
}
}

@Override
public void cancel() {
for (ParallelCollectorInnerSubscriber<T, A, R> inner : subscribers) {
inner.cancel();
}
}

void innerError(Throwable ex) {
if (error.compareAndSet(null, ex)) {
cancel();
downstream.onError(ex);
} else {
if (ex != error.get()) {
RxJavaPlugins.onError(ex);
}
}
}

void innerComplete(A value, BinaryOperator<A> combiner) {
for (;;) {
SlotPair<A> sp = addValue(value);

if (sp != null) {

try {
value = combiner.apply(sp.first, sp.second);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
innerError(ex);
return;
}

} else {
break;
}
}

if (remaining.decrementAndGet() == 0) {
SlotPair<A> sp = current.get();
current.lazySet(null);

R result;
try {
result = Objects.requireNonNull(finisher.apply(sp.first), "The finisher returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
innerError(ex);
return;
}

complete(result);
}
}
}

static final class ParallelCollectorInnerSubscriber<T, A, R>
extends AtomicReference<Subscription>
implements FlowableSubscriber<T> {

private static final long serialVersionUID = -7954444275102466525L;

final ParallelCollectorSubscriber<T, A, R> parent;

final BiConsumer<A, T> accumulator;

final BinaryOperator<A> combiner;

A container;

boolean done;

ParallelCollectorInnerSubscriber(ParallelCollectorSubscriber<T, A, R> parent, A container, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner) {
this.parent = parent;
this.accumulator = accumulator;
this.combiner = combiner;
this.container = container;
}

@Override
public void onSubscribe(Subscription s) {
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
if (!done) {
try {
accumulator.accept(container, t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
get().cancel();
onError(ex);
return;
}
}
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
container = null;
done = true;
parent.innerError(t);
}

@Override
public void onComplete() {
if (!done) {
A v = container;
container = null;
done = true;
parent.innerComplete(v, combiner);
}
}

void cancel() {
SubscriptionHelper.cancel(this);
}
}

static final class SlotPair<T> extends AtomicInteger {

private static final long serialVersionUID = 473971317683868662L;

T first;

T second;

final AtomicInteger releaseIndex = new AtomicInteger();

int tryAcquireSlot() {
for (;;) {
int acquired = get();
if (acquired >= 2) {
return -1;
}

if (compareAndSet(acquired, acquired + 1)) {
return acquired;
}
}
}

boolean releaseSlot() {
return releaseIndex.incrementAndGet() == 2;
}
}
}
Loading