diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index e4460639bb..c5d14e81b2 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -5736,7 +5736,7 @@ public final Observable concatMapEagerDelayError(Function Observable concatMapIterable(final Function> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); - return concatMap(ObservableInternalHelper.flatMapIntoIterable(mapper)); + return RxJavaPlugins.onAssembly(new ObservableFlattenIterable(this, mapper)); } /** @@ -7188,7 +7188,7 @@ public final Observable flatMap(Function Observable flatMapIterable(final Function> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); - return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper)); + return RxJavaPlugins.onAssembly(new ObservableFlattenIterable(this, mapper)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlattenIterable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlattenIterable.java new file mode 100644 index 0000000000..3411bc8f22 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlattenIterable.java @@ -0,0 +1,146 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import java.util.Iterator; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps a sequence into an Iterable and emits its values. + * + * @param the input value type to map to Iterable + * @param the element type of the Iterable and the output + */ +public final class ObservableFlattenIterable extends AbstractObservableWithUpstream { + + final Function> mapper; + + public ObservableFlattenIterable(ObservableSource source, + Function> mapper) { + super(source); + this.mapper = mapper; + } + + @Override + protected void subscribeActual(Observer observer) { + source.subscribe(new FlattenIterableObserver(observer, mapper)); + } + + static final class FlattenIterableObserver implements Observer, Disposable { + final Observer actual; + + final Function> mapper; + + Disposable d; + + FlattenIterableObserver(Observer actual, Function> mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T value) { + if (d == DisposableHelper.DISPOSED) { + return; + } + + Iterator it; + + try { + it = mapper.apply(value).iterator(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.dispose(); + onError(ex); + return; + } + + Observer a = actual; + + for (;;) { + boolean b; + + try { + b = it.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.dispose(); + onError(ex); + return; + } + + if (b) { + R v; + + try { + v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.dispose(); + onError(ex); + return; + } + + a.onNext(v); + } else { + break; + } + } + } + + @Override + public void onError(Throwable e) { + if (d == DisposableHelper.DISPOSED) { + RxJavaPlugins.onError(e); + return; + } + actual.onError(e); + } + + @Override + public void onComplete() { + if (d == DisposableHelper.DISPOSED) { + return; + } + actual.onComplete(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + @Override + public void dispose() { + d.dispose(); + d = DisposableHelper.DISPOSED; + } + } +} diff --git a/src/perf/java/io/reactivex/FlatMapJustPerf.java b/src/perf/java/io/reactivex/FlatMapJustPerf.java new file mode 100644 index 0000000000..9a67723212 --- /dev/null +++ b/src/perf/java/io/reactivex/FlatMapJustPerf.java @@ -0,0 +1,66 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlatMapJustPerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int times; + + Flowable flowable; + + Observable observable; + + @Setup + public void setup() { + Integer[] array = new Integer[times]; + + flowable = Flowable.fromArray(array).flatMap(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Flowable.just(v); + } + }); + + observable = Observable.fromArray(array).flatMap(new Function>() { + @Override + public Observable apply(Integer v) throws Exception { + return Observable.just(v); + } + }); + } + + @Benchmark + public void flowable(Blackhole bh) { + flowable.subscribe(new PerfConsumer(bh)); + } + + @Benchmark + public void observable(Blackhole bh) { + observable.subscribe(new PerfConsumer(bh)); + } +} \ No newline at end of file diff --git a/src/perf/java/io/reactivex/FlattenCrossMapPerf.java b/src/perf/java/io/reactivex/FlattenCrossMapPerf.java new file mode 100644 index 0000000000..78ab0b6449 --- /dev/null +++ b/src/perf/java/io/reactivex/FlattenCrossMapPerf.java @@ -0,0 +1,72 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlattenCrossMapPerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int times; + + Flowable flowable; + + Observable observable; + + @Setup + public void setup() { + Integer[] array = new Integer[times]; + Arrays.fill(array, 777); + + Integer[] arrayInner = new Integer[1000000 / times]; + Arrays.fill(arrayInner, 888); + + final Iterable list = Arrays.asList(arrayInner); + + flowable = Flowable.fromArray(array).flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return list; + } + }); + + observable = Observable.fromArray(array).flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return list; + } + }); + } + + @Benchmark + public void flowable(Blackhole bh) { + flowable.subscribe(new PerfConsumer(bh)); + } + + @Benchmark + public void observable(Blackhole bh) { + observable.subscribe(new PerfConsumer(bh)); + } +} \ No newline at end of file diff --git a/src/perf/java/io/reactivex/FlattenJustPerf.java b/src/perf/java/io/reactivex/FlattenJustPerf.java new file mode 100644 index 0000000000..0246c76cd7 --- /dev/null +++ b/src/perf/java/io/reactivex/FlattenJustPerf.java @@ -0,0 +1,69 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlattenJustPerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int times; + + Flowable flowable; + + Observable observable; + + @Setup + public void setup() { + Integer[] array = new Integer[times]; + Arrays.fill(array, 777); + + final Iterable singletonList = Collections.singletonList(1); + + flowable = Flowable.fromArray(array).flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return singletonList; + } + }); + + observable = Observable.fromArray(array).flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return singletonList; + } + }); + } + + @Benchmark + public void flowable(Blackhole bh) { + flowable.subscribe(new PerfConsumer(bh)); + } + + @Benchmark + public void observable(Blackhole bh) { + observable.subscribe(new PerfConsumer(bh)); + } +} \ No newline at end of file diff --git a/src/perf/java/io/reactivex/PerfConsumer.java b/src/perf/java/io/reactivex/PerfConsumer.java new file mode 100644 index 0000000000..9feabdfcaf --- /dev/null +++ b/src/perf/java/io/reactivex/PerfConsumer.java @@ -0,0 +1,61 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.*; + +import io.reactivex.disposables.Disposable; + +/** + * A multi-type synchronous consumer. + */ +public final class PerfConsumer implements Subscriber, Observer, +SingleObserver, CompletableObserver, MaybeObserver { + + final Blackhole bh; + + public PerfConsumer(Blackhole bh) { + this.bh = bh; + } + + @Override + public void onSuccess(Object value) { + bh.consume(value); + } + + @Override + public void onSubscribe(Disposable d) { + } + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Object t) { + bh.consume(t); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onComplete() { + bh.consume(true); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterable.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterableTest.java similarity index 96% rename from src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterable.java rename to src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterableTest.java index 3c28d6eef5..f0fcdb2933 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterable.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlattenIterableTest.java @@ -20,7 +20,7 @@ import io.reactivex.Observable; import io.reactivex.functions.Function; -public class ObservableFlattenIterable { +public class ObservableFlattenIterableTest { @Test public void flatMapIterablePrefetch() { diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeTest.java index 8746222951..2a3d016778 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeTest.java @@ -262,7 +262,7 @@ public void onNext(String v) { // to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following // onNext is invoked. - int timeout = 10; + int timeout = 20; while (timeout-- > 0 && concurrentCounter.get() != 1) { Thread.sleep(100);