Skip to content

Commit

Permalink
Handle subscription transfer special cases + tests (#1700)
Browse files Browse the repository at this point in the history
- Fixed issue: After transfer, available sequence numbers were not properly ack/republished.
- Fixed issue: After transfer, race condition could delete subscription before transfer finished.
- Fixed issue: After reload of a subscription, duplicate client handles may have been used.
- Add tests which emulate the disconnection of a client/server, then transfers
- Add subscription variable to select if missing publish requests are just acknoledged (default) or republished after transfer completes. Set `Subscription.RepublishAfterTransfer` to `true`.
- Improve test framework to allow for testing against external servers using a .runsettings file
- improve publish request logging, by moving calls into `CoreClientUtils.EventLog`
- disable available sequence numbers check, it causes false negatives when many subscriptions are active
  • Loading branch information
mregen committed Feb 7, 2022
1 parent 8c1a659 commit 5d9234e
Show file tree
Hide file tree
Showing 13 changed files with 675 additions and 302 deletions.
4 changes: 3 additions & 1 deletion Libraries/Opc.Ua.Client/MonitoredItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -746,13 +746,15 @@ public void SetError(ServiceResult error)
/// </summary>
public void SetTransferResult(uint clientHandle)
{
// ensure the global counter is not duplicating future handle ids
Utils.LowerLimitIdentifier(ref s_GlobalClientHandle, clientHandle);
m_clientHandle = clientHandle;
m_status.SetTransferResult(this);
m_attributesModified = false;
}

/// <summary>
/// Updates the object with the results of a modify monitored item request.
/// Updates the object with the results of a delete monitored item request.
/// </summary>
public void SetDeleteResult(
StatusCode result,
Expand Down
60 changes: 59 additions & 1 deletion Libraries/Opc.Ua.Client/OpcUaClientEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,27 @@ internal class OpcUaClientEventSource : EventSource
{
private const int SubscriptionStateId = 1;
private const int NotificationId = SubscriptionStateId + 1;
private const int NotificationReceivedId = NotificationId + 1;
private const int PublishStartId = NotificationId + 1;
private const int PublishStopId = PublishStartId + 1;

/// <summary>
/// The client messages.
/// </summary>
private const string SubscriptionStateMessage = "Subscription {0}, Id={1}, LastNotificationTime={2:HH:mm:ss}, GoodPublishRequestCount={3}, PublishingInterval={4}, KeepAliveCount={5}, PublishingEnabled={6}, MonitoredItemCount={7}";
private const string NotificationMessage = "Notification: ClientHandle={0}, Value={1}";
private const string NotificationReceivedMessage = "NOTIFICATION RECEIVED: SubId={0}, SeqNo={1}";
private const string PublishStartMessage = "PUBLISH #{0} SENT";
private const string PublishStopMessage = "PUBLISH #{0} RECEIVED";

/// <summary>
/// The Client Event Ids used for event messages, when calling ILogger.
/// </summary>
private readonly EventId SubscriptionStateMessageEventId = new EventId(TraceMasks.Operation, nameof(SubscriptionState));
private readonly EventId NotificationEventId = new EventId(TraceMasks.Operation, nameof(Notification));
private readonly EventId NotificationReceivedEventId = new EventId(TraceMasks.Operation, nameof(NotificationReceived));
private readonly EventId PublishStartEventId = new EventId(TraceMasks.ServiceDetail, nameof(PublishStart));
private readonly EventId PublishStopEventId = new EventId(TraceMasks.ServiceDetail, nameof(PublishStop));

/// <summary>
/// The state of the client subscription.
Expand All @@ -87,14 +96,63 @@ internal class OpcUaClientEventSource : EventSource
}

/// <summary>
/// The state of the client subscription.
/// The notification message. Called internally to convert wrapped value.
/// </summary>
[Event(NotificationId, Message = NotificationMessage, Level = EventLevel.Verbose)]
public void Notification(int clientHandle, string value)
{
WriteEvent(NotificationId, value, clientHandle);
}

/// <summary>
/// A notification received in Publish complete.
/// </summary>
[Event(NotificationReceivedId, Message = NotificationReceivedMessage, Level = EventLevel.Verbose)]
public void NotificationReceived(int subscriptionId, int sequenceNumber)
{
if (IsEnabled())
{
WriteEvent(NotificationReceivedId, subscriptionId, sequenceNumber);
}
else if (Utils.Logger.IsEnabled(LogLevel.Trace))
{
Utils.LogTrace(NotificationReceivedEventId, NotificationReceivedMessage,
subscriptionId, sequenceNumber);
}
}

/// <summary>
/// A Publish begin received.
/// </summary>
[Event(PublishStartId, Message = PublishStartMessage, Level = EventLevel.Verbose)]
public void PublishStart(int requestHandle)
{
if (IsEnabled())
{
WriteEvent(PublishStartId, requestHandle);
}
else if (Utils.Logger.IsEnabled(LogLevel.Trace))
{
Utils.LogTrace(PublishStartEventId, PublishStartMessage, requestHandle);
}
}

/// <summary>
/// A Publish complete received.
/// </summary>
[Event(PublishStopId, Message = PublishStopMessage, Level = EventLevel.Verbose)]
public void PublishStop(int requestHandle)
{
if (IsEnabled())
{
WriteEvent(PublishStopId, requestHandle);
}
else if (Utils.Logger.IsEnabled(LogLevel.Trace))
{
Utils.LogTrace(PublishStopEventId, PublishStopMessage, requestHandle);
}
}

/// <summary>
/// Log a Notification.
/// </summary>
Expand Down
101 changes: 70 additions & 31 deletions Libraries/Opc.Ua.Client/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,10 @@ public int SubscriptionCount
/// <summary>
/// If the subscriptions are deleted when a session is closed.
/// </summary>
/// <remarks>
/// Default <c>true</c>, set to <c>false</c> if subscriptions need to
/// be transferred or for durable subscriptions.
/// </remarks>
public bool DeleteSubscriptionsOnClose
{
get { return m_deleteSubscriptionsOnClose; }
Expand Down Expand Up @@ -1315,12 +1319,13 @@ public void Save(string filePath, IEnumerable<Subscription> subscriptions)
}

/// <summary>
/// Load the list of subscriptions saved in a file.
/// Load the list of subscriptions saved in a stream.
/// </summary>
/// <param name="stream">The stream.</param>
/// <returns>The list of loaded subscriptions</returns>
public IEnumerable<Subscription> Load(Stream stream)
{
// secure settings
XmlReaderSettings settings = new XmlReaderSettings {
DtdProcessing = DtdProcessing.Prohibit,
XmlResolver = null,
Expand Down Expand Up @@ -3420,26 +3425,43 @@ internal bool RemoveTransferredSubscription(Subscription subscription)
}
if (subscription.TransferId == 0)
{
throw new ServiceResultException(StatusCodes.BadInvalidState, Utils.Format("A subscription can not be transferred due to missing Id."));
throw new ServiceResultException(StatusCodes.BadInvalidState, Utils.Format("A subscription can not be transferred due to missing transfer Id."));
}
subscriptionIds.Add(subscription.TransferId);
}

ResponseHeader responseHeader = TransferSubscriptions(null, subscriptionIds, sendInitialValues, out var results, out var diagnosticInfos);
if (!StatusCode.IsGood(responseHeader.ServiceResult))
lock (SyncRoot)
{
Utils.LogError("TransferSubscription failed: {0}", responseHeader.ServiceResult);
return false;
}
ResponseHeader responseHeader = TransferSubscriptions(null, subscriptionIds, sendInitialValues, out var results, out var diagnosticInfos);
if (!StatusCode.IsGood(responseHeader.ServiceResult))
{
Utils.LogError("TransferSubscription failed: {0}", responseHeader.ServiceResult);
return false;
}

ClientBase.ValidateResponse(results, subscriptionIds);
ClientBase.ValidateDiagnosticInfos(diagnosticInfos, subscriptionIds);
ClientBase.ValidateResponse(results, subscriptionIds);
ClientBase.ValidateDiagnosticInfos(diagnosticInfos, subscriptionIds);

for (int ii = 0; ii < subscriptions.Count; ii++)
{
if (StatusCode.IsGood(results[ii].StatusCode))
for (int ii = 0; ii < subscriptions.Count; ii++)
{
subscriptions[ii].Transfer(this, subscriptionIds[ii], results[ii].AvailableSequenceNumbers);
if (StatusCode.IsGood(results[ii].StatusCode))
{
if (subscriptions[ii].Transfer(this, subscriptionIds[ii], results[ii].AvailableSequenceNumbers))
{ // create ack for available sequence numbers
foreach (var sequenceNumber in results[ii].AvailableSequenceNumbers)
{
var ack = new SubscriptionAcknowledgement() {
SubscriptionId = subscriptionIds[ii],
SequenceNumber = sequenceNumber
};
m_acknowledgementsToSend.Add(ack);
}
}
}
else
{
Utils.LogError("SubscriptionId {0} failed to transfer, StatusCode={1}", subscriptionIds[ii], results[ii].StatusCode);
}
}
}

Expand Down Expand Up @@ -4153,8 +4175,11 @@ public IAsyncResult BeginPublish(int timeout)
state.RequestId = requestHeader.RequestHandle;
state.Timestamp = DateTime.UtcNow;

CoreClientUtils.EventLog.PublishStart((int)requestHeader.RequestHandle);

try
{

IAsyncResult result = BeginPublish(
requestHeader,
acknowledgementsToSend,
Expand All @@ -4163,8 +4188,6 @@ public IAsyncResult BeginPublish(int timeout)

AsyncRequestStarted(result, requestHeader.RequestHandle, DataTypes.PublishRequest);

Utils.LogTrace("PUBLISH #{0} SENT", requestHeader.RequestHandle);

return result;
}
catch (Exception e)
Expand All @@ -4188,10 +4211,10 @@ private void OnPublishComplete(IAsyncResult result)

AsyncRequestCompleted(result, requestHeader.RequestHandle, DataTypes.PublishRequest);

CoreClientUtils.EventLog.PublishStop((int)requestHeader.RequestHandle);

try
{
Utils.LogTrace("PUBLISH #{0} RECEIVED", requestHeader.RequestHandle);

// complete publish.
uint subscriptionId;
UInt32Collection availableSequenceNumbers;
Expand Down Expand Up @@ -4223,7 +4246,7 @@ private void OnPublishComplete(IAsyncResult result)
return;
}

Utils.LogTrace("NOTIFICATION RECEIVED: SubId={0}, SeqNo={1}", subscriptionId, notificationMessage.SequenceNumber);
CoreClientUtils.EventLog.NotificationReceived((int)subscriptionId, (int)notificationMessage.SequenceNumber);

// process response.
ProcessPublishResponse(
Expand Down Expand Up @@ -4316,6 +4339,8 @@ private void OnPublishComplete(IAsyncResult result)
case StatusCodes.BadNoSubscription:
case StatusCodes.BadSessionClosed:
case StatusCodes.BadSessionIdInvalid:
case StatusCodes.BadSecureChannelIdInvalid:
case StatusCodes.BadSecureChannelClosed:
case StatusCodes.BadServerHalted:
return;
}
Expand All @@ -4341,11 +4366,11 @@ private void OnPublishComplete(IAsyncResult result)
public bool Republish(uint subscriptionId, uint sequenceNumber)
{
// send publish request.
RequestHeader requestHeader = new RequestHeader();

requestHeader.TimeoutHint = (uint)OperationTimeout;
requestHeader.ReturnDiagnostics = (uint)(int)ReturnDiagnostics;
requestHeader.RequestHandle = Utils.IncrementIdentifier(ref m_publishCounter);
RequestHeader requestHeader = new RequestHeader {
TimeoutHint = (uint)OperationTimeout,
ReturnDiagnostics = (uint)(int)ReturnDiagnostics,
RequestHandle = Utils.IncrementIdentifier(ref m_publishCounter)
};

try
{
Expand All @@ -4360,7 +4385,7 @@ public bool Republish(uint subscriptionId, uint sequenceNumber)
sequenceNumber,
out notificationMessage);

Utils.LogInfo("Received Republish for {0}-{1}", subscriptionId, sequenceNumber);
Utils.LogInfo("Received Republish for {0}-{1}-{2}", subscriptionId, sequenceNumber, responseHeader.ServiceResult);

// process response.
ProcessPublishResponse(
Expand Down Expand Up @@ -4479,6 +4504,11 @@ public bool Republish(uint subscriptionId, uint sequenceNumber)
acknowledgementsToSend.Add(acknowledgement);
}

#if DEBUG_SEQUENTIALPUBLISHING
// Checks for debug info only.
// Once more than a single publish request is queued, the checks are invalid
// because a publish response may not include the latest ack information yet.

uint lastSentSequenceNumber = 0;
if (availableSequenceNumbers != null)
{
Expand All @@ -4487,7 +4517,6 @@ public bool Republish(uint subscriptionId, uint sequenceNumber)
if (m_latestAcknowledgementsSent.ContainsKey(subscriptionId))
{
lastSentSequenceNumber = m_latestAcknowledgementsSent[subscriptionId];

// If the last sent sequence number is uint.Max do not display the warning; the counter rolled over
// If the last sent sequence number is greater or equal to the available sequence number (returned by the publish),
// a warning must be logged.
Expand All @@ -4505,12 +4534,14 @@ public bool Republish(uint subscriptionId, uint sequenceNumber)
lastSentSequenceNumber = m_latestAcknowledgementsSent[subscriptionId];

// If the last sent sequence number is uint.Max do not display the warning; the counter rolled over
// If the last sent sequence number is greater or equal to the notificationMessage's sequence number (returned by the publish), a warning must be logged.
// If the last sent sequence number is greater or equal to the notificationMessage's sequence number (returned by the publish),
// a warning must be logged.
if (((lastSentSequenceNumber >= notificationMessage.SequenceNumber) && (lastSentSequenceNumber != uint.MaxValue)) || (lastSentSequenceNumber == notificationMessage.SequenceNumber) && (lastSentSequenceNumber == uint.MaxValue))
{
Utils.LogWarning("Received sequence number which was already acknowledged={0}", notificationMessage.SequenceNumber);
}
}
#endif

if (availableSequenceNumbers != null)
{
Expand Down Expand Up @@ -4577,12 +4608,20 @@ public bool Republish(uint subscriptionId, uint sequenceNumber)
}
else
{
// Delete abandoned subscription from server.
Utils.LogWarning("Received Publish Response for Unknown SubscriptionId={0}. Deleting abandoned subscription from server.", subscriptionId);
if (m_deleteSubscriptionsOnClose)
{
// Delete abandoned subscription from server.
Utils.LogWarning("Received Publish Response for Unknown SubscriptionId={0}. Deleting abandoned subscription from server.", subscriptionId);

Task.Run(() => {
DeleteSubscription(subscriptionId);
});
Task.Run(() => {
DeleteSubscription(subscriptionId);
});
}
else
{
// Do not delete publish requests of stale subscriptions
Utils.LogWarning("Received Publish Response for Unknown SubscriptionId={0}. Ignored.", subscriptionId);
}
}
}

Expand Down

0 comments on commit 5d9234e

Please sign in to comment.