/
FlowStreamRefsDocTest.java
161 lines (126 loc) · 4.27 KB
/
FlowStreamRefsDocTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/*
* Copyright (C) 2015-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream;
import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.javadsl.TestKit;
import jdocs.AbstractJavaTest;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
public class FlowStreamRefsDocTest extends AbstractJavaTest {
static ActorSystem system = null;
static Materializer mat = null;
/**
 * Placeholder test with an empty body.
 *
 * <p>It is the only method in this class annotated with {@code @Test}. The sample methods below
 * are not annotated and reference the static {@code system} and {@code mat} fields, which are
 * initialized to {@code null} in this class.
 */
@Test
public void compileOnlySpec() {
// do nothing
}
// #offer-source
/**
 * Message requesting the logs for a given stream id.
 *
 * <p>{@code DataSource} answers it with a {@code LogsOffer}.
 */
static class RequestLogs {
  /** Identifier of the stream whose logs are requested. */
  public final long streamId;

  /**
   * @param streamId identifier of the stream whose logs are requested
   */
  public RequestLogs(long streamId) {
    this.streamId = streamId;
  }
}
/** Reply message carrying a {@link SourceRef} of log lines. */
static class LogsOffer {
  /** Reference to the offered source of log lines. */
  final SourceRef<String> sourceRef;

  /**
   * @param sourceRef reference to the source of log lines being offered
   */
  public LogsOffer(SourceRef<String> sourceRef) {
    this.sourceRef = sourceRef;
  }
}
/**
 * Actor that answers every {@code RequestLogs} message with a {@code LogsOffer} holding a
 * {@link SourceRef} to the requested logs.
 */
static class DataSource extends AbstractActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(RequestLogs.class, request -> offerLogs(request.streamId))
        .build();
  }

  /** Runs the log source into a source ref and sends that ref back to the sender. */
  private void offerLogs(long streamId) {
    SourceRef<String> ref = logsFor(streamId).runWith(StreamRefs.sourceRef(), mat);
    LogsOffer reply = new LogsOffer(ref);
    getSender().tell(reply, getSelf());
  }

  /** Builds a source that repeats a single log line mentioning the given stream id. */
  private Source<String, NotUsed> logsFor(long streamId) {
    String line = "[INFO] some interesting logs here (for id: " + streamId + ")";
    return Source.repeat(line);
  }
}
// #offer-source
/**
 * Sample usage of {@code DataSource}: requests logs, waits for the {@code LogsOffer} reply and
 * prints every element of the offered source.
 */
public void offerSource() {
  new TestKit(system) {
    {
      // #offer-source-use
      Props dataSourceProps = Props.create(DataSource.class);
      ActorRef sourceActor = system.actorOf(dataSourceProps, "dataSource");

      // ask for the logs; the reply is delivered to the test actor
      RequestLogs request = new RequestLogs(1337);
      sourceActor.tell(request, getTestActor());
      LogsOffer offer = expectMsgClass(LogsOffer.class);

      // consume the offered source locally
      Source<String, NotUsed> offeredLogs = offer.sourceRef.getSource();
      offeredLogs.runWith(Sink.foreach(line -> System.out.println(line)), mat);
      // #offer-source-use
    }
  };
}
// #offer-sink
/**
 * Message asking the receiver to prepare a sink for an upload.
 *
 * <p>{@code DataReceiver} answers it with a {@code MeasurementsSinkReady} carrying the same id.
 */
static class PrepareUpload {
  /** Identifier of the upload. */
  final String id;

  /**
   * @param id identifier of the upload
   */
  public PrepareUpload(String id) {
    this.id = id;
  }
}
/** Reply message carrying an upload id together with the {@link SinkRef} prepared for it. */
static class MeasurementsSinkReady {
  /** Identifier of the upload this sink belongs to. */
  final String id;

  /** Reference to the sink that accepts the uploaded elements. */
  final SinkRef<String> sinkRef;

  /**
   * @param id identifier of the upload
   * @param ref reference to the sink prepared for that upload
   */
  public MeasurementsSinkReady(String id, SinkRef<String> ref) {
    this.id = id;
    this.sinkRef = ref;
  }
}
/**
 * Actor that answers every {@code PrepareUpload} message with a {@code MeasurementsSinkReady}
 * holding a {@link SinkRef} connected to a local sink.
 */
static class DataReceiver extends AbstractActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder().match(PrepareUpload.class, this::onPrepareUpload).build();
  }

  /** Connects a sink ref to the local sink, runs it and sends the ref back to the sender. */
  private void onPrepareUpload(PrepareUpload request) {
    Sink<String, NotUsed> target = logsSinkFor(request.id);
    RunnableGraph<SinkRef<String>> graph = StreamRefs.<String>sinkRef().to(target);
    SinkRef<String> ref = graph.run(mat);
    getSender().tell(new MeasurementsSinkReady(request.id, ref), getSelf());
  }

  /** Returns the sink for the given id; this sample ignores every element (and the id). */
  private Sink<String, NotUsed> logsSinkFor(String id) {
    Sink<String, NotUsed> ignoring =
        Sink.<String>ignore().mapMaterializedValue(unused -> NotUsed.getInstance());
    return ignoring;
  }
}
// #offer-sink
/**
 * Sample usage of {@code DataReceiver}: asks it to prepare an upload, waits for the
 * {@code MeasurementsSinkReady} reply and streams elements into the offered sink.
 */
public void offerSink() {
  new TestKit(system) {
    {
      // #offer-sink-use
      Props receiverProps = Props.create(DataReceiver.class);
      ActorRef receiver = system.actorOf(receiverProps, "dataReceiver");

      // ask for a sink; the reply is delivered to the test actor
      PrepareUpload request = new PrepareUpload("system-42-tmp");
      receiver.tell(request, getTestActor());
      MeasurementsSinkReady ready = expectMsgClass(MeasurementsSinkReady.class);

      // stream local data into the offered sink
      Sink<String, NotUsed> offeredSink = ready.sinkRef.getSink();
      Source.repeat("hello").runWith(offeredSink, mat);
      // #offer-sink-use
    }
  };
}
/**
 * Sample showing how a subscription-timeout attribute is attached to the stages returned by
 * {@code StreamRefs.sourceRef()} and {@code StreamRefs.sinkRef()} before they are run.
 */
public void configureTimeouts() {
  new TestKit(system) {
    {
      // #attr-sub-timeout
      FiniteDuration timeout = FiniteDuration.create(5, TimeUnit.SECONDS);
      Attributes timeoutAttributes = StreamRefAttributes.subscriptionTimeout(timeout);

      // configuring StreamRefs.sourceRef (notice that we apply the attributes to the Sink!):
      Sink<String, SourceRef<String>> sourceRefSink =
          StreamRefs.<String>sourceRef().addAttributes(timeoutAttributes);
      Source.repeat("hello").runWith(sourceRefSink, mat);

      // configuring StreamRefs.sinkRef (here the attributes are applied to the Source):
      Source<String, SinkRef<String>> sinkRefSource =
          StreamRefs.<String>sinkRef().addAttributes(timeoutAttributes);
      sinkRefSource.runWith(Sink.<String>ignore(), mat); // not very interesting sink, just an example
      // #attr-sub-timeout
    }
  };
}
}