Skip to content

Commit

Permalink
~ deliver responses on ExecutionQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinherron committed Oct 19, 2022
1 parent 1b2dcca commit 4459755
Showing 1 changed file with 10 additions and 5 deletions.
Expand Up @@ -23,6 +23,7 @@
import org.eclipse.milo.opcua.stack.core.types.UaRequestMessageType;
import org.eclipse.milo.opcua.stack.core.types.UaResponseMessageType;
import org.eclipse.milo.opcua.stack.core.types.structured.RequestHeader;
import org.eclipse.milo.opcua.stack.core.util.ExecutionQueue;
import org.eclipse.milo.opcua.stack.transport.client.uasc.UascRequest;
import org.eclipse.milo.opcua.stack.transport.client.uasc.UascResponseHandler;
import org.slf4j.Logger;
Expand All @@ -32,15 +33,19 @@ public abstract class AbstractUascClientTransport implements OpcClientTransport,

protected final Logger logger = LoggerFactory.getLogger(getClass());

protected final AtomicLong requestId = new AtomicLong(0L);
protected final AtomicLong requestId = new AtomicLong(1L);

protected final Map<Long, CompletableFuture<UaResponseMessageType>> pendingRequests = new ConcurrentHashMap<>();
protected final Map<Long, Timeout> pendingTimeouts = new ConcurrentHashMap<>();

protected final ExecutionQueue executionQueue;

protected final OpcClientTransportConfig config;

public AbstractUascClientTransport(OpcClientTransportConfig config) {
this.config = config;

executionQueue = new ExecutionQueue(config.getExecutor(), 1);
}

protected abstract CompletableFuture<Channel> getChannel();
Expand Down Expand Up @@ -132,7 +137,7 @@ public void handleResponse(long requestId, UaResponseMessageType responseMessage
if (responseFuture != null) {
cancelRequestTimeout(requestId);

config.getExecutor().execute(() -> responseFuture.complete(responseMessage));
executionQueue.submit(() -> responseFuture.complete(responseMessage));
} else {
logger.warn("Received response for unknown request, requestId={}", requestId);
}
Expand All @@ -145,7 +150,7 @@ public void handleSendFailure(long requestId, UaException exception) {
if (responseFuture != null) {
cancelRequestTimeout(requestId);

config.getExecutor().execute(() -> responseFuture.completeExceptionally(exception));
executionQueue.submit(() -> responseFuture.completeExceptionally(exception));
} else {
logger.warn("Send failed for unknown request, requestId={}", requestId);
}
Expand All @@ -158,7 +163,7 @@ public void handleReceiveFailure(long requestId, UaException exception) {
if (responseFuture != null) {
cancelRequestTimeout(requestId);

config.getExecutor().execute(() -> responseFuture.completeExceptionally(exception));
executionQueue.submit(() -> responseFuture.completeExceptionally(exception));
} else {
logger.warn("Receive failed for unknown request, requestId={}", requestId);
}
Expand All @@ -177,7 +182,7 @@ public void handleChannelInactive() {
private void failAndClearPending(UaException exception) {
pendingRequests.forEach((requestId, f) -> {
cancelRequestTimeout(requestId);
config.getExecutor().execute(() -> f.completeExceptionally(exception));
executionQueue.submit(() -> f.completeExceptionally(exception));
});
pendingRequests.clear();
}
Expand Down

0 comments on commit 4459755

Please sign in to comment.