-
Notifications
You must be signed in to change notification settings - Fork 70
/
CspExample.java
71 lines (62 loc) · 1.91 KB
/
CspExample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelOutput;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.dsl.WithChannelTransformer;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.eventloop.Eventloop;
import java.util.Locale;
/**
 * Communicating process that reads strings from its input channel, converts each one to
 * upper case, appends the length of the original string in parentheses and passes the
 * result to its output channel (for example {@code "hello"} becomes {@code "HELLO(5)"}).
 *
 * <p>Processing is started as soon as both the input and the output have been bound,
 * in whichever order that happens. A {@code null} item received from the input is
 * treated as end of stream.
 */
//[START EXAMPLE]
public final class CspExample extends AbstractCommunicatingProcess implements WithChannelTransformer<CspExample, String, String> {
    /** Upstream supplier; {@code null} until {@link #getInput()} has been bound. */
    private ChannelSupplier<String> input;
    /** Downstream consumer; {@code null} until {@link #getOutput()} has been bound. */
    private ChannelConsumer<String> output;

    /**
     * Returns the output endpoint. Binding a consumer to it stores the consumer and
     * starts the process if the input has already been bound.
     */
    @Override
    public ChannelOutput<String> getOutput() {
        return output -> {
            this.output = output;
            startIfBound();
        };
    }

    /**
     * Returns the input endpoint. Binding a supplier to it stores the supplier, starts
     * the process if the output has already been bound, and returns the promise obtained
     * from {@code getProcessCompletion()}.
     */
    @Override
    public ChannelInput<String> getInput() {
        return input -> {
            this.input = input;
            startIfBound();
            return getProcessCompletion();
        };
    }

    /** Starts the process once both channel ends are available. */
    private void startIfBound() {
        if (input != null && output != null) {
            startProcess();
        }
    }

    /**
     * Handles one item: requests the next string from the input, forwards the
     * transformed value to the output and then schedules itself again. On end of stream
     * ({@code null} item) it forwards end of stream and completes the process.
     *
     * <p>Any failure while reading or writing is handed to {@code closeEx} instead of
     * merely being printed, so the error is propagated rather than leaving the process
     * waiting forever.
     */
    @Override
    //[START REGION_1]
    protected void doProcess() {
        input.get()
                .whenResult(data -> {
                    if (data == null) {
                        output.acceptEndOfStream()
                                .whenResult(this::completeProcess)
                                .whenException(this::closeEx);
                    } else {
                        // Locale.ROOT keeps the casing independent of the JVM default locale;
                        // the reported length is that of the original string.
                        String transformed = data.toUpperCase(Locale.ROOT) + '(' + data.length() + ')';
                        output.accept(transformed)
                                .whenResult(this::doProcess)
                                .whenException(this::closeEx);
                    }
                })
                .whenException(this::closeEx);
    }
    //[END REGION_1]

    /**
     * Reports the closing exception on standard output and closes whichever channel
     * ends have been bound with that same exception.
     *
     * @param e the exception the process is being closed with
     */
    @Override
    protected void doClose(Exception e) {
        System.out.println("Process has been closed with exception: " + e);
        // Either end may still be unbound if the process is closed before it was wired up.
        if (input != null) {
            input.closeEx(e);
        }
        if (output != null) {
            output.closeEx(e);
        }
    }

    /**
     * Demo entry point: streams a fixed list of words through a {@code CspExample}
     * and prints every transformed item.
     */
    public static void main(String[] args) {
        Eventloop eventloop = Eventloop.create().withCurrentThread();

        CspExample process = new CspExample();
        ChannelSupplier.of("hello", "world", "nice", "to", "see", "you")
                .transformWith(process)
                .streamTo(ChannelConsumer.ofConsumer(System.out::println));

        eventloop.run();
    }
}
//[END EXAMPLE]