Skip to content

Commit

Permalink
WIP transport refactor server
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinherron committed Sep 30, 2022
1 parent b9b7968 commit 7cf180f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 10 deletions.
Expand Up @@ -59,6 +59,7 @@
import org.eclipse.milo.opcua.stack.core.ServerTable;
import org.eclipse.milo.opcua.stack.core.Stack;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.channel.EncodingLimits;
import org.eclipse.milo.opcua.stack.core.channel.messages.ErrorMessage;
import org.eclipse.milo.opcua.stack.core.encoding.DefaultEncodingManager;
Expand Down Expand Up @@ -464,15 +465,15 @@ public CompletableFuture<UaResponseMessageType> handleServiceRequest(

Service service = Service.from(requestMessage.getTypeId());

ServiceHandler serviceHandler = service != null ?
getServiceHandler(path, service) : null;
ServiceHandler serviceHandler = service != null ? getServiceHandler(path, service) : null;

if (serviceHandler != null) {
return serviceHandler.handle(context, requestMessage);
} else {
// TODO ServiceFault Bad_NotImplemented
logger.warn("No ServiceHandler registered for path={} service={}", path, service);

return CompletableFuture.failedFuture(new UaException(StatusCodes.Bad_NotImplemented));
}
return null;
}


Expand Down
Expand Up @@ -13,6 +13,7 @@
import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.jetbrains.annotations.Nullable;
import org.slf4j.LoggerFactory;

public enum Service {

Expand Down Expand Up @@ -125,6 +126,8 @@ public enum Service {
case 845: return SUBSCRIPTION_DELETE_SUBSCRIPTIONS;
//@formatter:on
default:
LoggerFactory.getLogger(Service.class)
.warn("Unknown service id: " + id);
return null;
}
}
Expand Down
Expand Up @@ -15,9 +15,9 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.eclipse.milo.opcua.stack.core.Stack;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
Expand All @@ -38,6 +38,11 @@ public class PublishQueue {

private final LinkedHashMap<UInteger, WaitingSubscription> waitList = new LinkedHashMap<>();

private final ExecutorService executor;

public PublishQueue(ExecutorService executor) {
this.executor = executor;
}

public synchronized void addRequest(PendingPublish pending) {
List<WaitingSubscription> waitingSubscriptions = List.copyOf(waitList.values());
Expand Down Expand Up @@ -96,8 +101,7 @@ public synchronized void addRequest(PendingPublish pending) {

final WaitingSubscription ws = subscription;

// TODO use configured executor
Stack.sharedExecutor().execute(() -> ws.subscription.onPublish(pending));
executor.execute(() -> ws.subscription.onPublish(pending));
} else {
pendingQueue.add(pending);
}
Expand All @@ -122,8 +126,7 @@ public synchronized void addSubscription(Subscription subscription) {
PendingPublish pending = poll();

if (pending != null) {
// TODO use configured executor
Stack.sharedExecutor().execute(() -> subscription.onPublish(pending));
executor.execute(() -> subscription.onPublish(pending));
} else {
waitList.putIfAbsent(subscription.getId(), new WaitingSubscription(subscription));
}
Expand Down
Expand Up @@ -117,19 +117,22 @@ private static UInteger nextSubscriptionId() {

private final Logger logger = LoggerFactory.getLogger(getClass());

private final PublishQueue publishQueue = new PublishQueue();

private final Map<UInteger, Subscription> subscriptions = new ConcurrentHashMap<>();
private final List<Subscription> transferred = new CopyOnWriteArrayList<>();

private final AtomicLong monitoredItemCount = new AtomicLong(0L);

private final PublishQueue publishQueue;

private final Session session;
private final OpcUaServer server;

public SubscriptionManager(Session session, OpcUaServer server) {
this.session = session;
this.server = server;

publishQueue = new PublishQueue(server.getConfig().getExecutor());
}

public Session getSession() {
Expand Down

0 comments on commit 7cf180f

Please sign in to comment.