-
Notifications
You must be signed in to change notification settings - Fork 70
/
PingPongSocketConnection.java
61 lines (52 loc) · 2.1 KB
/
PingPongSocketConnection.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import io.activej.csp.ChannelSupplier;
import io.activej.csp.binary.BinaryChannelSupplier;
import io.activej.csp.binary.ByteBufsDecoder;
import io.activej.eventloop.Eventloop;
import io.activej.net.SimpleServer;
import io.activej.net.socket.tcp.AsyncTcpSocketNio;
import java.io.IOException;
import java.net.InetSocketAddress;
import static io.activej.bytebuf.ByteBufStrings.wrapAscii;
import static io.activej.promise.Promises.loop;
import static io.activej.promise.Promises.repeat;
import static java.nio.charset.StandardCharsets.UTF_8;
@SuppressWarnings("Convert2MethodRef")
public final class PingPongSocketConnection {
private static final InetSocketAddress ADDRESS = new InetSocketAddress("localhost", 9022);
private static final int ITERATIONS = 3;
private static final String REQUEST_MSG = "PING";
private static final String RESPONSE_MSG = "PONG";
private static final ByteBufsDecoder<String> DECODER = ByteBufsDecoder.ofFixedSize(4)
.andThen(buf -> buf.asString(UTF_8));
//[START REGION_1]
public static void main(String[] args) throws IOException {
Eventloop eventloop = Eventloop.create().withCurrentThread();
SimpleServer server = SimpleServer.create(
socket -> {
BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
repeat(() ->
bufsSupplier.decode(DECODER)
.whenResult(x -> System.out.println(x))
.then(() -> socket.write(wrapAscii(RESPONSE_MSG)))
.map($ -> true))
.whenComplete(socket::close);
})
.withListenAddress(ADDRESS)
.withAcceptOnce();
server.listen();
AsyncTcpSocketNio.connect(ADDRESS)
.whenResult(socket -> {
BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
loop(0,
i -> i < ITERATIONS,
i -> socket.write(wrapAscii(REQUEST_MSG))
.then(() -> bufsSupplier.decode(DECODER)
.whenResult(x -> System.out.println(x))
.map($2 -> i + 1)))
.whenComplete(socket::close);
})
.whenException(e -> { throw new RuntimeException(e); });
eventloop.run();
}
//[END REGION_1]
}