Skip to content

Commit

Permalink
Notify AddressSpaces of deleted DataItems when Subscription times out (
Browse files Browse the repository at this point in the history
…#954)

fixes #953
  • Loading branch information
kevinherron committed Apr 12, 2022
1 parent 739a70b commit d7ff6a2
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 the Eclipse Milo Authors
* Copyright (c) 2022 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand Down Expand Up @@ -145,6 +145,10 @@ public synchronized boolean isNotEmpty() {
return !isEmpty();
}

public synchronized boolean isWaitListEmpty() {
return waitList.isEmpty();
}

@Nullable
public synchronized ServiceRequest poll() {
long nowNanos = System.nanoTime();
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 the Eclipse Milo Authors
* Copyright (c) 2022 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -11,6 +11,7 @@
package org.eclipse.milo.opcua.sdk.server.subscriptions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -172,10 +173,22 @@ public void createSubscription(ServiceRequest service) {
server.getEventBus().post(new SubscriptionCreatedEvent(subscription));

subscription.setStateListener((s, ps, cs) -> {
if (cs == State.Closed) {
if (cs == State.Closing) {
subscriptions.remove(s.getId());
server.getSubscriptions().remove(s.getId());
server.getEventBus().post(new SubscriptionDeletedEvent(subscription));
server.getEventBus().post(new SubscriptionDeletedEvent(s));

/*
* Notify AddressSpaces the items for this subscription are deleted.
*/

byMonitoredItemType(
s.getMonitoredItems().values(),
dataItems -> server.getAddressSpaceManager().onDataItemsDeleted(dataItems),
eventItems -> server.getAddressSpaceManager().onEventItemsDeleted(eventItems)
);

s.getMonitoredItems().clear();
}
});

Expand Down Expand Up @@ -1087,7 +1100,10 @@ public void publish(ServiceRequest service) {
return;
}

if (subscriptions.isEmpty()) {
// waitList must also be empty because the last remaining subscription could have
// expired, which removes it from bookkeeping, but leaves it in the PublishQueue
// waitList if there were no available requests to send Bad_Timeout.
if (subscriptions.isEmpty() && publishQueue.isWaitListEmpty()) {
service.setServiceFault(StatusCodes.Bad_NoSubscription);
return;
}
Expand Down Expand Up @@ -1243,10 +1259,22 @@ public void addSubscription(Subscription subscription) {
server.getEventBus().post(new SubscriptionCreatedEvent(subscription));

subscription.setStateListener((s, ps, cs) -> {
if (cs == State.Closed) {
if (cs == State.Closing) {
subscriptions.remove(s.getId());
server.getSubscriptions().remove(s.getId());
server.getEventBus().post(new SubscriptionDeletedEvent(s));

/*
* Notify AddressSpaces the items for this subscription are deleted.
*/

byMonitoredItemType(
s.getMonitoredItems().values(),
dataItems -> server.getAddressSpaceManager().onDataItemsDeleted(dataItems),
eventItems -> server.getAddressSpaceManager().onEventItemsDeleted(eventItems)
);

s.getMonitoredItems().clear();
}
});
}
Expand Down Expand Up @@ -1287,7 +1315,7 @@ public void sendStatusChangeNotification(Subscription subscription, StatusCode s
* @param eventItemConsumer a {@link Consumer} that accepts a non-empty list of {@link EventItem}s.
*/
private static void byMonitoredItemType(
List<BaseMonitoredItem<?>> monitoredItems,
Collection<BaseMonitoredItem<?>> monitoredItems,
Consumer<List<DataItem>> dataItemConsumer,
Consumer<List<EventItem>> eventItemConsumer
) {
Expand Down

0 comments on commit d7ff6a2

Please sign in to comment.