/
RxUtilsTest.java
126 lines (110 loc) · 4.88 KB
/
RxUtilsTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package cgeo.geocaching.utils;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
public class RxUtilsTest extends TestCase {
public static void testRememberLast() {
final PublishSubject<String> rawObservable = PublishSubject.create();
final Observable<String> observable = RxUtils.rememberLast(rawObservable, "initial");
// Check that the initial value is present, and is kept there
assertThat(observable.toBlocking().first()).isEqualTo("initial");
assertThat(observable.toBlocking().first()).isEqualTo("initial");
// Check that if the observable is not subscribed, changes are not propagated (similar to not keeping the
// inner subscription active).
rawObservable.onNext("without subscribers");
assertThat(observable.toBlocking().first()).isEqualTo("initial");
// Check that new values are propagated and cached
final Subscription subscription = observable.subscribe();
rawObservable.onNext("first");
assertThat(observable.toBlocking().first()).isEqualTo("first");
subscription.unsubscribe();
assertThat(observable.toBlocking().first()).isEqualTo("first");
}
public static void testFromNullable() {
final Observable<String> fromNull = RxUtils.fromNullable(null);
assertThat(fromNull.toBlocking().getIterator().hasNext()).isFalse();
final Observable<String> fromNonNull = RxUtils.fromNullable("foo");
assertThat(fromNonNull.toBlocking().single()).isEqualTo("foo");
}
public static void testDeferredNullable() {
final Observable<String> fromNull = RxUtils.deferredNullable(new Func0<String>() {
@Override
public String call() {
return null;
}
});
assertThat(fromNull.toBlocking().getIterator().hasNext()).isFalse();
final Observable<String> fromNonNull = RxUtils.deferredNullable(new Func0<String>() {
@Override
public String call() {
return "foo";
}
});
assertThat(fromNonNull.toBlocking().single()).isEqualTo("foo");
}
public static void testWaitForCompletion() {
final PublishSubject<String> observable = PublishSubject.create();
final AtomicBoolean terminated = new AtomicBoolean(false);
new Thread() {
@Override
public void run() {
RxUtils.waitForCompletion(observable.toBlocking());
terminated.set(true);
}
}.start();
observable.onNext("foo");
assertThat(terminated.get()).isFalse();
observable.onNext("bar");
assertThat(terminated.get()).isFalse();
observable.onCompleted();
try {
Thread.sleep(100);
} catch (final InterruptedException ignored) {
}
assertThat(terminated.get()).isTrue();
}
public static void testObservableCache() {
final AtomicInteger counter = new AtomicInteger(0);
final RxUtils.ObservableCache<String, Integer> cache = new RxUtils.ObservableCache<String, Integer>(new Func1<String, Observable<Integer>>() {
@Override
public Observable<Integer> call(final String s) {
counter.incrementAndGet();
return Observable.just(s.length());
}
});
assertThat(cache.get("a").toBlocking().single()).isEqualTo(1);
assertThat(counter.get()).isEqualTo(1);
assertThat(cache.get("a").toBlocking().single()).isEqualTo(1);
assertThat(counter.get()).isEqualTo(1);
assertThat(cache.get("bb").toBlocking().single()).isEqualTo(2);
assertThat(counter.get()).isEqualTo(2);
assertThat(cache.get("bb").toBlocking().single()).isEqualTo(2);
assertThat(counter.get()).isEqualTo(2);
assertThat(cache.get("a").toBlocking().single()).isEqualTo(1);
assertThat(counter.get()).isEqualTo(2);
}
public static void testDelayedUnsubscription() {
final AtomicBoolean unsubscribed = new AtomicBoolean(false);
Observable.never().doOnUnsubscribe(new Action0() {
@Override
public void call() {
unsubscribed.set(true);
}
}).lift(new RxUtils.DelayedUnsubscription<Object>(100, TimeUnit.MILLISECONDS)).subscribe().unsubscribe();
assertThat(unsubscribed.get()).isFalse();
try {
Thread.sleep(200);
} catch (final InterruptedException ignored) {
// ignore for tests
}
assertThat(unsubscribed.get()).isTrue();
}
}