-
Notifications
You must be signed in to change notification settings - Fork 10
/
Part01CreateFluxAndMono.java
205 lines (164 loc) · 7 KB
/
Part01CreateFluxAndMono.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package com.balamaci.reactor;
import com.balamaci.reactor.util.Helpers;
import org.junit.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
public class Part01CreateFluxAndMono implements BaseTestFlux {
@Test
public void just() {
Flux<Integer> flux = Flux.just(1, 5, 10);
flux.subscribe(val -> log.info("Subscriber received: {}", val));
}
@Test
public void range() {
Flux<Integer> flux = Flux.range(1, 10);
flux.subscribe(val -> log.info("Subscriber received: {}", val));
}
@Test
public void fromArray() {
Flux<String> flux = Flux.fromArray(new String[]{"red", "green", "blue", "black"});
flux.subscribe(val -> log.info("Subscriber received: {}", val));
}
@Test
public void fromIterable() {
Flux<String> flux = Flux.fromIterable(Arrays.asList("red", "green", "blue"));
flux.subscribe(val -> log.info("Subscriber received: {}", val));
}
@Test
public void fromJavaStream() {
Stream<String> stream = Stream.of("red", "green");
Flux<String> flux = Flux.fromStream(stream);
flux.subscribe(val -> log.info("Subscriber received: {}", val));
}
/**
* We can also create a stream from Future, making easier to switch from legacy code to reactive
* Since CompletableFuture can only return a single entity, Mono is the returned type when converting
* from a Future
*/
@Test
public void fromFuture() throws InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.
supplyAsync(() -> { //starts a background thread the ForkJoin common pool
log.info("About to sleep and return a value");
Helpers.sleepMillis(500);
return "red";
});
Mono<String> mono = Mono.fromFuture(completableFuture);
mono
.log()
.subscribe(val -> log.info("Subscriber received: {}", val));
mono.block();
}
/**
* Using Flux.create to handle the actual emissions of events with the events like onNext, onComplete, onError
* <p>
* When subscribing to the Flux with flux.subscribe() the lambda code inside create() gets executed.
* Flux.subscribe can take 3 handlers for each type of event - onNext, onError and onComplete
* <p>
* When using Flux.create you need to be aware of <b>Backpressure</b> {@see Part07BackpressureHandling}.
*/
@Test
public void createSimpleFlux() {
Flux<Integer> flux = Flux.create(subscriber -> {
log.info("Started emitting");
log.info("Emitting 1st");
subscriber.next(1);
log.info("Emitting 2nd");
subscriber.next(2);
subscriber.complete();
});
Disposable cancelable =
flux
.log()
.subscribe(
val -> log.info("Subscriber received: {}", val), //--> onNext
err -> log.error("Subscriber received error", err),//--> onError
() -> log.info("Subscriber got Completed event") //--> onComplete
);
}
/**
* Flux emits an Error event which is a terminal operation and the subscriber is no longer executing
* it's onNext callback.
*/
@Test
public void createSimpleFluxThatEmitsError() {
Flux<Integer> flux = Flux.create(subscriber -> {
log.info("Started emitting");
log.info("Emitting 1st");
subscriber.next(1);
subscriber.error(new RuntimeException("Test exception"));
log.info("Emitting 2nd");
subscriber.next(2);
});
Disposable disposable = flux.subscribe(
val -> log.info("Subscriber received: {}", val),
err -> log.error("Subscriber received error", err),
() -> log.info("Subscriber got Completed event"));
}
/**
* Flux and Mono are lazy, meaning that the code inside create() doesn't get executed without subscribing to the Flux
* So even if we sleep for a long time inside create() method(to simulate a costly operation),
* without subscribing to this Observable the code is not executed and the method returns immediately.
*/
@Test
public void fluxIsLazy() {
Flux<Integer> flux = Flux.create(subscriber -> {
log.info("Started emitting but sleeping for 5 secs"); //this is not executed
Helpers.sleepMillis(5000);
subscriber.next(1);
});
log.info("Finished");
}
/**
* When subscribing to an Flux, the create() method gets executed for each subscription
* this means that the events inside create are re-emitted to each subscriber. So every subscriber will get the
* same events and will not lose any events.
*/
@Test
public void multipleSubscriptionsToSameFlux() {
Flux<Integer> flux = Flux.create(subscriber -> {
log.info("Started emitting");
log.info("Emitting 1st event");
subscriber.next(1);
log.info("Emitting 2nd event");
subscriber.next(2);
subscriber.complete();
});
log.info("Subscribing 1st subscriber");
flux.subscribe(val -> log.info("First Subscriber received: {}", val));
log.info("=======================");
log.info("Subscribing 2nd subscriber");
flux.subscribe(val -> log.info("Second Subscriber received: {}", val));
}
/**
* Inside the create() method, we can check is there are still active subscribers to our Observable.
* It's a way to prevent to do extra work(like for ex. querying a datasource for entries) if no one is listening
* In the following example we'd expect to have an infinite stream, but because we stop if there are no active
* subscribers we stop producing events.
* The **take()** operator unsubscribes from the Observable after it's received the specified amount of events
*/
@Test
public void canceledFlux() {
Flux<Integer> observable = Flux.create(subscriber -> {
int i = 1;
while (true) {
if (subscriber.isCancelled()) {
break;
}
subscriber.next(i++);
}
//subscriber.onCompleted(); too late to emit Complete event since the subscription already canceled
});
observable
.take(5)
.subscribe(
val -> log.info("Subscriber received: {}", val),
err -> log.error("Subscriber received error", err),
() -> log.info("Subscriber got Completed event") //The Complete event is triggered by 'take()' operator
);
}
}