Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,14 @@ public OperatorZip(Func9 f) {
public Subscriber<? super Observable[]> call(final Subscriber<? super R> observer) {
return new Subscriber<Observable[]>(observer) {

boolean started = false;

@Override
public void onCompleted() {
// we only complete once a child Observable completes or errors
if (!started) {
// this means we have not received a valid onNext before termination so we emit the onCompleted
observer.onCompleted();
}
}

@Override
Expand All @@ -118,7 +123,12 @@ public void onError(Throwable e) {

@Override
public void onNext(Observable[] observables) {
new Zip<R>(observables, observer, zipFunction).zip();
if (observables == null || observables.length == 0) {
observer.onCompleted();
} else {
started = true;
new Zip<R>(observables, observer, zipFunction).zip();
}
}

};
Expand Down
30 changes: 30 additions & 0 deletions rxjava-core/src/test/java/rx/ZipTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package rx;

import static org.junit.Assert.*;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -31,6 +35,7 @@
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.FuncN;
import rx.observables.GroupedObservable;

public class ZipTests {
Expand Down Expand Up @@ -91,6 +96,31 @@ public void testCovarianceOfZip() {
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);
}

/**
* Occasionally zip may be invoked with 0 observables. Test that we don't block indefinitely instead
* of immediately invoking zip with 0 argument.
*
* We now expect an IllegalArgumentException since last() requires at least one value and nothing will be emitted.
*/
@Test(expected = IllegalArgumentException.class)
public void nonBlockingObservable() {

final Object invoked = new Object();

Collection<Observable<Object>> observables = Collections.emptyList();

Observable<Object> result = Observable.zip(observables, new FuncN<Object>() {
@Override
public Object call(final Object... args) {
System.out.println("received: " + args);
assertEquals("No argument should have been passed", 0, args.length);
return invoked;
}
});

assertSame(invoked, result.toBlockingObservable().last());
}

Func2<Media, Rating, ExtendedResult> combine = new Func2<Media, Rating, ExtendedResult>() {
@Override
public ExtendedResult call(Media m, Rating r) {
Expand Down
68 changes: 68 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperatorZipTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +43,7 @@
import rx.functions.Func3;
import rx.functions.FuncN;
import rx.functions.Functions;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

Expand Down Expand Up @@ -986,6 +988,72 @@ public void call(String s) {
assertEquals("OnCompleted_null-OnCompleted_null", list.get(3));
}

@Test
public void testZipEmptyObservables() {

Observable<String> o = Observable.zip(Observable.<Integer> empty(), Observable.<String> empty(), new Func2<Integer, String, String>() {

@Override
public String call(Integer t1, String t2) {
return t1 + "-" + t2;
}

});

final ArrayList<String> list = new ArrayList<String>();
o.subscribe(new Action1<String>() {

@Override
public void call(String s) {
System.out.println(s);
list.add(s);
}
});

assertEquals(0, list.size());
}

@Test
public void testZipEmptyList() {

final Object invoked = new Object();
Collection<Observable<Object>> observables = Collections.emptyList();

Observable<Object> o = Observable.zip(observables, new FuncN<Object>() {
@Override
public Object call(final Object... args) {
assertEquals("No argument should have been passed", 0, args.length);
return invoked;
}
});

TestSubscriber<Object> ts = new TestSubscriber<Object>();
o.subscribe(ts);
ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
ts.assertReceivedOnNext(Collections.emptyList());
}

/**
* Expect IllegalArgumentException instead of blocking forever as zip should emit onCompleted and no onNext
* and last() expects at least a single response.
*/
@Test(expected = IllegalArgumentException.class)
public void testZipEmptyListBlocking() {

final Object invoked = new Object();
Collection<Observable<Object>> observables = Collections.emptyList();

Observable<Object> o = Observable.zip(observables, new FuncN<Object>() {
@Override
public Object call(final Object... args) {
assertEquals("No argument should have been passed", 0, args.length);
return invoked;
}
});

o.toBlockingObservable().last();
}

Observable<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger());

Observable<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) {
Expand Down