-
-
Notifications
You must be signed in to change notification settings - Fork 10
/
GroupsAndDelays.java
69 lines (63 loc) · 2.37 KB
/
GroupsAndDelays.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package org.abhijitsarkar.reactor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.List;
import java.util.function.Function;
/**
 * Groups a stream of integers by key and emits each group's distinct members at a
 * fixed pace, retrying failed members with an exponentially growing delay.
 *
 * <p>The three tuning fields are package-private and have no initializers in this
 * class, so they hold {@code 0} until a caller assigns them.
 *
 * @author Abhijit Sarkar
 */
@Slf4j
public class GroupsAndDelays {
    /** Period, in seconds, of the ticker that paces the members of a single group. */
    int groupMemberDelaySeconds;
    /** Number of re-subscriptions attempted for a failing member before giving up. */
    int maxRetries;
    /** Base of the backoff: retry number {@code n} waits {@code retryDelaySeconds^n} seconds. */
    int retryDelaySeconds;

    // https://github.com/reactor/reactor-core/issues/421
    /**
     * Partitions {@code src} into groups, de-duplicates each group, processes every
     * group's members via {@link #call(List)} and maps each result through
     * {@code responseMapper}. Each mapped value is logged as it is emitted.
     *
     * <p>Each group is collected into a list first, so a group is only processed once
     * it has completed.
     *
     * @param src            the source values
     * @param groupByFn      computes the group key of a value
     * @param responseMapper applied to every value produced by the groups
     * @return the mapped results of all groups, interleaved as they become available
     */
    public Flux<Integer> doStuff(Flux<Integer> src,
                                 Function<Integer, Integer> groupByFn,
                                 Function<Integer, Integer> responseMapper) {
        return src
                .groupBy(groupByFn)
                .flatMap(g -> g
                        .distinct()
                        .collectList()
                        // Hop onto the shared parallel scheduler. Calling
                        // Schedulers.newParallel(..) at this point would allocate a
                        // brand-new worker pool for every group and never dispose it.
                        .publishOn(Schedulers.parallel())
                        .flatMap(this::call))
                .map(responseMapper)
                .doOnNext(i -> log.info("Received: {}.", i));
    }

    /**
     * Emits the outcome of {@link #even(int)} for each value, releasing the values one
     * ticker period ({@link #groupMemberDelaySeconds} seconds) apart.
     *
     * @param values the distinct members of one group
     * @return the results for those members that produced a value
     */
    private Flux<Integer> call(List<Integer> values) {
        return Flux.fromIterable(values)
                // Pair every value with a tick so values are spaced out; the tick
                // itself is discarded.
                .zipWith(Flux.interval(Duration.ofSeconds(groupMemberDelaySeconds)),
                        (x, delay) -> x)
                .flatMap(this::even);
    }

    /**
     * Produces {@code i * 2} for an even {@code i} that is not greater than 30, and
     * completes without a value for an odd one. A value greater than 30 fails with an
     * {@link IllegalArgumentException}, which triggers the retry policy below.
     *
     * <p>Retry policy: the first {@link #maxRetries} failures each re-subscribe after
     * {@code retryDelaySeconds^attempt} seconds. On the next failure the end of
     * retrying is logged and nothing is emitted to the retry companion; the companion
     * has then exhausted its range and completes, so the error is not propagated
     * downstream.
     *
     * @param i the value to evaluate
     * @return a {@link Mono} with the doubled value, or an empty one
     */
    private Mono<Integer> even(int i) {
        return Mono.<Integer>create(emitter -> {
                    if (i > 30) {
                        log.error("Too big: {}.", i);
                        emitter.error(new IllegalArgumentException("Too big."));
                    } else if (i % 2 == 0) {
                        log.info("Success: {}.", i);
                        emitter.success(i);
                    } else {
                        // Odd values are dropped: complete without emitting.
                        emitter.success();
                    }
                })
                .map(x -> x * 2)
                // The range has one element more than maxRetries so that the final
                // failure still reaches the lambda and can be logged.
                .retryWhen(errors -> errors.zipWith(Flux.range(1, maxRetries + 1), (ex, x) -> x)
                        .flatMap(retryCount -> {
                            if (retryCount <= maxRetries) {
                                long delay = (long) Math.pow(retryDelaySeconds, retryCount);
                                return Mono.delay(Duration.ofSeconds(delay));
                            }
                            log.error("Done retrying: {}.", i);
                            return Mono.<Long>empty();
                        })
                );
    }
}