Skip to content

Commit

Permalink
refactor(plc4x-server): Cleanup, add tests, and add option to specify…
Browse files Browse the repository at this point in the history
… port number (#1162)
  • Loading branch information
takraj committed Oct 18, 2023
1 parent 0a62cb2 commit 8091bfd
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 35 deletions.
7 changes: 7 additions & 0 deletions plc4j/tools/plc4x-server/pom.xml
Expand Up @@ -92,6 +92,13 @@
<version>0.12.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-driver-plc4x</artifactId>
<version>0.12.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
Expand Down
Expand Up @@ -16,76 +16,144 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.tools.plc4xserver;

import static java.lang.Runtime.getRuntime;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.plc4x.java.api.exceptions.PlcException;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.ToIntFunction;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.plc4x.readwrite.Plc4xConstants;
import org.apache.plc4x.java.plc4x.readwrite.Plc4xMessage;
import org.apache.plc4x.java.spi.connection.GeneratedProtocolMessageCodec;
import org.apache.plc4x.java.spi.generation.ByteOrder;
import org.apache.plc4x.java.tools.plc4xserver.protocol.Plc4xServerAdapter;

import java.util.function.ToIntFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Plc4xServer {

public static final String SERVER_PORT_PROPERTY = "plc4x.server.port";
public static final String SERVER_PORT_ENVIRONMENT_VARIABLE = "PLC4X_SERVER_PORT";
public static int DEFAULT_PORT = Plc4xConstants.PLC4XTCPDEFAULTPORT;

private static final Logger LOG = LoggerFactory.getLogger(Plc4xServerAdapter.class);

private EventLoopGroup loopGroup;
private EventLoopGroup workerGroup;
private ChannelFuture channelFuture;
private Integer port;

public static void main(String[] args) throws Exception {
final Plc4xServer server = new Plc4xServer();

public Plc4xServer() {
Future<Void> serverFuture = server.start(
Arrays.stream(args).findFirst() // port number given as first command line argument
.or(() -> Optional.ofNullable(System.getProperty(SERVER_PORT_PROPERTY)))
.or(() -> Optional.ofNullable(System.getenv(SERVER_PORT_ENVIRONMENT_VARIABLE)))
.map(Integer::parseInt)
.orElse(DEFAULT_PORT)
);
CompletableFuture<Void> serverRunning = new CompletableFuture<>();
getRuntime().addShutdownHook(new Thread(() -> serverRunning.complete(null)));

try {
LOG.info("Server is configured to listen on TCP port {}", server.getPort());
serverFuture.get();
LOG.info("Server is ready.");
serverRunning.get();
} catch (InterruptedException e) {
throw new PlcRuntimeException(e);
} finally {
LOG.info("Server is shutting down...");
server.stop();
}
}

public void start() throws PlcException {
if(loopGroup != null) {
return;
public Integer getPort() {
return port;
}

public Future<Void> start() {
return start(0);
}

public Future<Void> start(int port) {
if (port == 0) {
this.port = findRandomFreePort();
} else {
this.port = port;
}

try {
loopGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
if (loopGroup != null) {
return CompletableFuture.completedFuture(null);
}

loopGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(loopGroup, workerGroup)
channelFuture = new ServerBootstrap()
.group(loopGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new GeneratedProtocolMessageCodec<>(
Plc4xMessage.class, Plc4xMessage::staticParse, ByteOrder.BIG_ENDIAN, null,
new ByteLengthEstimator(), null));
pipeline.addLast(new Plc4xServerAdapter());
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

bootstrap.bind(Plc4xConstants.PLC4XTCPDEFAULTPORT).sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PlcException(e);
}
.childHandler(new SocketChannelChannelInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.bind(this.port);

return channelFuture;
}

public void stop() {
if(workerGroup == null) {
if (workerGroup == null) {
return;
}

channelFuture.cancel(true);

workerGroup.shutdownGracefully();
loopGroup.shutdownGracefully();

workerGroup = null;
loopGroup = null;
}

/** Estimate the Length of a Packet */
public static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {
private static class SocketChannelChannelInitializer extends ChannelInitializer<SocketChannel> {

@Override
public void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(
new GeneratedProtocolMessageCodec<>(
Plc4xMessage.class,
Plc4xMessage::staticParse,
ByteOrder.BIG_ENDIAN,
null,
new ByteLengthEstimator(),
null
)
);
pipeline.addLast(new Plc4xServerAdapter());
}
}

private static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {

@Override
public int applyAsInt(ByteBuf byteBuf) {
if (byteBuf.readableBytes() >= 3) {
Expand All @@ -95,8 +163,13 @@ public int applyAsInt(ByteBuf byteBuf) {
}
}

public static void main(String[] args) throws Exception {
new Plc4xServer().start();
private static int findRandomFreePort() {
final int port;
try (ServerSocket socket = new ServerSocket(0)) {
port = socket.getLocalPort();
} catch (IOException e) {
throw new RuntimeException("Couldn't find any free port.", e);
}
return port;
}

}
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.tools.plc4xserver;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.plc4x.java.DefaultPlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.PlcConnectionManager;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class Plc4xServerTest {

private static final Plc4xServer SERVER = new Plc4xServer();
private static final String CONNECTION_STRING_TEMPLATE = "plc4x://localhost:%d?remote-connection-string=%s";
private static final String CONNECTION_STRING_SIMULATED_ENCODED = "simulated%3A%2F%2Flocalhost";
private static final long TIMEOUT_VALUE = 10;
private static final TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS;

private final PlcConnectionManager connectionManager = new DefaultPlcDriverManager();

@BeforeAll
public static void setUp() throws ExecutionException, InterruptedException, TimeoutException {
SERVER.start().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
}

@AfterAll
public static void tearDown() {
SERVER.stop();
}

@Test
public void testWrite() throws Exception {
final PlcWriteResponse response;

try (PlcConnection connection = connectionManager.getConnection(
String.format(CONNECTION_STRING_TEMPLATE, SERVER.getPort(), CONNECTION_STRING_SIMULATED_ENCODED))) {
final PlcWriteRequest request = connection.writeRequestBuilder()
.addTagAddress(
"foo",
"STATE/foo:DINT",
42
)
.build();
response = request.execute().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
}

assertEquals(PlcResponseCode.OK, response.getResponseCode("foo"));
}

@Test
public void testRead() throws Exception {
final PlcReadResponse response;

try (PlcConnection connection = connectionManager.getConnection(
String.format(CONNECTION_STRING_TEMPLATE, SERVER.getPort(), CONNECTION_STRING_SIMULATED_ENCODED))) {
final PlcReadRequest request = connection.readRequestBuilder()
.addTagAddress(
"foo",
"RANDOM/foo:DINT"
)
.build();
response = request.execute().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
}

assertEquals(PlcResponseCode.OK, response.getResponseCode("foo"));
assertNotNull(response.getPlcValue("foo"));
assertInstanceOf(Integer.class, response.getPlcValue("foo").getObject());
}

@Test
public void testReadWriteSameConnection() throws Exception {
final PlcWriteResponse writeResponse;
final PlcReadResponse readResponse;

try (PlcConnection connection = connectionManager.getConnection(
String.format(CONNECTION_STRING_TEMPLATE, SERVER.getPort(), CONNECTION_STRING_SIMULATED_ENCODED))) {
final PlcWriteRequest writeRequest = connection.writeRequestBuilder()
.addTagAddress(
"foo",
"STATE/foo:DINT",
21
)
.build();
writeResponse = writeRequest.execute().get(TIMEOUT_VALUE, TIMEOUT_UNIT);

final PlcReadRequest readRequest = connection.readRequestBuilder()
.addTagAddress(
"foo",
"STATE/foo:DINT"
)
.build();
readResponse = readRequest.execute().get(TIMEOUT_VALUE, TIMEOUT_UNIT);
}

assertEquals(PlcResponseCode.OK, writeResponse.getResponseCode("foo"));
assertEquals(PlcResponseCode.OK, readResponse.getResponseCode("foo"));

assertInstanceOf(Integer.class, readResponse.getPlcValue("foo").getObject());
assertEquals(21, readResponse.getInteger("foo"));
}
}

0 comments on commit 8091bfd

Please sign in to comment.