Skip to content

Commit

Permalink
WIP transport refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinherron committed Sep 19, 2022
1 parent 52fdc1f commit 00de8c6
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 10 deletions.
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2022 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.milo.opcua.stack.transport;

public interface ClientConfig extends OpcTransportConfig {}
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2022 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.milo.opcua.stack.transport;

interface ClientConfigBuilder {}
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2022 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.milo.opcua.stack.transport;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

import org.eclipse.milo.opcua.stack.core.Stack;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
import org.eclipse.milo.opcua.stack.core.util.EndpointUtil;

public class ClientFactory {


public static AbstractClient create(OpcTransportConfig config) {
return null;
}

public static AbstractClient create(String endpointUrl) {
return null;
}

public static AbstractClient create(
String endpointUrl,
Function<List<EndpointDescription>, Optional<EndpointDescription>> selectEndpoint,
Function<ClientConfigBuilder, ClientConfig> buildConfig
) throws UaException {

String scheme = EndpointUtil.getScheme(endpointUrl);

String profileUri;

switch (Objects.requireNonNullElse(scheme, "").toLowerCase()) {
case "opc.tcp":
profileUri = Stack.TCP_UASC_UABINARY_TRANSPORT_URI;
break;

case "http":
case "https":
case "opc.http":
case "opc.https":
profileUri = Stack.HTTPS_UABINARY_TRANSPORT_URI;
break;

case "opc.ws":
case "opc.wss":
profileUri = Stack.WSS_UASC_UABINARY_TRANSPORT_URI;
break;

default:
throw new UaException(
StatusCodes.Bad_InternalError,
"unsupported protocol: " + scheme
);
}

var configBuilder = new ClientConfigBuilder() {};
ClientConfig config = buildConfig.apply(configBuilder);

OpcTransport transport = TransportFactory.getInstance().create(profileUri, config);

return new AbstractClient(transport) {};
}


}
Expand Up @@ -60,7 +60,7 @@

import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;

public class OpcTcpTransport {
public class OpcTcpTransport implements OpcTransport {

private static final String CHANNEL_FSM_LOGGER_NAME = "org.eclipse.milo.opcua.stack.client.ChannelFsm";

Expand Down Expand Up @@ -95,22 +95,25 @@ public OpcTcpTransport(UascClientConfig config) {
channelFsm = factory.newChannelFsm();
}

@Override
public CompletableFuture<Unit> connect() {
return channelFsm.connect().thenApply(c -> Unit.VALUE);
}

@Override
public CompletableFuture<Unit> disconnect() {
return channelFsm.disconnect().thenApply(c -> Unit.VALUE);
}

public CompletableFuture<Channel> getChannel() {
return channelFsm.getChannel();
}

@Override
public CompletableFuture<UaResponseMessageType> sendRequestMessage(UaRequestMessageType requestMessage) {
return getChannel().thenCompose(ch -> sendRequestMessage(requestMessage, ch));
}

private CompletableFuture<Channel> getChannel() {
return channelFsm.getChannel();
}

private CompletableFuture<UaResponseMessageType> sendRequestMessage(
UaRequestMessageType requestMessage,
Channel channel
Expand Down
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2022 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.milo.opcua.stack.transport;

import org.eclipse.milo.opcua.stack.transport.uasc.UascClientConfig;

interface OpcTransportConfig extends UascClientConfig {

}
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2022 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.milo.opcua.stack.transport;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import org.eclipse.milo.opcua.stack.core.Stack;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
import org.eclipse.milo.opcua.stack.core.UaException;

public class TransportFactory {

private static final TransportFactory INSTANCE;

static {
INSTANCE = new TransportFactory();
INSTANCE.register(Stack.TCP_UASC_UABINARY_TRANSPORT_URI, new OpcTcpTransportFactory());
}

public static TransportFactory getInstance() {
return INSTANCE;
}

private final Map<String, Function<OpcTransportConfig, OpcTransport>> functions = new ConcurrentHashMap<>();

public OpcTransport create(String profileUri, OpcTransportConfig config) throws UaException {
Function<OpcTransportConfig, OpcTransport> f = functions.get(profileUri);

if (f != null) {
return f.apply(config);
} else {
throw new UaException(StatusCodes.Bad_NotSupported, "transport: " + profileUri);
}
}

public void register(String profileUri, Function<OpcTransportConfig, OpcTransport> createTransport) {
functions.put(profileUri, createTransport);
}

private static class OpcTcpTransportFactory implements Function<OpcTransportConfig, OpcTransport> {
@Override
public OpcTransport apply(OpcTransportConfig clientConfig) {
return new OpcTcpTransport(clientConfig);
}
}

}
Expand Up @@ -16,7 +16,6 @@

import io.netty.util.HashedWheelTimer;
import org.eclipse.milo.opcua.stack.core.channel.EncodingLimits;
import org.eclipse.milo.opcua.stack.core.encoding.EncodingContext;
import org.eclipse.milo.opcua.stack.core.security.CertificateValidator;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
Expand All @@ -33,8 +32,6 @@ public interface UascClientConfig {

CertificateValidator getCertificateValidator();

EncodingContext getEncodingContext();

EncodingLimits getEncodingLimits();

UInteger getAcknowledgeTimeout();
Expand Down
Expand Up @@ -28,6 +28,8 @@
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.util.Timeout;
import org.eclipse.milo.opcua.stack.client.transport.uasc.ClientSecureChannel;
import org.eclipse.milo.opcua.stack.core.NamespaceTable;
import org.eclipse.milo.opcua.stack.core.ServerTable;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.UaSerializationException;
Expand All @@ -38,17 +40,23 @@
import org.eclipse.milo.opcua.stack.core.channel.ChunkDecoder.DecodedMessage;
import org.eclipse.milo.opcua.stack.core.channel.ChunkEncoder;
import org.eclipse.milo.opcua.stack.core.channel.ChunkEncoder.EncodedMessage;
import org.eclipse.milo.opcua.stack.core.channel.EncodingLimits;
import org.eclipse.milo.opcua.stack.core.channel.MessageAbortException;
import org.eclipse.milo.opcua.stack.core.channel.MessageDecodeException;
import org.eclipse.milo.opcua.stack.core.channel.MessageEncodeException;
import org.eclipse.milo.opcua.stack.core.channel.headers.AsymmetricSecurityHeader;
import org.eclipse.milo.opcua.stack.core.channel.messages.ErrorMessage;
import org.eclipse.milo.opcua.stack.core.channel.messages.MessageType;
import org.eclipse.milo.opcua.stack.core.channel.messages.TcpMessageDecoder;
import org.eclipse.milo.opcua.stack.core.encoding.EncodingContext;
import org.eclipse.milo.opcua.stack.core.encoding.EncodingManager;
import org.eclipse.milo.opcua.stack.core.encoding.OpcUaEncodingManager;
import org.eclipse.milo.opcua.stack.core.encoding.binary.OpcUaBinaryDecoder;
import org.eclipse.milo.opcua.stack.core.encoding.binary.OpcUaBinaryEncoder;
import org.eclipse.milo.opcua.stack.core.security.CertificateValidator;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.DataTypeManager;
import org.eclipse.milo.opcua.stack.core.types.OpcUaDataTypeManager;
import org.eclipse.milo.opcua.stack.core.types.UaMessageType;
import org.eclipse.milo.opcua.stack.core.types.UaRequestMessageType;
import org.eclipse.milo.opcua.stack.core.types.UaResponseMessageType;
Expand Down Expand Up @@ -108,8 +116,8 @@ public UascClientMessageHandler(
this.handshakeFuture = handshakeFuture;
this.channelParameters = channelParameters;

binaryDecoder = new OpcUaBinaryDecoder(config.getEncodingContext());
binaryEncoder = new OpcUaBinaryEncoder(config.getEncodingContext());
binaryDecoder = new OpcUaBinaryDecoder(newEncodingContext(config.getEncodingLimits()));
binaryEncoder = new OpcUaBinaryEncoder(newEncodingContext(config.getEncodingLimits()));

chunkDecoder = new ChunkDecoder(channelParameters, config.getEncodingLimits());
chunkEncoder = new ChunkEncoder(channelParameters);
Expand Down Expand Up @@ -683,4 +691,46 @@ private static int getMessageLength(ByteBuf buffer, int maxMessageLength) throws
}
}

private static EncodingContext newEncodingContext(EncodingLimits encodingLimits) {
return new DefaultEncodingContext(encodingLimits);
}

private static class DefaultEncodingContext implements EncodingContext {

private final NamespaceTable namespaceTable = new NamespaceTable();
private final ServerTable serverTable = new ServerTable();

private final EncodingLimits encodingLimits;

private DefaultEncodingContext(EncodingLimits encodingLimits) {
this.encodingLimits = encodingLimits;
}

@Override
public DataTypeManager getDataTypeManager() {
return OpcUaDataTypeManager.getInstance();
}

@Override
public EncodingManager getEncodingManager() {
return OpcUaEncodingManager.getInstance();
}

@Override
public EncodingLimits getEncodingLimits() {
return encodingLimits;
}

@Override
public NamespaceTable getNamespaceTable() {
return namespaceTable;
}

@Override
public ServerTable getServerTable() {
return serverTable;
}

}

}

0 comments on commit 00de8c6

Please sign in to comment.