From 8a2f138b7954eb05c10b1d7eb96ce209219cefc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Dywicki?= Date: Sun, 14 Apr 2024 18:45:26 +0200 Subject: [PATCH] fix(plc4j/opcua): Make sure UA subscription acknowledges are retained over publish cycles. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #1364. Signed-off-by: Ɓukasz Dywicki --- .../plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java index 1d514703fe5..dc75674484b 100644 --- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java +++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaSubscriptionHandle.java @@ -21,6 +21,7 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -65,6 +66,8 @@ public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle { private final AtomicLong clientHandles = new AtomicLong(1L); private final RequestTransactionManager tm; + + private final List outstandingAcknowledgements = new CopyOnWriteArrayList(); private ScheduledFuture publishTask; public OpcuaSubscriptionHandle(OpcuaProtocolLogic plcSubscriber, RequestTransactionManager tm, @@ -166,7 +169,6 @@ public CompletableFuture onSubscribeCreateMonitoredItem * @return */ private void sendPublishRequest() { - List outstandingAcknowledgements = new LinkedList<>(); List outstandingRequests = new LinkedList<>(); //If we are waiting on a response and haven't received one, just wait until we do. A keep alive will be sent out eventually @@ -184,6 +186,7 @@ private void sendPublishRequest() { // we work in external thread - we need to coordinate access to conversation pipeline RequestTransaction transaction = tm.startRequest(); transaction.submit(() -> { + LOGGER.trace("Sent publish request with {} acks", ackLength); // Create Consumer for the response message, error and timeout to be sent to the Secure Channel conversation.submit(publishRequest, PublishResponse.class).thenAccept(responseMessage -> { outstandingRequests.remove(((ResponseHeader) responseMessage.getResponseHeader()).getRequestHandle());