Skip to content

Commit

Permalink
fix(opcua): Await writeAndFlush(msg) & send next msg async (#1147)
Browse files Browse the repository at this point in the history
* plc4j-driver-opcua: Await writeAndFlush(msg) & send next msg async

  All OPC-UA messages contain a sequence number, which is independent
from the normal TCP sequence numbers. This sequence number is checked
by the server, that may refuse to respond, if the sequence number is
lower than the last received message's. Packetization itself happens
asynchronously in netty, some time after `writeAndFlush(msg)` has been
invoked by the application code. If there are concurrent calls to this
function, there is no guarantee that messages are packeted in the same
order as they have been added to `writeAndFlush(msg)`. This, in some
cases, can cause OPC-UA messages being delivered to the server in
different order, than it is specified by their sequence numbers, and
the server may drop these messages, and the client eventually times out.

  As TCP guarantees delivering packets in the correct order, a trival
solution is to simply wait for packetization to complete, before adding
the next message to the pipeline.

  But yet comes a design problem: OPC-UA response handlers were written
in a way, that they may send new messages, while processing the response.
These handlers are executed on the netty event loop thread, which is
shared among all Netty I/O operations, such as receiving/sending messages.
This essentially means, that if a response handler is currently being
executed, you cannot start packetizing a new message, because that would
require the response handler to finish, which is waiting for guess what,
the packetization to complete.

  Solution to this is to execute response handlers asynchronously, so
they don't occupy the Netty event handler thread.

  This commit also fixes the `OpcuaPlcDriverTest` flakyness, experienced
after #1139 had been merged.

See also: https://ci-builds.apache.org/job/PLC4X/job/PLC4X/job/develop/1695/

* Put back a line, disappered after `git merge`

---------

Co-authored-by: Ben Hutcheson <ben.hutche@gmail.com>
  • Loading branch information
takraj and hutcheb committed Oct 18, 2023
1 parent 6b09853 commit b0bc847
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 25 deletions.
Expand Up @@ -20,6 +20,7 @@

import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.ForkJoinPool.commonPool;

import java.time.Instant;
import org.apache.commons.lang3.RandomStringUtils;
Expand Down Expand Up @@ -179,7 +180,7 @@ public SecureChannel(OpcuaDriverContext driverContext, OpcuaConfiguration config
}
}

public void submit(ConversationContext<OpcuaAPU> context, Consumer<TimeoutException> onTimeout, BiConsumer<OpcuaAPU, Throwable> error, Consumer<byte[]> consumer, WriteBufferByteBased buffer) {
public synchronized void submit(ConversationContext<OpcuaAPU> context, Consumer<TimeoutException> onTimeout, BiConsumer<OpcuaAPU, Throwable> error, Consumer<byte[]> consumer, WriteBufferByteBased buffer) {
int transactionId = channelTransactionManager.getTransactionIdentifier();

//TODO: We need to split large messages up into chunks if it is larger than the sendBufferSize
Expand Down Expand Up @@ -236,7 +237,7 @@ public void submit(ConversationContext<OpcuaAPU> context, Consumer<TimeoutExcept
tokenId.set(opcuaResponse.getSecureTokenId());
channelId.set(opcuaResponse.getSecureChannelId());

consumer.accept(messageBuffer.toByteArray());
commonPool().submit(() -> consumer.accept(messageBuffer.toByteArray()));
}
});
} catch (Exception e) {
Expand Down Expand Up @@ -267,7 +268,7 @@ public void onConnect(ConversationContext<OpcuaAPU> context) {
.expectResponse(OpcuaAPU.class, REQUEST_TIMEOUT)
.check(p -> p.getMessage() instanceof OpcuaAcknowledgeResponse)
.unwrap(p -> (OpcuaAcknowledgeResponse) p.getMessage())
.handle(opcuaAcknowledgeResponse -> onConnectOpenSecureChannel(context, opcuaAcknowledgeResponse));
.handle(opcuaAcknowledgeResponse -> commonPool().submit(() -> onConnectOpenSecureChannel(context, opcuaAcknowledgeResponse)));
channelTransactionManager.submit(requestConsumer, channelTransactionManager.getTransactionIdentifier());
}

Expand Down Expand Up @@ -361,16 +362,18 @@ public void onConnectOpenSecureChannel(ConversationContext<OpcuaAPU> context, Op
LOGGER.error("Failed to connect to opc ua server for the following reason:- {}, {}", ((ResponseHeader) fault.getResponseHeader()).getServiceResult().getStatusCode(), OpcuaStatusCode.enumForValue(((ResponseHeader) fault.getResponseHeader()).getServiceResult().getStatusCode()));
} else {
LOGGER.debug("Got Secure Response Connection Response");
try {
OpenSecureChannelResponse openSecureChannelResponse = (OpenSecureChannelResponse) message.getBody();
ChannelSecurityToken securityToken = (ChannelSecurityToken) openSecureChannelResponse.getSecurityToken();
tokenId.set((int) securityToken.getTokenId());
channelId.set((int) securityToken.getChannelId());
lifetime = securityToken.getRevisedLifetime();
onConnectCreateSessionRequest(context);
} catch (PlcConnectionException e) {
LOGGER.error("Error occurred while connecting to OPC UA server", e);
}
OpenSecureChannelResponse openSecureChannelResponse = (OpenSecureChannelResponse) message.getBody();
ChannelSecurityToken securityToken = (ChannelSecurityToken) openSecureChannelResponse.getSecurityToken();
tokenId.set((int) securityToken.getTokenId());
channelId.set((int) securityToken.getChannelId());
lifetime = securityToken.getRevisedLifetime();
commonPool().submit(() -> {
try {
onConnectCreateSessionRequest(context);
} catch (PlcConnectionException e) {
LOGGER.error("Error occurred while connecting to OPC UA server", e);
}
});
}
} catch (ParseException e) {
LOGGER.error("Error parsing", e);
Expand Down Expand Up @@ -760,7 +763,7 @@ public void onDiscover(ConversationContext<OpcuaAPU> context) {
.unwrap(p -> (OpcuaAcknowledgeResponse) p.getMessage())
.handle(opcuaAcknowledgeResponse -> {
LOGGER.debug("Got Hello Response Connection Response");
onDiscoverOpenSecureChannel(context, opcuaAcknowledgeResponse);
commonPool().submit(() -> onDiscoverOpenSecureChannel(context, opcuaAcknowledgeResponse));
});

channelTransactionManager.submit(requestConsumer, channelTransactionManager.getTransactionIdentifier());
Expand Down Expand Up @@ -831,11 +834,14 @@ public void onDiscoverOpenSecureChannel(ConversationContext<OpcuaAPU> context, O
LOGGER.error("Failed to connect to opc ua server for the following reason:- {}, {}", ((ResponseHeader) fault.getResponseHeader()).getServiceResult().getStatusCode(), OpcuaStatusCode.enumForValue(((ResponseHeader) fault.getResponseHeader()).getServiceResult().getStatusCode()));
} else {
LOGGER.debug("Got Secure Response Connection Response");
try {
onDiscoverGetEndpointsRequest(context, opcuaOpenResponse, (OpenSecureChannelResponse) message.getBody());
} catch (PlcConnectionException e) {
LOGGER.error("Error occurred while connecting to OPC UA server");
}
commonPool().submit(() -> {
try {
onDiscoverGetEndpointsRequest(context, opcuaOpenResponse,
(OpenSecureChannelResponse) message.getBody());
} catch (PlcConnectionException e) {
LOGGER.error("Error occurred while connecting to OPC UA server");
}
});
}
} catch (ParseException e) {
LOGGER.debug("error caught", e);
Expand Down Expand Up @@ -940,7 +946,7 @@ public void onDiscoverGetEndpointsRequest(ConversationContext<OpcuaAPU> context,
} catch (NoSuchAlgorithmException e) {
LOGGER.error("Failed to find hashing algorithm");
}
onDiscoverCloseSecureChannel(context, response);
commonPool().submit(() -> onDiscoverCloseSecureChannel(context, response));
} catch (ParseException e) {
LOGGER.error("Error parsing", e);
}
Expand Down
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.plc4x.java.opcua.protocol;

import static java.util.concurrent.ForkJoinPool.commonPool;

import java.nio.ByteBuffer;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
Expand Down Expand Up @@ -98,7 +100,7 @@ public void onDisconnect(ConversationContext<OpcuaAPU> context) {
for (Map.Entry<Long, OpcuaSubscriptionHandle> subscriber : subscriptions.entrySet()) {
subscriber.getValue().stopSubscriber();
}
channel.onDisconnect(context);
commonPool().submit(() -> channel.onDisconnect(context));
}

@Override
Expand All @@ -118,7 +120,7 @@ public void onConnect(ConversationContext<OpcuaAPU> context) {
return;
}
}
this.channel.onConnect(context);
commonPool().submit(() -> this.channel.onConnect(context));
}

@Override
Expand All @@ -133,7 +135,7 @@ public void onDiscover(ConversationContext<OpcuaAPU> context) {
return;
}
}
channel.onDiscover(context);
commonPool().submit(() -> channel.onDiscover(context));
}

private SecureChannel createSecureChannel(PlcAuthentication authentication) {
Expand Down
Expand Up @@ -93,7 +93,7 @@ public boolean isPassive() {

@Override
public void sendToWire(T msg) {
pipeline.writeAndFlush(msg);
pipeline.writeAndFlush(msg).syncUninterruptibly();
}

@Override
Expand Down
Expand Up @@ -67,7 +67,7 @@ public boolean isPassive() {
@Override
public void sendToWire(T1 msg) {
logger.trace("Sending to wire {}", msg);
channelHandlerContext.channel().writeAndFlush(msg);
channelHandlerContext.channel().writeAndFlush(msg).syncUninterruptibly();
}

@Override
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.plc4x.java.spi;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import org.apache.plc4x.java.spi.events.ConnectEvent;
Expand All @@ -35,7 +36,9 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -65,6 +68,7 @@ void setUp() throws Exception {
doNothing().when(protocol).onConnect(captor.capture());

when(channelHandlerContext.channel()).thenReturn(channel);
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelFuture.class));

wrapper.userEventTriggered(channelHandlerContext, new ConnectEvent());
conversationContext = captor.getValue();
Expand Down

0 comments on commit b0bc847

Please sign in to comment.