Skip to content

Commit

Permalink
fix(plc4j/spi) Make sure OPC UA discover event is fired prior connect…
Browse files Browse the repository at this point in the history
…ed event.

Signed-off-by: Łukasz Dywicki <luke@code-house.org>
  • Loading branch information
splatch committed Nov 22, 2023
1 parent 5c35097 commit d023f06
Show file tree
Hide file tree
Showing 14 changed files with 645 additions and 11 deletions.
Expand Up @@ -86,10 +86,19 @@ protected org.apache.plc4x.java.api.value.PlcValueHandler getValueHandler() {
return new PlcValueHandler();
}

protected boolean fireDiscoverEvent() {
return true;
}

protected boolean awaitDisconnectComplete() {
return true;
}

protected boolean awaitDiscoverComplete() {
return true;
}


@Override
protected ProtocolStackConfigurer<OpcuaAPU> getStackConfigurer() {
return SingleProtocolStackConfigurer.builder(OpcuaAPU.class, OpcuaAPU::staticParse)
Expand Down
Expand Up @@ -745,6 +745,7 @@ private void onDisconnectCloseSecureChannel(ConversationContext<OpcuaAPU> contex
public void onDiscover(ConversationContext<OpcuaAPU> context) {
if (!driverContext.getEncrypted()) {
LOGGER.debug("not encrypted, ignoring onDiscover");
context.fireDiscovered(configuration);
return;
}
// Only the TCP transport supports login.
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.listener.EventListener;
import org.apache.plc4x.java.spi.EventListenerMessageCodec;
import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.Configuration;
Expand Down Expand Up @@ -108,7 +109,7 @@ private ChannelHandler getMessageCodec(Configuration configuration) {
@Override
public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration configuration, ChannelPipeline pipeline,
PlcAuthentication authentication, boolean passive,
List<EventListener> ignore) {
List<EventListener> listeners) {
if (null == protocol) {
if (this.encryptionHandler != null) {
pipeline.addLast("ENCRYPT", this.encryptionHandler);
Expand All @@ -118,6 +119,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration conf
if (driverContextClass != null) {
protocol.setDriverContext(configure(configuration, createInstance(driverContextClass)));
}
pipeline.addLast(new EventListenerMessageCodec(listeners));
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol,
authentication, basePacketClass);
pipeline.addLast("WRAPPER", context);
Expand Down
Expand Up @@ -27,6 +27,7 @@
import io.netty.handler.codec.MessageToMessageCodec;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.listener.EventListener;
import org.apache.plc4x.java.spi.EventListenerMessageCodec;
import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.Configuration;
Expand Down Expand Up @@ -92,7 +93,7 @@ private ChannelHandler getMessageCodec(Configuration configuration) {
@Override
public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration configuration, ChannelPipeline pipeline,
PlcAuthentication authentication, boolean passive,
List<EventListener> ignore) {
List<EventListener> listeners) {
if (this.encryptionHandler != null) {
pipeline.addLast(this.encryptionHandler);
}
Expand All @@ -102,6 +103,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration conf
if (driverContext != null) {
protocol.setDriverContext(driverContext);
}
pipeline.addLast(new EventListenerMessageCodec(listeners));
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol, authentication, basePacketClass);
pipeline.addLast(context);
return protocol;
Expand Down
Expand Up @@ -222,7 +222,7 @@ public ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupComp
return new ChannelInitializer<>() {
@Override
protected void initChannel(Channel channel) {
// Build the protocol stack for communicating with the s7 protocol.
// Build the protocol stack for communicating with desired protocol.
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
Expand All @@ -239,7 +239,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
super.userEventTriggered(ctx, evt);
} else if (evt instanceof DiscoveredEvent) {
sessionDiscoverCompleteFuture.complete(((DiscoveredEvent) evt).getConfiguration());
} else if (evt instanceof ConnectEvent) {
} else if (evt instanceof ConnectEvent || evt instanceof DiscoverEvent) {
// Fix for https://github.com/apache/plc4x/issues/801
if (!sessionSetupCompleteFuture.isCompletedExceptionally()) {
if (awaitSessionSetupComplete) {
Expand All @@ -248,7 +248,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
configuration,
pipeline,
getAuthentication(),
channelFactory.isPassive()
channelFactory.isPassive(),
listeners
)
);
}
Expand All @@ -275,7 +276,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws P
// Fix for https://github.com/apache/plc4x/issues/801
if (!awaitSessionSetupComplete) {
setProtocol(stackConfigurer.configurePipeline(configuration, pipeline, getAuthentication(),
channelFactory.isPassive()));
channelFactory.isPassive(), listeners));
}
}
};
Expand Down
Expand Up @@ -30,10 +30,6 @@

public interface ProtocolStackConfigurer<T extends Message> {

default Plc4xProtocolBase<T> configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive) {
return configurePipeline(configuration, pipeline, authentication, passive, Collections.emptyList());
}

Plc4xProtocolBase<T> configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive, List<EventListener> listeners);

}
Expand Up @@ -27,6 +27,7 @@
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.listener.EventListener;
import org.apache.plc4x.java.spi.EventListenerMessageCodec;
import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.Configuration;
Expand Down Expand Up @@ -103,7 +104,7 @@ private ChannelHandler getMessageCodec(Configuration configuration) {
@Override
public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration configuration, ChannelPipeline pipeline,
PlcAuthentication authentication, boolean passive,
List<EventListener> ignore) {
List<EventListener> listeners) {
if (this.encryptionHandler != null) {
pipeline.addLast(this.encryptionHandler);
}
Expand All @@ -112,6 +113,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration conf
if (driverContextClass != null) {
protocol.setDriverContext(configure(configuration, createInstance(driverContextClass)));
}
pipeline.addLast(new EventListenerMessageCodec(listeners));
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol,
authentication, basePacketClass);
pipeline.addLast(context);
Expand Down
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.spi.connection;

import static java.util.concurrent.ForkJoinPool.commonPool;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.netty.channel.ChannelPipeline;
import java.util.List;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.listener.EventListener;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.Configuration;
import org.apache.plc4x.java.spi.generation.Message;
import org.apache.plc4x.java.spi.netty.NettyHashTimerTimeoutManager;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DefaultNettyPlcConnectionTest {

private final Logger logger = LoggerFactory.getLogger(DefaultNettyPlcConnectionTest.class);

@Test
void checkInitializationSequence() throws Exception {
ChannelFactory channelFactory = new TestChannelFactory();

final GateKeeper discovery = new GateKeeper("discovery");
final GateKeeper connect = new GateKeeper("connect");
final GateKeeper disconnect = new GateKeeper("disconnect");
final GateKeeper close = new GateKeeper("close");

ProtocolStackConfigurer<Message> stackConfigurer = new ProtocolStackConfigurer<>() {
@Override
public Plc4xProtocolBase<Message> configurePipeline(Configuration configuration, ChannelPipeline pipeline, PlcAuthentication authentication, boolean passive, List<EventListener> listeners) {
TestProtocolBase base = new TestProtocolBase(discovery, connect, disconnect, close);
Plc4xNettyWrapper<Message> context = new Plc4xNettyWrapper<>(new NettyHashTimerTimeoutManager(), pipeline, passive, base, authentication, Message.class);
pipeline.addLast(context);
return base;
}
};

DefaultNettyPlcConnection connection = new PlcConnectionFactory().withDiscovery().create(channelFactory, stackConfigurer);
commonPool().submit(new Runnable() {
@Override
public void run() {
try {
logger.info("Activating connection");
connection.connect();
} catch (PlcConnectionException e) {
throw new RuntimeException(e);
}
}
});

logger.info("Warming up");
expect(false, false, false, false, discovery, connect, disconnect, close);
discovery.permitIn();

discovery.awaitOut();
logger.info("Verify discovery phase completion");
expect(true, false, false, false, discovery, connect, disconnect, close);
connect.permitIn();

connect.awaitOut();
logger.info("Verify connection completion");
expect(true, true, false, false, discovery, connect, disconnect, close);

logger.info("Close connection");
commonPool().submit(new Runnable() {
@Override
public void run() {
try {
logger.info("Closing connection");
connection.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});

disconnect.permitIn();
expect(true, true, true, false, discovery, connect, disconnect, close);
disconnect.awaitOut();

logger.info("Verify connection termination");
close.permitIn();
expect(true, true, true, true, discovery, connect, disconnect, close);
close.awaitOut();

logger.info("Connection lifecycle sequence has been confirmed");
}

private static void expect(boolean discovered, boolean connected, boolean disconnected, boolean closed,
GateKeeper discovery, GateKeeper connect, GateKeeper disconnect, GateKeeper close) {

assertEquals(
discovered + "," + connected + "," + disconnected + "," + closed,
(discovery.entered()) + "," +
(connect.entered()) + "," +
(disconnect.entered() + "," +
(close.entered())),
"Expectation for state flags (discover, connect, disconnect, close) failed"
);
}

static class TestProtocolBase extends Plc4xProtocolBase<Message> {

private final Logger logger = LoggerFactory.getLogger(TestProtocolBase.class);;
private final GateKeeper discover;
private final GateKeeper connect;
private final GateKeeper close;
private final GateKeeper disconnect;

public TestProtocolBase(GateKeeper discover, GateKeeper connect, GateKeeper disconnect, GateKeeper close) {
this.discover = discover;
this.connect = connect;
this.close = close;
this.disconnect = disconnect;
}

@Override
public void onDiscover(ConversationContext<Message> context) {
logger.info("On Discover");
await(discover);
context.fireDiscovered(null);
discover.permitOut();
}


@Override
public void onConnect(ConversationContext<Message> context) {
logger.info("On Connect");
await(connect);
super.onConnect(context);
context.fireConnected();
connect.permitOut();
}

@Override
public void onDisconnect(ConversationContext<Message> context) {
logger.info("On Disconnect");
await(disconnect);
super.onDisconnect(context);
context.fireDisconnected();
disconnect.permitOut();
}

@Override
public void close(ConversationContext<Message> context) {
logger.info("On Close");
await(close);
close.permitOut();
}

private void await(GateKeeper signal) {
try {
if (!signal.awaitIn()) {
throw new RuntimeException("Await for " + signal.gate() + " lock failed");
}
} catch (InterruptedException e) {
logger.error("Failed to await for a signal " + signal.gate());
throw new RuntimeException(e);
}
}
}

}

0 comments on commit d023f06

Please sign in to comment.