Skip to content

Commit

Permalink
WIP transport refactor server
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinherron committed Oct 15, 2022
1 parent a33dd82 commit a723e23
Show file tree
Hide file tree
Showing 24 changed files with 194 additions and 109 deletions.
Expand Up @@ -46,7 +46,7 @@
import org.eclipse.milo.opcua.stack.core.types.structured.RegisteredServer;
import org.eclipse.milo.opcua.stack.core.types.structured.RequestHeader;
import org.eclipse.milo.opcua.stack.core.util.EndpointUtil;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplication;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplicationContext;
import org.eclipse.milo.opcua.stack.transport.client.OpcClientTransport;
import org.eclipse.milo.opcua.stack.transport.client.security.ClientCertificateValidator;
import org.eclipse.milo.opcua.stack.transport.client.tcp.OpcTcpClientTransport;
Expand All @@ -60,13 +60,13 @@

public class DiscoveryClient {

private final ClientApplication clientApplication;
private final ClientApplicationContext applicationContext;
private final OpcClientTransport transport;

public DiscoveryClient(EndpointDescription endpoint, OpcClientTransport transport) {
this.transport = transport;

clientApplication = new ClientApplication() {
applicationContext = new ClientApplicationContext() {
@Override
public EndpointDescription getEndpoint() {
return endpoint;
Expand Down Expand Up @@ -105,7 +105,7 @@ public UInteger getRequestTimeout() {
}

public CompletableFuture<DiscoveryClient> connect() {
return transport.connect(clientApplication).thenApply(c -> DiscoveryClient.this);
return transport.connect(applicationContext).thenApply(c -> DiscoveryClient.this);
}

public CompletableFuture<DiscoveryClient> disconnect() {
Expand Down
Expand Up @@ -137,7 +137,7 @@
import org.eclipse.milo.opcua.stack.core.util.ManifestUtil;
import org.eclipse.milo.opcua.stack.core.util.Namespaces;
import org.eclipse.milo.opcua.stack.core.util.Unit;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplication;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplicationContext;
import org.eclipse.milo.opcua.stack.transport.client.OpcClientTransport;
import org.eclipse.milo.opcua.stack.transport.client.tcp.OpcTcpClientTransport;
import org.eclipse.milo.opcua.stack.transport.client.tcp.OpcTcpClientTransportConfig;
Expand Down Expand Up @@ -297,7 +297,7 @@ public static OpcUaClient create(

private final SessionFsm sessionFsm;

private final ClientApplication clientApplication;
private final ClientApplicationContext applicationContext;

private final OpcUaClientConfig config;

Expand Down Expand Up @@ -362,7 +362,7 @@ public ServerTable getServerTable() {
}
};

clientApplication = new ClientApplication() {
applicationContext = new ClientApplicationContext() {
@Override
public EndpointDescription getEndpoint() {
return config.getEndpoint();
Expand Down Expand Up @@ -723,7 +723,7 @@ public RequestHeader newRequestHeader(NodeId authToken, UInteger requestTimeout)

@Override
public CompletableFuture<UaClient> connect() {
return transport.connect(clientApplication)
return transport.connect(applicationContext)
.thenCompose(c -> sessionFsm.openSession())
.thenApply(s -> OpcUaClient.this);
}
Expand Down
Expand Up @@ -86,7 +86,7 @@
import org.eclipse.milo.opcua.stack.core.util.ManifestUtil;
import org.eclipse.milo.opcua.stack.transport.server.OpcServerTransport;
import org.eclipse.milo.opcua.stack.transport.server.OpcServerTransportFactory;
import org.eclipse.milo.opcua.stack.transport.server.ServerApplication;
import org.eclipse.milo.opcua.stack.transport.server.ServerApplicationContext;
import org.eclipse.milo.opcua.stack.transport.server.ServiceRequestContext;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
Expand All @@ -96,7 +96,7 @@
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.ubyte;
import static org.eclipse.milo.opcua.stack.core.util.ConversionUtil.a;

public class OpcUaServer extends AbstractServiceHandler implements ServerApplication {
public class OpcUaServer extends AbstractServiceHandler implements ServerApplicationContext {

public static final String SDK_VERSION =
ManifestUtil.read("X-SDK-Version").orElse("dev");
Expand Down Expand Up @@ -419,11 +419,6 @@ public Long getNextSecureChannelTokenId() {
return secureChannelTokenIds.getAndIncrement();
}

@Override
public ExecutorService getExecutor() {
return config.getExecutor();
}

@Override
public CompletableFuture<UaResponseMessageType> handleServiceRequest(
ServiceRequestContext context,
Expand Down
Expand Up @@ -34,7 +34,7 @@
import org.eclipse.milo.opcua.stack.core.types.UaResponseMessageType;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
import org.eclipse.milo.opcua.stack.core.util.EndpointUtil;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplication;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplicationContext;
import org.eclipse.milo.opcua.stack.transport.client.OpcClientTransportConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,9 +50,9 @@ public class OpcClientHttpCodec extends MessageToMessageCodec<HttpResponse, UaRe
private final TransportProfile transportProfile;

private final OpcClientTransportConfig config;
private final ClientApplication application;
private final ClientApplicationContext application;

OpcClientHttpCodec(OpcClientTransportConfig config, ClientApplication application) {
OpcClientHttpCodec(OpcClientTransportConfig config, ClientApplicationContext application) {
this.config = config;
this.application = application;

Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.eclipse.milo.opcua.stack.core.types.UaResponseMessageType;
import org.eclipse.milo.opcua.stack.core.util.EndpointUtil;
import org.eclipse.milo.opcua.stack.core.util.Unit;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplication;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplicationContext;
import org.eclipse.milo.opcua.stack.transport.client.OpcClientTransport;
import org.eclipse.milo.opcua.stack.transport.client.OpcClientTransportConfig;
import org.slf4j.Logger;
Expand All @@ -54,7 +54,7 @@ public OpcClientTransportConfig getConfig() {
}

@Override
public synchronized CompletableFuture<Unit> connect(ClientApplication application) {
public synchronized CompletableFuture<Unit> connect(ClientApplicationContext application) {
if (channelPool == null) {
channelPool = createChannelPool(config, application);
}
Expand Down Expand Up @@ -110,7 +110,7 @@ private synchronized void releaseChannel(Channel channel) {
}
}

private static ChannelPool createChannelPool(OpcClientTransportConfig config, ClientApplication application) {
private static ChannelPool createChannelPool(OpcClientTransportConfig config, ClientApplicationContext application) {
final String endpointUrl = application.getEndpoint().getEndpointUrl();

String host = EndpointUtil.getHost(endpointUrl);
Expand Down
Expand Up @@ -20,7 +20,7 @@
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplication;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplicationContext;
import org.eclipse.milo.opcua.stack.transport.client.uasc.ClientSecureChannel;
import org.eclipse.milo.opcua.stack.transport.client.uasc.UascClientAcknowledgeHandler;
import org.eclipse.milo.opcua.stack.transport.client.uasc.UascClientConfig;
Expand All @@ -34,13 +34,13 @@ public class OpcClientWebSocketBinaryFrameCodec extends MessageToMessageCodec<We
private final Logger logger = LoggerFactory.getLogger(getClass());

private final UascClientConfig config;
private final ClientApplication application;
private final ClientApplicationContext application;
private final Supplier<Long> requestIdSupplier;
private final CompletableFuture<ClientSecureChannel> handshake;

public OpcClientWebSocketBinaryFrameCodec(
UascClientConfig config,
ClientApplication application,
ClientApplicationContext application,
Supplier<Long> requestIdSupplier,
CompletableFuture<ClientSecureChannel> handshake
) {
Expand Down
Expand Up @@ -33,7 +33,6 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
Expand Down Expand Up @@ -62,7 +61,7 @@
import org.eclipse.milo.opcua.stack.core.util.EndpointUtil;
import org.eclipse.milo.opcua.stack.core.util.Unit;
import org.eclipse.milo.opcua.stack.transport.client.AbstractUascClientTransport;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplication;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplicationContext;
import org.eclipse.milo.opcua.stack.transport.client.uasc.ClientSecureChannel;
import org.eclipse.milo.opcua.stack.transport.client.uasc.UascClientConfig;
import org.slf4j.Logger;
Expand All @@ -72,8 +71,8 @@

public class OpcWebSocketClientTransport extends AbstractUascClientTransport {

private static final FsmContext.Key<ClientApplication> KEY_CLIENT_APPLICATION =
new FsmContext.Key<>("clientApplication", ClientApplication.class);
private static final FsmContext.Key<ClientApplicationContext> KEY_CLIENT_APPLICATION =
new FsmContext.Key<>("clientApplication", ClientApplicationContext.class);

private static final String CHANNEL_FSM_LOGGER_NAME = "org.eclipse.milo.opcua.stack.client.ChannelFsm";

Expand Down Expand Up @@ -109,7 +108,7 @@ public OpcWebSocketClientTransportConfig getConfig() {
}

@Override
public CompletableFuture<Unit> connect(ClientApplication application) {
public CompletableFuture<Unit> connect(ClientApplicationContext application) {
channelFsm.getFsm().withContext(
(Consumer<FsmContext<State, Event>>) ctx ->
ctx.set(KEY_CLIENT_APPLICATION, application)
Expand Down Expand Up @@ -140,14 +139,14 @@ private ClientChannelActions(UascClientConfig config) {

@Override
public CompletableFuture<Channel> connect(FsmContext<State, Event> ctx) {
ClientApplication application = (ClientApplication) ctx.get(KEY_CLIENT_APPLICATION);
ClientApplicationContext application = (ClientApplicationContext) ctx.get(KEY_CLIENT_APPLICATION);

var handshakeFuture = new CompletableFuture<ClientSecureChannel>();

var bootstrap = new Bootstrap();

bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
bootstrap.channel(NioSocketChannel.class)
.group(OpcWebSocketClientTransport.this.config.getEventLoop())
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5_000)
.option(ChannelOption.TCP_NODELAY, true)
Expand Down
Expand Up @@ -19,7 +19,7 @@
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;

public interface ClientApplication {
public interface ClientApplicationContext {

EndpointDescription getEndpoint();

Expand Down
Expand Up @@ -18,12 +18,37 @@

public interface OpcClientTransport {

/**
* Get the {@link OpcClientTransportConfig} associated with this transport.
*
* @return the {@link OpcClientTransportConfig} associated with this transport.
*/
OpcClientTransportConfig getConfig();

CompletableFuture<Unit> connect(ClientApplication application);

/**
* Connect this transport implementation.
*
* @param application the {@link ClientApplicationContext} associated with this transport.
* @return a {@link CompletableFuture} that completes successfully when this transport
* connects, or completes exceptionally if an error occurred.
*/
CompletableFuture<Unit> connect(ClientApplicationContext application);

/**
* Disconnect this transport implementation.
*
* @return a {@link CompletableFuture} that completes successfully when this transport
* disconnects, or completes exceptionally if an error occurred.
*/
CompletableFuture<Unit> disconnect();

/**
* Send a {@link UaRequestMessageType} on this transport implementation.
*
* @param requestMessage the {@link UaRequestMessageType} to send.
* @return a {@link CompletableFuture} that completes successfully with the
* {@link UaResponseMessageType} or completes exceptionally if an error occurred.
*/
CompletableFuture<UaResponseMessageType> sendRequestMessage(UaRequestMessageType requestMessage);

}
Expand Up @@ -18,12 +18,32 @@

public interface OpcClientTransportConfig {

/**
* Get the {@link ExecutorService} to be used by this transport.
*
* @return the {@link ExecutorService} to be used by this transport.
*/
ExecutorService getExecutor();

/**
* Get the {@link ScheduledExecutorService} to be used by this transport.
*
* @return the {@link ScheduledExecutorService} to be used by this transport.
*/
ScheduledExecutorService getScheduledExecutor();

/**
* Get the {@link EventLoopGroup} to be used by this transport.
*
* @return the {@link EventLoopGroup} to be used by this transport.
*/
EventLoopGroup getEventLoop();

/**
* Get the {@link HashedWheelTimer} to be used by this transport.
*
* @return the {@link HashedWheelTimer} to be used by this transport.
*/
HashedWheelTimer getWheelTimer();

}
Expand Up @@ -31,7 +31,6 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Timeout;
Expand All @@ -45,7 +44,7 @@
import org.eclipse.milo.opcua.stack.core.util.EndpointUtil;
import org.eclipse.milo.opcua.stack.core.util.Unit;
import org.eclipse.milo.opcua.stack.transport.client.AbstractUascClientTransport;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplication;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplicationContext;
import org.eclipse.milo.opcua.stack.transport.client.uasc.ClientSecureChannel;
import org.eclipse.milo.opcua.stack.transport.client.uasc.InboundUascResponseHandler.DelegatingUascResponseHandler;
import org.eclipse.milo.opcua.stack.transport.client.uasc.UascClientAcknowledgeHandler;
Expand All @@ -57,8 +56,8 @@

public class OpcTcpClientTransport extends AbstractUascClientTransport {

private static final FsmContext.Key<ClientApplication> KEY_CLIENT_APPLICATION =
new FsmContext.Key<>("clientApplication", ClientApplication.class);
private static final FsmContext.Key<ClientApplicationContext> KEY_CLIENT_APPLICATION =
new FsmContext.Key<>("clientApplication", ClientApplicationContext.class);

private static final String CHANNEL_FSM_LOGGER_NAME = "org.eclipse.milo.opcua.stack.client.ChannelFsm";

Expand Down Expand Up @@ -93,7 +92,7 @@ public OpcTcpClientTransportConfig getConfig() {
}

@Override
public CompletableFuture<Unit> connect(ClientApplication application) {
public CompletableFuture<Unit> connect(ClientApplicationContext application) {
channelFsm.getFsm().withContext(
ctx ->
ctx.set(KEY_CLIENT_APPLICATION, application)
Expand Down Expand Up @@ -128,14 +127,14 @@ private ClientChannelActions(UascClientConfig config) {

@Override
public CompletableFuture<Channel> connect(FsmContext<State, Event> ctx) {
ClientApplication application = (ClientApplication) ctx.get(KEY_CLIENT_APPLICATION);
ClientApplicationContext application = (ClientApplicationContext) ctx.get(KEY_CLIENT_APPLICATION);

var handshakeFuture = new CompletableFuture<ClientSecureChannel>();

var bootstrap = new Bootstrap();

bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
bootstrap.channel(NioSocketChannel.class)
.group(OpcTcpClientTransport.this.config.getEventLoop())
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
OpcTcpClientTransport.this.config.getConnectTimeout().intValue())
Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.eclipse.milo.opcua.stack.core.channel.messages.TcpMessageEncoder;
import org.eclipse.milo.opcua.stack.core.types.UaRequestMessageType;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplication;
import org.eclipse.milo.opcua.stack.transport.client.ClientApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,13 +53,13 @@ public class UascClientAcknowledgeHandler extends ByteToMessageCodec<UaRequestMe
private Timeout helloTimeout;

private final UascClientConfig config;
private final ClientApplication application;
private final ClientApplicationContext application;
private final Supplier<Long> requestIdSupplier;
private final CompletableFuture<ClientSecureChannel> handshakeFuture;

public UascClientAcknowledgeHandler(
UascClientConfig config,
ClientApplication application,
ClientApplicationContext application,
Supplier<Long> requestIdSupplier,
CompletableFuture<ClientSecureChannel> handshakeFuture
) {
Expand Down

0 comments on commit a723e23

Please sign in to comment.