Skip to content

Commit

Permalink
Add switchMap operator
Browse files Browse the repository at this point in the history
  • Loading branch information
balamaci committed Nov 6, 2018
1 parent 65a24d7 commit 8265e90
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 21 deletions.
12 changes: 3 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1025,15 +1025,9 @@ so for example **red : red0, red1, red2**
```java
private Flowable<String> simulateRemoteOperation(String color) {
return Flowable.<String>create(subscriber -> {
for (int i = 0; i < color.length(); i++) {
subscriber.onNext(color + i);
Helpers.sleepMillis(200);
}
subscriber.onComplete();
}, BackpressureStrategy.MISSING);
}
return Flowable.intervalRange(1, color.length(), 0, 200, TimeUnit.MILLISECONDS)
.map(iteration -> color + iteration);
}
```
If we have a stream of color names:
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<description>RxJava Playground - Test scenarios describing RxJava functionality</description>

<properties>
<rxjava.version>2.1.16</rxjava.version>
<rxjava.version>2.2.3</rxjava.version>
<slf4j.version>1.7.25</slf4j.version>
</properties>

Expand Down
28 changes: 17 additions & 11 deletions src/test/java/com/balamaci/rx/Part07FlatMapOperator.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.balamaci.rx;

import com.balamaci.rx.util.Helpers;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.flowables.GroupedFlowable;
import io.reactivex.schedulers.Schedulers;
Expand Down Expand Up @@ -91,7 +89,9 @@ public void flatMapConcurrency() {
public void concatMap() {
Flowable<String> colors = Flowable.just("orange", "red", "green", "blue")
.subscribeOn(Schedulers.io())
.concatMap(val -> simulateRemoteOperation(val));
.concatMap(val -> simulateRemoteOperation(val)
.subscribeOn(Schedulers.io())
);

subscribeWithLogOutputWaitingForComplete(colors);
}
Expand Down Expand Up @@ -147,20 +147,26 @@ private List<String> generateColors() {
return Arrays.asList("red", "green", "blue");
}

@Test
public void switchMap() {
Flowable<String> colors = Flowable.interval(400, TimeUnit.MILLISECONDS)
.zipWith(Arrays.asList("red", "green", "blue"), (it, color) -> color)
.switchMap(color -> simulateRemoteOperation(color)
.doOnCancel(() -> log.info("Unsubscribed {}", color))
);

subscribeWithLogOutputWaitingForComplete(colors);
}


/**
* Simulated remote operation that emits as many events as the length of the color string
* @param color color
* @return stream of events
*/
private Flowable<String> simulateRemoteOperation(String color) {
return Flowable.<String>create(subscriber -> {
for (int i = 0; i < color.length(); i++) {
subscriber.onNext(color + i);
Helpers.sleepMillis(200);
}

subscriber.onComplete();
}, BackpressureStrategy.MISSING);
return Flowable.intervalRange(1, color.length(), 0, 200, TimeUnit.MILLISECONDS)
.map(iteration -> color + iteration);
}

}

0 comments on commit 8265e90

Please sign in to comment.