Skip to content

Commit

Permalink
~ WIP use a semaphore to limit a session's concurrent method calls
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinherron committed Aug 24, 2023
1 parent c685a5b commit a500aec
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 22 deletions.
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -62,6 +63,9 @@ public class Session implements SessionServiceSet {

private static final int IDENTITY_HISTORY_MAX_SIZE = 10;

private static final int CONCURRENT_CALL_LIMIT =
Integer.getInteger("milo.session.concurrentCallLimit", 64);

private final Logger logger = LoggerFactory.getLogger(getClass());

private final List<LifecycleListener> listeners = Lists.newCopyOnWriteArrayList();
Expand All @@ -72,6 +76,8 @@ public class Session implements SessionServiceSet {

private final Map<ByteString, BrowseContinuationPoint> browseContinuationPoints = Maps.newConcurrentMap();

private final Semaphore callSemaphore = new Semaphore(CONCURRENT_CALL_LIMIT, true);

private volatile Object identityObject;
private volatile UserIdentityToken identityToken;

Expand Down Expand Up @@ -396,6 +402,10 @@ public void onCloseSession(ServiceRequest serviceRequest) {
serviceRequest.setServiceFault(StatusCodes.Bad_InternalError);
}

public Semaphore getCallSemaphore() {
return callSemaphore;
}

void close(boolean deleteSubscriptions) {
if (checkTimeoutFuture != null) {
checkTimeoutFuture.cancel(false);
Expand Down
Expand Up @@ -12,10 +12,12 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

import com.google.common.collect.Lists;
import org.eclipse.milo.opcua.sdk.core.Reference;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
import org.eclipse.milo.opcua.sdk.server.Session;
import org.eclipse.milo.opcua.sdk.server.UaNodeManager;
import org.eclipse.milo.opcua.sdk.server.api.methods.MethodInvocationHandler;
import org.eclipse.milo.opcua.sdk.server.api.services.MethodServices;
Expand Down Expand Up @@ -213,37 +215,56 @@ public void write(
/**
* Invoke one or more methods belonging to this {@link MethodServices}.
*
* @param context the {@link CallContext}.
* @param context the {@link CallContext}.
* @param requests The {@link CallMethodRequest}s for the methods to invoke.
*/
@Override
public void call(CallContext context, List<CallMethodRequest> requests) {
List<CallMethodResult> results = Lists.newArrayListWithCapacity(requests.size());

for (CallMethodRequest request : requests) {
try {
MethodInvocationHandler handler = getInvocationHandler(
request.getObjectId(),
request.getMethodId()
);
Semaphore semaphore = context.getSession()
.map(Session::getCallSemaphore)
.orElse(null);

results.add(handler.invoke(context, request));
} catch (UaException e) {
results.add(
new CallMethodResult(
e.getStatusCode(),
new StatusCode[0],
new DiagnosticInfo[0],
new Variant[0]
)
);
} catch (Throwable t) {
LoggerFactory.getLogger(getClass())
.error("Uncaught Throwable invoking method handler for methodId={}.", request.getMethodId(), t);
for (CallMethodRequest request : requests) {
if (semaphore == null || semaphore.tryAcquire()) {
try {
MethodInvocationHandler handler = getInvocationHandler(
request.getObjectId(),
request.getMethodId()
);

results.add(handler.invoke(context, request));
} catch (UaException e) {
results.add(
new CallMethodResult(
e.getStatusCode(),
new StatusCode[0],
new DiagnosticInfo[0],
new Variant[0]
)
);
} catch (Throwable t) {
LoggerFactory.getLogger(getClass())
.error("Uncaught Throwable invoking method handler for methodId={}.", request.getMethodId(), t);

results.add(
new CallMethodResult(
new StatusCode(StatusCodes.Bad_InternalError),
new StatusCode[0],
new DiagnosticInfo[0],
new Variant[0]
)
);
} finally {
if (semaphore != null) {
semaphore.release();
}
}
} else {
results.add(
new CallMethodResult(
new StatusCode(StatusCodes.Bad_InternalError),
new StatusCode(StatusCodes.Bad_ResourceUnavailable),
new StatusCode[0],
new DiagnosticInfo[0],
new Variant[0]
Expand All @@ -262,7 +283,7 @@ public void call(CallContext context, List<CallMethodRequest> requests) {
* @param methodId the {@link NodeId} identifying the method.
* @return the {@link MethodInvocationHandler} for {@code methodId}.
* @throws UaException a {@link UaException} containing the appropriate operation result if
* either the object or method can't be found.
* either the object or method can't be found.
*/
protected MethodInvocationHandler getInvocationHandler(NodeId objectId, NodeId methodId) throws UaException {
UaNode node = nodeManager.getNode(objectId)
Expand Down

0 comments on commit a500aec

Please sign in to comment.