Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Implemented: ForEach #131

Closed
wants to merge 5 commits into from

2 participants

@dcapwell

implement #45.

@benjchristensen

Hi @dcapwell, thanks for contributing!

The code looks great, only thing I see missing is support for dynamic languages which requires "Object" overloads of any method with Func* or Action* arguments so closures from Groovy/Clojure/Scala/JRuby/etc can be passed in.

For example:

-> typed: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L2212
-> non-typed: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L2269

You'll see how it takes the 'Object' and derives a FuncN from it and then invokes the appropriate typed method:

public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, final Object sortFunction) {
        @SuppressWarnings("rawtypes")
        final FuncN _f = Functions.from(sortFunction);
        return create(OperationToObservableSortedList.toSortedList(sequence, new Func2<T, T, Integer>() {

            @Override
            public Integer call(T t1, T t2) {
                return (Integer) _f.call(t1, t2);
            }

        }));
    }

If you can confirm that the methods work from at least one of the dynamic languages by adding a unit test for each of the overloads I would appreciate it.

Groovy is the primary one we use thus far (most similar to Java): https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Thank you again!

dcapwell added some commits
@dcapwell

Is there a better way to handle the errors than just skipping calling the user code?

@benjchristensen

Can you point me to a part of the code where that question refers to?

If an error occurs (either in the operator itself or the Func passed in) the appropriate thing (and only option really) is to call onError.

@dcapwell

dcapwell/RxJava@30f855b#L1R115

If onNext throws an exception, then onError is called and all calls to onNext, onError, or onComplete get ignored after.

@benjchristensen

Yes that's the correct behavior to call onError if an exception is thrown.

If the code is working as designed (I think it is!) then it will also automatically unsubscribe to the parent Observable.

You can see how here: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/util/AtomicObserver.java#L60

The Observer is wrapped when the Observable is subscribed to. You can see that here: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L132

That ensures the onError/onCompleted only get called once and onNext will be ignored and takes care of automatically unsubscribing in the event it doesn't get done correctly elsewhere.

@dcapwell

Would it be fine to make forEach not support static operators? If so then i can have it unsubscribe. Basically just replicate this code:

AtomicObservableSubscription subscription = new AtomicObservableSubscription();
return subscription.wrap(onSubscribe.call(new AtomicObserver<T>(subscription, observer)));

and have onComplete and onError unsubscribe like AtomicObserver does.

@benjchristensen benjchristensen referenced this pull request
Merged

Operator: forEach #147

@benjchristensen

Since forEach is basically just a blocking version of subscribe, I'd like to propose a different approach that uses subscribe but blocks until onError or onCompleted are received.

There is very little new code for this (mostly just overloads) and this blocks if the Observable is asynchronous.

Pull Request: #147

The interesting part of the code is here: https://github.com/Netflix/RxJava/pull/147/files#L1R368

It uses a CountDownLatch to block until onError or onCompleted are received.

@benjchristensen

Re-reading the specs ... we should actually eliminate the onError/onCompleted overloads as they are not required.

It should instead just throw an exception if onError is called (since this is blocking) and onCompleted is not needed because it blocks.

http://msdn.microsoft.com/en-us/library/hh211815(v=vs.103).aspx

@benjchristensen

See pull request #147 for more on this.

@benjchristensen

Thank you @dcapwell for your work on this!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 8, 2013
  1. Added first draft of forEach operator

    dcapwell authored
  2. added Object to each forEach method to validate that it works properl…

    dcapwell authored
    …y with groovy
  3. added a groovy test for each overloaded forEach method. Fixed a bug i…

    dcapwell authored
    …n forEach where onError and onCompleted could both be called.
Commits on Feb 13, 2013
  1. merging in master

    dcapwell authored
  2. readded groovy tests for foreach

    dcapwell authored
This page is out of date. Refresh to see the latest.
View
34 language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy
@@ -197,6 +197,40 @@ def class ObservableTests {
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
}
+
+ @Test
+ public void testForEach() {
+ Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> a.received(result)});
+ verify(a, times(1)).received(1);
+ verify(a, times(1)).received(3);
+ verify(a, times(1)).received(2);
+ verify(a, times(1)).received(5);
+ verify(a, times(1)).received(4);
+ }
+
+ @Test
+ public void testForEachWithComplete() {
+ Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> a.received(result)}, {a.received('done')});
+ verify(a, times(1)).received(1);
+ verify(a, times(1)).received(3);
+ verify(a, times(1)).received(2);
+ verify(a, times(1)).received(5);
+ verify(a, times(1)).received(4);
+ verify(a, times(1)).received("done");
+ }
+
+ @Test
+ public void testForEachWithCompleteAndError() {
+ Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> throw new RuntimeException('err')}, {a.received('done')}, {err -> a.received(err.message)});
+ verify(a, times(0)).received(1);
+ verify(a, times(0)).received(3);
+ verify(a, times(0)).received(2);
+ verify(a, times(0)).received(5);
+ verify(a, times(0)).received(4);
+ verify(a, times(1)).received("err");
+ verify(a, times(0)).received("done");
+ }
+
def class TestFactory {
int counter = 1;
View
177 rxjava-core/src/main/java/rx/Observable.java
@@ -34,6 +34,7 @@
import rx.operators.OperationConcat;
import rx.operators.OperationFilter;
+import rx.operators.OperationForEach;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
@@ -1782,6 +1783,122 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
}
/**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ */
+ public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext) {
+ OperationForEach.forEach(sequence, onNext);
+ }
+
+ /**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ */
+ public static <T> void forEach(final Observable<T> sequence, final Object onNext) {
+ @SuppressWarnings("rawtypes")
+ final FuncN _f = Functions.from(onNext);
+ OperationForEach.forEach(sequence,
+ new Action1<T>() {
+
+ @Override
+ public void call(T t1) {
+ _f.call(t1);
+
+ }
+ });
+ }
+
+ /**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ * @param onCompleted
+ */
+ public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext, final Action0 onCompleted) {
+ OperationForEach.forEach(sequence, onNext, onCompleted);
+ }
+
+ /**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ * @param onCompleted
+ */
+ public static <T> void forEach(final Observable<T> sequence, final Object onNext, final Object onCompleted) {
+ @SuppressWarnings("rawtypes")
+ final FuncN _f = Functions.from(onNext);
+ @SuppressWarnings("rawtypes")
+ final FuncN _f2 = Functions.from(onCompleted);
+ OperationForEach.forEach(sequence,
+ new Action1<T>() {
+
+ @Override
+ public void call(T t1) {
+ _f.call(t1);
+
+ }
+ }, new Action0() {
+
+ @Override
+ public void call() {
+ _f2.call();
+ }
+ });
+ }
+
+ /**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ * @param onCompleted
+ * @param onError
+ */
+ public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext, final Action0 onCompleted,
+ final Action1<Exception> onError) {
+ OperationForEach.forEach(sequence, onNext, onCompleted, onError);
+ }
+
+ /**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ * @param onCompleted
+ * @param onError
+ */
+ public static <T> void forEach(final Observable<T> sequence, final Object onNext, final Object onCompleted,
+ final Object onError) {
+ @SuppressWarnings("rawtypes")
+ final FuncN _f = Functions.from(onNext);
+ @SuppressWarnings("rawtypes")
+ final FuncN _f2 = Functions.from(onCompleted);
+ @SuppressWarnings("rawtypes")
+ final FuncN _f3 = Functions.from(onError);
+ OperationForEach.forEach(sequence,
+ new Action1<T>() {
+
+ @Override
+ public void call(T t1) {
+ _f.call(t1);
+
+ }
+ }, new Action0() {
+
+ @Override
+ public void call() {
+ _f2.call();
+ }
+ }, new Action1<Exception>() {
+
+ @Override
+ public void call(Exception t1) {
+ _f3.call(t1);
+ }
+ });
+ }
+
+ /**
* Filters an Observable by discarding any of its emissions that do not meet some test.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/filter.png">
@@ -2303,6 +2420,66 @@ public T call(Exception e) {
}
/**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ */
+ public void forEach(final Action1<T> onNext) {
+ forEach(this, onNext);
+ }
+
+ /**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ */
+ public <T> void forEach(final Object onNext) {
+ forEach(this, onNext);
+ }
+
+ /**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ * @param onCompleted
+ */
+ public void forEach(final Action1<T> onNext, final Action0 onCompleted) {
+ forEach(this, onNext, onCompleted);
+ }
+
+ /**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ * @param onCompleted
+ */
+ public void forEach(final Object onNext, final Object onCompleted) {
+ forEach(this, onNext, onCompleted);
+ }
+
+ /**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ * @param onCompleted
+ * @param onError
+ */
+ public void forEach(final Action1<T> onNext, final Action0 onCompleted, final Action1<Exception> onError) {
+ forEach(this, onNext, onCompleted, onError);
+ }
+
+ /**
+ * Invokes an action for each element in the sequence.
+ *
+ * @param onNext
+ * @param onCompleted
+ * @param onError
+ */
+ public void forEach(final Object onNext, final Object onCompleted, final Object onError) {
+ forEach(this, onNext, onCompleted, onError);
+ }
+
+ /*
* Returns an Observable that emits the last <code>count</code> items emitted by the source
* Observable.
*
View
207 rxjava-core/src/main/java/rx/operators/OperationForEach.java
@@ -0,0 +1,207 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.operators;
+
+import org.junit.Test;
+import rx.Observable;
+import rx.Observer;
+import rx.util.functions.Action0;
+import rx.util.functions.Action1;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public final class OperationForEach {
+
+ /**
+ * Accepts a sequence and a action. Applies the action to each element in
+ * the sequence.
+ *
+ * @param sequence
+ * the input sequence.
+ * @param onNext
+ * a action to apply to each item in the sequence.
+ */
+ public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext) {
+ forEach(sequence, onNext, null, null);
+ }
+
+ /**
+ * Accepts a sequence and a action. Applies the action to each element in
+ * the sequence.
+ *
+ * @param sequence
+ * the input sequence.
+ * @param onNext
+ * a action to apply to each item in the sequence.
+ * @param onCompleted
+ * a action to run when sequence completes.
+ */
+ public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext, final Action0 onCompleted) {
+ forEach(sequence, onNext, onCompleted, null);
+ }
+
+ /**
+ * Accepts a sequence and a action. Applies the action to each element in
+ * the sequence.
+ *
+ * @param sequence
+ * the input sequence.
+ * @param onNext
+ * a action to apply to each item in the sequence.
+ * @param onCompleted
+ * a action to run when sequence completes.
+ * @param onError
+ * a action to run when an exception is thrown.
+ */
+ public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext, final Action0 onCompleted,
+ final Action1<Exception> onError) {
+ ForEachObserver<T> fe = new ForEachObserver<T>(onNext, onCompleted, onError);
+ sequence.subscribe(fe);
+ }
+
+ private static final class ForEachObserver<T> implements Observer<T> {
+ private final Action1<T> onNext;
+ private final Action0 onCompleted;
+ private final Action1 onError;
+
+ private boolean running = true;
+
+ private ForEachObserver(final Action1<T> onNext, final Action0 onCompleted, final Action1<Exception> onError) {
+ if (onNext == null)
+ throw new NullPointerException();
+ this.onNext = onNext;
+ this.onCompleted = onCompleted;
+ this.onError = onError;
+ }
+
+ @Override
+ public void onCompleted() {
+ if(running) {
+ running = false;
+ if (onCompleted != null) {
+ onCompleted.call();
+ }
+ }
+ }
+
+ @Override
+ public void onError(final Exception e) {
+ if(running) {
+ running = false;
+ if (onError != null) {
+ onError.call(e);
+ }
+ }
+ }
+
+ @Override
+ public void onNext(final T args) {
+ if (running) {
+ try {
+ onNext.call(args);
+ } catch (Exception e) {
+ onError(e);
+ }
+ }
+ }
+ }
+
+ public static class UnitTest {
+
+ @Test
+ public void testForEach() {
+ Map<String, String> m1 = getMap("One");
+ Map<String, String> m2 = getMap("Two");
+
+ Observable<Map<String, String>> observable = Observable.toObservable(m1, m2);
+
+ final AtomicInteger counter = new AtomicInteger();
+ forEach(observable, new Action1<Map<String, String>>() {
+ @Override
+ public void call(final Map<String, String> stringStringMap) {
+ switch (counter.getAndIncrement()) {
+ case 0:
+ assertEquals("firstName doesn't match", "OneFirst", stringStringMap.get("firstName"));
+ assertEquals("lastName doesn't match", "OneLast", stringStringMap.get("lastName"));
+ break;
+ case 1:
+ assertEquals("firstName doesn't match", "TwoFirst", stringStringMap.get("firstName"));
+ assertEquals("lastName doesn't match", "TwoLast", stringStringMap.get("lastName"));
+ break;
+ default:
+ fail("Unknown increment");
+ }
+ }
+ });
+ assertEquals("Number of executions didn't match expected.", 2, counter.get());
+ }
+
+ @Test
+ public void testForEachEmptyObserver() {
+ Observable<Map<String, String>> observable = Observable.empty();
+
+ final AtomicInteger counter = new AtomicInteger();
+ forEach(observable, new Action1<Map<String, String>>() {
+ @Override
+ public void call(final Map<String, String> stringStringMap) {
+ counter.incrementAndGet();
+ fail("Should not have called action");
+ }
+ });
+ assertEquals("Number of executions didn't match expected.", 0, counter.get());
+ }
+
+ @Test
+ public void testForEachWithException() {
+ Observable<Integer> observable = Observable.toObservable(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+ final AtomicInteger counter = new AtomicInteger();
+ final AtomicReference<Exception> exception = new AtomicReference<Exception>();
+ forEach(observable, new Action1<Integer>() {
+ @Override
+ public void call(final Integer integer) {
+ counter.incrementAndGet();
+ if (integer.equals(5)) {
+ // fail half way through
+ throw new RuntimeException("testForEachWithException");
+ }
+ }
+ }, null, new Action1<Exception>() {
+ @Override
+ public void call(final Exception e) {
+ exception.set(e);
+ }
+ });
+ assertEquals("Number of executions didn't match expected.", 5, counter.get());
+ assertNotNull(exception.get());
+ }
+
+ private Map<String, String> getMap(String prefix) {
+ Map<String, String> m = new HashMap<String, String>();
+ m.put("firstName", prefix + "First");
+ m.put("lastName", prefix + "Last");
+ return m;
+ }
+
+ }
+}
Something went wrong with that request. Please try again.