Permalink
Browse files

Back CompositeObservable with a Subject

  • Loading branch information...
1 parent 00e80dd commit 558da229a894868edd236c5bfeadfc5dfe387150 @thomasnield thomasnield committed Nov 3, 2016
View
@@ -18,7 +18,7 @@ sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
dependencies {
- compile 'io.reactivex:rxjava:1.1.+'
+ compile 'io.reactivex:rxjava:1.2.+'
testCompile 'junit:junit-dep:4.+'
testCompile 'org.mockito:mockito-core:1.8.5'
}
View
@@ -1 +1 @@
-version=0.2.0
+version=0.3.0
@@ -15,18 +15,17 @@
*/
package rx.javafx.sources;
-import javafx.collections.FXCollections;
-import javafx.collections.ObservableSet;
import rx.Observable;
+import rx.Subscription;
import rx.annotations.Beta;
-import rx.observables.JavaFxObservable;
+import rx.subjects.PublishSubject;
+import rx.subjects.SerializedSubject;
import java.util.Arrays;
-import java.util.HashSet;
/**
- * A CompositeObservable can merge multiple Observables that can be added/removed at any time,
+ * A CompositeObservable can merge multiple event source Observables that can be added/removed at any time,
* affecting all Subscribers regardless of when they subscribed. This is especially helpful for merging
* multiple UI event sources. You can also pass a Transformer to perform
* further operations on the combined Observable that is returned
@@ -36,7 +35,7 @@
@Beta
public final class CompositeObservable<T> {
- private final ObservableSet<Observable<T>> sources;
+ private final SerializedSubject<T,T> subject;
private final Observable<T> output;
/**
@@ -49,16 +48,12 @@ public CompositeObservable() {
/**
* Creates a new CompositeObservable with the provided transformations applied to the returned Observable
* yield from `toObservable()`. For instance, you can pass `obs -> obs.replay(1).refCount()` to make this CompositeObservable
- * replay one emission or `obs -> obs.share()` to multicast it.
* @param transformer
*/
public CompositeObservable(Observable.Transformer<T,T> transformer) {
- sources = FXCollections.synchronizedObservableSet(FXCollections.observableSet(new HashSet<>()));
+ subject = PublishSubject.<T>create().toSerialized();
- Observable<T> updatingSource = Observable.merge(
- Observable.from(sources).flatMap(obs -> obs.takeWhile(v -> sources.contains(obs))),
- JavaFxObservable.fromObservableSetAdds(sources).flatMap(obs -> obs.takeWhile(v -> sources.contains(obs)))
- );
+ Observable<T> updatingSource = subject.asObservable();
if (transformer == null) {
output = updatingSource;
@@ -75,22 +70,10 @@ public CompositeObservable(Observable.Transformer<T,T> transformer) {
public Observable<T> toObservable() {
return output;
}
- public void add(Observable<T> observable) {
- sources.add(observable);
+ public Subscription add(Observable<T> observable) {
+ return observable.subscribe(subject);
}
public void addAll(Observable<T>... observables) {
Arrays.stream(observables).forEach(this::add);
}
- public void remove(Observable<T> observable) {
- sources.remove(observable);
- }
- public void removeAll(Observable<T>... observables) {
- Arrays.stream(observables).forEach(this::remove);
- }
- public void clear() {
- sources.clear();
- }
- public ObservableSet<Observable<T>> getSources() {
- return FXCollections.unmodifiableObservableSet(sources);
- }
}
@@ -25,6 +25,7 @@
import javafx.util.Duration;
import org.junit.Test;
import rx.Observable;
+import rx.Subscription;
import rx.observables.JavaFxObservable;
import rx.schedulers.JavaFxScheduler;
import rx.schedulers.Schedulers;
@@ -312,9 +313,9 @@ public void testcompositeObservableInfinite() {
PublishSubject<String> source2 = PublishSubject.create();
PublishSubject<String> source3 = PublishSubject.create();
- compositeObservable.add(source1);
- compositeObservable.add(source2);
- compositeObservable.add(source3);
+ Subscription sub1 = compositeObservable.add(source1);
+ Subscription sub2 = compositeObservable.add(source2);
+ Subscription sub3 = compositeObservable.add(source3);
compositeObservable.toObservable().subscribe(emissions::add);
@@ -330,7 +331,7 @@ public void testcompositeObservableInfinite() {
source1.onNext("Delta");
assertTrue(emissions.get(3).equals("Delta"));
- compositeObservable.remove(source2);
+ sub2.unsubscribe();
source2.onNext("Epsilon");
assertTrue(emissions.size() == 4);

0 comments on commit 558da22

Please sign in to comment.