forked from Netflix/hollow-reference-implementation
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ProducerTest.java
126 lines (111 loc) · 5.25 KB
/
ProducerTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package how.hollow.producer;
import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.api.consumer.fs.HollowFilesystemAnnouncementWatcher;
import com.netflix.hollow.api.consumer.fs.HollowFilesystemBlobRetriever;
import com.netflix.hollow.api.producer.HollowProducer;
import com.netflix.hollow.api.producer.fs.HollowFilesystemAnnouncer;
import com.netflix.hollow.api.producer.fs.HollowFilesystemPublisher;
import how.hollow.producer.datamodel.Actor;
import how.hollow.producer.datamodel.Movie;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.Spy;
import java.io.File;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
/**
 * Tests for {@link HollowProducer.Incremental} backed by the filesystem publisher,
 * announcer and blob retriever.
 *
 * <p>Every test is self-contained: data that must exist in the publish directory is
 * produced by the test itself rather than by a previous manual run.
 */
public class ProducerTest {

    /**
     * Directory the filesystem publisher/announcer/retriever operate on. Located under the
     * JVM temp directory so the tests do not depend on one developer's home directory.
     */
    static File publishDir = new File(System.getProperty("java.io.tmpdir"), "publish-dir");

    // Reassigned in setup(); replaced by a Mockito mock when getIncremental(true) is called.
    HollowProducer.Publisher publisher;
    HollowProducer.Announcer announcer;
    HollowConsumer.BlobRetriever blobRetriever;

    /** Total number of movies generated across all rounds. */
    private static final int TOTAL = 10_000;
    /** Number of incremental cycles the dataset is split into. */
    private static final int ROUND = 10;
    /** Number of movies added per cycle. */
    private static final int NUMS_IN_ROUND = TOTAL / ROUND;

    /**
     * Ensures the publish directory exists and wires fresh filesystem-backed
     * publisher, announcer and blob retriever instances to it.
     *
     * @throws IllegalStateException if the publish directory cannot be created
     */
    @BeforeEach
    public void setup() {
        // mkdirs() also creates missing parents; fail loudly instead of letting
        // later filesystem operations fail with a less obvious error.
        if (!publishDir.isDirectory() && !publishDir.mkdirs()) {
            throw new IllegalStateException("Cannot create publish directory: " + publishDir);
        }
        publisher = new HollowFilesystemPublisher(publishDir.toPath());
        announcer = new HollowFilesystemAnnouncer(publishDir.toPath());
        blobRetriever = new HollowFilesystemBlobRetriever(publishDir.toPath());
    }

    /**
     * Runs {@link #ROUND} cycles that each add a disjoint batch of movies and checks
     * that every cycle returned a different version.
     */
    @Test
    public void should_produce_in_10_batches_when_every_round_changed() {
        long[] versions = runAllRounds(getIncremental(false));

        Set<Long> distinctVersions = new HashSet<>();
        for (long version : versions) {
            System.out.println(version);
            distinctVersions.add(version);
        }
        // Each round adds new records, so no two cycles should report the same version.
        assertEquals(ROUND, distinctVersions.size());
    }

    /**
     * Restores a producer to a fully published dataset, changes the first batch, then
     * re-submits the remaining batches unchanged and verifies that the unchanged
     * cycles do not trigger further publish calls.
     */
    @Test
    public void should_not_produce_at_last_in_10_batches_when_one_round_not_changed() {
        // Seed the publish directory with the real filesystem publisher so that the
        // version restored below actually exists on disk.
        long[] seededVersions = runAllRounds(getIncremental(false));
        final long restoreVersion = seededVersions[seededVersions.length - 1];

        // From here on the publisher is a mock so publish calls can be counted.
        HollowProducer.Incremental newIncremental = getIncremental(true);
        // Restore so that batches re-submitted later are seen as unchanged.
        newIncremental.restore(restoreVersion, blobRetriever);
        verify(publisher, times(0)).publish(any(HollowProducer.PublishArtifact.class));

        // First batch: same ids, different titles, so this cycle carries a change.
        long newVersion = newIncremental.runIncrementalCycle(state -> {
            generateMovies(0, NUMS_IN_ROUND, "changed_movie_").forEach(state::addOrModify);
        });
        // NOTE(review): 4 is the publish count asserted by the original test for one
        // changed cycle; confirm it against the Hollow version in use.
        verify(publisher, times(4)).publish(any(HollowProducer.PublishArtifact.class));

        // Remaining batches are identical to the restored state.
        for (int i = 1; i < ROUND; i++) {
            final int idx = i;
            newIncremental.runIncrementalCycle(state -> {
                generateMovies(idx * NUMS_IN_ROUND, NUMS_IN_ROUND, "movie_").forEach(state::addIfAbsent);
            });
            System.out.println("a round");
        }

        // Version changed because the first batch's titles changed.
        assertNotEquals(restoreVersion, newVersion);
        // No artifact has been published since the changed cycle above.
        verify(publisher, times(4)).publish(any(HollowProducer.PublishArtifact.class));
    }

    /**
     * Runs {@link #ROUND} incremental cycles, each adding or modifying one batch of
     * {@link #NUMS_IN_ROUND} movies titled {@code movie_<id>}.
     *
     * @param incremental producer to run the cycles on
     * @return the version returned by each cycle, in execution order
     */
    private long[] runAllRounds(HollowProducer.Incremental incremental) {
        long[] versions = new long[ROUND];
        for (int i = 0; i < ROUND; i++) {
            final int idx = i;
            versions[i] = incremental.runIncrementalCycle(state -> {
                generateMovies(idx * NUMS_IN_ROUND, NUMS_IN_ROUND, "movie_").forEach(state::addOrModify);
            });
        }
        return versions;
    }

    /**
     * Builds an incremental producer for the {@link Movie} data model using the
     * current {@code publisher} and {@code announcer} fields.
     *
     * @param mockPublisher when {@code true}, the {@code publisher} field is first replaced
     *                      by a Mockito mock that only prints the path of each artifact it
     *                      is asked to publish
     * @return a new incremental producer with the data model initialized
     */
    private HollowProducer.Incremental getIncremental(boolean mockPublisher) {
        if (mockPublisher) {
            publisher = Mockito.mock(HollowProducer.Publisher.class);
            doAnswer(invocationOnMock -> {
                HollowProducer.PublishArtifact argument = invocationOnMock.getArgument(0);
                System.out.println(argument.getPath());
                return null;
            }).when(publisher).publish(any(HollowProducer.PublishArtifact.class));
        }
        HollowProducer.Incremental incremental = HollowProducer.withPublisher(publisher)
                .withAnnouncer(announcer)
                // Intended by the original author to limit snapshot publishing to the
                // beginning and the end of a full run.
                .withNumStatesBetweenSnapshots(ROUND - 2)
                .buildIncremental();
        incremental.initializeDataModel(Movie.class);
        return incremental;
    }

    /**
     * Creates {@code cnt} movies with consecutive ids starting at {@code startId}.
     *
     * @param startId     id of the first movie
     * @param cnt         number of movies to create
     * @param titlePrefix prefix of each title; the movie id is appended to it
     * @return the generated movies, each sharing one empty actor set
     */
    List<Movie> generateMovies(int startId, int cnt, String titlePrefix) {
        final Set<Actor> empty = new HashSet<>();
        return IntStream.range(startId, startId + cnt)
                .mapToObj(id -> new Movie(id, titlePrefix + id, empty))
                .collect(Collectors.toList());
    }
}