From 30f855b021a8feb673a34cda2c5022c8f74e9541 Mon Sep 17 00:00:00 2001 From: dcapwell Date: Thu, 7 Feb 2013 22:32:10 -0800 Subject: [PATCH] Added first draft of forEach operator --- rxjava-core/src/main/java/rx/Observable.java | 62 ++++++ .../java/rx/operators/OperationForEach.java | 203 ++++++++++++++++++ 2 files changed, 265 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationForEach.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 83f49dc512..e049ec0ac8 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -34,6 +34,7 @@ import rx.operators.OperationConcat; import rx.operators.OperationFilter; +import rx.operators.OperationForEach; import rx.operators.OperationLast; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -1692,6 +1693,37 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) { }); } + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + */ + public static void forEach(final Observable sequence, final Action1 onNext) { + OperationForEach.forEach(sequence, onNext); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted) { + OperationForEach.forEach(sequence, onNext, onCompleted); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + * @param onError + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted, + final Action1 onError) { + OperationForEach.forEach(sequence, onNext, onCompleted, onError); + } + /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

@@ -2213,6 +2245,36 @@ public Observable take(final int num) { return take(this, num); } + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + */ + public void forEach(final Action1 onNext) { + forEach(this, onNext); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + */ + public void forEach(final Action1 onNext, final Action0 onCompleted) { + forEach(this, onNext, onCompleted); + } + + /** + * Invokes an action for each element in the sequence. + * + * @param onNext + * @param onCompleted + * @param onError + */ + public void forEach(final Action1 onNext, final Action0 onCompleted, final Action1 onError) { + forEach(this, onNext, onCompleted, onError); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationForEach.java b/rxjava-core/src/main/java/rx/operators/OperationForEach.java new file mode 100644 index 0000000000..2a3c63eb53 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationForEach.java @@ -0,0 +1,203 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public final class OperationForEach { + + /** + * Accepts a sequence and a action. Applies the action to each element in + * the sequence. + * + * @param sequence + * the input sequence. + * @param onNext + * a action to apply to each item in the sequence. + */ + public static void forEach(final Observable sequence, final Action1 onNext) { + forEach(sequence, onNext, null, null); + } + + /** + * Accepts a sequence and a action. Applies the action to each element in + * the sequence. + * + * @param sequence + * the input sequence. + * @param onNext + * a action to apply to each item in the sequence. + * @param onCompleted + * a action to run when sequence completes. + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted) { + forEach(sequence, onNext, onCompleted, null); + } + + /** + * Accepts a sequence and a action. Applies the action to each element in + * the sequence. + * + * @param sequence + * the input sequence. + * @param onNext + * a action to apply to each item in the sequence. + * @param onCompleted + * a action to run when sequence completes. + * @param onError + * a action to run when an exception is thrown. + */ + public static void forEach(final Observable sequence, final Action1 onNext, final Action0 onCompleted, + final Action1 onError) { + ForEachObserver fe = new ForEachObserver(onNext, onCompleted, onError); + sequence.subscribe(fe); + } + + private static final class ForEachObserver implements Observer { + private final Action1 onNext; + private final Action0 onCompleted; + private final Action1 onError; + + private boolean running = true; + + private ForEachObserver(final Action1 onNext, final Action0 onCompleted, final Action1 onError) { + if (onNext == null) + throw new NullPointerException(); + this.onNext = onNext; + this.onCompleted = onCompleted; + this.onError = onError; + } + + @Override + public void onCompleted() { + running = false; + if (onCompleted != null) { + onCompleted.call(); + } + } + + @Override + public void onError(final Exception e) { + running = false; + if (onError != null) { + onError.call(e); + } + } + + @Override + public void onNext(final T args) { + if (running) { + try { + onNext.call(args); + } catch (Exception e) { + onError(e); + } + } + } + } + + public static class UnitTest { + + @Test + public void testForEach() { + Map m1 = getMap("One"); + Map m2 = getMap("Two"); + + Observable> observable = Observable.toObservable(m1, m2); + + final AtomicInteger counter = new AtomicInteger(); + forEach(observable, new Action1>() { + @Override + public void call(final Map stringStringMap) { + switch (counter.getAndIncrement()) { + case 0: + assertEquals("firstName doesn't match", "OneFirst", stringStringMap.get("firstName")); + assertEquals("lastName doesn't match", "OneLast", stringStringMap.get("lastName")); + break; + case 1: + assertEquals("firstName doesn't match", "TwoFirst", stringStringMap.get("firstName")); + assertEquals("lastName doesn't match", "TwoLast", stringStringMap.get("lastName")); + break; + default: + fail("Unknown increment"); + } + } + }); + assertEquals("Number of executions didn't match expected.", 2, counter.get()); + } + + @Test + public void testForEachEmptyObserver() { + Observable> observable = Observable.empty(); + + final AtomicInteger counter = new AtomicInteger(); + forEach(observable, new Action1>() { + @Override + public void call(final Map stringStringMap) { + counter.incrementAndGet(); + fail("Should not have called action"); + } + }); + assertEquals("Number of executions didn't match expected.", 0, counter.get()); + } + + @Test + public void testForEachWithException() { + Observable observable = Observable.toObservable(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + + final AtomicInteger counter = new AtomicInteger(); + final AtomicReference exception = new AtomicReference(); + forEach(observable, new Action1() { + @Override + public void call(final Integer integer) { + counter.incrementAndGet(); + if (integer.equals(5)) { + // fail half way through + throw new RuntimeException("testForEachWithException"); + } + } + }, null, new Action1() { + @Override + public void call(final Exception e) { + exception.set(e); + } + }); + assertEquals("Number of executions didn't match expected.", 5, counter.get()); + assertNotNull(exception.get()); + } + + private Map getMap(String prefix) { + Map m = new HashMap(); + m.put("firstName", prefix + "First"); + m.put("lastName", prefix + "Last"); + return m; + } + + } +}