diff --git a/opc-ua-stack/transport/src/main/java/org/eclipse/milo/opcua/stack/transport/client/AbstractUascClientTransport.java b/opc-ua-stack/transport/src/main/java/org/eclipse/milo/opcua/stack/transport/client/AbstractUascClientTransport.java index 9fae75e04..62b8bd1ac 100644 --- a/opc-ua-stack/transport/src/main/java/org/eclipse/milo/opcua/stack/transport/client/AbstractUascClientTransport.java +++ b/opc-ua-stack/transport/src/main/java/org/eclipse/milo/opcua/stack/transport/client/AbstractUascClientTransport.java @@ -23,6 +23,7 @@ import org.eclipse.milo.opcua.stack.core.types.UaRequestMessageType; import org.eclipse.milo.opcua.stack.core.types.UaResponseMessageType; import org.eclipse.milo.opcua.stack.core.types.structured.RequestHeader; +import org.eclipse.milo.opcua.stack.core.util.ExecutionQueue; import org.eclipse.milo.opcua.stack.transport.client.uasc.UascRequest; import org.eclipse.milo.opcua.stack.transport.client.uasc.UascResponseHandler; import org.slf4j.Logger; @@ -32,15 +33,19 @@ public abstract class AbstractUascClientTransport implements OpcClientTransport, protected final Logger logger = LoggerFactory.getLogger(getClass()); - protected final AtomicLong requestId = new AtomicLong(0L); + protected final AtomicLong requestId = new AtomicLong(1L); protected final Map> pendingRequests = new ConcurrentHashMap<>(); protected final Map pendingTimeouts = new ConcurrentHashMap<>(); + protected final ExecutionQueue executionQueue; + protected final OpcClientTransportConfig config; public AbstractUascClientTransport(OpcClientTransportConfig config) { this.config = config; + + executionQueue = new ExecutionQueue(config.getExecutor(), 1); } protected abstract CompletableFuture getChannel(); @@ -132,7 +137,7 @@ public void handleResponse(long requestId, UaResponseMessageType responseMessage if (responseFuture != null) { cancelRequestTimeout(requestId); - config.getExecutor().execute(() -> responseFuture.complete(responseMessage)); + executionQueue.submit(() -> responseFuture.complete(responseMessage)); } else { logger.warn("Received response for unknown request, requestId={}", requestId); } @@ -145,7 +150,7 @@ public void handleSendFailure(long requestId, UaException exception) { if (responseFuture != null) { cancelRequestTimeout(requestId); - config.getExecutor().execute(() -> responseFuture.completeExceptionally(exception)); + executionQueue.submit(() -> responseFuture.completeExceptionally(exception)); } else { logger.warn("Send failed for unknown request, requestId={}", requestId); } @@ -158,7 +163,7 @@ public void handleReceiveFailure(long requestId, UaException exception) { if (responseFuture != null) { cancelRequestTimeout(requestId); - config.getExecutor().execute(() -> responseFuture.completeExceptionally(exception)); + executionQueue.submit(() -> responseFuture.completeExceptionally(exception)); } else { logger.warn("Receive failed for unknown request, requestId={}", requestId); } @@ -177,7 +182,7 @@ public void handleChannelInactive() { private void failAndClearPending(UaException exception) { pendingRequests.forEach((requestId, f) -> { cancelRequestTimeout(requestId); - config.getExecutor().execute(() -> f.completeExceptionally(exception)); + executionQueue.submit(() -> f.completeExceptionally(exception)); }); pendingRequests.clear(); }