From c70aa21a101411b9a89f253f11cda0f10615b112 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 24 Aug 2015 19:17:24 +0200 Subject: [PATCH] MapNotification producer NPE fix --- .../operators/OperatorMapNotification.java | 96 +++++++++++-------- .../OperatorMapNotificationTest.java | 55 +++++++++++ 2 files changed, 109 insertions(+), 42 deletions(-) create mode 100644 src/test/java/rx/internal/operators/OperatorMapNotificationTest.java diff --git a/src/main/java/rx/internal/operators/OperatorMapNotification.java b/src/main/java/rx/internal/operators/OperatorMapNotification.java index be363663fb..bb92f2c077 100644 --- a/src/main/java/rx/internal/operators/OperatorMapNotification.java +++ b/src/main/java/rx/internal/operators/OperatorMapNotification.java @@ -19,16 +19,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; +import rx.*; import rx.Observable.Operator; -import rx.Producer; -import rx.Subscriber; -import rx.Subscription; -import rx.exceptions.MissingBackpressureException; -import rx.exceptions.OnErrorThrowable; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.internal.util.unsafe.SpscArrayQueue; -import rx.internal.util.unsafe.UnsafeAccess; +import rx.exceptions.*; +import rx.functions.*; +import rx.internal.producers.ProducerArbiter; +import rx.internal.util.unsafe.*; /** * Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of @@ -50,44 +46,60 @@ public OperatorMapNotification(Func1 onNext, Func1 call(final Subscriber o) { - Subscriber subscriber = new Subscriber() { - SingleEmitter emitter; - @Override - public void setProducer(Producer producer) { - emitter = new SingleEmitter(o, producer, this); - o.setProducer(emitter); - } - - @Override - public void onCompleted() { - try { - emitter.offerAndComplete(onCompleted.call()); - } catch (Throwable e) { - o.onError(e); - } - } + final ProducerArbiter pa = new ProducerArbiter(); + + MapNotificationSubscriber subscriber = new MapNotificationSubscriber(pa, o); + o.add(subscriber); + subscriber.init(); + return subscriber; + } + + final class MapNotificationSubscriber extends Subscriber { + private final Subscriber o; + private final ProducerArbiter pa; + final SingleEmitter emitter; + + private MapNotificationSubscriber(ProducerArbiter pa, Subscriber o) { + this.pa = pa; + this.o = o; + this.emitter = new SingleEmitter(o, pa, this); + } + + void init() { + o.setProducer(emitter); + } - @Override - public void onError(Throwable e) { - try { - emitter.offerAndComplete(onError.call(e)); - } catch (Throwable e2) { - o.onError(e); - } + @Override + public void setProducer(Producer producer) { + pa.setProducer(producer); + } + + @Override + public void onCompleted() { + try { + emitter.offerAndComplete(onCompleted.call()); + } catch (Throwable e) { + o.onError(e); } + } - @Override - public void onNext(T t) { - try { - emitter.offer(onNext.call(t)); - } catch (Throwable e) { - o.onError(OnErrorThrowable.addValueAsLastCause(e, t)); - } + @Override + public void onError(Throwable e) { + try { + emitter.offerAndComplete(onError.call(e)); + } catch (Throwable e2) { + o.onError(e); } + } - }; - o.add(subscriber); - return subscriber; + @Override + public void onNext(T t) { + try { + emitter.offer(onNext.call(t)); + } catch (Throwable e) { + o.onError(OnErrorThrowable.addValueAsLastCause(e, t)); + } + } } static final class SingleEmitter extends AtomicLong implements Producer, Subscription { /** */ diff --git a/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java b/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java new file mode 100644 index 0000000000..2f1e603337 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java @@ -0,0 +1,55 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import org.junit.Test; + +import rx.Observable; +import rx.functions.*; +import rx.observers.TestSubscriber; + +public class OperatorMapNotificationTest { + @Test + public void testJust() { + TestSubscriber ts = TestSubscriber.create(); + Observable.just(1) + .flatMap( + new Func1>() { + @Override + public Observable call(Integer item) { + return Observable.just((Object)(item + 1)); + } + }, + new Func1>() { + @Override + public Observable call(Throwable e) { + return Observable.error(e); + } + }, + new Func0>() { + @Override + public Observable call() { + return Observable.never(); + } + } + ).subscribe(ts); + + ts.assertNoErrors(); + ts.assertNotCompleted(); + ts.assertValue(2); + } +}