Skip to content

Commit

Permalink
WIP transport refactor server
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinherron committed Sep 25, 2022
1 parent f8fb61b commit 74743aa
Show file tree
Hide file tree
Showing 6 changed files with 776 additions and 14 deletions.
@@ -1,35 +1,51 @@
package org.eclipse.milo.opcua.sdk.server;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

import com.google.common.collect.ForwardingTable;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.google.common.collect.Tables;
import org.eclipse.milo.opcua.sdk.server.services2.AttributeServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.MethodServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.MonitoredItemServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.NodeManagementServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.Service;
import org.eclipse.milo.opcua.sdk.server.services2.SessionServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.SubscriptionServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.ViewServiceSet2;
import org.eclipse.milo.opcua.stack.core.types.UaRequestMessageType;
import org.eclipse.milo.opcua.stack.core.types.UaResponseMessageType;
import org.eclipse.milo.opcua.stack.core.types.structured.ActivateSessionRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.AddReferencesRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.BrowseNextRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.BrowseRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.CallRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.CancelRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.CloseSessionRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.CreateMonitoredItemsRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.CreateSessionRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.CreateSubscriptionRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteMonitoredItemsRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteReferencesRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteSubscriptionsRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.HistoryReadRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.HistoryUpdateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.ModifyMonitoredItemsRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.ModifySubscriptionRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.PublishRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.RegisterNodesRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.RepublishRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.SetMonitoringModeRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.SetPublishingModeRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.SetTriggeringRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.TransferSubscriptionsRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.TranslateBrowsePathsToNodeIdsRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.UnregisterNodesRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.WriteRequest;
import org.eclipse.milo.opcua.stack.transport.server.ServiceRequestContext;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public abstract class AbstractServiceHandler {
Expand Down Expand Up @@ -109,6 +125,33 @@ public void addServiceSet(String path, MonitoredItemServiceSet2 serviceSet) {
);
}

public void addServiceSet(String path, NodeManagementServiceSet2 serviceSet) {
serviceHandlerTable.put(
path,
Service.NODE_MANAGEMENT_ADD_NODES,
(context, request) ->
serviceSet.onAddNodes(context, (AddNodesRequest) request).thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.NODE_MANAGEMENT_DELETE_NODES,
(context, request) ->
serviceSet.onDeleteNodes(context, (DeleteNodesRequest) request).thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.NODE_MANAGEMENT_ADD_REFERENCES,
(context, request) ->
serviceSet.onAddReferences(context, (AddReferencesRequest) request).thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.NODE_MANAGEMENT_DELETE_REFERENCES,
(context, request) ->
serviceSet.onDeleteReferences(context, (DeleteReferencesRequest) request).thenApply(Function.identity())
);
}

public void addServiceSet(String path, SessionServiceSet2 serviceSet) {
serviceHandlerTable.put(
path,
Expand Down Expand Up @@ -136,6 +179,92 @@ public void addServiceSet(String path, SessionServiceSet2 serviceSet) {
);
}

public void addServiceSet(String path, SubscriptionServiceSet2 serviceSet) {
serviceHandlerTable.put(
path,
Service.SUBSCRIPTION_CREATE_SUBSCRIPTION,
(context, request) ->
serviceSet.onCreateSubscription(context, (CreateSubscriptionRequest) request)
.thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.SUBSCRIPTION_MODIFY_SUBSCRIPTION,
(context, request) ->
serviceSet.onModifySubscription(context, (ModifySubscriptionRequest) request)
.thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.SUBSCRIPTION_DELETE_SUBSCRIPTIONS,
(context, request) ->
serviceSet.onDeleteSubscriptions(context, (DeleteSubscriptionsRequest) request)
.thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.SUBSCRIPTION_TRANSFER_SUBSCRIPTIONS,
(context, request) ->
serviceSet.onTransferSubscriptions(context, (TransferSubscriptionsRequest) request)
.thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.SUBSCRIPTION_SET_PUBLISHING_MODE,
(context, request) ->
serviceSet.onSetPublishingMode(context, (SetPublishingModeRequest) request)
.thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.SUBSCRIPTION_PUBLISH,
(context, request) ->
serviceSet.onPublish(context, (PublishRequest) request)
.thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.SUBSCRIPTION_REPUBLISH,
(context, request) ->
serviceSet.onRepublish(context, (RepublishRequest) request)
.thenApply(Function.identity())
);
}

public void addServiceSet(String path, ViewServiceSet2 serviceSet) {
serviceHandlerTable.put(
path,
Service.VIEW_BROWSE,
(context, request) ->
serviceSet.onBrowse(context, (BrowseRequest) request).thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.VIEW_BROWSE_NEXT,
(context, request) ->
serviceSet.onBrowseNext(context, (BrowseNextRequest) request).thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.VIEW_BROWSE_TRANSLATE_BROWSE_PATHS,
(context, request) ->
serviceSet.onTranslateBrowsePaths(context, (TranslateBrowsePathsToNodeIdsRequest) request)
.thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.VIEW_BROWSE_REGISTER_NODES,
(context, request) ->
serviceSet.onRegisterNodes(context, (RegisterNodesRequest) request).thenApply(Function.identity())
);
serviceHandlerTable.put(
path,
Service.VIEW_BROWSE_UNREGISTER_NODES,
(context, request) ->
serviceSet.onUnregisterNodes(context, (UnregisterNodesRequest) request).thenApply(Function.identity())
);
}

protected void addServiceHandler(String path, Service service, ServiceHandler serviceHandler) {
serviceHandlerTable.put(path, service, serviceHandler);
}
Expand All @@ -151,16 +280,26 @@ CompletableFuture<UaResponseMessageType> handle(
);
}

private static class ServiceHandlerTable extends
ForwardingTable<String, Service, ServiceHandler> {
private static class ServiceHandlerTable {

private final ConcurrentMap<String, ConcurrentMap<Service, ServiceHandler>> table = new ConcurrentHashMap<>();

public @Nullable ServiceHandler get(String path, Service service) {
ConcurrentMap<Service, ServiceHandler> handlers = table.get(path);
if (handlers != null) {
return handlers.get(service);
} else {
return null;
}
}

private final Table<String, Service, ServiceHandler> delegate =
Tables.synchronizedTable(HashBasedTable.create());
public void put(String path, Service service, ServiceHandler handler) {
ConcurrentMap<Service, ServiceHandler> handlers = table.computeIfAbsent(
path,
k -> new ConcurrentHashMap<>()
);

@Override
protected @NotNull Table<String, Service, ServiceHandler> delegate() {
return delegate;
handlers.put(service, handler);
}

}
Expand Down
Expand Up @@ -42,6 +42,9 @@
import org.eclipse.milo.opcua.sdk.server.services2.impl.DefaultAttributeServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.impl.DefaultMethodServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.impl.DefaultMonitoredItemServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.impl.DefaultNodeManagementServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.impl.DefaultSubscriptionServiceSet2;
import org.eclipse.milo.opcua.sdk.server.services2.impl.DefaultViewServiceSet2;
import org.eclipse.milo.opcua.sdk.server.subscriptions.Subscription;
import org.eclipse.milo.opcua.stack.core.BuiltinReferenceType;
import org.eclipse.milo.opcua.stack.core.NamespaceTable;
Expand Down Expand Up @@ -142,13 +145,14 @@ public OpcUaServer(OpcUaServerConfig config) {
stackServer.addServiceSet(path, (ViewServiceSet) sessionManager);
});

// TODO should service sets that require a session all be implemented by SessionManager?
// Session-less eligible: View (minus register/unregister), Attribute, Method, Node Management, Query
paths.filter(path -> !path.endsWith("/discovery")).forEach(path -> {
addServiceSet(path, new DefaultAttributeServiceSet2(OpcUaServer.this));
addServiceSet(path, new DefaultMethodServiceSet2(OpcUaServer.this));
addServiceSet(path, new DefaultMonitoredItemServiceSet2(OpcUaServer.this));
addServiceSet(path, new DefaultNodeManagementServiceSet2(OpcUaServer.this));
addServiceSet(path, sessionManager);
addServiceSet(path, new DefaultSubscriptionServiceSet2(OpcUaServer.this));
addServiceSet(path, new DefaultViewServiceSet2(OpcUaServer.this));
});

ObjectTypeInitializer.initialize(stackServer.getNamespaceTable(), objectTypeManager);
Expand Down
@@ -0,0 +1,31 @@
package org.eclipse.milo.opcua.sdk.server.services2;

import java.util.concurrent.CompletableFuture;

import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.AddReferencesRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.AddReferencesResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteReferencesRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteReferencesResponse;
import org.eclipse.milo.opcua.stack.transport.server.ServiceRequestContext;

public interface NodeManagementServiceSet2 {

CompletableFuture<AddNodesResponse> onAddNodes(ServiceRequestContext context, AddNodesRequest request);

CompletableFuture<DeleteNodesResponse> onDeleteNodes(ServiceRequestContext context, DeleteNodesRequest request);

CompletableFuture<AddReferencesResponse> onAddReferences(
ServiceRequestContext context,
AddReferencesRequest request
);

CompletableFuture<DeleteReferencesResponse> onDeleteReferences(
ServiceRequestContext context,
DeleteReferencesRequest request
);

}
@@ -0,0 +1,52 @@
package org.eclipse.milo.opcua.sdk.server.services2;

import java.util.concurrent.CompletableFuture;

import org.eclipse.milo.opcua.stack.core.types.structured.CreateSubscriptionRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.CreateSubscriptionResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteSubscriptionsRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteSubscriptionsResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.ModifySubscriptionRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.ModifySubscriptionResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.PublishRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.PublishResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.RepublishRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.RepublishResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.SetPublishingModeRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.SetPublishingModeResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.TransferSubscriptionsRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.TransferSubscriptionsResponse;
import org.eclipse.milo.opcua.stack.transport.server.ServiceRequestContext;

public interface SubscriptionServiceSet2 {

CompletableFuture<CreateSubscriptionResponse> onCreateSubscription(
ServiceRequestContext context,
CreateSubscriptionRequest request
);

CompletableFuture<ModifySubscriptionResponse> onModifySubscription(
ServiceRequestContext context,
ModifySubscriptionRequest request
);

CompletableFuture<DeleteSubscriptionsResponse> onDeleteSubscriptions(
ServiceRequestContext context,
DeleteSubscriptionsRequest request
);

CompletableFuture<TransferSubscriptionsResponse> onTransferSubscriptions(
ServiceRequestContext context,
TransferSubscriptionsRequest request
);

CompletableFuture<SetPublishingModeResponse> onSetPublishingMode(
ServiceRequestContext context,
SetPublishingModeRequest request
);

CompletableFuture<PublishResponse> onPublish(ServiceRequestContext context, PublishRequest request);

CompletableFuture<RepublishResponse> onRepublish(ServiceRequestContext context, RepublishRequest request);

}

0 comments on commit 74743aa

Please sign in to comment.