-
Notifications
You must be signed in to change notification settings - Fork 0
/
RecordTest.java
65 lines (52 loc) · 1.85 KB
/
RecordTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.example;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.ReplayProcessor;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
class RecordTest {
private final static int CAPACITY = 100;
@Test
void testConsumeLessThanAvailable() {
test(CAPACITY - 1);
}
@Test
void testConsumeAllAvailable() {
Assertions.assertTimeoutPreemptively(Duration.ofSeconds(2), () -> {
test(CAPACITY);
});
}
@Test
void testConsumeAllAvailableWithoutExpect() {
// If you knock out the expect bit then everything finishes.
testWithoutExpect(CAPACITY);
}
private void test(int end) {
List<Integer> expected = IntStream.range(0, end).boxed().collect(Collectors.toList());
ReplayProcessor<Integer> flux = ReplayProcessor.create(CAPACITY);
for (int i = 0; i < CAPACITY; i++) flux.onNext(i);
StepVerifier.create(flux)
.recordWith(ArrayList::new)
.thenConsumeWhile(i -> i < (end - 1))
.expectRecordedMatches(actual -> {
boolean result = expected.equals(actual);
System.err.println("result = " + result);
return result;
})
.thenCancel()
.verify();
}
private void testWithoutExpect(int end) {
ReplayProcessor<Integer> flux = ReplayProcessor.create(CAPACITY);
for (int i = 0; i < CAPACITY; i++) flux.onNext(i);
StepVerifier.create(flux)
.recordWith(ArrayList::new)
.thenConsumeWhile(i -> i < (end - 1))
.thenCancel()
.verify();
}
}