Skip to content

Commit

Permalink
Added first draft of forEach operator
Browse files Browse the repository at this point in the history
  • Loading branch information
dcapwell committed Feb 8, 2013
1 parent 1f48ae1 commit 30f855b
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 0 deletions.
62 changes: 62 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Expand Up @@ -34,6 +34,7 @@

import rx.operators.OperationConcat;
import rx.operators.OperationFilter;
import rx.operators.OperationForEach;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
Expand Down Expand Up @@ -1692,6 +1693,37 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
});
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
*/
public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext) {
OperationForEach.forEach(sequence, onNext);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
*/
public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext, final Action0 onCompleted) {
OperationForEach.forEach(sequence, onNext, onCompleted);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
* @param onError
*/
public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext, final Action0 onCompleted,
final Action1<Exception> onError) {
OperationForEach.forEach(sequence, onNext, onCompleted, onError);
}

/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
* <p>
Expand Down Expand Up @@ -2213,6 +2245,36 @@ public Observable<T> take(final int num) {
return take(this, num);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
*/
public void forEach(final Action1<T> onNext) {
forEach(this, onNext);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
*/
public void forEach(final Action1<T> onNext, final Action0 onCompleted) {
forEach(this, onNext, onCompleted);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
* @param onError
*/
public void forEach(final Action1<T> onNext, final Action0 onCompleted, final Action1<Exception> onError) {
forEach(this, onNext, onCompleted, onError);
}

/**
* Returns an Observable that emits a single item, a list composed of all the items emitted by
* the source Observable.
Expand Down
203 changes: 203 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationForEach.java
@@ -0,0 +1,203 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

public final class OperationForEach {

/**
* Accepts a sequence and a action. Applies the action to each element in
* the sequence.
*
* @param sequence
* the input sequence.
* @param onNext
* a action to apply to each item in the sequence.
*/
public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext) {
forEach(sequence, onNext, null, null);
}

/**
* Accepts a sequence and a action. Applies the action to each element in
* the sequence.
*
* @param sequence
* the input sequence.
* @param onNext
* a action to apply to each item in the sequence.
* @param onCompleted
* a action to run when sequence completes.
*/
public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext, final Action0 onCompleted) {
forEach(sequence, onNext, onCompleted, null);
}

/**
* Accepts a sequence and a action. Applies the action to each element in
* the sequence.
*
* @param sequence
* the input sequence.
* @param onNext
* a action to apply to each item in the sequence.
* @param onCompleted
* a action to run when sequence completes.
* @param onError
* a action to run when an exception is thrown.
*/
public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext, final Action0 onCompleted,
final Action1<Exception> onError) {
ForEachObserver<T> fe = new ForEachObserver<T>(onNext, onCompleted, onError);
sequence.subscribe(fe);
}

private static final class ForEachObserver<T> implements Observer<T> {
private final Action1<T> onNext;
private final Action0 onCompleted;
private final Action1 onError;

private boolean running = true;

private ForEachObserver(final Action1<T> onNext, final Action0 onCompleted, final Action1<Exception> onError) {
if (onNext == null)
throw new NullPointerException();
this.onNext = onNext;
this.onCompleted = onCompleted;
this.onError = onError;
}

@Override
public void onCompleted() {
running = false;
if (onCompleted != null) {
onCompleted.call();
}
}

@Override
public void onError(final Exception e) {
running = false;
if (onError != null) {
onError.call(e);
}
}

@Override
public void onNext(final T args) {
if (running) {
try {
onNext.call(args);
} catch (Exception e) {
onError(e);
}
}
}
}

public static class UnitTest {

@Test
public void testForEach() {
Map<String, String> m1 = getMap("One");
Map<String, String> m2 = getMap("Two");

Observable<Map<String, String>> observable = Observable.toObservable(m1, m2);

final AtomicInteger counter = new AtomicInteger();
forEach(observable, new Action1<Map<String, String>>() {
@Override
public void call(final Map<String, String> stringStringMap) {
switch (counter.getAndIncrement()) {
case 0:
assertEquals("firstName doesn't match", "OneFirst", stringStringMap.get("firstName"));
assertEquals("lastName doesn't match", "OneLast", stringStringMap.get("lastName"));
break;
case 1:
assertEquals("firstName doesn't match", "TwoFirst", stringStringMap.get("firstName"));
assertEquals("lastName doesn't match", "TwoLast", stringStringMap.get("lastName"));
break;
default:
fail("Unknown increment");
}
}
});
assertEquals("Number of executions didn't match expected.", 2, counter.get());
}

@Test
public void testForEachEmptyObserver() {
Observable<Map<String, String>> observable = Observable.empty();

final AtomicInteger counter = new AtomicInteger();
forEach(observable, new Action1<Map<String, String>>() {
@Override
public void call(final Map<String, String> stringStringMap) {
counter.incrementAndGet();
fail("Should not have called action");
}
});
assertEquals("Number of executions didn't match expected.", 0, counter.get());
}

@Test
public void testForEachWithException() {
Observable<Integer> observable = Observable.toObservable(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

final AtomicInteger counter = new AtomicInteger();
final AtomicReference<Exception> exception = new AtomicReference<Exception>();
forEach(observable, new Action1<Integer>() {
@Override
public void call(final Integer integer) {
counter.incrementAndGet();
if (integer.equals(5)) {
// fail half way through
throw new RuntimeException("testForEachWithException");
}
}
}, null, new Action1<Exception>() {
@Override
public void call(final Exception e) {
exception.set(e);
}
});
assertEquals("Number of executions didn't match expected.", 5, counter.get());
assertNotNull(exception.get());
}

private Map<String, String> getMap(String prefix) {
Map<String, String> m = new HashMap<String, String>();
m.put("firstName", prefix + "First");
m.put("lastName", prefix + "Last");
return m;
}

}
}

0 comments on commit 30f855b

Please sign in to comment.