/
AbstractBackgroundStreamingActorWithConfigWithStatusReport.java
443 lines (376 loc) · 15 KB
/
AbstractBackgroundStreamingActorWithConfigWithStatusReport.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.utils.health;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.services.utils.akka.actors.ModifyConfigBehavior;
import org.eclipse.ditto.services.utils.akka.actors.RetrieveConfigBehavior;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.config.DittoConfigError;
import org.eclipse.ditto.services.utils.health.config.BackgroundStreamingConfig;
import org.eclipse.ditto.signals.commands.common.Shutdown;
import org.eclipse.ditto.signals.commands.common.ShutdownResponse;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
import akka.Done;
import akka.actor.AbstractActorWithTimers;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
/**
* Common behavior for actors that stay alive for a long time running a stream over and over.
*
* @param <C> type of configuration.
*/
public abstract class AbstractBackgroundStreamingActorWithConfigWithStatusReport<C extends BackgroundStreamingConfig>
extends AbstractActorWithTimers implements RetrieveConfigBehavior, ModifyConfigBehavior {
/**
* Modifiable config to control this actor.
*/
protected C config;
/**
* The logger.
*/
protected final DittoDiagnosticLoggingAdapter log;
/**
* The actor materializer to materialize streams.
*/
protected final Materializer materializer;
private final Deque<Pair<Instant, Event>> events;
private KillSwitch killSwitch;
/**
* Initialize the actor with a background streaming config.
*
* @param config the background streaming config.
*/
protected AbstractBackgroundStreamingActorWithConfigWithStatusReport(final C config) {
this.config = config;
log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
materializer = Materializer.createMaterializer(this::getContext);
events = new ArrayDeque<>(config.getKeptEvents() + 1);
if (config.isEnabled()) {
scheduleWakeUp();
}
}
/**
* Enqueue an element into a deque of bounded size.
*
* @param queue the deque.
* @param element the element to enqueue.
* @param maxQueueSize the maximum size of the queue.
* @param <T> the type of elements.
*/
protected static <T> void enqueue(final Deque<Pair<Instant, T>> queue, final T element, final int maxQueueSize) {
queue.addFirst(Pair.create(Instant.now(), element));
if (queue.size() > maxQueueSize) {
queue.removeLast();
}
}
/**
* Construct a config object by parsing HOCON.
*
* @param config the HOCON.
* @return the config object.
*/
protected abstract C parseConfig(final Config config);
/**
* Get the stream that should be restarted again and again in the background as a source of whatever to be followed
* by a kill switch and a sink that ignores all elements.
*
* @return the source.
*/
protected abstract Source<?, ?> getSource();
/**
* Add message handling logic to the sleeping behavior of this actor.
*
* @param sleepingReceiveBuilder the builder for the sleeping behavior.
*/
protected void preEnhanceSleepingBehavior(final ReceiveBuilder sleepingReceiveBuilder) {
// do nothing by default
}
/**
* Add message handling logic to the streaming behavior of this actor.
*
* @param streamingReceiveBuilder the builder for the streaming behavior.
*/
protected void preEnhanceStreamingBehavior(final ReceiveBuilder streamingReceiveBuilder) {
// do nothing by default
}
/**
* Append fields to the status report.
*
* @param statusReportBuilder the builder for the status report.
*/
protected void postEnhanceStatusReport(final JsonObjectBuilder statusReportBuilder) {
// do nothing by default
}
@Override
public Receive createReceive() {
return sleeping();
}
@Override
public Config getConfig() {
return config.getConfig();
}
@Override
public Config setConfig(final Config config) {
final C previousConfig = this.config;
// TODO Ditto issue #439: replace ConfigWithFallback - it breaks AbstractConfigValue.withFallback!
// Workaround: re-parse my config
final Config fallback = ConfigFactory.parseString(getConfig().root().render(ConfigRenderOptions.concise()));
try {
this.config = parseConfig(config.withFallback(fallback));
} catch (final DittoConfigError | ConfigException e) {
log.error(e, "Failed to set config");
}
if (!previousConfig.isEnabled() && this.config.isEnabled()) {
scheduleWakeUp();
}
return this.config.getConfig();
}
private Receive sleeping() {
final ReceiveBuilder sleepingReceiveBuilder = ReceiveBuilder.create();
preEnhanceSleepingBehavior(sleepingReceiveBuilder);
return sleepingReceiveBuilder.match(WokeUp.class, this::wokeUp)
.match(Event.class, this::addCustomEventToLog)
.match(RetrieveHealth.class, this::retrieveHealth)
.match(ResetHealthEvents.class, this::resetHealthEvents)
.match(Shutdown.class, this::shutdownStream)
.build()
.orElse(retrieveConfigBehavior())
.orElse(modifyConfigBehavior());
}
private Receive streaming() {
final ReceiveBuilder streamingReceiveBuilder = ReceiveBuilder.create();
preEnhanceStreamingBehavior(streamingReceiveBuilder);
return streamingReceiveBuilder
.match(StreamTerminated.class, this::streamTerminated)
.match(Event.class, this::addCustomEventToLog)
.match(RetrieveHealth.class, this::retrieveHealth)
.match(ResetHealthEvents.class, this::resetHealthEvents)
.match(Shutdown.class, this::shutdownStream)
.build()
.orElse(retrieveConfigBehavior())
.orElse(modifyConfigBehavior());
}
private void wokeUp(final WokeUp wokeUp) {
log.info("Woke up.");
enqueue(events, wokeUp.enable(config.isEnabled()), config.getKeptEvents());
if (config.isEnabled()) {
restartStream();
getContext().become(streaming());
} else {
log.warning("Not waking up because disabled.");
}
}
/**
* Handle stream termination.
*
* @param streamTerminated the event of stream termination.
*/
protected void streamTerminated(final Event streamTerminated) {
enqueue(events, streamTerminated, config.getKeptEvents());
if (config.isEnabled()) {
log.info("Stream terminated. Will restart after quiet period.");
scheduleWakeUp();
} else {
log.warning("Stream terminated while disabled.");
}
getContext().become(sleeping());
}
protected Stream<Pair<Instant, Event>> getEventStream() {
return events.stream();
}
private void scheduleWakeUp() {
scheduleWakeUp(config.getQuietPeriod());
}
private void scheduleWakeUp(final Duration when) {
getTimers().startSingleTimer(WokeUp.class, WokeUp.ENABLED_INSTANCE, when);
}
private void shutdownStream(final Shutdown shutdown) {
log.info("Terminating stream on demand: <{}>", shutdown);
shutdownKillSwitch();
final Event streamTerminated = StreamTerminated.normally("Got " + shutdown);
enqueue(events, streamTerminated, config.getKeptEvents());
getContext().become(sleeping());
if (config.isEnabled()) {
final Duration wakeUpDelay = config.getQuietPeriod();
final String message = String.format("Restarting in <%s>.", wakeUpDelay);
scheduleWakeUp(wakeUpDelay);
getSender().tell(ShutdownResponse.of(message, shutdown.getDittoHeaders()), getSelf());
} else {
final String message = "Not restarting stream because I am disabled.";
getSender().tell(ShutdownResponse.of(message, shutdown.getDittoHeaders()), getSelf());
}
}
private void addCustomEventToLog(final Event event) {
enqueue(events, event, config.getKeptEvents());
}
private void restartStream() {
shutdownKillSwitch();
final Pair<UniqueKillSwitch, CompletionStage<Done>> materializedValues =
getSource().viaMat(KillSwitches.single(), Keep.right())
.toMat(Sink.ignore(), Keep.both())
.run(materializer);
killSwitch = materializedValues.first();
materializedValues.second()
.<Void>handle((result, error) -> {
final String description = String.format("Stream terminated. Result=<%s> Error=<%s>",
result, error);
if (error != null) {
log.error(error, description);
getSelf().tell(StreamTerminated.withError(description), getSelf());
} else {
log.info(description);
getSelf().tell(StreamTerminated.normally(description), getSelf());
}
return null;
});
}
private void shutdownKillSwitch() {
if (killSwitch != null) {
killSwitch.shutdown();
killSwitch = null;
}
}
private void retrieveHealth(final RetrieveHealth trigger) {
getSender().tell(RetrieveHealthResponse.of(renderStatusInfo(), trigger.getDittoHeaders()), getSelf());
}
private void resetHealthEvents(final ResetHealthEvents resetHealthEvents) {
events.clear();
getSender().tell(ResetHealthEventsResponse.of(resetHealthEvents.getDittoHeaders()), getSelf());
}
private StatusInfo renderStatusInfo() {
return StatusInfo.fromStatus(StatusInfo.Status.UP,
Collections.singletonList(StatusDetailMessage.of(getMostSevereLevelFromEvents(events), render())));
}
private JsonObject render() {
final JsonObjectBuilder statusReportBuilder = JsonObject.newBuilder()
.set(JsonFields.ENABLED, config.isEnabled())
.set(JsonFields.EVENTS, renderEvents(events));
postEnhanceStatusReport(statusReportBuilder);
return statusReportBuilder.build();
}
/**
* Get the most severe log level from evennts.
*
* @return The most severe log level to report.
*/
protected StatusDetailMessage.Level getMostSevereLevelFromEvents(final Deque<Pair<Instant, Event>> events) {
return events.stream()
.map(Pair::second)
.map(Event::level)
.max(Enum::compareTo)
.orElse(StatusDetailMessage.Level.DEFAULT);
}
/**
* Render known events as a JSON array.
*
* @param events events to render.
* @return the rendered events.
*/
protected JsonArray renderEvents(final Deque<Pair<Instant, Event>> events) {
return events.stream()
.map(this::renderEvent)
.collect(JsonCollectors.valuesToArray());
}
/**
* Render a single event as JSON for health reporting.
*
* @param element the event together with its timestamp.
* @return the rendered JSON object.
*/
protected JsonObject renderEvent(final Pair<Instant, Event> element) {
return JsonObject.newBuilder()
.set(element.first().toString(), element.second().name())
.build();
}
/**
* Event to report.
*/
protected interface Event {
String name();
default StatusDetailMessage.Level level() {
return StatusDetailMessage.Level.DEFAULT;
}
}
/**
* Event for when a stream started.
*/
protected static final class WokeUp implements Event {
private static final WokeUp ENABLED_INSTANCE = new WokeUp(true);
private final boolean enabled;
private WokeUp(final boolean enabled) {
this.enabled = enabled;
}
private WokeUp enable(final boolean isEnabled) {
return new WokeUp(isEnabled);
}
@Override
public String name() {
return enabled ? "WOKE_UP" : "Not waking up: I am disabled.";
}
}
/**
* Event for when a stream terminated.
*/
protected static final class StreamTerminated implements Event {
private static final StatusDetailMessage.Level STREAM_ERROR_STATUS_LEVEL = StatusDetailMessage.Level.WARN;
private final String whatHappened;
private final StatusDetailMessage.Level level;
private StreamTerminated(final String whatHappened, final StatusDetailMessage.Level level) {
this.whatHappened = whatHappened;
this.level = level;
}
private static StreamTerminated normally(final String whatHappened) {
return new StreamTerminated(whatHappened, StatusDetailMessage.Level.DEFAULT);
}
private static StreamTerminated withError(final String whatHappened) {
return new StreamTerminated(whatHappened, STREAM_ERROR_STATUS_LEVEL);
}
@Override
public String name() {
return whatHappened;
}
@Override
public StatusDetailMessage.Level level() {
return this.level;
}
}
private static final class JsonFields {
private static final JsonFieldDefinition<Boolean> ENABLED =
JsonFactory.newBooleanFieldDefinition("enabled");
private static final JsonFieldDefinition<JsonArray> EVENTS =
JsonFactory.newJsonArrayFieldDefinition("events");
}
}