/
PersistentActorCompileOnlyTest.java
329 lines (265 loc) · 9.17 KB
/
PersistentActorCompileOnlyTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.javadsl;
import akka.actor.Scheduler;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.ActorRef;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.EventAdapter;
import akka.actor.testkit.typed.javadsl.TestInbox;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.SideEffect;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletionStage;
import static akka.actor.typed.javadsl.AskPattern.ask;
public class PersistentActorCompileOnlyTest {
public static abstract class Simple {
//#event-wrapper
public static class Wrapper<T> {
private final T t;
public Wrapper(T t) {
this.t = t;
}
public T getT() {
return t;
}
}
public static class EventAdapterExample extends EventAdapter<SimpleEvent, Wrapper<SimpleEvent>> {
@Override
public Wrapper<SimpleEvent> toJournal(SimpleEvent simpleEvent) {
return new Wrapper<>(simpleEvent);
}
@Override
public SimpleEvent fromJournal(Wrapper<SimpleEvent> simpleEventWrapper) {
return simpleEventWrapper.getT();
}
}
//#event-wrapper
//#command
public static class SimpleCommand {
public final String data;
public SimpleCommand(String data) {
this.data = data;
}
}
//#command
//#event
static class SimpleEvent {
private final String data;
SimpleEvent(String data) {
this.data = data;
}
}
//#event
//#state
static class SimpleState {
private final List<String> events;
SimpleState(List<String> events) {
this.events = events;
}
SimpleState() {
this.events = new ArrayList<>();
}
SimpleState addEvent(SimpleEvent event) {
List<String> newEvents = new ArrayList<>(events);
newEvents.add(event.data);
return new SimpleState(newEvents);
}
}
//#state
//#behavior
public static PersistentBehavior<SimpleCommand, SimpleEvent, SimpleState> pb =
new PersistentBehavior<SimpleCommand, SimpleEvent, SimpleState>(new PersistenceId("p1")) {
@Override
public SimpleState emptyState() {
return new SimpleState();
}
//#command-handler
@Override
public CommandHandler<SimpleCommand, SimpleEvent, SimpleState> commandHandler() {
return (state, cmd) -> Effect().persist(new SimpleEvent(cmd.data));
}
//#command-handler
//#event-handler
@Override
public EventHandler<SimpleState, SimpleEvent> eventHandler() {
return (state, event) -> state.addEvent(event);
}
//#event-handler
//#install-event-adapter
@Override
public EventAdapter<SimpleEvent, Wrapper<SimpleEvent>> eventAdapter() {
return new EventAdapterExample();
}
//#install-event-adapter
};
//#behavior
}
static abstract class WithAck {
public static class Ack {
}
interface MyCommand {
}
public static class Cmd implements MyCommand {
private final String data;
private final ActorRef<Ack> sender;
public Cmd(String data, ActorRef<Ack> sender) {
this.data = data;
this.sender = sender;
}
}
interface MyEvent {
}
public static class Evt implements MyEvent {
private final String data;
public Evt(String data) {
this.data = data;
}
}
static class ExampleState {
private List<String> events = new ArrayList<>();
}
//#commonChainedEffects
// Factored out Chained effect
static final SideEffect<ExampleState> commonChainedEffect =
SideEffect.create(s -> System.out.println("Command handled!"));
//#commonChainedEffects
private PersistentBehavior<MyCommand, MyEvent, ExampleState> pa =
new PersistentBehavior<MyCommand, MyEvent, ExampleState>(new PersistenceId("pa")) {
@Override
public ExampleState emptyState() {
return new ExampleState();
}
@Override
public CommandHandler<MyCommand, MyEvent, ExampleState> commandHandler() {
//#commonChainedEffects
return commandHandlerBuilder(ExampleState.class)
.matchCommand(Cmd.class, (state, cmd) -> Effect().persist(new Evt(cmd.data))
.thenRun(() -> cmd.sender.tell(new Ack()))
.andThen(commonChainedEffect)
)
.build();
//#commonChainedEffects
}
@Override
public EventHandler<ExampleState, MyEvent> eventHandler() {
return eventHandlerBuilder()
.matchEvent(Evt.class, (state, event) -> {
state.events.add(event.data);
return state;
})
.build();
}
};
}
static abstract class RecoveryComplete {
interface Command {
}
static class DoSideEffect implements Command {
final String data;
DoSideEffect(String data) {
this.data = data;
}
}
static class AcknowledgeSideEffect implements Command {
final int correlationId;
AcknowledgeSideEffect(int correlationId) {
this.correlationId = correlationId;
}
}
interface Event {
}
static class IntentRecord implements Event {
final int correlationId;
final String data;
IntentRecord(int correlationId, String data) {
this.correlationId = correlationId;
this.data = data;
}
}
static class SideEffectAcknowledged implements Event {
final int correlationId;
SideEffectAcknowledged(int correlationId) {
this.correlationId = correlationId;
}
}
static class EventsInFlight {
final int nextCorrelationId;
final Map<Integer, String> dataByCorrelationId;
EventsInFlight(int nextCorrelationId, Map<Integer, String> dataByCorrelationId) {
this.nextCorrelationId = nextCorrelationId;
this.dataByCorrelationId = dataByCorrelationId;
}
}
static class Request {
final int correlationId;
final String data;
final ActorRef<Response> sender;
Request(int correlationId, String data, ActorRef<Response> sender) {
this.correlationId = correlationId;
this.data = data;
this.sender = sender;
}
}
static class Response {
final int correlationId;
Response(int correlationId) {
this.correlationId = correlationId;
}
}
static ActorRef<Request> sideEffectProcessor = TestInbox.<Request>create().getRef();
static Duration timeout = Duration.ofSeconds(1);
private static void performSideEffect(ActorRef<AcknowledgeSideEffect> sender, int correlationId, String data, Scheduler scheduler) {
CompletionStage<Response> what = ask(sideEffectProcessor, (ActorRef<Response> ar) -> new Request(correlationId, data, ar), timeout, scheduler);
what.thenApply(r -> new AcknowledgeSideEffect(r.correlationId))
.thenAccept(sender::tell);
}
// #actor-context
public Behavior<Command> behavior(PersistenceId persistenceId) {
return Behaviors.setup(ctx -> new MyPersistentBehavior(persistenceId, ctx));
}
// #actor-context
// #actor-context
class MyPersistentBehavior extends PersistentBehavior<Command, Event, RecoveryComplete.EventsInFlight> {
// this makes the context available to the command handler etc.
private final ActorContext<Command> ctx;
public MyPersistentBehavior(PersistenceId persistenceId, ActorContext<Command> ctx) {
super(persistenceId);
this.ctx = ctx;
}
// #actor-context
@Override
public EventsInFlight emptyState() {
return new EventsInFlight(0, Collections.emptyMap());
}
@Override
public CommandHandler<Command, Event, EventsInFlight> commandHandler() {
return commandHandlerBuilder(EventsInFlight.class)
.matchCommand(DoSideEffect.class,
(state, cmd) -> Effect().persist(new IntentRecord(state.nextCorrelationId, cmd.data))
.thenRun(() -> performSideEffect(ctx.getSelf().narrow(), state.nextCorrelationId, cmd.data, ctx.getSystem().scheduler())))
.matchCommand(AcknowledgeSideEffect.class, (state, command) -> Effect().persist(new SideEffectAcknowledged(command.correlationId)))
.build();
}
@Override
public EventHandler<EventsInFlight, Event> eventHandler() {
return eventHandlerBuilder()
.matchEvent(IntentRecord.class, (state, event) -> {
int nextCorrelationId = event.correlationId;
Map<Integer, String> newOutstanding = new HashMap<>(state.dataByCorrelationId);
newOutstanding.put(event.correlationId, event.data);
return new EventsInFlight(nextCorrelationId, newOutstanding);
})
.matchEvent(SideEffectAcknowledged.class, (state, event) -> {
Map<Integer, String> newOutstanding = new HashMap<>(state.dataByCorrelationId);
newOutstanding.remove(event.correlationId);
return new EventsInFlight(state.nextCorrelationId, newOutstanding);
})
.build();
}
}
}
}