Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customization of Netty Bootstrap and ChannelPipeline #1241

Merged
merged 2 commits into from Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -253,7 +253,7 @@ public static OpcUaClient create(

try {
List<EndpointDescription> endpoints =
DiscoveryClient.getEndpoints(endpointUrl).get();
DiscoveryClient.getEndpoints(endpointUrl, configureTransport).get();

EndpointDescription endpoint = selectEndpoint.apply(endpoints).orElseThrow(() ->
new UaException(
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 the Eclipse Milo Authors
* Copyright (c) 2024 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -12,7 +12,10 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;

Expand Down Expand Up @@ -46,4 +49,22 @@ public interface OpcClientTransportConfig {
*/
HashedWheelTimer getWheelTimer();

/**
* Get a {@link Consumer} that will be given a chance to customize the {@link Bootstrap} used
* by this transport.
*
* @return a {@link Consumer} that will be given a chance to customize the {@link Bootstrap}
* used by this transport.
*/
Consumer<Bootstrap> getBootstrapCustomizer();

/**
* Get a {@link Consumer} that will be given a chance to customize the {@link ChannelPipeline}
* used by this transport.
*
* @return a {@link Consumer} that will be given a chance to customize the
* {@link ChannelPipeline} used by this transport.
*/
Consumer<ChannelPipeline> getChannelPipelineCustomizer();

}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 the Eclipse Milo Authors
* Copyright (c) 2024 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand Down Expand Up @@ -48,7 +48,6 @@
import org.eclipse.milo.opcua.stack.transport.client.uasc.ClientSecureChannel;
import org.eclipse.milo.opcua.stack.transport.client.uasc.InboundUascResponseHandler.DelegatingUascResponseHandler;
import org.eclipse.milo.opcua.stack.transport.client.uasc.UascClientAcknowledgeHandler;
import org.eclipse.milo.opcua.stack.transport.client.uasc.UascClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -75,7 +74,7 @@ public OpcTcpClientTransport(OpcTcpClientTransportConfig config) {
.setMaxIdleSeconds(0) // keep alive handled by SessionFsm
.setMaxReconnectDelaySeconds(16)
.setPersistent(true)
.setChannelActions(new ClientChannelActions(config))
.setChannelActions(new ClientChannelActions())
.setExecutor(config.getExecutor())
.setScheduler(config.getScheduledExecutor())
.setLoggerName(CHANNEL_FSM_LOGGER_NAME)
Expand Down Expand Up @@ -119,12 +118,6 @@ private class ClientChannelActions implements ChannelActions {

private final Logger logger = LoggerFactory.getLogger(CHANNEL_FSM_LOGGER_NAME);

private final UascClientConfig config;

private ClientChannelActions(UascClientConfig config) {
this.config = config;
}

@Override
public CompletableFuture<Channel> connect(FsmContext<State, Event> ctx) {
ClientApplicationContext application = (ClientApplicationContext) ctx.get(KEY_CLIENT_APPLICATION);
Expand All @@ -151,9 +144,13 @@ protected void initChannel(SocketChannel ch) {

ch.pipeline().addLast(new DelegatingUascResponseHandler(OpcTcpClientTransport.this));
ch.pipeline().addLast(acknowledgeHandler);

config.getChannelPipelineCustomizer().accept(ch.pipeline());
}
});

config.getBootstrapCustomizer().accept(bootstrap);

String endpointUrl = application.getEndpoint().getEndpointUrl();

String host = EndpointUtil.getHost(endpointUrl);
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 the Eclipse Milo Authors
* Copyright (c) 2024 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -12,7 +12,10 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import org.eclipse.milo.opcua.stack.core.Stack;
Expand All @@ -30,6 +33,8 @@ public class OpcTcpClientTransportConfigBuilder {
private ScheduledExecutorService scheduledExecutor;
private NioEventLoopGroup eventLoop;
private HashedWheelTimer wheelTimer;
private Consumer<Bootstrap> bootstrapCustomizer = b -> {};
private Consumer<ChannelPipeline> channelPipelineCustomizer = p -> {};

public OpcTcpClientTransportConfigBuilder setConnectTimeout(UInteger connectTimeout) {
this.connectTimeout = connectTimeout;
Expand Down Expand Up @@ -66,6 +71,36 @@ public OpcTcpClientTransportConfigBuilder setWheelTimer(HashedWheelTimer wheelTi
return this;
}

/**
* Set a {@link Consumer} that will be given a chance to customize the {@link Bootstrap} used
* by this transport.
*
* @param bootstrapCustomizer a {@link Consumer} that will be given a chance to customize the
* {@link Bootstrap} used by this transport.
* @return this {@link OpcTcpClientTransportConfigBuilder}.
*/
public OpcTcpClientTransportConfigBuilder setBootstrapCustomizer(
Consumer<Bootstrap> bootstrapCustomizer) {

this.bootstrapCustomizer = bootstrapCustomizer;
return this;
}

/**
* Set a {@link Consumer} that will be given a chance to customize the {@link ChannelPipeline}
* used by this transport.
*
* @param channelPipelineCustomizer a {@link Consumer} that will be given a chance to customize
* the {@link ChannelPipeline} used by this transport.
* @return this {@link OpcTcpClientTransportConfigBuilder}.
*/
public OpcTcpClientTransportConfigBuilder setChannelPipelineCustomizer(
Consumer<ChannelPipeline> channelPipelineCustomizer) {

this.channelPipelineCustomizer = channelPipelineCustomizer;
return this;
}

public OpcTcpClientTransportConfig build() {
if (executor == null) {
executor = Stack.sharedExecutor();
Expand All @@ -87,7 +122,9 @@ public OpcTcpClientTransportConfig build() {
executor,
scheduledExecutor,
eventLoop,
wheelTimer
wheelTimer,
bootstrapCustomizer,
channelPipelineCustomizer
);
}

Expand All @@ -100,6 +137,8 @@ static class OpcTcpClientTransportConfigImpl implements OpcTcpClientTransportCon
private final ScheduledExecutorService scheduledExecutor;
private final NioEventLoopGroup eventLoop;
private final HashedWheelTimer wheelTimer;
private final Consumer<Bootstrap> bootstrapCustomizer;
private final Consumer<ChannelPipeline> channelPipelineCustomizer;

public OpcTcpClientTransportConfigImpl(
UInteger connectTimeout,
Expand All @@ -108,7 +147,9 @@ public OpcTcpClientTransportConfigImpl(
ExecutorService executor,
ScheduledExecutorService scheduledExecutor,
NioEventLoopGroup eventLoop,
HashedWheelTimer wheelTimer
HashedWheelTimer wheelTimer,
Consumer<Bootstrap> bootstrapCustomizer,
Consumer<ChannelPipeline> channelPipelineCustomizer
) {

this.connectTimeout = connectTimeout;
Expand All @@ -118,9 +159,10 @@ public OpcTcpClientTransportConfigImpl(
this.scheduledExecutor = scheduledExecutor;
this.eventLoop = eventLoop;
this.wheelTimer = wheelTimer;
this.bootstrapCustomizer = bootstrapCustomizer;
this.channelPipelineCustomizer = channelPipelineCustomizer;
}


@Override
public UInteger getConnectTimeout() {
return connectTimeout;
Expand Down Expand Up @@ -156,6 +198,16 @@ public HashedWheelTimer getWheelTimer() {
return wheelTimer;
}

@Override
public Consumer<Bootstrap> getBootstrapCustomizer() {
return bootstrapCustomizer;
}

@Override
public Consumer<ChannelPipeline> getChannelPipelineCustomizer() {
return channelPipelineCustomizer;
}

}

}