/
Monitor.java
66 lines (54 loc) · 2.31 KB
/
Monitor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/*
* Copyright (C) 2020-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.sourceorflow;
import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.FlowMonitor;
import akka.stream.FlowMonitorState;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
 * Documentation sample for the {@code monitor} stream operator.
 *
 * <p>Runs a throttled source whose materialized value is a {@link FlowMonitor} and prints the
 * monitor's state, once right after materialization and then periodically while the stream runs.
 * The {@code // #monitor} markers delimit the snippet regions and must be kept in place.
 */
public class Monitor {

  /**
   * Prints a one-line, human-readable description of a flow monitor state to standard output.
   *
   * @param state the state obtained from {@link FlowMonitor#state()}
   * @param <T> element type of the monitored stream
   */
  // #monitor
  private static <T> void printMonitorState(FlowMonitorState.StreamState<T> state) {
    // The "initialized" state is the one that means no element has been seen yet. The "finished"
    // state is handled by the final else branch below.
    if (state == FlowMonitorState.initialized()) {
      System.out.println("Stream is initialized but hasn't processed any element");
    } else if (state instanceof FlowMonitorState.Received) {
      // Wildcard instead of a raw type: only msg() is needed and it is printed as an Object.
      FlowMonitorState.Received<?> received = (FlowMonitorState.Received<?>) state;
      System.out.println("Last message received: " + received.msg());
    } else if (state instanceof FlowMonitorState.Failed) {
      Throwable cause = ((FlowMonitorState.Failed) state).cause();
      System.out.println("Stream failed with cause: " + cause.getMessage());
    } else {
      System.out.println("Stream completed already");
    }
  }
  // #monitor

  /**
   * Runs the sample: materializes the monitored stream, prints the monitor state immediately and
   * then on a timer, and terminates the actor system once the monitored stream completes.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args)
      throws InterruptedException, TimeoutException, ExecutionException {
    ActorSystem actorSystem = ActorSystem.create("25fps-stream");

    // #monitor
    // Six elements throttled to five per second; Keep.right() retains the FlowMonitor as the
    // source's materialized value.
    Source<Integer, FlowMonitor<Integer>> monitoredSource =
        Source.fromIterator(() -> Arrays.asList(0, 1, 2, 3, 4, 5).iterator())
            .throttle(5, Duration.ofSeconds(1))
            .monitorMat(Keep.right());

    // Keep.both() yields the monitor together with the sink's completion signal.
    Pair<FlowMonitor<Integer>, CompletionStage<Done>> run =
        monitoredSource.toMat(Sink.foreach(System.out::println), Keep.both()).run(actorSystem);

    FlowMonitor<Integer> monitor = run.first();

    // If we peek in the monitor too early, it's possible it was not initialized yet.
    printMonitorState(monitor.state());

    // Periodically check the monitor
    Source.tick(Duration.ofMillis(200), Duration.ofMillis(400), "")
        .runForeach(__ -> printMonitorState(monitor.state()), actorSystem);
    // #monitor

    // Shut the actor system down when the monitored stream completes, successfully or not.
    run.second().toCompletableFuture().whenComplete((x, t) -> actorSystem.terminate());
  }
}